
Commit 8654e45

Refactor FailureManager to optimize Monte Carlo analysis
1 parent 02cafe8 commit 8654e45

File tree

4 files changed: +122 -53 lines changed


docs/reference/api-full.md

Lines changed: 21 additions & 6 deletions
@@ -12,7 +12,7 @@ Quick links:
 - [CLI Reference](cli.md)
 - [DSL Reference](dsl.md)

-Generated from source code on: August 09, 2025 at 13:04 UTC
+Generated from source code on: August 09, 2025 at 22:20 UTC

 Modules auto-discovered: 63

@@ -1982,7 +1982,7 @@ Attributes:
 - `compute_exclusions(self, policy: "'FailurePolicy | None'" = None, seed_offset: 'int | None' = None) -> 'tuple[set[str], set[str]]'` - Compute set of nodes and links to exclude for a failure iteration.
 - `create_network_view(self, excluded_nodes: 'set[str] | None' = None, excluded_links: 'set[str] | None' = None) -> 'NetworkView'` - Create NetworkView with specified exclusions.
 - `get_failure_policy(self) -> "'FailurePolicy | None'"` - Get failure policy for analysis.
-- `run_demand_placement_monte_carlo(self, demands_config: 'list[dict[str, Any]] | Any', iterations: 'int' = 100, parallelism: 'int' = 1, placement_rounds: 'int | str' = 'auto', baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **kwargs) -> 'Any'` - Analyze traffic demand placement success under failures.
+- `run_demand_placement_monte_carlo(self, demands_config: 'list[dict[str, Any]] | Any', iterations: 'int' = 100, parallelism: 'int' = 1, placement_rounds: 'int | str' = 'auto', baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, include_flow_details: 'bool' = False, **kwargs) -> 'Any'` - Analyze traffic demand placement success under failures.
 - `run_max_flow_monte_carlo(self, source_path: 'str', sink_path: 'str', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, flow_placement: 'FlowPlacement | str' = <FlowPlacement.PROPORTIONAL: 1>, baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, include_flow_summary: 'bool' = False, **kwargs) -> 'Any'` - Analyze maximum flow capacity envelopes between node groups under failures.
 - `run_monte_carlo_analysis(self, analysis_func: 'AnalysisFunction', iterations: 'int' = 1, parallelism: 'int' = 1, baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **analysis_kwargs) -> 'dict[str, Any]'` - Run Monte Carlo failure analysis with any analysis function.
 - `run_sensitivity_monte_carlo(self, source_path: 'str', sink_path: 'str', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, flow_placement: 'FlowPlacement | str' = <FlowPlacement.PROPORTIONAL: 1>, baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **kwargs) -> 'Any'` - Analyze component criticality for flow capacity under failures.
@@ -2656,10 +2656,16 @@ YAML Configuration Example:
     baseline: true  # Include baseline iteration first
     seed: 42  # Optional reproducible seed
     store_failure_patterns: false  # Store failure patterns if needed
+    include_flow_details: true  # Collect per-demand cost distribution and edges

 Results stored in `scenario.results` under the step name:

-- placement_results: Per-iteration demand placement statistics (serializable)
+- placement_envelopes: Per-demand placement ratio envelopes with statistics
+
+  When ``include_flow_details`` is true, each envelope also includes
+  ``flow_summary_stats`` with aggregated ``cost_distribution_stats`` and
+  ``edge_usage_frequencies``.
+
 - failure_pattern_results: Failure pattern mapping (if requested)
 - metadata: Execution metadata (iterations, parallelism, baseline, etc.)

@@ -2676,6 +2682,9 @@ Attributes:
     baseline: Include baseline iteration without failures first.
     seed: Optional seed for reproducibility.
     store_failure_patterns: Whether to store failure pattern results.
+    include_flow_details: If True, collect per-demand cost distribution and
+        edges used per iteration, and aggregate into ``flow_summary_stats``
+        on each placement envelope.

 **Attributes:**

@@ -2688,6 +2697,7 @@ Attributes:
 - `placement_rounds` (int | str) = auto
 - `baseline` (bool) = False
 - `store_failure_patterns` (bool) = False
+- `include_flow_details` (bool) = False

 **Methods:**

@@ -3029,7 +3039,7 @@ failure analysis scenarios.
 Note: This module is distinct from ngraph.workflow.analysis, which provides
 notebook visualization components for workflow results.

-### demand_placement_analysis(network_view: "'NetworkView'", demands_config: 'list[dict[str, Any]]', placement_rounds: 'int | str' = 'auto', **kwargs) -> 'dict[str, Any]'
+### demand_placement_analysis(network_view: "'NetworkView'", demands_config: 'list[dict[str, Any]]', placement_rounds: 'int | str' = 'auto', include_flow_details: 'bool' = False, **kwargs) -> 'dict[str, Any]'

 Analyze traffic demand placement success rates.

@@ -3046,9 +3056,14 @@ Returns:
 - total_demand: Total demand volume.
 - overall_placement_ratio: total_placed / total_demand (0.0 if undefined).
 - demand_results: List of per-demand statistics preserving offered volume.
-- priority_results: Mapping from priority to aggregated statistics with keys

-  total_volume, placed_volume, unplaced_volume, placement_ratio,
+  When ``include_flow_details`` is True, each entry also includes
+  ``cost_distribution`` mapping path cost to placed volume and
+  ``edges_used`` as a list of edge identifiers seen in the placed flows.
+
+- priority_results: Mapping from priority to aggregated statistics with
+
+  keys total_volume, placed_volume, unplaced_volume, placement_ratio,
   and demand_count.

 ### max_flow_analysis(network_view: "'NetworkView'", source_regex: 'str', sink_regex: 'str', mode: 'str' = 'combine', shortest_path: 'bool' = False, flow_placement: 'FlowPlacement' = <FlowPlacement.PROPORTIONAL: 1>, include_flow_summary: 'bool' = False, **kwargs) -> 'list[tuple]'
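
Taken together, the doc changes above describe the new `include_flow_details` flag end to end: it is accepted by `run_demand_placement_monte_carlo`, surfaced in the workflow YAML, and reflected in the per-envelope `flow_summary_stats`. A minimal usage sketch in Python; only the parameter names come from the signature above, while the demands list and the surrounding FailureManager setup are assumptions:

# Sketch: `fm` is an existing FailureManager instance (construction omitted);
# the demand dict fields below are illustrative, not taken from this commit.
demands_config = [
    {"source_path": "^A$", "sink_path": "^B$", "demand": 10.0},
]

results = fm.run_demand_placement_monte_carlo(
    demands_config=demands_config,   # list[dict[str, Any]] per the signature
    iterations=100,
    parallelism=4,
    baseline=True,
    seed=42,
    include_flow_details=True,       # new flag added in this commit
)

# Per the doc text above, each placement envelope now carries
# "flow_summary_stats" with aggregated "cost_distribution_stats" and
# "edge_usage_frequencies"; without the flag those keys are absent.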

ngraph/failure/manager/manager.py

Lines changed: 82 additions & 33 deletions
@@ -502,7 +502,11 @@ def run_monte_carlo_analysis(
         logger.debug("Pre-computing failure exclusions for all iterations")
         pre_compute_start = time.time()

-        worker_args = []
+        worker_args: list[tuple] = []
+        iteration_index_to_key: dict[int, tuple] = {}
+        key_to_first_arg: dict[tuple, tuple] = {}
+        key_to_members: dict[tuple, list[int]] = {}
+
         for i in range(mc_iters):
             seed_offset = None
             if seed is not None:
@@ -520,37 +524,87 @@ def run_monte_carlo_analysis(
                 policy, seed_offset
             )

-            # Create worker arguments
-            worker_args.append(
-                (
-                    excluded_nodes,
-                    excluded_links,
-                    analysis_func,
-                    analysis_kwargs,
-                    i,  # iteration_index
-                    is_baseline,
-                    func_name,
-                )
+            arg = (
+                excluded_nodes,
+                excluded_links,
+                analysis_func,
+                analysis_kwargs,
+                i,  # iteration_index
+                is_baseline,
+                func_name,
+            )
+            worker_args.append(arg)
+
+            # Build deduplication key (excludes iteration index)
+            dedup_key = _create_cache_key(
+                excluded_nodes, excluded_links, func_name, analysis_kwargs
             )
+            iteration_index_to_key[i] = dedup_key
+            if dedup_key not in key_to_first_arg:
+                key_to_first_arg[dedup_key] = arg
+            key_to_members.setdefault(dedup_key, []).append(i)

         pre_compute_time = time.time() - pre_compute_start
         logger.debug(
             f"Pre-computed {len(worker_args)} exclusion sets in {pre_compute_time:.2f}s"
         )

+        # Prepare unique tasks (deduplicated by failure pattern + analysis params)
+        unique_worker_args: list[tuple] = list(key_to_first_arg.values())
+        num_unique_tasks: int = len(unique_worker_args)
+        logger.info(
+            f"Monte-Carlo deduplication: {num_unique_tasks} unique patterns from {mc_iters} iterations"
+        )
+
         # Determine if we should run in parallel
-        use_parallel = parallelism > 1 and mc_iters > 1
+        use_parallel = parallelism > 1 and num_unique_tasks > 1

         start_time = time.time()

+        # Execute only unique tasks, then replicate results to original indices
         if use_parallel:
-            results, failure_patterns = self._run_parallel(
-                worker_args, mc_iters, store_failure_patterns, parallelism
+            unique_result_values, _ = self._run_parallel(
+                unique_worker_args, num_unique_tasks, False, parallelism
             )
         else:
-            results, failure_patterns = self._run_serial(
-                worker_args, store_failure_patterns
+            unique_result_values, _ = self._run_serial(unique_worker_args, False)
+
+        # Map unique task results back to their groups by zipping args with results
+        key_to_result: dict[tuple, Any] = {}
+        for arg, value in zip(unique_worker_args, unique_result_values, strict=False):
+            exc_nodes, exc_links = arg[0], arg[1]
+            dedup_key = _create_cache_key(
+                exc_nodes, exc_links, func_name, analysis_kwargs
             )
+            key_to_result[dedup_key] = value
+
+        # Build full results list in original order
+        results: list[Any] = [None] * mc_iters  # type: ignore[var-annotated]
+        for key, members in key_to_members.items():
+            if key not in key_to_result:
+                # Defensive: should not happen unless parallel map returned fewer tasks
+                continue
+            value = key_to_result[key]
+            for idx in members:
+                results[idx] = value
+
+        # Reconstruct failure patterns per original iteration if requested
+        failure_patterns: list[dict[str, Any]] = []
+        if store_failure_patterns:
+            for key, members in key_to_members.items():
+                # Use exclusions from the representative arg
+                rep_arg = key_to_first_arg[key]
+                exc_nodes: set[str] = rep_arg[0]
+                exc_links: set[str] = rep_arg[1]
+                for idx in members:
+                    failure_patterns.append(
+                        {
+                            "iteration_index": idx,
+                            "is_baseline": bool(baseline and idx == 0),
+                            "excluded_nodes": list(exc_nodes),
+                            "excluded_links": list(exc_links),
+                        }
+                    )

         elapsed_time = time.time() - start_time

@@ -564,19 +618,14 @@ def run_monte_carlo_analysis(
                 "analysis_function": func_name,
                 "policy_name": self.policy_name,
                 "execution_time": elapsed_time,
-                "unique_patterns": len(
-                    set(
-                        (tuple(sorted(args[0])), tuple(sorted(args[1])))
-                        for args in worker_args
-                    )
-                ),
+                "unique_patterns": num_unique_tasks,
             },
         }

     def _run_parallel(
         self,
         worker_args: list[tuple],
-        mc_iters: int,
+        total_tasks: int,
         store_failure_patterns: bool,
         parallelism: int,
     ) -> tuple[list[Any], list[dict[str, Any]]]:
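
The two hunks above carry the heart of the refactor: iterations that share the same exclusion sets and analysis parameters are grouped under one deduplication key, only one representative task per key is executed, its result is copied back to every member iteration, and the metadata's unique_patterns count falls out of the same bookkeeping. A self-contained sketch of that group / run-once / fan-out pattern, with toy types standing in for the real worker plumbing:

from collections import defaultdict
from collections.abc import Hashable
from typing import Any, Callable

def run_deduplicated(tasks: list[Hashable], run: Callable[[Hashable], Any]) -> list[Any]:
    """Run each distinct task once and copy its result to every duplicate slot."""
    key_to_members: dict[Hashable, list[int]] = defaultdict(list)
    for i, task in enumerate(tasks):
        key_to_members[task].append(i)

    results: list[Any] = [None] * len(tasks)
    for task, members in key_to_members.items():
        value = run(task)           # executed once per unique task
        for idx in members:
            results[idx] = value    # replicated back to the original iteration order
    return results

# Toy usage: six "iterations" collapse to three unique exclusion patterns.
patterns = [("n1",), ("n2",), ("n1",), (), ("n2",), ("n1",)]
print(run_deduplicated(patterns, run=lambda excluded: f"analyzed {excluded}"))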
@@ -596,17 +645,17 @@ def _run_parallel(
         Returns:
             Tuple of (results_list, failure_patterns_list).
         """
-        workers = min(parallelism, mc_iters)
+        workers = min(parallelism, total_tasks)
         logger.info(
-            f"Running parallel analysis with {workers} workers for {mc_iters} iterations"
+            f"Running parallel analysis with {workers} workers for {total_tasks} iterations"
         )

         # Serialize network once for all workers
         network_pickle = pickle.dumps(self.network)
         logger.debug(f"Serialized network once: {len(network_pickle)} bytes")

         # Calculate optimal chunksize to minimize IPC overhead
-        chunksize = max(1, mc_iters // (workers * 4))
+        chunksize = max(1, total_tasks // (workers * 4))
         logger.debug(f"Using chunksize={chunksize} for parallel execution")

         start_time = time.time()
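
One knock-on effect of passing total_tasks (now the number of unique tasks) into _run_parallel is that the chunksize heuristic scales with real work rather than raw iteration count. A quick check of the formula with illustrative numbers:

def chunksize(total_tasks: int, parallelism: int) -> int:
    # Same heuristic as the line changed above: aim for roughly four chunks
    # per worker, never below one task per chunk.
    workers = min(parallelism, total_tasks)
    return max(1, total_tasks // (workers * 4))

print(chunksize(total_tasks=1000, parallelism=8))  # 31 -> large batches, little IPC overhead
print(chunksize(total_tasks=12, parallelism=8))    # 1  -> few unique tasks stay fine-grained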
@@ -622,7 +671,7 @@ def _run_parallel(
             logger.debug(
                 f"ProcessPoolExecutor created with {workers} workers and shared network"
             )
-            logger.info(f"Starting parallel execution of {mc_iters} iterations")
+            logger.info(f"Starting parallel execution of {total_tasks} iterations")

             try:
                 for (
@@ -649,9 +698,9 @@ def _run_parallel(
                     )

                     # Progress logging
-                    if completed_tasks % max(1, mc_iters // 10) == 0:
+                    if completed_tasks % max(1, total_tasks // 10) == 0:
                         logger.info(
-                            f"Parallel analysis progress: {completed_tasks}/{mc_iters} tasks completed"
+                            f"Parallel analysis progress: {completed_tasks}/{total_tasks} tasks completed"
                         )

             except Exception as e:
@@ -664,7 +713,7 @@
         elapsed_time = time.time() - start_time
         logger.info(f"Parallel analysis completed in {elapsed_time:.2f} seconds")
         logger.debug(
-            f"Average time per iteration: {elapsed_time / mc_iters:.3f} seconds"
+            f"Average time per iteration: {elapsed_time / total_tasks:.3f} seconds"
         )

         # Log exclusion pattern diversity for cache efficiency analysis
@@ -678,9 +727,9 @@
             unique_exclusions.add(exclusion_key)

         logger.info(
-            f"Generated {len(unique_exclusions)} unique exclusion patterns from {mc_iters} iterations"
+            f"Generated {len(unique_exclusions)} unique exclusion patterns from {total_tasks} iterations"
         )
-        cache_efficiency = (mc_iters - len(unique_exclusions)) / mc_iters * 100
+        cache_efficiency = (total_tasks - len(unique_exclusions)) / total_tasks * 100
         logger.debug(
             f"Potential cache efficiency: {cache_efficiency:.1f}% (worker processes benefit from caching)"
         )

scenarios/square_mesh.yaml

Lines changed: 2 additions & 2 deletions
@@ -60,8 +60,8 @@ workflow:
   # Single pairwise analysis generates complete 4x4 node-to-node capacity matrix
   - step_type: CapacityEnvelopeAnalysis
     name: "node_to_node_capacity_matrix"
-    source_path: "^N([1-4])$"  # Capturing group creates separate groups: "1", "2", "3", "4"
-    sink_path: "^N([1-4])$"  # Capturing group creates separate groups: "1", "2", "3", "4"
+    source_path: "^(N[1-4])$"  # Capturing group creates separate groups: "1", "2", "3", "4"
+    sink_path: "^(N[1-4])$"  # Capturing group creates separate groups: "1", "2", "3", "4"
     mode: "pairwise"  # Generates all source-sink combinations (16 total)
     failure_policy: "single_link_failure"
     iterations: 10  # Monte Carlo iterations per node pair
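
One nuance the unchanged inline comments gloss over: in Python's `re`, moving the parentheses changes what the capturing group yields. The old pattern captured only the digit, while the new one captures the full node name, which presumably is how the pairwise groups end up labeled. A quick check:

import re

# Old pattern: the capture group holds just the digit.
assert re.match(r"^N([1-4])$", "N3").group(1) == "3"

# New pattern: the capture group holds the whole node name, so group labels
# derived from it become "N1".."N4" rather than "1".."4".
assert re.match(r"^(N[1-4])$", "N3").group(1) == "N3"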

tests/failure/test_manager.py

Lines changed: 17 additions & 12 deletions
@@ -279,20 +279,15 @@ def test_parallel_execution(
         mock_pool_executor: MagicMock,
         failure_manager: FailureManager,
     ) -> None:
+        """When deduplication collapses iterations to one unique pattern, execution
+        may run serially even if parallelism > 1. Validate results shape and metadata
+        without asserting executor usage.
+        """
         mock_pickle.dumps.return_value = b"fake_network_data"

         mock_pool = MagicMock()
         mock_pool_executor.return_value.__enter__.return_value = mock_pool

-        mock_results = [
-            [("src1", "dst1", 100.0)],
-            [("src2", "dst2", 200.0)],
-        ]
-        mock_pool.map.return_value = [
-            (mock_results[0], 0, False, set(), set()),
-            (mock_results[1], 1, False, set(), set()),
-        ]
-
         result = failure_manager.run_monte_carlo_analysis(
             analysis_func=mock_analysis_func,
             iterations=2,
@@ -301,7 +296,6 @@ def test_parallel_execution(

         assert len(result["results"]) == 2
         assert result["metadata"]["parallelism"] == 2
-        mock_pool_executor.assert_called_once()


 class TestFailureManagerTopLevelMatching:
@@ -397,15 +391,26 @@ class TestFailureManagerErrorHandling:
     def test_run_monte_carlo_parallel_execution_error(
         self, failure_manager: FailureManager
     ) -> None:
+        """Force two unique patterns so the parallel path is taken, then assert
+        errors in worker execution propagate.
+        """
         with patch(
             "ngraph.failure.manager.manager.ProcessPoolExecutor"
         ) as mock_pool_executor:
             mock_pool = MagicMock()
             mock_pool_executor.return_value.__enter__.return_value = mock_pool
             mock_pool.map.side_effect = RuntimeError("Parallel execution failed")

-            with patch(
-                "ngraph.failure.manager.manager.pickle.dumps", return_value=b"fake_data"
+            with (
+                patch(
+                    "ngraph.failure.manager.manager.pickle.dumps",
+                    return_value=b"fake_data",
+                ),
+                patch.object(
+                    failure_manager,
+                    "compute_exclusions",
+                    side_effect=[({"n1"}, set()), ({"n2"}, set())],
+                ),
             ):
                 with pytest.raises(RuntimeError, match="Parallel execution failed"):
                     failure_manager.run_monte_carlo_analysis(
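
The extra patch.object above exists because, after deduplication, two iterations with identical exclusions collapse into a single unique task and the executor may never be used; feeding compute_exclusions two different return values keeps num_unique_tasks > 1 so the parallel path stays reachable. A stripped-down sketch of that mocking idiom, assuming the failure_manager and mock_analysis_func fixtures used elsewhere in this test file:

from unittest.mock import patch

# Each call yields a different (excluded_nodes, excluded_links) pair, so the
# two iterations hash to two deduplication keys instead of one.
with patch.object(
    failure_manager,
    "compute_exclusions",
    side_effect=[({"n1"}, set()), ({"n2"}, set())],
):
    failure_manager.run_monte_carlo_analysis(
        analysis_func=mock_analysis_func, iterations=2, parallelism=2
    )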
