
Commit c9bdc34

enabled multi-path for TE_UCMP_UNLIM
1 parent b75726b commit c9bdc34

6 files changed: +91 additions, -57 deletions


ngraph/flows/policy.py

Lines changed: 1 addition & 1 deletion
@@ -827,7 +827,7 @@ def get_flow_policy(flow_policy_config: FlowPolicyConfig) -> FlowPolicy:
             path_alg=base.PathAlg.SPF,
             flow_placement=FlowPlacement.PROPORTIONAL,
             edge_select=base.EdgeSelect.ALL_MIN_COST_WITH_CAP_REMAINING,
-            multipath=False,
+            multipath=True,
         )
     elif flow_policy_config == FlowPolicyConfig.TE_ECMP_UP_TO_256_LSP:
         # TE with up to 256 LSPs using ECMP flow placement.
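Note: a minimal sketch of requesting the updated policy. `get_flow_policy`, `FlowPolicyConfig`, and the enum members come from this diff and the commit message; the surrounding usage is illustrative.

# Sketch only: import path and names are taken from this commit.
from ngraph.flows.policy import FlowPolicyConfig, get_flow_policy

policy = get_flow_policy(FlowPolicyConfig.TE_UCMP_UNLIM)
# After this change the policy's edge selection runs with multipath=True,
# so a flow may be split across all min-cost edges with remaining capacity.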

ngraph/monte_carlo/functions.py

Lines changed: 46 additions & 7 deletions
@@ -133,21 +133,29 @@ def demand_placement_analysis(
 ) -> FlowIterationResult:
     """Analyze traffic demand placement success rates.
 
-    Returns a structured dictionary per iteration containing per-demand offered
-    and placed volumes (in Gbit/s) and an iteration-level summary. This shape
-    is designed for downstream computation of delivered bandwidth percentiles
-    without having to reconstruct per-iteration joint distributions.
+    Produces per-demand FlowEntry records and an iteration-level summary suitable
+    for downstream statistics (e.g., delivered percentiles) without reconstructing
+    joint distributions.
+
+    Additionally exposes placement engine counters to aid performance analysis:
+    - Per-demand: ``FlowEntry.data.policy_metrics`` (dict) with totals collected by
+      the active FlowPolicy (e.g., ``spf_calls_total``, ``flows_created_total``,
+      ``reopt_calls_total``, ``place_iterations_total``, ``placed_total``).
+    - Per-iteration: ``FlowIterationResult.data.iteration_metrics`` aggregating the
+      same counters across all demands in the iteration.
 
     Args:
         network_view: NetworkView with potential exclusions applied.
         demands_config: List of demand configurations (serializable dicts).
         placement_rounds: Number of placement optimization rounds.
         include_flow_details: When True, include cost_distribution per flow.
-        include_used_edges: When True, include set of used edges per demand in entry data.
+        include_used_edges: When True, include set of used edges per demand in entry data
+            as ``FlowEntry.data.edges`` with ``edges_kind='used'``.
         **kwargs: Ignored. Accepted for interface compatibility.
 
     Returns:
-        FlowIterationResult describing this iteration.
+        FlowIterationResult describing this iteration. The ``data`` field contains
+        ``{"iteration_metrics": { ... }}``.
     """
     # Reconstruct demands from config to avoid passing complex objects
     demands = []
@@ -179,6 +187,15 @@ def demand_placement_analysis(
     total_demand = 0.0
     total_placed = 0.0
 
+    # Aggregate iteration-level engine metrics across all demands
+    iteration_metrics: dict[str, float] = {
+        "spf_calls_total": 0.0,
+        "flows_created_total": 0.0,
+        "reopt_calls_total": 0.0,
+        "place_iterations_total": 0.0,
+        "placed_total": 0.0,
+    }
+
     for dmd in tm.demands:
         offered = float(getattr(dmd, "volume", 0.0))
         placed = float(getattr(dmd, "placed_demand", 0.0))
@@ -212,6 +229,24 @@ def demand_placement_analysis(
             extra["edges"] = sorted(edge_strings)
             extra["edges_kind"] = "used"
 
+        # Always expose per-demand FlowPolicy metrics when available
+        fp = getattr(dmd, "flow_policy", None)
+        if fp is not None:
+            try:
+                # Cumulative totals over the policy's lifetime within this iteration
+                totals: dict[str, float] = fp.get_metrics()  # type: ignore[assignment]
+            except Exception:
+                totals = {}
+            if totals:
+                extra["policy_metrics"] = {k: float(v) for k, v in totals.items()}
+                # Accumulate iteration-level totals across demands on known keys
+                for key in iteration_metrics.keys():
+                    if key in totals:
+                        try:
+                            iteration_metrics[key] += float(totals[key])
+                        except Exception:
+                            pass
+
         entry = FlowEntry(
             source=str(getattr(dmd, "src_node", "")),
             destination=str(getattr(dmd, "dst_node", "")),
@@ -235,7 +270,11 @@ def demand_placement_analysis(
         dropped_flows=dropped_flows,
         num_flows=len(flow_entries),
     )
-    return FlowIterationResult(flows=flow_entries, summary=summary)
+    return FlowIterationResult(
+        flows=flow_entries,
+        summary=summary,
+        data={"iteration_metrics": iteration_metrics},
+    )
 
 
 def sensitivity_analysis(
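Note: a hedged sketch of how a consumer might read the new counters. The `flows`, `summary`, and `data` fields mirror the constructor call above and the docstring; `view` and `demands_cfg` are hypothetical inputs, and dict-style access to `entry.data` is an assumption.

# Sketch only: reads iteration- and demand-level metrics from the result.
result = demand_placement_analysis(
    network_view=view,          # hypothetical NetworkView
    demands_config=demands_cfg, # hypothetical list of demand dicts
    placement_rounds=2,
)
totals = result.data["iteration_metrics"]
print(totals["spf_calls_total"], totals["place_iterations_total"])
for entry in result.flows:
    per_demand = entry.data.get("policy_metrics", {})  # empty if no policy reported totals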

ngraph/workflow/analysis/cost_power_analysis.py

Lines changed: 7 additions & 18 deletions
@@ -69,12 +69,7 @@ def analyze(self, results: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
         data_obj = step_data.get("data", {}) if isinstance(step_data, dict) else {}
 
         ctx = data_obj.get("context", {}) if isinstance(data_obj, dict) else {}
-        try:
-            agg_level = int(ctx.get("aggregation_level", 2))
-        except Exception as exc:  # pragma: no cover - defensive
-            raise ValueError(
-                f"Invalid aggregation_level in CostPower data: {exc}"
-            ) from exc
+        agg_level = int(ctx.get("aggregation_level", 2))
 
         levels = data_obj.get("levels", {}) if isinstance(data_obj, dict) else {}
         level_key = str(agg_level)
@@ -99,12 +94,9 @@ def analyze(self, results: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
         if isinstance(workflow_meta, dict) and workflow_meta:
             candidates: list[tuple[int, str]] = []
             for name, meta in workflow_meta.items():
-                try:
-                    if str(meta.get("step_type")) == "TrafficMatrixPlacement":
-                        order = int(meta.get("execution_order", 1_000_000))
-                        candidates.append((order, name))
-                except Exception:
-                    continue
+                if str(meta.get("step_type")) == "TrafficMatrixPlacement":
+                    order = int(meta.get("execution_order", 1_000_000))
+                    candidates.append((order, name))
             if candidates:
                 traffic_step_name = sorted(candidates)[0][1]
@@ -153,12 +145,9 @@ def path_at_level(name: str, level: int) -> str:
 
         # Attribute delivered volumes to both endpoints' sites (count once if same)
         for rec in base_flows:
-            try:
-                src = str(rec.get("source", ""))
-                dst = str(rec.get("destination", ""))
-                placed = float(rec.get("placed", 0.0))
-            except Exception:
-                continue
+            src = str(rec.get("source", ""))
+            dst = str(rec.get("destination", ""))
+            placed = float(rec.get("placed", 0.0))
             if not src or not dst or placed <= 0.0:
                 continue

ngraph/workflow/base.py

Lines changed: 15 additions & 3 deletions
@@ -144,14 +144,24 @@ def execute(self, scenario: "Scenario") -> None:
             self.run(scenario)
             end_time = time.time()
             duration = end_time - start_time
+            # Persist step duration into step-scoped metadata for downstream analysis
+            existing_md = scenario.results.get("metadata", {})
+            if not isinstance(existing_md, dict):
+                raise TypeError("Results metadata must be a dict")
+            updated_md = dict(existing_md)
+            updated_md["duration_sec"] = float(duration)
+            scenario.results.put("metadata", updated_md)
             logger.info(
                 f"Completed workflow step: {display_name} ({step_type}) "
                 f"in {duration:.3f} seconds"
             )
             try:
                 store = getattr(scenario.results, "_store", {})
                 keys = ", ".join(sorted(list(store.get(step_name, {}).keys())))
-            except Exception:
+            except Exception as exc:
+                logger.debug(
+                    "Failed to read results keys for step %s: %s", display_name, exc
+                )
                 keys = "-"
             logger.debug(
                 "Step %s finished: duration=%.3fs, results_keys=%s",
@@ -171,8 +181,10 @@ def execute(self, scenario: "Scenario") -> None:
             # Always exit step scope
             try:
                 scenario.results.exit_step()
-            except Exception:
-                pass
+            except Exception as exc:
+                logger.warning(
+                    "Failed to exit step scope cleanly for %s: %s", display_name, exc
+                )
 
     @abstractmethod
     def run(self, scenario: "Scenario") -> None:
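Note: downstream code can now recover the persisted duration from step-scoped metadata. A hedged sketch, assuming the reader resolves "metadata" within the same step scope that `execute()` wrote to:

# Sketch only: key name "duration_sec" is from this diff; scoping is assumed.
md = scenario.results.get("metadata", {})
duration_sec = float(md.get("duration_sec", 0.0))  # 0.0 if the step recorded nothing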

ngraph/workflow/cost_power.py

Lines changed: 11 additions & 14 deletions
@@ -226,20 +226,17 @@ def add_values(
         )
 
         # Log root summary
-        try:
-            root_items = levels_payload.get(0, [])
-            root = root_items[0] if root_items else {}
-            logger.info(
-                "CostPower complete: name=%s capex=%.3f power=%.3f platform_capex=%.3f optics_capex=%.3f duration=%.3fs",
-                self.name or self.__class__.__name__,
-                float(root.get("capex_total", 0.0)),
-                float(root.get("power_total_watts", 0.0)),
-                float(root.get("platform_capex", 0.0)),
-                float(root.get("optics_capex", 0.0)),
-                time.perf_counter() - t0,
-            )
-        except Exception:
-            pass
+        root_items = levels_payload.get(0, [])
+        root = root_items[0] if root_items else {}
+        logger.info(
+            "CostPower complete: name=%s capex=%.3f power=%.3f platform_capex=%.3f optics_capex=%.3f duration=%.3fs",
+            self.name or self.__class__.__name__,
+            float(root.get("capex_total", 0.0)),
+            float(root.get("power_total_watts", 0.0)),
+            float(root.get("platform_capex", 0.0)),
+            float(root.get("optics_capex", 0.0)),
+            time.perf_counter() - t0,
+        )
 
 
 register_workflow_step("CostPower")(CostPower)

ngraph/workflow/traffic_matrix_placement_step.py

Lines changed: 11 additions & 14 deletions
@@ -104,17 +104,17 @@ def run(self, scenario: "Scenario") -> None:
             ) from exc
 
         def _serialize_policy(cfg: Any) -> Any:
-            try:
-                from ngraph.flows.policy import FlowPolicyConfig  # local import
-            except Exception:  # pragma: no cover - defensive
-                return str(cfg) if cfg is not None else None
+            from ngraph.flows.policy import FlowPolicyConfig  # local import
+
             if cfg is None:
                 return None
             if isinstance(cfg, FlowPolicyConfig):
                 return cfg.name
+            # Fall back to string when it cannot be coerced to enum
             try:
                 return FlowPolicyConfig(int(cfg)).name
-            except Exception:
+            except Exception as exc:
+                logger.debug("Unrecognized flow_policy_config value: %r (%s)", cfg, exc)
                 return str(cfg)
 
         base_demands: list[dict[str, Any]] = [
@@ -133,15 +133,12 @@ def _serialize_policy(cfg: Any) -> Any:
 
         # Resolve alpha
         effective_alpha = self._resolve_alpha(scenario)
-        try:
-            alpha_src = getattr(self, "_alpha_source", None) or "explicit"
-            logger.info(
-                "Using alpha: value=%.6g source=%s",
-                float(effective_alpha),
-                str(alpha_src),
-            )
-        except Exception:
-            pass
+        alpha_src = getattr(self, "_alpha_source", None) or "explicit"
+        logger.info(
+            "Using alpha: value=%.6g source=%s",
+            float(effective_alpha),
+            str(alpha_src),
+        )
 
         demands_config: list[dict[str, Any]] = []
         for td in td_list:
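Note: a hedged sketch of `_serialize_policy` outcomes under the new control flow, treating the nested helper as if it were callable standalone; the inputs are hypothetical.

# Hypothetical inputs and their results under the updated helper:
_serialize_policy(None)                            # -> None
_serialize_policy(FlowPolicyConfig.TE_UCMP_UNLIM)  # -> "TE_UCMP_UNLIM"
_serialize_policy(2)                               # -> enum name, if 2 maps to a member
_serialize_policy("custom")                        # -> "custom" (coercion fails, DEBUG log, str fallback)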
