diff --git a/docs/examples/basic.md b/docs/examples/basic.md index 7a5ea61..6d6475b 100644 --- a/docs/examples/basic.md +++ b/docs/examples/basic.md @@ -119,6 +119,34 @@ print(f"Equal-balanced flow: {max_flow_shortest_balanced}") Note that `EQUAL_BALANCED` flow placement is only applicable when calculating MaxFlow on shortest paths. +### Advanced Analysis: Sensitivity Analysis + +For deeper network analysis, you can use the low-level graph algorithms to perform sensitivity analysis and identify bottleneck edges: + +```python +from ngraph.lib.algorithms.max_flow import calc_max_flow, saturated_edges, run_sensitivity + +# Get the underlying graph for low-level analysis +graph = network.to_strict_multidigraph() + +# Identify bottleneck (saturated) edges +bottlenecks = saturated_edges(graph, "A", "C") +print(f"Bottleneck edges: {bottlenecks}") + +# Perform sensitivity analysis - test increasing capacity by 1 unit +sensitivity_increase = run_sensitivity(graph, "A", "C", change_amount=1.0) +print(f"Sensitivity to capacity increases: {sensitivity_increase}") + +# Test sensitivity to capacity decreases +sensitivity_decrease = run_sensitivity(graph, "A", "C", change_amount=-1.0) +print(f"Sensitivity to capacity decreases: {sensitivity_decrease}") +``` + +This analysis helps identify: +- **Bottleneck edges**: Links that are fully utilized and limit overall flow +- **High-impact upgrades**: Which capacity increases provide the most benefit +- **Vulnerability assessment**: How flow decreases when links are degraded + ## Next Steps - **[Tutorial](../getting-started/tutorial.md)** - Build complete network scenarios diff --git a/docs/reference/api-full.md b/docs/reference/api-full.md index 7fcc56d..b00dfe9 100644 --- a/docs/reference/api-full.md +++ b/docs/reference/api-full.md @@ -10,7 +10,7 @@ For a curated, example-driven API guide, see **[api.md](api.md)**. > - **[CLI Reference](cli.md)** - Command-line interface > - **[DSL Reference](dsl.md)** - YAML syntax guide -**Generated from source code on:** June 08, 2025 at 03:05 UTC +**Generated from source code on:** June 09, 2025 at 00:40 UTC --- @@ -950,7 +950,7 @@ Raises: ## ngraph.lib.algorithms.max_flow -### calc_max_flow(graph: ngraph.lib.graph.StrictMultiDiGraph, src_node: Hashable, dst_node: Hashable, flow_placement: ngraph.lib.algorithms.base.FlowPlacement = , shortest_path: bool = False, reset_flow_graph: bool = False, capacity_attr: str = 'capacity', flow_attr: str = 'flow', flows_attr: str = 'flows', copy_graph: bool = True) -> float +### calc_max_flow(graph: ngraph.lib.graph.StrictMultiDiGraph, src_node: Hashable, dst_node: Hashable, *, return_summary: bool = False, return_graph: bool = False, flow_placement: ngraph.lib.algorithms.base.FlowPlacement = , shortest_path: bool = False, reset_flow_graph: bool = False, capacity_attr: str = 'capacity', flow_attr: str = 'flow', flows_attr: str = 'flows', copy_graph: bool = True) -> Union[float, tuple] Compute the maximum flow between two nodes in a directed multi-graph, using an iterative shortest-path augmentation approach. @@ -972,6 +972,12 @@ Args: The source node for flow. dst_node (NodeID): The destination node for flow. + return_summary (bool): + If True, return a FlowSummary with detailed flow analytics. + Defaults to False. + return_graph (bool): + If True, return the mutated flow graph along with other results. + Defaults to False. flow_placement (FlowPlacement): Determines how flow is split among parallel edges of equal cost. Defaults to ``FlowPlacement.PROPORTIONAL``. @@ -992,24 +998,74 @@ Args: Defaults to True. Returns: - float: - The total flow placed between ``src_node`` and ``dst_node``. If ``shortest_path=True``, - this is just the flow from a single augmentation. + Union[float, tuple]: + - If neither return_summary nor return_graph: float (total flow) + - If return_summary only: tuple[float, FlowSummary] + - If both flags: tuple[float, FlowSummary, StrictMultiDiGraph] Notes: - For large graphs or performance-critical scenarios, consider specialized max-flow algorithms (e.g., Dinic, Edmond-Karp) for better scaling. + - When using return_summary or return_graph, callers must unpack the returned tuple. Examples: >>> g = StrictMultiDiGraph() >>> g.add_node('A') >>> g.add_node('B') >>> g.add_node('C') - >>> _ = g.add_edge('A', 'B', capacity=10.0, flow=0.0, flows={}) - >>> _ = g.add_edge('B', 'C', capacity=5.0, flow=0.0, flows={}) + >>> _ = g.add_edge('A', 'B', capacity=10.0, flow=0.0, flows={}, cost=1) + >>> _ = g.add_edge('B', 'C', capacity=5.0, flow=0.0, flows={}, cost=1) + >>> + >>> # Basic usage (scalar return) >>> max_flow_value = calc_max_flow(g, 'A', 'C') >>> print(max_flow_value) 5.0 + >>> + >>> # With flow summary analytics + >>> flow, summary = calc_max_flow(g, 'A', 'C', return_summary=True) + >>> print(f"Min-cut edges: {summary.min_cut}") + >>> + >>> # With both summary and mutated graph + >>> flow, summary, flow_graph = calc_max_flow( + ... g, 'A', 'C', return_summary=True, return_graph=True + ... ) + >>> # flow_graph contains the flow assignments + +### run_sensitivity(graph: ngraph.lib.graph.StrictMultiDiGraph, src_node: Hashable, dst_node: Hashable, *, capacity_attr: str = 'capacity', flow_attr: str = 'flow', change_amount: float = 1.0, **kwargs) -> dict[tuple, float] + +Perform sensitivity analysis to identify high-impact capacity changes. + +Tests changing each saturated edge capacity by change_amount and measures +the resulting change in total flow. Positive values increase capacity, +negative values decrease capacity (with validation to prevent negative capacities). + +Args: + graph: The graph to analyze + src_node: Source node + dst_node: Destination node + capacity_attr: Name of capacity attribute + flow_attr: Name of flow attribute + change_amount: Amount to change capacity for testing (positive=increase, negative=decrease) + **kwargs: Additional arguments passed to calc_max_flow + +Returns: + Dictionary mapping edge tuples to flow change when capacity is modified + +### saturated_edges(graph: ngraph.lib.graph.StrictMultiDiGraph, src_node: Hashable, dst_node: Hashable, *, capacity_attr: str = 'capacity', flow_attr: str = 'flow', tolerance: float = 1e-10, **kwargs) -> list[tuple] + +Identify saturated (bottleneck) edges in the max flow solution. + +Args: + graph: The graph to analyze + src_node: Source node + dst_node: Destination node + capacity_attr: Name of capacity attribute + flow_attr: Name of flow attribute + tolerance: Tolerance for considering an edge saturated + **kwargs: Additional arguments passed to calc_max_flow + +Returns: + List of saturated edge tuples (u, v, k) where residual capacity <= tolerance --- diff --git a/docs/reference/api.md b/docs/reference/api.md index 2bb6f9e..ca58ea6 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -213,18 +213,23 @@ Low-level graph analysis functions. ```python from ngraph.lib.graph import StrictMultiDiGraph from ngraph.lib.algorithms.spf import spf, ksp -from ngraph.lib.algorithms.max_flow import calc_max_flow +from ngraph.lib.algorithms.max_flow import calc_max_flow, run_sensitivity, saturated_edges # Direct graph manipulation graph = StrictMultiDiGraph() graph.add_node("A") -graph.add_edge("A", "B", capacity=10) +graph.add_node("B") +graph.add_edge("A", "B", capacity=10, cost=1) # Run shortest path algorithm costs, pred = spf(graph, "A") # Calculate maximum flow max_flow = calc_max_flow(graph, "A", "B") + +# Sensitivity analysis - identify bottleneck edges and test capacity changes +saturated = saturated_edges(graph, "A", "B") +sensitivity = run_sensitivity(graph, "A", "B", change_amount=1.0) ``` ## Error Handling diff --git a/ngraph/lib/algorithms/max_flow.py b/ngraph/lib/algorithms/max_flow.py index d0ac6f9..bd71a3f 100644 --- a/ngraph/lib/algorithms/max_flow.py +++ b/ngraph/lib/algorithms/max_flow.py @@ -1,14 +1,21 @@ +from typing import Literal, Union, overload + from ngraph.lib.algorithms.base import EdgeSelect, FlowPlacement from ngraph.lib.algorithms.flow_init import init_flow_graph from ngraph.lib.algorithms.place_flow import place_flow_on_graph from ngraph.lib.algorithms.spf import spf +from ngraph.lib.algorithms.types import FlowSummary from ngraph.lib.graph import NodeID, StrictMultiDiGraph +@overload def calc_max_flow( graph: StrictMultiDiGraph, src_node: NodeID, dst_node: NodeID, + *, + return_summary: Literal[False] = False, + return_graph: Literal[False] = False, flow_placement: FlowPlacement = FlowPlacement.PROPORTIONAL, shortest_path: bool = False, reset_flow_graph: bool = False, @@ -16,7 +23,78 @@ def calc_max_flow( flow_attr: str = "flow", flows_attr: str = "flows", copy_graph: bool = True, -) -> float: +) -> float: ... + + +@overload +def calc_max_flow( + graph: StrictMultiDiGraph, + src_node: NodeID, + dst_node: NodeID, + *, + return_summary: Literal[True], + return_graph: Literal[False] = False, + flow_placement: FlowPlacement = FlowPlacement.PROPORTIONAL, + shortest_path: bool = False, + reset_flow_graph: bool = False, + capacity_attr: str = "capacity", + flow_attr: str = "flow", + flows_attr: str = "flows", + copy_graph: bool = True, +) -> tuple[float, FlowSummary]: ... + + +@overload +def calc_max_flow( + graph: StrictMultiDiGraph, + src_node: NodeID, + dst_node: NodeID, + *, + return_summary: Literal[False] = False, + return_graph: Literal[True], + flow_placement: FlowPlacement = FlowPlacement.PROPORTIONAL, + shortest_path: bool = False, + reset_flow_graph: bool = False, + capacity_attr: str = "capacity", + flow_attr: str = "flow", + flows_attr: str = "flows", + copy_graph: bool = True, +) -> tuple[float, StrictMultiDiGraph]: ... + + +@overload +def calc_max_flow( + graph: StrictMultiDiGraph, + src_node: NodeID, + dst_node: NodeID, + *, + return_summary: Literal[True], + return_graph: Literal[True], + flow_placement: FlowPlacement = FlowPlacement.PROPORTIONAL, + shortest_path: bool = False, + reset_flow_graph: bool = False, + capacity_attr: str = "capacity", + flow_attr: str = "flow", + flows_attr: str = "flows", + copy_graph: bool = True, +) -> tuple[float, FlowSummary, StrictMultiDiGraph]: ... + + +def calc_max_flow( + graph: StrictMultiDiGraph, + src_node: NodeID, + dst_node: NodeID, + *, + return_summary: bool = False, + return_graph: bool = False, + flow_placement: FlowPlacement = FlowPlacement.PROPORTIONAL, + shortest_path: bool = False, + reset_flow_graph: bool = False, + capacity_attr: str = "capacity", + flow_attr: str = "flow", + flows_attr: str = "flows", + copy_graph: bool = True, +) -> Union[float, tuple]: """Compute the maximum flow between two nodes in a directed multi-graph, using an iterative shortest-path augmentation approach. @@ -37,6 +115,12 @@ def calc_max_flow( The source node for flow. dst_node (NodeID): The destination node for flow. + return_summary (bool): + If True, return a FlowSummary with detailed flow analytics. + Defaults to False. + return_graph (bool): + If True, return the mutated flow graph along with other results. + Defaults to False. flow_placement (FlowPlacement): Determines how flow is split among parallel edges of equal cost. Defaults to ``FlowPlacement.PROPORTIONAL``. @@ -57,24 +141,38 @@ def calc_max_flow( Defaults to True. Returns: - float: - The total flow placed between ``src_node`` and ``dst_node``. If ``shortest_path=True``, - this is just the flow from a single augmentation. + Union[float, tuple]: + - If neither return_summary nor return_graph: float (total flow) + - If return_summary only: tuple[float, FlowSummary] + - If both flags: tuple[float, FlowSummary, StrictMultiDiGraph] Notes: - For large graphs or performance-critical scenarios, consider specialized max-flow algorithms (e.g., Dinic, Edmond-Karp) for better scaling. + - When using return_summary or return_graph, callers must unpack the returned tuple. Examples: >>> g = StrictMultiDiGraph() >>> g.add_node('A') >>> g.add_node('B') >>> g.add_node('C') - >>> _ = g.add_edge('A', 'B', capacity=10.0, flow=0.0, flows={}) - >>> _ = g.add_edge('B', 'C', capacity=5.0, flow=0.0, flows={}) + >>> g.add_edge('A', 'B', capacity=10.0, flow=0.0, flows={}, cost=1) + >>> g.add_edge('B', 'C', capacity=5.0, flow=0.0, flows={}, cost=1) + >>> + >>> # Basic usage (scalar return) >>> max_flow_value = calc_max_flow(g, 'A', 'C') >>> print(max_flow_value) 5.0 + >>> + >>> # With flow summary analytics + >>> flow, summary = calc_max_flow(g, 'A', 'C', return_summary=True) + >>> print(f"Min-cut edges: {summary.min_cut}") + >>> + >>> # With both summary and mutated graph + >>> flow, summary, flow_graph = calc_max_flow( + ... g, 'A', 'C', return_summary=True, return_graph=True + ... ) + >>> # flow_graph contains the flow assignments """ # Initialize a flow-aware graph (copy or in-place). flow_graph = init_flow_graph( @@ -102,7 +200,15 @@ def calc_max_flow( # If only one path (single augmentation) is desired, return early. if shortest_path: - return max_flow + return _build_return_value( + max_flow, + flow_graph, + src_node, + return_summary, + return_graph, + capacity_attr, + flow_attr, + ) # Otherwise, repeatedly find augmenting paths until no new flow can be placed. while True: @@ -129,4 +235,225 @@ def calc_max_flow( max_flow += flow_meta.placed_flow - return max_flow + return _build_return_value( + max_flow, + flow_graph, + src_node, + return_summary, + return_graph, + capacity_attr, + flow_attr, + ) + + +def _build_return_value( + max_flow: float, + flow_graph: StrictMultiDiGraph, + src_node: NodeID, + return_summary: bool, + return_graph: bool, + capacity_attr: str, + flow_attr: str, +) -> Union[float, tuple]: + """Build the appropriate return value based on the requested flags.""" + if not (return_summary or return_graph): + return max_flow + + summary = None + if return_summary: + summary = _build_flow_summary( + max_flow, flow_graph, src_node, capacity_attr, flow_attr + ) + + ret: list = [max_flow] + if return_summary: + ret.append(summary) + if return_graph: + ret.append(flow_graph) + + return tuple(ret) if len(ret) > 1 else ret[0] + + +def _build_flow_summary( + total_flow: float, + flow_graph: StrictMultiDiGraph, + src_node: NodeID, + capacity_attr: str, + flow_attr: str, +) -> FlowSummary: + """Build a FlowSummary from the flow graph state.""" + edge_flow = {} + residual_cap = {} + + # Extract flow and residual capacity for each edge + for u, v, k, d in flow_graph.edges(data=True, keys=True): + edge = (u, v, k) + f = d.get(flow_attr, 0.0) + edge_flow[edge] = f + residual_cap[edge] = d[capacity_attr] - f + + # BFS in residual graph to find reachable nodes from source + reachable = set() + stack = [src_node] + while stack: + n = stack.pop() + if n in reachable: + continue + reachable.add(n) + for _, nbr, _, d in flow_graph.out_edges(n, data=True, keys=True): + if d[capacity_attr] - d.get(flow_attr, 0.0) > 0 and nbr not in reachable: + stack.append(nbr) + + # Find min-cut edges (saturated edges crossing the cut) + min_cut = [ + (u, v, k) + for u, v, k, d in flow_graph.edges(data=True, keys=True) + if u in reachable + and v not in reachable + and d[capacity_attr] - d.get(flow_attr, 0.0) == 0 + ] + + return FlowSummary( + total_flow=total_flow, + edge_flow=edge_flow, + residual_cap=residual_cap, + reachable=reachable, + min_cut=min_cut, + ) + + +def saturated_edges( + graph: StrictMultiDiGraph, + src_node: NodeID, + dst_node: NodeID, + *, + capacity_attr: str = "capacity", + flow_attr: str = "flow", + tolerance: float = 1e-10, + **kwargs, +) -> list[tuple]: + """Identify saturated (bottleneck) edges in the max flow solution. + + Args: + graph: The graph to analyze + src_node: Source node + dst_node: Destination node + capacity_attr: Name of capacity attribute + flow_attr: Name of flow attribute + tolerance: Tolerance for considering an edge saturated + **kwargs: Additional arguments passed to calc_max_flow + + Returns: + List of saturated edge tuples (u, v, k) where residual capacity <= tolerance + """ + result = calc_max_flow( + graph, + src_node, + dst_node, + return_summary=True, + capacity_attr=capacity_attr, + flow_attr=flow_attr, + **kwargs, + ) + # Ensure we have a tuple to unpack + if isinstance(result, tuple) and len(result) >= 2: + _, summary = result + else: + raise ValueError( + "Expected tuple return from calc_max_flow with return_summary=True" + ) + + return [ + edge for edge, residual in summary.residual_cap.items() if residual <= tolerance + ] + + +def run_sensitivity( + graph: StrictMultiDiGraph, + src_node: NodeID, + dst_node: NodeID, + *, + capacity_attr: str = "capacity", + flow_attr: str = "flow", + change_amount: float = 1.0, + **kwargs, +) -> dict[tuple, float]: + """Perform sensitivity analysis to identify high-impact capacity changes. + + Tests changing each saturated edge capacity by change_amount and measures + the resulting change in total flow. Positive values increase capacity, + negative values decrease capacity (with validation to prevent negative capacities). + + Args: + graph: The graph to analyze + src_node: Source node + dst_node: Destination node + capacity_attr: Name of capacity attribute + flow_attr: Name of flow attribute + change_amount: Amount to change capacity for testing (positive=increase, negative=decrease) + **kwargs: Additional arguments passed to calc_max_flow + + Returns: + Dictionary mapping edge tuples to flow change when capacity is modified + """ + # Get baseline flow and identify saturated edges - ensure scalar return + baseline_flow = calc_max_flow( + graph, + src_node, + dst_node, + return_summary=False, + return_graph=False, + capacity_attr=capacity_attr, + flow_attr=flow_attr, + **kwargs, + ) + assert isinstance(baseline_flow, (int, float)) + + saturated = saturated_edges( + graph, + src_node, + dst_node, + capacity_attr=capacity_attr, + flow_attr=flow_attr, + **kwargs, + ) + + sensitivity = {} + + for edge in saturated: + u, v, k = edge + + # Create modified graph with changed edge capacity + test_graph = graph.copy() + edge_data = test_graph.get_edge_data(u, v, k) + if edge_data is not None: + # Create a mutable copy of the edge data + edge_data = dict(edge_data) + original_capacity = edge_data[capacity_attr] + new_capacity = original_capacity + change_amount + + # If the change would result in negative capacity, set to 0 + if new_capacity < 0: + new_capacity = 0 + + edge_data[capacity_attr] = new_capacity + test_graph.remove_edge(u, v, k) + test_graph.add_edge(u, v, k, **edge_data) + + # Calculate new max flow - ensure scalar return + new_flow = calc_max_flow( + test_graph, + src_node, + dst_node, + return_summary=False, + return_graph=False, + capacity_attr=capacity_attr, + flow_attr=flow_attr, + **kwargs, + ) + assert isinstance(new_flow, (int, float)) + + # Record flow change + sensitivity[edge] = new_flow - baseline_flow + + return sensitivity diff --git a/ngraph/lib/algorithms/types.py b/ngraph/lib/algorithms/types.py new file mode 100644 index 0000000..30225e8 --- /dev/null +++ b/ngraph/lib/algorithms/types.py @@ -0,0 +1,32 @@ +"""Types and data structures for algorithm analytics.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List, Set, Tuple + +# Edge identifier tuple: (source_node, destination_node, edge_key) +Edge = Tuple[str, str, str] + + +@dataclass(frozen=True) +class FlowSummary: + """Summary of max-flow computation results with detailed analytics. + + This immutable data structure provides comprehensive information about + the flow solution, including edge flows, residual capacities, and + min-cut analysis. + + Attributes: + total_flow: The maximum flow value achieved. + edge_flow: Flow amount on each edge, indexed by (src, dst, key). + residual_cap: Remaining capacity on each edge after flow placement. + reachable: Set of nodes reachable from source in residual graph. + min_cut: List of saturated edges that form the minimum cut. + """ + + total_flow: float + edge_flow: Dict[Edge, float] + residual_cap: Dict[Edge, float] + reachable: Set[str] + min_cut: List[Edge] diff --git a/notebooks/capacity_probe_demo.ipynb b/notebooks/capacity_probe_demo.ipynb new file mode 100644 index 0000000..9d760d5 --- /dev/null +++ b/notebooks/capacity_probe_demo.ipynb @@ -0,0 +1,562 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b3b8c827", + "metadata": {}, + "source": [ + "# Enhanced MaxFlow Demo\n", + "\n", + "This notebook demonstrates the extended max_flow functionality with:\n", + "1. FlowSummary analytics for bottleneck identification\n", + "2. Saturated edge detection\n", + "3. Sensitivity analysis for capacity changes (increases and decreases)\n", + "4. Min-cut identification\n" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "eb6f83bb", + "metadata": {}, + "outputs": [], + "source": [ + "# Import required modules\n", + "from ngraph.lib.algorithms.max_flow import (\n", + " calc_max_flow,\n", + " run_sensitivity,\n", + " saturated_edges,\n", + ")\n", + "from ngraph.lib.graph import StrictMultiDiGraph" + ] + }, + { + "cell_type": "markdown", + "id": "f02e9a5a", + "metadata": {}, + "source": [ + "## Sample Network Creation\n", + "\n", + "First, let's create a sample network with known bottlenecks for our demonstrations." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "a8525554", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Network has 5 nodes and 6 edges\n", + "\n", + "Edges with capacities:\n", + " A -> B (key=OOFmeRtuSou6vpZLVhPfmw): capacity=100.0\n", + " A -> C (key=T4O7lzy-Sf--3No5uZn5ew): capacity=50.0\n", + " B -> D (key=q3CVjlmFSMyxznlvGTKhdA): capacity=80.0\n", + " B -> E (key=6M4a3ZwZTKuytHkSyrJvGA): capacity=40.0\n", + " C -> D (key=QTTKXGdHT8ud6BnwMO-_lQ): capacity=60.0\n", + " D -> E (key=hEkosiunQQizsUpPYQ5DKQ): capacity=120.0\n" + ] + } + ], + "source": [ + "def create_sample_network():\n", + " \"\"\"Create a sample network with known bottlenecks.\"\"\"\n", + " g = StrictMultiDiGraph()\n", + "\n", + " # Add nodes\n", + " for node in [\"A\", \"B\", \"C\", \"D\", \"E\"]:\n", + " g.add_node(node)\n", + "\n", + " # Add edges with varying capacities to create bottlenecks\n", + " edges = [\n", + " (\"A\", \"B\", {\"capacity\": 100.0, \"flow\": 0.0, \"flows\": {}, \"cost\": 1.0}),\n", + " (\"A\", \"C\", {\"capacity\": 50.0, \"flow\": 0.0, \"flows\": {}, \"cost\": 1.0}),\n", + " (\"B\", \"D\", {\"capacity\": 80.0, \"flow\": 0.0, \"flows\": {}, \"cost\": 1.0}),\n", + " (\"C\", \"D\", {\"capacity\": 60.0, \"flow\": 0.0, \"flows\": {}, \"cost\": 1.0}),\n", + " (\n", + " \"B\",\n", + " \"E\",\n", + " {\"capacity\": 40.0, \"flow\": 0.0, \"flows\": {}, \"cost\": 1.0},\n", + " ), # Bottleneck!\n", + " (\"D\", \"E\", {\"capacity\": 120.0, \"flow\": 0.0, \"flows\": {}, \"cost\": 1.0}),\n", + " ]\n", + "\n", + " for u, v, attrs in edges:\n", + " g.add_edge(u, v, **attrs)\n", + "\n", + " return g\n", + "\n", + "\n", + "# Create our sample network\n", + "g = create_sample_network()\n", + "\n", + "# Display network structure\n", + "print(f\"Network has {g.number_of_nodes()} nodes and {g.number_of_edges()} edges\")\n", + "print(\"\\nEdges with capacities:\")\n", + "for u, v, k, d in g.edges(data=True, keys=True):\n", + " print(f\" {u} -> {v} (key={k}): capacity={d['capacity']}\")" + ] + }, + { + "cell_type": "markdown", + "id": "7e743879", + "metadata": {}, + "source": [ + "## 1. Basic Max Flow\n", + "\n", + "First, let's demonstrate the basic MaxFlow." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "8984076a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "=== Basic Max Flow (Backward Compatible) ===\n", + "Maximum flow from A to E: 150.0\n" + ] + } + ], + "source": [ + "print(\"=== Basic Max Flow (Backward Compatible) ===\")\n", + "g = create_sample_network()\n", + "\n", + "# Traditional usage - returns scalar\n", + "max_flow = calc_max_flow(g, \"A\", \"E\")\n", + "print(f\"Maximum flow from A to E: {max_flow}\")" + ] + }, + { + "cell_type": "markdown", + "id": "faa8bb15", + "metadata": {}, + "source": [ + "## 2. Flow Summary Analytics\n", + "\n", + "Now let's explore the enhanced functionality with FlowSummary analytics that provide detailed insights into the flow solution." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "c19da68c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "=== Flow Summary Analytics ===\n", + "Maximum flow: 150.0\n", + "Total edges analyzed: 6\n", + "Reachable nodes from source: ['A']\n", + "Min-cut edges (bottlenecks): 2\n", + "\n", + "Min-cut edges:\n", + " A -> B (key=FvkLnMVUSbKLEvtsEhqZBw, capacity=100.0)\n", + " A -> C (key=QD32uiIESSifIs7c6xKFzg, capacity=50.0)\n", + "\n", + "Flow details:\n", + " - Total flow: 150.0\n", + " - Edge flows: 6 edges have flow\n", + " - Residual capacities: 6 edges tracked\n" + ] + } + ], + "source": [ + "print(\"=== Flow Summary Analytics ===\")\n", + "g = create_sample_network()\n", + "\n", + "# Enhanced usage - returns flow value and summary\n", + "flow_value, summary = calc_max_flow(g, \"A\", \"E\", return_summary=True)\n", + "\n", + "print(f\"Maximum flow: {flow_value}\")\n", + "print(f\"Total edges analyzed: {len(summary.edge_flow)}\")\n", + "print(f\"Reachable nodes from source: {sorted(summary.reachable)}\")\n", + "print(f\"Min-cut edges (bottlenecks): {len(summary.min_cut)}\")\n", + "\n", + "if summary.min_cut:\n", + " print(\"\\nMin-cut edges:\")\n", + " for edge in summary.min_cut:\n", + " u, v, k = edge\n", + " capacity = None\n", + " for u_edge, v_edge, k_edge, d in g.edges(data=True, keys=True):\n", + " if (u, v, k) == (u_edge, v_edge, k_edge):\n", + " capacity = d.get(\"capacity\", \"unknown\")\n", + " break\n", + " print(f\" {u} -> {v} (key={k}, capacity={capacity})\")\n", + "\n", + "print(\"\\nFlow details:\")\n", + "print(f\" - Total flow: {summary.total_flow}\")\n", + "print(f\" - Edge flows: {len(summary.edge_flow)} edges have flow\")\n", + "print(f\" - Residual capacities: {len(summary.residual_cap)} edges tracked\")" + ] + }, + { + "cell_type": "markdown", + "id": "48cd6105", + "metadata": {}, + "source": [ + "## 3. Bottleneck Detection\n", + "\n", + "The `saturated_edges` helper function identifies which edges are fully utilized and represent bottlenecks in the network." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "290285ee", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "=== Bottleneck Detection ===\n", + "Found 3 saturated edges:\n", + " A -> B (key=ZBE4tJf9QsOQ3JiplORHZQ) - fully utilized\n", + " A -> C (key=qyjhuhkHSNaSXTiyix6zRg) - fully utilized\n", + " B -> E (key=IJdfc4sxT7CQMSO8Tjgd_g) - fully utilized\n", + "\n", + "Saturated edge details:\n", + " A -> B (key=ZBE4tJf9QsOQ3JiplORHZQ): capacity=100.0\n", + " A -> C (key=qyjhuhkHSNaSXTiyix6zRg): capacity=50.0\n", + " B -> E (key=IJdfc4sxT7CQMSO8Tjgd_g): capacity=40.0\n" + ] + } + ], + "source": [ + "print(\"=== Bottleneck Detection ===\")\n", + "g = create_sample_network()\n", + "\n", + "# Find saturated (bottleneck) edges\n", + "saturated = saturated_edges(g, \"A\", \"E\")\n", + "\n", + "print(f\"Found {len(saturated)} saturated edges:\")\n", + "for edge in saturated:\n", + " u, v, k = edge\n", + " print(f\" {u} -> {v} (key={k}) - fully utilized\")\n", + "\n", + "# Let's also show the edge capacities for context\n", + "print(\"\\nSaturated edge details:\")\n", + "for edge in saturated:\n", + " u, v, k = edge\n", + " edge_data = g.get_edge_data(u, v, k)\n", + " if edge_data:\n", + " print(f\" {u} -> {v} (key={k}): capacity={edge_data['capacity']}\")" + ] + }, + { + "cell_type": "markdown", + "id": "22a7ac85", + "metadata": {}, + "source": [ + "## 4. Sensitivity Analysis for Capacity Changes\n", + "\n", + "The `run_sensitivity` function helps identify which capacity changes would have the highest impact on total flow. It supports both capacity increases (positive values) and decreases (negative values). When a capacity change would result in a negative capacity, the function automatically sets the capacity to zero." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "d3c87e0e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "=== Sensitivity Analysis for Capacity Changes ===\n", + "\n", + "--- Capacity Increases ---\n", + "Impact of increasing each saturated edge by 20 units:\n", + " A -> B (key=0_mOkYdZTvajpoBNoYrH_w): +10.0 flow increase\n", + " A -> C (key=EqYAP0u0RTOU_ydmbCbvsA): +10.0 flow increase\n", + " B -> E (key=Jh6FuF36Tz-QkBx0y2BtuA): +0.0 flow increase\n", + "\n", + "Best upgrade target: A -> B (key=0_mOkYdZTvajpoBNoYrH_w) with +10.0 flow increase\n", + " Current capacity: 100.0 -> Upgraded capacity: 120.0\n", + "\n", + "--- Capacity Decreases ---\n", + "Impact of decreasing each saturated edge by 5 units:\n", + " A -> B (key=0_mOkYdZTvajpoBNoYrH_w): -5.0 flow change\n", + " A -> C (key=EqYAP0u0RTOU_ydmbCbvsA): -5.0 flow change\n", + " B -> E (key=Jh6FuF36Tz-QkBx0y2BtuA): +0.0 flow change\n", + "\n", + "Most critical edge for reduction: A -> B (key=0_mOkYdZTvajpoBNoYrH_w) with -5.0 flow impact\n", + "\n", + "--- Testing Large Decrease (Zero-Capacity Behavior) ---\n", + "Large decrease test: 3 edges analyzed (capacities set to 0 when change would be negative)\n", + "\n", + "Impact of setting each edge capacity to zero:\n", + " A -> B (capacity 100.0 -> 0): -100.0 flow change\n", + " A -> C (capacity 50.0 -> 0): -50.0 flow change\n", + " B -> E (capacity 40.0 -> 0): -30.0 flow change\n" + ] + } + ], + "source": [ + "print(\"=== Sensitivity Analysis for Capacity Changes ===\")\n", + "g = create_sample_network()\n", + "\n", + "# Analyze impact of increasing each bottleneck capacity\n", + "print(\"\\n--- Capacity Increases ---\")\n", + "sensitivity_increase = run_sensitivity(g, \"A\", \"E\", change_amount=20.0)\n", + "\n", + "print(\"Impact of increasing each saturated edge by 20 units:\")\n", + "\n", + "# Sort by impact (highest first)\n", + "sorted_impacts = sorted(sensitivity_increase.items(), key=lambda x: x[1], reverse=True)\n", + "\n", + "for edge, impact in sorted_impacts:\n", + " u, v, k = edge\n", + " print(f\" {u} -> {v} (key={k}): +{impact:.1f} flow increase\")\n", + "\n", + "if sorted_impacts:\n", + " best_edge, best_impact = sorted_impacts[0]\n", + " u, v, k = best_edge\n", + " print(\n", + " f\"\\nBest upgrade target: {u} -> {v} (key={k}) with +{best_impact:.1f} flow increase\"\n", + " )\n", + "\n", + " # Show current capacity for context\n", + " edge_data = g.get_edge_data(u, v, k)\n", + " if edge_data:\n", + " current_cap = edge_data[\"capacity\"]\n", + " print(\n", + " f\" Current capacity: {current_cap} -> Upgraded capacity: {current_cap + 20.0}\"\n", + " )\n", + "\n", + "# Analyze impact of decreasing each bottleneck capacity\n", + "print(\"\\n--- Capacity Decreases ---\")\n", + "sensitivity_decrease = run_sensitivity(g, \"A\", \"E\", change_amount=-5.0)\n", + "\n", + "print(\"Impact of decreasing each saturated edge by 5 units:\")\n", + "\n", + "# Sort by impact (most negative first)\n", + "sorted_impacts_dec = sorted(sensitivity_decrease.items(), key=lambda x: x[1])\n", + "\n", + "for edge, impact in sorted_impacts_dec:\n", + " u, v, k = edge\n", + " print(f\" {u} -> {v} (key={k}): {impact:+.1f} flow change\")\n", + "\n", + "if sorted_impacts_dec:\n", + " worst_edge, worst_impact = sorted_impacts_dec[0]\n", + " u, v, k = worst_edge\n", + " print(\n", + " f\"\\nMost critical edge for reduction: {u} -> {v} (key={k}) with {worst_impact:+.1f} flow impact\"\n", + " )\n", + "\n", + "# Demonstrate zero-capacity behavior for large decreases\n", + "print(\"\\n--- Testing Large Decrease (Zero-Capacity Behavior) ---\")\n", + "sensitivity_large = run_sensitivity(g, \"A\", \"E\", change_amount=-100.0)\n", + "print(\n", + " f\"Large decrease test: {len(sensitivity_large)} edges analyzed (capacities set to 0 when change would be negative)\"\n", + ")\n", + "\n", + "# Show the impact of setting each edge to zero capacity\n", + "if sensitivity_large:\n", + " print(\"\\nImpact of setting each edge capacity to zero:\")\n", + " sorted_zero_impacts = sorted(sensitivity_large.items(), key=lambda x: x[1])\n", + " for edge, impact in sorted_zero_impacts:\n", + " u, v, k = edge\n", + " edge_data = g.get_edge_data(u, v, k)\n", + " current_cap = edge_data[\"capacity\"] if edge_data else \"unknown\"\n", + " print(f\" {u} -> {v} (capacity {current_cap} -> 0): {impact:+.1f} flow change\")" + ] + }, + { + "cell_type": "markdown", + "id": "82856366", + "metadata": {}, + "source": [ + "## 5. Advanced Sensitivity Analysis Scenarios\n", + "\n", + "Let's explore more advanced scenarios to demonstrate the full capabilities of the enhanced sensitivity analysis." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "7cdd2ea9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "=== Advanced Sensitivity Analysis Scenarios ===\n", + "Baseline maximum flow: 150.0\n", + "\n", + "--- Scenario 1: Which single +10 capacity upgrade gives best ROI? ---\n", + "Best investment: A -> B (ROI: 1.00 flow per capacity unit)\n", + "\n", + "--- Scenario 2: Risk Analysis - Most Critical Edge Vulnerabilities ---\n", + "Most vulnerable edge: A -> B (losing 2 capacity = -2.0 flow)\n", + " Current capacity: 100.0, Utilization efficiency: 1.00\n", + "\n", + "--- Scenario 3: Sensitivity vs. Change Magnitude ---\n", + " Change ± 1: Max impact +1.0, Avg impact +0.7\n", + " Change ± 5: Max impact +5.0, Avg impact +3.3\n", + " Change ±10: Max impact +10.0, Avg impact +6.7\n", + " Change ±20: Max impact +10.0, Avg impact +6.7\n" + ] + } + ], + "source": [ + "print(\"=== Advanced Sensitivity Analysis Scenarios ===\")\n", + "g = create_sample_network()\n", + "\n", + "# Get baseline information\n", + "baseline_flow = calc_max_flow(g, \"A\", \"E\")\n", + "print(f\"Baseline maximum flow: {baseline_flow}\")\n", + "\n", + "# Scenario 1: What if we could increase any edge by 10 units?\n", + "print(\"\\n--- Scenario 1: Which single +10 capacity upgrade gives best ROI? ---\")\n", + "sensitivity_10 = run_sensitivity(g, \"A\", \"E\", change_amount=10.0)\n", + "if sensitivity_10:\n", + " best_edge = max(sensitivity_10.items(), key=lambda x: x[1])\n", + " edge, impact = best_edge\n", + " u, v, k = edge\n", + " roi = impact / 10.0 # Flow increase per unit of capacity added\n", + " print(f\"Best investment: {u} -> {v} (ROI: {roi:.2f} flow per capacity unit)\")\n", + "\n", + "# Scenario 2: Risk analysis - which edge reduction hurts most?\n", + "print(\"\\n--- Scenario 2: Risk Analysis - Most Critical Edge Vulnerabilities ---\")\n", + "sensitivity_risk = run_sensitivity(g, \"A\", \"E\", change_amount=-2.0)\n", + "if sensitivity_risk:\n", + " worst_edge = min(sensitivity_risk.items(), key=lambda x: x[1])\n", + " edge, impact = worst_edge\n", + " u, v, k = edge\n", + " print(f\"Most vulnerable edge: {u} -> {v} (losing 2 capacity = {impact:+.1f} flow)\")\n", + "\n", + " # Show current capacity for context\n", + " edge_data = g.get_edge_data(u, v, k)\n", + " if edge_data:\n", + " current_cap = edge_data[\"capacity\"]\n", + " utilization = -impact / 2.0 # How much flow per unit of lost capacity\n", + " print(\n", + " f\" Current capacity: {current_cap}, Utilization efficiency: {utilization:.2f}\"\n", + " )\n", + "\n", + "# Scenario 3: Comparative analysis of different change magnitudes\n", + "print(\"\\n--- Scenario 3: Sensitivity vs. Change Magnitude ---\")\n", + "for change in [1.0, 5.0, 10.0, 20.0]:\n", + " sens = run_sensitivity(g, \"A\", \"E\", change_amount=change)\n", + " if sens:\n", + " max_impact = max(sens.values())\n", + " avg_impact = sum(sens.values()) / len(sens) if sens else 0\n", + " print(\n", + " f\" Change ±{change:2.0f}: Max impact {max_impact:+5.1f}, Avg impact {avg_impact:+5.1f}\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "23c3a0bb", + "metadata": {}, + "source": [ + "## 6. Combined Analysis\n", + "\n", + "Now let's demonstrate the comprehensive analysis capabilities by using both return flags to get complete information in a single call." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "770d52ee", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "=== Combined Analysis ===\n", + "Analysis Results:\n", + " - Maximum flow: 150.0\n", + " - Bottleneck edges: 2\n", + " - Flow graph has 6 edges with flow assignments\n", + " - Flow conservation check: 150.0 == 150.0\n", + "\n", + "Detailed edge flows:\n", + " A -> B (key=r9zIPN4gQuKNbT8DzDH6oQ): 100.0 units\n", + " A -> C (key=MCL5LU6PSa-6H156pTadog): 50.0 units\n", + " B -> D (key=lkHnuBw3RfyOMJx3BOXn8Q): 60.0 units\n", + " B -> E (key=O1QZZ3JvStWsavp1er_Mdg): 40.0 units\n", + " C -> D (key=TWZAFWRZQKuDcjLvBCuowA): 50.0 units\n", + " D -> E (key=JjT048d0RGubB9gSQyFDZg): 110.0 units\n", + "\n", + "Residual capacities (remaining capacity):\n", + " A -> B (key=r9zIPN4gQuKNbT8DzDH6oQ): 0.000 (SATURATED)\n", + " A -> C (key=MCL5LU6PSa-6H156pTadog): 0.000 (SATURATED)\n", + " B -> E (key=O1QZZ3JvStWsavp1er_Mdg): 0.000 (SATURATED)\n" + ] + } + ], + "source": [ + "print(\"=== Combined Analysis ===\")\n", + "g = create_sample_network()\n", + "\n", + "# Get all information in one call\n", + "flow_value, summary, flow_graph = calc_max_flow(\n", + " g, \"A\", \"E\", return_summary=True, return_graph=True\n", + ")\n", + "\n", + "print(\"Analysis Results:\")\n", + "print(f\" - Maximum flow: {flow_value}\")\n", + "print(f\" - Bottleneck edges: {len(summary.min_cut)}\")\n", + "print(f\" - Flow graph has {flow_graph.number_of_edges()} edges with flow assignments\")\n", + "\n", + "# Verify flow conservation\n", + "total_source_outflow = sum(\n", + " flow for (u, v, k), flow in summary.edge_flow.items() if u == \"A\"\n", + ")\n", + "print(f\" - Flow conservation check: {total_source_outflow} == {summary.total_flow}\")\n", + "\n", + "# Show detailed edge flow information\n", + "print(\"\\nDetailed edge flows:\")\n", + "for (u, v, k), flow in summary.edge_flow.items():\n", + " if flow > 0: # Only show edges with positive flow\n", + " print(f\" {u} -> {v} (key={k}): {flow:.1f} units\")\n", + "\n", + "# Show residual capacities for bottleneck analysis\n", + "print(\"\\nResidual capacities (remaining capacity):\")\n", + "for (u, v, k), residual in summary.residual_cap.items():\n", + " if residual <= 1e-10: # Show saturated edges\n", + " print(f\" {u} -> {v} (key={k}): {residual:.3f} (SATURATED)\")\n", + " elif residual < 10: # Show nearly saturated edges\n", + " print(f\" {u} -> {v} (key={k}): {residual:.1f}\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ngraph-venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tests/lib/algorithms/test_max_flow.py b/tests/lib/algorithms/test_max_flow.py index d831a48..9fcb76d 100644 --- a/tests/lib/algorithms/test_max_flow.py +++ b/tests/lib/algorithms/test_max_flow.py @@ -3,6 +3,7 @@ from ngraph.lib.algorithms.base import FlowPlacement from ngraph.lib.algorithms.max_flow import calc_max_flow +from ngraph.lib.algorithms.types import FlowSummary from ngraph.lib.graph import StrictMultiDiGraph @@ -172,3 +173,271 @@ def test_disconnected_graph(self): g.add_node("B") max_flow = calc_max_flow(g, "A", "B") assert max_flow == 0.0 + + +class TestMaxFlowExtended: + """ + Tests for the extended max flow functionality with return_summary and return_graph flags. + """ + + def test_max_flow_return_summary_basic(self, line1): + """Test return_summary=True returns flow value and FlowSummary.""" + result = calc_max_flow(line1, "A", "C", return_summary=True) + + # Should return a tuple + assert isinstance(result, tuple) + assert len(result) == 2 + + flow_value, summary = result + assert flow_value == 5 + assert isinstance(summary, FlowSummary) + assert summary.total_flow == 5 + + # Check that we have edge flows + assert len(summary.edge_flow) > 0 + assert len(summary.residual_cap) > 0 + + # Check that source is reachable + assert "A" in summary.reachable + + # Check min-cut is properly identified + assert isinstance(summary.min_cut, list) + + def test_max_flow_return_graph_basic(self, line1): + """Test return_graph=True returns flow value and flow graph.""" + result = calc_max_flow(line1, "A", "C", return_graph=True) + + # Should return a tuple + assert isinstance(result, tuple) + assert len(result) == 2 + + flow_value, flow_graph = result + assert flow_value == 5 + assert isinstance(flow_graph, StrictMultiDiGraph) + + # Flow graph should have flow attributes on edges + for _, _, _, d in flow_graph.edges(data=True, keys=True): + assert "flow" in d + assert "capacity" in d + + def test_max_flow_return_both_flags(self, line1): + """Test both return_summary=True and return_graph=True.""" + result = calc_max_flow(line1, "A", "C", return_summary=True, return_graph=True) + + # Should return a tuple with 3 elements + assert isinstance(result, tuple) + assert len(result) == 3 + + flow_value, summary, flow_graph = result + assert flow_value == 5 + assert isinstance(summary, FlowSummary) + assert isinstance(flow_graph, StrictMultiDiGraph) + assert summary.total_flow == 5 + + def test_max_flow_backward_compatibility(self, line1): + """Test that default behavior (no flags) maintains backward compatibility.""" + result = calc_max_flow(line1, "A", "C") + + # Should return just the flow value as a scalar + assert isinstance(result, (int, float)) + assert result == 5 + + def test_flow_summary_edge_flows(self, line1): + """Test that FlowSummary contains correct edge flow information.""" + _, summary = calc_max_flow(line1, "A", "C", return_summary=True) + + # Verify edge flows sum to total flow at source + total_outflow = sum( + flow for (u, _, _), flow in summary.edge_flow.items() if u == "A" + ) + assert total_outflow == summary.total_flow + + # Verify residual capacities are non-negative + for residual in summary.residual_cap.values(): + assert residual >= 0 + + def test_flow_summary_min_cut_identification(self, square4): + """Test min-cut identification on a more complex graph.""" + _, summary = calc_max_flow(square4, "A", "B", return_summary=True) + + # Min-cut should be non-empty for a bottleneck graph + assert len(summary.min_cut) > 0 + + # All min-cut edges should be saturated (zero residual capacity) + for edge in summary.min_cut: + assert summary.residual_cap[edge] == 0 + + def test_flow_summary_reachable_nodes(self, line1): + """Test that reachable nodes are correctly identified.""" + _, summary = calc_max_flow(line1, "A", "C", return_summary=True) + + # Source should always be reachable + assert "A" in summary.reachable + + # If there's flow to destination, intermediate nodes should be reachable + if summary.total_flow > 0: + # At least the source should be reachable + assert len(summary.reachable) >= 1 + + def test_shortest_path_with_summary(self, line1): + """Test return_summary works with shortest_path=True.""" + result = calc_max_flow(line1, "A", "C", shortest_path=True, return_summary=True) + + flow_value, summary = result + assert flow_value == 4 # Single path flow + assert summary.total_flow == 4 + assert isinstance(summary.edge_flow, dict) + assert isinstance(summary.min_cut, list) + + def test_empty_graph_with_summary(self): + """Test behavior with disconnected nodes.""" + g = StrictMultiDiGraph() + g.add_node("A") + g.add_node("B") + + flow_value, summary = calc_max_flow(g, "A", "B", return_summary=True) + + assert flow_value == 0 + assert summary.total_flow == 0 + assert len(summary.edge_flow) == 0 + assert len(summary.residual_cap) == 0 + assert "A" in summary.reachable + assert "B" not in summary.reachable + assert len(summary.min_cut) == 0 + + def test_saturated_edges_helper(self, line1): + """Test the saturated_edges helper function.""" + from ngraph.lib.algorithms.max_flow import saturated_edges + + saturated = saturated_edges(line1, "A", "C") + + # Should return a list of edge tuples + assert isinstance(saturated, list) + + # All saturated edges should have zero residual capacity + _, summary = calc_max_flow(line1, "A", "C", return_summary=True) + for edge in saturated: + assert summary.residual_cap[edge] <= 1e-10 + + def test_sensitivity_analysis_helper(self, line1): + """Test the run_sensitivity helper function.""" + from ngraph.lib.algorithms.max_flow import run_sensitivity + + sensitivity = run_sensitivity(line1, "A", "C", change_amount=1.0) + + # Should return a dictionary mapping edges to flow increases + assert isinstance(sensitivity, dict) + + # All sensitivity values should be non-negative + for edge, flow_increase in sensitivity.items(): + assert isinstance(edge, tuple) + assert len(edge) == 3 # (u, v, k) + assert flow_increase >= 0 + + def test_sensitivity_analysis_identifies_bottlenecks(self, square4): + """Test that sensitivity analysis identifies meaningful bottlenecks.""" + from ngraph.lib.algorithms.max_flow import run_sensitivity + + sensitivity = run_sensitivity(square4, "A", "B", change_amount=10.0) + + # Should have some edges with positive sensitivity + positive_impacts = [impact for impact in sensitivity.values() if impact > 0] + assert len(positive_impacts) > 0 + + # Highest impact edges should be meaningful bottlenecks + if sensitivity: + max_impact = max(sensitivity.values()) + assert max_impact > 0 + + def test_sensitivity_analysis_negative_capacity_protection(self, line1): + """Test that sensitivity analysis sets capacity to zero instead of negative values.""" + from ngraph.lib.algorithms.max_flow import run_sensitivity + + # Test with a large negative change that would make capacities negative + sensitivity = run_sensitivity(line1, "A", "C", change_amount=-100.0) + + # Should still return results (not skip edges) + assert isinstance(sensitivity, dict) + assert len(sensitivity) > 0 + + # All sensitivity values should be negative (flow reduction) + for edge, flow_change in sensitivity.items(): + assert isinstance(edge, tuple) + assert len(edge) == 3 # (u, v, k) + assert ( + flow_change <= 0 + ) # Should reduce or maintain flow def test_sensitivity_analysis_zero_capacity_behavior(self): + """Test specific behavior when edge capacity is reduced to zero.""" + from ngraph.lib.algorithms.max_flow import run_sensitivity + + # Create a simple graph with known capacities + g = StrictMultiDiGraph() + g.add_node("A") + g.add_node("B") + g.add_node("C") + + # Add edges: A->B (capacity 10), B->C (capacity 5) + g.add_edge("A", "B", capacity=10.0, flow=0.0, flows={}, cost=1.0) + bc_edge_key = g.add_edge("B", "C", capacity=5.0, flow=0.0, flows={}, cost=1.0) + + # Test reducing edge B->C capacity by 10 (more than its current capacity of 5) + sensitivity = run_sensitivity(g, "A", "C", change_amount=-10.0) + + # Should reduce flow to zero (complete bottleneck removal) + bc_edge = ("B", "C", bc_edge_key) + assert bc_edge in sensitivity + assert ( + sensitivity[bc_edge] == -5.0 + ) # Should reduce flow by 5 (from 5 to 0) def test_sensitivity_analysis_partial_capacity_reduction(self): + """Test behavior when capacity is partially reduced but not to zero.""" + from ngraph.lib.algorithms.max_flow import run_sensitivity + + # Create a simple graph + g = StrictMultiDiGraph() + g.add_node("A") + g.add_node("B") + g.add_node("C") + + # Add edges with specific capacities + g.add_edge("A", "B", capacity=10.0, flow=0.0, flows={}, cost=1.0) + bc_edge_key = g.add_edge("B", "C", capacity=8.0, flow=0.0, flows={}, cost=1.0) + + # Test reducing edge B->C capacity by 3 (from 8 to 5) + sensitivity = run_sensitivity(g, "A", "C", change_amount=-3.0) + + # Should reduce flow by 3 (the bottleneck reduction) + bc_edge = ("B", "C", bc_edge_key) + assert bc_edge in sensitivity + assert sensitivity[bc_edge] == -3.0 + + def test_sensitivity_analysis_capacity_increase_and_decrease(self): + """Test that both positive and negative changes work correctly.""" + from ngraph.lib.algorithms.max_flow import run_sensitivity + + # Create a bottleneck graph + g = StrictMultiDiGraph() + for node in ["A", "B", "C", "D"]: + g.add_node(node) + + g.add_edge("A", "B", capacity=20.0, flow=0.0, flows={}, cost=1.0) + g.add_edge("A", "C", capacity=20.0, flow=0.0, flows={}, cost=1.0) + g.add_edge("B", "D", capacity=10.0, flow=0.0, flows={}, cost=1.0) # Bottleneck + g.add_edge("C", "D", capacity=15.0, flow=0.0, flows={}, cost=1.0) + + # Test capacity increase + sensitivity_increase = run_sensitivity(g, "A", "D", change_amount=5.0) + + # Test capacity decrease + sensitivity_decrease = run_sensitivity(g, "A", "D", change_amount=-3.0) + + # Both should return results + assert len(sensitivity_increase) > 0 + assert len(sensitivity_decrease) > 0 + + # Increases should be positive or zero + for flow_change in sensitivity_increase.values(): + assert flow_change >= 0 + + # Decreases should be negative or zero + for flow_change in sensitivity_decrease.values(): + assert flow_change <= 0 diff --git a/tests/test_api_docs.py b/tests/test_doc_generation.py similarity index 100% rename from tests/test_api_docs.py rename to tests/test_doc_generation.py diff --git a/tests/test_example_docs.py b/tests/test_example_docs.py new file mode 100644 index 0000000..d72e1b4 --- /dev/null +++ b/tests/test_example_docs.py @@ -0,0 +1,527 @@ +""" +Test code examples from documentation examples directory. + +This module tests examples from: +- docs/examples/basic.md +- docs/examples/clos-fabric.md + +These are practical examples showing how to use NetGraph features. +""" + +from ngraph.lib.algorithms.base import FlowPlacement +from ngraph.lib.algorithms.max_flow import run_sensitivity, saturated_edges +from ngraph.scenario import Scenario + + +class TestBasicMdExamples: + """Test examples from docs/examples/basic.md""" + + def test_basic_network_creation(self): + """Test the basic network topology example.""" + # Create a simple network based on the basic.md example + scenario_yaml = """ +network: + name: "fundamentals_example" + + # Create individual nodes + nodes: + A: {} + B: {} + C: {} + D: {} + + # Create links with different capacities and costs + links: + # Parallel edges between A→B + - source: A + target: B + link_params: + capacity: 1 + cost: 1 + - source: A + target: B + link_params: + capacity: 2 + cost: 1 + + # Parallel edges between B→C + - source: B + target: C + link_params: + capacity: 1 + cost: 1 + - source: B + target: C + link_params: + capacity: 2 + cost: 1 + + # Alternative path A→D→C + - source: A + target: D + link_params: + capacity: 3 + cost: 2 + - source: D + target: C + link_params: + capacity: 3 + cost: 2 +""" + + # Create the network + scenario = Scenario.from_yaml(scenario_yaml) + network = scenario.network + + # Verify network structure + assert len(network.nodes) == 4 + assert "A" in network.nodes + assert "B" in network.nodes + assert "C" in network.nodes + assert "D" in network.nodes + + def test_flow_analysis_variants(self): + """Test different flow analysis approaches from basic.md""" + scenario_yaml = """ +network: + name: "fundamentals_example" + nodes: + A: {} + B: {} + C: {} + D: {} + links: + - source: A + target: B + link_params: + capacity: 1 + cost: 1 + - source: A + target: B + link_params: + capacity: 2 + cost: 1 + - source: B + target: C + link_params: + capacity: 1 + cost: 1 + - source: B + target: C + link_params: + capacity: 2 + cost: 1 + - source: A + target: D + link_params: + capacity: 3 + cost: 2 + - source: D + target: C + link_params: + capacity: 3 + cost: 2 +""" + + scenario = Scenario.from_yaml(scenario_yaml) + network = scenario.network + + # Test "true" maximum flow (uses all available paths) + max_flow_all = network.max_flow(source_path="A", sink_path="C") + assert isinstance(max_flow_all, dict) + assert len(max_flow_all) == 1 + flow_value = list(max_flow_all.values())[0] + # Should be 6.0 (3 from A→B→C path + 3 from A→D→C path) + assert flow_value == 6.0 + + # Test flow along shortest paths only + max_flow_shortest = network.max_flow( + source_path="A", sink_path="C", shortest_path=True + ) + assert isinstance(max_flow_shortest, dict) + flow_value_shortest = list(max_flow_shortest.values())[0] + # Should be 3.0 (only uses A→B→C path, ignoring higher-cost A→D→C) + assert flow_value_shortest == 3.0 + + # Test with EQUAL_BALANCED flow placement + max_flow_balanced = network.max_flow( + source_path="A", + sink_path="C", + shortest_path=True, + flow_placement=FlowPlacement.EQUAL_BALANCED, + ) + assert isinstance(max_flow_balanced, dict) + flow_value_balanced = list(max_flow_balanced.values())[0] + # Should be limited by equal distribution across parallel paths + assert flow_value_balanced <= flow_value_shortest + + def test_advanced_sensitivity_analysis(self): + """Test the advanced sensitivity analysis section from basic.md""" + scenario_yaml = """ +network: + name: "fundamentals_example" + nodes: + A: {} + B: {} + C: {} + D: {} + links: + - source: A + target: B + link_params: + capacity: 1 + cost: 1 + - source: A + target: B + link_params: + capacity: 2 + cost: 1 + - source: B + target: C + link_params: + capacity: 1 + cost: 1 + - source: B + target: C + link_params: + capacity: 2 + cost: 1 + - source: A + target: D + link_params: + capacity: 3 + cost: 2 + - source: D + target: C + link_params: + capacity: 3 + cost: 2 +""" + + scenario = Scenario.from_yaml(scenario_yaml) + network = scenario.network + + # Get the underlying graph for low-level analysis + graph = network.to_strict_multidigraph() + + # Identify bottleneck (saturated) edges + bottlenecks = saturated_edges(graph, "A", "C") + assert isinstance(bottlenecks, list) + assert len(bottlenecks) > 0 + + # Perform sensitivity analysis - test increasing capacity by 1 unit + sensitivity_increase = run_sensitivity(graph, "A", "C", change_amount=1.0) + assert isinstance(sensitivity_increase, dict) + assert len(sensitivity_increase) > 0 + + # All values should be non-negative (increasing capacity shouldn't decrease flow) + for flow_change in sensitivity_increase.values(): + assert flow_change >= 0 + + # Test sensitivity to capacity decreases + sensitivity_decrease = run_sensitivity(graph, "A", "C", change_amount=-1.0) + assert isinstance(sensitivity_decrease, dict) + assert len(sensitivity_decrease) > 0 + + # All values should be non-positive (decreasing capacity shouldn't increase flow) + for flow_change in sensitivity_decrease.values(): + assert flow_change <= 0 + + def test_results_interpretation(self): + """Test that the documented behavior matches actual results.""" + scenario_yaml = """ +network: + name: "fundamentals_example" + nodes: + A: {} + B: {} + C: {} + D: {} + links: + - source: A + target: B + link_params: + capacity: 1 + cost: 1 + - source: A + target: B + link_params: + capacity: 2 + cost: 1 + - source: B + target: C + link_params: + capacity: 1 + cost: 1 + - source: B + target: C + link_params: + capacity: 2 + cost: 1 + - source: A + target: D + link_params: + capacity: 3 + cost: 2 + - source: D + target: C + link_params: + capacity: 3 + cost: 2 +""" + + scenario = Scenario.from_yaml(scenario_yaml) + network = scenario.network + + # Test documented behavior: + # - "True" MaxFlow: Uses all available paths regardless of cost + max_flow_all = network.max_flow(source_path="A", sink_path="C") + flow_all = list(max_flow_all.values())[0] + + # - Shortest Path: Only uses paths with minimum cost + max_flow_shortest = network.max_flow( + source_path="A", sink_path="C", shortest_path=True + ) + flow_shortest = list(max_flow_shortest.values())[0] + + # True max flow should be >= shortest path flow + assert flow_all >= flow_shortest + + # In this topology: + # - A→B→C has cost 2 and capacity 3 (sum of parallel edges) + # - A→D→C has cost 4 and capacity 3 + # So shortest path should only use A→B→C + assert flow_shortest == 3.0 + # And true max flow should use both paths + assert flow_all == 6.0 + + +class TestClosFabricMdExamples: + """Test examples from docs/examples/clos-fabric.md (if any specific examples exist)""" + + def test_clos_fabric_imports(self): + """Test that we can import everything needed for Clos fabric examples.""" + # This is a basic test to ensure the example dependencies work + from ngraph.lib.algorithms.base import FlowPlacement + from ngraph.scenario import Scenario + + # These should import without errors + assert Scenario is not None + assert FlowPlacement is not None + assert hasattr(FlowPlacement, "PROPORTIONAL") + assert hasattr(FlowPlacement, "EQUAL_BALANCED") + + def test_clos_fabric_hierarchical_blueprint(self): + """Test the hierarchical blueprint structure from clos-fabric.md""" + # This matches the actual example shown in clos-fabric.md + scenario_yaml = """ +blueprints: + brick_2tier: + groups: + t1: + node_count: 8 + name_template: t1-{node_num} + t2: + node_count: 8 + name_template: t2-{node_num} + + adjacency: + - source: /t1 + target: /t2 + pattern: mesh + link_params: + capacity: 2 + cost: 1 + + 3tier_clos: + groups: + b1: + use_blueprint: brick_2tier + b2: + use_blueprint: brick_2tier + spine: + node_count: 64 + name_template: t3-{node_num} + + adjacency: + - source: b1/t2 + target: spine + pattern: one_to_one + link_params: + capacity: 2 + cost: 1 + - source: b2/t2 + target: spine + pattern: one_to_one + link_params: + capacity: 2 + cost: 1 + +network: + name: "3tier_clos_network" + version: 1.0 + + groups: + my_clos1: + use_blueprint: 3tier_clos + + my_clos2: + use_blueprint: 3tier_clos + + adjacency: + - source: my_clos1/spine + target: my_clos2/spine + pattern: one_to_one + link_count: 4 + link_params: + capacity: 1 + cost: 1 +""" + + scenario = Scenario.from_yaml(scenario_yaml) + network = scenario.network + + # Verify the complex hierarchical structure was created + # Each 3tier_clos has: 2 * (8 t1 + 8 t2) + 64 spine = 96 nodes + # Two 3tier_clos instances = 192 nodes total + assert len(network.nodes) == 192 + + # Verify we have nodes with the expected hierarchical naming + node_names = list(network.nodes.keys()) + + # Check for nodes from the first Clos fabric + my_clos1_t1_nodes = [ + name + for name in node_names + if name.startswith("my_clos1/b1/t1/t1-") + or name.startswith("my_clos1/b2/t1/t1-") + ] + my_clos1_spine_nodes = [ + name for name in node_names if name.startswith("my_clos1/spine/t3-") + ] + + # Check for nodes from the second Clos fabric + my_clos2_t1_nodes = [ + name + for name in node_names + if name.startswith("my_clos2/b1/t1/t1-") + or name.startswith("my_clos2/b2/t1/t1-") + ] + my_clos2_spine_nodes = [ + name for name in node_names if name.startswith("my_clos2/spine/t3-") + ] + + # Verify node counts match the blueprint structure + assert len(my_clos1_t1_nodes) == 16 # 8 from b1 + 8 from b2 + assert len(my_clos1_spine_nodes) == 64 + assert len(my_clos2_t1_nodes) == 16 # 8 from b1 + 8 from b2 + assert len(my_clos2_spine_nodes) == 64 + + # Verify inter-fabric connectivity exists (64 spine nodes * 4 parallel links = 256 total links) + inter_fabric_links = [ + link + for link in network.links.values() + if ( + "my_clos1/spine/t3-" in link.source + and "my_clos2/spine/t3-" in link.target + ) + or ( + "my_clos2/spine/t3-" in link.source + and "my_clos1/spine/t3-" in link.target + ) + ] + # With one_to_one pattern + link_count=4: 64 spine pairs * 4 links each = 256 total links + assert len(inter_fabric_links) == 256 + + def test_clos_fabric_max_flow_analysis(self): + """Test the max flow analysis example from clos-fabric.md""" + # Using a simplified version of the hierarchical structure for testing + scenario_yaml = """ +blueprints: + brick_2tier: + groups: + t1: + node_count: 2 + name_template: t1-{node_num} + t2: + node_count: 2 + name_template: t2-{node_num} + + adjacency: + - source: /t1 + target: /t2 + pattern: mesh + link_params: + capacity: 2 + cost: 1 + + 3tier_clos: + groups: + b1: + use_blueprint: brick_2tier + b2: + use_blueprint: brick_2tier + spine: + node_count: 4 + name_template: t3-{node_num} + + adjacency: + - source: b1/t2 + target: spine + pattern: one_to_one + link_params: + capacity: 2 + cost: 1 + - source: b2/t2 + target: spine + pattern: one_to_one + link_params: + capacity: 2 + cost: 1 + +network: + name: "3tier_clos_network" + version: 1.0 + + groups: + my_clos1: + use_blueprint: 3tier_clos + + my_clos2: + use_blueprint: 3tier_clos + + adjacency: + - source: my_clos1/spine + target: my_clos2/spine + pattern: one_to_one + link_count: 4 + link_params: + capacity: 1 + cost: 1 +""" + + scenario = Scenario.from_yaml(scenario_yaml) + network = scenario.network + + # Test the max flow calculation as shown in the documentation + # Note: using simplified regex patterns for the test + max_flow_result = network.max_flow( + source_path=r"my_clos1.*(b[0-9]*)/t1", + sink_path=r"my_clos2.*(b[0-9]*)/t1", + mode="combine", + shortest_path=True, + flow_placement=FlowPlacement.EQUAL_BALANCED, + ) + + # Verify the result structure matches documentation expectations + assert isinstance(max_flow_result, dict) + assert len(max_flow_result) == 1 + + # The key should be a tuple representing the combined source/sink groups + flow_key = list(max_flow_result.keys())[0] + assert isinstance(flow_key, tuple) + assert len(flow_key) == 2 + + # Verify flow value is positive (actual value depends on topology) + flow_value = list(max_flow_result.values())[0] + assert flow_value > 0 diff --git a/tests/test_reference_docs.py b/tests/test_reference_docs.py new file mode 100644 index 0000000..a0e6aff --- /dev/null +++ b/tests/test_reference_docs.py @@ -0,0 +1,190 @@ +""" +Test code examples from API reference documentation. + +This module tests examples from: +- docs/reference/api.md +- docs/reference/api-full.md + +These are the low-level algorithmic examples that users might copy-paste. +""" + +from ngraph.lib.algorithms.max_flow import ( + calc_max_flow, + run_sensitivity, + saturated_edges, +) +from ngraph.lib.algorithms.spf import spf +from ngraph.lib.graph import StrictMultiDiGraph + + +class TestApiMdExamples: + """Test examples from docs/reference/api.md""" + + def test_graph_algorithms_section(self): + """Test the Graph Algorithms section examples.""" + # Example from api.md Graph Algorithms section + graph = StrictMultiDiGraph() + graph.add_node("A") + graph.add_node("B") + graph.add_edge("A", "B", capacity=10, cost=1) + + # Run shortest path algorithm + costs, pred = spf(graph, "A") + assert "A" in costs + assert "B" in costs + assert costs["A"] == 0 + assert costs["B"] == 1 + + # Calculate maximum flow + max_flow = calc_max_flow(graph, "A", "B") + assert max_flow == 10.0 + + # Sensitivity analysis - identify bottleneck edges and test capacity changes + saturated = saturated_edges(graph, "A", "B") + assert len(saturated) == 1 + assert saturated[0][:2] == ("A", "B") # Check source and target + + sensitivity = run_sensitivity(graph, "A", "B", change_amount=1.0) + assert len(sensitivity) == 1 + # Should show +1.0 flow increase when capacity increased by 1 + assert list(sensitivity.values())[0] == 1.0 + + +class TestApiFullMdExamples: + """Test examples from docs/reference/api-full.md""" + + def test_calc_max_flow_examples(self): + """Test calc_max_flow examples from api-full.md docstring.""" + # Example from api-full.md calc_max_flow docstring + g = StrictMultiDiGraph() + g.add_node("A") + g.add_node("B") + g.add_node("C") + _ = g.add_edge("A", "B", capacity=10.0, flow=0.0, flows={}, cost=1) + _ = g.add_edge("B", "C", capacity=5.0, flow=0.0, flows={}, cost=1) + + # Basic usage (scalar return) + max_flow_value = calc_max_flow(g, "A", "C") + assert max_flow_value == 5.0 + + # With flow summary analytics + flow, summary = calc_max_flow(g, "A", "C", return_summary=True) + assert flow == 5.0 + assert summary.total_flow == 5.0 + assert len(summary.min_cut) == 1 + # The min-cut should be the B→C edge since it has lower capacity + min_cut_edge = summary.min_cut[0] + assert min_cut_edge[:2] == ("B", "C") + + # With both summary and mutated graph + flow, summary, flow_graph = calc_max_flow( + g, "A", "C", return_summary=True, return_graph=True + ) + assert flow == 5.0 + assert isinstance(flow_graph, StrictMultiDiGraph) + assert len(list(flow_graph.edges())) == 2 + + # Verify flow assignments in the graph + edge_flows = [] + for _u, _v, _k, data in flow_graph.edges(data=True, keys=True): + edge_flows.append(data.get("flow", 0.0)) + + # Both edges should have 5.0 flow + assert all(flow == 5.0 for flow in edge_flows) + + def test_saturated_edges_examples(self): + """Test saturated_edges function with a more complex example.""" + g = StrictMultiDiGraph() + g.add_node("A") + g.add_node("B") + g.add_node("C") + g.add_node("D") + + # Create a network where B→C is the bottleneck + _ = g.add_edge("A", "B", capacity=10.0, cost=1) + _ = g.add_edge("B", "C", capacity=3.0, cost=1) # Bottleneck + _ = g.add_edge("A", "D", capacity=5.0, cost=1) + _ = g.add_edge("D", "C", capacity=5.0, cost=1) + + # Find saturated edges + saturated = saturated_edges(g, "A", "C") + + # Should find the bottleneck edge(s) + assert len(saturated) >= 1 + + # Verify the function works with tolerance parameter + saturated_tight = saturated_edges(g, "A", "C", tolerance=1e-12) + saturated_loose = saturated_edges(g, "A", "C", tolerance=0.1) + + # Tight tolerance should find exact saturated edges + # Loose tolerance might find more edges + assert len(saturated_tight) <= len(saturated_loose) + + def test_run_sensitivity_examples(self): + """Test run_sensitivity function examples.""" + g = StrictMultiDiGraph() + g.add_node("S") + g.add_node("T") + g.add_node("M") # Middle node + + # Create a simple network with one bottleneck + _ = g.add_edge("S", "M", capacity=10.0, cost=1) + _ = g.add_edge("M", "T", capacity=5.0, cost=1) # Bottleneck + + # Test capacity increase sensitivity + sensitivity_up = run_sensitivity(g, "S", "T", change_amount=1.0) + assert len(sensitivity_up) >= 1 + + # Increasing bottleneck capacity should increase flow + bottleneck_improvement = max(sensitivity_up.values()) + assert bottleneck_improvement > 0 + + # Test capacity decrease sensitivity + sensitivity_down = run_sensitivity(g, "S", "T", change_amount=-1.0) + assert len(sensitivity_down) >= 1 + + # Decreasing bottleneck capacity should decrease flow + bottleneck_degradation = min(sensitivity_down.values()) + assert bottleneck_degradation < 0 + + # Test zero-capacity behavior (negative change larger than capacity) + sensitivity_zero = run_sensitivity(g, "S", "T", change_amount=-10.0) + # Should not cause errors, capacities should be clamped to 0 + assert isinstance(sensitivity_zero, dict) + + def test_overload_return_types(self): + """Test that calc_max_flow returns correct types based on parameters.""" + g = StrictMultiDiGraph() + g.add_node("A") + g.add_node("B") + _ = g.add_edge("A", "B", capacity=5.0, cost=1) + + # Test scalar return (default) + result1 = calc_max_flow(g, "A", "B") + assert isinstance(result1, (int, float)) + assert result1 == 5.0 + + # Test with return_summary=True + result2 = calc_max_flow(g, "A", "B", return_summary=True) + assert isinstance(result2, tuple) + assert len(result2) == 2 + flow, summary = result2 + assert isinstance(flow, (int, float)) + assert hasattr(summary, "total_flow") + + # Test with return_graph=True + result3 = calc_max_flow(g, "A", "B", return_graph=True) + assert isinstance(result3, tuple) + assert len(result3) == 2 + flow, graph = result3 + assert isinstance(flow, (int, float)) + assert isinstance(graph, StrictMultiDiGraph) + + # Test with both flags + result4 = calc_max_flow(g, "A", "B", return_summary=True, return_graph=True) + assert isinstance(result4, tuple) + assert len(result4) == 3 + flow, summary, graph = result4 + assert isinstance(flow, (int, float)) + assert hasattr(summary, "total_flow") + assert isinstance(graph, StrictMultiDiGraph) diff --git a/tests/test_tutorial_docs.py b/tests/test_tutorial_docs.py new file mode 100644 index 0000000..1fdd6e3 --- /dev/null +++ b/tests/test_tutorial_docs.py @@ -0,0 +1,339 @@ +""" +Test code examples from tutorial documentation. + +This module tests examples from: +- docs/getting-started/tutorial.md + +These are step-by-step tutorial examples for new users. +""" + +from ngraph.lib.algorithms.base import FlowPlacement +from ngraph.scenario import Scenario + + +class TestTutorialMdExamples: + """Test examples from docs/getting-started/tutorial.md""" + + def test_first_scenario_creation(self): + """Test the first scenario example from tutorial.md""" + # This should match the YAML from the tutorial + yaml_content = """ +network: + name: "tutorial_clos" + + groups: + servers: + node_count: 4 + name_template: "s{node_num}" + attrs: + hw_type: "server" + + leaf: + node_count: 4 + name_template: "l{node_num}" + attrs: + hw_type: "leaf_switch" + + spine: + node_count: 2 + name_template: "sp{node_num}" + attrs: + hw_type: "spine_switch" + + adjacency: + # Pod structure + - source: "s[0-1].*" + target: "l[0-1].*" + pattern: "mesh" + link_params: + capacity: 10 + cost: 1 + + - source: "s[2-3].*" + target: "l[2-3].*" + pattern: "mesh" + link_params: + capacity: 10 + cost: 1 + + # Leaf to spine + - source: "leaf" + target: "spine" + pattern: "mesh" + link_params: + capacity: 40 + cost: 1 +""" + + scenario = Scenario.from_yaml(yaml_content) + network = scenario.network + + # Verify the network was created correctly + assert len(network.nodes) == 10 # 4 servers + 4 leaf + 2 spine + + # Check node naming convention + server_nodes = [n for n in network.nodes if n.startswith("servers/")] + leaf_nodes = [n for n in network.nodes if n.startswith("leaf/")] + spine_nodes = [n for n in network.nodes if n.startswith("spine/")] + + assert len(server_nodes) == 4 + assert len(leaf_nodes) == 4 + assert len(spine_nodes) == 2 + + def test_analyzing_maximum_flow_capacity(self): + """Test the maximum flow analysis examples from tutorial.md""" + # Use a simpler version for testing + yaml_content = """ +network: + name: "tutorial_clos" + + groups: + pod1_servers: + node_count: 2 + name_template: "pod1_s{node_num}" + + pod2_servers: + node_count: 2 + name_template: "pod2_s{node_num}" + + pod1_leaf: + node_count: 2 + name_template: "pod1_l{node_num}" + + pod2_leaf: + node_count: 2 + name_template: "pod2_l{node_num}" + + pod1_spine: + node_count: 1 + name_template: "pod1_sp{node_num}" + + pod2_spine: + node_count: 1 + name_template: "pod2_sp{node_num}" + + adjacency: + # Pod 1 connections + - source: "pod1_servers" + target: "pod1_leaf" + pattern: "mesh" + link_params: + capacity: 10 + cost: 1 + + - source: "pod1_leaf" + target: "pod1_spine" + pattern: "mesh" + link_params: + capacity: 20 + cost: 1 + + # Pod 2 connections + - source: "pod2_servers" + target: "pod2_leaf" + pattern: "mesh" + link_params: + capacity: 10 + cost: 1 + + - source: "pod2_leaf" + target: "pod2_spine" + pattern: "mesh" + link_params: + capacity: 20 + cost: 1 + + # Inter-pod connection + - source: "pod1_spine" + target: "pod2_spine" + pattern: "mesh" + link_params: + capacity: 40 + cost: 1 +""" + + scenario = Scenario.from_yaml(yaml_content) + network = scenario.network + + # Test max flow calculations like in the tutorial + + # Calculate MaxFlow from pod1 servers to pod2 servers + max_flow_result = network.max_flow( + source_path="pod1_servers", + sink_path="pod2_servers", + ) + assert isinstance(max_flow_result, dict) + assert len(max_flow_result) == 1 + + # Calculate MaxFlow from pod1 leaf to pod2 leaf + max_flow_leaf = network.max_flow( + source_path="pod1_leaf", + sink_path="pod2_leaf", + ) + assert isinstance(max_flow_leaf, dict) + + # Calculate MaxFlow from pod1 spine to pod2 spine + max_flow_spine = network.max_flow( + source_path="pod1_spine", + sink_path="pod2_spine", + ) + assert isinstance(max_flow_spine, dict) + + def test_understanding_maxflow_results(self): + """Test the MaxFlow results interpretation section.""" + yaml_content = """ +network: + name: "simple_test" + + nodes: + A: {} + B: {} + C: {} + + links: + - source: A + target: B + link_params: + capacity: 10 + cost: 1 + - source: B + target: C + link_params: + capacity: 5 + cost: 1 +""" + + scenario = Scenario.from_yaml(yaml_content) + network = scenario.network + + # Test shortest_path parameter + max_flow_all = network.max_flow(source_path="A", sink_path="C") + max_flow_shortest = network.max_flow( + source_path="A", sink_path="C", shortest_path=True + ) + + # In this simple case, they should be the same + assert max_flow_all == max_flow_shortest + + # Test flow_placement parameter with shortest_path=True + max_flow_proportional = network.max_flow( + source_path="A", + sink_path="C", + shortest_path=True, + flow_placement=FlowPlacement.PROPORTIONAL, + ) + + max_flow_balanced = network.max_flow( + source_path="A", + sink_path="C", + shortest_path=True, + flow_placement=FlowPlacement.EQUAL_BALANCED, + ) + + # Both should work without error + assert isinstance(max_flow_proportional, dict) + assert isinstance(max_flow_balanced, dict) + + def test_flow_placement_policies(self): + """Test FlowPlacement policies mentioned in tutorial.""" + # Test that FlowPlacement enum works as expected + assert hasattr(FlowPlacement, "PROPORTIONAL") + assert hasattr(FlowPlacement, "EQUAL_BALANCED") + + # Test they can be used in max_flow calls + yaml_content = """ +network: + name: "parallel_test" + + nodes: + A: {} + B: {} + + links: + - source: A + target: B + link_params: + capacity: 10 + cost: 1 + - source: A + target: B + link_params: + capacity: 20 + cost: 1 +""" + + scenario = Scenario.from_yaml(yaml_content) + network = scenario.network + + # Test both flow placement policies + result_prop = network.max_flow( + source_path="A", + sink_path="B", + shortest_path=True, + flow_placement=FlowPlacement.PROPORTIONAL, + ) + + result_balanced = network.max_flow( + source_path="A", + sink_path="B", + shortest_path=True, + flow_placement=FlowPlacement.EQUAL_BALANCED, + ) + + # Both should work and potentially give different results + assert isinstance(result_prop, dict) + assert isinstance(result_balanced, dict) + + # Flow values should be positive + flow_prop = list(result_prop.values())[0] + flow_balanced = list(result_balanced.values())[0] + assert flow_prop > 0 + assert flow_balanced > 0 + + def test_pseudo_node_concept(self): + """Test that pseudo-source/sink concept works as described.""" + yaml_content = """ +network: + name: "multi_source_sink" + + groups: + sources: + node_count: 3 + name_template: "src{node_num}" + + sinks: + node_count: 2 + name_template: "sink{node_num}" + + middle: + node_count: 1 + name_template: "mid{node_num}" + + adjacency: + - source: "sources" + target: "middle" + pattern: "mesh" + link_params: + capacity: 10 + cost: 1 + - source: "middle" + target: "sinks" + pattern: "mesh" + link_params: + capacity: 15 + cost: 1 +""" + + scenario = Scenario.from_yaml(yaml_content) + network = scenario.network + + # Test that max_flow works with multiple sources and sinks + # (This tests the pseudo-node concept mentioned in tutorial) + max_flow_result = network.max_flow(source_path="sources", sink_path="sinks") + + assert isinstance(max_flow_result, dict) + assert len(max_flow_result) == 1 + + # Should aggregate flow from all sources to all sinks + total_flow = list(max_flow_result.values())[0] + assert total_flow > 0