diff --git a/PathPlanning/TimeBasedPathPlanning/BaseClasses.py b/PathPlanning/TimeBasedPathPlanning/BaseClasses.py index 745cde45fb..184e344525 100644 --- a/PathPlanning/TimeBasedPathPlanning/BaseClasses.py +++ b/PathPlanning/TimeBasedPathPlanning/BaseClasses.py @@ -4,6 +4,7 @@ Grid, Position, ) +from PathPlanning.TimeBasedPathPlanning.ConstraintTree import AgentId from PathPlanning.TimeBasedPathPlanning.Node import NodePath import random import numpy.random as numpy_random @@ -17,16 +18,16 @@ class SingleAgentPlanner(ABC): """ Base class for single agent planners """ - + @staticmethod @abstractmethod - def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: + def plan(grid: Grid, start: Position, goal: Position, agent_idx: AgentId, verbose: bool = False) -> NodePath: pass @dataclass class StartAndGoal: # Index of this agent - index: int + agent_id: AgentId # Start position of the robot start: Position # Goal position of the robot @@ -38,12 +39,12 @@ def distance_start_to_goal(self) -> float: class MultiAgentPlanner(ABC): """ Base class for multi-agent planners - """ - + """ + @staticmethod @abstractmethod - def plan(grid: Grid, start_and_goal_positions: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> tuple[list[StartAndGoal], list[NodePath]]: + def plan(grid: Grid, start_and_goal_positions: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> dict[AgentId, NodePath]: """ - Plan for all agents. Returned paths are in order corresponding to the returned list of `StartAndGoal` objects + Plan for all agents. Returned paths found for each agent """ - pass \ No newline at end of file + pass diff --git a/PathPlanning/TimeBasedPathPlanning/ConflictBasedSearch.py b/PathPlanning/TimeBasedPathPlanning/ConflictBasedSearch.py new file mode 100644 index 0000000000..9855fe466f --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/ConflictBasedSearch.py @@ -0,0 +1,228 @@ +""" +Conflict Based Search generates paths in 3 dimensions (x, y, time) for a set of agents. It does +so by performing searches on two levels. The top level search applies constraints that agents +must avoid, and the bottom level performs a single-agent search for individual agents given +the constraints provided by the top level search. Initially, paths are generated for each agent +with no constraints. The paths are checked for conflicts with one another. If any are found, the +top level search generates two nodes. These nodes apply a constraint at the point of conflict for +one of the two agents in conflict. This process repeats until a set of paths are found where no +agents are in conflict. The top level search chooses successor nodes based on the sum of the +cost of all paths, which guarantees optimiality of the final solution. + +The full algorithm is defined in this paper: https://cdn.aaai.org/ojs/8140/8140-13-11667-1-2-20201228.pdf +""" + +from copy import deepcopy +from enum import Enum +import numpy as np +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + ObstacleArrangement, + Position, +) +import time + +from PathPlanning.TimeBasedPathPlanning.BaseClasses import MultiAgentPlanner, StartAndGoal +from PathPlanning.TimeBasedPathPlanning.Node import NodePath +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner +from PathPlanning.TimeBasedPathPlanning.SpaceTimeAStar import SpaceTimeAStar +from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePaths +from PathPlanning.TimeBasedPathPlanning.ConstraintTree import AgentId, AppliedConstraint, ConstrainedAgent, ConstraintTree, ConstraintTreeNode, ForkingConstraint + +class ConflictBasedSearch(MultiAgentPlanner): + + + @staticmethod + def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> dict[AgentId, NodePath]: + """ + Generate a path from the start to the goal for each agent in the `start_and_goals` list. + """ + print(f"Using single-agent planner: {single_agent_planner_class}") + + constraint_tree: ConstraintTree = ConflictBasedSearch.initialize_constraint_tree(grid, start_and_goals, single_agent_planner_class, verbose) + attempted_constraint_combos: set = set() + + while constraint_tree.nodes_to_expand: + constraint_tree_node = constraint_tree.get_next_node_to_expand() + if constraint_tree_node is None: + raise RuntimeError("No more nodes to expand in the constraint tree.") + + ancestor_constraints = constraint_tree.get_ancestor_constraints(constraint_tree_node.parent_idx) + + if verbose: + print(f"Expanding node with constraint {constraint_tree_node.constraint} and parent {constraint_tree_node.parent_idx}") + print(f"\tCOST: {constraint_tree_node.cost}") + + if not constraint_tree_node.constraint: + # This means we found a solution! + print(f"\nFound a path with {len(constraint_tree_node.all_constraints)} constraints after {constraint_tree.expanded_node_count()} expansions") + print(f"Final cost: {constraint_tree_node.cost}") + final_paths = {} + for start_and_goal in start_and_goals: + final_paths[start_and_goal.agent_id] = constraint_tree_node.paths[start_and_goal.agent_id] + return final_paths + + if not isinstance(constraint_tree_node.constraint, ForkingConstraint): + raise ValueError(f"Expected a ForkingConstraint, but got: {constraint_tree_node.constraint}") + + for constrained_agent in constraint_tree_node.constraint.constrained_agents: + applied_constraint = AppliedConstraint(constrained_agent.constraint, constrained_agent.agent) + + all_constraints = ancestor_constraints + all_constraints.append(applied_constraint) + + new_path = ConflictBasedSearch.plan_for_agent( + constrained_agent, + all_constraints, + constraint_tree, + attempted_constraint_combos, + grid, + single_agent_planner_class, + start_and_goals, + verbose + ) + if not new_path: + continue + + # Deepcopy to update with applied constraint and new paths + applied_constraint_parent = deepcopy(constraint_tree_node) + applied_constraint_parent.paths[constrained_agent.agent] = new_path + applied_constraint_parent.constraint = applied_constraint + parent_idx = constraint_tree.add_expanded_node(applied_constraint_parent) + + new_constraint_tree_node = ConstraintTreeNode(applied_constraint_parent.paths, parent_idx, all_constraints) + if verbose: + print(f"Adding new constraint tree node with constraint: {new_constraint_tree_node.constraint}") + constraint_tree.add_node_to_tree(new_constraint_tree_node) + + raise RuntimeError("No solution found") + + @staticmethod + def get_agents_start_and_goal(start_and_goal_list: list[StartAndGoal], target_index: AgentId) -> StartAndGoal: + """ + Returns the start and goal of a specific agent + """ + for item in start_and_goal_list: + if item.agent_id == target_index: + return item + raise RuntimeError(f"Could not find agent with index {target_index} in {start_and_goal_list}") + + @staticmethod + def plan_for_agent(constrained_agent: ConstrainedAgent, + all_constraints: list[AppliedConstraint], + constraint_tree: ConstraintTree, + attempted_constraint_combos: set, + grid: Grid, + single_agent_planner_class: SingleAgentPlanner, + start_and_goals: list[StartAndGoal], + verbose: bool) -> NodePath | None: + """ + Attempt to generate a path plan for a single agent + """ + num_expansions = constraint_tree.expanded_node_count() + if num_expansions % 50 == 0: + print(f"Expanded {num_expansions} nodes so far...") + print(f"\tNumber of constraints on expanded node: {len(all_constraints)}") + + # Skip if we have already tried this set of constraints + constraint_hash = hash(frozenset(all_constraints)) + if constraint_hash in attempted_constraint_combos: + if verbose: + print(f"\tSkipping already attempted constraint combination: {all_constraints}") + return None + else: + attempted_constraint_combos.add(constraint_hash) + + if verbose: + print(f"\tAll constraints: {all_constraints}") + + grid.clear_constraint_reservations() + grid.apply_constraint_reservations(all_constraints) + + # Just plan for agent with new constraint + start_and_goal: StartAndGoal = ConflictBasedSearch.get_agents_start_and_goal(start_and_goals, constrained_agent.agent) + try: + if verbose: + print("\tplanning for: {}", start_and_goal) + new_path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, start_and_goal.agent_id, verbose) + return new_path + except Exception as e: + # Note: single agent planners can fail if constraints make planning for an agent impossible. The upper level search does not + # consider if a constraint will make it impossible for an agent to plan when adding a new constraint. + if verbose: + print(f"Error: {e}") + return None + + @staticmethod + def initialize_constraint_tree(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool) -> ConstraintTree: + """ + Generate an initial solution by planning for each agent with no obstacles + """ + initial_solution: dict[AgentId, NodePath] = {} + + # Generate initial solution (no constraints) + for start_and_goal in start_and_goals: + path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, start_and_goal.agent_id, verbose) + initial_solution[start_and_goal.agent_id] = path + + if verbose: + print("Initial solution:") + for (agent_idx, path) in initial_solution.items(): + print(f"\nAgent {agent_idx} path:\n {path}") + + return ConstraintTree(initial_solution) + +class Scenario(Enum): + # Five robots all trying to get through a single cell choke point to reach their goals + NARROW_CORRIDOR = 0 + # Three robots in a narrow hallway that requires intelligent conflict resolution + HALLWAY_CROSS = 1 + +scenario = Scenario.HALLWAY_CROSS +verbose = False +show_animation = True +use_sipp = True # Condition here mainly to appease the linter +np.random.seed(42) # For reproducibility +def main(): + grid_side_length = 21 + + # Default: no obstacles + obstacle_arrangement = ObstacleArrangement.NONE + start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(10)] + + if scenario == Scenario.NARROW_CORRIDOR: + obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR + start_and_goals = [StartAndGoal(i, Position(1, 8+i), Position(19, 19-i)) for i in range(5)] + elif scenario == Scenario.HALLWAY_CROSS: + obstacle_arrangement=ObstacleArrangement.HALLWAY + start_and_goals = [StartAndGoal(0, Position(6, 10), Position(13, 10)), + StartAndGoal(1, Position(11, 10), Position(6, 10)), + StartAndGoal(2, Position(13, 10), Position(7, 10))] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=[pos for item in start_and_goals for pos in (item.start, item.goal)], + obstacle_arrangement=obstacle_arrangement, + ) + + single_agent_planner = SafeIntervalPathPlanner if use_sipp else SpaceTimeAStar + start_time = time.time() + paths = ConflictBasedSearch.plan(grid, start_and_goals, single_agent_planner, verbose) + + runtime = time.time() - start_time + print(f"\nPlanning took: {runtime:.5f} seconds") + + if verbose: + print("Paths:") + for path in paths.values(): + print(f"{path}\n") + + if not show_animation: + return + + PlotNodePaths(grid, start_and_goals, paths) + +if __name__ == "__main__": + main() diff --git a/PathPlanning/TimeBasedPathPlanning/ConstraintTree.py b/PathPlanning/TimeBasedPathPlanning/ConstraintTree.py new file mode 100644 index 0000000000..3e7b608813 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/ConstraintTree.py @@ -0,0 +1,186 @@ +from dataclasses import dataclass +import heapq + +from PathPlanning.TimeBasedPathPlanning.Node import NodePath, Position, PositionAtTime + +# ID of an agent +type AgentId = int + +# An (x, y, t) tuple that exists in two or more agents' paths +@dataclass(frozen=True) +class Constraint: + position: Position + time: int + +# An agent that is constrained (must avoid the constraint tuple) +@dataclass +class ConstrainedAgent: + agent: AgentId + constraint: Constraint + +# A constraint to apply to the two colliding agents +@dataclass +class ForkingConstraint: + constrained_agents: tuple[ConstrainedAgent, ConstrainedAgent] + +# A constraint that has been applied to an agent +@dataclass(frozen=True) +class AppliedConstraint: + constraint: Constraint + constrained_agent: AgentId + +@dataclass +class ConstraintTreeNode: + # Index of the parent node + parent_idx: int + # Either a constraint to apply (forking) which will result in two new nodes, a constraint that was already applied + # to generate this node (AppliedConstraint), or None if there are no constraints in this node's paths + constraint: ForkingConstraint | AppliedConstraint | None + + # Paths for the agents + paths: dict[AgentId, NodePath] + # Cost of the solution at this node (sum of duration of each agent's path) + cost: int + + def __init__(self, paths: dict[AgentId, NodePath], parent_idx: int, all_constraints: list[AppliedConstraint]): + self.paths = paths + self.cost = sum(path.goal_reached_time() for path in paths.values()) + self.parent_idx = parent_idx + self.constraint = self.get_constraint_point() + self.all_constraints = all_constraints + + def __lt__(self, other) -> bool: + if self.cost == other.cost: + return len(self.all_constraints) < len(other.all_constraints) + return self.cost < other.cost + + def __le__(self, other) -> bool: + if self.cost == other.cost: + return len(self.all_constraints) <= len(other.all_constraints) + return self.cost < other.cost + + def __ge__(self, other) -> bool: + if self.cost == other.cost: + return len(self.all_constraints) >= len(other.all_constraints) + return self.cost > other.cost + + def get_constraint_point(self, verbose = False) -> ForkingConstraint | None: + """ + Check paths for any constraints, and if any are found return the earliest one. + """ + + final_t = max(path.goal_reached_time() for path in self.paths.values()) + positions_at_time: dict[PositionAtTime, AgentId] = {} + for t in range(final_t + 1): + possible_constraints: list[ForkingConstraint] = self.check_for_constraints_at_time(positions_at_time, t, verbose) + if not possible_constraints: + continue + + if verbose: + print(f"Choosing best constraint of {possible_constraints}") + # first check for edge constraints + for constraint in possible_constraints: + if constraint.constrained_agents[0].constraint.position != constraint.constrained_agents[1].constraint.position: + if verbose: + print(f"\tFound edge conflict constraint: {constraint}") + return constraint + # if none, then return first normal constraint + if verbose: + print(f"\tReturning normal constraint: {possible_constraints[0]}") + return possible_constraints[0] + + return None + + def check_for_constraints_at_time(self, positions_at_time: dict[PositionAtTime, AgentId], t: int, verbose: bool) -> list[ForkingConstraint]: + """ + Check for constraints between paths at a particular time step + """ + possible_constraints: list[ForkingConstraint] = [] + for agent_id, path in self.paths.items(): + + position = path.get_position(t) + if position is None: + continue + position_at_time = PositionAtTime(position, t) + if position_at_time not in positions_at_time: + positions_at_time[position_at_time] = agent_id + + # double reservation at a (cell, time) combination + if positions_at_time[position_at_time] != agent_id: + conflicting_agent_id = positions_at_time[position_at_time] + constraint = Constraint(position=position, time=t) + possible_constraints.append(ForkingConstraint(( + ConstrainedAgent(agent_id, constraint), ConstrainedAgent(conflicting_agent_id, constraint) + ))) + + # Check for edge conflicts (can only happen after first time step) + if t == 0: + continue + last_position = path.get_position(t - 1) + if not last_position: + raise RuntimeError(f"Failed to get position for agent {agent_id} at time {t-1}") + new_position_at_last_time = PositionAtTime(position, t-1) + old_position_at_new_time = PositionAtTime(last_position, t) + if new_position_at_last_time in positions_at_time and old_position_at_new_time in positions_at_time: + conflicting_agent_id1 = positions_at_time[new_position_at_last_time] + conflicting_agent_id2 = positions_at_time[old_position_at_new_time] + + if conflicting_agent_id1 == conflicting_agent_id2 and conflicting_agent_id1 != agent_id: + if verbose: + print(f"Found edge constraint between with agent {conflicting_agent_id1} for {agent_id}") + print(f"\tpositions old: {old_position_at_new_time}, new: {position_at_time}") + new_constraint = ForkingConstraint(( + ConstrainedAgent(agent_id, Constraint(position_at_time.position, position_at_time.time)), + ConstrainedAgent(conflicting_agent_id1, Constraint(position=last_position, time=t)) + )) + possible_constraints.append(new_constraint) + + return possible_constraints + + +class ConstraintTree: + # Child nodes have been created (Maps node_index to ConstraintTreeNode) + expanded_nodes: dict[int, ConstraintTreeNode] + # Need to solve and generate children + nodes_to_expand: list[ConstraintTreeNode] + + def __init__(self, initial_solution: dict[AgentId, NodePath]): + self.expanded_nodes = {} + self.nodes_to_expand = [] + heapq.heappush(self.nodes_to_expand, ConstraintTreeNode(initial_solution, -1, [])) + + def get_next_node_to_expand(self) -> ConstraintTreeNode | None: + if not self.nodes_to_expand: + return None + return heapq.heappop(self.nodes_to_expand) + + """ + Add a node to the tree and generate children if needed. Returns true if the node is a solution, false otherwise. + """ + def add_node_to_tree(self, node: ConstraintTreeNode): + heapq.heappush(self.nodes_to_expand, node) + + """ + Get the constraints that were applied to all parent nodes starting with the node at the provided parent_index. + """ + def get_ancestor_constraints(self, parent_index: int) -> list[AppliedConstraint]: + constraints: list[AppliedConstraint] = [] + while parent_index != -1: + node = self.expanded_nodes[parent_index] + if node.constraint and isinstance(node.constraint, AppliedConstraint): + constraints.append(node.constraint) + else: + raise RuntimeError(f"Expected AppliedConstraint, but got: {node.constraint}") + parent_index = node.parent_idx + return constraints + + """ + Add an expanded node to the tree. Returns the index of this node in the expanded nodes dictionary. + """ + def add_expanded_node(self, node: ConstraintTreeNode) -> int: + node_idx = len(self.expanded_nodes) + self.expanded_nodes[node_idx] = node + return node_idx + + def expanded_node_count(self) -> int: + return len(self.expanded_nodes) diff --git a/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py index ccc2989001..bee2999ace 100644 --- a/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py +++ b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py @@ -3,11 +3,12 @@ is also infrastructure to generate dynamic obstacles that move around the grid. The obstacles' paths are stored in the reservation matrix on creation. """ -import numpy as np -import matplotlib.pyplot as plt from enum import Enum +import matplotlib.pyplot as plt +import numpy as np from dataclasses import dataclass from PathPlanning.TimeBasedPathPlanning.Node import NodePath, Position +from PathPlanning.TimeBasedPathPlanning.ConstraintTree import AppliedConstraint @dataclass class Interval: @@ -15,12 +16,18 @@ class Interval: end_time: int class ObstacleArrangement(Enum): + # No obstacles + NONE = 0 # Random obstacle positions and movements - RANDOM = 0 + RANDOM = 1 # Obstacles start in a line in y at center of grid and move side-to-side in x - ARRANGEMENT1 = 1 + ARRANGEMENT1 = 2 # Static obstacle arrangement - NARROW_CORRIDOR = 2 + NARROW_CORRIDOR = 3 + # A hallway surrounded by obstacles with a 2-cell opening in the middle + HALLWAY = 4 + # A temporary obstacle that vanishes after some time + TEMPORARY_OBSTACLE = 5 """ Generates a 2d numpy array with lists for elements. @@ -31,6 +38,15 @@ def empty_2d_array_of_lists(x: int, y: int) -> np.ndarray: arr[:] = [[[] for _ in range(y)] for _ in range(x)] return arr +def empty_3d_array_of_sets(x: int, y: int, z: int) -> np.ndarray: + arr = np.empty((x, y, z), dtype=object) + # assign each element individually - np.full creates references to the same list + arr[:] = [[[set() for _ in range(z)] for _ in range(y)] for _ in range(x)] + return arr + +# To ensure we don't collide with an agent's index +OBSTACLE_INDEX: int = 123456 + class Grid: # Set in constructor grid_size: np.ndarray @@ -39,6 +55,12 @@ class Grid: # Obstacles will never occupy these points. Useful to avoid impossible scenarios obstacle_avoid_points: list[Position] = [] + # Constraint points generated by ConflictBasedSearch that are currently applied + applied_constraints: list[AppliedConstraint] = [] + # Note - this is separate from reservation_matrix because multiple robots can have the same + # constraints at the same (x, y, t) tuple + constraint_reservations: np.ndarray + # Number of time steps in the simulation time_limit: int @@ -58,6 +80,8 @@ def __init__( self.grid_size = grid_size self.reservation_matrix = np.zeros((grid_size[0], grid_size[1], self.time_limit)) + self.constraint_reservations = empty_3d_array_of_sets(grid_size[0], grid_size[1], self.time_limit) + if num_obstacles > self.grid_size[0] * self.grid_size[1]: raise Exception("Number of obstacles is greater than grid size!") @@ -67,14 +91,19 @@ def __init__( self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles) elif obstacle_arrangement == ObstacleArrangement.NARROW_CORRIDOR: self.obstacle_paths = self.generate_narrow_corridor_obstacles(num_obstacles) - - for i, path in enumerate(self.obstacle_paths): - obs_idx = i + 1 # avoid using 0 - that indicates free space in the grid + elif obstacle_arrangement == ObstacleArrangement.HALLWAY: + self.obstacle_paths = self.generate_hallway_obstacles() + elif obstacle_arrangement == ObstacleArrangement.TEMPORARY_OBSTACLE: + self.obstacle_paths = self.generate_temporary_obstacle() + elif obstacle_arrangement == ObstacleArrangement.NONE: + self.obstacle_paths = [] + + for path in self.obstacle_paths: for t, position in enumerate(path): # Reserve old & new position at this time step if t > 0: - self.reservation_matrix[path[t - 1].x, path[t - 1].y, t] = obs_idx - self.reservation_matrix[position.x, position.y, t] = obs_idx + self.reservation_matrix[path[t - 1].x, path[t - 1].y, t] = OBSTACLE_INDEX + self.reservation_matrix[position.x, position.y, t] = OBSTACLE_INDEX """ Generate dynamic obstacles that move around the grid. Initial positions and movements are random @@ -164,7 +193,7 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]: obstacle_paths.append(path) return obstacle_paths - + def generate_narrow_corridor_obstacles(self, obs_count: int) -> list[list[Position]]: obstacle_paths = [] @@ -185,6 +214,78 @@ def generate_narrow_corridor_obstacles(self, obs_count: int) -> list[list[Positi return obstacle_paths + + def generate_hallway_obstacles(self) -> list[list[Position]]: + """ + Generate obstacles that form a hallway with a 2-cell opening in the middle. + Creates only a 1-cell border around the edges and hallway walls. + + Pattern created: + ********** + * * + *** *** + ********** + + Args: + hallway_length: Length of the hallway (number of rows for the corridor) + + Returns: + List of obstacle paths, where each path represents one obstacle over time + """ + obstacle_paths = [] + + for y in range(8, 12): + for x in range(5, 15): + is_obstacle = False + + # Border walls + if x == 5 or x == 14 or y == 8 or y == 11: + is_obstacle = True + if y == 9 and x not in [9, 10]: + is_obstacle = True + + # If this position should be an obstacle, create a path for it + if is_obstacle: + obstacle_path = [] + for t in range(self.time_limit): + obstacle_path.append(Position(x, y)) + obstacle_paths.append(obstacle_path) + + return obstacle_paths + + def generate_temporary_obstacle(self) -> list[list[Position]]: + """ + Generates a temporary obstacle at (10, 10) that disappears at t=30 + """ + obstacle_path = [] + for _ in range(max(self.time_limit, 40)): + obstacle_path.append(Position(15, 15)) + + return [obstacle_path] + + def apply_constraint_reservations(self, constraints: list[AppliedConstraint], verbose = False): + """ + Add new constraints and reserve them in the constraint_reservations map + """ + self.applied_constraints.extend(constraints) + for constraint in constraints: + if verbose: + print(f"Applying {constraint=}") + if constraint not in self.constraint_reservations[constraint.constraint.position.x, constraint.constraint.position.y, constraint.constraint.time]: + self.constraint_reservations[constraint.constraint.position.x, constraint.constraint.position.y, constraint.constraint.time].add(constraint) + + if verbose: + print(f"\tExisting constraints: {self.constraint_reservations[constraint.constraint.position.x, constraint.constraint.position.y, constraint.constraint.time]}") + + def clear_constraint_reservations(self): + """ + Clear all applied constraints and their associated reservations + """ + for constraint in self.applied_constraints: + if constraint in self.constraint_reservations[constraint.constraint.position.x, constraint.constraint.position.y, constraint.constraint.time]: + self.constraint_reservations[constraint.constraint.position.x, constraint.constraint.position.y, constraint.constraint.time].remove(constraint) + self.applied_constraints.clear() + """ Check if the given position is valid at time t @@ -195,11 +296,16 @@ def generate_narrow_corridor_obstacles(self, obs_count: int) -> list[list[Positi output: bool: True if position/time combination is valid, False otherwise """ - def valid_position(self, position: Position, t: int) -> bool: + def valid_position(self, position: Position, t: int, agent_idx: int) -> bool: # Check if position is in grid if not self.inside_grid_bounds(position): return False + constraints = self.constraint_reservations[position.x, position.y, t] + for constraint in constraints: + if constraint.constrained_agent == agent_idx: + return False + # Check if position is not occupied at time t return self.reservation_matrix[position.x, position.y, t] == 0 @@ -208,7 +314,7 @@ def valid_position(self, position: Position, t: int) -> bool: """ def valid_obstacle_position(self, position: Position, t: int) -> bool: return ( - self.valid_position(position, t) + self.valid_position(position, t, OBSTACLE_INDEX) and position not in self.obstacle_avoid_points ) @@ -249,11 +355,11 @@ def get_obstacle_positions_at_time(self, t: int) -> tuple[list[int], list[int]]: """ Returns safe intervals for each cell. """ - def get_safe_intervals(self) -> np.ndarray: + def get_safe_intervals(self, agent_idx: int) -> np.ndarray: intervals = empty_2d_array_of_lists(self.grid_size[0], self.grid_size[1]) for x in range(intervals.shape[0]): for y in range(intervals.shape[1]): - intervals[x, y] = self.get_safe_intervals_at_cell(Position(x, y)) + intervals[x, y] = self.get_safe_intervals_at_cell(Position(x, y), agent_idx) return intervals @@ -261,11 +367,20 @@ def get_safe_intervals(self) -> np.ndarray: Generate the safe intervals for a given cell. The intervals will be in order of start time. ex: Interval (2, 3) will be before Interval (4, 5) """ - def get_safe_intervals_at_cell(self, cell: Position) -> list[Interval]: + def get_safe_intervals_at_cell(self, cell: Position, agent_idx: int) -> list[Interval]: vals = self.reservation_matrix[cell.x, cell.y, :] # Find where the array is zero zero_mask = (vals == 0) + for constraint_set in self.constraint_reservations[cell.x, cell.y]: + for constraint in constraint_set: + if constraint.constrained_agent != agent_idx: + continue + if constraint.constraint.position != cell: + continue + # Mark this cell as constrained + zero_mask[constraint.constraint.time] = 0 + # Identify transitions between zero and nonzero elements diff = np.diff(zero_mask.astype(int)) @@ -288,8 +403,9 @@ def get_safe_intervals_at_cell(self, cell: Position) -> list[Interval]: # move into and out of the cell each take 1 time step, and the cell is considered occupied during # both the time step when it is entering the cell, and the time step when it is leaving the cell. intervals = [interval for interval in intervals if interval.start_time != interval.end_time] + return intervals - + """ Reserve an agent's path in the grid. Raises an exception if the agent's index is 0, or if a position is already reserved by a different agent. @@ -297,7 +413,7 @@ def get_safe_intervals_at_cell(self, cell: Position) -> list[Interval]: def reserve_path(self, node_path: NodePath, agent_index: int): if agent_index == 0: raise Exception("Agent index cannot be 0") - + for i, node in enumerate(node_path.path): reservation_finish_time = node.time + 1 if i < len(node_path.path) - 1: diff --git a/PathPlanning/TimeBasedPathPlanning/Node.py b/PathPlanning/TimeBasedPathPlanning/Node.py index 728eebb676..26bc4cedc5 100644 --- a/PathPlanning/TimeBasedPathPlanning/Node.py +++ b/PathPlanning/TimeBasedPathPlanning/Node.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from functools import total_ordering import numpy as np -from typing import Sequence +from collections.abc import Sequence @dataclass(order=True) class Position: @@ -28,6 +28,17 @@ def __sub__(self, other): def __hash__(self): return hash((self.x, self.y)) +@dataclass(order=True) +class PositionAtTime: + position: Position + time: int + + def __hash__(self): + return hash((self.position, self.time)) + + def __eq__(self, other): + return self.position == other.position and self.time == other.time + @dataclass() # Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because # this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent @@ -96,4 +107,4 @@ def __repr__(self): repr_string = "" for i, node in enumerate(self.path): repr_string += f"{i}: {node}\n" - return repr_string \ No newline at end of file + return repr_string diff --git a/PathPlanning/TimeBasedPathPlanning/Plotting.py b/PathPlanning/TimeBasedPathPlanning/Plotting.py index 7cd1f615d8..2c4f6b54d1 100644 --- a/PathPlanning/TimeBasedPathPlanning/Plotting.py +++ b/PathPlanning/TimeBasedPathPlanning/Plotting.py @@ -7,6 +7,7 @@ ) from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal from PathPlanning.TimeBasedPathPlanning.Node import NodePath +from PathPlanning.TimeBasedPathPlanning.ConstraintTree import AgentId ''' Plot a single agent path. @@ -50,7 +51,7 @@ def PlotNodePath(grid: Grid, start: Position, goal: Position, path: NodePath): ''' Plot a series of agent paths. ''' -def PlotNodePaths(grid: Grid, start_and_goals: list[StartAndGoal], paths: list[NodePath]): +def PlotNodePaths(grid: Grid, start_and_goals: list[StartAndGoal], paths: dict[AgentId, NodePath]): fig = plt.figure(figsize=(10, 7)) ax = fig.add_subplot( @@ -64,20 +65,22 @@ def PlotNodePaths(grid: Grid, start_and_goals: list[StartAndGoal], paths: list[N ax.set_yticks(np.arange(0, grid.grid_size[1], 1)) # Plot start and goal positions for each agent - colors = [] # generated randomly in loop + colors = {} # generated randomly in loop. Maps agent id to color markers = ['D', 's', '^', 'o', 'p'] # Different markers for visual distinction # Create plots for start and goal positions start_and_goal_plots = [] - for i, path in enumerate(paths): - marker_idx = i % len(markers) - agent_id = start_and_goals[i].index - start = start_and_goals[i].start - goal = start_and_goals[i].goal - + for agent_id, path in paths.items(): + marker_idx = agent_id % len(markers) + start_and_goal = next((elem for elem in start_and_goals if elem.agent_id == agent_id), None) + if not start_and_goal: + raise RuntimeError(f"Failed to get start and goal for agent {agent_id}") + start = start_and_goal.start + goal = start_and_goal.goal + color = np.random.rand(3,) - colors.append(color) - sg_plot, = ax.plot([], [], markers[marker_idx], c=color, ms=15, + colors[agent_id] = color + sg_plot, = ax.plot([], [], markers[marker_idx], c=color, ms=15, label=f"Agent {agent_id} Start/Goal") sg_plot.set_data([start.x, goal.x], [start.y, goal.y]) start_and_goal_plots.append(sg_plot) @@ -86,12 +89,11 @@ def PlotNodePaths(grid: Grid, start_and_goals: list[StartAndGoal], paths: list[N (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") # Create plots for each agent's path - path_plots = [] - for i, path in enumerate(paths): - agent_id = start_and_goals[i].index - path_plot, = ax.plot([], [], "o", c=colors[i], ms=10, + path_plots = {} + for agent_id, path in paths.items(): + path_plot, = ax.plot([], [], "o", c=colors[agent_id], ms=10, label=f"Agent {agent_id} Path") - path_plots.append(path_plot) + path_plots[agent_id] = path_plot ax.legend(bbox_to_anchor=(1.05, 1)) @@ -103,33 +105,33 @@ def PlotNodePaths(grid: Grid, start_and_goals: list[StartAndGoal], paths: list[N ) # Find the maximum time across all paths - max_time = max(path.goal_reached_time() for path in paths) + max_time = max(path.goal_reached_time() for path in paths.values()) # Animation loop - for i in range(0, max_time + 1): + for t in range(0, max_time + 1): # Update obstacle positions - obs_positions = grid.get_obstacle_positions_at_time(i) + obs_positions = grid.get_obstacle_positions_at_time(t) obs_points.set_data(obs_positions[0], obs_positions[1]) - + # Update each agent's position - for (j, path) in enumerate(paths): + for agent_id, path in paths.items(): path_positions = [] - if i <= path.goal_reached_time(): - res = path.get_position(i) + if t <= path.goal_reached_time(): + res = path.get_position(t) if not res: - print(path) - print(i) - path_position = path.get_position(i) + print(f"Error getting position for agent {agent_id} at time {t}") + print(t) + path_position = path.get_position(t) if not path_position: - raise Exception(f"Path position not found for time {i}.") + raise Exception(f"Path position not found for time {t}.") # Verify position is valid - assert not path_position in obs_positions - assert not path_position in path_positions + assert path_position not in obs_positions + assert path_position not in path_positions path_positions.append(path_position) - path_plots[j].set_data([path_position.x], [path_position.y]) - + path_plots[agent_id].set_data([path_position.x], [path_position.y]) + plt.pause(0.2) - plt.show() \ No newline at end of file + plt.show() diff --git a/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py b/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py index 2573965cf6..47cdf45086 100644 --- a/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py +++ b/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py @@ -18,12 +18,13 @@ from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePaths +from PathPlanning.TimeBasedPathPlanning.ConstraintTree import AgentId import time class PriorityBasedPlanner(MultiAgentPlanner): @staticmethod - def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> tuple[list[StartAndGoal], list[NodePath]]: + def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> dict[AgentId, NodePath]: """ Generate a path from the start to the goal for each agent in the `start_and_goals` list. Returns the re-ordered StartAndGoal combinations, and a list of path plans. The order of the plans @@ -33,30 +34,30 @@ def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_c # Reserve initial positions for start_and_goal in start_and_goals: - grid.reserve_position(start_and_goal.start, start_and_goal.index, Interval(0, 10)) + grid.reserve_position(start_and_goal.start, start_and_goal.agent_id, Interval(0, 10)) # Plan in descending order of distance from start to goal start_and_goals = sorted(start_and_goals, key=lambda item: item.distance_start_to_goal(), reverse=True) - paths = [] + paths = {} for start_and_goal in start_and_goals: if verbose: print(f"\nPlanning for agent: {start_and_goal}" ) - grid.clear_initial_reservation(start_and_goal.start, start_and_goal.index) + grid.clear_initial_reservation(start_and_goal.start, start_and_goal.agent_id) path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, verbose) if path is None: print(f"Failed to find path for {start_and_goal}") return [] - agent_index = start_and_goal.index + agent_index = start_and_goal.agent_id grid.reserve_path(path, agent_index) - paths.append(path) + paths[start_and_goal.agent_id] =path - return (start_and_goals, paths) + return paths verbose = False show_animation = True @@ -76,13 +77,13 @@ def main(): ) start_time = time.time() - start_and_goals, paths = PriorityBasedPlanner.plan(grid, start_and_goals, SafeIntervalPathPlanner, verbose) + paths = PriorityBasedPlanner.plan(grid, start_and_goals, SafeIntervalPathPlanner, verbose) runtime = time.time() - start_time print(f"\nPlanning took: {runtime:.5f} seconds") if verbose: - print(f"Paths:") + print("Paths:") for path in paths: print(f"{path}\n") @@ -92,4 +93,4 @@ def main(): PlotNodePaths(grid, start_and_goals, paths) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/PathPlanning/TimeBasedPathPlanning/SafeInterval.py b/PathPlanning/TimeBasedPathPlanning/SafeInterval.py index 446847ac6d..b37594bb44 100644 --- a/PathPlanning/TimeBasedPathPlanning/SafeInterval.py +++ b/PathPlanning/TimeBasedPathPlanning/SafeInterval.py @@ -24,6 +24,7 @@ from dataclasses import dataclass from functools import total_ordering import time +from collections.abc import Sequence @dataclass() # Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because @@ -34,6 +35,20 @@ class SIPPNode(Node): interval: Interval +class SIPPNodePath(NodePath): + def __init__(self, path: Sequence[SIPPNode], expanded_node_count: int): + super().__init__(path, expanded_node_count) + + self.positions_at_time = {} + last_position = path[0].position + for t in range(0, path[-1].time + 1): + for node in path: + if node.time == t: + last_position = node.position + break + if node.time > t: + break + self.positions_at_time[t] = last_position @dataclass class EntryTimeAndInterval: entry_time: int @@ -47,9 +62,8 @@ class SafeIntervalPathPlanner(SingleAgentPlanner): verbose (bool): set to True to print debug information """ @staticmethod - def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: - - safe_intervals = grid.get_safe_intervals() + def plan(grid: Grid, start: Position, goal: Position, agent_idx: int, verbose: bool = False) -> NodePath: + safe_intervals = grid.get_safe_intervals(agent_idx) open_set: list[SIPPNode] = [] first_node_interval = safe_intervals[start.x, start.y][0] @@ -83,24 +97,24 @@ def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> # reverse path so it goes start -> goal path.reverse() - return NodePath(path, len(expanded_list)) + return SIPPNodePath(path, len(expanded_list)) expanded_idx = len(expanded_list) expanded_list.append(expanded_node) entry_time_and_node = EntryTimeAndInterval(expanded_node.time, expanded_node.interval) add_entry_to_visited_intervals_array(entry_time_and_node, visited_intervals, expanded_node) - for child in SafeIntervalPathPlanner.generate_successors(grid, goal, expanded_node, expanded_idx, safe_intervals, visited_intervals): + for child in SafeIntervalPathPlanner.generate_successors(grid, goal, expanded_node, expanded_idx, safe_intervals, visited_intervals, agent_idx): heapq.heappush(open_set, child) - raise Exception("No path found") + raise RuntimeError("No path found") """ Generate list of possible successors of the provided `parent_node` that are worth expanding """ @staticmethod def generate_successors( - grid: Grid, goal: Position, parent_node: SIPPNode, parent_node_idx: int, intervals: np.ndarray, visited_intervals: np.ndarray + grid: Grid, goal: Position, parent_node: SIPPNode, parent_node_idx: int, intervals: np.ndarray, visited_intervals: np.ndarray, agent_idx: int ) -> list[SIPPNode]: new_nodes = [] diffs = [ @@ -116,8 +130,8 @@ def generate_successors( continue current_interval = parent_node.interval - new_cell_intervals: list[Interval] = intervals[new_pos.x, new_pos.y] + for interval in new_cell_intervals: # if interval starts after current ends, break # assumption: intervals are sorted by start time, so all future intervals will hit this condition as well @@ -139,12 +153,13 @@ def generate_successors( # We know there is a node worth expanding. Generate successor at the earliest possible time the # new interval can be entered - for possible_t in range(max(parent_node.time + 1, interval.start_time), min(current_interval.end_time, interval.end_time)): - if grid.valid_position(new_pos, possible_t): + minimum_entry_time = max(parent_node.time + 1, interval.start_time) + maximum_entry_time = min(current_interval.end_time, interval.end_time) + for possible_t in range(minimum_entry_time, maximum_entry_time + 1): + if grid.valid_position(new_pos, possible_t, agent_idx): new_nodes.append(SIPPNode( new_pos, - # entry is max of interval start and parent node time + 1 (get there as soon as possible) - max(interval.start_time, parent_node.time + 1), + possible_t, SafeIntervalPathPlanner.calculate_heuristic(new_pos, goal), parent_node_idx, interval, diff --git a/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py index b85569f5dc..aa4231a4bf 100644 --- a/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py +++ b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py @@ -24,7 +24,7 @@ class SpaceTimeAStar(SingleAgentPlanner): @staticmethod - def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: + def plan(grid: Grid, start: Position, goal: Position, agent_index: int, verbose: bool = False) -> NodePath: open_set: list[Node] = [] heapq.heappush( open_set, Node(start, 0, SpaceTimeAStar.calculate_heuristic(start, goal), -1) @@ -62,7 +62,7 @@ def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> expanded_list.append(expanded_node) expanded_set.add(expanded_node) - for child in SpaceTimeAStar.generate_successors(grid, goal, expanded_node, expanded_idx, verbose, expanded_set): + for child in SpaceTimeAStar.generate_successors(grid, goal, expanded_node, expanded_idx, agent_index, verbose, expanded_set): heapq.heappush(open_set, child) raise Exception("No path found") @@ -72,8 +72,8 @@ def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> """ @staticmethod def generate_successors( - grid: Grid, goal: Position, parent_node: Node, parent_node_idx: int, verbose: bool, expanded_set: set[Node] - ) -> Generator[Node, None, None]: + grid: Grid, goal: Position, parent_node: Node, parent_node_idx: int, agent_index: int, verbose: bool, expanded_set: set[Node] + ) -> Generator[Node]: diffs = [ Position(0, 0), Position(1, 0), @@ -94,7 +94,7 @@ def generate_successors( continue # Check if the new node is valid for the next 2 time steps - one step to enter, and another to leave - if all([grid.valid_position(new_pos, parent_node.time + dt) for dt in [1, 2]]): + if all([grid.valid_position(new_pos, parent_node.time + dt, agent_index) for dt in [1, 2]]): if verbose: print("\tNew successor node: ", new_node) yield new_node diff --git a/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst index a44dd20a97..d3e395b532 100644 --- a/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst +++ b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst @@ -100,9 +100,21 @@ Dynamic Obstacles: .. image:: https://raw.githubusercontent.com/AtsushiSakai/PythonRoboticsGifs/refs/heads/master/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner/priority_planner.gif +Conflict Based Search +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This multi-agent planning algorithm performs searches at two levels. The lower level generates paths for individual agents using a set of constraints provided by the top level search. The top level search generates plans for each agent, and checks for constraints. If any are found, two new nodes are created with one of the two conflicting agents constrained to avoid the (x, y, t) tuple where the conflict occurred. + +This process is repeated until the top level search finds a set of path plans with no conflicts. The top level search chooses nodes to expand based on the cost of the paths it has for each agent. This guarantees the solution found is optimal. + +This algorithm can also solve some cases that a priority based planner cannot, such as the below example where one robot must move out of the way of another robot's path. Without cooperation across agent paths, this case cannot be solved. + +TODO: image waiting on this PR: https://github.com/AtsushiSakai/PythonRoboticsGifs/pull/14 + References ~~~~~~~~~~~ - `Cooperative Pathfinding `__ - `SIPP: Safe Interval Path Planning for Dynamic Environments `__ -- `Prioritized Planning Algorithms for Trajectory Coordination of Multiple Mobile Robots `__ \ No newline at end of file +- `Prioritized Planning Algorithms for Trajectory Coordination of Multiple Mobile Robots `__ +- `Conflict-Based Search For Optimal Multi-Agent Path Finding `__ diff --git a/tests/test_conflict_based_search.py b/tests/test_conflict_based_search.py new file mode 100644 index 0000000000..80fba70d81 --- /dev/null +++ b/tests/test_conflict_based_search.py @@ -0,0 +1,133 @@ +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + NodePath, + ObstacleArrangement, + Position, +) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal +from PathPlanning.TimeBasedPathPlanning import ConflictBasedSearch as m +from PathPlanning.TimeBasedPathPlanning import PriorityBasedPlanner as m2 +from PathPlanning.TimeBasedPathPlanning.SpaceTimeAStar import SpaceTimeAStar +from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +import numpy as np +import conftest +import pytest + + +@pytest.mark.parametrize("single_agent_planner", [SpaceTimeAStar, SafeIntervalPathPlanner]) +def test_no_constraints(single_agent_planner): + # Test that planner succeeds with no constraints + grid_side_length = 21 + start_and_goals = [StartAndGoal(i, Position(1, 8+i), Position(19, 19-i)) for i in range(4)] + + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + obstacle_arrangement=ObstacleArrangement.NONE, + ) + + paths: list[NodePath] + paths = m.ConflictBasedSearch.plan(grid, start_and_goals, single_agent_planner, False) + + # All paths should start at the specified position and reach the goal + for start_and_goal in start_and_goals: + assert paths[start_and_goal.agent_id].path[0].position == start_and_goal.start + assert paths[start_and_goal.agent_id].path[-1].position == start_and_goal.goal + +@pytest.mark.parametrize("single_agent_planner", [SpaceTimeAStar, SafeIntervalPathPlanner]) +def test_narrow_corridor(single_agent_planner): + # Test a case that requires a few constraints + grid_side_length = 21 + start_and_goals = [StartAndGoal(i, Position(1, 8+i), Position(19, 19-i)) for i in range(4)] + + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR, + ) + + paths: list[NodePath] + paths = m.ConflictBasedSearch.plan(grid, start_and_goals, single_agent_planner, False) + + # All paths should start at the specified position and reach the goal + for start_and_goal in start_and_goals: + assert paths[start_and_goal.agent_id].path[0].position == start_and_goal.start + assert paths[start_and_goal.agent_id].path[-1].position == start_and_goal.goal + +@pytest.mark.parametrize("single_agent_planner", [SpaceTimeAStar, SafeIntervalPathPlanner]) +def test_hallway_pass(single_agent_planner: SingleAgentPlanner): + # Test that search finds a path that requires a robot to temporarily move away from its goal + grid_side_length = 21 + start_and_goals = [StartAndGoal(0, Position(6, 10), Position(13, 10)), + StartAndGoal(1, Position(11, 10), Position(6, 10)), + StartAndGoal(2, Position(13, 10), Position(7, 10))] + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + obstacle_arrangement=ObstacleArrangement.HALLWAY, + ) + + paths: list[NodePath] + paths = m.ConflictBasedSearch.plan(grid, start_and_goals, single_agent_planner, False) + + # All paths should start at the specified position and reach the goal + for start_and_goal in start_and_goals: + assert paths[start_and_goal.agent_id].path[0].position == start_and_goal.start + assert paths[start_and_goal.agent_id].path[-1].position == start_and_goal.goal + +@pytest.mark.parametrize("single_agent_planner", [SpaceTimeAStar, SafeIntervalPathPlanner]) +def test_better_than_priority_based_planner(single_agent_planner: SingleAgentPlanner): + """ + Test that CBS returns a better solution than the priority based planner + """ + grid_side_length = 21 + + start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 3)] + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + ) + + paths: dict[int, NodePath] + paths = m2.PriorityBasedPlanner.plan(grid, start_and_goals, single_agent_planner, False) + + # Re-initialize grid to clear reservations that PBP populates + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + ) + + paths2: dict[int, NodePath] + paths2 = m.ConflictBasedSearch.plan(grid, start_and_goals, single_agent_planner, False) + + # All paths should start at the specified position and reach the goal + for start_and_goal in start_and_goals: + assert paths[start_and_goal.agent_id].path[0].position == start_and_goal.start + assert paths[start_and_goal.agent_id].path[-1].position == start_and_goal.goal + assert paths2[start_and_goal.agent_id].path[0].position == start_and_goal.start + assert paths2[start_and_goal.agent_id].path[-1].position == start_and_goal.goal + + pbp_cost = sum([path.goal_reached_time() for path in paths.values()]) + cbs_cost = sum([path.goal_reached_time() for path in paths2.values()]) + assert pbp_cost > cbs_cost, f"CBS cost {cbs_cost} should be less than PBP cost {pbp_cost}" + +if __name__ == "__main__": + m.show_animation = False + m2.show_animation = False + conftest.run_this_test(__file__) diff --git a/tests/test_priority_based_planner.py b/tests/test_priority_based_planner.py index e2ff602d88..2e648d1f63 100644 --- a/tests/test_priority_based_planner.py +++ b/tests/test_priority_based_planner.py @@ -6,15 +6,18 @@ ) from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal from PathPlanning.TimeBasedPathPlanning import PriorityBasedPlanner as m +from PathPlanning.TimeBasedPathPlanning.SpaceTimeAStar import SpaceTimeAStar from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner import numpy as np import conftest +import pytest - -def test_1(): +@pytest.mark.parametrize("single_agent_planner", [SpaceTimeAStar, SafeIntervalPathPlanner]) +def test_1(single_agent_planner: SingleAgentPlanner): grid_side_length = 21 - start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 16)] + start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 11)] obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] grid = Grid( @@ -26,14 +29,13 @@ def test_1(): m.show_animation = False - start_and_goals: list[StartAndGoal] paths: list[NodePath] - start_and_goals, paths = m.PriorityBasedPlanner.plan(grid, start_and_goals, SafeIntervalPathPlanner, False) + paths = m.PriorityBasedPlanner.plan(grid, start_and_goals, single_agent_planner, False) # All paths should start at the specified position and reach the goal - for i, start_and_goal in enumerate(start_and_goals): - assert paths[i].path[0].position == start_and_goal.start - assert paths[i].path[-1].position == start_and_goal.goal + for start_and_goal in start_and_goals: + assert paths[start_and_goal.agent_id].path[0].position == start_and_goal.start + assert paths[start_and_goal.agent_id].path[-1].position == start_and_goal.goal if __name__ == "__main__": conftest.run_this_test(__file__) diff --git a/tests/test_safe_interval_path_planner.py b/tests/test_safe_interval_path_planner.py index bbcd4e427c..1901927531 100644 --- a/tests/test_safe_interval_path_planner.py +++ b/tests/test_safe_interval_path_planner.py @@ -18,7 +18,7 @@ def test_1(): ) m.show_animation = False - path = m.SafeIntervalPathPlanner.plan(grid, start, goal) + path = m.SafeIntervalPathPlanner.plan(grid, start, goal, 0) # path should have 31 entries assert len(path.path) == 31 diff --git a/tests/test_space_time_astar.py b/tests/test_space_time_astar.py index 1194c61d2e..eefddca270 100644 --- a/tests/test_space_time_astar.py +++ b/tests/test_space_time_astar.py @@ -18,7 +18,7 @@ def test_1(): ) m.show_animation = False - path = m.SpaceTimeAStar.plan(grid, start, goal) + path = m.SpaceTimeAStar.plan(grid, start, goal, 0) # path should have 28 entries assert len(path.path) == 31