From 142b41b8e8ca1abb223656adb3230a0beebc78d5 Mon Sep 17 00:00:00 2001 From: Andrey Golovanov Date: Sat, 1 Mar 2025 15:47:40 +0000 Subject: [PATCH] Extending path matching logic with regular expressions. Leveraging it in CapacityProbe and other places. --- ngraph/blueprints.py | 7 +- ngraph/network.py | 381 +++++++++++++++++++------- ngraph/workflow/capacity_probe.py | 75 +++-- tests/scenarios/scenario_3.yaml | 9 +- tests/scenarios/test_scenario_3.py | 61 +++-- tests/test_network.py | 355 ++++++++++++++++++++---- tests/workflow/test_capacity_probe.py | 153 ++++++++++- 7 files changed, 838 insertions(+), 203 deletions(-) diff --git a/ngraph/blueprints.py b/ngraph/blueprints.py index 31681c9..d397938 100644 --- a/ngraph/blueprints.py +++ b/ngraph/blueprints.py @@ -231,8 +231,11 @@ def _expand_adjacency_pattern( wrap-around if one side is an integer multiple of the other. Also skips self-loops. """ - source_nodes = ctx.network.select_nodes_by_path(source_path) - target_nodes = ctx.network.select_nodes_by_path(target_path) + source_node_groups = ctx.network.select_node_groups_by_path(source_path) + target_node_groups = ctx.network.select_node_groups_by_path(target_path) + + source_nodes = [node for _, nodes in source_node_groups.items() for node in nodes] + target_nodes = [node for _, nodes in target_node_groups.items() for node in nodes] if not source_nodes or not target_nodes: return diff --git a/ngraph/network.py b/ngraph/network.py index 0caafc0..44fd2af 100644 --- a/ngraph/network.py +++ b/ngraph/network.py @@ -2,8 +2,9 @@ import uuid import base64 +import re from dataclasses import dataclass, field -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple, Optional from ngraph.lib.graph import StrictMultiDiGraph from ngraph.lib.algorithms.max_flow import calc_max_flow @@ -12,10 +13,10 @@ def new_base64_uuid() -> str: """ - Generates a Base64-encoded UUID without padding (22 characters). + Generate a Base64-encoded, URL-safe UUID (22 characters, no padding). Returns: - str: A 22-character base64 URL-safe string. + str: A 22-character Base64 URL-safe string with trailing '=' removed. """ return base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("ascii").rstrip("=") @@ -25,17 +26,13 @@ class Node: """ Represents a node in the network. - Each node is uniquely identified by its name, which is used as the key - in the Network's node dictionary. + Each node is uniquely identified by its name, which is used as + the key in the Network's node dictionary. Attributes: - name (str): The unique name of the node. - attrs (Dict[str, Any]): Optional extra metadata. For example: - { - "type": "node", # auto-tagged on add_node - "coords": [lat, lon], # user-provided - "region": "west_coast" # user-provided - } + name (str): Unique identifier for the node. + attrs (Dict[str, Any]): Optional metadata (e.g., type, coordinates, region). + Use attrs["disabled"] = True/False to mark active/inactive. """ name: str @@ -45,23 +42,17 @@ class Node: @dataclass(slots=True) class Link: """ - Represents a link connecting two nodes in the network. - - The 'source' and 'target' fields reference node names. A unique link ID - is auto-generated from source, target, and a random Base64-encoded UUID, - allowing multiple distinct links between the same pair of nodes. + Represents a directed link between two nodes in the network. Attributes: source (str): Name of the source node. target (str): Name of the target node. capacity (float): Link capacity (default 1.0). cost (float): Link cost (default 1.0). - attrs (Dict[str, Any]): Optional extra metadata. For example: - { - "type": "link", # auto-tagged on add_link - "distance_km": 1500, # user-provided - } - id (str): Auto-generated unique link identifier, e.g. "SEA|DEN|abc123..." + attrs (Dict[str, Any]): Optional metadata (e.g., type, distance). + Use attrs["disabled"] = True/False to mark active/inactive. + id (str): Auto-generated unique identifier in the form + "{source}|{target}|". """ source: str @@ -73,7 +64,8 @@ class Link: def __post_init__(self) -> None: """ - Combines source, target, and a random UUID to generate the link's ID. + Generate the link's unique ID by combining source, target, + and a random Base64-encoded UUID. """ self.id = f"{self.source}|{self.target}|{new_base64_uuid()}" @@ -84,8 +76,8 @@ class Network: A container for network nodes and links. Attributes: - nodes (Dict[str, Node]): Mapping node_name -> Node object. - links (Dict[str, Link]): Mapping link_id -> Link object. + nodes (Dict[str, Node]): Mapping from node name -> Node object. + links (Dict[str, Link]): Mapping from link ID -> Link object. attrs (Dict[str, Any]): Optional metadata about the network. """ @@ -95,32 +87,33 @@ class Network: def add_node(self, node: Node) -> None: """ - Adds a node to the network (keyed by node.name). + Add a node to the network (keyed by node.name). - Auto-tags the node with node.attrs["type"] = "node" if not set. + Auto-tags node.attrs["type"] = "node" if not already set. Args: - node (Node): The node to add. + node: Node to add. Raises: ValueError: If a node with the same name already exists. """ node.attrs.setdefault("type", "node") + node.attrs.setdefault("disabled", False) if node.name in self.nodes: raise ValueError(f"Node '{node.name}' already exists in the network.") self.nodes[node.name] = node def add_link(self, link: Link) -> None: """ - Adds a link to the network (keyed by the link's auto-generated ID). + Add a link to the network (keyed by the link's auto-generated ID). - Auto-tags the link with link.attrs["type"] = "link" if not set. + Auto-tags link.attrs["type"] = "link" if not already set. Args: - link (Link): The link to add. + link: Link to add. Raises: - ValueError: If source or target node is missing from the network. + ValueError: If the link's source or target node is missing. """ if link.source not in self.nodes: raise ValueError(f"Source node '{link.source}' not found in network.") @@ -128,38 +121,55 @@ def add_link(self, link: Link) -> None: raise ValueError(f"Target node '{link.target}' not found in network.") link.attrs.setdefault("type", "link") + link.attrs.setdefault("disabled", False) self.links[link.id] = link def to_strict_multidigraph(self, add_reverse: bool = True) -> StrictMultiDiGraph: """ - Creates a StrictMultiDiGraph representation of this Network. + Create a StrictMultiDiGraph representation of this Network. + + Nodes and links with attrs["disabled"] = True are omitted. Args: - add_reverse (bool): If True, adds a reverse edge for each link (default True). + add_reverse: If True, also add a reverse edge for each link. Returns: - StrictMultiDiGraph: A directed multigraph representation of the network. + StrictMultiDiGraph: A directed multigraph representation. """ graph = StrictMultiDiGraph() - # Add nodes + # Add enabled nodes + disabled_nodes = { + name + for name, node in self.nodes.items() + if node.attrs.get("disabled", False) + } for node_name, node in self.nodes.items(): + if node.attrs.get("disabled", False): + continue graph.add_node(node_name, **node.attrs) - # Add edges + # Add enabled links for link_id, link in self.links.items(): - # Forward edge + # Skip if link is disabled or if source/target is disabled + if link.attrs.get("disabled", False): + continue + if link.source in disabled_nodes or link.target in disabled_nodes: + continue + + # Add forward edge graph.add_edge( link.source, link.target, - key=link.id, + key=link_id, capacity=link.capacity, cost=link.cost, **link.attrs, ) - # Reverse edge (if requested) + + # Optionally add reverse edge if add_reverse: - reverse_id = f"{link.id}_rev" + reverse_id = f"{link_id}_rev" graph.add_edge( link.target, link.source, @@ -171,93 +181,154 @@ def to_strict_multidigraph(self, add_reverse: bool = True) -> StrictMultiDiGraph return graph - def select_nodes_by_path(self, path: str) -> List[Node]: + def select_node_groups_by_path(self, path: str) -> Dict[str, List[Node]]: """ - Returns nodes matching a path-based search. + Select and group nodes whose names match a given regular expression. - 1) Returns nodes whose name is exactly 'path' or starts with 'path/'. - 2) If none found, tries names starting with 'path-'. - 3) If still none, returns nodes whose names start with 'path' (partial match). + This method uses re.match(), so the pattern is automatically anchored + at the start of the node name. If the pattern includes capturing groups, + the group label is formed by joining all non-None captures with '|'. + If no capturing groups exist, the group label is simply the original + pattern string. Args: - path (str): The path/prefix to search. + path: A Python regular expression pattern (e.g., "^foo", "bar(\\d+)", etc.). Returns: - List[Node]: A list of matching Node objects. - - Examples: - path="SEA/clos_instance/spine" might match - "SEA/clos_instance/spine/myspine-1". - path="S" might match "S1", "S2" (partial match fallback). + A mapping from group label -> list of nodes that matched the pattern. """ - # 1) Exact or slash-based - result = [ - n - for n in self.nodes.values() - if n.name == path or n.name.startswith(f"{path}/") - ] - if result: - return result - - # 2) Dash-based - result = [n for n in self.nodes.values() if n.name.startswith(f"{path}-")] - if result: - return result - - # 3) Partial fallback - return [ - n for n in self.nodes.values() if n.name.startswith(path) and n.name != path - ] + pattern = re.compile(path) + groups_map: Dict[str, List[Node]] = {} + + for node in self.nodes.values(): + m = pattern.match(node.name) + if m: + captures = m.groups() + if captures: + label = "|".join(c for c in captures if c is not None) + else: + label = path + groups_map.setdefault(label, []).append(node) + + return groups_map def max_flow( self, source_path: str, sink_path: str, + mode: str = "combine", shortest_path: bool = False, flow_placement: FlowPlacement = FlowPlacement.PROPORTIONAL, - ) -> float: + ) -> Dict[Tuple[str, str], float]: """ - Computes the maximum flow between selected source and sink nodes. - - Selects source nodes matching 'source_path' and sink nodes matching 'sink_path'. - Attaches a pseudo-node 'source' connecting to each source node with infinite - capacity edges, and similarly a pseudo-node 'sink' from each sink node. Then - calls calc_max_flow on the resulting graph. + Compute maximum flow between groups of source nodes and sink nodes. + Always returns a dictionary of flow values. The dict keys are + (source_label, sink_label), and the values are the flow amounts. Args: - source_path (str): Path/prefix to select source nodes. - sink_path (str): Path/prefix to select sink nodes. - shortest_path (bool): If True, uses only the shortest paths (default False). - flow_placement (FlowPlacement): Load balancing across parallel edges. + source_path: Regex pattern for selecting source nodes. + sink_path: Regex pattern for selecting sink nodes. + mode: "combine" or "pairwise". + - "combine": All matched sources become one combined source group, + all matched sinks become one combined sink group. Returns a dict + with a single entry {("", ""): flow_value}. + - "pairwise": Compute flow for each (source_group, sink_group) and + return a dict of flows for all pairs. + shortest_path: If True, flow is constrained to shortest paths. + flow_placement: Determines how parallel edges are handled. Returns: - float: The maximum flow found from source to sink. + A dictionary mapping (src_label, snk_label) -> flow. Raises: - ValueError: If no nodes match source_path or sink_path. + ValueError: If no matching source or sink groups are found, + or if mode is invalid. """ - # 1) Select source and sink nodes - sources = self.select_nodes_by_path(source_path) - sinks = self.select_nodes_by_path(sink_path) + src_groups = self.select_node_groups_by_path(source_path) + snk_groups = self.select_node_groups_by_path(sink_path) + + if not src_groups: + raise ValueError(f"No source nodes found matching '{source_path}'.") + if not snk_groups: + raise ValueError(f"No sink nodes found matching '{sink_path}'.") + + if mode == "combine": + combined_src_nodes: List[Node] = [] + combined_snk_nodes: List[Node] = [] + combined_src_label = "|".join(sorted(src_groups.keys())) + combined_snk_label = "|".join(sorted(snk_groups.keys())) + + for group_nodes in src_groups.values(): + combined_src_nodes.extend(group_nodes) + for group_nodes in snk_groups.values(): + combined_snk_nodes.extend(group_nodes) + + if not combined_src_nodes or not combined_snk_nodes: + return {(combined_src_label, combined_snk_label): 0.0} + + flow_val = self._compute_flow_single_group( + combined_src_nodes, combined_snk_nodes, shortest_path, flow_placement + ) + return {(combined_src_label, combined_snk_label): flow_val} + + elif mode == "pairwise": + results: Dict[Tuple[str, str], float] = {} + for src_label, src_nodes in src_groups.items(): + for snk_label, snk_nodes in snk_groups.items(): + if src_nodes and snk_nodes: + flow_val = self._compute_flow_single_group( + src_nodes, snk_nodes, shortest_path, flow_placement + ) + else: + flow_val = 0.0 + results[(src_label, snk_label)] = flow_val + return results + + else: + raise ValueError( + f"Invalid mode '{mode}' for max_flow. Must be 'combine' or 'pairwise'." + ) - if not sources: - raise ValueError(f"No source nodes found matching path '{source_path}'.") - if not sinks: - raise ValueError(f"No sink nodes found matching path '{sink_path}'.") + def _compute_flow_single_group( + self, + sources: List[Node], + sinks: List[Node], + shortest_path: bool, + flow_placement: FlowPlacement, + ) -> float: + """ + Attach a pseudo-source and pseudo-sink to the given node lists, + then run calc_max_flow. Returns the resulting flow from all + sources to all sinks as a single float. + Ignores disabled nodes. - # 2) Build the graph - graph = self.to_strict_multidigraph() + Args: + sources: List of source nodes. + sinks: List of sink nodes. + shortest_path: Whether to use shortest paths only. + flow_placement: How parallel edges are handled. - # 3) Add pseudo-nodes for multi-source / multi-sink flow + Returns: + The computed max-flow value, or 0.0 if either list is empty + or all are disabled. + """ + # Filter out disabled nodes at the source/sink stage + active_sources = [s for s in sources if not s.attrs.get("disabled", False)] + active_sinks = [s for s in sinks if not s.attrs.get("disabled", False)] + + if not active_sources or not active_sinks: + return 0.0 + + graph = self.to_strict_multidigraph() graph.add_node("source") graph.add_node("sink") - for src_node in sources: + for src_node in active_sources: graph.add_edge("source", src_node.name, capacity=float("inf"), cost=0) - for sink_node in sinks: + + for sink_node in active_sinks: graph.add_edge(sink_node.name, "sink", capacity=float("inf"), cost=0) - # 4) Calculate max flow return calc_max_flow( graph, "source", @@ -266,3 +337,117 @@ def max_flow( shortest_path=shortest_path, copy_graph=False, ) + + def disable_node(self, node_name: str) -> None: + """ + Mark a node as disabled. Raises ValueError if the node doesn't exist. + + Args: + node_name: Name of the node to disable. + """ + if node_name not in self.nodes: + raise ValueError(f"Node '{node_name}' does not exist.") + self.nodes[node_name].attrs["disabled"] = True + + def enable_node(self, node_name: str) -> None: + """ + Mark a node as enabled. Raises ValueError if the node doesn't exist. + + Args: + node_name: Name of the node to enable. + """ + if node_name not in self.nodes: + raise ValueError(f"Node '{node_name}' does not exist.") + self.nodes[node_name].attrs["disabled"] = False + + def disable_link(self, link_id: str) -> None: + """ + Mark a link as disabled. Raises ValueError if the link doesn't exist. + + Args: + link_id: ID of the link to disable. + """ + if link_id not in self.links: + raise ValueError(f"Link '{link_id}' does not exist.") + self.links[link_id].attrs["disabled"] = True + + def enable_link(self, link_id: str) -> None: + """ + Mark a link as enabled. Raises ValueError if the link doesn't exist. + + Args: + link_id: ID of the link to enable. + """ + if link_id not in self.links: + raise ValueError(f"Link '{link_id}' does not exist.") + self.links[link_id].attrs["disabled"] = False + + def enable_all(self) -> None: + """ + Mark all nodes and links as enabled. + """ + for node in self.nodes.values(): + node.attrs["disabled"] = False + for link in self.links.values(): + link.attrs["disabled"] = False + + def disable_all(self) -> None: + """ + Mark all nodes and links as disabled. + """ + for node in self.nodes.values(): + node.attrs["disabled"] = True + for link in self.links.values(): + link.attrs["disabled"] = True + + def get_links_between(self, source: str, target: str) -> List[str]: + """ + Return all link IDs that connect the specified source and target exactly. + + Args: + source: Name of the source node. + target: Name of the target node. + + Returns: + A list of link IDs for links where (link.source == source + and link.target == target). + """ + matches = [] + for link_id, link in self.links.items(): + if link.source == source and link.target == target: + matches.append(link_id) + return matches + + def find_links( + self, + source_regex: Optional[str] = None, + target_regex: Optional[str] = None, + ) -> List[Link]: + """ + Search for links based on optional regex patterns for source or target. + + Args: + source_regex: Regex pattern to match the link's source node. + target_regex: Regex pattern to match the link's target node. + + Returns: + A list of Link objects matching the criteria. If both patterns + are None, returns all links. + """ + if source_regex: + src_pat = re.compile(source_regex) + else: + src_pat = None + if target_regex: + tgt_pat = re.compile(target_regex) + else: + tgt_pat = None + + results = [] + for link in self.links.values(): + if src_pat and not src_pat.search(link.source): + continue + if tgt_pat and not tgt_pat.search(link.target): + continue + results.append(link) + return results diff --git a/ngraph/workflow/capacity_probe.py b/ngraph/workflow/capacity_probe.py index c1ab6dd..e41af49 100644 --- a/ngraph/workflow/capacity_probe.py +++ b/ngraph/workflow/capacity_probe.py @@ -1,8 +1,7 @@ from __future__ import annotations -from dataclasses import dataclass -from typing import TYPE_CHECKING - +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Dict, Tuple from ngraph.workflow.base import WorkflowStep, register_workflow_step from ngraph.lib.algorithms.base import FlowPlacement @@ -14,35 +13,79 @@ @dataclass class CapacityProbe(WorkflowStep): """ - A workflow step that probes capacity between selected nodes. + A workflow step that probes capacity (max flow) between selected groups of nodes. Attributes: - source_path (str): Path/prefix to select source nodes. - sink_path (str): Path/prefix to select sink nodes. - shortest_path (bool): If True, uses only the shortest paths (default False). - flow_placement (FlowPlacement): Load balancing across parallel edges. + source_path (str): A regex pattern to select source node groups. + sink_path (str): A regex pattern to select sink node groups. + mode (str): "combine" or "pairwise" (defaults to "combine"). + - "combine": All matched sources form one super-source; all matched sinks form one super-sink. + - "pairwise": Compute flow for each (source_group, sink_group). + probe_reverse (bool): If True, also compute flow in the reverse direction (sink→source). + shortest_path (bool): If True, only use shortest paths when computing flow. + flow_placement (FlowPlacement): Handling strategy for parallel edges (default PROPORTIONAL). """ source_path: str = "" sink_path: str = "" + mode: str = "combine" + probe_reverse: bool = False shortest_path: bool = False flow_placement: FlowPlacement = FlowPlacement.PROPORTIONAL def run(self, scenario: Scenario) -> None: """ - Executes the capacity probe by computing the max flow between - nodes selected by source_path and sink_path, then storing the - result in the scenario's results container. + Executes the capacity probe by computing max flow between node groups + matched by source_path and sink_path. Results are stored in scenario.results. + + Depending on 'mode', the returned flow is either a single combined dict entry + or multiple pairwise entries. If 'probe_reverse' is True, flow is computed + in both directions (forward and reverse). Args: scenario (Scenario): The scenario object containing the network and results. """ - flow = scenario.network.max_flow( - self.source_path, - self.sink_path, + # 1) Forward direction (source_path -> sink_path) + fwd_flow_dict = scenario.network.max_flow( + source_path=self.source_path, + sink_path=self.sink_path, + mode=self.mode, shortest_path=self.shortest_path, flow_placement=self.flow_placement, ) + self._store_flow_dict( + scenario=scenario, + flow_dict=fwd_flow_dict, + ) + + # 2) Reverse direction (if enabled) + if self.probe_reverse: + rev_flow_dict = scenario.network.max_flow( + source_path=self.sink_path, + sink_path=self.source_path, + mode=self.mode, + shortest_path=self.shortest_path, + flow_placement=self.flow_placement, + ) + self._store_flow_dict( + scenario=scenario, + flow_dict=rev_flow_dict, + ) - result_label = f"max_flow:[{self.source_path} -> {self.sink_path}]" - scenario.results.put(self.name, result_label, flow) + def _store_flow_dict( + self, + scenario: Scenario, + flow_dict: Dict[Tuple[str, str], float], + ) -> None: + """ + Stores the flow dictionary in the scenario's results container, labeling + each entry consistently. For each (src_label, snk_label) in the flow_dict, + we store: "max_flow:[src_label -> snk_label]". + + Args: + scenario (Scenario): The scenario that holds the results. + flow_dict (Dict[Tuple[str, str], float]): Mapping of (src_label, snk_label) to flow. + """ + for (src_label, snk_label), flow_value in flow_dict.items(): + result_label = f"max_flow:[{src_label} -> {snk_label}]" + scenario.results.put(self.name, result_label, flow_value) diff --git a/tests/scenarios/scenario_3.yaml b/tests/scenarios/scenario_3.yaml index 1b45dee..adbeb33 100644 --- a/tests/scenarios/scenario_3.yaml +++ b/tests/scenarios/scenario_3.yaml @@ -39,6 +39,7 @@ blueprints: link_params: capacity: 2 cost: 1 + network: name: "3tier_clos_network" version: 1.0 @@ -58,12 +59,14 @@ network: capacity: 2 cost: 1 - workflow: - step_type: BuildGraph name: build_graph + - step_type: CapacityProbe name: capacity_probe - source_path: my_clos1/b - sink_path: my_clos2/b + source_path: my_clos1/b.*/t1 + sink_path: my_clos2/b.*/t1 + mode: combine + probe_reverse: True shortest_path: True diff --git a/tests/scenarios/test_scenario_3.py b/tests/scenarios/test_scenario_3.py index 097f53d..762c989 100644 --- a/tests/scenarios/test_scenario_3.py +++ b/tests/scenarios/test_scenario_3.py @@ -8,33 +8,33 @@ def test_scenario_3_build_graph_and_capacity_probe() -> None: """ - Integration test that verifies we can parse scenario_3.yaml, run the workflow + Integration test verifying we can parse scenario_3.yaml, run the workflow (BuildGraph + CapacityProbe), and check results. Checks: - - The expected number of expanded nodes and links (two interconnected 3-tier CLOS fabrics). - - Presence of key expanded nodes. - - The traffic demands are empty in this scenario. - - The failure policy is empty by default. - - The max flow from my_clos1/b -> my_clos2/b matches the expected capacity. + 1) The correct number of expanded nodes and links (two interconnected 3-tier CLOS fabrics). + 2) Presence of certain expanded node names. + 3) No traffic demands in this scenario. + 4) An empty failure policy by default. + 5) The max flow from my_clos1/b -> my_clos2/b (and reverse) is as expected. """ # 1) Load the YAML file scenario_path = Path(__file__).parent / "scenario_3.yaml" yaml_text = scenario_path.read_text() - # 2) Parse into a Scenario object (this calls blueprint expansion) + # 2) Parse into a Scenario object (this also expands blueprints) scenario = Scenario.from_yaml(yaml_text) # 3) Run the scenario's workflow (BuildGraph then CapacityProbe) scenario.run() - # 4) Retrieve the graph built by BuildGraph + # 4) Retrieve the graph from the BuildGraph step graph = scenario.results.get("build_graph", "graph") assert isinstance( graph, StrictMultiDiGraph ), "Expected a StrictMultiDiGraph in scenario.results under key ('build_graph', 'graph')." - # 5) Verify total node count + # 5) Verify total node count: # Each 3-tier CLOS instance has 32 nodes -> 2 instances => 64 total. expected_nodes = 64 actual_nodes = len(graph.nodes) @@ -42,41 +42,50 @@ def test_scenario_3_build_graph_and_capacity_probe() -> None: actual_nodes == expected_nodes ), f"Expected {expected_nodes} nodes, found {actual_nodes}" - # 6) Verify total physical links before direction is applied to Nx - # Each 3-tier CLOS has 64 links internally -> 2 instances => 128 + # 6) Verify total physical links (before direction): + # Each 3-tier CLOS has 64 links internally => 2 instances => 128 # Plus 16 links connecting my_clos1/spine -> my_clos2/spine => 144 total physical links - # Each link => 2 directed edges => 288 total edges in MultiDiGraph + # Each link => 2 directed edges in the digraph => 288 edges in the final MultiDiGraph expected_links = 144 - expected_nx_edges = expected_links * 2 + expected_directed_edges = expected_links * 2 actual_edges = len(graph.edges) assert ( - actual_edges == expected_nx_edges - ), f"Expected {expected_nx_edges} directed edges, found {actual_edges}" + actual_edges == expected_directed_edges + ), f"Expected {expected_directed_edges} edges, found {actual_edges}" - # 7) Verify that there are no traffic demands in this scenario + # 7) Verify no traffic demands in this scenario assert len(scenario.traffic_demands) == 0, "Expected zero traffic demands." # 8) Verify the default (empty) failure policy policy: FailurePolicy = scenario.failure_policy assert len(policy.rules) == 0, "Expected an empty failure policy." - # 9) Check presence of a few key expanded nodes + # 9) Check presence of some expanded nodes assert ( "my_clos1/b1/t1/t1-1" in scenario.network.nodes ), "Missing expected node 'my_clos1/b1/t1/t1-1' in expanded blueprint." assert ( "my_clos2/spine/t3-16" in scenario.network.nodes - ), "Missing expected spine node 'my_clos2/spine/t3-16' in expanded blueprint." + ), "Missing expected node 'my_clos2/spine/t3-16' in expanded blueprint." - # 10) Retrieve max flow result from the CapacityProbe step - # The probe is configured with source_path="my_clos1/b" and sink_path="my_clos2/b". - flow_result_label = "max_flow:[my_clos1/b -> my_clos2/b]" - flow_value = scenario.results.get("capacity_probe", flow_result_label) + # 10) The capacity probe step computed forward and reverse flows in 'combine' mode + flow_result_label_fwd = "max_flow:[my_clos1/b.*/t1 -> my_clos2/b.*/t1]" + flow_result_label_rev = "max_flow:[my_clos2/b.*/t1 -> my_clos1/b.*/t1]" - # 11) Assert the expected max flow value - # The bottleneck is the 16 spine-to-spine links of capacity=2 => total 32. + # Retrieve the forward flow + forward_flow = scenario.results.get("capacity_probe", flow_result_label_fwd) + # Retrieve the reverse flow + reverse_flow = scenario.results.get("capacity_probe", flow_result_label_rev) + + # 11) Assert the expected flows + # The main bottleneck is the 16 spine-to-spine links of capacity=2 => total 32 + # (same in both forward and reverse). expected_flow = 32.0 - assert flow_value == expected_flow, ( - f"Expected max flow of {expected_flow}, got {flow_value}. " + assert forward_flow == expected_flow, ( + f"Expected forward max flow of {expected_flow}, got {forward_flow}. " + "Check blueprint or link capacities if this fails." + ) + assert reverse_flow == expected_flow, ( + f"Expected reverse max flow of {expected_flow}, got {reverse_flow}. " "Check blueprint or link capacities if this fails." ) diff --git a/tests/test_network.py b/tests/test_network.py index 3a78434..1cd02f7 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -159,60 +159,62 @@ def test_add_duplicate_node_raises_valueerror(): network.add_node(node2) -def test_select_nodes_by_path(): +def test_select_node_groups_by_path(): """ - Tests select_nodes_by_path for exact matches, slash-based prefix matches, - and fallback prefix pattern. + Tests select_node_groups_by_path for exact matches, slash-based prefix matches, + and * prefix pattern, plus capturing groups. """ net = Network() # Add some nodes net.add_node(Node("SEA/spine/myspine-1")) - net.add_node(Node("SEA/leaf/leaf-1")) + net.add_node(Node("SEA/spine/myspine-2")) + net.add_node(Node("SEA/leaf1/leaf-1")) + net.add_node(Node("SEA/leaf1/leaf-2")) + net.add_node(Node("SEA/leaf2/leaf-1")) + net.add_node(Node("SEA/leaf2/leaf-2")) net.add_node(Node("SEA-other")) net.add_node(Node("SFO")) # 1) Exact match => "SFO" - nodes = net.select_nodes_by_path("SFO") - assert len(nodes) == 1 + node_groups = net.select_node_groups_by_path("SFO") + assert len(node_groups) == 1 # Only 1 group + nodes = node_groups["SFO"] + assert len(nodes) == 1 # Only 1 node assert nodes[0].name == "SFO" - # 2) Slash prefix => "SEA/spine" matches "SEA/spine/myspine-1" - nodes = net.select_nodes_by_path("SEA/spine") - assert len(nodes) == 1 - assert nodes[0].name == "SEA/spine/myspine-1" - - # 3) Fallback: "SEA-other" won't be found by slash prefix "SEA/other", - # but if we search "SEA-other", we do an exact match, so we get 1 node - nodes = net.select_nodes_by_path("SEA-other") - assert len(nodes) == 1 - assert nodes[0].name == "SEA-other" - - # 4) If we search just "SEA", we match "SEA/spine/myspine-1" and "SEA/leaf/leaf-1" - # by slash prefix, so fallback never triggers, and "SEA-other" is not included. - nodes = net.select_nodes_by_path("SEA") - found = set(n.name for n in nodes) + # 2) Startwith match => "SEA/spine" + node_groups = net.select_node_groups_by_path("SEA/spine") + assert len(node_groups) == 1 # Only 1 group + nodes = node_groups["SEA/spine"] + assert len(nodes) == 2 # 2 nodes + found = {n.name for n in nodes} + assert found == {"SEA/spine/myspine-1", "SEA/spine/myspine-2"} + + # 3) * match => "SEA/leaf*" + node_groups = net.select_node_groups_by_path("SEA/leaf*") + assert len(node_groups) == 1 # Only 1 group + nodes = node_groups["SEA/leaf*"] + assert len(nodes) == 4 # 4 nodes + found = {n.name for n in nodes} assert found == { - "SEA/spine/myspine-1", - "SEA/leaf/leaf-1", + "SEA/leaf1/leaf-1", + "SEA/leaf1/leaf-2", + "SEA/leaf2/leaf-1", + "SEA/leaf2/leaf-2", } + # 4) match with capture => "(SEA/leaf\\d)" + node_groups = net.select_node_groups_by_path("(SEA/leaf\\d)") + assert len(node_groups) == 2 # 2 distinct captures + nodes = node_groups["SEA/leaf1"] + assert len(nodes) == 2 # 2 nodes + found = {n.name for n in nodes} + assert found == {"SEA/leaf1/leaf-1", "SEA/leaf1/leaf-2"} -def test_select_nodes_by_path_partial_fallback(): - """ - Tests the partial prefix logic if both exact/slash-based and dash-based - lookups fail, then partial prefix 'path...' is used. - """ - net = Network() - net.add_node(Node("S1")) - net.add_node(Node("S2")) - net.add_node(Node("SEA-spine")) - net.add_node(Node("NOTMATCH")) - - # The path "S" won't match "S" exactly, won't match "S/" or "S-", so it should - # return partial matches: "S1", "S2", and "SEA-spine". - nodes = net.select_nodes_by_path("S") - found = sorted([n.name for n in nodes]) - assert found == ["S1", "S2", "SEA-spine"] + nodes = node_groups["SEA/leaf2"] + assert len(nodes) == 2 + found = {n.name for n in nodes} + assert found == {"SEA/leaf2/leaf-1", "SEA/leaf2/leaf-2"} def test_to_strict_multidigraph_add_reverse_true(): @@ -265,8 +267,8 @@ def test_to_strict_multidigraph_add_reverse_false(): # Only one forward edge should exist edges = list(graph.edges(keys=True)) assert len(edges) == 1 - assert edges[0][0] == "A" # source - assert edges[0][1] == "B" # target + assert edges[0][0] == "A" + assert edges[0][1] == "B" assert edges[0][2] == link_ab.id @@ -285,14 +287,14 @@ def test_max_flow_simple(): # Max flow from A to C is limited by the smallest capacity (3) flow_value = net.max_flow("A", "C") - assert flow_value == 3.0 + assert flow_value == {("A", "C"): 3.0} def test_max_flow_multi_parallel(): """ Tests a scenario where two parallel paths can carry flow. - A -> B -> C and A -> D -> C, each with capacity 5. - The total flow A to C should be 10. + A -> B -> C and A -> D -> C, each with capacity=5. + The total flow A->C should be 10. """ net = Network() net.add_node(Node("A")) @@ -306,7 +308,7 @@ def test_max_flow_multi_parallel(): net.add_link(Link("D", "C", capacity=5)) flow_value = net.max_flow("A", "C") - assert flow_value == 10.0 + assert flow_value == {("A", "C"): 10.0} def test_max_flow_no_source(): @@ -319,7 +321,7 @@ def test_max_flow_no_source(): net.add_node(Node("C")) net.add_link(Link("B", "C", capacity=10)) - with pytest.raises(ValueError, match="No source nodes found matching path 'A'"): + with pytest.raises(ValueError, match="No source nodes found matching 'A'"): net.max_flow("A", "C") @@ -332,5 +334,264 @@ def test_max_flow_no_sink(): net.add_node(Node("B")) net.add_link(Link("A", "B", capacity=10)) - with pytest.raises(ValueError, match="No sink nodes found matching path 'C'"): + with pytest.raises(ValueError, match="No sink nodes found matching 'C'"): net.max_flow("A", "C") + + +def test_max_flow_combine_empty(): + """ + Demonstrate that if the dictionary for sinks is not empty, but + all matched sink nodes are disabled, the final combined_snk_nodes is empty + => returns 0.0 (rather than raising ValueError). + + We add: + - Node("A") enabled => matches ^(A|Z)$ (source) + - Node("Z") disabled => also matches ^(A|Z)$ + So the final combined source label is "A|Z". + + - Node("C") disabled => matches ^(C|Y)$ + - Node("Y") disabled => matches ^(C|Y)$ + So the final combined sink label is "C|Y". + + Even though the source group is partially enabled (A), + the sink group ends up fully disabled (C, Y). + That yields 0.0 flow and the final label is ("A|Z", "C|Y"). + """ + net = Network() + net.add_node(Node("A")) # enabled + net.add_node( + Node("Z", attrs={"disabled": True}) + ) # disabled => but still recognized by regex + + net.add_node(Node("C", attrs={"disabled": True})) + net.add_node(Node("Y", attrs={"disabled": True})) + + flow_vals = net.max_flow("^(A|Z)$", "^(C|Y)$", mode="combine") + # Because "C" and "Y" are *all* disabled, the combined sink side is empty => 0.0 + # However, the label becomes "A|Z" for sources (both matched by pattern), + # and "C|Y" for sinks. That matches what the code returns. + assert flow_vals == {("A|Z", "C|Y"): 0.0} + + +def test_max_flow_pairwise_some_empty(): + """ + In 'pairwise' mode, we want distinct groups to appear in the result, + even if one group is fully disabled. Here: + - ^(A|B)$ => "A" => Node("A", enabled), "B" => Node("B", enabled) + - ^(C|Z)$ => "C" => Node("C", enabled), "Z" => Node("Z", disabled) + This yields pairs: (A,C), (A,Z), (B,C), (B,Z). + The 'Z' sub-group is fully disabled => flow=0.0 from either A or B to Z. + """ + net = Network() + net.add_node(Node("A")) + net.add_node(Node("B")) + net.add_node(Node("C")) + net.add_node(Node("Z", attrs={"disabled": True})) # needed for the label "C|Z" + + # A->B->C + net.add_link(Link("A", "B", capacity=5)) + net.add_link(Link("B", "C", capacity=3)) + + flow_vals = net.max_flow("^(A|B)$", "^(C|Z)$", mode="pairwise") + assert flow_vals == { + ("A", "C"): 3.0, # active path + ("A", "Z"): 0.0, # sink is disabled + ("B", "C"): 3.0, # active path + ("B", "Z"): 0.0, # sink is disabled + } + + +def test_max_flow_invalid_mode(): + """ + Passing an invalid mode should raise ValueError. This covers the + else-branch in max_flow that was previously untested. + """ + net = Network() + net.add_node(Node("A")) + net.add_node(Node("B")) + with pytest.raises(ValueError, match="Invalid mode 'foobar'"): + net.max_flow("A", "B", mode="foobar") + + +def test_compute_flow_single_group_empty_source_or_sink(): + """ + Directly tests _compute_flow_single_group returning 0.0 if sources or sinks is empty. + """ + net = Network() + # Minimal setup + net.add_node(Node("A")) + net.add_node(Node("B")) + net.add_link(Link("A", "B", capacity=5)) + + flow_val_empty_sources = net._compute_flow_single_group( + [], [Node("B")], False, None + ) + assert flow_val_empty_sources == 0.0 + + flow_val_empty_sinks = net._compute_flow_single_group([Node("A")], [], False, None) + assert flow_val_empty_sinks == 0.0 + + +def test_disable_enable_node(): + net = Network() + net.add_node(Node("A")) + net.add_node(Node("B")) + net.add_link(Link("A", "B")) + + # Initially, nothing is disabled + assert not net.nodes["A"].attrs["disabled"] + assert not net.nodes["B"].attrs["disabled"] + + net.disable_node("A") + assert net.nodes["A"].attrs["disabled"] + assert not net.nodes["B"].attrs["disabled"] + + # Re-enable + net.enable_node("A") + assert not net.nodes["A"].attrs["disabled"] + + +def test_disable_node_does_not_exist(): + net = Network() + with pytest.raises(ValueError, match="Node 'A' does not exist."): + net.disable_node("A") + + with pytest.raises(ValueError, match="Node 'B' does not exist."): + net.enable_node("B") + + +def test_disable_enable_link(): + net = Network() + net.add_node(Node("A")) + net.add_node(Node("B")) + link = Link("A", "B") + net.add_link(link) + + assert not net.links[link.id].attrs["disabled"] + + net.disable_link(link.id) + assert net.links[link.id].attrs["disabled"] + + net.enable_link(link.id) + assert not net.links[link.id].attrs["disabled"] + + +def test_disable_link_does_not_exist(): + net = Network() + with pytest.raises(ValueError, match="Link 'xyz' does not exist."): + net.disable_link("xyz") + with pytest.raises(ValueError, match="Link 'xyz' does not exist."): + net.enable_link("xyz") + + +def test_enable_all_disable_all(): + net = Network() + net.add_node(Node("A")) + net.add_node(Node("B")) + link = Link("A", "B") + net.add_link(link) + + # Everything enabled by default + assert not net.nodes["A"].attrs["disabled"] + assert not net.nodes["B"].attrs["disabled"] + assert not net.links[link.id].attrs["disabled"] + + # Disable all + net.disable_all() + assert net.nodes["A"].attrs["disabled"] + assert net.nodes["B"].attrs["disabled"] + assert net.links[link.id].attrs["disabled"] + + # Enable all + net.enable_all() + assert not net.nodes["A"].attrs["disabled"] + assert not net.nodes["B"].attrs["disabled"] + assert not net.links[link.id].attrs["disabled"] + + +def test_to_strict_multidigraph_excludes_disabled(): + """ + Disabled nodes or links should not appear in the final StrictMultiDiGraph. + """ + net = Network() + net.add_node(Node("A")) + net.add_node(Node("B")) + link_ab = Link("A", "B") + net.add_link(link_ab) + + # Disable node A + net.disable_node("A") + graph = net.to_strict_multidigraph() + # Node A and link A->B should not appear + assert "A" not in graph.nodes + # B is still there + assert "B" in graph.nodes + # No edges in the graph because A is disabled + assert len(graph.edges()) == 0 + + # Enable node A, disable link + net.enable_all() + net.disable_link(link_ab.id) + graph = net.to_strict_multidigraph() + # Nodes A and B appear now, but no edges because the link is disabled + assert "A" in graph.nodes + assert "B" in graph.nodes + assert len(graph.edges()) == 0 + + +def test_get_links_between(): + net = Network() + net.add_node(Node("A")) + net.add_node(Node("B")) + net.add_node(Node("C")) + + link_ab1 = Link("A", "B") + link_ab2 = Link("A", "B") + link_bc = Link("B", "C") + net.add_link(link_ab1) + net.add_link(link_ab2) + net.add_link(link_bc) + + # Two links from A->B + ab_links = net.get_links_between("A", "B") + assert len(ab_links) == 2 + assert set(ab_links) == {link_ab1.id, link_ab2.id} + + # One link from B->C + bc_links = net.get_links_between("B", "C") + assert len(bc_links) == 1 + assert bc_links[0] == link_bc.id + + # None from B->A + ba_links = net.get_links_between("B", "A") + assert ba_links == [] + + +def test_find_links(): + net = Network() + net.add_node(Node("srcA")) + net.add_node(Node("srcB")) + net.add_node(Node("C")) + link_a_c = Link("srcA", "C") + link_b_c = Link("srcB", "C") + net.add_link(link_a_c) + net.add_link(link_b_c) + + # No filter => returns all + all_links = net.find_links() + assert len(all_links) == 2 + assert set(l.id for l in all_links) == {link_a_c.id, link_b_c.id} + + # Filter by source pattern "srcA" + a_links = net.find_links(source_regex="^srcA$") + assert len(a_links) == 1 + assert a_links[0].id == link_a_c.id + + # Filter by target pattern "C" + c_links = net.find_links(target_regex="^C$") + assert len(c_links) == 2 + + # Combined filter that picks only link from "srcB" -> "C" + b_links = net.find_links(source_regex="srcB", target_regex="^C$") + assert len(b_links) == 1 + assert b_links[0].id == link_b_c.id diff --git a/tests/workflow/test_capacity_probe.py b/tests/workflow/test_capacity_probe.py index 44dd838..663eafd 100644 --- a/tests/workflow/test_capacity_probe.py +++ b/tests/workflow/test_capacity_probe.py @@ -1,5 +1,5 @@ import pytest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, call from ngraph.network import Network, Node, Link from ngraph.workflow.capacity_probe import CapacityProbe @@ -21,14 +21,13 @@ def mock_scenario(): def test_capacity_probe_simple_flow(mock_scenario): """ Tests a simple A->B network to confirm CapacityProbe calculates the correct flow - and stores it in scenario.results with the expected label. + and stores it in scenario.results with the expected label (no mode specified => "combine"). """ # Create a 2-node network (A->B capacity=5) mock_scenario.network.add_node(Node("A")) mock_scenario.network.add_node(Node("B")) mock_scenario.network.add_link(Link("A", "B", capacity=5)) - # Instantiate the step step = CapacityProbe( name="MyCapacityProbe", source_path="A", @@ -42,11 +41,9 @@ def test_capacity_probe_simple_flow(mock_scenario): # The flow from A to B should be 5 expected_flow = 5.0 - # Validate scenario.results.put call - # The step uses the label: "max_flow:[A -> B]" mock_scenario.results.put.assert_called_once() call_args = mock_scenario.results.put.call_args[0] - # call_args format => (step_name, result_label, flow_value) + # call_args => (step_name, result_label, flow_value) assert call_args[0] == "MyCapacityProbe" assert call_args[1] == "max_flow:[A -> B]" assert call_args[2] == expected_flow @@ -63,7 +60,7 @@ def test_capacity_probe_no_source(mock_scenario): step = CapacityProbe(name="MyCapacityProbe", source_path="A", sink_path="B") - with pytest.raises(ValueError, match="No source nodes found matching path 'A'"): + with pytest.raises(ValueError, match="No source nodes found matching 'A'"): step.run(mock_scenario) @@ -73,13 +70,11 @@ def test_capacity_probe_no_sink(mock_scenario): """ # The network only has node "A"; no node matches "B". mock_scenario.network.add_node(Node("A")) - mock_scenario.network.add_link( - Link("A", "A", capacity=10) - ) # silly link for completeness + mock_scenario.network.add_link(Link("A", "A", capacity=10)) step = CapacityProbe(name="MyCapacityProbe", source_path="A", sink_path="B") - with pytest.raises(ValueError, match="No sink nodes found matching path 'B'"): + with pytest.raises(ValueError, match="No sink nodes found matching 'B'"): step.run(mock_scenario) @@ -113,3 +108,139 @@ def test_capacity_probe_parallel_paths(mock_scenario): assert call_args[0] == "MyCapacityProbe" assert call_args[1] == "max_flow:[A -> C]" assert call_args[2] == 10.0 + + +def test_capacity_probe_mode_combine_multiple_groups(mock_scenario): + """ + Tests 'combine' mode with multiple source groups and sink groups. + The flow is combined into a single entry. We confirm there's only + one result label and it has the expected flow. + """ + # Network: + # S1 -> M -> T1 + # S2 -> M -> T2 + # Capacity = 2 on each link + mock_scenario.network.add_node(Node("S1")) + mock_scenario.network.add_node(Node("S2")) + mock_scenario.network.add_node(Node("M")) + mock_scenario.network.add_node(Node("T1")) + mock_scenario.network.add_node(Node("T2")) + + for src in ("S1", "S2"): + mock_scenario.network.add_link(Link(src, "M", capacity=2)) + mock_scenario.network.add_link(Link("M", "T1", capacity=2)) + mock_scenario.network.add_link(Link("M", "T2", capacity=2)) + + step = CapacityProbe( + name="MyCapacityProbeCombine", + source_path="^S", # matches S1, S2 + sink_path="^T", # matches T1, T2 + mode="combine", + shortest_path=False, + ) + + step.run(mock_scenario) + mock_scenario.results.put.assert_called_once() + + call_args = mock_scenario.results.put.call_args[0] + # We only expect one "combine" result + assert call_args[0] == "MyCapacityProbeCombine" + # The label might look like "max_flow:[S1 -> T1]" or "max_flow:[S|S2 -> T|T2]", + # but we only check the final flow value. + # The combined capacity is effectively 2 + 2 => 4 from S1 & S2 into M, + # and 2 + 2 => 4 from M to T1 & T2. So total flow is 4. + assert call_args[2] == 4.0 + + +def test_capacity_probe_mode_pairwise_multiple_groups(mock_scenario): + """ + Tests 'pairwise' mode with multiple source groups and sink groups. + We confirm multiple result entries are stored, one per (src_label, snk_label). + To ensure distinct group labels, we use capturing groups in the regex + (e.g. ^S(\d+)$), so S1 => group '1', S2 => group '2', etc. + """ + # Network: + # S1 -> M -> T1 + # S2 -> M -> T2 + # Each link capacity=2 + mock_scenario.network.add_node(Node("S1")) + mock_scenario.network.add_node(Node("S2")) + mock_scenario.network.add_node(Node("M")) + mock_scenario.network.add_node(Node("T1")) + mock_scenario.network.add_node(Node("T2")) + + mock_scenario.network.add_link(Link("S1", "M", capacity=2)) + mock_scenario.network.add_link(Link("M", "T1", capacity=2)) + mock_scenario.network.add_link(Link("S2", "M", capacity=2)) + mock_scenario.network.add_link(Link("M", "T2", capacity=2)) + + step = CapacityProbe( + name="MyCapacityProbePairwise", + # Use capturing groups so S1 => group "1", S2 => group "2", T1 => group "1", T2 => group "2" + source_path=r"^S(\d+)$", + sink_path=r"^T(\d+)$", + mode="pairwise", + shortest_path=False, + ) + + step.run(mock_scenario) + # Expect 2 x 2 = 4 result entries, since pairwise => (S1->T1, S1->T2, S2->T1, S2->T2) + + assert mock_scenario.results.put.call_count == 4 + + # Gather calls + calls = mock_scenario.results.put.call_args_list + flows = {} + for c in calls: + step_name, label, flow_val = c[0] + assert step_name == "MyCapacityProbePairwise" + flows[label] = flow_val + + expected_labels = { + "max_flow:[1 -> 1]", + "max_flow:[1 -> 2]", + "max_flow:[2 -> 1]", + "max_flow:[2 -> 2]", + } + assert set(flows.keys()) == expected_labels + + # Each path (S1->M->T1, etc.) has capacity 2, so we expect flows = 2.0 + for label in expected_labels: + assert flows[label] == 2.0 + + +def test_capacity_probe_probe_reverse(mock_scenario): + """ + Tests that probe_reverse=True computes flow in both directions. We expect + two sets of results: forward and reverse. + """ + # Simple A->B link with capacity=3 + mock_scenario.network.add_node(Node("A")) + mock_scenario.network.add_node(Node("B")) + mock_scenario.network.add_link(Link("A", "B", capacity=3)) + + step = CapacityProbe( + name="MyCapacityProbeReversed", + source_path="A", + sink_path="B", + probe_reverse=True, + mode="combine", + ) + + step.run(mock_scenario) + + # Expect 2 calls: forward flow (A->B) and reverse flow (B->A). + assert mock_scenario.results.put.call_count == 2 + calls = mock_scenario.results.put.call_args_list + + flows = {} + for c in calls: + step_name, label, flow_val = c[0] + assert step_name == "MyCapacityProbeReversed" + flows[label] = flow_val + + # We expect "max_flow:[A -> B]" = 3, and "max_flow:[B -> A]" = 3 + assert "max_flow:[A -> B]" in flows + assert "max_flow:[B -> A]" in flows + assert flows["max_flow:[A -> B]"] == 3.0 + assert flows["max_flow:[B -> A]"] == 3.0