diff --git a/docs/reference/api-full.md b/docs/reference/api-full.md index 025ad23..2cabd9a 100644 --- a/docs/reference/api-full.md +++ b/docs/reference/api-full.md @@ -12,9 +12,9 @@ Quick links: - [CLI Reference](cli.md) - [DSL Reference](dsl.md) -Generated from source code on: August 09, 2025 at 04:52 UTC +Generated from source code on: August 09, 2025 at 13:04 UTC -Modules auto-discovered: 59 +Modules auto-discovered: 63 --- @@ -1670,10 +1670,61 @@ Raises: ## ngraph.demand.manager.builder +Builders for traffic matrices. + +Construct `TrafficMatrixSet` from raw dictionaries (e.g. parsed YAML). +This logic was previously embedded in `Scenario.from_yaml`. + +### build_traffic_matrix_set(raw: 'Dict[str, List[dict]]') -> 'TrafficMatrixSet' + +Build a `TrafficMatrixSet` from a mapping of name -> list of dicts. + +Args: + raw: Mapping where each key is a matrix name and each value is a list of + dictionaries with `TrafficDemand` constructor fields. + +Returns: + Initialized `TrafficMatrixSet` with constructed `TrafficDemand` objects. + +Raises: + ValueError: If ``raw`` is not a mapping of name -> list[dict]. + --- ## ngraph.demand.manager.expand +Expansion helpers for traffic demand specifications. + +Public functions here convert user-facing `TrafficDemand` specifications into +concrete `Demand` objects that can be placed on a `StrictMultiDiGraph`. + +This module provides the pure expansion logic that was previously embedded in +`TrafficManager`. + +### expand_demands(network: "Union[Network, 'NetworkView']", graph: 'StrictMultiDiGraph | None', traffic_demands: 'List[TrafficDemand]', default_flow_policy_config: 'FlowPolicyConfig') -> 'Tuple[List[Demand], Dict[str, List[Demand]]]' + +Expand traffic demands into concrete `Demand` objects. + +The result is a flat list of `Demand` plus a mapping from +``TrafficDemand.id`` to the list of expanded demands for that entry. + +Args: + network: Network or NetworkView used for node group selection. + graph: Flow graph to operate on. If ``None``, expansion that requires + graph mutation (pseudo nodes/edges) is skipped. + traffic_demands: List of high-level traffic demand specifications. + default_flow_policy_config: Default policy to apply when a demand does + not specify an explicit `flow_policy`. + +Returns: + A tuple ``(expanded, td_map)`` where: + +- ``expanded`` is the flattened, sorted list of all expanded demands + + (sorted by ascending ``demand_class``). + +- ``td_map`` maps ``TrafficDemand.id`` to its expanded demands. + --- ## ngraph.demand.manager.manager @@ -1764,6 +1815,57 @@ Attributes: ## ngraph.demand.manager.schedule +Scheduling utilities for demand placement rounds. + +Provides the simple priority-aware round-robin scheduler that was previously +implemented in `TrafficManager`. + +### place_demands_round_robin(graph: 'StrictMultiDiGraph', demands: 'List[Demand]', placement_rounds: 'int', reoptimize_after_each_round: 'bool' = False) -> 'float' + +Place demands using priority buckets and round-robin within each bucket. + +Args: + graph: Active flow graph. + demands: Expanded demands to place. + placement_rounds: Number of passes per priority class. + reoptimize_after_each_round: Whether to re-run placement for each demand + after a round to better share capacity. + +Returns: + Total volume successfully placed across all demands. + +--- + +## ngraph.demand.matrix + +Traffic matrix containers. + +Provides `TrafficMatrixSet`, a named collection of `TrafficDemand` lists +used as input to demand expansion and placement. 
This module contains input +containers, not analysis results. + +### TrafficMatrixSet + +Named collection of TrafficDemand lists. + +This mutable container maps scenario names to lists of TrafficDemand objects, +allowing management of multiple traffic matrices for analysis. + +Attributes: + matrices: Dictionary mapping scenario names to TrafficDemand lists. + +**Attributes:** + +- `matrices` (dict[str, list[TrafficDemand]]) = {} + +**Methods:** + +- `add(self, name: 'str', demands: 'list[TrafficDemand]') -> 'None'` - Add a traffic matrix to the collection. +- `get_all_demands(self) -> 'list[TrafficDemand]'` - Get all traffic demands from all matrices combined. +- `get_default_matrix(self) -> 'list[TrafficDemand]'` - Get default traffic matrix. +- `get_matrix(self, name: 'str') -> 'list[TrafficDemand]'` - Get a specific traffic matrix by name. +- `to_dict(self) -> 'dict[str, Any]'` - Convert to dictionary for JSON serialization. + --- ## ngraph.demand.spec @@ -1880,7 +1982,7 @@ Attributes: - `compute_exclusions(self, policy: "'FailurePolicy | None'" = None, seed_offset: 'int | None' = None) -> 'tuple[set[str], set[str]]'` - Compute set of nodes and links to exclude for a failure iteration. - `create_network_view(self, excluded_nodes: 'set[str] | None' = None, excluded_links: 'set[str] | None' = None) -> 'NetworkView'` - Create NetworkView with specified exclusions. - `get_failure_policy(self) -> "'FailurePolicy | None'"` - Get failure policy for analysis. -- `run_demand_placement_monte_carlo(self, demands_config: 'list[dict[str, Any]] | Any', iterations: 'int' = 100, parallelism: 'int' = 1, placement_rounds: 'int' = 50, baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **kwargs) -> 'Any'` - Analyze traffic demand placement success under failures. +- `run_demand_placement_monte_carlo(self, demands_config: 'list[dict[str, Any]] | Any', iterations: 'int' = 100, parallelism: 'int' = 1, placement_rounds: 'int | str' = 'auto', baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **kwargs) -> 'Any'` - Analyze traffic demand placement success under failures. - `run_max_flow_monte_carlo(self, source_path: 'str', sink_path: 'str', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, flow_placement: 'FlowPlacement | str' = , baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, include_flow_summary: 'bool' = False, **kwargs) -> 'Any'` - Analyze maximum flow capacity envelopes between node groups under failures. - `run_monte_carlo_analysis(self, analysis_func: 'AnalysisFunction', iterations: 'int' = 1, parallelism: 'int' = 1, baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **analysis_kwargs) -> 'dict[str, Any]'` - Run Monte Carlo failure analysis with any analysis function. - `run_sensitivity_monte_carlo(self, source_path: 'str', sink_path: 'str', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, flow_placement: 'FlowPlacement | str' = , baseline: 'bool' = False, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **kwargs) -> 'Any'` - Analyze component criticality for flow capacity under failures. @@ -2065,6 +2167,37 @@ Attributes: --- +## ngraph.failure.policy_set + +Failure policy containers. + +Provides `FailurePolicySet`, a named collection of `FailurePolicy` objects +used as input to failure analysis workflows. 
This module contains input +containers, not analysis results. + +### FailurePolicySet + +Named collection of FailurePolicy objects. + +This mutable container maps failure policy names to FailurePolicy objects, +allowing management of multiple failure policies for analysis. + +Attributes: + policies: Dictionary mapping failure policy names to FailurePolicy objects. + +**Attributes:** + +- `policies` (dict[str, FailurePolicy]) = {} + +**Methods:** + +- `add(self, name: 'str', policy: 'FailurePolicy') -> 'None'` - Add a failure policy to the collection. +- `get_all_policies(self) -> 'list[FailurePolicy]'` - Get all failure policies from the collection. +- `get_policy(self, name: 'str') -> 'FailurePolicy'` - Get a specific failure policy by name. +- `to_dict(self) -> 'dict[str, Any]'` - Convert to dictionary for JSON serialization. + +--- + ## ngraph.workflow.analysis.base Base classes for notebook analysis components. @@ -2169,6 +2302,27 @@ Manage package installation and imports for notebooks. --- +## ngraph.workflow.analysis.placement_matrix + +Placement envelope analysis utilities. + +Processes placement envelope results from TrafficMatrixPlacementAnalysis into +placement matrices and summaries suitable for notebooks. + +### PlacementMatrixAnalyzer + +Analyze placement envelopes and display matrices/statistics. + +**Methods:** + +- `analyze(self, results: 'Dict[str, Any]', **kwargs) -> 'Dict[str, Any]'` - Analyze placement envelopes for a given step. +- `analyze_and_display(self, results: Dict[str, Any], **kwargs) -> None` - Analyze results and display them in notebook format. +- `analyze_and_display_step(self, results: 'Dict[str, Any]', **kwargs) -> 'None'` +- `display_analysis(self, analysis: 'Dict[str, Any]', **kwargs) -> 'None'` - Display analysis results in notebook format. +- `get_description(self) -> 'str'` - Return a concise description of the analyzer purpose. + +--- + ## ngraph.workflow.analysis.registry Analysis registry for mapping workflow steps to analysis modules. @@ -2479,6 +2633,69 @@ Attributes: --- +## ngraph.workflow.traffic_matrix_placement_analysis + +Traffic matrix demand placement workflow component. + +Executes Monte Carlo analysis of traffic demand placement under failures using +FailureManager. Takes a named traffic matrix from the scenario's +TrafficMatrixSet. Optionally includes a baseline iteration (no failures). + +YAML Configuration Example: + + workflow: + +- step_type: TrafficMatrixPlacementAnalysis + + name: "tm_placement_monte_carlo" + matrix_name: "default" # Required: Name of traffic matrix to use + failure_policy: "random_failures" # Optional: Named failure policy + iterations: 100 # Number of Monte Carlo trials + parallelism: 4 # Number of worker processes + placement_rounds: "auto" # Optimization rounds per priority (int or "auto") + baseline: true # Include baseline iteration first + seed: 42 # Optional reproducible seed + store_failure_patterns: false # Store failure patterns if needed + +Results stored in `scenario.results` under the step name: + +- placement_results: Per-iteration demand placement statistics (serializable) +- failure_pattern_results: Failure pattern mapping (if requested) +- metadata: Execution metadata (iterations, parallelism, baseline, etc.) + +### TrafficMatrixPlacementAnalysis + +Monte Carlo demand placement analysis using a named traffic matrix. + +Attributes: + matrix_name: Name of the traffic matrix in scenario.traffic_matrix_set. + failure_policy: Optional policy name in scenario.failure_policy_set. 
+ iterations: Number of Monte Carlo iterations. + parallelism: Number of parallel worker processes. + placement_rounds: Placement optimization rounds (int or "auto"). + baseline: Include baseline iteration without failures first. + seed: Optional seed for reproducibility. + store_failure_patterns: Whether to store failure pattern results. + +**Attributes:** + +- `name` (str) +- `seed` (int | None) +- `matrix_name` (str) +- `failure_policy` (str | None) +- `iterations` (int) = 1 +- `parallelism` (int) = 1 +- `placement_rounds` (int | str) = auto +- `baseline` (bool) = False +- `store_failure_patterns` (bool) = False + +**Methods:** + +- `execute(self, scenario: "'Scenario'") -> 'None'` - Execute the workflow step with logging and metadata storage. +- `run(self, scenario: "'Scenario'") -> 'None'` - Execute demand placement Monte Carlo analysis. + +--- + ## ngraph.dsl.blueprints.expand Network topology blueprints and generation. @@ -2569,8 +2786,39 @@ Returns: Parsing helpers for the network DSL. -This module is reserved for future parsing utilities. The main expansion -entry point is `ngraph.dsl.blueprints.expand.expand_network_dsl`. +This module factors out pure parsing/validation helpers from the expansion +module so they can be tested independently and reused. + +### check_adjacency_keys(adj_def: 'Dict[str, Any]', context: 'str') -> 'None' + +Ensure adjacency definitions only contain recognized keys. + +### check_link_params(link_params: 'Dict[str, Any]', context: 'str') -> 'None' + +Ensure link_params contain only recognized keys. + +### check_no_extra_keys(data_dict: 'Dict[str, Any]', allowed: 'set[str]', context: 'str') -> 'None' + +Raise if ``data_dict`` contains keys outside ``allowed``. + +Args: + data_dict: The dict to check. + allowed: Set of recognized keys. + context: Short description used in error messages. + +### expand_name_patterns(name: 'str') -> 'List[str]' + +Expand bracket expressions in a group name. + +Examples: + +- "fa[1-3]" -> ["fa1", "fa2", "fa3"] +- "dc[1,3,5-6]" -> ["dc1", "dc3", "dc5", "dc6"] +- "fa[1-2]_plane[5-6]" -> ["fa1_plane5", "fa1_plane6", "fa2_plane5", "fa2_plane6"] + +### join_paths(parent_path: 'str', rel_path: 'str') -> 'str' + +Join two path segments according to the DSL conventions. --- @@ -2578,12 +2826,10 @@ entry point is `ngraph.dsl.blueprints.expand.expand_network_dsl`. Serializable result artifacts for analysis workflows. -This module defines small dataclasses that capture outputs from analyses -and simulations in a JSON-serializable form: +This module defines dataclasses that capture outputs from analyses and +simulations in a JSON-serializable form: -- `TrafficMatrixSet`: named collections of `TrafficDemand` lists - `PlacementResultSet`: aggregated placement results and statistics -- `FailurePolicySet`: named collections of failure policies - `CapacityEnvelope`: frequency-based capacity distributions and optional aggregated flow statistics @@ -2653,26 +2899,42 @@ Attributes: - `to_dict(self) -> 'Dict[str, Any]'` - Convert to dictionary for JSON serialization. -### FailurePolicySet +### PlacementEnvelope -Named collection of FailurePolicy objects. +Per-demand placement envelope keyed like capacity envelopes. -This mutable container maps failure policy names to FailurePolicy objects, -allowing management of multiple failure policies for analysis. +Each envelope captures frequency distribution of placement ratio for a +specific demand definition across Monte Carlo iterations. 
Attributes: - policies: Dictionary mapping failure policy names to FailurePolicy objects. + source: Source selection regex or node label. + sink: Sink selection regex or node label. + mode: Demand expansion mode ("combine" or "pairwise"). + priority: Demand priority class. + frequencies: Mapping of placement ratio to occurrence count. + min: Minimum observed placement ratio. + max: Maximum observed placement ratio. + mean: Mean placement ratio. + stdev: Standard deviation of placement ratio. + total_samples: Number of iterations represented. **Attributes:** -- `policies` (dict[str, 'FailurePolicy']) = {} +- `source` (str) +- `sink` (str) +- `mode` (str) +- `priority` (int) +- `frequencies` (Dict[float, int]) +- `min` (float) +- `max` (float) +- `mean` (float) +- `stdev` (float) +- `total_samples` (int) **Methods:** -- `add(self, name: 'str', policy: "'FailurePolicy'") -> 'None'` - Add a failure policy to the collection. -- `get_all_policies(self) -> "list['FailurePolicy']"` - Get all failure policies from the collection. -- `get_policy(self, name: 'str') -> "'FailurePolicy'"` - Get a specific failure policy by name. -- `to_dict(self) -> 'dict[str, Any]'` - Convert to dictionary for JSON serialization. +- `from_values(source: 'str', sink: 'str', mode: 'str', priority: 'int', ratios: 'List[float]', rounding_decimals: 'int' = 4) -> "'PlacementEnvelope'"` +- `to_dict(self) -> 'Dict[str, Any]'` ### PlacementResultSet @@ -2696,31 +2958,9 @@ Attributes: - `to_dict(self) -> 'dict[str, Any]'` - Convert to dictionary for JSON serialization. -### TrafficMatrixSet - -Named collection of TrafficDemand lists. - -This mutable container maps scenario names to lists of TrafficDemand objects, -allowing management of multiple traffic matrices for analysis. - -Attributes: - matrices: Dictionary mapping scenario names to TrafficDemand lists. - -**Attributes:** - -- `matrices` (dict[str, list[TrafficDemand]]) = {} - -**Methods:** - -- `add(self, name: 'str', demands: 'list[TrafficDemand]') -> 'None'` - Add a traffic matrix to the collection. -- `get_all_demands(self) -> 'list[TrafficDemand]'` - Get all traffic demands from all matrices combined. -- `get_default_matrix(self) -> 'list[TrafficDemand]'` - Get default traffic matrix. -- `get_matrix(self, name: 'str') -> 'list[TrafficDemand]'` - Get a specific traffic matrix by name. -- `to_dict(self) -> 'dict[str, Any]'` - Convert to dictionary for JSON serialization. - --- -## ngraph.results.results +## ngraph.results.store Generic results store for workflow steps and their metadata. @@ -2751,8 +2991,8 @@ Example usage: - `get(self, step_name: str, key: str, default: Any = None) -> Any` - Retrieve the value from (step_name, key). If the key is missing, return `default`. - `get_all(self, key: str) -> Dict[str, Any]` - Retrieve a dictionary of {step_name: value} for all step_names that contain the specified key. -- `get_all_step_metadata(self) -> Dict[str, ngraph.results.results.WorkflowStepMetadata]` - Get metadata for all workflow steps. -- `get_step_metadata(self, step_name: str) -> Optional[ngraph.results.results.WorkflowStepMetadata]` - Get metadata for a workflow step. +- `get_all_step_metadata(self) -> Dict[str, ngraph.results.store.WorkflowStepMetadata]` - Get metadata for all workflow steps. +- `get_step_metadata(self, step_name: str) -> Optional[ngraph.results.store.WorkflowStepMetadata]` - Get metadata for a workflow step. - `get_steps_by_execution_order(self) -> list[str]` - Get step names ordered by their execution order. 
- `put(self, step_name: str, key: str, value: Any) -> None` - Store a value under (step_name, key). - `put_step_metadata(self, step_name: str, step_type: str, execution_order: int) -> None` - Store metadata for a workflow step. @@ -2789,7 +3029,7 @@ failure analysis scenarios. Note: This module is distinct from ngraph.workflow.analysis, which provides notebook visualization components for workflow results. -### demand_placement_analysis(network_view: "'NetworkView'", demands_config: 'list[dict[str, Any]]', placement_rounds: 'int' = 50, **kwargs) -> 'dict[str, Any]' +### demand_placement_analysis(network_view: "'NetworkView'", demands_config: 'list[dict[str, Any]]', placement_rounds: 'int | str' = 'auto', **kwargs) -> 'dict[str, Any]' Analyze traffic demand placement success rates. @@ -2805,7 +3045,8 @@ Returns: - total_placed: Total placed demand volume. - total_demand: Total demand volume. - overall_placement_ratio: total_placed / total_demand (0.0 if undefined). -- priority_results: Mapping from priority to statistics with keys +- demand_results: List of per-demand statistics preserving offered volume. +- priority_results: Mapping from priority to aggregated statistics with keys total_volume, placed_volume, unplaced_volume, placement_ratio, and demand_count. diff --git a/ngraph/__init__.py b/ngraph/__init__.py index a00fa88..191c18b 100644 --- a/ngraph/__init__.py +++ b/ngraph/__init__.py @@ -8,7 +8,8 @@ from __future__ import annotations from . import cli, config, logging -from .results.artifacts import CapacityEnvelope, PlacementResultSet, TrafficMatrixSet +from .demand.matrix import TrafficMatrixSet +from .results.artifacts import CapacityEnvelope, PlacementResultSet __all__ = [ "cli", diff --git a/ngraph/demand/manager/builder.py b/ngraph/demand/manager/builder.py index 918242d..00979a0 100644 --- a/ngraph/demand/manager/builder.py +++ b/ngraph/demand/manager/builder.py @@ -1,7 +1,43 @@ -from __future__ import annotations - """Builders for traffic matrices. -Public functions in this module assemble traffic matrices from higher-level -inputs. This is a placeholder for future matrix construction utilities. +Construct `TrafficMatrixSet` from raw dictionaries (e.g. parsed YAML). +This logic was previously embedded in `Scenario.from_yaml`. """ + +from __future__ import annotations + +from typing import Dict, List + +from ngraph.demand.matrix import TrafficMatrixSet +from ngraph.demand.spec import TrafficDemand +from ngraph.yaml_utils import normalize_yaml_dict_keys + + +def build_traffic_matrix_set(raw: Dict[str, List[dict]]) -> TrafficMatrixSet: + """Build a `TrafficMatrixSet` from a mapping of name -> list of dicts. + + Args: + raw: Mapping where each key is a matrix name and each value is a list of + dictionaries with `TrafficDemand` constructor fields. + + Returns: + Initialized `TrafficMatrixSet` with constructed `TrafficDemand` objects. + + Raises: + ValueError: If ``raw`` is not a mapping of name -> list[dict]. 
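+
+    Example:
+        Illustrative sketch; the matrix name and regex paths are hypothetical,
+        but the inner keys are `TrafficDemand` constructor fields:
+
+            raw = {"default": [{"source_path": "^dc1", "sink_path": "^dc2", "demand": 10.0}]}
+            tms = build_traffic_matrix_set(raw)
+            demands = tms.get_matrix("default")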
+ """ + if not isinstance(raw, dict): + raise ValueError( + "'traffic_matrix_set' must be a mapping of name -> list[TrafficDemand]" + ) + + normalized_raw = normalize_yaml_dict_keys(raw) + tms = TrafficMatrixSet() + for name, td_list in normalized_raw.items(): + if not isinstance(td_list, list): + raise ValueError( + f"Matrix '{name}' must map to a list of TrafficDemand dicts" + ) + tms.add(name, [TrafficDemand(**d) for d in td_list]) + + return tms diff --git a/ngraph/demand/manager/expand.py b/ngraph/demand/manager/expand.py index bb0d136..0502ea8 100644 --- a/ngraph/demand/manager/expand.py +++ b/ngraph/demand/manager/expand.py @@ -1,8 +1,196 @@ -from __future__ import annotations - """Expansion helpers for traffic demand specifications. -Functions in this module transform high-level demand specs into concrete -`Demand` objects. This is a placeholder module; expansion is currently -implemented in `TrafficManager`. +Public functions here convert user-facing `TrafficDemand` specifications into +concrete `Demand` objects that can be placed on a `StrictMultiDiGraph`. + +This module provides the pure expansion logic that was previously embedded in +`TrafficManager`. """ + +from __future__ import annotations + +from typing import Dict, List, Tuple, Union + +from ngraph.algorithms.flow_init import init_flow_graph +from ngraph.demand import Demand +from ngraph.demand.spec import TrafficDemand +from ngraph.flows.policy import FlowPolicyConfig, get_flow_policy +from ngraph.graph.strict_multidigraph import StrictMultiDiGraph +from ngraph.model.network import Network, Node + +try: + # Avoid importing at runtime if not needed while keeping type hints precise + from typing import TYPE_CHECKING + + if TYPE_CHECKING: # pragma: no cover - typing only + from ngraph.model.view import NetworkView +except Exception: # pragma: no cover - defensive for environments without extras + TYPE_CHECKING = False + + +def expand_demands( + network: Union[Network, "NetworkView"], + graph: StrictMultiDiGraph | None, + traffic_demands: List[TrafficDemand], + default_flow_policy_config: FlowPolicyConfig, +) -> Tuple[List[Demand], Dict[str, List[Demand]]]: + """Expand traffic demands into concrete `Demand` objects. + + The result is a flat list of `Demand` plus a mapping from + ``TrafficDemand.id`` to the list of expanded demands for that entry. + + Args: + network: Network or NetworkView used for node group selection. + graph: Flow graph to operate on. If ``None``, expansion that requires + graph mutation (pseudo nodes/edges) is skipped. + traffic_demands: List of high-level traffic demand specifications. + default_flow_policy_config: Default policy to apply when a demand does + not specify an explicit `flow_policy`. + + Returns: + A tuple ``(expanded, td_map)`` where: + - ``expanded`` is the flattened, sorted list of all expanded demands + (sorted by ascending ``demand_class``). + - ``td_map`` maps ``TrafficDemand.id`` to its expanded demands. 
+ """ + td_to_demands: Dict[str, List[Demand]] = {} + expanded: List[Demand] = [] + + for td in traffic_demands: + # Gather node groups for source and sink + src_groups = network.select_node_groups_by_path(td.source_path) + snk_groups = network.select_node_groups_by_path(td.sink_path) + + if not src_groups or not snk_groups: + td_to_demands[td.id] = [] + continue + + demands_of_td: List[Demand] = [] + if td.mode == "combine": + _expand_combine( + demands_of_td, + td, + src_groups, + snk_groups, + graph, + default_flow_policy_config, + ) + elif td.mode == "pairwise": + _expand_pairwise( + demands_of_td, + td, + src_groups, + snk_groups, + default_flow_policy_config, + ) + else: + raise ValueError(f"Unknown mode: {td.mode}") + + expanded.extend(demands_of_td) + td_to_demands[td.id] = demands_of_td + + # Sort final demands by ascending demand_class (i.e., priority) + expanded.sort(key=lambda d: d.demand_class) + return expanded, td_to_demands + + +def _expand_combine( + expanded: List[Demand], + td: TrafficDemand, + src_groups: Dict[str, List[Node]], + snk_groups: Dict[str, List[Node]], + graph: StrictMultiDiGraph | None, + default_flow_policy_config: FlowPolicyConfig, +) -> None: + """Expand a single demand using the ``combine`` mode. + + Adds pseudo-source and pseudo-sink nodes, connects them to real nodes + with infinite-capacity, zero-cost edges, and creates one aggregate + `Demand` from pseudo-source to pseudo-sink with the full volume. + """ + # Flatten the source and sink node lists + src_nodes = [node for group_nodes in src_groups.values() for node in group_nodes] + dst_nodes = [node for group_nodes in snk_groups.values() for node in group_nodes] + + if not src_nodes or not dst_nodes or graph is None: + return + + # Create pseudo-source / pseudo-sink names + pseudo_source_name = f"combine_src::{td.id}" + pseudo_sink_name = f"combine_snk::{td.id}" + + # Add pseudo nodes to the graph (no-op if they already exist) + graph.add_node(pseudo_source_name) + graph.add_node(pseudo_sink_name) + + # Link pseudo-source to real sources, and real sinks to pseudo-sink + for s_node in src_nodes: + graph.add_edge(pseudo_source_name, s_node.name, capacity=float("inf"), cost=0) + for t_node in dst_nodes: + graph.add_edge(t_node.name, pseudo_sink_name, capacity=float("inf"), cost=0) + + init_flow_graph(graph) # Re-initialize flow-related attributes + + # Create a single Demand with the full volume + if td.flow_policy: + flow_policy = td.flow_policy.deep_copy() + else: + fp_config = td.flow_policy_config or default_flow_policy_config + flow_policy = get_flow_policy(fp_config) + + expanded.append( + Demand( + src_node=pseudo_source_name, + dst_node=pseudo_sink_name, + volume=td.demand, + demand_class=td.priority, + flow_policy=flow_policy, + ) + ) + + +def _expand_pairwise( + expanded: List[Demand], + td: TrafficDemand, + src_groups: Dict[str, List[Node]], + snk_groups: Dict[str, List[Node]], + default_flow_policy_config: FlowPolicyConfig, +) -> None: + """Expand a single demand using the ``pairwise`` mode. + + Creates one `Demand` for each valid source-destination pair (excluding + self-pairs) and splits total volume evenly across pairs. 
+ """ + # Flatten the source and sink node lists + src_nodes = [node for group_nodes in src_groups.values() for node in group_nodes] + dst_nodes = [node for group_nodes in snk_groups.values() for node in group_nodes] + + # Generate all valid (src, dst) pairs + valid_pairs = [ + (s_node, t_node) + for s_node in src_nodes + for t_node in dst_nodes + if s_node.name != t_node.name + ] + pair_count = len(valid_pairs) + if pair_count == 0: + return + + demand_per_pair = td.demand / float(pair_count) + + for s_node, t_node in valid_pairs: + if td.flow_policy: + flow_policy = td.flow_policy.deep_copy() + else: + fp_config = td.flow_policy_config or default_flow_policy_config + flow_policy = get_flow_policy(fp_config) + + expanded.append( + Demand( + src_node=s_node.name, + dst_node=t_node.name, + volume=demand_per_pair, + demand_class=td.priority, + flow_policy=flow_policy, + ) + ) diff --git a/ngraph/demand/manager/manager.py b/ngraph/demand/manager/manager.py index 4e065c4..3b0368c 100644 --- a/ngraph/demand/manager/manager.py +++ b/ngraph/demand/manager/manager.py @@ -8,21 +8,21 @@ from __future__ import annotations import statistics -from collections import defaultdict from dataclasses import dataclass, field from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Tuple, Union -from ngraph.algorithms import base from ngraph.algorithms.flow_init import init_flow_graph from ngraph.demand import Demand +from ngraph.demand.manager.expand import expand_demands as expand_demands_helper +from ngraph.demand.manager.schedule import place_demands_round_robin from ngraph.demand.spec import TrafficDemand -from ngraph.flows.policy import FlowPolicyConfig, get_flow_policy +from ngraph.flows.policy import FlowPolicyConfig from ngraph.graph.strict_multidigraph import StrictMultiDiGraph -from ngraph.model.network import Network, Node +from ngraph.model.network import Network if TYPE_CHECKING: + from ngraph.demand.matrix import TrafficMatrixSet from ngraph.model.view import NetworkView - from ngraph.results.artifacts import TrafficMatrixSet def _new_td_map() -> Dict[str, List[Demand]]: @@ -137,35 +137,13 @@ def expand_demands(self) -> None: Raises: ValueError: If an unknown mode is encountered. 
""" - self._td_to_demands.clear() - expanded: List[Demand] = [] - - for td in self._get_traffic_demands(): - # Gather node groups for source and sink - src_groups = self.network.select_node_groups_by_path(td.source_path) - snk_groups = self.network.select_node_groups_by_path(td.sink_path) - - if not src_groups or not snk_groups: - # No matching nodes; skip - self._td_to_demands[td.id] = [] - continue - - # Expand demands according to the specified mode - if td.mode == "combine": - demands_of_td: List[Demand] = [] - self._expand_combine(demands_of_td, td, src_groups, snk_groups) - expanded.extend(demands_of_td) - self._td_to_demands[td.id] = demands_of_td - elif td.mode == "pairwise": - demands_of_td: List[Demand] = [] - self._expand_pairwise(demands_of_td, td, src_groups, snk_groups) - expanded.extend(demands_of_td) - self._td_to_demands[td.id] = demands_of_td - else: - raise ValueError(f"Unknown mode: {td.mode}") - - # Sort final demands by ascending demand_class (i.e., priority) - expanded.sort(key=lambda d: d.demand_class) + expanded, td_map = expand_demands_helper( + network=self.network, + graph=self.graph, + traffic_demands=self._get_traffic_demands(), + default_flow_policy_config=self.default_flow_policy_config, + ) + self._td_to_demands = td_map self.demands = expanded def place_all_demands( @@ -203,41 +181,12 @@ def place_all_demands( else placement_rounds ) - # Group demands by priority class - prio_map: Dict[int, List[Demand]] = defaultdict(list) - for dmd in self.demands: - prio_map[dmd.demand_class].append(dmd) - - total_placed = 0.0 - sorted_priorities = sorted(prio_map.keys()) - - for priority_class in sorted_priorities: - demands_in_class = prio_map[priority_class] - - for round_idx in range(placement_rounds_int): - placed_in_this_round = 0.0 - rounds_left = placement_rounds_int - round_idx - - for demand in demands_in_class: - leftover = demand.volume - demand.placed_demand - if leftover < base.MIN_FLOW: - continue - - step_to_place = leftover / float(rounds_left) - placed_now, _remain = demand.place( - flow_graph=self.graph, - max_placement=step_to_place, - ) - total_placed += placed_now - placed_in_this_round += placed_now - - # Optionally reoptimize flows in this class - if reoptimize_after_each_round and placed_in_this_round > 0.0: - self._reoptimize_priority_demands(demands_in_class) - - # If no progress was made, no need to continue extra rounds - if placed_in_this_round < base.MIN_FLOW: - break + total_placed = place_demands_round_robin( + graph=self.graph, + demands=self.demands, + placement_rounds=placement_rounds_int, + reoptimize_after_each_round=reoptimize_after_each_round, + ) # Update each TrafficDemand's placed volume for td in self._get_traffic_demands(): @@ -352,16 +301,10 @@ def get_traffic_results(self, detailed: bool = False) -> List[TrafficResult]: return results def _reoptimize_priority_demands(self, demands_in_prio: List[Demand]) -> None: - """Re-run placement for each demand in the same priority class. - - Allows the policy to adjust to capacity changes due to other demands. - - Args: - demands_in_prio: All demands of the same priority class. - """ + # Retained for backward-compat within this class; internal only. The + # scheduling module provides its own implementation. 
if self.graph is None: return - for dmd in demands_in_prio: if not dmd.flow_policy: continue @@ -376,138 +319,6 @@ def _reoptimize_priority_demands(self, demands_in_prio: List[Demand]) -> None: ) dmd.placed_demand = dmd.flow_policy.placed_demand - def _expand_combine( - self, - expanded: List[Demand], - td: TrafficDemand, - src_groups: Dict[str, List[Node]], - snk_groups: Dict[str, List[Node]], - ) -> None: - """Expand a single demand using the ``combine`` mode. - - Adds pseudo-source and pseudo-sink nodes, connects them to real nodes - with infinite-capacity, zero-cost edges, and creates one aggregate - `Demand` from pseudo-source to pseudo-sink with the full volume. - - Args: - expanded: Accumulates newly created `Demand` objects. - td: The original `TrafficDemand` specification. - src_groups: Matched source nodes by label. - snk_groups: Matched sink nodes by label. - """ - # Flatten the source and sink node lists - src_nodes = [ - node for group_nodes in src_groups.values() for node in group_nodes - ] - dst_nodes = [ - node for group_nodes in snk_groups.values() for node in group_nodes - ] - - if not src_nodes or not dst_nodes or self.graph is None: - # If no valid nodes or no graph, skip - return - - # Create pseudo-source / pseudo-sink names - pseudo_source_name = f"combine_src::{td.id}" - pseudo_sink_name = f"combine_snk::{td.id}" - - # Add pseudo nodes to the graph (no-op if they already exist) - self.graph.add_node(pseudo_source_name) - self.graph.add_node(pseudo_sink_name) - - # Link pseudo-source to real sources, and real sinks to pseudo-sink - for s_node in src_nodes: - self.graph.add_edge( - pseudo_source_name, - s_node.name, - capacity=float("inf"), - cost=0, - ) - for t_node in dst_nodes: - self.graph.add_edge( - t_node.name, - pseudo_sink_name, - capacity=float("inf"), - cost=0, - ) - - init_flow_graph(self.graph) # Re-initialize flow-related attributes - - # Create a single Demand with the full volume - if td.flow_policy: - flow_policy = td.flow_policy.deep_copy() - else: - fp_config = td.flow_policy_config or self.default_flow_policy_config - flow_policy = get_flow_policy(fp_config) - - expanded.append( - Demand( - src_node=pseudo_source_name, - dst_node=pseudo_sink_name, - volume=td.demand, - demand_class=td.priority, - flow_policy=flow_policy, - ) - ) - - def _expand_pairwise( - self, - expanded: List[Demand], - td: TrafficDemand, - src_groups: Dict[str, List[Node]], - snk_groups: Dict[str, List[Node]], - ) -> None: - """Expand a single demand using the ``pairwise`` mode. - - Creates one `Demand` for each valid source-destination pair (excluding - self-pairs) and splits total volume evenly across pairs. - - Args: - expanded: Accumulates newly created `Demand` objects. - td: The original `TrafficDemand` specification. - src_groups: Matched source nodes by label. - snk_groups: Matched sink nodes by label. 
- """ - # Flatten the source and sink node lists - src_nodes = [ - node for group_nodes in src_groups.values() for node in group_nodes - ] - dst_nodes = [ - node for group_nodes in snk_groups.values() for node in group_nodes - ] - - # Generate all valid (src, dst) pairs - valid_pairs = [ - (s_node, t_node) - for s_node in src_nodes - for t_node in dst_nodes - if s_node.name != t_node.name - ] - pair_count = len(valid_pairs) - if pair_count == 0: - return - - demand_per_pair = td.demand / float(pair_count) - - for s_node, t_node in valid_pairs: - if td.flow_policy: - # Already a FlowPolicy instance, so deep copy it - flow_policy = td.flow_policy.deep_copy() - else: - # Build from enum-based factory - fp_config = td.flow_policy_config or self.default_flow_policy_config - flow_policy = get_flow_policy(fp_config) - - expanded.append( - Demand( - src_node=s_node.name, - dst_node=t_node.name, - volume=demand_per_pair, - demand_class=td.priority, - flow_policy=flow_policy, - ) - ) - def _estimate_rounds(self) -> int: """Estimate a suitable number of placement rounds. diff --git a/ngraph/demand/manager/schedule.py b/ngraph/demand/manager/schedule.py index 26582d6..762e11d 100644 --- a/ngraph/demand/manager/schedule.py +++ b/ngraph/demand/manager/schedule.py @@ -1,7 +1,92 @@ -from __future__ import annotations - """Scheduling utilities for demand placement rounds. -Placeholder module for future scheduling strategies beyond the simple -round-robin implemented in `TrafficManager`. +Provides the simple priority-aware round-robin scheduler that was previously +implemented in `TrafficManager`. """ + +from __future__ import annotations + +from collections import defaultdict +from typing import Dict, List + +from ngraph.algorithms import base +from ngraph.demand import Demand +from ngraph.graph.strict_multidigraph import StrictMultiDiGraph + + +def place_demands_round_robin( + graph: StrictMultiDiGraph, + demands: List[Demand], + placement_rounds: int, + reoptimize_after_each_round: bool = False, +) -> float: + """Place demands using priority buckets and round-robin within each bucket. + + Args: + graph: Active flow graph. + demands: Expanded demands to place. + placement_rounds: Number of passes per priority class. + reoptimize_after_each_round: Whether to re-run placement for each demand + after a round to better share capacity. + + Returns: + Total volume successfully placed across all demands. 
+ """ + # Group demands by priority class + prio_map: Dict[int, List[Demand]] = defaultdict(list) + for dmd in demands: + prio_map[dmd.demand_class].append(dmd) + + total_placed = 0.0 + sorted_priorities = sorted(prio_map.keys()) + + for priority_class in sorted_priorities: + demands_in_class = prio_map[priority_class] + + for round_idx in range(placement_rounds): + placed_in_this_round = 0.0 + rounds_left = placement_rounds - round_idx + + for demand in demands_in_class: + leftover = demand.volume - demand.placed_demand + if leftover < base.MIN_FLOW: + continue + + step_to_place = leftover / float(rounds_left) + placed_now, _remain = demand.place( + flow_graph=graph, + max_placement=step_to_place, + ) + total_placed += placed_now + placed_in_this_round += placed_now + + if reoptimize_after_each_round and placed_in_this_round > 0.0: + _reoptimize_priority_demands(graph, demands_in_class) + + # If no progress was made, no need to continue extra rounds + if placed_in_this_round < base.MIN_FLOW: + break + + return total_placed + + +def _reoptimize_priority_demands( + graph: StrictMultiDiGraph, demands_in_prio: List[Demand] +) -> None: + """Re-run placement for each demand in the same priority class. + + Allows the policy to adjust to capacity changes due to other demands. + """ + for dmd in demands_in_prio: + if not dmd.flow_policy: + continue + placed_volume = dmd.placed_demand + dmd.flow_policy.remove_demand(graph) + dmd.flow_policy.place_demand( + graph, + dmd.src_node, + dmd.dst_node, + dmd.demand_class, + placed_volume, + ) + dmd.placed_demand = dmd.flow_policy.placed_demand diff --git a/ngraph/demand/matrix.py b/ngraph/demand/matrix.py new file mode 100644 index 0000000..a8ccc5f --- /dev/null +++ b/ngraph/demand/matrix.py @@ -0,0 +1,100 @@ +"""Traffic matrix containers. + +Provides `TrafficMatrixSet`, a named collection of `TrafficDemand` lists +used as input to demand expansion and placement. This module contains input +containers, not analysis results. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from ngraph.demand.spec import TrafficDemand + + +@dataclass +class TrafficMatrixSet: + """Named collection of TrafficDemand lists. + + This mutable container maps scenario names to lists of TrafficDemand objects, + allowing management of multiple traffic matrices for analysis. + + Attributes: + matrices: Dictionary mapping scenario names to TrafficDemand lists. + """ + + matrices: dict[str, list[TrafficDemand]] = field(default_factory=dict) + + def add(self, name: str, demands: list[TrafficDemand]) -> None: + """Add a traffic matrix to the collection. + + Args: + name: Scenario name identifier. + demands: List of TrafficDemand objects for this scenario. + """ + self.matrices[name] = demands + + def get_matrix(self, name: str) -> list[TrafficDemand]: + """Get a specific traffic matrix by name. + + Args: + name: Name of the matrix to retrieve. + + Returns: + List of TrafficDemand objects for the named matrix. + + Raises: + KeyError: If the matrix name doesn't exist. + """ + return self.matrices[name] + + def get_default_matrix(self) -> list[TrafficDemand]: + """Get default traffic matrix. + + Returns the matrix named 'default' if it exists. If there is exactly + one matrix, returns that single matrix. If there are no matrices, + returns an empty list. If there are multiple matrices and none is + named 'default', raises an error. + + Returns: + List of TrafficDemand objects for the default matrix. 
+ + Raises: + ValueError: If multiple matrices exist without a 'default' matrix. + """ + if not self.matrices: + return [] + + if "default" in self.matrices: + return self.matrices["default"] + + if len(self.matrices) == 1: + return next(iter(self.matrices.values())) + + raise ValueError( + f"Multiple matrices exist ({list(self.matrices.keys())}) but no 'default' matrix. " + f"Please specify which matrix to use or add a 'default' matrix." + ) + + def get_all_demands(self) -> list[TrafficDemand]: + """Get all traffic demands from all matrices combined. + + Returns: + Flattened list of all TrafficDemand objects across all matrices. + """ + all_demands: list[TrafficDemand] = [] + for demands in self.matrices.values(): + all_demands.extend(demands) + return all_demands + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization. + + Returns: + Dictionary mapping scenario names to lists of TrafficDemand dictionaries. + """ + return { + name: [demand.__dict__ for demand in demands] + for name, demands in self.matrices.items() + } diff --git a/ngraph/dsl/blueprints/expand.py b/ngraph/dsl/blueprints/expand.py index d7254af..a029729 100644 --- a/ngraph/dsl/blueprints/expand.py +++ b/ngraph/dsl/blueprints/expand.py @@ -3,11 +3,11 @@ from __future__ import annotations import copy -import re from dataclasses import dataclass from itertools import product, zip_longest from typing import Any, Dict, List, Set +from ngraph.dsl.blueprints import parse as _bp_parse from ngraph.model.network import Link, Network, Node @@ -90,7 +90,7 @@ def expand_network_dsl(data: Dict[str, Any]) -> Network: raise ValueError( f"Blueprint definition for '{bp_name}' must be a dict." ) - _check_no_extra_keys( + _bp_parse.check_no_extra_keys( bp_data, allowed={"groups", "adjacency"}, context=f"blueprint '{bp_name}'", @@ -191,7 +191,7 @@ def _expand_group( """ if inherited_risk_groups is None: inherited_risk_groups = set() - expanded_names = _expand_name_patterns(group_name) + expanded_names = _bp_parse.expand_name_patterns(group_name) # If bracket expansions exist, replicate for each expansion if len(expanded_names) > 1 or expanded_names[0] != group_name: for expanded_name in expanded_names: @@ -208,7 +208,7 @@ def _expand_group( if "use_blueprint" in group_def: # Blueprint usage => recognized keys - _check_no_extra_keys( + _bp_parse.check_no_extra_keys( group_def, allowed={"use_blueprint", "parameters", "attrs", "disabled", "risk_groups"}, context=f"group '{group_name}' using blueprint", @@ -276,7 +276,7 @@ def _expand_group( else: # Direct node group => recognized keys - _check_no_extra_keys( + _bp_parse.check_no_extra_keys( group_def, allowed={"node_count", "name_template", "attrs", "disabled", "risk_groups"}, context=f"group '{group_name}'", @@ -329,7 +329,7 @@ def _expand_blueprint_adjacency( adj_def (Dict[str, Any]): The adjacency definition inside the blueprint. parent_path (str): The path serving as the base for the blueprint's node paths. 
""" - _check_adjacency_keys(adj_def, context="blueprint adjacency") + _bp_parse.check_adjacency_keys(adj_def, context="blueprint adjacency") expand_vars = adj_def.get("expand_vars", {}) if expand_vars: _expand_adjacency_with_variables(ctx, adj_def, parent_path) @@ -339,11 +339,11 @@ def _expand_blueprint_adjacency( target_rel = adj_def["target"] pattern = adj_def.get("pattern", "mesh") link_params = adj_def.get("link_params", {}) - _check_link_params(link_params, context="blueprint adjacency") + _bp_parse.check_link_params(link_params, context="blueprint adjacency") link_count = adj_def.get("link_count", 1) - src_path = _join_paths(parent_path, source_rel) - tgt_path = _join_paths(parent_path, target_rel) + src_path = _bp_parse.join_paths(parent_path, source_rel) + tgt_path = _bp_parse.join_paths(parent_path, target_rel) _expand_adjacency_pattern(ctx, src_path, tgt_path, pattern, link_params, link_count) @@ -360,7 +360,7 @@ def _expand_adjacency(ctx: DSLExpansionContext, adj_def: Dict[str, Any]) -> None ctx (DSLExpansionContext): The context containing the target network. adj_def (Dict[str, Any]): The adjacency definition dict. """ - _check_adjacency_keys(adj_def, context="top-level adjacency") + _bp_parse.check_adjacency_keys(adj_def, context="top-level adjacency") expand_vars = adj_def.get("expand_vars", {}) if expand_vars: _expand_adjacency_with_variables(ctx, adj_def, parent_path="") @@ -371,10 +371,10 @@ def _expand_adjacency(ctx: DSLExpansionContext, adj_def: Dict[str, Any]) -> None pattern = adj_def.get("pattern", "mesh") link_count = adj_def.get("link_count", 1) link_params = adj_def.get("link_params", {}) - _check_link_params(link_params, context="top-level adjacency") + _bp_parse.check_link_params(link_params, context="top-level adjacency") - source_path = _join_paths("", source_path_raw) - target_path = _join_paths("", target_path_raw) + source_path = _bp_parse.join_paths("", source_path_raw) + target_path = _bp_parse.join_paths("", target_path_raw) _expand_adjacency_pattern( ctx, source_path, target_path, pattern, link_params, link_count @@ -397,7 +397,7 @@ def _expand_adjacency_with_variables( target_template = adj_def["target"] pattern = adj_def.get("pattern", "mesh") link_params = adj_def.get("link_params", {}) - _check_link_params(link_params, context="adjacency with expand_vars") + _bp_parse.check_link_params(link_params, context="adjacency with expand_vars") link_count = adj_def.get("link_count", 1) expand_vars = adj_def["expand_vars"] expansion_mode = adj_def.get("expansion_mode", "cartesian") @@ -414,10 +414,10 @@ def _expand_adjacency_with_variables( for combo_tuple in zip_longest(*lists_of_values, fillvalue=None): combo_dict = dict(zip(var_names, combo_tuple, strict=False)) - expanded_src = _join_paths( + expanded_src = _bp_parse.join_paths( parent_path, source_template.format(**combo_dict) ) - expanded_tgt = _join_paths( + expanded_tgt = _bp_parse.join_paths( parent_path, target_template.format(**combo_dict) ) _expand_adjacency_pattern( @@ -427,10 +427,10 @@ def _expand_adjacency_with_variables( # "cartesian" default for combo_tuple in product(*lists_of_values): combo_dict = dict(zip(var_names, combo_tuple, strict=False)) - expanded_src = _join_paths( + expanded_src = _bp_parse.join_paths( parent_path, source_template.format(**combo_dict) ) - expanded_tgt = _join_paths( + expanded_tgt = _bp_parse.join_paths( parent_path, target_template.format(**combo_dict) ) _expand_adjacency_pattern( @@ -536,7 +536,9 @@ def _create_link( 'capacity', 'cost', 'disabled', 
'risk_groups', 'attrs'. link_count (int): Number of parallel links to create between source and target. """ - _check_link_params(link_params, context=f"creating link {source}->{target}") + _bp_parse.check_link_params( + link_params, context=f"creating link {source}->{target}" + ) for _ in range(link_count): capacity = link_params.get("capacity", 1.0) @@ -576,7 +578,7 @@ def _process_direct_nodes(net: Network, network_data: Dict[str, Any]) -> None: for node_name, raw_def in nodes_dict.items(): if not isinstance(raw_def, dict): raise ValueError(f"Node definition for '{node_name}' must be a dict.") - _check_no_extra_keys( + _bp_parse.check_no_extra_keys( raw_def, allowed={"disabled", "attrs", "risk_groups"}, context=f"node '{node_name}'", @@ -618,7 +620,7 @@ def _process_direct_links(net: Network, network_data: Dict[str, Any]) -> None: for link_info in links_list: if not isinstance(link_info, dict): raise ValueError("Each link definition must be a dictionary.") - _check_no_extra_keys( + _bp_parse.check_no_extra_keys( link_info, allowed={"source", "target", "link_params", "link_count"}, context="direct link", @@ -665,7 +667,7 @@ def _process_link_overrides(net: Network, network_data: Dict[str, Any]) -> None: for link_override in link_overrides: if not isinstance(link_override, dict): raise ValueError("Each link_override must be a dict.") - _check_no_extra_keys( + _bp_parse.check_no_extra_keys( link_override, allowed={"source", "target", "link_params", "any_direction"}, context="link override", @@ -702,7 +704,7 @@ def _process_node_overrides(net: Network, network_data: Dict[str, Any]) -> None: for override in node_overrides: if not isinstance(override, dict): raise ValueError("Each node_override must be a dict.") - _check_no_extra_keys( + _bp_parse.check_no_extra_keys( override, allowed={"path", "attrs", "disabled", "risk_groups"}, context="node override", @@ -745,7 +747,7 @@ def _update_links( (capacity, cost, disabled, risk_groups, attrs). any_direction (bool): If True, also update reversed direction links. """ - _check_link_params(link_params, context="link override processing") + _bp_parse.check_link_params(link_params, context="link override processing") source_node_groups = net.select_node_groups_by_path(source) target_node_groups = net.select_node_groups_by_path(target) @@ -867,151 +869,3 @@ def _apply_nested_path( if key not in node_def or not isinstance(node_def[key], dict): node_def[key] = {} _apply_nested_path(node_def[key], path_parts[1:], value) - - -_RANGE_REGEX = re.compile(r"\[([^\]]+)\]") - - -def _expand_name_patterns(name: str) -> List[str]: - """Parses and expands bracketed expressions in a group name. For example: - - "fa[1-3]" -> ["fa1", "fa2", "fa3"] - "dc[1,3,5-6]" -> ["dc1", "dc3", "dc5", "dc6"] - "fa[1-2]_plane[5-6]" -> - ["fa1_plane5", "fa1_plane6", "fa2_plane5", "fa2_plane6"] - - If no bracket expressions are present, returns [name] unchanged. - - Args: - name (str): A group name that may contain bracket expansions. - - Returns: - List[str]: All expanded names. If no expansion was needed, returns - a single-element list with 'name' itself. 
- """ - matches = list(_RANGE_REGEX.finditer(name)) - if not matches: - return [name] # no expansions - - expansions_list = [] - for match in matches: - range_expr = match.group(1) - expansions_list.append(_parse_range_expr(range_expr)) - - expanded_names = [] - for combo in product(*expansions_list): - result_str = "" - last_end = 0 - for m_idx, match in enumerate(matches): - start, end = match.span() - result_str += name[last_end:start] - result_str += combo[m_idx] - last_end = end - result_str += name[last_end:] - expanded_names.append(result_str) - - return expanded_names - - -def _parse_range_expr(expr: str) -> List[str]: - """Parses a bracket expression that might have commas, single values, and dash ranges. - For example: "1-3,5,7-9" -> ["1", "2", "3", "5", "7", "8", "9"]. - - Args: - expr (str): The raw expression from inside brackets, e.g. "1-3,5,7-9". - - Returns: - List[str]: A sorted list of all expansions. - """ - values = [] - parts = [x.strip() for x in expr.split(",")] - for part in parts: - if "-" in part: - start_str, end_str = part.split("-", 1) - start = int(start_str) - end = int(end_str) - for val in range(start, end + 1): - values.append(str(val)) - else: - values.append(part) - return values - - -def _join_paths(parent_path: str, rel_path: str) -> str: - """Joins two path segments according to NetGraph's DSL conventions: - - - If rel_path starts with '/', we strip the leading slash and treat it as - appended to parent_path if parent_path is not empty. - - Otherwise, simply append rel_path to parent_path if parent_path is non-empty. - - Args: - parent_path (str): The existing path prefix. - rel_path (str): A relative path that may start with '/'. - - Returns: - str: The combined path as a single string. - """ - if rel_path.startswith("/"): - rel_path = rel_path[1:] - if parent_path: - return f"{parent_path}/{rel_path}" - return rel_path - - if parent_path: - return f"{parent_path}/{rel_path}" - return rel_path - - -def _check_no_extra_keys( - data_dict: Dict[str, Any], allowed: set[str], context: str -) -> None: - """Checks that data_dict only has keys in 'allowed'. Raises ValueError if not. - - Args: - data_dict (Dict[str, Any]): The dict to check. - allowed (set[str]): The set of recognized keys. - context (str): A short description of what we are validating. - """ - extra_keys = set(data_dict.keys()) - allowed - if extra_keys: - raise ValueError( - f"Unrecognized key(s) in {context}: {', '.join(sorted(extra_keys))}. " - f"Allowed keys are: {sorted(allowed)}" - ) - - -def _check_adjacency_keys(adj_def: Dict[str, Any], context: str) -> None: - """Ensures adjacency definitions only contain recognized keys. - - Recognized adjacency keys are: - {"source", "target", "pattern", "link_count", "link_params", - "expand_vars", "expansion_mode"}. - """ - _check_no_extra_keys( - adj_def, - allowed={ - "source", - "target", - "pattern", - "link_count", - "link_params", - "expand_vars", - "expansion_mode", - }, - context=context, - ) - if "source" not in adj_def or "target" not in adj_def: - raise ValueError(f"Adjacency in {context} must have 'source' and 'target'.") - - -def _check_link_params(link_params: Dict[str, Any], context: str) -> None: - """Checks that link_params only has recognized keys: - {"capacity", "cost", "disabled", "risk_groups", "attrs"}. 
- """ - recognized = {"capacity", "cost", "disabled", "risk_groups", "attrs"} - extra = set(link_params.keys()) - recognized - if extra: - raise ValueError( - f"Unrecognized link_params key(s) in {context}: {', '.join(sorted(extra))}. " - f"Allowed: {sorted(recognized)}" - ) diff --git a/ngraph/dsl/blueprints/parse.py b/ngraph/dsl/blueprints/parse.py index 1def1e1..00bb35e 100644 --- a/ngraph/dsl/blueprints/parse.py +++ b/ngraph/dsl/blueprints/parse.py @@ -1,7 +1,122 @@ """Parsing helpers for the network DSL. -This module is reserved for future parsing utilities. The main expansion -entry point is `ngraph.dsl.blueprints.expand.expand_network_dsl`. +This module factors out pure parsing/validation helpers from the expansion +module so they can be tested independently and reused. """ from __future__ import annotations + +import re +from itertools import product +from typing import Any, Dict, List + + +def check_no_extra_keys( + data_dict: Dict[str, Any], allowed: set[str], context: str +) -> None: + """Raise if ``data_dict`` contains keys outside ``allowed``. + + Args: + data_dict: The dict to check. + allowed: Set of recognized keys. + context: Short description used in error messages. + """ + extra_keys = set(data_dict.keys()) - allowed + if extra_keys: + raise ValueError( + f"Unrecognized key(s) in {context}: {', '.join(sorted(extra_keys))}. " + f"Allowed keys are: {sorted(allowed)}" + ) + + +def check_adjacency_keys(adj_def: Dict[str, Any], context: str) -> None: + """Ensure adjacency definitions only contain recognized keys.""" + check_no_extra_keys( + adj_def, + allowed={ + "source", + "target", + "pattern", + "link_count", + "link_params", + "expand_vars", + "expansion_mode", + }, + context=context, + ) + if "source" not in adj_def or "target" not in adj_def: + raise ValueError(f"Adjacency in {context} must have 'source' and 'target'.") + + +def check_link_params(link_params: Dict[str, Any], context: str) -> None: + """Ensure link_params contain only recognized keys.""" + recognized = {"capacity", "cost", "disabled", "risk_groups", "attrs"} + extra = set(link_params.keys()) - recognized + if extra: + raise ValueError( + f"Unrecognized link_params key(s) in {context}: {', '.join(sorted(extra))}. " + f"Allowed: {sorted(recognized)}" + ) + + +_RANGE_REGEX = re.compile(r"\[([^\]]+)\]") + + +def expand_name_patterns(name: str) -> List[str]: + """Expand bracket expressions in a group name. 
+ + Examples: + - "fa[1-3]" -> ["fa1", "fa2", "fa3"] + - "dc[1,3,5-6]" -> ["dc1", "dc3", "dc5", "dc6"] + - "fa[1-2]_plane[5-6]" -> ["fa1_plane5", "fa1_plane6", "fa2_plane5", "fa2_plane6"] + """ + matches = list(_RANGE_REGEX.finditer(name)) + if not matches: + return [name] + + expansions_list = [] + for match in matches: + range_expr = match.group(1) + expansions_list.append(_parse_range_expr(range_expr)) + + expanded_names = [] + for combo in product(*expansions_list): + result_str = "" + last_end = 0 + for m_idx, match in enumerate(matches): + start, end = match.span() + result_str += name[last_end:start] + result_str += combo[m_idx] + last_end = end + result_str += name[last_end:] + expanded_names.append(result_str) + + return expanded_names + + +def _parse_range_expr(expr: str) -> List[str]: + values: List[str] = [] + parts = [x.strip() for x in expr.split(",")] + for part in parts: + if "-" in part: + start_str, end_str = part.split("-", 1) + start = int(start_str) + end = int(end_str) + for val in range(start, end + 1): + values.append(str(val)) + else: + values.append(part) + return values + + +def join_paths(parent_path: str, rel_path: str) -> str: + """Join two path segments according to the DSL conventions.""" + if rel_path.startswith("/"): + rel_path = rel_path[1:] + if parent_path: + return f"{parent_path}/{rel_path}" + return rel_path + + if parent_path: + return f"{parent_path}/{rel_path}" + return rel_path diff --git a/ngraph/failure/manager/manager.py b/ngraph/failure/manager/manager.py index f028250..f2d657c 100644 --- a/ngraph/failure/manager/manager.py +++ b/ngraph/failure/manager/manager.py @@ -30,9 +30,9 @@ from typing import TYPE_CHECKING, Any, Dict, Protocol, Set, TypeVar from ngraph.algorithms.base import FlowPlacement +from ngraph.failure.policy_set import FailurePolicySet from ngraph.logging import get_logger from ngraph.model.view import NetworkView -from ngraph.results.artifacts import FailurePolicySet if TYPE_CHECKING: import cProfile @@ -1254,10 +1254,11 @@ def run_demand_placement_monte_carlo( | Any, # List of demand configs or TrafficMatrixSet iterations: int = 100, parallelism: int = 1, - placement_rounds: int = 50, + placement_rounds: int | str = "auto", baseline: bool = False, seed: int | None = None, store_failure_patterns: bool = False, + include_flow_details: bool = False, **kwargs, ) -> Any: # Will be DemandPlacementResults when imports are enabled """Analyze traffic demand placement success under failures. 
@@ -1280,20 +1281,29 @@ def run_demand_placement_monte_carlo( from ngraph.monte_carlo.functions import demand_placement_analysis from ngraph.monte_carlo.results import DemandPlacementResults - # Convert TrafficMatrixSet to serializable format if needed - if hasattr(demands_config, "demands") and not isinstance(demands_config, list): - # This is a TrafficMatrixSet - convert to config list - serializable_demands = [] - for demand in demands_config.demands: # type: ignore - config = { - "source_path": demand.source_path, - "sink_path": demand.sink_path, - "demand": demand.demand, - "mode": getattr(demand, "mode", "pairwise"), - "flow_policy_config": getattr(demand, "flow_policy_config", None), - "priority": getattr(demand, "priority", 0), - } - serializable_demands.append(config) + # If caller passed a sequence of TrafficDemand objects, convert to dicts + if not isinstance(demands_config, list): + # Accept TrafficMatrixSet or any container providing get_matrix()/matrices + serializable_demands: list[dict[str, Any]] = [] + if hasattr(demands_config, "get_all_demands"): + td_iter = demands_config.get_all_demands() # TrafficMatrixSet helper + elif hasattr(demands_config, "demands"): + td_iter = demands_config.demands # Backward-compat mock path in tests + else: + td_iter = [] + for demand in td_iter: # type: ignore[assignment] + serializable_demands.append( + { + "source_path": getattr(demand, "source_path", ""), + "sink_path": getattr(demand, "sink_path", ""), + "demand": float(getattr(demand, "demand", 0.0)), + "mode": getattr(demand, "mode", "pairwise"), + "flow_policy_config": getattr( + demand, "flow_policy_config", None + ), + "priority": int(getattr(demand, "priority", 0)), + } + ) demands_config = serializable_demands raw_results = self.run_monte_carlo_analysis( @@ -1305,6 +1315,7 @@ def run_demand_placement_monte_carlo( store_failure_patterns=store_failure_patterns, demands_config=demands_config, placement_rounds=placement_rounds, + include_flow_details=include_flow_details, **kwargs, ) diff --git a/ngraph/failure/policy_set.py b/ngraph/failure/policy_set.py new file mode 100644 index 0000000..508d94c --- /dev/null +++ b/ngraph/failure/policy_set.py @@ -0,0 +1,66 @@ +"""Failure policy containers. + +Provides `FailurePolicySet`, a named collection of `FailurePolicy` objects +used as input to failure analysis workflows. This module contains input +containers, not analysis results. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from ngraph.failure.policy import FailurePolicy + + +@dataclass +class FailurePolicySet: + """Named collection of FailurePolicy objects. + + This mutable container maps failure policy names to FailurePolicy objects, + allowing management of multiple failure policies for analysis. + + Attributes: + policies: Dictionary mapping failure policy names to FailurePolicy objects. + """ + + policies: dict[str, FailurePolicy] = field(default_factory=dict) + + def add(self, name: str, policy: FailurePolicy) -> None: + """Add a failure policy to the collection. + + Args: + name: Failure policy name identifier. + policy: FailurePolicy object for this failure policy. + """ + self.policies[name] = policy + + def get_policy(self, name: str) -> FailurePolicy: + """Get a specific failure policy by name. + + Args: + name: Name of the policy to retrieve. + + Returns: + FailurePolicy object for the named policy. + + Raises: + KeyError: If the policy name doesn't exist. 
+ """ + return self.policies[name] + + def get_all_policies(self) -> list[FailurePolicy]: + """Get all failure policies from the collection. + + Returns: + List of all FailurePolicy objects. + """ + return list(self.policies.values()) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization. + + Returns: + Dictionary mapping failure policy names to FailurePolicy dictionaries. + """ + return {name: policy.to_dict() for name, policy in self.policies.items()} diff --git a/ngraph/monte_carlo/functions.py b/ngraph/monte_carlo/functions.py index 5b33600..d5581b8 100644 --- a/ngraph/monte_carlo/functions.py +++ b/ngraph/monte_carlo/functions.py @@ -18,8 +18,8 @@ from ngraph.algorithms.base import FlowPlacement from ngraph.demand.manager.manager import TrafficManager +from ngraph.demand.matrix import TrafficMatrixSet from ngraph.demand.spec import TrafficDemand -from ngraph.results.artifacts import TrafficMatrixSet if TYPE_CHECKING: from ngraph.model.view import NetworkView @@ -80,7 +80,8 @@ def max_flow_analysis( def demand_placement_analysis( network_view: "NetworkView", demands_config: list[dict[str, Any]], - placement_rounds: int = 50, + placement_rounds: int | str = "auto", + include_flow_details: bool = False, **kwargs, ) -> dict[str, Any]: """Analyze traffic demand placement success rates. @@ -96,8 +97,12 @@ def demand_placement_analysis( - total_placed: Total placed demand volume. - total_demand: Total demand volume. - overall_placement_ratio: total_placed / total_demand (0.0 if undefined). - - priority_results: Mapping from priority to statistics with keys - total_volume, placed_volume, unplaced_volume, placement_ratio, + - demand_results: List of per-demand statistics preserving offered volume. + When ``include_flow_details`` is True, each entry also includes + ``cost_distribution`` mapping path cost to placed volume and + ``edges_used`` as a list of edge identifiers seen in the placed flows. + - priority_results: Mapping from priority to aggregated statistics with + keys total_volume, placed_volume, unplaced_volume, placement_ratio, and demand_count. 
""" # Reconstruct demands from config to avoid passing complex objects @@ -125,14 +130,51 @@ def demand_placement_analysis( tm.expand_demands() total_placed = tm.place_all_demands(placement_rounds=placement_rounds) - # Aggregate results by priority + # Build per-demand results from expanded demands to preserve offered volumes + demand_results: list[dict[str, Any]] = [] + # Aggregate by priority as well demand_stats = defaultdict( lambda: {"total_volume": 0.0, "placed_volume": 0.0, "count": 0} ) - for demand in tm.demands: - priority = getattr(demand, "priority", 0) - demand_stats[priority]["total_volume"] += demand.volume - demand_stats[priority]["placed_volume"] += demand.placed_demand + for dmd in tm.demands: + offered = float(getattr(dmd, "volume", 0.0)) + placed = float(getattr(dmd, "placed_demand", 0.0)) + unplaced = offered - placed + priority = int(getattr(dmd, "priority", getattr(dmd, "demand_class", 0))) + + entry: dict[str, Any] = { + "src": str(getattr(dmd, "src_node", "")), + "dst": str(getattr(dmd, "dst_node", "")), + "priority": priority, + "offered_demand": offered, + "placed_demand": placed, + "unplaced_demand": unplaced, + "placement_ratio": (placed / offered) if offered > 0 else 0.0, + } + + if include_flow_details and getattr(dmd, "flow_policy", None) is not None: + # Summarize placed flows by path cost and collect edges used + cost_distribution: dict[float, float] = {} + edge_strings: set[str] = set() + for flow in dmd.flow_policy.flows.values(): # type: ignore[union-attr] + cost_val = float(getattr(flow.path_bundle, "cost", 0.0)) + placed_flow = float(getattr(flow, "placed_flow", 0.0)) + if placed_flow > 0.0: + cost_distribution[cost_val] = ( + cost_distribution.get(cost_val, 0.0) + placed_flow + ) + for eid in getattr(flow.path_bundle, "edges", set()): + edge_strings.add(str(eid)) + + if cost_distribution: + entry["cost_distribution"] = cost_distribution + if edge_strings: + entry["edges_used"] = sorted(edge_strings) + + demand_results.append(entry) + + demand_stats[priority]["total_volume"] += offered + demand_stats[priority]["placed_volume"] += placed demand_stats[priority]["count"] += 1 priority_results = {} @@ -157,6 +199,7 @@ def demand_placement_analysis( "total_placed": total_placed, "total_demand": total_demand, "overall_placement_ratio": overall_placement_ratio, + "demand_results": demand_results, "priority_results": priority_results, } diff --git a/ngraph/report.py b/ngraph/report.py index 1cb00e0..e8e78cd 100644 --- a/ngraph/report.py +++ b/ngraph/report.py @@ -176,6 +176,7 @@ def _create_setup_cell(self) -> nbformat.NotebookNode: setup_code = """# Setup analysis environment from ngraph.workflow.analysis import ( CapacityMatrixAnalyzer, + PlacementMatrixAnalyzer, SummaryAnalyzer, PackageManager, DataLoader, diff --git a/ngraph/results/__init__.py b/ngraph/results/__init__.py index fa72f47..c81c8ec 100644 --- a/ngraph/results/__init__.py +++ b/ngraph/results/__init__.py @@ -6,6 +6,6 @@ from __future__ import annotations -from .results import Results, WorkflowStepMetadata +from .store import Results, WorkflowStepMetadata __all__ = ["Results", "WorkflowStepMetadata"] diff --git a/ngraph/results/artifacts.py b/ngraph/results/artifacts.py index 2069979..d6a5a22 100644 --- a/ngraph/results/artifacts.py +++ b/ngraph/results/artifacts.py @@ -1,11 +1,9 @@ """Serializable result artifacts for analysis workflows. 
-This module defines small dataclasses that capture outputs from analyses -and simulations in a JSON-serializable form: +This module defines dataclasses that capture outputs from analyses and +simulations in a JSON-serializable form: -- `TrafficMatrixSet`: named collections of `TrafficDemand` lists - `PlacementResultSet`: aggregated placement results and statistics -- `FailurePolicySet`: named collections of failure policies - `CapacityEnvelope`: frequency-based capacity distributions and optional aggregated flow statistics - `FailurePatternResult`: capacity results for specific failure patterns @@ -14,100 +12,9 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Dict, List +from typing import Any, Dict, List from ngraph.demand.manager.manager import TrafficResult -from ngraph.demand.spec import TrafficDemand - -if TYPE_CHECKING: - from ngraph.failure.policy import FailurePolicy - - -@dataclass -class TrafficMatrixSet: - """Named collection of TrafficDemand lists. - - This mutable container maps scenario names to lists of TrafficDemand objects, - allowing management of multiple traffic matrices for analysis. - - Attributes: - matrices: Dictionary mapping scenario names to TrafficDemand lists. - """ - - matrices: dict[str, list[TrafficDemand]] = field(default_factory=dict) - - def add(self, name: str, demands: list[TrafficDemand]) -> None: - """Add a traffic matrix to the collection. - - Args: - name: Scenario name identifier. - demands: List of TrafficDemand objects for this scenario. - """ - self.matrices[name] = demands - - def get_matrix(self, name: str) -> list[TrafficDemand]: - """Get a specific traffic matrix by name. - - Args: - name: Name of the matrix to retrieve. - - Returns: - List of TrafficDemand objects for the named matrix. - - Raises: - KeyError: If the matrix name doesn't exist. - """ - return self.matrices[name] - - def get_default_matrix(self) -> list[TrafficDemand]: - """Get default traffic matrix. - - Returns the matrix named 'default' if it exists. If there is exactly - one matrix, returns that single matrix. If there are no matrices, - returns an empty list. If there are multiple matrices and none is - named 'default', raises an error. - - Returns: - List of TrafficDemand objects for the default matrix. - - Raises: - ValueError: If multiple matrices exist without a 'default' matrix. - """ - if not self.matrices: - return [] - - if "default" in self.matrices: - return self.matrices["default"] - - if len(self.matrices) == 1: - return next(iter(self.matrices.values())) - - raise ValueError( - f"Multiple matrices exist ({list(self.matrices.keys())}) but no 'default' matrix. " - f"Please specify which matrix to use or add a 'default' matrix." - ) - - def get_all_demands(self) -> list[TrafficDemand]: - """Get all traffic demands from all matrices combined. - - Returns: - Flattened list of all TrafficDemand objects across all matrices. - """ - all_demands = [] - for demands in self.matrices.values(): - all_demands.extend(demands) - return all_demands - - def to_dict(self) -> dict[str, Any]: - """Convert to dictionary for JSON serialization. - - Returns: - Dictionary mapping scenario names to lists of TrafficDemand dictionaries. - """ - return { - name: [demand.__dict__ for demand in demands] - for name, demands in self.matrices.items() - } @dataclass(frozen=True) @@ -157,59 +64,6 @@ def to_dict(self) -> dict[str, Any]: } -@dataclass -class FailurePolicySet: - """Named collection of FailurePolicy objects. 
- - This mutable container maps failure policy names to FailurePolicy objects, - allowing management of multiple failure policies for analysis. - - Attributes: - policies: Dictionary mapping failure policy names to FailurePolicy objects. - """ - - policies: dict[str, "FailurePolicy"] = field(default_factory=dict) - - def add(self, name: str, policy: "FailurePolicy") -> None: - """Add a failure policy to the collection. - - Args: - name: Failure policy name identifier. - policy: FailurePolicy object for this failure policy. - """ - self.policies[name] = policy - - def get_policy(self, name: str) -> "FailurePolicy": - """Get a specific failure policy by name. - - Args: - name: Name of the policy to retrieve. - - Returns: - FailurePolicy object for the named policy. - - Raises: - KeyError: If the policy name doesn't exist. - """ - return self.policies[name] - - def get_all_policies(self) -> list["FailurePolicy"]: - """Get all failure policies from the collection. - - Returns: - List of all FailurePolicy objects. - """ - return list(self.policies.values()) - - def to_dict(self) -> dict[str, Any]: - """Convert to dictionary for JSON serialization. - - Returns: - Dictionary mapping failure policy names to FailurePolicy dictionaries. - """ - return {name: policy.to_dict() for name, policy in self.policies.items()} - - @dataclass class CapacityEnvelope: """Frequency-based capacity envelope that stores capacity values as frequencies. @@ -461,3 +315,91 @@ def pattern_key(self) -> str: # Create deterministic key from excluded entities excluded_str = ",".join(sorted(self.excluded_nodes + self.excluded_links)) return f"pattern_{hash(excluded_str) & 0x7FFFFFFF:08x}" + + +@dataclass +class PlacementEnvelope: + """Per-demand placement envelope keyed like capacity envelopes. + + Each envelope captures frequency distribution of placement ratio for a + specific demand definition across Monte Carlo iterations. + + Attributes: + source: Source selection regex or node label. + sink: Sink selection regex or node label. + mode: Demand expansion mode ("combine" or "pairwise"). + priority: Demand priority class. + frequencies: Mapping of placement ratio to occurrence count. + min: Minimum observed placement ratio. + max: Maximum observed placement ratio. + mean: Mean placement ratio. + stdev: Standard deviation of placement ratio. + total_samples: Number of iterations represented. 
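+
+    Example (illustrative):
+        ``PlacementEnvelope.from_values("A", "B", "pairwise", 1, [1.0, 0.8, 0.8])``
+        yields ``frequencies == {1.0: 1, 0.8: 2}`` and ``total_samples == 3``.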
+ """ + + source: str + sink: str + mode: str + priority: int + frequencies: Dict[float, int] + min: float + max: float + mean: float + stdev: float + total_samples: int + + @staticmethod + def _compute_stats(values: List[float]) -> tuple[float, float, float, float]: + n = len(values) + total = sum(values) + mean = total / n + sum_squares = sum(v * v for v in values) + variance = (sum_squares / n) - (mean * mean) + stdev = variance**0.5 + return (min(values), max(values), mean, stdev) + + @classmethod + def from_values( + cls, + source: str, + sink: str, + mode: str, + priority: int, + ratios: List[float], + rounding_decimals: int = 4, + ) -> "PlacementEnvelope": + if not ratios: + raise ValueError("Cannot create placement envelope from empty ratios list") + freqs: Dict[float, int] = {} + quantized: List[float] = [] + for r in ratios: + q = round(float(r), rounding_decimals) + quantized.append(q) + freqs[q] = freqs.get(q, 0) + 1 + mn, mx, mean, stdev = cls._compute_stats(quantized) + return cls( + source=source, + sink=sink, + mode=mode, + priority=int(priority), + frequencies=freqs, + min=mn, + max=mx, + mean=mean, + stdev=stdev, + total_samples=len(quantized), + ) + + def to_dict(self) -> Dict[str, Any]: + return { + "source": self.source, + "sink": self.sink, + "mode": self.mode, + "priority": self.priority, + "frequencies": self.frequencies, + "min": self.min, + "max": self.max, + "mean": self.mean, + "stdev": self.stdev, + "total_samples": self.total_samples, + } diff --git a/ngraph/results/results.py b/ngraph/results/store.py similarity index 99% rename from ngraph/results/results.py rename to ngraph/results/store.py index 6127a2f..4b72276 100644 --- a/ngraph/results/results.py +++ b/ngraph/results/store.py @@ -96,7 +96,7 @@ def get_all(self, key: str) -> Dict[str, Any]: Returns: Dict[str, Any]: A dict mapping step_name -> value for all steps that have stored something under 'key'. 
""" - result = {} + result: Dict[str, Any] = {} for step_name, data in self._store.items(): if key in data: result[step_name] = data[key] diff --git a/ngraph/scenario.py b/ngraph/scenario.py index 9851818..f8b070f 100644 --- a/ngraph/scenario.py +++ b/ngraph/scenario.py @@ -8,16 +8,17 @@ import yaml from ngraph.components import ComponentsLibrary -from ngraph.demand.spec import TrafficDemand +from ngraph.demand.manager.builder import build_traffic_matrix_set +from ngraph.demand.matrix import TrafficMatrixSet from ngraph.dsl.blueprints.expand import expand_network_dsl from ngraph.failure.policy import ( FailureCondition, FailurePolicy, FailureRule, ) +from ngraph.failure.policy_set import FailurePolicySet from ngraph.model.network import Network, RiskGroup from ngraph.results import Results -from ngraph.results.artifacts import FailurePolicySet, TrafficMatrixSet from ngraph.seed_manager import SeedManager from ngraph.workflow.base import WORKFLOW_STEP_REGISTRY, WorkflowStep from ngraph.yaml_utils import normalize_yaml_dict_keys @@ -165,20 +166,7 @@ def from_yaml( # 3) Build traffic matrix set raw = data.get("traffic_matrix_set", {}) - if not isinstance(raw, dict): - raise ValueError( - "'traffic_matrix_set' must be a mapping of name -> list[TrafficDemand]" - ) - - # Normalize dictionary keys to handle YAML boolean keys - normalized_raw = normalize_yaml_dict_keys(raw) - tms = TrafficMatrixSet() - for name, td_list in normalized_raw.items(): - if not isinstance(td_list, list): - raise ValueError( - f"Matrix '{name}' must map to a list of TrafficDemand dicts" - ) - tms.add(name, [TrafficDemand(**d) for d in td_list]) + tms = build_traffic_matrix_set(raw) # 4) Build workflow steps workflow_data = data.get("workflow", []) diff --git a/ngraph/workflow/__init__.py b/ngraph/workflow/__init__.py index d1b054c..d23421e 100644 --- a/ngraph/workflow/__init__.py +++ b/ngraph/workflow/__init__.py @@ -4,6 +4,7 @@ from .build_graph import BuildGraph from .capacity_envelope_analysis import CapacityEnvelopeAnalysis from .network_stats import NetworkStats +from .traffic_matrix_placement_analysis import TrafficMatrixPlacementAnalysis __all__ = [ "WorkflowStep", @@ -11,4 +12,5 @@ "BuildGraph", "CapacityEnvelopeAnalysis", "NetworkStats", + "TrafficMatrixPlacementAnalysis", ] diff --git a/ngraph/workflow/analysis/__init__.py b/ngraph/workflow/analysis/__init__.py index fb31743..f7d2d18 100644 --- a/ngraph/workflow/analysis/__init__.py +++ b/ngraph/workflow/analysis/__init__.py @@ -36,6 +36,7 @@ from .capacity_matrix import CapacityMatrixAnalyzer from .data_loader import DataLoader from .package_manager import PackageManager +from .placement_matrix import PlacementMatrixAnalyzer from .registry import AnalysisConfig, AnalysisRegistry, get_default_registry from .summary import SummaryAnalyzer @@ -70,6 +71,7 @@ def analyze_capacity_envelopes( "AnalysisRegistry", "get_default_registry", "CapacityMatrixAnalyzer", + "PlacementMatrixAnalyzer", "SummaryAnalyzer", "PackageManager", "DataLoader", diff --git a/ngraph/workflow/analysis/placement_matrix.py b/ngraph/workflow/analysis/placement_matrix.py new file mode 100644 index 0000000..809e466 --- /dev/null +++ b/ngraph/workflow/analysis/placement_matrix.py @@ -0,0 +1,171 @@ +"""Placement envelope analysis utilities. + +Processes placement envelope results from TrafficMatrixPlacementAnalysis into +placement matrices and summaries suitable for notebooks. 
+""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +import pandas as pd + +from .base import NotebookAnalyzer + + +class PlacementMatrixAnalyzer(NotebookAnalyzer): + """Analyze placement envelopes and display matrices/statistics.""" + + def get_description(self) -> str: # noqa: D401 – simple return + return "Processes placement envelope data into matrices and summaries" + + def analyze(self, results: Dict[str, Any], **kwargs) -> Dict[str, Any]: + """Analyze placement envelopes for a given step. + + Expects results[step_name]["placement_envelopes"] to be a dict keyed by + "src->dst|prio=K" mapping to a dict containing: + - source, sink, priority + - mean, min, max, stdev, total_samples + - frequencies + """ + step_name: Optional[str] = kwargs.get("step_name") + if not step_name: + raise ValueError("step_name required for placement matrix analysis") + + step_data = results.get(step_name, {}) + envelopes = step_data.get("placement_envelopes", {}) + if not envelopes: + raise ValueError(f"No placement envelope data found for step: {step_name}") + + matrix_data = self._extract_matrix_data(envelopes) + if not matrix_data: + raise ValueError(f"No valid placement envelope data in step: {step_name}") + + df_matrix = pd.DataFrame(matrix_data) + # Build per-priority matrices and stats + placement_matrices: Dict[int, pd.DataFrame] = {} + statistics_by_priority: Dict[int, Dict[str, Any]] = {} + for prio in sorted({int(row["priority"]) for row in matrix_data}): + df_p = df_matrix[df_matrix["priority"] == prio] + pm = self._create_matrix(df_p) + placement_matrices[prio] = pm + statistics_by_priority[prio] = self._calculate_statistics(pm) + + # Combined matrix + placement_matrix = self._create_matrix(df_matrix) + statistics = self._calculate_statistics(placement_matrix) + + return { + "status": "success", + "step_name": step_name, + "matrix_data": matrix_data, + "placement_matrix": placement_matrix, + "statistics": statistics, + "placement_matrices": placement_matrices, + "statistics_by_priority": statistics_by_priority, + } + + def analyze_and_display_step(self, results: Dict[str, Any], **kwargs) -> None: + step_name = kwargs.get("step_name") + if not step_name: + print("❌ No step name provided for placement matrix analysis") + return + + try: + analysis = self.analyze(results, step_name=step_name) + self.display_analysis(analysis) + except Exception as e: + print(f"❌ Placement matrix analysis failed: {e}") + raise + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _extract_matrix_data(self, envelopes: Dict[str, Any]) -> List[Dict[str, Any]]: + data: List[Dict[str, Any]] = [] + for flow_key, env in envelopes.items(): + if not isinstance(env, dict): + continue + src = env.get("src") or env.get("source") + dst = env.get("dst") or env.get("sink") + prio = env.get("priority", 0) + mean_ratio = env.get("mean") + if src is None or dst is None or mean_ratio is None: + continue + data.append( + { + "source": str(src), + "destination": str(dst), + "ratio": float(mean_ratio), + "flow_path": flow_key, + "priority": int(prio), + } + ) + return data + + @staticmethod + def _create_matrix(df_matrix: pd.DataFrame) -> pd.DataFrame: + return df_matrix.pivot_table( + index="source", + columns="destination", + values="ratio", + aggfunc="mean", + fill_value=0.0, + ) + + @staticmethod + def _calculate_statistics(placement_matrix: pd.DataFrame) -> Dict[str, Any]: + 
values = placement_matrix.values + non_zero = values[values > 0] + if len(non_zero) == 0: + return {"has_data": False} + return { + "has_data": True, + "ratio_min": float(non_zero.min()), + "ratio_max": float(non_zero.max()), + "ratio_mean": float(non_zero.mean()), + "num_sources": len(placement_matrix.index), + "num_destinations": len(placement_matrix.columns), + } + + def display_analysis(self, analysis: Dict[str, Any], **kwargs) -> None: # noqa: D401 + step_name = analysis.get("step_name", "Unknown") + print(f"✅ Analyzing placement matrix for {step_name}") + from . import show # lazy import to avoid circular + + def fmt(x: float) -> str: + return f"{x:.2f}" + + matrices = analysis.get("placement_matrices", {}) + stats_by_prio = analysis.get("statistics_by_priority", {}) + + if not matrices: + print("No placement data available") + return + + for prio in sorted(matrices.keys()): + print(f"\nPriority {prio}") + stats = stats_by_prio.get(prio, {"has_data": False}) + if not stats.get("has_data"): + print(" No data") + continue + print(f" Sources: {stats['num_sources']:,} nodes") + print(f" Destinations: {stats['num_destinations']:,} columns") + print( + f" Placement ratio range: {stats['ratio_min']:.2f} - {stats['ratio_max']:.2f} (mean {stats['ratio_mean']:.2f})" + ) + + matrix_display = matrices[prio].copy() + matrix_display.index.name = "Source" + matrix_display.columns.name = "Destination" + if not matrix_display.empty: + md = matrix_display.applymap(fmt) + show( + md, + caption=f"Placement Matrix (priority {prio}) - {step_name}", + scrollY="400px", + scrollX=True, + scrollCollapse=True, + paging=False, + ) diff --git a/ngraph/workflow/analysis/registry.py b/ngraph/workflow/analysis/registry.py index 48c7d69..3bcb275 100644 --- a/ngraph/workflow/analysis/registry.py +++ b/ngraph/workflow/analysis/registry.py @@ -149,4 +149,14 @@ def get_default_registry() -> AnalysisRegistry: section_title="Graph Construction", ) + # Traffic matrix placement analysis - dedicated analyzer + from .placement_matrix import PlacementMatrixAnalyzer + + registry.register( + "TrafficMatrixPlacementAnalysis", + PlacementMatrixAnalyzer, + method_name="analyze_and_display_step", + section_title="Traffic Matrix Placement Analysis", + ) + return registry diff --git a/ngraph/workflow/capacity_envelope_analysis.py b/ngraph/workflow/capacity_envelope_analysis.py index 3a5c30f..1d907db 100644 --- a/ngraph/workflow/capacity_envelope_analysis.py +++ b/ngraph/workflow/capacity_envelope_analysis.py @@ -157,11 +157,20 @@ def run(self, scenario: "Scenario") -> None: logger.info(f"Generated {len(envelope_results.envelopes)} capacity envelopes") - # Convert envelope objects to serializable format for scenario storage - envelopes_dict = { - flow_key: envelope.to_dict() - for flow_key, envelope in envelope_results.envelopes.items() - } + # Convert envelope objects to serializable format and enrich with flow labels/metric + envelopes_dict = {} + for flow_key, envelope in envelope_results.envelopes.items(): + data = envelope.to_dict() + # Parse labels from key like "A->B" + if "->" in flow_key: + src_label, dst_label = flow_key.split("->", 1) + data["src"] = src_label + data["dst"] = dst_label + else: + data["src"] = flow_key + data["dst"] = flow_key + data["metric"] = "capacity" + envelopes_dict[flow_key] = data # Store results in scenario scenario.results.put(self.name, "capacity_envelopes", envelopes_dict) diff --git a/ngraph/workflow/traffic_matrix_placement_analysis.py b/ngraph/workflow/traffic_matrix_placement_analysis.py new 
file mode 100644 index 0000000..2299f5d --- /dev/null +++ b/ngraph/workflow/traffic_matrix_placement_analysis.py @@ -0,0 +1,239 @@ +"""Traffic matrix demand placement workflow component. + +Executes Monte Carlo analysis of traffic demand placement under failures using +FailureManager. Takes a named traffic matrix from the scenario's +TrafficMatrixSet. Optionally includes a baseline iteration (no failures). + +YAML Configuration Example: + + workflow: + - step_type: TrafficMatrixPlacementAnalysis + name: "tm_placement_monte_carlo" + matrix_name: "default" # Required: Name of traffic matrix to use + failure_policy: "random_failures" # Optional: Named failure policy + iterations: 100 # Number of Monte Carlo trials + parallelism: 4 # Number of worker processes + placement_rounds: "auto" # Optimization rounds per priority (int or "auto") + baseline: true # Include baseline iteration first + seed: 42 # Optional reproducible seed + store_failure_patterns: false # Store failure patterns if needed + include_flow_details: true # Collect per-demand cost distribution and edges + +Results stored in `scenario.results` under the step name: + - placement_envelopes: Per-demand placement ratio envelopes with statistics + When ``include_flow_details`` is true, each envelope also includes + ``flow_summary_stats`` with aggregated ``cost_distribution_stats`` and + ``edge_usage_frequencies``. + - failure_pattern_results: Failure pattern mapping (if requested) + - metadata: Execution metadata (iterations, parallelism, baseline, etc.) +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from ngraph.failure.manager.manager import FailureManager +from ngraph.logging import get_logger +from ngraph.workflow.base import WorkflowStep, register_workflow_step + +if TYPE_CHECKING: + from ngraph.scenario import Scenario + +logger = get_logger(__name__) + + +@dataclass +class TrafficMatrixPlacementAnalysis(WorkflowStep): + """Monte Carlo demand placement analysis using a named traffic matrix. + + Attributes: + matrix_name: Name of the traffic matrix in scenario.traffic_matrix_set. + failure_policy: Optional policy name in scenario.failure_policy_set. + iterations: Number of Monte Carlo iterations. + parallelism: Number of parallel worker processes. + placement_rounds: Placement optimization rounds (int or "auto"). + baseline: Include baseline iteration without failures first. + seed: Optional seed for reproducibility. + store_failure_patterns: Whether to store failure pattern results. + include_flow_details: If True, collect per-demand cost distribution and + edges used per iteration, and aggregate into ``flow_summary_stats`` + on each placement envelope. + """ + + matrix_name: str = "" + failure_policy: str | None = None + iterations: int = 1 + parallelism: int = 1 + placement_rounds: int | str = "auto" + baseline: bool = False + seed: int | None = None + store_failure_patterns: bool = False + include_flow_details: bool = False + + def run(self, scenario: "Scenario") -> None: + """Execute demand placement Monte Carlo analysis. + + Args: + scenario: Scenario containing network, failure policies, and traffic matrices. + + Raises: + ValueError: If matrix_name is not provided or not found in the scenario. 
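+
+        Example (illustrative):
+            After the step completes, per-demand envelopes can be read back with
+            ``scenario.results.get_all("placement_envelopes")``, keyed by step
+            name, with entries such as ``"A->B|prio=0"``.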
+ """ + if not self.matrix_name: + raise ValueError( + "'matrix_name' is required for TrafficMatrixPlacementAnalysis" + ) + + logger.info( + f"Starting demand placement analysis: {self.name or self.__class__.__name__}" + ) + + # Extract and serialize the requested traffic matrix to simple dicts + try: + td_list = scenario.traffic_matrix_set.get_matrix(self.matrix_name) + except KeyError as exc: + raise ValueError( + f"Traffic matrix '{self.matrix_name}' not found in scenario." + ) from exc + + demands_config: list[dict[str, Any]] = [] + for td in td_list: + demands_config.append( + { + "source_path": td.source_path, + "sink_path": td.sink_path, + "demand": td.demand, + "mode": getattr(td, "mode", "pairwise"), + "flow_policy_config": getattr(td, "flow_policy_config", None), + "priority": getattr(td, "priority", 0), + } + ) + + # Run via FailureManager convenience method + fm = FailureManager( + network=scenario.network, + failure_policy_set=scenario.failure_policy_set, + policy_name=self.failure_policy, + ) + + results = fm.run_demand_placement_monte_carlo( + demands_config=demands_config, + iterations=self.iterations, + parallelism=self.parallelism, + placement_rounds=self.placement_rounds, + baseline=self.baseline, + seed=self.seed, + store_failure_patterns=self.store_failure_patterns, + include_flow_details=self.include_flow_details, + ) + + # Build per-demand placement envelopes similar to capacity envelopes + from collections import defaultdict + + from ngraph.results.artifacts import PlacementEnvelope + + # Single-pass accumulation across all iterations + # - placement ratios always + # - optional cost distributions and edge usage when include_flow_details=True + per_demand_ratios: dict[tuple[str, str, int], list[float]] = defaultdict(list) + cost_map: dict[tuple[str, str, int], dict[float, list[float]]] | None = None + edge_counts: dict[tuple[str, str, int], dict[str, int]] | None = None + + if self.include_flow_details: + cost_map = defaultdict(lambda: defaultdict(list)) + edge_counts = defaultdict(lambda: defaultdict(int)) + + for iter_result in results.raw_results.get("results", []): + for d_entry in iter_result.get("demand_results", []): + src = str(d_entry.get("src", "")) + dst = str(d_entry.get("dst", "")) + prio = int(d_entry.get("priority", 0)) + ratio = float(d_entry.get("placement_ratio", 0.0)) + per_demand_ratios[(src, dst, prio)].append(ratio) + + if ( + self.include_flow_details + and cost_map is not None + and edge_counts is not None + ): + cd = d_entry.get("cost_distribution") + if isinstance(cd, dict): + for cost_key, vol in cd.items(): + try: + cost_val = float(cost_key) + vol_f = float(vol) + except (TypeError, ValueError) as exc: + raise ValueError( + f"Invalid cost_distribution entry for {src}->{dst} prio={prio}: {cost_key!r}={vol!r}: {exc}" + ) from exc + cost_map[(src, dst, prio)][cost_val].append(vol_f) + + used_edges = d_entry.get("edges_used") or [] + if isinstance(used_edges, list): + for e in used_edges: + edge_counts[(src, dst, prio)][str(e)] += 1 + + # Create PlacementEnvelope per demand; use 'pairwise' as mode because expanded demands are per pair + envelopes: dict[str, dict[str, Any]] = {} + for (src, dst, prio), ratios in per_demand_ratios.items(): + env = PlacementEnvelope.from_values( + source=src, + sink=dst, + mode="pairwise", + priority=prio, + ratios=ratios, + ) + key = f"{src}->{dst}|prio={prio}" + data = env.to_dict() + data["src"] = src + data["dst"] = dst + data["metric"] = "placement_ratio" + envelopes[key] = data + + # If flow details were 
requested, aggregate them into per-demand flow_summary_stats + if ( + self.include_flow_details + and cost_map is not None + and edge_counts is not None + ): + # Reduce accumulations into stats and attach to envelopes + for (src, dst, prio), costs in cost_map.items(): + key = f"{src}->{dst}|prio={prio}" + if key not in envelopes: + continue + cost_stats: dict[float, dict[str, Any]] = {} + for cost, vols in costs.items(): + if not vols: + continue + freq = {v: vols.count(v) for v in set(vols)} + cost_stats[float(cost)] = { + "mean": sum(vols) / len(vols), + "min": min(vols), + "max": max(vols), + "total_samples": len(vols), + "frequencies": freq, + } + flow_stats: dict[str, Any] = {"cost_distribution_stats": cost_stats} + # Attach edge usage frequencies if collected + ec = edge_counts.get((src, dst, prio)) + if ec: + flow_stats["edge_usage_frequencies"] = dict(ec) + envelopes[key]["flow_summary_stats"] = flow_stats + + # Convert to serializable dicts for results export + # Store serializable outputs + scenario.results.put(self.name, "placement_envelopes", envelopes) + if self.store_failure_patterns and results.failure_patterns: + scenario.results.put( + self.name, "failure_pattern_results", results.failure_patterns + ) + scenario.results.put(self.name, "metadata", results.metadata) + + logger.info( + f"Demand placement analysis completed: {self.name or self.__class__.__name__}" + ) + + +# Register the workflow step +register_workflow_step("TrafficMatrixPlacementAnalysis")(TrafficMatrixPlacementAnalysis) diff --git a/scenarios/square_mesh.yaml b/scenarios/square_mesh.yaml index f7b7238..91944ee 100644 --- a/scenarios/square_mesh.yaml +++ b/scenarios/square_mesh.yaml @@ -49,6 +49,13 @@ failure_policy_set: rule_type: choice count: 1 +traffic_matrix_set: + default: + - source_path: "^N([1-4])$" + sink_path: "^N([1-4])$" + demand: 12.0 + mode: "pairwise" + workflow: # Single pairwise analysis generates complete 4x4 node-to-node capacity matrix - step_type: CapacityEnvelopeAnalysis @@ -63,5 +70,17 @@ workflow: flow_placement: "PROPORTIONAL" # Distribute flow proportionally across paths baseline: true # Include baseline (no failure) analysis seed: 42 # Reproducible results - store_failure_patterns: false # Retain failure patterns for analysis + store_failure_patterns: true # Retain failure patterns for analysis include_flow_summary: true # Include cost distribution and min-cut analysis + + - step_type: TrafficMatrixPlacementAnalysis + name: "tm_placement" + matrix_name: "default" + failure_policy: "single_link_failure" + iterations: 10 + parallelism: 8 + placement_rounds: "auto" + baseline: true + seed: 42 + store_failure_patterns: true + include_flow_details: true diff --git a/tests/demand/manager/test_expand.py b/tests/demand/manager/test_expand.py new file mode 100644 index 0000000..35365e5 --- /dev/null +++ b/tests/demand/manager/test_expand.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List, Protocol + +from ngraph.demand.manager.expand import expand_demands +from ngraph.demand.spec import TrafficDemand +from ngraph.flows.policy import FlowPolicyConfig +from ngraph.graph.strict_multidigraph import StrictMultiDiGraph + + +@dataclass +class _NodeStub: + name: str + + +class _NetworkLike(Protocol): + def select_node_groups_by_path(self, path: str) -> Dict[str, List[_NodeStub]]: ... 
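+
+# The protocol above captures the only Network behavior expand_demands needs in
+# these tests; _NetworkStub below implements it with a fixed path -> groups mapping.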
+ + +class _NetworkStub: + def __init__(self, mapping: Dict[str, Dict[str, List[_NodeStub]]]): + self._mapping = mapping + + def select_node_groups_by_path(self, path: str) -> Dict[str, List[_NodeStub]]: # noqa: D401 - simple stub + return self._mapping.get(path, {}) + + +def test_expand_pairwise_multiple_pairs() -> None: + # Two sources x two sinks -> four demands + mapping = { + "src": {"S": [_NodeStub("A"), _NodeStub("B")]}, + "dst": {"T": [_NodeStub("C"), _NodeStub("D")]}, + } + net: _NetworkLike = _NetworkStub(mapping) + graph = StrictMultiDiGraph() + # The expansion logic connects pseudo nodes to real nodes; ensure real nodes exist + for n in ("A", "B", "C", "D"): + graph.add_node(n) + + td = TrafficDemand( + source_path="src", sink_path="dst", demand=100.0, mode="pairwise" + ) + expanded, td_map = expand_demands( + network=net, # type: ignore[arg-type] + graph=graph, + traffic_demands=[td], + default_flow_policy_config=FlowPolicyConfig.SHORTEST_PATHS_ECMP, + ) + + assert len(expanded) == 4 + assert len(td_map[td.id]) == 4 + # Equal split across pairs + assert all(abs(d.volume - 25.0) < 1e-9 for d in expanded) + assert all(d.demand_class == td.priority for d in expanded) + + +def test_expand_combine_uses_pseudo_nodes_and_single_demand() -> None: + # Combine mode should create a single Demand via pseudo nodes and edges + mapping = { + "src": {"S": [_NodeStub("A"), _NodeStub("B")]}, + "dst": {"T": [_NodeStub("C"), _NodeStub("D")]}, + } + net: _NetworkLike = _NetworkStub(mapping) + graph = StrictMultiDiGraph() + for n in ("A", "B", "C", "D"): + graph.add_node(n) + + td = TrafficDemand(source_path="src", sink_path="dst", demand=42.0, mode="combine") + expanded, td_map = expand_demands( + network=net, # type: ignore[arg-type] + graph=graph, + traffic_demands=[td], + default_flow_policy_config=FlowPolicyConfig.SHORTEST_PATHS_ECMP, + ) + + assert len(expanded) == 1 + assert len(td_map[td.id]) == 1 + d = expanded[0] + assert d.volume == 42.0 + assert str(d.src_node).startswith("combine_src::") + assert str(d.dst_node).startswith("combine_snk::") + # Pseudo nodes and link edges should exist + assert d.src_node in graph.nodes + assert d.dst_node in graph.nodes diff --git a/tests/demand/manager/test_schedule.py b/tests/demand/manager/test_schedule.py new file mode 100644 index 0000000..9011c5e --- /dev/null +++ b/tests/demand/manager/test_schedule.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from ngraph.algorithms.base import PathAlg +from ngraph.algorithms.flow_init import init_flow_graph +from ngraph.demand import Demand +from ngraph.demand.manager.schedule import place_demands_round_robin +from ngraph.flows.policy import FlowPolicy +from ngraph.graph.strict_multidigraph import StrictMultiDiGraph + + +def _graph_square() -> StrictMultiDiGraph: + g = StrictMultiDiGraph() + for node in ("A", "B", "C", "D"): + g.add_node(node) + # Two disjoint paths A->B->C and A->D->C + g.add_edge("A", "B", key=0, cost=1, capacity=1) + g.add_edge("B", "C", key=1, cost=1, capacity=1) + g.add_edge("A", "D", key=2, cost=1, capacity=1) + g.add_edge("D", "C", key=3, cost=1, capacity=1) + return g + + +def _policy() -> FlowPolicy: + # Defaults require additional params; set minimal working configuration + from ngraph.algorithms.base import EdgeSelect + from ngraph.algorithms.placement import FlowPlacement + + return FlowPolicy( + path_alg=PathAlg.SPF, + flow_placement=FlowPlacement.PROPORTIONAL, + edge_select=EdgeSelect.ALL_MIN_COST, + multipath=True, + ) + + +def 
test_round_robin_places_all_when_capacity_sufficient() -> None: + g = _graph_square() + init_flow_graph(g) + demands = [ + Demand("A", "C", 1.0, demand_class=0, flow_policy=_policy()), + Demand("A", "C", 1.0, demand_class=0, flow_policy=_policy()), + ] + + total = place_demands_round_robin(g, demands, placement_rounds=5) + assert abs(total - 2.0) < 1e-9 + assert all(abs(d.placed_demand - 1.0) < 1e-9 for d in demands) + + +def test_round_robin_stops_when_no_progress() -> None: + g = _graph_square() + # Reduce capacity on one edge to limit placement + # Set B->C capacity to 0 to enforce single path usage only + g.add_edge("B", "C", key=10, cost=1, capacity=0) + init_flow_graph(g) + + d1 = Demand("A", "C", 2.0, demand_class=0, flow_policy=_policy()) + d2 = Demand("A", "C", 2.0, demand_class=0, flow_policy=_policy()) + total = place_demands_round_robin(g, [d1, d2], placement_rounds=50) + # The two available links should allow at most 2 units total + assert abs(total - 2.0) < 1e-9 diff --git a/tests/dsl/test_parse_helpers.py b/tests/dsl/test_parse_helpers.py new file mode 100644 index 0000000..f5d2213 --- /dev/null +++ b/tests/dsl/test_parse_helpers.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import pytest + +from ngraph.dsl.blueprints.parse import ( + check_adjacency_keys, + check_link_params, + check_no_extra_keys, + expand_name_patterns, + join_paths, +) + + +def test_expand_name_patterns_no_brackets_returns_same() -> None: + assert expand_name_patterns("fa")[0] == "fa" + + +def test_expand_name_patterns_single_range() -> None: + assert expand_name_patterns("fa[1-3]") == ["fa1", "fa2", "fa3"] + + +def test_expand_name_patterns_multiple_ranges_cartesian() -> None: + out = expand_name_patterns("fa[1-2]_plane[5-6]") + assert sorted(out) == sorted( + ["fa1_plane5", "fa1_plane6", "fa2_plane5", "fa2_plane6"] + ) + + +def test_join_paths_behavior() -> None: + assert join_paths("", "/x") == "x" + assert join_paths("p", "/x") == "p/x" + assert join_paths("", "x") == "x" + assert join_paths("p", "x") == "p/x" + + +def test_check_no_extra_keys_allows_only_expected() -> None: + check_no_extra_keys({"a": 1}, allowed={"a"}, context="ctx") + with pytest.raises(ValueError) as exc: + check_no_extra_keys({"a": 1, "b": 2}, allowed={"a"}, context="ctx") + assert "Unrecognized key(s) in ctx" in str(exc.value) + + +def test_check_adjacency_keys_valid_and_missing_required() -> None: + # Valid + check_adjacency_keys( + { + "source": "A", + "target": "B", + "pattern": "mesh", + "link_count": 1, + "link_params": {}, + }, + context="top-level adjacency", + ) + + # Missing required keys + with pytest.raises(ValueError) as exc: + check_adjacency_keys({"pattern": "mesh"}, context="adj") + assert "must have 'source' and 'target'" in str(exc.value) + + # Extra key + with pytest.raises(ValueError) as exc2: + check_adjacency_keys( + { + "source": "A", + "target": "B", + "unexpected": True, + }, + context="adj", + ) + assert "Unrecognized key(s) in adj" in str(exc2.value) + + +def test_check_link_params_valid_and_extra_key() -> None: + # Valid set of keys + check_link_params( + { + "capacity": 1, + "cost": 2, + "disabled": False, + "risk_groups": ["RG"], + "attrs": {"k": "v"}, + }, + context="ctx", + ) + + # Extra key should raise + with pytest.raises(ValueError) as exc: + check_link_params({"capacity": 1, "extra": 0}, context="ctx") + assert "Unrecognized link_params key(s) in ctx" in str(exc.value) diff --git a/tests/failure/test_manager.py b/tests/failure/test_manager.py index d08ce0d..9d3237a 100644 --- 
a/tests/failure/test_manager.py +++ b/tests/failure/test_manager.py @@ -11,9 +11,9 @@ from ngraph.failure.manager.manager import FailureManager from ngraph.failure.policy import FailurePolicy +from ngraph.failure.policy_set import FailurePolicySet from ngraph.model.network import Network from ngraph.model.view import NetworkView -from ngraph.results.artifacts import FailurePolicySet @pytest.fixture diff --git a/tests/failure/test_manager_integration.py b/tests/failure/test_manager_integration.py index d4806aa..038b21d 100644 --- a/tests/failure/test_manager_integration.py +++ b/tests/failure/test_manager_integration.py @@ -4,9 +4,9 @@ from ngraph.failure.manager.manager import FailureManager from ngraph.failure.policy import FailurePolicy, FailureRule +from ngraph.failure.policy_set import FailurePolicySet from ngraph.model.network import Network from ngraph.monte_carlo.functions import max_flow_analysis -from ngraph.results.artifacts import FailurePolicySet class TestFailureManagerCore: diff --git a/tests/failure/test_policy_set.py b/tests/failure/test_policy_set.py index ca91184..6249f39 100644 --- a/tests/failure/test_policy_set.py +++ b/tests/failure/test_policy_set.py @@ -3,7 +3,7 @@ import pytest from ngraph.failure.policy import FailurePolicy, FailureRule -from ngraph.results.artifacts import FailurePolicySet +from ngraph.failure.policy_set import FailurePolicySet class TestFailurePolicySet: diff --git a/tests/monte_carlo/test_functions.py b/tests/monte_carlo/test_functions.py index f531714..0d8a62c 100644 --- a/tests/monte_carlo/test_functions.py +++ b/tests/monte_carlo/test_functions.py @@ -195,8 +195,16 @@ def test_demand_placement_analysis_basic(self) -> None: # Verify results structure assert "total_placed" in result assert "priority_results" in result + assert "demand_results" in result assert result["total_placed"] == 130.0 + # Check demand_results preserve offered and placed volumes + dr = sorted(result["demand_results"], key=lambda x: x["priority"]) # type: ignore[arg-type] + assert dr[0]["offered_demand"] == 100.0 + assert dr[0]["placed_demand"] == 80.0 + assert dr[1]["offered_demand"] == 50.0 + assert dr[1]["placed_demand"] == 50.0 + priority_results = result["priority_results"] assert 0 in priority_results assert 1 in priority_results diff --git a/tests/results/test_artifacts.py b/tests/results/test_artifacts.py index 7c81850..f254a0e 100644 --- a/tests/results/test_artifacts.py +++ b/tests/results/test_artifacts.py @@ -3,11 +3,12 @@ import pytest from ngraph.demand.manager.manager import TrafficResult +from ngraph.demand.matrix import TrafficMatrixSet from ngraph.demand.spec import TrafficDemand from ngraph.results.artifacts import ( CapacityEnvelope, + PlacementEnvelope, PlacementResultSet, - TrafficMatrixSet, ) @@ -194,7 +195,7 @@ def test_placement_result_set_complex_scenarios(): def test_all_artifacts_json_roundtrip(): """Test that all result artifacts can roundtrip through JSON.""" from ngraph.demand.spec import TrafficDemand - from ngraph.results.artifacts import PlacementResultSet, TrafficMatrixSet + from ngraph.results.artifacts import PlacementResultSet # Create instances of all artifact types env = CapacityEnvelope.from_values("src", "dst", "combine", [100, 150, 200]) @@ -238,6 +239,25 @@ def check_primitives(obj): check_primitives(parsed) +def test_placement_envelope_from_values_basic(): + env = PlacementEnvelope.from_values( + source="A", + sink="B", + mode="pairwise", + priority=1, + ratios=[1.0, 0.8, 0.8], + ) + assert env.source == "A" + assert env.sink 
== "B" + assert env.mode == "pairwise" + assert env.priority == 1 + assert env.frequencies.get(1.0) == 1 + assert env.frequencies.get(0.8) == 2 + assert env.total_samples == 3 + d = env.to_dict() + json.dumps(d) + + def test_traffic_matrix_set_get_default_single_matrix(): """Test get_default() with only one matrix.""" matrix_set = TrafficMatrixSet() diff --git a/tests/workflow/test_capacity_envelope_analysis.py b/tests/workflow/test_capacity_envelope_analysis.py index abbdd98..86a7cfe 100644 --- a/tests/workflow/test_capacity_envelope_analysis.py +++ b/tests/workflow/test_capacity_envelope_analysis.py @@ -6,9 +6,9 @@ from ngraph.algorithms.base import FlowPlacement from ngraph.failure.policy import FailurePolicy, FailureRule +from ngraph.failure.policy_set import FailurePolicySet from ngraph.model.network import Link, Network, Node from ngraph.results import Results -from ngraph.results.artifacts import FailurePolicySet from ngraph.scenario import Scenario from ngraph.workflow.capacity_envelope_analysis import ( CapacityEnvelopeAnalysis, diff --git a/tests/workflow/test_traffic_matrix_placement_analysis.py b/tests/workflow/test_traffic_matrix_placement_analysis.py new file mode 100644 index 0000000..78d953b --- /dev/null +++ b/tests/workflow/test_traffic_matrix_placement_analysis.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +from ngraph.workflow.traffic_matrix_placement_analysis import ( + TrafficMatrixPlacementAnalysis, +) + + +@patch("ngraph.workflow.traffic_matrix_placement_analysis.FailureManager") +def test_traffic_matrix_placement_analysis_stores_envelopes( + mock_failure_manager_class, +) -> None: + # Prepare mock scenario with traffic matrix and results store + mock_scenario = MagicMock() + mock_td = MagicMock() + mock_td.source_path = "A" + mock_td.sink_path = "B" + mock_td.demand = 10.0 + mock_td.mode = "pairwise" + mock_td.priority = 0 + mock_scenario.traffic_matrix_set.get_matrix.return_value = [mock_td] + + # Mock FailureManager return value (two iterations with different ratios) + mock_results = MagicMock() + mock_results.raw_results = { + "results": [ + { + "demand_results": [ + { + "src": "A", + "dst": "B", + "priority": 0, + "offered_demand": 10.0, + "placed_demand": 8.0, + "unplaced_demand": 2.0, + "placement_ratio": 0.8, + } + ] + }, + { + "demand_results": [ + { + "src": "A", + "dst": "B", + "priority": 0, + "offered_demand": 10.0, + "placed_demand": 10.0, + "unplaced_demand": 0.0, + "placement_ratio": 1.0, + } + ] + }, + ] + } + mock_results.failure_patterns = {} + mock_results.metadata = {"iterations": 2} + mock_failure_manager = MagicMock() + mock_failure_manager_class.return_value = mock_failure_manager + mock_failure_manager.run_demand_placement_monte_carlo.return_value = mock_results + + step = TrafficMatrixPlacementAnalysis( + name="tm_step", + matrix_name="default", + iterations=2, + baseline=False, + ) + step.run(mock_scenario) + + # Verify results were stored under the new key and include an envelope dict + put_calls = mock_scenario.results.put.call_args_list + stored = {args[1]: args[2] for args, _ in (call for call in put_calls)} + assert "placement_envelopes" in stored + envelopes = stored["placement_envelopes"] + assert isinstance(envelopes, dict) + key = "A->B|prio=0" + assert key in envelopes + env = envelopes[key] + # Envelope should be a plain dict ready for JSON export + assert isinstance(env, dict) + for k in [ + "source", + "sink", + "mode", + "priority", + "frequencies", + "min", + "max", 
+ "mean", + "stdev", + "total_samples", + ]: + assert k in env + + +@patch("ngraph.workflow.traffic_matrix_placement_analysis.FailureManager") +def test_traffic_matrix_placement_analysis_flow_details_aggregated( + mock_failure_manager_class, +) -> None: + # Prepare mock scenario with traffic matrix and results store + mock_scenario = MagicMock() + mock_td = MagicMock() + mock_td.source_path = "A" + mock_td.sink_path = "B" + mock_td.demand = 10.0 + mock_td.mode = "pairwise" + mock_td.priority = 0 + mock_scenario.traffic_matrix_set.get_matrix.return_value = [mock_td] + + # Mock FailureManager return value with cost_distribution data + mock_results = MagicMock() + mock_results.raw_results = { + "results": [ + { + "demand_results": [ + { + "src": "A", + "dst": "B", + "priority": 0, + "offered_demand": 10.0, + "placed_demand": 8.0, + "unplaced_demand": 2.0, + "placement_ratio": 0.8, + "cost_distribution": {1.0: 5.0, 2.0: 3.0}, + "edges_used": ["(u,v,k1)", "(x,y,k2)"], + } + ] + }, + { + "demand_results": [ + { + "src": "A", + "dst": "B", + "priority": 0, + "offered_demand": 10.0, + "placed_demand": 10.0, + "unplaced_demand": 0.0, + "placement_ratio": 1.0, + "cost_distribution": {1.0: 7.0, 3.0: 2.0}, + "edges_used": ["(u,v,k1)"], + } + ] + }, + ] + } + mock_results.failure_patterns = {} + mock_results.metadata = {"iterations": 2} + mock_failure_manager = MagicMock() + mock_failure_manager_class.return_value = mock_failure_manager + mock_failure_manager.run_demand_placement_monte_carlo.return_value = mock_results + + step = TrafficMatrixPlacementAnalysis( + name="tm_step", + matrix_name="default", + iterations=2, + baseline=False, + include_flow_details=True, + ) + step.run(mock_scenario) + + # Verify flow_summary_stats aggregated into envelope + put_calls = mock_scenario.results.put.call_args_list + stored = {args[1]: args[2] for args, _ in (call for call in put_calls)} + envelopes = stored["placement_envelopes"] + env = envelopes["A->B|prio=0"] + assert "flow_summary_stats" in env + stats = env["flow_summary_stats"] + cds = stats.get("cost_distribution_stats", {}) + # cost 1.0 has volumes [5.0, 7.0] -> mean 6.0, min 5.0, max 7.0, samples 2 + assert 1.0 in cds + assert abs(cds[1.0]["mean"] - 6.0) < 1e-9 + assert cds[1.0]["min"] == 5.0 + assert cds[1.0]["max"] == 7.0 + assert cds[1.0]["total_samples"] == 2 + # edge frequencies counted across iterations + edge_freq = stats.get("edge_usage_frequencies", {}) + assert edge_freq.get("(u,v,k1)") == 2 + assert edge_freq.get("(x,y,k2)") == 1