diff --git a/docs/examples/basic.md b/docs/examples/basic.md index 73a6fb0..5031bbf 100644 --- a/docs/examples/basic.md +++ b/docs/examples/basic.md @@ -121,9 +121,71 @@ print(f"Equal-balanced flow: {max_flow_shortest_balanced}") Note that `EQUAL_BALANCED` flow placement is only applicable when calculating MaxFlow on shortest paths. +## Cost Distribution Analysis + +The cost distribution feature analyzes how flow is distributed across paths of different costs for latency span analysis and network performance characterization. + +```python +# Get flow analysis with cost distribution +result = network.max_flow_with_summary( + source_path="A", + sink_path="C", + mode="combine" +) + +# Extract flow value and summary +(src_label, sink_label), (flow_value, summary) = next(iter(result.items())) + +print(f"Total flow: {flow_value}") +print(f"Cost distribution: {summary.cost_distribution}") + +# Example output: +# Total flow: 6.0 +# Cost distribution: {2.0: 3.0, 4.0: 3.0} +# +# This means: +# - 3.0 units of flow use paths with total cost 2.0 (A→B→C path) +# - 3.0 units of flow use paths with total cost 4.0 (A→D→C path) +``` + +### Latency Span Analysis + +When link costs represent latency (e.g., distance-based), the cost distribution provides insight into traffic latency characteristics: + +```python +def analyze_latency_span(cost_distribution): + """Analyze latency characteristics from cost distribution.""" + if not cost_distribution: + return "No flow paths available" + + total_flow = sum(cost_distribution.values()) + weighted_avg_latency = sum(cost * flow for cost, flow in cost_distribution.items()) / total_flow + + min_latency = min(cost_distribution.keys()) + max_latency = max(cost_distribution.keys()) + latency_span = max_latency - min_latency + + print(f"Latency Analysis:") + print(f" Average latency: {weighted_avg_latency:.2f}") + print(f" Latency range: {min_latency:.1f} - {max_latency:.1f}") + print(f" Latency span: {latency_span:.1f}") + print(f" Flow distribution:") + for cost, flow in sorted(cost_distribution.items()): + percentage = (flow / total_flow) * 100 + print(f" {percentage:.1f}% of traffic uses paths with latency {cost:.1f}") + +# Example usage +analyze_latency_span(summary.cost_distribution) +``` + +This analysis helps identify: +- **Traffic concentration**: How much traffic uses low vs. high latency paths +- **Latency span**: The range of latencies experienced by traffic +- **Performance bottlenecks**: When high-latency paths carry traffic due to capacity constraints + ## Advanced Analysis: Sensitivity Analysis -For deeper network analysis, you can use the low-level graph algorithms to perform sensitivity analysis and identify bottleneck edges: +For network analysis, you can use the low-level graph algorithms to run sensitivity analysis and identify bottleneck edges: ```python from ngraph.lib.algorithms.max_flow import calc_max_flow, saturated_edges, run_sensitivity diff --git a/docs/reference/api-full.md b/docs/reference/api-full.md index 1c845f0..44174b1 100644 --- a/docs/reference/api-full.md +++ b/docs/reference/api-full.md @@ -10,7 +10,7 @@ For a curated, example-driven API guide, see **[api.md](api.md)**. > - **[CLI Reference](cli.md)** - Command-line interface > - **[DSL Reference](dsl.md)** - YAML syntax guide -**Generated from source code on:** July 29, 2025 at 16:59 UTC +**Generated from source code on:** July 29, 2025 at 17:52 UTC **Modules auto-discovered:** 53 @@ -2205,6 +2205,9 @@ Attributes: residual_cap: Remaining capacity on each edge after flow placement. reachable: Set of nodes reachable from source in residual graph. min_cut: List of saturated edges that form the minimum cut. + cost_distribution: Distribution of flow volume over path costs. + Maps each cost value to the total volume of flow placed on + paths with that cost during sequential augmentation. **Attributes:** @@ -2213,6 +2216,7 @@ Attributes: - `residual_cap` (Dict[Edge, float]) - `reachable` (Set[str]) - `min_cut` (List[Edge]) +- `cost_distribution` (Dict[Cost, float]) --- diff --git a/docs/reference/api.md b/docs/reference/api.md index a5e1a02..cffac6e 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -142,6 +142,18 @@ max_flow = network.max_flow( shortest_path=True, # Use only shortest paths flow_placement=FlowPlacement.PROPORTIONAL # UCMP load balancing ) + +# Detailed flow analysis with cost distribution +result = network.max_flow_with_summary( + source_path="datacenter.*", + sink_path="edge.*", + mode="combine" +) +(src_label, sink_label), (flow_value, summary) = next(iter(result.items())) + +# Cost distribution shows flow volume per path cost (useful for latency analysis) +print(f"Cost distribution: {summary.cost_distribution}") +# Example: {2.0: 150.0, 4.0: 75.0} means 150 units on cost-2 paths, 75 on cost-4 paths ``` **Key Options:** @@ -150,6 +162,11 @@ max_flow = network.max_flow( - `shortest_path`: `True` (shortest only) or `False` (all available paths) - `flow_placement`: `FlowPlacement.PROPORTIONAL` (UCMP) or `FlowPlacement.EQUAL_BALANCED` (ECMP) +**Advanced Features:** + +- **Cost Distribution**: `FlowSummary.cost_distribution` provides flow volume breakdown by path cost for latency span analysis and performance characterization +- **Analytics**: Edge flows, residual capacities, min-cut analysis, and reachability information + **Integration:** Available on both Network and NetworkView objects. Foundation for FailureManager Monte Carlo analysis. ### NetworkView diff --git a/ngraph/lib/algorithms/max_flow.py b/ngraph/lib/algorithms/max_flow.py index 0f29327..e32840d 100644 --- a/ngraph/lib/algorithms/max_flow.py +++ b/ngraph/lib/algorithms/max_flow.py @@ -1,11 +1,11 @@ """Maximum flow algorithms and network flow computations.""" -from typing import Literal, Union, overload +from typing import Dict, Literal, Union, overload from ngraph.lib.algorithms.base import EdgeSelect, FlowPlacement from ngraph.lib.algorithms.flow_init import init_flow_graph from ngraph.lib.algorithms.place_flow import place_flow_on_graph -from ngraph.lib.algorithms.spf import spf +from ngraph.lib.algorithms.spf import Cost, spf from ngraph.lib.algorithms.types import FlowSummary from ngraph.lib.graph import NodeID, StrictMultiDiGraph @@ -208,6 +208,7 @@ def calc_max_flow( capacity_attr, flow_attr, tolerance, + {}, # Empty cost distribution for self-loop case ) else: return 0.0 @@ -220,8 +221,11 @@ def calc_max_flow( reset_flow_graph, ) + # Initialize cost distribution tracking + cost_distribution: Dict[Cost, float] = {} + # First path-finding iteration. - _, pred = spf( + costs, pred = spf( flow_graph, src_node, edge_select=EdgeSelect.ALL_MIN_COST_WITH_CAP_REMAINING ) flow_meta = place_flow_on_graph( @@ -236,6 +240,13 @@ def calc_max_flow( ) max_flow = flow_meta.placed_flow + # Track cost distribution for first iteration + if dst_node in costs and flow_meta.placed_flow > 0: + path_cost = costs[dst_node] + cost_distribution[path_cost] = ( + cost_distribution.get(path_cost, 0.0) + flow_meta.placed_flow + ) + # If only one path (single augmentation) is desired, return early. if shortest_path: return _build_return_value( @@ -247,11 +258,12 @@ def calc_max_flow( capacity_attr, flow_attr, tolerance, + cost_distribution, ) # Otherwise, repeatedly find augmenting paths until no new flow can be placed. while True: - _, pred = spf( + costs, pred = spf( flow_graph, src_node, edge_select=EdgeSelect.ALL_MIN_COST_WITH_CAP_REMAINING ) if dst_node not in pred: @@ -274,6 +286,13 @@ def calc_max_flow( max_flow += flow_meta.placed_flow + # Track cost distribution for this iteration + if dst_node in costs and flow_meta.placed_flow > 0: + path_cost = costs[dst_node] + cost_distribution[path_cost] = ( + cost_distribution.get(path_cost, 0.0) + flow_meta.placed_flow + ) + return _build_return_value( max_flow, flow_graph, @@ -283,6 +302,7 @@ def calc_max_flow( capacity_attr, flow_attr, tolerance, + cost_distribution, ) @@ -295,6 +315,7 @@ def _build_return_value( capacity_attr: str, flow_attr: str, tolerance: float, + cost_distribution: Dict[Cost, float], ) -> Union[float, tuple]: """Build the appropriate return value based on the requested flags.""" if not (return_summary or return_graph): @@ -303,7 +324,13 @@ def _build_return_value( summary = None if return_summary: summary = _build_flow_summary( - max_flow, flow_graph, src_node, capacity_attr, flow_attr, tolerance + max_flow, + flow_graph, + src_node, + capacity_attr, + flow_attr, + tolerance, + cost_distribution, ) ret: list = [max_flow] @@ -322,6 +349,7 @@ def _build_flow_summary( capacity_attr: str, flow_attr: str, tolerance: float, + cost_distribution: Dict[Cost, float], ) -> FlowSummary: """Build a FlowSummary from the flow graph state.""" edge_flow = {} @@ -364,6 +392,7 @@ def _build_flow_summary( residual_cap=residual_cap, reachable=reachable, min_cut=min_cut, + cost_distribution=cost_distribution, ) diff --git a/ngraph/lib/algorithms/types.py b/ngraph/lib/algorithms/types.py index 72d58af..658c073 100644 --- a/ngraph/lib/algorithms/types.py +++ b/ngraph/lib/algorithms/types.py @@ -5,6 +5,8 @@ from dataclasses import dataclass from typing import Dict, List, Set, Tuple +from ngraph.lib.algorithms.base import Cost + # Edge identifier tuple: (source_node, destination_node, edge_key) Edge = Tuple[str, str, str] @@ -23,6 +25,9 @@ class FlowSummary: residual_cap: Remaining capacity on each edge after flow placement. reachable: Set of nodes reachable from source in residual graph. min_cut: List of saturated edges that form the minimum cut. + cost_distribution: Distribution of flow volume over path costs. + Maps each cost value to the total volume of flow placed on + paths with that cost during sequential augmentation. """ total_flow: float @@ -30,3 +35,4 @@ class FlowSummary: residual_cap: Dict[Edge, float] reachable: Set[str] min_cut: List[Edge] + cost_distribution: Dict[Cost, float] diff --git a/ngraph/network.py b/ngraph/network.py index 7dc8040..f5c89d9 100644 --- a/ngraph/network.py +++ b/ngraph/network.py @@ -463,6 +463,7 @@ def _compute_flow_with_summary_single_group( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) return 0.0, empty_summary @@ -584,6 +585,7 @@ def _compute_flow_detailed_single_group( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) return 0.0, empty_summary, base_graph @@ -1202,6 +1204,7 @@ def _max_flow_with_summary_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) return {(combined_src_label, combined_snk_label): (0.0, empty_summary)} @@ -1215,6 +1218,7 @@ def _max_flow_with_summary_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) return {(combined_src_label, combined_snk_label): (0.0, empty_summary)} else: @@ -1240,6 +1244,7 @@ def _max_flow_with_summary_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) flow_val, summary = 0.0, empty_summary else: @@ -1255,6 +1260,7 @@ def _max_flow_with_summary_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) flow_val, summary = 0.0, empty_summary results[(src_label, snk_label)] = (flow_val, summary) @@ -1439,6 +1445,7 @@ def _max_flow_detailed_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) return { (combined_src_label, combined_snk_label): ( @@ -1459,6 +1466,7 @@ def _max_flow_detailed_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) return { (combined_src_label, combined_snk_label): ( @@ -1501,6 +1509,7 @@ def _max_flow_detailed_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) flow_val, summary, flow_graph = ( 0.0, @@ -1521,6 +1530,7 @@ def _max_flow_detailed_internal( residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) flow_val, summary, flow_graph = 0.0, empty_summary, base_graph results[(src_label, snk_label)] = (flow_val, summary, flow_graph) diff --git a/tests/lib/algorithms/test_max_flow.py b/tests/lib/algorithms/test_max_flow.py index 3f84920..9c7f25d 100644 --- a/tests/lib/algorithms/test_max_flow.py +++ b/tests/lib/algorithms/test_max_flow.py @@ -671,3 +671,154 @@ def test_max_flow_with_parallel_edges(): g, "A", "C", shortest_path=True, flow_placement=FlowPlacement.EQUAL_BALANCED ) assert max_flow_eq == 2.0, f"Expected 2.0, got {max_flow_eq}" + + +class TestMaxFlowCostDistribution: + """Tests for cost distribution calculation in max flow analysis.""" + + def test_cost_distribution_multiple_paths(self): + """Test cost distribution with paths of different costs.""" + # Create graph with two path options at different costs + g = StrictMultiDiGraph() + for node in ["S", "A", "B", "T"]: + g.add_node(node) + + # Path 1: S -> A -> T (cost = 1 + 1 = 2, capacity = 5) + g.add_edge("S", "A", capacity=5.0, flow=0.0, flows={}, cost=1) + g.add_edge("A", "T", capacity=5.0, flow=0.0, flows={}, cost=1) + + # Path 2: S -> B -> T (cost = 2 + 2 = 4, capacity = 3) + g.add_edge("S", "B", capacity=3.0, flow=0.0, flows={}, cost=2) + g.add_edge("B", "T", capacity=3.0, flow=0.0, flows={}, cost=2) + + flow_value, summary = calc_max_flow(g, "S", "T", return_summary=True) + + # Algorithm should use lowest cost path first, then higher cost + assert flow_value == 8.0 + assert summary.cost_distribution == {2.0: 5.0, 4.0: 3.0} + + def test_cost_distribution_single_path(self): + """Test cost distribution with a single path.""" + g = StrictMultiDiGraph() + for node in ["A", "B", "C"]: + g.add_node(node) + + # Single path: A -> B -> C (cost = 3 + 2 = 5, capacity = 10) + g.add_edge("A", "B", capacity=10.0, flow=0.0, flows={}, cost=3) + g.add_edge("B", "C", capacity=10.0, flow=0.0, flows={}, cost=2) + + flow_value, summary = calc_max_flow(g, "A", "C", return_summary=True) + + assert flow_value == 10.0 + assert summary.cost_distribution == {5.0: 10.0} + + def test_cost_distribution_equal_cost_paths(self): + """Test cost distribution with multiple equal-cost paths.""" + g = StrictMultiDiGraph() + for node in ["S", "A", "B", "T"]: + g.add_node(node) + + # Two paths with same cost but different capacities + # Path 1: S -> A -> T (cost = 1 + 1 = 2, capacity = 4) + g.add_edge("S", "A", capacity=4.0, flow=0.0, flows={}, cost=1) + g.add_edge("A", "T", capacity=4.0, flow=0.0, flows={}, cost=1) + + # Path 2: S -> B -> T (cost = 1 + 1 = 2, capacity = 6) + g.add_edge("S", "B", capacity=6.0, flow=0.0, flows={}, cost=1) + g.add_edge("B", "T", capacity=6.0, flow=0.0, flows={}, cost=1) + + flow_value, summary = calc_max_flow(g, "S", "T", return_summary=True) + + # Should aggregate all flow at the same cost + assert flow_value == 10.0 + assert summary.cost_distribution == {2.0: 10.0} + + def test_cost_distribution_three_tiers(self): + """Test cost distribution with three different cost tiers.""" + g = StrictMultiDiGraph() + for node in ["S", "A", "B", "C", "T"]: + g.add_node(node) + + # Path 1: S -> A -> T (cost = 1, capacity = 2) + g.add_edge("S", "A", capacity=2.0, flow=0.0, flows={}, cost=1) + g.add_edge("A", "T", capacity=2.0, flow=0.0, flows={}, cost=0) + + # Path 2: S -> B -> T (cost = 3, capacity = 4) + g.add_edge("S", "B", capacity=4.0, flow=0.0, flows={}, cost=2) + g.add_edge("B", "T", capacity=4.0, flow=0.0, flows={}, cost=1) + + # Path 3: S -> C -> T (cost = 6, capacity = 3) + g.add_edge("S", "C", capacity=3.0, flow=0.0, flows={}, cost=3) + g.add_edge("C", "T", capacity=3.0, flow=0.0, flows={}, cost=3) + + flow_value, summary = calc_max_flow(g, "S", "T", return_summary=True) + + # Should use paths in cost order: cost 1, then 3, then 6 + assert flow_value == 9.0 + assert summary.cost_distribution == {1.0: 2.0, 3.0: 4.0, 6.0: 3.0} + + def test_cost_distribution_no_flow(self): + """Test cost distribution when no flow is possible.""" + g = StrictMultiDiGraph() + g.add_node("A") + g.add_node("B") + # No edges - no path possible + + flow_value, summary = calc_max_flow(g, "A", "B", return_summary=True) + + assert flow_value == 0.0 + assert summary.cost_distribution == {} + + def test_cost_distribution_self_loop(self): + """Test cost distribution for self-loop case.""" + g = StrictMultiDiGraph() + g.add_node("A") + g.add_edge("A", "A", capacity=10.0, flow=0.0, flows={}, cost=5) + + flow_value, summary = calc_max_flow(g, "A", "A", return_summary=True) + + # Self-loop always returns 0 flow + assert flow_value == 0.0 + assert summary.cost_distribution == {} + + def test_cost_distribution_shortest_path_mode(self): + """Test cost distribution with shortest_path=True (single augmentation).""" + g = StrictMultiDiGraph() + for node in ["S", "A", "B", "T"]: + g.add_node(node) + + # Path 1: S -> A -> T (cost = 2, capacity = 5) + g.add_edge("S", "A", capacity=5.0, flow=0.0, flows={}, cost=1) + g.add_edge("A", "T", capacity=5.0, flow=0.0, flows={}, cost=1) + + # Path 2: S -> B -> T (cost = 4, capacity = 3) + g.add_edge("S", "B", capacity=3.0, flow=0.0, flows={}, cost=2) + g.add_edge("B", "T", capacity=3.0, flow=0.0, flows={}, cost=2) + + flow_value, summary = calc_max_flow( + g, "S", "T", shortest_path=True, return_summary=True + ) + + # Should only use the first (lowest cost) path + assert flow_value == 5.0 + assert summary.cost_distribution == {2.0: 5.0} + + def test_cost_distribution_capacity_bottleneck(self): + """Test cost distribution when bottleneck limits flow on cheaper path.""" + g = StrictMultiDiGraph() + for node in ["S", "A", "B", "T"]: + g.add_node(node) + + # Path 1: S -> A -> T (cost = 1, but bottleneck capacity = 2) + g.add_edge("S", "A", capacity=10.0, flow=0.0, flows={}, cost=1) + g.add_edge("A", "T", capacity=2.0, flow=0.0, flows={}, cost=0) # Bottleneck + + # Path 2: S -> B -> T (cost = 3, capacity = 5) + g.add_edge("S", "B", capacity=5.0, flow=0.0, flows={}, cost=2) + g.add_edge("B", "T", capacity=5.0, flow=0.0, flows={}, cost=1) + + flow_value, summary = calc_max_flow(g, "S", "T", return_summary=True) + + # Should use cheap path first (limited by bottleneck), then expensive path + assert flow_value == 7.0 + assert summary.cost_distribution == {1.0: 2.0, 3.0: 5.0} diff --git a/tests/lib/algorithms/test_types.py b/tests/lib/algorithms/test_types.py index 73251be..0f4e043 100644 --- a/tests/lib/algorithms/test_types.py +++ b/tests/lib/algorithms/test_types.py @@ -35,6 +35,7 @@ def test_flow_summary_creation(self) -> None: residual_cap = {("A", "B", "e1"): 90.0, ("B", "C", "e2"): 95.0} reachable = {"A", "B"} min_cut = [("B", "C", "e2")] + cost_distribution = {1.0: 10.0, 2.0: 5.0} summary = FlowSummary( total_flow=15.0, @@ -42,6 +43,7 @@ def test_flow_summary_creation(self) -> None: residual_cap=residual_cap, reachable=reachable, min_cut=min_cut, + cost_distribution=cost_distribution, ) assert summary.total_flow == 15.0 @@ -49,6 +51,7 @@ def test_flow_summary_creation(self) -> None: assert summary.residual_cap == residual_cap assert summary.reachable == reachable assert summary.min_cut == min_cut + assert summary.cost_distribution == cost_distribution def test_flow_summary_structure(self) -> None: """Test that FlowSummary has the expected dataclass structure.""" @@ -58,6 +61,7 @@ def test_flow_summary_structure(self) -> None: residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) # Verify it's a dataclass with expected fields @@ -69,6 +73,7 @@ def test_flow_summary_structure(self) -> None: "residual_cap", "reachable", "min_cut", + "cost_distribution", } assert set(fields.keys()) == expected_fields @@ -97,6 +102,7 @@ def test_flow_summary_with_complex_data(self) -> None: residual_cap=residual_cap, reachable=reachable, min_cut=min_cut, + cost_distribution={2.0: 100.0, 3.0: 75.0}, # Test cost distribution ) # Verify all data is accessible @@ -105,12 +111,14 @@ def test_flow_summary_with_complex_data(self) -> None: assert len(summary.residual_cap) == 4 assert len(summary.reachable) == 3 assert len(summary.min_cut) == 1 + assert len(summary.cost_distribution) == 2 # Check specific values assert summary.edge_flow[("datacenter_1", "edge_1", "link_1")] == 100.0 assert summary.residual_cap[("datacenter_1", "edge_1", "link_1")] == 0.0 assert "datacenter_1" in summary.reachable assert ("datacenter_1", "edge_1", "link_1") in summary.min_cut + assert summary.cost_distribution[2.0] == 100.0 def test_flow_summary_empty_collections(self) -> None: """Test FlowSummary with empty collections.""" @@ -120,6 +128,7 @@ def test_flow_summary_empty_collections(self) -> None: residual_cap={}, reachable=set(), min_cut=[], + cost_distribution={}, ) assert summary.total_flow == 0.0 diff --git a/tests/test_network_enhanced_max_flow.py b/tests/test_network_enhanced_max_flow.py index d5952bc..6435714 100644 --- a/tests/test_network_enhanced_max_flow.py +++ b/tests/test_network_enhanced_max_flow.py @@ -280,3 +280,49 @@ def test_reachability_analysis(self): assert "A" in summary.reachable # D should not be reachable since it's isolated assert "D" not in summary.reachable + + def test_network_cost_distribution_functionality(self): + """Test that cost distribution is exposed through Network max flow methods.""" + # Create a network with different path costs + nodes = { + "S": Node("S"), + "A": Node("A"), + "B": Node("B"), + "T": Node("T"), + } + + links = { + "link1": Link( + "S", "A", capacity=5.0, cost=1.0 + ), # Path 1: cost 2, capacity 5 + "link2": Link("A", "T", capacity=5.0, cost=1.0), + "link3": Link( + "S", "B", capacity=3.0, cost=2.0 + ), # Path 2: cost 4, capacity 3 + "link4": Link("B", "T", capacity=3.0, cost=2.0), + } + + network = Network(nodes=nodes, links=links, risk_groups={}, attrs={}) + + # Test max_flow_with_summary for cost distribution + result = network.max_flow_with_summary("^S$", "^T$", mode="combine") + + assert len(result) == 1 + (src_label, sink_label), (flow_value, summary) = next(iter(result.items())) + + # Verify flow value and cost distribution + assert flow_value == 8.0 + assert hasattr(summary, "cost_distribution") + assert summary.cost_distribution == {2.0: 5.0, 4.0: 3.0} + + # Test max_flow_detailed for cost distribution + detailed_result = network.max_flow_detailed("^S$", "^T$", mode="combine") + + assert len(detailed_result) == 1 + (src_label, sink_label), (flow_value, summary, flow_graph) = next( + iter(detailed_result.items()) + ) + + # Should have the same cost distribution + assert flow_value == 8.0 + assert summary.cost_distribution == {2.0: 5.0, 4.0: 3.0}