
Commit f4528c4

introduced failure manager

1 parent: 2d0801d

File tree: 5 files changed, +716 −430 lines


ngraph/failure_manager.py

Lines changed: 192 additions & 0 deletions
from __future__ import annotations

import copy
import statistics
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional, Tuple, Any

from ngraph.network import Network
from ngraph.traffic_demand import TrafficDemand
from ngraph.traffic_manager import TrafficManager, TrafficResult
from ngraph.failure_policy import FailurePolicy


class FailureManager:
    """
    Applies FailurePolicy to a Network, runs traffic placement, and (optionally)
    repeats multiple times for Monte Carlo experiments.

    Attributes:
        network (Network): The underlying network to mutate (enable/disable nodes/links).
        traffic_demands (List[TrafficDemand]): List of demands to place after failures.
        failure_policy (Optional[FailurePolicy]): The policy describing what fails.
        default_flow_policy_config: The default flow policy for any demands lacking one.
    """

    def __init__(
        self,
        network: Network,
        traffic_demands: List[TrafficDemand],
        failure_policy: Optional[FailurePolicy] = None,
        default_flow_policy_config=None,
    ) -> None:
        """
        Initialize a FailureManager.

        Args:
            network: The Network to be modified by failures.
            traffic_demands: Demands to place on the network after applying failures.
            failure_policy: A FailurePolicy specifying the rules of what fails.
            default_flow_policy_config: Default FlowPolicyConfig if demands do not specify one.
        """
        self.network = network
        self.traffic_demands = traffic_demands
        self.failure_policy = failure_policy
        self.default_flow_policy_config = default_flow_policy_config

    def apply_failures(self) -> None:
        """
        Apply the current failure_policy to self.network (in-place).

        If failure_policy is None, this method does nothing.
        """
        if not self.failure_policy:
            return

        # Collect nodes/links as dicts {id: attrs}, matching FailurePolicy expectations
        node_map = {n_name: n.attrs for n_name, n in self.network.nodes.items()}
        link_map = {l_id: l.attrs for l_id, l in self.network.links.items()}

        failed_ids = self.failure_policy.apply_failures(node_map, link_map)

        # Disable the failed entities
        for f_id in failed_ids:
            if f_id in self.network.nodes:
                self.network.disable_node(f_id)
            elif f_id in self.network.links:
                self.network.disable_link(f_id)

    def run_single_failure_scenario(self) -> List[TrafficResult]:
        """
        Apply failures to the network, place the demands, and return per-demand results.

        Returns:
            List[TrafficResult]: A list of traffic result objects under the applied failures.
        """
        # Ensure we start with a fully enabled network (in case of reuse)
        self.network.enable_all()

        # Apply the current failure policy
        self.apply_failures()

        # Build TrafficManager and place demands
        tmgr = TrafficManager(
            network=self.network,
            traffic_demands=copy.deepcopy(self.traffic_demands),
            default_flow_policy_config=self.default_flow_policy_config,
        )
        tmgr.build_graph()
        tmgr.expand_demands()
        tmgr.place_all_demands()

        # Return detailed traffic results
        return tmgr.get_traffic_results(detailed=True)

    def run_monte_carlo_failures(
        self,
        iterations: int,
        parallelism: int = 1,
    ) -> Dict[str, Any]:
        """
        Repeatedly applies (randomized) failures to the network and accumulates
        per-run traffic data. Returns both overall volume statistics and a
        breakdown of results for each (src, dst, priority).

        Args:
            iterations (int): Number of times to run the failure scenario.
            parallelism (int): Max number of worker threads to use (for parallel runs).

        Returns:
            Dict[str, Any]: A dictionary containing:
                {
                    "overall_stats": {
                        "mean": <float>,
                        "stdev": <float>,
                        "min": <float>,
                        "max": <float>
                    },
                    "by_src_dst": {
                        (src, dst, priority): [
                            {
                                "iteration": <int>,
                                "total_volume": <float>,
                                "placed_volume": <float>,
                                "unplaced_volume": <float>
                            },
                            ...
                        ],
                        ...
                    }
                }
        """
        # scenario_list holds one List[TrafficResult] per iteration
        scenario_list: List[List[TrafficResult]] = []

        # Run in parallel or synchronously
        if parallelism > 1:
            with ThreadPoolExecutor(max_workers=parallelism) as executor:
                futures = [
                    executor.submit(self.run_single_failure_scenario)
                    for _ in range(iterations)
                ]
                for f in as_completed(futures):
                    scenario_list.append(f.result())
        else:
            for _ in range(iterations):
                scenario_list.append(self.run_single_failure_scenario())

        # If no scenarios were run, return zeroed stats
        if not scenario_list:
            return {
                "overall_stats": {"mean": 0.0, "stdev": 0.0, "min": 0.0, "max": 0.0},
                "by_src_dst": {},
            }

        # Accumulate total placed volumes for each iteration (for top-level summary)
        placed_totals: List[float] = []

        # Dictionary mapping (src, dst, priority) -> list of run-by-run results
        by_src_dst: Dict[Tuple[str, str, int], List[Dict[str, float]]] = defaultdict(
            list
        )

        for i, traffic_results in enumerate(scenario_list):
            # Compute total placed volume for this iteration
            scenario_placed_total = sum(r.placed_volume for r in traffic_results)
            placed_totals.append(scenario_placed_total)

            # Accumulate detailed data for each (src, dst, priority)
            for r in traffic_results:
                key = (r.src, r.dst, r.priority)
                by_src_dst[key].append(
                    {
                        "iteration": i,
                        "total_volume": r.total_volume,
                        "placed_volume": r.placed_volume,
                        "unplaced_volume": r.unplaced_volume,
                    }
                )

        # Compute overall statistics on the total placed volumes
        overall_stats = {
            "mean": statistics.mean(placed_totals),
            "stdev": statistics.pstdev(placed_totals),
            "min": min(placed_totals),
            "max": max(placed_totals),
        }

        return {
            "overall_stats": overall_stats,
            "by_src_dst": dict(by_src_dst),
        }
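For context, a minimal usage sketch follows. How Network, TrafficDemand, and FailurePolicy are constructed is not shown in this commit, so the lines marked "assumed" are placeholders rather than the real constructors; only FailureManager's API comes from the file above. Because each scenario re-enables and then mutates the shared Network in place, the sketch keeps parallelism at 1.

    # Hedged usage sketch -- constructor arguments for Network, TrafficDemand,
    # and FailurePolicy are assumptions, not part of this commit.
    from ngraph.failure_manager import FailureManager
    from ngraph.network import Network
    from ngraph.traffic_demand import TrafficDemand
    from ngraph.failure_policy import FailurePolicy

    network = Network()                    # assumed: topology built/loaded elsewhere
    demands = [TrafficDemand(...)]         # assumed: src/dst/volume arguments omitted
    policy = FailurePolicy(...)            # assumed: rules for random node/link failures

    fmgr = FailureManager(
        network=network,
        traffic_demands=demands,
        failure_policy=policy,
    )

    # A single run: apply the policy once and place all demands.
    per_demand = fmgr.run_single_failure_scenario()

    # Monte Carlo: 100 randomized failure scenarios, sequential execution.
    results = fmgr.run_monte_carlo_failures(iterations=100, parallelism=1)
    print(results["overall_stats"])        # {'mean': ..., 'stdev': ..., 'min': ..., 'max': ...}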

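The per-flow breakdown can be post-processed as well. A short sketch, assuming results is the dict returned by the Monte Carlo call in the previous snippet and using only the fields documented in run_monte_carlo_failures:

    import statistics

    # Summarize each (src, dst, priority) flow across all iterations.
    for (src, dst, priority), runs in results["by_src_dst"].items():
        ratios = [
            run["placed_volume"] / run["total_volume"] if run["total_volume"] else 1.0
            for run in runs
        ]
        worst_unplaced = max(run["unplaced_volume"] for run in runs)
        print(
            f"{src} -> {dst} (priority {priority}): "
            f"mean delivered {statistics.mean(ratios):.1%}, "
            f"worst-case unplaced {worst_unplaced:.2f}"
        )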