Particle Swarm Optimization added.
This commit is contained in:
parent
7f9cf980f3
commit
45d164645b
1 changed files with 135 additions and 0 deletions
135
pso.py
Normal file
135
pso.py
Normal file
|
@ -0,0 +1,135 @@
|
|||
from random import random
|
||||
from math import sqrt
|
||||
|
||||
def product(it,key=lambda x: x):
|
||||
p = 1
|
||||
for n in map(key,it):
|
||||
p*=n
|
||||
return n
|
||||
|
||||
class Vector(list):
|
||||
def __add__(self,other):
|
||||
if len(self) != len(other):
|
||||
raise Exception("Can't add vectors of different size: %d %d" % (len(self), len(other)))
|
||||
return Vector([ self[i]+other[i] for i in range(len(self)) ])
|
||||
|
||||
def __sub__(self,other):
|
||||
if len(self) != len(other):
|
||||
raise Exception("Can't subtract vectors of different size")
|
||||
return Vector([ self[i]-other[i] for i in range(len(self)) ])
|
||||
|
||||
def __mul__(self,factor):
|
||||
return Vector([ self[i]*factor for i in range(len(self)) ])
|
||||
|
||||
def __div__(self,factor):
|
||||
return self.__mul__(1.0/factor)
|
||||
|
||||
def __rmul__(self,factor):
|
||||
return self.__mul__(factor)
|
||||
|
||||
def __rdiv__(self,factor):
|
||||
raise NotImplemented
|
||||
|
||||
def __iadd__(self,other):
|
||||
return self.__add__(other)
|
||||
|
||||
def __isub__(self,other):
|
||||
return self.__sub__(other)
|
||||
|
||||
def __neg__(self):
|
||||
return self.__mul__(-1.0)
|
||||
|
||||
def __setitem__(self,k,v):
|
||||
raise Exception("Not available") # immutable
|
||||
|
||||
def distance(self, other):
|
||||
if len(self) != len(other):
|
||||
raise Exception("Can't calculate distance between vectors of different size")
|
||||
return sqrt(sum([ (self[i]-other[i])**2 for i in range(len(self)) ]))
|
||||
|
||||
class Particle:
|
||||
def __init__(self,position,velocity):
|
||||
self.gsize = len(position)
|
||||
self.position = Vector(position)
|
||||
self.bestposition = Vector(position[:])
|
||||
self.velocity = Vector(velocity)
|
||||
|
||||
class ParticleCreator:
|
||||
def __init__(self,searchspace):
|
||||
self.searchspace = searchspace
|
||||
|
||||
def create(self):
|
||||
return Particle([ (random()*(r[1]-r[0]) + r[0]) for r in self.searchspace ],
|
||||
[0 for i in range(len(self.searchspace))])
|
||||
|
||||
class Fitness:
|
||||
targets = [Vector([0,0])]
|
||||
|
||||
def fitness(self,position):
|
||||
return min([ position.distance(target) for target in self.targets ])
|
||||
|
||||
class Swarm:
|
||||
def __init__(self, n, creator, fitness, inertiaWeight, cog1, cog2):
|
||||
self.particles = map(lambda x: creator.create(), range(n))
|
||||
self.inertiaWeight = inertiaWeight
|
||||
self.cog1 = cog1
|
||||
self.cog2 = cog2
|
||||
self.fitness = fitness
|
||||
|
||||
def bestOfSwarm(self):
|
||||
return min( map(lambda p: p.bestposition, self.particles), key=lambda position: self.fitness.fitness(position) )
|
||||
|
||||
def __bestNeighbor(self,particle, distance=20):
|
||||
#distance = max(min( self.particles, key=lambda p: particle.position.distance(p.position) ),distance)
|
||||
#return min( map(lambda p: p.bestposition, filter(lambda p: particle.position.distance(p.position) <= distance, self.particles) ), key=lambda position: self.fitness.fitness(position))
|
||||
i = self.particles.index(particle)
|
||||
l = (i+1) % len(self.particles)
|
||||
r = (i-1) % len(self.particles)
|
||||
return min( map(lambda p: p.bestposition, [self.particles[l], self.particles[r], particle]), key=lambda position: self.fitness.fitness(position) )
|
||||
|
||||
|
||||
def __updatePosition(self):
|
||||
for p in self.particles:
|
||||
new_position = p.position + p.velocity
|
||||
if self.fitness.fitness(new_position) < self.fitness.fitness(p.position):
|
||||
p.bestposition = new_position
|
||||
p.position = new_position
|
||||
|
||||
def __updateVelocity(self):
|
||||
bestOfSwarm = self.bestOfSwarm()
|
||||
for p in self.particles:
|
||||
#bestNeighbor = self.__bestNeighbor(p)
|
||||
m = p.velocity*self.inertiaWeight + (p.bestposition-p.position)*(self.cog1*random()) + (bestOfSwarm-p.position)*(self.cog2*random())
|
||||
p.velocity = m
|
||||
|
||||
def step(self):
|
||||
self.__updateVelocity()
|
||||
self.__updatePosition()
|
||||
|
||||
def stepN(self,n):
|
||||
for i in range(n):
|
||||
self.step()
|
||||
|
||||
def runUntilSwarmSize(self,sizeMax=1.0):
|
||||
iterations = 0
|
||||
while self.__swarmSize() > sizeMax:
|
||||
self.step()
|
||||
iterations += 1
|
||||
return iterations
|
||||
|
||||
|
||||
def __centerOfSwarm(self):
|
||||
return reduce(lambda a,b: a+b, [ p.position for p in self.particles ], Vector([0,0]) )/len(self.particles)
|
||||
|
||||
def __swarmSize(self):
|
||||
center = self.__centerOfSwarm()
|
||||
return max([ p.position.distance(center) for p in self.particles ])
|
||||
|
||||
if __name__ == '__main__':
|
||||
swarm = Swarm( 20, ParticleCreator([(-2000.0,2000.0),(-2000.0,2000.0)]), Fitness(), 1, 0.4, 0.2 )
|
||||
|
||||
iterations = swarm.runUntilSwarmSize(1)
|
||||
|
||||
print "%d iterations" % iterations
|
||||
|
||||
print swarm.bestOfSwarm()
|
Loading…
Reference in a new issue