1+ """
2+ Particle Swarm Optimization (PSO) Path Planning
3+
4+ author: Anish (@anishk85)
5+
6+ See Wikipedia article (https://en.wikipedia.org/wiki/Particle_swarm_optimization)
7+
8+ References:
9+ - Kennedy, J.; Eberhart, R. (1995). "Particle Swarm Optimization"
10+ - Shi, Y.; Eberhart, R. (1998). "A Modified Particle Swarm Optimizer"
11+ - https://machinelearningmastery.com/a-gentle-introduction-to-particle-swarm-optimization/
12+
13+ This implementation uses PSO to find collision-free paths by treating
14+ path planning as an optimization problem where particles explore the
15+ search space to minimize distance to target while avoiding obstacles.
16+ """
17+ import numpy as np
18+ import matplotlib
19+ import matplotlib .pyplot as plt
20+ import matplotlib .animation as animation
21+ import matplotlib .patches as patches
22+ import signal
23+ import sys
24+
25+ # Add show_animation flag for consistency with other planners
26+ show_animation = True
27+
28+ def signal_handler (sig , frame ):
29+ print ('\n Exiting...' )
30+ plt .close ('all' )
31+ sys .exit (0 )
32+
33+ signal .signal (signal .SIGINT , signal_handler )
34+
35+ class Particle :
36+ def __init__ (self , search_bounds , spawn_bounds ):
37+ self .search_bounds = search_bounds
38+ self .max_velocity = np .array ([(b [1 ] - b [0 ]) * 0.05 for b in search_bounds ])
39+ self .position = np .array ([np .random .uniform (b [0 ], b [1 ]) for b in spawn_bounds ])
40+ self .velocity = np .random .randn (2 ) * 0.1
41+ self .pbest_position = self .position .copy ()
42+ self .pbest_value = np .inf
43+ self .path = [self .position .copy ()]
44+
45+ def update_velocity (self , gbest_pos , w , c1 , c2 ):
46+ """Update particle velocity using PSO equation:
47+ v = w*v + c1*r1*(pbest - x) + c2*r2*(gbest - x)
48+ """
49+ r1 = np .random .rand (2 )
50+ r2 = np .random .rand (2 )
51+
52+ cognitive = c1 * r1 * (self .pbest_position - self .position )
53+ social = c2 * r2 * (gbest_pos - self .position )
54+
55+ self .velocity = w * self .velocity + cognitive + social
56+ self .velocity = np .clip (self .velocity , - self .max_velocity , self .max_velocity )
57+
58+ def update_position (self ):
59+ self .position = self .position + self .velocity
60+
61+ # Keep in bounds
62+ for i in range (2 ):
63+ self .position [i ] = np .clip (self .position [i ],
64+ self .search_bounds [i ][0 ],
65+ self .search_bounds [i ][1 ])
66+
67+ self .path .append (self .position .copy ())
68+
69+
70+ class PSOSwarm :
71+ def __init__ (self , n_particles , max_iter , target , search_bounds ,
72+ spawn_bounds , obstacles ):
73+ self .n_particles = n_particles
74+ self .max_iter = max_iter
75+ self .target = np .array (target )
76+ self .obstacles = obstacles
77+ self .search_bounds = search_bounds
78+
79+ # PSO parameters
80+ self .w_start = 0.9 # Initial inertia weight
81+ self .w_end = 0.4 # Final inertia weight
82+ self .c1 = 1.5 # Cognitive coefficient
83+ self .c2 = 1.5 # Social coefficient
84+
85+ # Initialize particles
86+ self .particles = [Particle (search_bounds , spawn_bounds )
87+ for _ in range (n_particles )]
88+
89+ self .gbest_position = None
90+ self .gbest_value = np .inf
91+ self .gbest_path = []
92+ self .iteration = 0
93+
94+ def fitness (self , pos ):
95+ """Calculate fitness - distance to target + obstacle penalty"""
96+ dist = np .linalg .norm (pos - self .target )
97+
98+ # Obstacle penalty
99+ penalty = 0
100+ for ox , oy , r in self .obstacles :
101+ obs_dist = np .linalg .norm (pos - np .array ([ox , oy ]))
102+ if obs_dist < r :
103+ penalty += 1000 # Inside obstacle
104+ elif obs_dist < r + 5 :
105+ penalty += 50 / (obs_dist - r + 0.1 ) # Too close
106+
107+ return dist + penalty
108+
109+ def check_collision (self , start , end , obstacle ):
110+ """Check if path from start to end hits obstacle using line-circle intersection"""
111+ ox , oy , r = obstacle
112+ center = np .array ([ox , oy ])
113+
114+ # Vector math for line-circle intersection
115+ d = end - start
116+ f = start - center
117+
118+ a = np .dot (d , d )
119+ b = 2 * np .dot (f , d )
120+ c = np .dot (f , f ) - r * r
121+
122+ discriminant = b * b - 4 * a * c
123+
124+ if discriminant < 0 :
125+ return False
126+
127+ # Check if intersection on segment
128+ t1 = (- b - np .sqrt (discriminant )) / (2 * a )
129+ t2 = (- b + np .sqrt (discriminant )) / (2 * a )
130+
131+ return (0 <= t1 <= 1 ) or (0 <= t2 <= 1 )
132+
133+ def step (self ):
134+ """Run one PSO iteration"""
135+ if self .iteration >= self .max_iter :
136+ return False
137+
138+ # Update inertia weight (linear decay)
139+ w = self .w_start - (self .w_start - self .w_end ) * (self .iteration / self .max_iter )
140+
141+ # Evaluate all particles
142+ for particle in self .particles :
143+ value = self .fitness (particle .position )
144+
145+ # Update personal best
146+ if value < particle .pbest_value :
147+ particle .pbest_value = value
148+ particle .pbest_position = particle .position .copy ()
149+
150+ # Update global best
151+ if value < self .gbest_value :
152+ self .gbest_value = value
153+ self .gbest_position = particle .position .copy ()
154+
155+ if self .gbest_position is not None :
156+ self .gbest_path .append (self .gbest_position .copy ())
157+
158+ # Update particles
159+ for particle in self .particles :
160+ particle .update_velocity (self .gbest_position , w , self .c1 , self .c2 )
161+
162+ # Predict next position
163+ next_pos = particle .position + particle .velocity
164+
165+ # Check collision
166+ collision = False
167+ for obs in self .obstacles :
168+ if self .check_collision (particle .position , next_pos , obs ):
169+ collision = True
170+ break
171+
172+ if collision :
173+ # Reduce velocity if collision detected
174+ particle .velocity *= 0.2
175+
176+ particle .update_position ()
177+
178+ self .iteration += 1
179+ if show_animation and self .iteration % 20 == 0 :
180+ print (f"Iteration { self .iteration } /{ self .max_iter } , Best: { self .gbest_value :.2f} " )
181+ return True
182+
183+
184+ def main ():
185+ """Test PSO path planning algorithm"""
186+ print (__file__ + " start!!" )
187+
188+ # Set matplotlib backend for headless environments
189+ if not show_animation :
190+ matplotlib .use ('Agg' ) # Use non-GUI backend for testing
191+
192+ # Setup
193+ N_PARTICLES = 15
194+ MAX_ITER = 150
195+ SEARCH_BOUNDS = [(- 50 , 50 ), (- 50 , 50 )]
196+ TARGET = [40 , 35 ]
197+ SPAWN_AREA = [(- 45 , - 35 ), (- 45 , - 35 )]
198+ OBSTACLES = [
199+ (10 , 15 , 8 ),
200+ (- 20 , 0 , 12 ),
201+ (20 , - 25 , 10 ),
202+ (- 5 , - 30 , 7 )
203+ ]
204+
205+ swarm = PSOSwarm (
206+ n_particles = N_PARTICLES ,
207+ max_iter = MAX_ITER ,
208+ target = TARGET ,
209+ search_bounds = SEARCH_BOUNDS ,
210+ spawn_bounds = SPAWN_AREA ,
211+ obstacles = OBSTACLES
212+ )
213+
214+ if show_animation : # pragma: no cover
215+ # Visualization
216+ fig , ax = plt .subplots (figsize = (10 , 10 ))
217+ ax .set_xlim (SEARCH_BOUNDS [0 ])
218+ ax .set_ylim (SEARCH_BOUNDS [1 ])
219+ ax .set_title ("PSO Path Planning with Collision Avoidance" , fontsize = 14 )
220+ ax .grid (True , alpha = 0.3 )
221+
222+ # Draw obstacles
223+ for ox , oy , r in OBSTACLES :
224+ circle = patches .Circle ((ox , oy ), r , color = 'gray' , alpha = 0.7 )
225+ ax .add_patch (circle )
226+
227+ # Draw spawn area
228+ spawn_rect = patches .Rectangle (
229+ (SPAWN_AREA [0 ][0 ], SPAWN_AREA [1 ][0 ]),
230+ SPAWN_AREA [0 ][1 ] - SPAWN_AREA [0 ][0 ],
231+ SPAWN_AREA [1 ][1 ] - SPAWN_AREA [1 ][0 ],
232+ linewidth = 2 , edgecolor = 'green' , facecolor = 'green' ,
233+ alpha = 0.2 , label = 'Start Zone'
234+ )
235+ ax .add_patch (spawn_rect )
236+
237+ # Draw target
238+ ax .plot (TARGET [0 ], TARGET [1 ], 'r*' , markersize = 20 , label = 'Target' )
239+
240+ # Initialize plot elements
241+ particles_scatter = ax .scatter ([], [], c = 'blue' , s = 50 , alpha = 0.6 , label = 'Particles' )
242+ gbest_scatter = ax .plot ([], [], 'yo' , markersize = 12 , label = 'Best Position' )[0 ]
243+ particle_paths = [ax .plot ([], [], 'b-' , lw = 0.5 , alpha = 0.2 )[0 ] for _ in range (N_PARTICLES )]
244+ gbest_path_line = ax .plot ([], [], 'y--' , lw = 2 , alpha = 0.8 , label = 'Best Path' )[0 ]
245+
246+ iteration_text = ax .text (0.02 , 0.95 , '' , transform = ax .transAxes ,
247+ fontsize = 12 , verticalalignment = 'top' ,
248+ bbox = dict (boxstyle = 'round' , facecolor = 'wheat' , alpha = 0.5 ))
249+
250+ ax .legend (loc = 'upper right' )
251+
252+ def animate (frame ):
253+ if not swarm .step ():
254+ return
255+
256+ # Update particle positions
257+ positions = np .array ([p .position for p in swarm .particles ])
258+ particles_scatter .set_offsets (positions )
259+
260+ # Update particle paths
261+ for i , particle in enumerate (swarm .particles ):
262+ if len (particle .path ) > 1 :
263+ path = np .array (particle .path )
264+ particle_paths [i ].set_data (path [:, 0 ], path [:, 1 ])
265+
266+ # Update global best
267+ if swarm .gbest_position is not None :
268+ gbest_scatter .set_data ([swarm .gbest_position [0 ]],
269+ [swarm .gbest_position [1 ]])
270+
271+ if len (swarm .gbest_path ) > 1 :
272+ gbest = np .array (swarm .gbest_path )
273+ gbest_path_line .set_data (gbest [:, 0 ], gbest [:, 1 ])
274+
275+ # Update text
276+ iteration_text .set_text (
277+ f'Iteration: { swarm .iteration } /{ MAX_ITER } \n '
278+ f'Best Fitness: { swarm .gbest_value :.2f} '
279+ )
280+
281+ return (particles_scatter , gbest_scatter , gbest_path_line ,
282+ iteration_text , * particle_paths )
283+
284+ ani = animation .FuncAnimation (
285+ fig , animate , frames = MAX_ITER ,
286+ interval = 100 , blit = True , repeat = False
287+ )
288+
289+ plt .tight_layout ()
290+ plt .show ()
291+ else :
292+ # Run without animation for testing
293+ print ("Running PSO algorithm without animation..." )
294+ iteration_count = 0
295+ while swarm .step ():
296+ iteration_count += 1
297+ if iteration_count >= MAX_ITER :
298+ break
299+
300+ print (f"PSO completed after { iteration_count } iterations" )
301+ print (f"Best fitness: { swarm .gbest_value :.2f} " )
302+ if swarm .gbest_position is not None :
303+ print (f"Best position: [{ swarm .gbest_position [0 ]:.2f} , { swarm .gbest_position [1 ]:.2f} ]" )
304+
305+
306+ if __name__ == "__main__" :
307+ try :
308+ main ()
309+ except KeyboardInterrupt :
310+ print ("\n Program interrupted by user" )
311+ plt .close ('all' )
312+ sys .exit (0 )
0 commit comments