Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion docs/examples/basic.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,71 @@ print(f"Equal-balanced flow: {max_flow_shortest_balanced}")

Note that `EQUAL_BALANCED` flow placement is only applicable when calculating MaxFlow on shortest paths.

## Cost Distribution Analysis

The cost distribution feature analyzes how flow is distributed across paths of different costs for latency span analysis and network performance characterization.

```python
# Get flow analysis with cost distribution
result = network.max_flow_with_summary(
source_path="A",
sink_path="C",
mode="combine"
)

# Extract flow value and summary
(src_label, sink_label), (flow_value, summary) = next(iter(result.items()))

print(f"Total flow: {flow_value}")
print(f"Cost distribution: {summary.cost_distribution}")

# Example output:
# Total flow: 6.0
# Cost distribution: {2.0: 3.0, 4.0: 3.0}
#
# This means:
# - 3.0 units of flow use paths with total cost 2.0 (A→B→C path)
# - 3.0 units of flow use paths with total cost 4.0 (A→D→C path)
```

### Latency Span Analysis

When link costs represent latency (e.g., distance-based), the cost distribution provides insight into traffic latency characteristics:

```python
def analyze_latency_span(cost_distribution):
"""Analyze latency characteristics from cost distribution."""
if not cost_distribution:
return "No flow paths available"

total_flow = sum(cost_distribution.values())
weighted_avg_latency = sum(cost * flow for cost, flow in cost_distribution.items()) / total_flow

min_latency = min(cost_distribution.keys())
max_latency = max(cost_distribution.keys())
latency_span = max_latency - min_latency

print(f"Latency Analysis:")
print(f" Average latency: {weighted_avg_latency:.2f}")
print(f" Latency range: {min_latency:.1f} - {max_latency:.1f}")
print(f" Latency span: {latency_span:.1f}")
print(f" Flow distribution:")
for cost, flow in sorted(cost_distribution.items()):
percentage = (flow / total_flow) * 100
print(f" {percentage:.1f}% of traffic uses paths with latency {cost:.1f}")

# Example usage
analyze_latency_span(summary.cost_distribution)
```

This analysis helps identify:
- **Traffic concentration**: How much traffic uses low vs. high latency paths
- **Latency span**: The range of latencies experienced by traffic
- **Performance bottlenecks**: When high-latency paths carry traffic due to capacity constraints

## Advanced Analysis: Sensitivity Analysis

For deeper network analysis, you can use the low-level graph algorithms to perform sensitivity analysis and identify bottleneck edges:
For network analysis, you can use the low-level graph algorithms to run sensitivity analysis and identify bottleneck edges:

```python
from ngraph.lib.algorithms.max_flow import calc_max_flow, saturated_edges, run_sensitivity
Expand Down
6 changes: 5 additions & 1 deletion docs/reference/api-full.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ For a curated, example-driven API guide, see **[api.md](api.md)**.
> - **[CLI Reference](cli.md)** - Command-line interface
> - **[DSL Reference](dsl.md)** - YAML syntax guide

**Generated from source code on:** July 29, 2025 at 16:59 UTC
**Generated from source code on:** July 29, 2025 at 17:52 UTC

**Modules auto-discovered:** 53

Expand Down Expand Up @@ -2205,6 +2205,9 @@ Attributes:
residual_cap: Remaining capacity on each edge after flow placement.
reachable: Set of nodes reachable from source in residual graph.
min_cut: List of saturated edges that form the minimum cut.
cost_distribution: Distribution of flow volume over path costs.
Maps each cost value to the total volume of flow placed on
paths with that cost during sequential augmentation.

**Attributes:**

Expand All @@ -2213,6 +2216,7 @@ Attributes:
- `residual_cap` (Dict[Edge, float])
- `reachable` (Set[str])
- `min_cut` (List[Edge])
- `cost_distribution` (Dict[Cost, float])

---

Expand Down
17 changes: 17 additions & 0 deletions docs/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ max_flow = network.max_flow(
shortest_path=True, # Use only shortest paths
flow_placement=FlowPlacement.PROPORTIONAL # UCMP load balancing
)

# Detailed flow analysis with cost distribution
result = network.max_flow_with_summary(
source_path="datacenter.*",
sink_path="edge.*",
mode="combine"
)
(src_label, sink_label), (flow_value, summary) = next(iter(result.items()))

# Cost distribution shows flow volume per path cost (useful for latency analysis)
print(f"Cost distribution: {summary.cost_distribution}")
# Example: {2.0: 150.0, 4.0: 75.0} means 150 units on cost-2 paths, 75 on cost-4 paths
```

**Key Options:**
Expand All @@ -150,6 +162,11 @@ max_flow = network.max_flow(
- `shortest_path`: `True` (shortest only) or `False` (all available paths)
- `flow_placement`: `FlowPlacement.PROPORTIONAL` (UCMP) or `FlowPlacement.EQUAL_BALANCED` (ECMP)

**Advanced Features:**

- **Cost Distribution**: `FlowSummary.cost_distribution` provides flow volume breakdown by path cost for latency span analysis and performance characterization
- **Analytics**: Edge flows, residual capacities, min-cut analysis, and reachability information

**Integration:** Available on both Network and NetworkView objects. Foundation for FailureManager Monte Carlo analysis.

### NetworkView
Expand Down
39 changes: 34 additions & 5 deletions ngraph/lib/algorithms/max_flow.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Maximum flow algorithms and network flow computations."""

from typing import Literal, Union, overload
from typing import Dict, Literal, Union, overload

from ngraph.lib.algorithms.base import EdgeSelect, FlowPlacement
from ngraph.lib.algorithms.flow_init import init_flow_graph
from ngraph.lib.algorithms.place_flow import place_flow_on_graph
from ngraph.lib.algorithms.spf import spf
from ngraph.lib.algorithms.spf import Cost, spf
from ngraph.lib.algorithms.types import FlowSummary
from ngraph.lib.graph import NodeID, StrictMultiDiGraph

Expand Down Expand Up @@ -208,6 +208,7 @@ def calc_max_flow(
capacity_attr,
flow_attr,
tolerance,
{}, # Empty cost distribution for self-loop case
)
else:
return 0.0
Expand All @@ -220,8 +221,11 @@ def calc_max_flow(
reset_flow_graph,
)

# Initialize cost distribution tracking
cost_distribution: Dict[Cost, float] = {}

# First path-finding iteration.
_, pred = spf(
costs, pred = spf(
flow_graph, src_node, edge_select=EdgeSelect.ALL_MIN_COST_WITH_CAP_REMAINING
)
flow_meta = place_flow_on_graph(
Expand All @@ -236,6 +240,13 @@ def calc_max_flow(
)
max_flow = flow_meta.placed_flow

# Track cost distribution for first iteration
if dst_node in costs and flow_meta.placed_flow > 0:
path_cost = costs[dst_node]
cost_distribution[path_cost] = (
cost_distribution.get(path_cost, 0.0) + flow_meta.placed_flow
)

# If only one path (single augmentation) is desired, return early.
if shortest_path:
return _build_return_value(
Expand All @@ -247,11 +258,12 @@ def calc_max_flow(
capacity_attr,
flow_attr,
tolerance,
cost_distribution,
)

# Otherwise, repeatedly find augmenting paths until no new flow can be placed.
while True:
_, pred = spf(
costs, pred = spf(
flow_graph, src_node, edge_select=EdgeSelect.ALL_MIN_COST_WITH_CAP_REMAINING
)
if dst_node not in pred:
Expand All @@ -274,6 +286,13 @@ def calc_max_flow(

max_flow += flow_meta.placed_flow

# Track cost distribution for this iteration
if dst_node in costs and flow_meta.placed_flow > 0:
path_cost = costs[dst_node]
cost_distribution[path_cost] = (
cost_distribution.get(path_cost, 0.0) + flow_meta.placed_flow
)

return _build_return_value(
max_flow,
flow_graph,
Expand All @@ -283,6 +302,7 @@ def calc_max_flow(
capacity_attr,
flow_attr,
tolerance,
cost_distribution,
)


Expand All @@ -295,6 +315,7 @@ def _build_return_value(
capacity_attr: str,
flow_attr: str,
tolerance: float,
cost_distribution: Dict[Cost, float],
) -> Union[float, tuple]:
"""Build the appropriate return value based on the requested flags."""
if not (return_summary or return_graph):
Expand All @@ -303,7 +324,13 @@ def _build_return_value(
summary = None
if return_summary:
summary = _build_flow_summary(
max_flow, flow_graph, src_node, capacity_attr, flow_attr, tolerance
max_flow,
flow_graph,
src_node,
capacity_attr,
flow_attr,
tolerance,
cost_distribution,
)

ret: list = [max_flow]
Expand All @@ -322,6 +349,7 @@ def _build_flow_summary(
capacity_attr: str,
flow_attr: str,
tolerance: float,
cost_distribution: Dict[Cost, float],
) -> FlowSummary:
"""Build a FlowSummary from the flow graph state."""
edge_flow = {}
Expand Down Expand Up @@ -364,6 +392,7 @@ def _build_flow_summary(
residual_cap=residual_cap,
reachable=reachable,
min_cut=min_cut,
cost_distribution=cost_distribution,
)


Expand Down
6 changes: 6 additions & 0 deletions ngraph/lib/algorithms/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple

from ngraph.lib.algorithms.base import Cost

# Edge identifier tuple: (source_node, destination_node, edge_key)
Edge = Tuple[str, str, str]

Expand All @@ -23,10 +25,14 @@ class FlowSummary:
residual_cap: Remaining capacity on each edge after flow placement.
reachable: Set of nodes reachable from source in residual graph.
min_cut: List of saturated edges that form the minimum cut.
cost_distribution: Distribution of flow volume over path costs.
Maps each cost value to the total volume of flow placed on
paths with that cost during sequential augmentation.
"""

total_flow: float
edge_flow: Dict[Edge, float]
residual_cap: Dict[Edge, float]
reachable: Set[str]
min_cut: List[Edge]
cost_distribution: Dict[Cost, float]
10 changes: 10 additions & 0 deletions ngraph/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ def _compute_flow_with_summary_single_group(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
return 0.0, empty_summary

Expand Down Expand Up @@ -584,6 +585,7 @@ def _compute_flow_detailed_single_group(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
return 0.0, empty_summary, base_graph

Expand Down Expand Up @@ -1202,6 +1204,7 @@ def _max_flow_with_summary_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
return {(combined_src_label, combined_snk_label): (0.0, empty_summary)}

Expand All @@ -1215,6 +1218,7 @@ def _max_flow_with_summary_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
return {(combined_src_label, combined_snk_label): (0.0, empty_summary)}
else:
Expand All @@ -1240,6 +1244,7 @@ def _max_flow_with_summary_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
flow_val, summary = 0.0, empty_summary
else:
Expand All @@ -1255,6 +1260,7 @@ def _max_flow_with_summary_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
flow_val, summary = 0.0, empty_summary
results[(src_label, snk_label)] = (flow_val, summary)
Expand Down Expand Up @@ -1439,6 +1445,7 @@ def _max_flow_detailed_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
return {
(combined_src_label, combined_snk_label): (
Expand All @@ -1459,6 +1466,7 @@ def _max_flow_detailed_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
return {
(combined_src_label, combined_snk_label): (
Expand Down Expand Up @@ -1501,6 +1509,7 @@ def _max_flow_detailed_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
flow_val, summary, flow_graph = (
0.0,
Expand All @@ -1521,6 +1530,7 @@ def _max_flow_detailed_internal(
residual_cap={},
reachable=set(),
min_cut=[],
cost_distribution={},
)
flow_val, summary, flow_graph = 0.0, empty_summary, base_graph
results[(src_label, snk_label)] = (flow_val, summary, flow_graph)
Expand Down
Loading