From 5fc0512c8021dea0a8dfcb77cbde17cb20c4035d Mon Sep 17 00:00:00 2001 From: Andrey Golovanov Date: Mon, 22 Dec 2025 00:47:04 +0000 Subject: [PATCH 1/2] Update to v0.16.0 - Reorganized modules: `ngraph.exec` split into `ngraph.analysis` and `ngraph.model`, maintaining public API via re-exports. - Expanded public API to include `TrafficDemand`, `FlowPolicyPreset`, `Scenario`, `NetworkExplorer`, and placement functions. - Introduced `ngraph.analysis.placement` for demand placement with SPF caching, including `place_demands()` and `PlacementResult`. --- CHANGELOG.md | 14 + docs/reference/api-full.md | 363 +++++++++------ docs/reference/api.md | 2 +- docs/reference/design.md | 28 +- ngraph/__init__.py | 10 +- ngraph/_version.py | 2 +- ngraph/analysis/__init__.py | 101 ++--- ngraph/analysis/context.py | 51 +++ .../demand/expand.py => analysis/demand.py} | 0 .../failure_manager.py} | 14 +- .../flow.py => analysis/functions.py} | 413 ++---------------- ngraph/analysis/placement.py | 351 +++++++++++++++ ngraph/exec/analysis/__init__.py | 19 - ngraph/exec/demand/__init__.py | 5 - ngraph/exec/failure/__init__.py | 8 - ngraph/explorer.py | 244 ++++++----- ngraph/lib/nx.py | 15 +- ngraph/model/__init__.py | 17 +- ngraph/model/demand/__init__.py | 20 + ngraph/{exec => model}/demand/builder.py | 0 ngraph/model/failure/__init__.py | 28 +- ngraph/model/flow/__init__.py | 22 + ngraph/model/path.py | 13 +- ngraph/results/__init__.py | 14 +- ngraph/scenario.py | 6 +- ngraph/types/__init__.py | 18 + ngraph/workflow/max_flow_step.py | 2 +- .../workflow/maximum_supported_demand_step.py | 63 ++- .../workflow/traffic_matrix_placement_step.py | 2 +- pyproject.toml | 2 +- tests/analysis/__init__.py | 1 + .../test_context.py} | 0 .../test_demand.py} | 2 +- .../test_failure_manager.py} | 8 +- .../test_failure_manager_integration.py} | 4 +- .../test_flow_placement_semantics.py | 0 tests/{exec => }/analysis/test_functions.py | 16 +- .../analysis/test_functions_details.py | 2 +- .../{solver => analysis}/test_maxflow_api.py | 0 .../test_maxflow_cache.py | 0 .../test_maxflow_cost_distribution.py | 0 tests/{solver => analysis}/test_paths.py | 0 .../test_placement.py} | 58 +-- tests/exec/analysis/__init__.py | 1 - tests/exec/failure/__init__.py | 0 tests/{exec => model}/demand/test_builder.py | 2 +- tests/model/failure/test_failure_trace.py | 2 +- tests/solver/__init__.py | 0 .../workflow/test_maximum_supported_demand.py | 2 +- 49 files changed, 1097 insertions(+), 848 deletions(-) rename ngraph/{exec/demand/expand.py => analysis/demand.py} (100%) rename ngraph/{exec/failure/manager.py => analysis/failure_manager.py} (98%) rename ngraph/{exec/analysis/flow.py => analysis/functions.py} (52%) create mode 100644 ngraph/analysis/placement.py delete mode 100644 ngraph/exec/analysis/__init__.py delete mode 100644 ngraph/exec/demand/__init__.py delete mode 100644 ngraph/exec/failure/__init__.py create mode 100644 ngraph/model/demand/__init__.py rename ngraph/{exec => model}/demand/builder.py (100%) create mode 100644 ngraph/model/flow/__init__.py create mode 100644 tests/analysis/__init__.py rename tests/{adapters/test_adapters.py => analysis/test_context.py} (100%) rename tests/{exec/demand/test_expand.py => analysis/test_demand.py} (99%) rename tests/{exec/failure/test_manager.py => analysis/test_failure_manager.py} (98%) rename tests/{exec/failure/test_manager_integration.py => analysis/test_failure_manager_integration.py} (99%) rename tests/{solver => analysis}/test_flow_placement_semantics.py (100%) rename tests/{exec => }/analysis/test_functions.py (96%) rename tests/{exec => }/analysis/test_functions_details.py (97%) rename tests/{solver => analysis}/test_maxflow_api.py (100%) rename tests/{solver => analysis}/test_maxflow_cache.py (100%) rename tests/{solver => analysis}/test_maxflow_cost_distribution.py (100%) rename tests/{solver => analysis}/test_paths.py (100%) rename tests/{exec/analysis/test_spf_caching.py => analysis/test_placement.py} (93%) delete mode 100644 tests/exec/analysis/__init__.py delete mode 100644 tests/exec/failure/__init__.py rename tests/{exec => model}/demand/test_builder.py (99%) delete mode 100644 tests/solver/__init__.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b89bc8..2560975 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.16.0] - 2025-12-21 + +### Changed + +- **Module reorganization**: `ngraph.exec` split into `ngraph.analysis` (runtime analysis) and `ngraph.model` (data structures); public API unchanged via re-exports +- **Expanded public API**: `TrafficDemand`, `FlowPolicyPreset`, `Scenario`, `NetworkExplorer`, and placement functions now exported from top-level modules +- **Placement analysis**: Extracted SPF caching and demand placement logic into `ngraph.analysis.placement` module with `place_demands()` and `PlacementResult` + +### Added + +- `ngraph.model.demand` subpackage: `TrafficDemand` and builder functions +- `ngraph.model.flow` subpackage: `FlowPolicyPreset` and policy configuration +- `ngraph.types` exports: `Mode`, `FlowPlacement`, `EdgeSelect`, `EdgeRef`, `MaxFlowResult` + ## [0.15.0] - 2025-12-21 ### Added diff --git a/docs/reference/api-full.md b/docs/reference/api-full.md index bd1e8c6..1fc68ce 100644 --- a/docs/reference/api-full.md +++ b/docs/reference/api-full.md @@ -12,9 +12,9 @@ Quick links: - [CLI Reference](cli.md) - [DSL Reference](dsl.md) -Generated from source code on: December 21, 2025 at 01:24 UTC +Generated from source code on: December 22, 2025 at 00:44 UTC -Modules auto-discovered: 52 +Modules auto-discovered: 53 --- @@ -409,6 +409,29 @@ Returns: --- +## ngraph.model.demand.builder + +Builders for traffic matrices. + +Construct `TrafficMatrixSet` from raw dictionaries (e.g. parsed YAML). + +### build_traffic_matrix_set(raw: 'Dict[str, List[dict]]') -> 'TrafficMatrixSet' + +Build a `TrafficMatrixSet` from a mapping of name -> list of dicts. + +Args: + raw: Mapping where each key is a matrix name and each value is a list of + dictionaries with `TrafficDemand` constructor fields. + +Returns: + Initialized `TrafficMatrixSet` with constructed `TrafficDemand` objects. + +Raises: + ValueError: If ``raw`` is not a mapping of name -> list[dict], + or if required fields are missing. + +--- + ## ngraph.model.demand.matrix Traffic matrix containers. @@ -2751,6 +2774,42 @@ Attributes: capacity: Edge capacity cost: Edge cost (converted to int64 for Core) +### analyze(network: "'Network'", *, source: 'Optional[Union[str, Dict[str, Any]]]' = None, sink: 'Optional[Union[str, Dict[str, Any]]]' = None, mode: 'Mode' = , augmentations: 'Optional[List[AugmentationEdge]]' = None) -> 'AnalysisContext' + +Create an analysis context for the network. + +This is THE primary entry point for network analysis in NetGraph. + +Args: + network: Network topology to analyze. + source: Optional source node selector (string path or selector dict). + If provided with sink, creates bound context with pre-built + pseudo-nodes for efficient repeated flow analysis. + sink: Optional sink node selector (string path or selector dict). + mode: Group mode (COMBINE or PAIRWISE). Only used if bound. + augmentations: Optional custom augmentation edges. + +Returns: + AnalysisContext ready for analysis calls. + +Examples: + One-off analysis (unbound context): + + flow = analyze(network).max_flow("^A$", "^B$") + paths = analyze(network).shortest_paths("^A$", "^B$") + + Efficient repeated analysis (bound context): + + ctx = analyze(network, source="^dc/", sink="^edge/") + baseline = ctx.max_flow() + degraded = ctx.max_flow(excluded_links=failed_links) + + Multiple exclusion scenarios: + + ctx = analyze(network, source="^A$", sink="^B$") + for scenario in failure_scenarios: + result = ctx.max_flow(excluded_links=scenario) + ### build_edge_mask(ctx: 'AnalysisContext', excluded_links: 'Optional[Set[str]]' = None) -> 'np.ndarray' Build an edge mask array for Core algorithms. @@ -2781,7 +2840,138 @@ Returns: --- -## ngraph.exec.analysis.flow +## ngraph.analysis.demand + +Demand expansion: converts TrafficDemand specs into concrete placement demands. + +Supports both pairwise and combine modes through augmentation-based pseudo nodes. +Uses unified selectors for node selection. + +### DemandExpansion + +Demand expansion result. + +Attributes: + demands: Concrete demands ready for placement (sorted by priority). + augmentations: Augmentation edges for pseudo nodes (empty for pairwise). + +**Attributes:** + +- `demands` (List[ExpandedDemand]) +- `augmentations` (List[AugmentationEdge]) + +### ExpandedDemand + +Concrete demand ready for placement. + +Uses node names (not IDs) so expansion happens before graph building. +Node IDs are resolved after the graph is built with pseudo nodes. + +Attributes: + src_name: Source node name (real or pseudo). + dst_name: Destination node name (real or pseudo). + volume: Traffic volume to place. + priority: Priority class (lower is higher priority). + policy_preset: FlowPolicy configuration preset. + demand_id: Parent TrafficDemand ID for tracking. + +**Attributes:** + +- `src_name` (str) +- `dst_name` (str) +- `volume` (float) +- `priority` (int) +- `policy_preset` (FlowPolicyPreset) +- `demand_id` (str) + +### expand_demands(network: 'Network', traffic_demands: 'List[TrafficDemand]', default_policy_preset: 'FlowPolicyPreset' = ) -> 'DemandExpansion' + +Expand TrafficDemand specifications into concrete demands with augmentations. + +Pure function that: + +1. Expands variables in selectors using expand_vars +2. Normalizes and evaluates selectors to get node groups +3. Distributes volume based on mode (combine/pairwise) and group_mode +4. Generates augmentation edges for combine mode (pseudo nodes) +5. Returns demands (node names) + augmentations + +Node names are used (not IDs) so expansion happens BEFORE graph building. +IDs are resolved after graph is built with augmentations. + +Args: + network: Network for node selection. + traffic_demands: High-level demand specifications. + default_policy_preset: Default policy if demand doesn't specify one. + +Returns: + DemandExpansion with demands and augmentations. + +Raises: + ValueError: If no demands could be expanded or unsupported mode. + +--- + +## ngraph.analysis.failure_manager + +FailureManager for Monte Carlo failure analysis. + +Provides the failure analysis engine for NetGraph. Supports parallel +processing, graph caching, and failure policy handling for workflow steps +and direct programmatic use. + +Performance characteristics: +Time complexity: O(S + I * A / P), where S is one-time graph setup cost, +I is iteration count, A is per-iteration analysis cost, and P is parallelism. +Graph caching amortizes expensive graph construction across all iterations, +and O(|excluded|) mask building replaces O(V+E) iteration. + +Space complexity: O(V + E + I * R), where V and E are node and link counts, +and R is result size per iteration. The pre-built graph is shared across +all iterations. + +Parallelism: The C++ Core backend releases the GIL during computation, +enabling true parallelism with Python threads. With graph caching, most +per-iteration work runs in GIL-free C++ code; speedup depends on workload +and parallelism level. + +### AnalysisFunction + +Protocol for analysis functions used with FailureManager. + +Analysis functions should take a Network, exclusion sets, and any additional +keyword arguments, returning analysis results of any type. + +### FailureManager + +Failure analysis engine with Monte Carlo capabilities. + +This is the component for failure analysis in NetGraph. +Provides parallel processing, worker caching, and failure +policy handling for workflow steps and direct notebook usage. + +The FailureManager can execute any analysis function that takes a Network +with exclusion sets and returns results, making it generic for different +types of failure analysis (capacity, traffic, connectivity, etc.). + +Attributes: + network: The underlying network (not modified during analysis). + failure_policy_set: Set of named failure policies. + policy_name: Name of specific failure policy to use. + +**Methods:** + +- `compute_exclusions(self, policy: "'FailurePolicy | None'" = None, seed_offset: 'int | None' = None, failure_trace: 'Optional[Dict[str, Any]]' = None) -> 'tuple[set[str], set[str]]'` - Compute set of nodes and links to exclude for a failure iteration. +- `get_failure_policy(self) -> "'FailurePolicy | None'"` - Get failure policy for analysis. +- `run_demand_placement_monte_carlo(self, demands_config: 'list[dict[str, Any]] | Any', iterations: 'int' = 100, parallelism: 'int' = 1, placement_rounds: 'int | str' = 'auto', seed: 'int | None' = None, store_failure_patterns: 'bool' = False, include_flow_details: 'bool' = False, include_used_edges: 'bool' = False, **kwargs) -> 'Any'` - Analyze traffic demand placement success under failures. +- `run_max_flow_monte_carlo(self, source: 'str | dict[str, Any]', sink: 'str | dict[str, Any]', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, require_capacity: 'bool' = True, flow_placement: 'FlowPlacement | str' = , seed: 'int | None' = None, store_failure_patterns: 'bool' = False, include_flow_summary: 'bool' = False, **kwargs) -> 'Any'` - Analyze maximum flow capacity envelopes between node groups under failures. +- `run_monte_carlo_analysis(self, analysis_func: 'AnalysisFunction', iterations: 'int' = 1, parallelism: 'int' = 1, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **analysis_kwargs) -> 'dict[str, Any]'` - Run Monte Carlo failure analysis with any analysis function. +- `run_sensitivity_monte_carlo(self, source: 'str | dict[str, Any]', sink: 'str | dict[str, Any]', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, flow_placement: 'FlowPlacement | str' = , seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **kwargs) -> 'dict[str, Any]'` - Analyze component criticality for flow capacity under failures. +- `run_single_failure_scenario(self, analysis_func: 'AnalysisFunction', **kwargs) -> 'Any'` - Run a single failure scenario for convenience. + +--- + +## ngraph.analysis.functions Flow analysis functions for network evaluation. @@ -2915,157 +3105,60 @@ Returns: --- -## ngraph.exec.demand.builder - -Builders for traffic matrices. - -Construct `TrafficMatrixSet` from raw dictionaries (e.g. parsed YAML). - -### build_traffic_matrix_set(raw: 'Dict[str, List[dict]]') -> 'TrafficMatrixSet' - -Build a `TrafficMatrixSet` from a mapping of name -> list of dicts. - -Args: - raw: Mapping where each key is a matrix name and each value is a list of - dictionaries with `TrafficDemand` constructor fields. - -Returns: - Initialized `TrafficMatrixSet` with constructed `TrafficDemand` objects. - -Raises: - ValueError: If ``raw`` is not a mapping of name -> list[dict], - or if required fields are missing. - ---- - -## ngraph.exec.demand.expand +## ngraph.analysis.placement -Demand expansion: converts TrafficDemand specs into concrete placement demands. - -Supports both pairwise and combine modes through augmentation-based pseudo nodes. -Uses unified selectors for node selection. - -### DemandExpansion +Core demand placement with SPF caching. -Demand expansion result. +### PlacementEntry -Attributes: - demands: Concrete demands ready for placement (sorted by priority). - augmentations: Augmentation edges for pseudo nodes (empty for pairwise). - -**Attributes:** - -- `demands` (List[ExpandedDemand]) -- `augmentations` (List[AugmentationEdge]) - -### ExpandedDemand - -Concrete demand ready for placement. - -Uses node names (not IDs) so expansion happens before graph building. -Node IDs are resolved after the graph is built with pseudo nodes. - -Attributes: - src_name: Source node name (real or pseudo). - dst_name: Destination node name (real or pseudo). - volume: Traffic volume to place. - priority: Priority class (lower is higher priority). - policy_preset: FlowPolicy configuration preset. - demand_id: Parent TrafficDemand ID for tracking. +Single demand placement result. **Attributes:** - `src_name` (str) - `dst_name` (str) -- `volume` (float) - `priority` (int) -- `policy_preset` (FlowPolicyPreset) -- `demand_id` (str) - -### expand_demands(network: 'Network', traffic_demands: 'List[TrafficDemand]', default_policy_preset: 'FlowPolicyPreset' = ) -> 'DemandExpansion' - -Expand TrafficDemand specifications into concrete demands with augmentations. - -Pure function that: - -1. Expands variables in selectors using expand_vars -2. Normalizes and evaluates selectors to get node groups -3. Distributes volume based on mode (combine/pairwise) and group_mode -4. Generates augmentation edges for combine mode (pseudo nodes) -5. Returns demands (node names) + augmentations - -Node names are used (not IDs) so expansion happens BEFORE graph building. -IDs are resolved after graph is built with augmentations. - -Args: - network: Network for node selection. - traffic_demands: High-level demand specifications. - default_policy_preset: Default policy if demand doesn't specify one. - -Returns: - DemandExpansion with demands and augmentations. - -Raises: - ValueError: If no demands could be expanded or unsupported mode. - ---- - -## ngraph.exec.failure.manager - -FailureManager for Monte Carlo failure analysis. - -Provides the failure analysis engine for NetGraph. Supports parallel -processing, graph caching, and failure policy handling for workflow steps -and direct programmatic use. - -Performance characteristics: -Time complexity: O(S + I × A / P), where S is one-time graph setup cost, -I is iteration count, A is per-iteration analysis cost, and P is parallelism. -Graph caching amortizes expensive graph construction across all iterations, -and O(|excluded|) mask building replaces O(V+E) iteration. +- `volume` (float) +- `placed` (float) +- `cost_distribution` (dict[float, float]) = {} +- `used_edges` (set[str]) = set() -Space complexity: O(V + E + I × R), where V and E are node and link counts, -and R is result size per iteration. The pre-built graph is shared across -all iterations. +### PlacementResult -Parallelism: The C++ Core backend releases the GIL during computation, -enabling true parallelism with Python threads. With graph caching, most -per-iteration work runs in GIL-free C++ code; speedup depends on workload -and parallelism level. +Complete placement result. -### AnalysisFunction +**Attributes:** -Protocol for analysis functions used with FailureManager. +- `summary` (PlacementSummary) +- `entries` (list[PlacementEntry] | None) -Analysis functions should take a Network, exclusion sets, and any additional -keyword arguments, returning analysis results of any type. +### PlacementSummary -### FailureManager +Aggregated placement totals. -Failure analysis engine with Monte Carlo capabilities. +**Attributes:** -This is the component for failure analysis in NetGraph. -Provides parallel processing, worker caching, and failure -policy handling for workflow steps and direct notebook usage. +- `total_demand` (float) +- `total_placed` (float) -The FailureManager can execute any analysis function that takes a Network -with exclusion sets and returns results, making it generic for different -types of failure analysis (capacity, traffic, connectivity, etc.). +### place_demands(demands: "Sequence['ExpandedDemand']", volumes: 'Sequence[float]', flow_graph: 'netgraph_core.FlowGraph', ctx: "'AnalysisContext'", node_mask: 'np.ndarray', edge_mask: 'np.ndarray', *, resolved_ids: 'Sequence[tuple[int, int]] | None' = None, collect_entries: 'bool' = False, include_cost_distribution: 'bool' = False, include_used_edges: 'bool' = False) -> 'PlacementResult' -Attributes: - network: The underlying network (not modified during analysis). - failure_policy_set: Set of named failure policies. - policy_name: Name of specific failure policy to use. +Place demands on a flow graph with SPF caching. -**Methods:** +Args: + demands: Expanded demands (policy_preset, priority, names). + volumes: Demand volumes (allows scaling without modifying demands). + flow_graph: Target FlowGraph. + ctx: AnalysisContext with graph infrastructure. + node_mask: Node inclusion mask. + edge_mask: Edge inclusion mask. + resolved_ids: Pre-resolved (src_id, dst_id) pairs. Computed if None. + collect_entries: If True, populate result.entries. + include_cost_distribution: Include cost distribution in entries. + include_used_edges: Include used edges in entries. -- `compute_exclusions(self, policy: "'FailurePolicy | None'" = None, seed_offset: 'int | None' = None, failure_trace: 'Optional[Dict[str, Any]]' = None) -> 'tuple[set[str], set[str]]'` - Compute set of nodes and links to exclude for a failure iteration. -- `get_failure_policy(self) -> "'FailurePolicy | None'"` - Get failure policy for analysis. -- `run_demand_placement_monte_carlo(self, demands_config: 'list[dict[str, Any]] | Any', iterations: 'int' = 100, parallelism: 'int' = 1, placement_rounds: 'int | str' = 'auto', seed: 'int | None' = None, store_failure_patterns: 'bool' = False, include_flow_details: 'bool' = False, include_used_edges: 'bool' = False, **kwargs) -> 'Any'` - Analyze traffic demand placement success under failures. -- `run_max_flow_monte_carlo(self, source: 'str | dict[str, Any]', sink: 'str | dict[str, Any]', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, require_capacity: 'bool' = True, flow_placement: 'FlowPlacement | str' = , seed: 'int | None' = None, store_failure_patterns: 'bool' = False, include_flow_summary: 'bool' = False, **kwargs) -> 'Any'` - Analyze maximum flow capacity envelopes between node groups under failures. -- `run_monte_carlo_analysis(self, analysis_func: 'AnalysisFunction', iterations: 'int' = 1, parallelism: 'int' = 1, seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **analysis_kwargs) -> 'dict[str, Any]'` - Run Monte Carlo failure analysis with any analysis function. -- `run_sensitivity_monte_carlo(self, source: 'str | dict[str, Any]', sink: 'str | dict[str, Any]', mode: 'str' = 'combine', iterations: 'int' = 100, parallelism: 'int' = 1, shortest_path: 'bool' = False, flow_placement: 'FlowPlacement | str' = , seed: 'int | None' = None, store_failure_patterns: 'bool' = False, **kwargs) -> 'dict[str, Any]'` - Analyze component criticality for flow capacity under failures. -- `run_single_failure_scenario(self, analysis_func: 'AnalysisFunction', **kwargs) -> 'Any'` - Run a single failure scenario for convenience. +Returns: + PlacementResult with summary and optional entries. --- @@ -3116,8 +3209,8 @@ Example: **Attributes:** -- `to_ref` (Dict[int, EdgeRef]) = {} -- `from_ref` (Dict[EdgeRef, List[int]]) = {} +- `to_ref` (Dict[int, NxEdgeTuple]) = {} +- `from_ref` (Dict[NxEdgeTuple, List[int]]) = {} ### NodeMap diff --git a/docs/reference/api.md b/docs/reference/api.md index bc0b823..160c082 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -55,7 +55,7 @@ The core components that form the foundation of most NetGraph programs. ```python from pathlib import Path -from ngraph.scenario import Scenario +from ngraph import Scenario # Load complete scenario from YAML text yaml_text = Path("scenarios/square_mesh.yaml").read_text() diff --git a/docs/reference/design.md b/docs/reference/design.md index 145d98c..34aad3d 100644 --- a/docs/reference/design.md +++ b/docs/reference/design.md @@ -17,6 +17,7 @@ NetGraph is a network scenario analysis engine using a **hybrid Python+C++ archi - Workflow Engine: Composes steps into end-to-end analyses, storing outputs in a results store - Results Store: Collects outputs and metadata from each step, enabling structured JSON export - Analysis bridge: `AnalysisContext` builds Core graphs from the model, manages name/ID mapping, and executes Core algorithms +- NetworkExplorer: Network hierarchy traversal and hardware cost/power aggregation **C++ Layer (NetGraph-Core):** @@ -26,6 +27,24 @@ NetGraph is a network scenario analysis engine using a **hybrid Python+C++ archi - Max-Flow: Successive shortest paths with blocking flow augmentation and configurable flow placement policies - Backend Interface: Abstraction for algorithm execution (CPU backend provided) +### Package Structure + +```text +ngraph/ +├── analysis/ # AnalysisContext, FailureManager, placement +├── model/ # Network, Node, Link, demand/, failure/, flow/ +├── dsl/ # YAML parsing (blueprints/, selectors/, expansion/) +├── workflow/ # WorkflowStep implementations +├── results/ # Results store and flow result types +├── types/ # Enums, DTOs, type aliases +├── profiling/ # Performance profiling +├── lib/ # NetworkX integration +├── utils/ # Utilities (ids, yaml, seeds) +├── scenario.py # Scenario orchestrator +├── explorer.py # NetworkExplorer +└── cli.py # Command-line interface +``` + ### Integration Points The Python layer uses the `analyze()` function and `AnalysisContext` class (`ngraph.analysis`) to: @@ -681,7 +700,7 @@ Practical performance is significantly better than worst-case bounds due to earl Managers handle scenario dynamics and prepare inputs for algorithmic steps. -**Demand Expansion** (`ngraph.exec.demand.builder`): Builds traffic matrix sets from DSL definitions, expanding source/sink patterns into concrete node groups. +**Demand Expansion** (`ngraph.model.demand.builder`): Builds traffic matrix sets from DSL definitions, expanding source/sink patterns into concrete node groups. - Deterministic expansion: source/sink node lists sorted alphabetically; no randomization - Supports `combine` mode (aggregate via pseudo nodes) and `pairwise` mode (individual (src,dst) pairs with volume split) @@ -689,7 +708,7 @@ Managers handle scenario dynamics and prepare inputs for algorithmic steps. - Placement uses SPF caching for simple policies (ECMP, WCMP, TE_WCMP_UNLIM), FlowPolicy for complex multi-flow policies - Non-mutating: operates on Core flow graphs with exclusions; Network remains unmodified -**Failure Manager** (`ngraph.exec.failure.manager`): Applies a `FailurePolicy` to compute exclusion sets and runs analyses with those exclusions. +**Failure Manager** (`ngraph.analysis.failure_manager`): Applies a `FailurePolicy` to compute exclusion sets and runs analyses with those exclusions. - Parallel execution via `ThreadPoolExecutor` with zero-copy network sharing across worker threads - Deterministic results when seed is provided (each iteration derives `seed + iteration_index`) @@ -771,13 +790,14 @@ This optimization is critical for performance: graph construction involves Pytho **SPF Caching for Demand Placement:** -For demand placement with cacheable policies (ECMP, WCMP, TE_WCMP_UNLIM), SPF results are cached by (source_node, policy_preset): +Both TrafficMatrixPlacement and MaximumSupportedDemand (MSD) use a unified placement function (`place_demands()` in `ngraph.analysis.placement`) with SPF caching for cacheable policies (ECMP, WCMP, TE_WCMP_UNLIM): - Initial SPF computed once per unique source; subsequent demands from the same source reuse the cached DAG - For TE policies, DAG is recomputed when capacity constraints require alternate paths - Complex multi-flow policies (TE_ECMP_16_LSP, TE_ECMP_UP_TO_256_LSP) use FlowPolicy directly +- MSD additionally pre-resolves node IDs once at cache build time and reuses them across all alpha probes -This reduces SPF computations from O(demands) to O(unique_sources) for workloads where many demands share the same source nodes. +This reduces SPF computations from O(demands) to O(unique_sources) for workloads where many demands share the same source nodes. For MSD, the optimization is particularly significant since it evaluates many alpha values during binary search. **Monte Carlo Deduplication:** diff --git a/ngraph/__init__.py b/ngraph/__init__.py index fe31a98..838f068 100644 --- a/ngraph/__init__.py +++ b/ngraph/__init__.py @@ -33,13 +33,17 @@ from ngraph import cli, logging from ngraph._version import __version__ from ngraph.analysis import AnalysisContext, analyze -from ngraph.exec.failure.manager import FailureManager +from ngraph.analysis.failure_manager import FailureManager +from ngraph.explorer import NetworkExplorer from ngraph.lib.nx import EdgeMap, NodeMap, from_networkx, to_networkx from ngraph.model.demand.matrix import TrafficMatrixSet +from ngraph.model.demand.spec import TrafficDemand +from ngraph.model.flow.policy_config import FlowPolicyPreset from ngraph.model.network import Link, Network, Node, RiskGroup from ngraph.model.path import Path from ngraph.results.artifacts import CapacityEnvelope from ngraph.results.flow import FlowEntry, FlowIterationResult, FlowSummary +from ngraph.scenario import Scenario from ngraph.types.base import EdgeSelect, FlowPlacement, Mode from ngraph.types.dto import EdgeRef, MaxFlowResult @@ -53,6 +57,9 @@ "RiskGroup", "Path", "TrafficMatrixSet", + "TrafficDemand", + "FlowPolicyPreset", + "Scenario", # Analysis (primary API) "analyze", "AnalysisContext", @@ -69,6 +76,7 @@ "CapacityEnvelope", # Execution "FailureManager", + "NetworkExplorer", # Library integrations (NetworkX) "EdgeMap", "NodeMap", diff --git a/ngraph/_version.py b/ngraph/_version.py index bd08160..6b84c81 100644 --- a/ngraph/_version.py +++ b/ngraph/_version.py @@ -2,4 +2,4 @@ __all__ = ["__version__"] -__version__ = "0.15.0" +__version__ = "0.16.0" diff --git a/ngraph/analysis/__init__.py b/ngraph/analysis/__init__.py index 9a41988..ccdb286 100644 --- a/ngraph/analysis/__init__.py +++ b/ngraph/analysis/__init__.py @@ -16,76 +16,57 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union - -# Internal helpers - importable but not part of public API. -# Redundant aliases silence F401 while keeping them accessible. from ngraph.analysis.context import LARGE_CAPACITY as LARGE_CAPACITY from ngraph.analysis.context import ( AnalysisContext, AugmentationEdge, + analyze, ) from ngraph.analysis.context import build_edge_mask as build_edge_mask from ngraph.analysis.context import build_node_mask as build_node_mask -from ngraph.types.base import Mode - -if TYPE_CHECKING: - from ngraph.model.network import Network - - -def analyze( - network: "Network", - *, - source: Optional[Union[str, Dict[str, Any]]] = None, - sink: Optional[Union[str, Dict[str, Any]]] = None, - mode: Mode = Mode.COMBINE, - augmentations: Optional[List[AugmentationEdge]] = None, -) -> AnalysisContext: - """Create an analysis context for the network. - - This is THE primary entry point for network analysis in NetGraph. - - Args: - network: Network topology to analyze. - source: Optional source node selector (string path or selector dict). - If provided with sink, creates bound context with pre-built - pseudo-nodes for efficient repeated flow analysis. - sink: Optional sink node selector (string path or selector dict). - mode: Group mode (COMBINE or PAIRWISE). Only used if bound. - augmentations: Optional custom augmentation edges. - - Returns: - AnalysisContext ready for analysis calls. - - Examples: - One-off analysis (unbound context): - - flow = analyze(network).max_flow("^A$", "^B$") - paths = analyze(network).shortest_paths("^A$", "^B$") - - Efficient repeated analysis (bound context): - - ctx = analyze(network, source="^dc/", sink="^edge/") - baseline = ctx.max_flow() - degraded = ctx.max_flow(excluded_links=failed_links) - - Multiple exclusion scenarios: - - ctx = analyze(network, source="^A$", sink="^B$") - for scenario in failure_scenarios: - result = ctx.max_flow(excluded_links=scenario) - """ - return AnalysisContext.from_network( - network, - source=source, - sink=sink, - mode=mode, - augmentations=augmentations, - ) - +from ngraph.analysis.demand import ( + DemandExpansion, + ExpandedDemand, + expand_demands, +) +from ngraph.analysis.failure_manager import AnalysisFunction, FailureManager +from ngraph.analysis.functions import ( + build_demand_context, + build_maxflow_context, + demand_placement_analysis, + max_flow_analysis, + sensitivity_analysis, +) +from ngraph.analysis.placement import ( + CACHEABLE_PRESETS, + PlacementEntry, + PlacementResult, + PlacementSummary, + place_demands, +) __all__ = [ + # Primary API "analyze", "AnalysisContext", "AugmentationEdge", + # Placement + "CACHEABLE_PRESETS", + "PlacementEntry", + "PlacementResult", + "PlacementSummary", + "place_demands", + # Demand expansion + "DemandExpansion", + "ExpandedDemand", + "expand_demands", + # Analysis functions + "build_demand_context", + "build_maxflow_context", + "demand_placement_analysis", + "max_flow_analysis", + "sensitivity_analysis", + # Failure analysis + "AnalysisFunction", + "FailureManager", ] diff --git a/ngraph/analysis/context.py b/ngraph/analysis/context.py index 0c30792..a3739cd 100644 --- a/ngraph/analysis/context.py +++ b/ngraph/analysis/context.py @@ -1719,3 +1719,54 @@ def build_edge_mask( Boolean numpy array of shape (num_edges,) where True means included. """ return ctx._build_edge_mask(excluded_links) + + +def analyze( + network: "Network", + *, + source: Optional[Union[str, Dict[str, Any]]] = None, + sink: Optional[Union[str, Dict[str, Any]]] = None, + mode: Mode = Mode.COMBINE, + augmentations: Optional[List[AugmentationEdge]] = None, +) -> AnalysisContext: + """Create an analysis context for the network. + + This is THE primary entry point for network analysis in NetGraph. + + Args: + network: Network topology to analyze. + source: Optional source node selector (string path or selector dict). + If provided with sink, creates bound context with pre-built + pseudo-nodes for efficient repeated flow analysis. + sink: Optional sink node selector (string path or selector dict). + mode: Group mode (COMBINE or PAIRWISE). Only used if bound. + augmentations: Optional custom augmentation edges. + + Returns: + AnalysisContext ready for analysis calls. + + Examples: + One-off analysis (unbound context): + + flow = analyze(network).max_flow("^A$", "^B$") + paths = analyze(network).shortest_paths("^A$", "^B$") + + Efficient repeated analysis (bound context): + + ctx = analyze(network, source="^dc/", sink="^edge/") + baseline = ctx.max_flow() + degraded = ctx.max_flow(excluded_links=failed_links) + + Multiple exclusion scenarios: + + ctx = analyze(network, source="^A$", sink="^B$") + for scenario in failure_scenarios: + result = ctx.max_flow(excluded_links=scenario) + """ + return AnalysisContext.from_network( + network, + source=source, + sink=sink, + mode=mode, + augmentations=augmentations, + ) diff --git a/ngraph/exec/demand/expand.py b/ngraph/analysis/demand.py similarity index 100% rename from ngraph/exec/demand/expand.py rename to ngraph/analysis/demand.py diff --git a/ngraph/exec/failure/manager.py b/ngraph/analysis/failure_manager.py similarity index 98% rename from ngraph/exec/failure/manager.py rename to ngraph/analysis/failure_manager.py index e31a460..5ec6eef 100644 --- a/ngraph/exec/failure/manager.py +++ b/ngraph/analysis/failure_manager.py @@ -5,12 +5,12 @@ and direct programmatic use. Performance characteristics: -Time complexity: O(S + I × A / P), where S is one-time graph setup cost, +Time complexity: O(S + I * A / P), where S is one-time graph setup cost, I is iteration count, A is per-iteration analysis cost, and P is parallelism. Graph caching amortizes expensive graph construction across all iterations, and O(|excluded|) mask building replaces O(V+E) iteration. -Space complexity: O(V + E + I × R), where V and E are node and link counts, +Space complexity: O(V + E + I * R), where V and E are node and link counts, and R is result size per iteration. The pre-built graph is shared across all iterations. @@ -413,7 +413,7 @@ def run_monte_carlo_analysis( if "demands_config" in analysis_kwargs: # Demand placement analysis - from ngraph.exec.analysis.flow import build_demand_context + from ngraph.analysis.functions import build_demand_context logger.debug("Pre-building context for demand placement analysis") analysis_kwargs["context"] = build_demand_context( @@ -423,7 +423,7 @@ def run_monte_carlo_analysis( elif "source" in analysis_kwargs and "sink" in analysis_kwargs: # Max-flow analysis or sensitivity analysis - from ngraph.exec.analysis.flow import build_maxflow_context + from ngraph.analysis.functions import build_maxflow_context logger.debug("Pre-building context for max-flow analysis") analysis_kwargs["context"] = build_maxflow_context( @@ -806,7 +806,7 @@ def run_max_flow_monte_carlo( Each result has occurrence_count indicating how many iterations matched. - 'metadata': Execution metadata (iterations, unique_patterns, execution_time, etc.) """ - from ngraph.exec.analysis.flow import max_flow_analysis + from ngraph.analysis.functions import max_flow_analysis # Convert string flow_placement to enum if needed if isinstance(flow_placement, str): @@ -922,7 +922,7 @@ def run_demand_placement_monte_carlo( Each result has occurrence_count indicating how many iterations matched. - 'metadata': Execution metadata (iterations, unique_patterns, execution_time, etc.) """ - from ngraph.exec.analysis.flow import demand_placement_analysis + from ngraph.analysis.functions import demand_placement_analysis # If caller passed a sequence of TrafficDemand objects, convert to dicts if not isinstance(demands_config, list): @@ -1010,7 +1010,7 @@ def run_sensitivity_monte_carlo( - 'component_scores': aggregated statistics (mean, max, min, count) per component per flow - 'metadata': Execution metadata (iterations, unique_patterns, execution_time, etc.) """ - from ngraph.exec.analysis.flow import sensitivity_analysis + from ngraph.analysis.functions import sensitivity_analysis # Convert string flow_placement to enum if needed if isinstance(flow_placement, str): diff --git a/ngraph/exec/analysis/flow.py b/ngraph/analysis/functions.py similarity index 52% rename from ngraph/exec/analysis/flow.py rename to ngraph/analysis/functions.py index 3ecec1e..585dc8b 100644 --- a/ngraph/exec/analysis/flow.py +++ b/ngraph/analysis/functions.py @@ -17,16 +17,15 @@ from __future__ import annotations -from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Optional, Set import netgraph_core -import numpy as np -from ngraph.analysis import AnalysisContext, analyze -from ngraph.exec.demand.expand import ExpandedDemand, expand_demands +from ngraph.analysis.context import AnalysisContext, analyze +from ngraph.analysis.demand import expand_demands +from ngraph.analysis.placement import place_demands from ngraph.model.demand.spec import TrafficDemand -from ngraph.model.flow.policy_config import FlowPolicyPreset, create_flow_policy +from ngraph.model.flow.policy_config import FlowPolicyPreset from ngraph.results.flow import FlowEntry, FlowIterationResult, FlowSummary from ngraph.types.base import FlowPlacement, Mode @@ -62,257 +61,6 @@ def _reconstruct_traffic_demands( if TYPE_CHECKING: from ngraph.model.network import Network -# Minimum flow threshold for placement decisions -_MIN_FLOW = 1e-9 - -# Policies that support SPF caching with simple single-flow placement -_CACHEABLE_SIMPLE: frozenset[FlowPolicyPreset] = frozenset( - { - FlowPolicyPreset.SHORTEST_PATHS_ECMP, - FlowPolicyPreset.SHORTEST_PATHS_WCMP, - } -) - -# Policies that support SPF caching with fallback for capacity-aware routing -_CACHEABLE_TE: frozenset[FlowPolicyPreset] = frozenset( - { - FlowPolicyPreset.TE_WCMP_UNLIM, - } -) - -# All cacheable policies -_CACHEABLE_PRESETS: frozenset[FlowPolicyPreset] = _CACHEABLE_SIMPLE | _CACHEABLE_TE - - -def _get_selection_for_preset( - preset: FlowPolicyPreset, -) -> netgraph_core.EdgeSelection: - """Get EdgeSelection configuration for a cacheable policy preset. - - Args: - preset: Flow policy preset. - - Returns: - EdgeSelection configured for the preset. - - Raises: - ValueError: If preset is not cacheable. - """ - if preset in _CACHEABLE_SIMPLE: - return netgraph_core.EdgeSelection( - multi_edge=True, - require_capacity=False, - tie_break=netgraph_core.EdgeTieBreak.DETERMINISTIC, - ) - elif preset == FlowPolicyPreset.TE_WCMP_UNLIM: - return netgraph_core.EdgeSelection( - multi_edge=True, - require_capacity=True, - tie_break=netgraph_core.EdgeTieBreak.PREFER_HIGHER_RESIDUAL, - ) - raise ValueError(f"Preset {preset} is not cacheable") - - -def _get_placement_for_preset(preset: FlowPolicyPreset) -> netgraph_core.FlowPlacement: - """Get FlowPlacement strategy for a cacheable policy preset. - - Args: - preset: Flow policy preset. - - Returns: - FlowPlacement strategy for the preset. - """ - if preset == FlowPolicyPreset.SHORTEST_PATHS_ECMP: - return netgraph_core.FlowPlacement.EQUAL_BALANCED - # WCMP and TE policies use PROPORTIONAL - return netgraph_core.FlowPlacement.PROPORTIONAL - - -@dataclass -class _CachedPlacementResult: - """Result of placing a demand using cached SPF.""" - - total_placed: float - next_flow_idx: int - cost_distribution: dict[float, float] - used_edges: set[str] - flow_indices: list[netgraph_core.FlowIndex] = field(default_factory=list) - - -def _place_demand_cached( - demand: ExpandedDemand, - src_id: int, - dst_id: int, - dag_cache: dict[tuple[int, FlowPolicyPreset], tuple[np.ndarray, Any]], - algorithms: netgraph_core.Algorithms, - handle: netgraph_core.Graph, - flow_graph: netgraph_core.FlowGraph, - node_mask: np.ndarray, - edge_mask: np.ndarray, - flow_idx_start: int, - include_flow_details: bool, - include_used_edges: bool, - edge_mapper: Any, - multidigraph: netgraph_core.StrictMultiDiGraph, -) -> _CachedPlacementResult: - """Place a demand using cached SPF DAG with fallback for TE policies. - - This function implements SPF caching to reduce redundant shortest path - computations. For demands sharing the same source node and policy preset, - the SPF result is computed once and reused. - - For simple policies (ECMP/WCMP), the cached DAG is always valid. - For TE policies, the DAG may become stale as edges saturate. In this case, - a fallback loop recomputes SPF with current residuals until the demand is - placed or no more progress can be made. - - Args: - demand: Expanded demand to place. - src_id: Source node ID. - dst_id: Destination node ID. - dag_cache: Cache mapping (src_id, preset) to (distances, DAG). - algorithms: Core Algorithms instance. - handle: Core Graph handle. - flow_graph: FlowGraph for placement. - node_mask: Node inclusion mask. - edge_mask: Edge inclusion mask. - flow_idx_start: Starting flow index counter. - include_flow_details: Whether to collect cost distribution. - include_used_edges: Whether to collect used edges. - edge_mapper: Edge ID mapper for edge name resolution. - multidigraph: Graph for edge lookup. - - Returns: - _CachedPlacementResult with placement details. - """ - cache_key = (src_id, demand.policy_preset) - selection = _get_selection_for_preset(demand.policy_preset) - placement = _get_placement_for_preset(demand.policy_preset) - is_te = demand.policy_preset in _CACHEABLE_TE - - flow_indices: list[netgraph_core.FlowIndex] = [] - flow_costs: list[tuple[float, float]] = [] # (cost, placed_amount) - flow_idx_counter = flow_idx_start - demand_placed = 0.0 - remaining = demand.volume - - # Get or compute initial DAG - if cache_key not in dag_cache: - # Initial computation without residual - on a fresh graph all edges - # have full capacity, so residual-aware selection is not needed yet - dists, dag = algorithms.spf( - handle, - src=src_id, - dst=None, # Full DAG to all destinations - selection=selection, - node_mask=node_mask, - edge_mask=edge_mask, - multipath=True, - dtype="float64", - ) - dag_cache[cache_key] = (dists, dag) - - dists, dag = dag_cache[cache_key] - - # Check if destination is reachable - if dists[dst_id] == float("inf"): - # Destination unreachable - return zero placement - return _CachedPlacementResult( - total_placed=0.0, - next_flow_idx=flow_idx_counter, - cost_distribution={}, - used_edges=set(), - flow_indices=[], - ) - - cost = float(dists[dst_id]) - - # First placement attempt with cached DAG - flow_idx = netgraph_core.FlowIndex( - src_id, dst_id, demand.priority, flow_idx_counter - ) - flow_idx_counter += 1 - placed = flow_graph.place(flow_idx, src_id, dst_id, dag, remaining, placement) - - if placed > _MIN_FLOW: - flow_indices.append(flow_idx) - flow_costs.append((cost, placed)) - demand_placed += placed - remaining -= placed - - # For TE policies, use fallback loop if partial placement - if is_te and remaining > _MIN_FLOW: - max_fallback_iterations = 100 - iterations = 0 - - while remaining > _MIN_FLOW and iterations < max_fallback_iterations: - iterations += 1 - - # Recompute DAG with current residuals - residual = np.ascontiguousarray( - flow_graph.residual_view(), dtype=np.float64 - ) - fresh_dists, fresh_dag = algorithms.spf( - handle, - src=src_id, - dst=None, - selection=selection, - residual=residual, - node_mask=node_mask, - edge_mask=edge_mask, - multipath=True, - dtype="float64", - ) - - # Update cache with fresh DAG - dag_cache[cache_key] = (fresh_dists, fresh_dag) - - # Check if destination still reachable - if fresh_dists[dst_id] == float("inf"): - break # No more paths available - - fresh_cost = float(fresh_dists[dst_id]) - - flow_idx = netgraph_core.FlowIndex( - src_id, dst_id, demand.priority, flow_idx_counter - ) - flow_idx_counter += 1 - additional = flow_graph.place( - flow_idx, src_id, dst_id, fresh_dag, remaining, placement - ) - - if additional < _MIN_FLOW: - break # No progress, stop - - flow_indices.append(flow_idx) - flow_costs.append((fresh_cost, additional)) - demand_placed += additional - remaining -= additional - - # Collect cost distribution if requested - cost_distribution: dict[float, float] = {} - if include_flow_details: - for c, amount in flow_costs: - cost_distribution[c] = cost_distribution.get(c, 0.0) + amount - - # Collect used edges if requested - used_edges: set[str] = set() - if include_used_edges: - for fidx in flow_indices: - edges = flow_graph.get_flow_edges(fidx) - for edge_id, _ in edges: - edge_ref = edge_mapper.to_ref(edge_id, multidigraph) - if edge_ref is not None: - used_edges.add(f"{edge_ref.link_id}:{edge_ref.direction}") - - return _CachedPlacementResult( - total_placed=demand_placed, - next_flow_idx=flow_idx_counter, - cost_distribution=cost_distribution, - used_edges=used_edges, - flow_indices=flow_indices, - ) - def max_flow_analysis( network: "Network", @@ -492,135 +240,52 @@ def demand_placement_analysis( network, augmentations=expansion.augmentations ) - # Extract infrastructure from context - handle = ctx.handle - multidigraph = ctx.multidigraph - node_mapper = ctx.node_mapper - edge_mapper = ctx.edge_mapper - algorithms = ctx.algorithms node_mask = ctx._build_node_mask(excluded_nodes) edge_mask = ctx._build_edge_mask(excluded_links) + flow_graph = netgraph_core.FlowGraph(ctx.multidigraph) + + # Phase 3: Place demands using unified placement module + result = place_demands( + expansion.demands, + [d.volume for d in expansion.demands], + flow_graph, + ctx, + node_mask, + edge_mask, + collect_entries=True, + include_cost_distribution=include_flow_details, + include_used_edges=include_used_edges, + ) - flow_graph = netgraph_core.FlowGraph(multidigraph) - - # Phase 3: Place demands with SPF caching for cacheable policies - flow_entries: list[FlowEntry] = [] - total_demand = 0.0 - total_placed = 0.0 - - # SPF cache: (src_id, policy_preset) -> (distances, DAG) - dag_cache: dict[tuple[int, FlowPolicyPreset], tuple[np.ndarray, Any]] = {} - flow_idx_counter = 0 - - for demand in expansion.demands: - # Resolve node names to IDs (includes pseudo nodes from augmentations) - src_id = node_mapper.to_id(demand.src_name) - dst_id = node_mapper.to_id(demand.dst_name) - - # Use cached placement for cacheable policies, FlowPolicy for others - if demand.policy_preset in _CACHEABLE_PRESETS: - result = _place_demand_cached( - demand=demand, - src_id=src_id, - dst_id=dst_id, - dag_cache=dag_cache, - algorithms=algorithms, - handle=handle, - flow_graph=flow_graph, - node_mask=node_mask, - edge_mask=edge_mask, - flow_idx_start=flow_idx_counter, - include_flow_details=include_flow_details, - include_used_edges=include_used_edges, - edge_mapper=edge_mapper, - multidigraph=multidigraph, - ) - flow_idx_counter = result.next_flow_idx - placed = result.total_placed - cost_distribution = result.cost_distribution - used_edges = result.used_edges - else: - # Complex policies (multi-flow LSP variants): use FlowPolicy - policy = create_flow_policy( - algorithms, - handle, - demand.policy_preset, - node_mask=node_mask, - edge_mask=edge_mask, - ) - - placed, flow_count = policy.place_demand( - flow_graph, - src_id, - dst_id, - demand.priority, - demand.volume, - ) - - # Collect flow details if requested - cost_distribution = {} - used_edges = set() - - if include_flow_details or include_used_edges: - flows_dict = policy.flows - for flow_key, flow_data in flows_dict.items(): - if include_flow_details: - cost = float(flow_data[2]) - flow_vol = float(flow_data[3]) - if flow_vol > 0: - cost_distribution[cost] = ( - cost_distribution.get(cost, 0.0) + flow_vol - ) - - if include_used_edges: - flow_idx = netgraph_core.FlowIndex( - flow_key[0], flow_key[1], flow_key[2], flow_key[3] - ) - edges = flow_graph.get_flow_edges(flow_idx) - for edge_id, _ in edges: - edge_ref = edge_mapper.to_ref(edge_id, multidigraph) - if edge_ref is not None: - used_edges.add( - f"{edge_ref.link_id}:{edge_ref.direction}" - ) - - # Build entry data - entry_data: dict[str, Any] = {} - if include_used_edges and used_edges: - entry_data["edges"] = sorted(used_edges) - entry_data["edges_kind"] = "used" - - # Create flow entry - entry = FlowEntry( - source=demand.src_name, - destination=demand.dst_name, - priority=demand.priority, - demand=demand.volume, - placed=placed, - dropped=demand.volume - placed, - cost_distribution=cost_distribution if include_flow_details else {}, - data=entry_data, + # Phase 4: Convert to FlowEntry format + flow_entries = [ + FlowEntry( + source=e.src_name, + destination=e.dst_name, + priority=e.priority, + demand=e.volume, + placed=e.placed, + dropped=e.volume - e.placed, + cost_distribution=e.cost_distribution, + data=( + {"edges": sorted(e.used_edges), "edges_kind": "used"} + if e.used_edges + else {} + ), ) - flow_entries.append(entry) - total_demand += demand.volume - total_placed += placed + for e in result.entries or [] + ] - # Build summary - overall_ratio = (total_placed / total_demand) if total_demand > 0 else 1.0 dropped_flows = sum(1 for e in flow_entries if e.dropped > 0.0) summary = FlowSummary( - total_demand=total_demand, - total_placed=total_placed, - overall_ratio=overall_ratio, + total_demand=result.summary.total_demand, + total_placed=result.summary.total_placed, + overall_ratio=result.summary.ratio, dropped_flows=dropped_flows, num_flows=len(flow_entries), ) - return FlowIterationResult( - flows=flow_entries, - summary=summary, - data={}, - ) + return FlowIterationResult(flows=flow_entries, summary=summary, data={}) def sensitivity_analysis( diff --git a/ngraph/analysis/placement.py b/ngraph/analysis/placement.py new file mode 100644 index 0000000..21870c8 --- /dev/null +++ b/ngraph/analysis/placement.py @@ -0,0 +1,351 @@ +"""Core demand placement with SPF caching.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Sequence + +import netgraph_core +import numpy as np + +from ngraph.model.flow.policy_config import FlowPolicyPreset, create_flow_policy + +if TYPE_CHECKING: + from ngraph.analysis.context import AnalysisContext + from ngraph.analysis.demand import ExpandedDemand + +CACHEABLE_PRESETS: frozenset[FlowPolicyPreset] = frozenset( + { + FlowPolicyPreset.SHORTEST_PATHS_ECMP, + FlowPolicyPreset.SHORTEST_PATHS_WCMP, + FlowPolicyPreset.TE_WCMP_UNLIM, + } +) + +_CACHEABLE_TE: frozenset[FlowPolicyPreset] = frozenset( + { + FlowPolicyPreset.TE_WCMP_UNLIM, + } +) + +_MIN_FLOW = 1e-9 + + +@dataclass(slots=True) +class PlacementSummary: + """Aggregated placement totals.""" + + total_demand: float + total_placed: float + + @property + def ratio(self) -> float: + return self.total_placed / self.total_demand if self.total_demand > 0 else 1.0 + + @property + def is_feasible(self) -> bool: + return self.ratio >= 1.0 - 1e-12 + + +@dataclass(slots=True) +class PlacementEntry: + """Single demand placement result.""" + + src_name: str + dst_name: str + priority: int + volume: float + placed: float + cost_distribution: dict[float, float] = field(default_factory=dict) + used_edges: set[str] = field(default_factory=set) + + +@dataclass(slots=True) +class PlacementResult: + """Complete placement result.""" + + summary: PlacementSummary + entries: list[PlacementEntry] | None = None + + +def _get_edge_selection(preset: FlowPolicyPreset) -> netgraph_core.EdgeSelection: + """Get EdgeSelection for a cacheable preset.""" + if preset in ( + FlowPolicyPreset.SHORTEST_PATHS_ECMP, + FlowPolicyPreset.SHORTEST_PATHS_WCMP, + ): + return netgraph_core.EdgeSelection( + multi_edge=True, + require_capacity=False, + tie_break=netgraph_core.EdgeTieBreak.DETERMINISTIC, + ) + return netgraph_core.EdgeSelection( + multi_edge=True, + require_capacity=True, + tie_break=netgraph_core.EdgeTieBreak.PREFER_HIGHER_RESIDUAL, + ) + + +def _get_flow_placement(preset: FlowPolicyPreset) -> netgraph_core.FlowPlacement: + """Get FlowPlacement for a cacheable preset.""" + if preset == FlowPolicyPreset.SHORTEST_PATHS_ECMP: + return netgraph_core.FlowPlacement.EQUAL_BALANCED + return netgraph_core.FlowPlacement.PROPORTIONAL + + +def place_demands( + demands: Sequence["ExpandedDemand"], + volumes: Sequence[float], + flow_graph: netgraph_core.FlowGraph, + ctx: "AnalysisContext", + node_mask: np.ndarray, + edge_mask: np.ndarray, + *, + resolved_ids: Sequence[tuple[int, int]] | None = None, + collect_entries: bool = False, + include_cost_distribution: bool = False, + include_used_edges: bool = False, +) -> PlacementResult: + """Place demands on a flow graph with SPF caching. + + Args: + demands: Expanded demands (policy_preset, priority, names). + volumes: Demand volumes (allows scaling without modifying demands). + flow_graph: Target FlowGraph. + ctx: AnalysisContext with graph infrastructure. + node_mask: Node inclusion mask. + edge_mask: Edge inclusion mask. + resolved_ids: Pre-resolved (src_id, dst_id) pairs. Computed if None. + collect_entries: If True, populate result.entries. + include_cost_distribution: Include cost distribution in entries. + include_used_edges: Include used edges in entries. + + Returns: + PlacementResult with summary and optional entries. + """ + if resolved_ids is None: + resolved_ids = [ + (ctx.node_mapper.to_id(d.src_name), ctx.node_mapper.to_id(d.dst_name)) + for d in demands + ] + + dag_cache: dict[tuple[int, FlowPolicyPreset], tuple[np.ndarray, Any]] = {} + entries: list[PlacementEntry] | None = [] if collect_entries else None + total_demand = 0.0 + total_placed = 0.0 + flow_idx_counter = 0 + + for demand, volume, (src_id, dst_id) in zip( + demands, volumes, resolved_ids, strict=True + ): + total_demand += volume + + if demand.policy_preset in CACHEABLE_PRESETS: + placed, cost_dist, used_edges, flow_idx_counter = _place_cached( + src_id, + dst_id, + volume, + demand.priority, + demand.policy_preset, + dag_cache, + ctx, + flow_graph, + node_mask, + edge_mask, + flow_idx_counter, + include_cost_distribution, + include_used_edges, + ) + else: + placed, cost_dist, used_edges = _place_with_policy( + src_id, + dst_id, + volume, + demand.priority, + demand.policy_preset, + ctx, + flow_graph, + node_mask, + edge_mask, + include_cost_distribution, + include_used_edges, + ) + + total_placed += placed + + if entries is not None: + entries.append( + PlacementEntry( + src_name=demand.src_name, + dst_name=demand.dst_name, + priority=demand.priority, + volume=volume, + placed=placed, + cost_distribution=cost_dist if include_cost_distribution else {}, + used_edges=used_edges if include_used_edges else set(), + ) + ) + + return PlacementResult( + summary=PlacementSummary(total_demand=total_demand, total_placed=total_placed), + entries=entries, + ) + + +def _place_cached( + src_id: int, + dst_id: int, + volume: float, + priority: int, + preset: FlowPolicyPreset, + dag_cache: dict[tuple[int, FlowPolicyPreset], tuple[np.ndarray, Any]], + ctx: "AnalysisContext", + flow_graph: netgraph_core.FlowGraph, + node_mask: np.ndarray, + edge_mask: np.ndarray, + flow_idx_start: int, + include_cost_distribution: bool, + include_used_edges: bool, +) -> tuple[float, dict[float, float], set[str], int]: + """Place single demand with SPF caching.""" + cache_key = (src_id, preset) + selection = _get_edge_selection(preset) + placement = _get_flow_placement(preset) + is_te = preset in _CACHEABLE_TE + + flow_indices: list[netgraph_core.FlowIndex] = [] + flow_costs: list[tuple[float, float]] = [] + flow_idx_counter = flow_idx_start + placed = 0.0 + remaining = volume + + if cache_key not in dag_cache: + dists, dag = ctx.algorithms.spf( + ctx.handle, + src=src_id, + dst=None, + selection=selection, + node_mask=node_mask, + edge_mask=edge_mask, + multipath=True, + dtype="float64", + ) + dag_cache[cache_key] = (dists, dag) + + dists, dag = dag_cache[cache_key] + + if dists[dst_id] == float("inf"): + return 0.0, {}, set(), flow_idx_counter + + cost = float(dists[dst_id]) + + flow_idx = netgraph_core.FlowIndex(src_id, dst_id, priority, flow_idx_counter) + flow_idx_counter += 1 + amount = flow_graph.place(flow_idx, src_id, dst_id, dag, remaining, placement) + + if amount > _MIN_FLOW: + flow_indices.append(flow_idx) + flow_costs.append((cost, amount)) + placed += amount + remaining -= amount + + if is_te and remaining > _MIN_FLOW: + for _ in range(100): + residual = np.ascontiguousarray( + flow_graph.residual_view(), dtype=np.float64 + ) + fresh_dists, fresh_dag = ctx.algorithms.spf( + ctx.handle, + src=src_id, + dst=None, + selection=selection, + residual=residual, + node_mask=node_mask, + edge_mask=edge_mask, + multipath=True, + dtype="float64", + ) + dag_cache[cache_key] = (fresh_dists, fresh_dag) + + if fresh_dists[dst_id] == float("inf"): + break + + fresh_cost = float(fresh_dists[dst_id]) + flow_idx = netgraph_core.FlowIndex( + src_id, dst_id, priority, flow_idx_counter + ) + flow_idx_counter += 1 + additional = flow_graph.place( + flow_idx, src_id, dst_id, fresh_dag, remaining, placement + ) + + if additional < _MIN_FLOW: + break + + flow_indices.append(flow_idx) + flow_costs.append((fresh_cost, additional)) + placed += additional + remaining -= additional + + if remaining < _MIN_FLOW: + break + + cost_dist: dict[float, float] = {} + if include_cost_distribution: + for c, amt in flow_costs: + cost_dist[c] = cost_dist.get(c, 0.0) + amt + + used_edges: set[str] = set() + if include_used_edges: + for fidx in flow_indices: + for edge_id, _ in flow_graph.get_flow_edges(fidx): + ref = ctx.edge_mapper.to_ref(edge_id, ctx.multidigraph) + if ref: + used_edges.add(f"{ref.link_id}:{ref.direction}") + + return placed, cost_dist, used_edges, flow_idx_counter + + +def _place_with_policy( + src_id: int, + dst_id: int, + volume: float, + priority: int, + preset: FlowPolicyPreset, + ctx: "AnalysisContext", + flow_graph: netgraph_core.FlowGraph, + node_mask: np.ndarray, + edge_mask: np.ndarray, + include_cost_distribution: bool, + include_used_edges: bool, +) -> tuple[float, dict[float, float], set[str]]: + """Place single demand using FlowPolicy (for non-cacheable presets).""" + policy = create_flow_policy( + ctx.algorithms, + ctx.handle, + preset, + node_mask=node_mask, + edge_mask=edge_mask, + ) + placed, _ = policy.place_demand(flow_graph, src_id, dst_id, priority, volume) + + cost_dist: dict[float, float] = {} + used_edges: set[str] = set() + + if include_cost_distribution or include_used_edges: + for flow_key, flow_data in policy.flows.items(): + if include_cost_distribution: + cost, flow_vol = float(flow_data[2]), float(flow_data[3]) + if flow_vol > 0: + cost_dist[cost] = cost_dist.get(cost, 0.0) + flow_vol + + if include_used_edges: + fidx = netgraph_core.FlowIndex( + flow_key[0], flow_key[1], flow_key[2], flow_key[3] + ) + for edge_id, _ in flow_graph.get_flow_edges(fidx): + ref = ctx.edge_mapper.to_ref(edge_id, ctx.multidigraph) + if ref: + used_edges.add(f"{ref.link_id}:{ref.direction}") + + return placed, cost_dist, used_edges diff --git a/ngraph/exec/analysis/__init__.py b/ngraph/exec/analysis/__init__.py deleted file mode 100644 index 403527b..0000000 --- a/ngraph/exec/analysis/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -"""Analysis functions for network evaluation. - -Provides domain-specific analysis functions designed for use with FailureManager. -Functions follow the AnalysisFunction protocol: they accept a network with exclusion -sets and return structured results. All functions use only hashable parameters to -support multiprocessing and caching. -""" - -from .flow import ( - demand_placement_analysis, - max_flow_analysis, - sensitivity_analysis, -) - -__all__ = [ - "max_flow_analysis", - "demand_placement_analysis", - "sensitivity_analysis", -] diff --git a/ngraph/exec/demand/__init__.py b/ngraph/exec/demand/__init__.py deleted file mode 100644 index 5dfeaf5..0000000 --- a/ngraph/exec/demand/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Demand manager subpackage. - -This package contains helpers for building traffic matrices, expanding -specifications into concrete demands, and scheduling placement. -""" diff --git a/ngraph/exec/failure/__init__.py b/ngraph/exec/failure/__init__.py deleted file mode 100644 index d10cde0..0000000 --- a/ngraph/exec/failure/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -"""Failure analysis manager package. - -Provides `FailureManager` and helpers to run Monte Carlo failure analyses over a -`Network`. Submodules cover pattern enumeration, result aggregation, and -execution helpers. See `manager.py` for the main implementation. -""" - -from __future__ import annotations diff --git a/ngraph/explorer.py b/ngraph/explorer.py index 2d57f4d..16c3dc6 100644 --- a/ngraph/explorer.py +++ b/ngraph/explorer.py @@ -3,7 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Dict, List, Optional, Set +from typing import TYPE_CHECKING, Dict, List, Optional, Set from ngraph.logging import get_logger from ngraph.model.components import ( @@ -14,6 +14,9 @@ ) from ngraph.model.network import Link, Network, Node +if TYPE_CHECKING: + from ngraph.model.components import Component + logger = get_logger(__name__) @@ -324,9 +327,15 @@ def _compute_statistics(self) -> None: - node.stats (all, ignoring disabled) - node.active_stats (only enabled nodes/links) """ + self._reset_all_stats() + self._compute_node_counts() + node_has_hw = self._compute_node_costs_and_utilization() + self._compute_link_stats(node_has_hw) + + def _reset_all_stats(self) -> None: + """Zero out stats on all tree nodes.""" - # First, zero them out - def reset_stats(n: TreeNode): + def reset_stats(n: TreeNode) -> None: n.stats = TreeStats() n.active_stats = TreeStats() for c in n.children.values(): @@ -335,8 +344,10 @@ def reset_stats(n: TreeNode): if self.root_node: reset_stats(self.root_node) - # 1) Node counts from subtree sets - def set_node_counts(n: TreeNode): + def _compute_node_counts(self) -> None: + """Set node counts from subtree sets.""" + + def set_node_counts(n: TreeNode) -> None: n.stats.node_count = len(n.subtree_nodes) n.active_stats.node_count = len(n.active_subtree_nodes) for c in n.children.values(): @@ -345,12 +356,16 @@ def set_node_counts(n: TreeNode): if self.root_node: set_node_counts(self.root_node) - # 2) Accumulate node capex/power and validate hardware capacity vs attached links - # Also validate that sum of endpoint optics usage does not exceed node port count - # Track which nodes actually have chassis/hardware assigned; optics at a link - # endpoint should contribute cost/power only when the endpoint node has - # hardware. Without node hardware, optics cannot be installed and should be - # ignored in aggregation and capacity validation. + def _compute_node_costs_and_utilization(self) -> Dict[str, bool]: + """Accumulate node capex/power and validate hardware capacity. + + Also validates that sum of endpoint optics usage does not exceed node port count. + Tracks which nodes have chassis/hardware assigned; optics at a link endpoint + contribute cost/power only when the endpoint node has hardware. + + Returns: + Mapping from node name to whether it has hardware assigned. + """ node_has_hw: Dict[str, bool] = {} for nd in self.network.nodes.values(): comp, hw_count = resolve_node_hardware(nd.attrs, self.components_library) @@ -398,105 +413,113 @@ def set_node_counts(n: TreeNode): and node_comp_capacity > 0.0 and not _node_is_disabled(nd) ): - # Sum capacities of all enabled links attached to this node - attached_capacity = 0.0 - # Track optics usage in "equivalent optics" and ports tally - used_optics_equiv = 0.0 - used_ports = 0.0 - for lk in self.network.links.values(): - if _link_is_disabled(lk): - continue - if lk.source == nd.name or lk.target == nd.name: - # If the opposite endpoint is disabled, skip in active view - other = lk.target if lk.source == nd.name else lk.source - other_node = self.network.nodes.get(other, Node(name=other)) - if _node_is_disabled(other_node): - continue - attached_capacity += float(lk.capacity) - - # Compute optics usage for this endpoint if per-end hardware is set - (src_end, dst_end, per_end) = resolve_link_end_components( - lk.attrs, self.components_library - ) - if per_end: - end = src_end if lk.source == nd.name else dst_end - end_comp, end_cnt, end_excl = end - if end_comp is not None: - # Count optics-equivalents by component count - used_optics_equiv += end_cnt - # Ports used equals count * ports per optic (fractional allowed) - ports_per_optic = float( - getattr(end_comp, "ports", 0) or 0 - ) - if ports_per_optic > 0: - used_ports += end_cnt * ports_per_optic - - # Compute ports availability and violations - total_ports_available = float(getattr(comp, "ports", 0) or 0) * float( - hw_count + self._validate_node_utilization(nd, comp, hw_count, node_comp_capacity) + + return node_has_hw + + def _validate_node_utilization( + self, + nd: Node, + comp: "Component", + hw_count: float, + node_comp_capacity: float, + ) -> None: + """Validate and record node hardware utilization. + + Checks attached link capacity and port usage against node hardware limits. + Records utilization snapshot and raises if strict_validation is enabled. + """ + # Sum capacities of all enabled links attached to this node + attached_capacity = 0.0 + # Track optics usage in "equivalent optics" and ports tally + used_ports = 0.0 + for lk in self.network.links.values(): + if _link_is_disabled(lk): + continue + if lk.source == nd.name or lk.target == nd.name: + # If the opposite endpoint is disabled, skip in active view + other = lk.target if lk.source == nd.name else lk.source + other_node = self.network.nodes.get(other, Node(name=other)) + if _node_is_disabled(other_node): + continue + attached_capacity += float(lk.capacity) + + # Compute optics usage for this endpoint if per-end hardware is set + (src_end, dst_end, per_end) = resolve_link_end_components( + lk.attrs, self.components_library + ) + if per_end: + end = src_end if lk.source == nd.name else dst_end + end_comp, end_cnt, _end_excl = end + if end_comp is not None: + # Ports used equals count * ports per optic (fractional allowed) + ports_per_optic = float(getattr(end_comp, "ports", 0) or 0) + if ports_per_optic > 0: + used_ports += end_cnt * ports_per_optic + + # Compute ports availability and violations + total_ports_available = float(getattr(comp, "ports", 0) or 0) * float(hw_count) + capacity_violation = attached_capacity > node_comp_capacity + ports_violation = False + if getattr(comp, "ports", 0) and comp.ports > 0: + ports_violation = used_ports > total_ports_available + 1e-9 + + # Record per-node utilization snapshot for active topology + capacity_utilization = ( + (attached_capacity / node_comp_capacity) + if node_comp_capacity > 0.0 + else 0.0 + ) + ports_utilization = ( + (used_ports / total_ports_available) if total_ports_available > 0.0 else 0.0 + ) + self._node_utilization[nd.name] = NodeUtilization( + node_name=nd.name, + component_name=comp.name, + hw_count=float(hw_count), + capacity_supported=float(node_comp_capacity), + attached_capacity_active=float(attached_capacity), + capacity_utilization=float(capacity_utilization), + ports_available=float(total_ports_available), + ports_used=float(used_ports), + ports_utilization=float(ports_utilization), + capacity_violation=bool(capacity_violation), + ports_violation=bool(ports_violation), + disabled=_node_is_disabled(nd), + ) + + # Enforce strict behavior after recording + if capacity_violation and self.strict_validation: + raise ValueError( + ( + "Node '%s' total attached capacity %.6g exceeds hardware " + "capacity %.6g from component '%s' (hw_count=%.6g)." ) - capacity_violation = attached_capacity > node_comp_capacity - ports_violation = False - if getattr(comp, "ports", 0) and comp.ports > 0: - ports_violation = used_ports > total_ports_available + 1e-9 - - # Record per-node utilization snapshot for active topology - capacity_utilization = ( - (attached_capacity / node_comp_capacity) - if node_comp_capacity > 0.0 - else 0.0 + % ( + nd.name, + attached_capacity, + node_comp_capacity, + comp.name, + hw_count, ) - ports_utilization = ( - (used_ports / total_ports_available) - if total_ports_available > 0.0 - else 0.0 + ) + if ports_violation and self.strict_validation: + raise ValueError( + ( + "Node '%s' requires %.6g ports for link optics but only %.6g ports " + "are available on '%s' (count=%.6g)." ) - self._node_utilization[nd.name] = NodeUtilization( - node_name=nd.name, - component_name=comp.name, - hw_count=float(hw_count), - capacity_supported=float(node_comp_capacity), - attached_capacity_active=float(attached_capacity), - capacity_utilization=float(capacity_utilization), - ports_available=float(total_ports_available), - ports_used=float(used_ports), - ports_utilization=float(ports_utilization), - capacity_violation=bool(capacity_violation), - ports_violation=bool(ports_violation), - disabled=_node_is_disabled(nd), + % ( + nd.name, + used_ports, + total_ports_available, + comp.name, + hw_count, ) + ) - # Enforce strict behavior after recording - if capacity_violation and self.strict_validation: - raise ValueError( - ( - "Node '%s' total attached capacity %.6g exceeds hardware " - "capacity %.6g from component '%s' (hw_count=%.6g)." - ) - % ( - nd.name, - attached_capacity, - node_comp_capacity, - comp.name, - hw_count, - ) - ) - if ports_violation and self.strict_validation: - raise ValueError( - ( - "Node '%s' requires %.6g ports for link optics but only %.6g ports " - "are available on '%s' (count=%.6g)." - ) - % ( - nd.name, - used_ports, - total_ports_available, - comp.name, - hw_count, - ) - ) - - # 3) Accumulate link stats (internal/external + capex/power) and validate + def _compute_link_stats(self, node_has_hw: Dict[str, bool]) -> None: + """Accumulate link stats (internal/external + capex/power) and validate.""" for link in self.network.links.values(): src = link.source dst = link.target @@ -514,6 +537,10 @@ def set_node_counts(n: TreeNode): src_power = 0.0 dst_cost = 0.0 dst_power = 0.0 + src_comp = None + dst_comp = None + src_cnt_bom = 0.0 + dst_cnt_bom = 0.0 if per_end: src_comp, src_cnt, src_exclusive = src_end @@ -584,13 +611,6 @@ def set_node_counts(n: TreeNode): inter_anc = A_src & A_dst # sees link as "internal" xor_anc = A_src ^ A_dst # sees link as "external" - # Initialize defaults for BOM variables if per_end is False - # Establish defaults to satisfy type checker; will be overwritten when per_end - src_comp = None - dst_comp = None - src_cnt_bom = 0.0 - dst_cnt_bom = 0.0 - # ----- "ALL" stats ----- for an in inter_anc: an.stats.internal_link_count += 1 diff --git a/ngraph/lib/nx.py b/ngraph/lib/nx.py index 1a1ae38..fa03773 100644 --- a/ngraph/lib/nx.py +++ b/ngraph/lib/nx.py @@ -80,8 +80,9 @@ def __len__(self) -> int: return len(self.to_index) -# Type alias for edge references: (source_node, target_node, edge_key) -EdgeRef = Tuple[Hashable, Hashable, Any] +# Type alias for NetworkX edge references: (source_node, target_node, edge_key) +# Named NxEdgeTuple to avoid confusion with ngraph.types.dto.EdgeRef dataclass +NxEdgeTuple = Tuple[Hashable, Hashable, Any] @dataclass @@ -106,8 +107,8 @@ class EdgeMap: ... G.edges[u, v, key]["flow"] = flow """ - to_ref: Dict[int, EdgeRef] = field(default_factory=dict) - from_ref: Dict[EdgeRef, List[int]] = field(default_factory=dict) + to_ref: Dict[int, NxEdgeTuple] = field(default_factory=dict) + from_ref: Dict[NxEdgeTuple, List[int]] = field(default_factory=dict) def __len__(self) -> int: """Return the number of edge mappings.""" @@ -183,8 +184,8 @@ def from_networkx( cost_list: List[int] = [] ext_id_list: List[int] = [] - edge_to_ref: Dict[int, EdgeRef] = {} - ref_to_edges: Dict[EdgeRef, List[int]] = {} + edge_to_ref: Dict[int, NxEdgeTuple] = {} + ref_to_edges: Dict[NxEdgeTuple, List[int]] = {} edge_id = 0 is_multigraph = isinstance(G, (nx.MultiDiGraph, nx.MultiGraph)) @@ -200,7 +201,7 @@ def from_networkx( dst_idx = node_map.to_index[v] cap = float(data.get(capacity_attr, default_capacity)) cst = int(data.get(cost_attr, default_cost)) - edge_ref: EdgeRef = (u, v, key) + edge_ref: NxEdgeTuple = (u, v, key) # Forward edge src_list.append(src_idx) diff --git a/ngraph/model/__init__.py b/ngraph/model/__init__.py index 6576271..e63d522 100644 --- a/ngraph/model/__init__.py +++ b/ngraph/model/__init__.py @@ -5,6 +5,21 @@ for analysis are handled via node_mask and edge_mask parameters in Core algorithms. """ +from ngraph.model.demand import TrafficDemand, TrafficMatrixSet +from ngraph.model.flow import FlowPolicyPreset +from ngraph.model.network import Link, Network, Node, RiskGroup +from ngraph.model.path import Path + __all__ = [ - # Re-exported names are defined in submodules; kept minimal to avoid import side effects + # Network topology + "Network", + "Node", + "Link", + "RiskGroup", + "Path", + # Traffic demands + "TrafficDemand", + "TrafficMatrixSet", + # Flow configuration + "FlowPolicyPreset", ] diff --git a/ngraph/model/demand/__init__.py b/ngraph/model/demand/__init__.py new file mode 100644 index 0000000..97a58fc --- /dev/null +++ b/ngraph/model/demand/__init__.py @@ -0,0 +1,20 @@ +"""Traffic demand specification and matrix containers. + +This package provides data structures for defining traffic demands +and organizing them into named traffic matrix sets. + +Public API: + TrafficDemand: Individual demand specification with source/sink selectors + TrafficMatrixSet: Named collection of TrafficDemand lists + build_traffic_matrix_set: Construct TrafficMatrixSet from parsed YAML +""" + +from ngraph.model.demand.builder import build_traffic_matrix_set +from ngraph.model.demand.matrix import TrafficMatrixSet +from ngraph.model.demand.spec import TrafficDemand + +__all__ = [ + "TrafficDemand", + "TrafficMatrixSet", + "build_traffic_matrix_set", +] diff --git a/ngraph/exec/demand/builder.py b/ngraph/model/demand/builder.py similarity index 100% rename from ngraph/exec/demand/builder.py rename to ngraph/model/demand/builder.py diff --git a/ngraph/model/failure/__init__.py b/ngraph/model/failure/__init__.py index f0d1a1b..4124b1e 100644 --- a/ngraph/model/failure/__init__.py +++ b/ngraph/model/failure/__init__.py @@ -1,30 +1,40 @@ """Failure modeling package. -Provides primitives to define failure selection rules and to run Monte Carlo +Provides primitives to define failure selection rules for Monte Carlo failure analyses. The `policy` module defines data classes for expressing -selection logic over nodes, links, and risk groups. The `manager` subpackage -contains the engine that applies those policies to a `Network` and runs -iterative analyses. +selection logic over nodes, links, and risk groups. Public entry points: -- `ngraph.failure.policy` - failure selection rules and policy application -- `ngraph.failure.manager` - `FailureManager` for running analyses -- `ngraph.failure.validation` - risk group reference validation -- `ngraph.failure.membership` - risk group membership rule resolution -- `ngraph.failure.generate` - dynamic risk group generation +- `ngraph.model.failure.policy` - failure selection rules and policy application +- `ngraph.model.failure.policy_set` - named collection of failure policies +- `ngraph.model.failure.validation` - risk group reference validation +- `ngraph.model.failure.membership` - risk group membership rule resolution +- `ngraph.model.failure.generate` - dynamic risk group generation +- `ngraph.analysis.failure_manager` - `FailureManager` for running Monte Carlo analyses """ from .generate import GenerateSpec, generate_risk_groups, parse_generate_spec from .membership import MembershipSpec, resolve_membership_rules +from .policy import FailureCondition, FailureMode, FailurePolicy, FailureRule +from .policy_set import FailurePolicySet from .validation import validate_risk_group_hierarchy, validate_risk_group_references __all__ = [ + # Policy classes + "FailurePolicy", + "FailureRule", + "FailureMode", + "FailureCondition", + "FailurePolicySet", + # Generation "GenerateSpec", "generate_risk_groups", "parse_generate_spec", + # Membership "MembershipSpec", "resolve_membership_rules", + # Validation "validate_risk_group_hierarchy", "validate_risk_group_references", ] diff --git a/ngraph/model/flow/__init__.py b/ngraph/model/flow/__init__.py new file mode 100644 index 0000000..af1bf31 --- /dev/null +++ b/ngraph/model/flow/__init__.py @@ -0,0 +1,22 @@ +"""Flow policy configuration for NetGraph. + +This package provides preset configurations for traffic routing policies +used in demand placement and flow analysis. + +Public API: + FlowPolicyPreset: Enum of common flow policy configurations + create_flow_policy: Factory function to create FlowPolicy instances + serialize_policy_preset: Serialize preset to string for JSON storage +""" + +from ngraph.model.flow.policy_config import ( + FlowPolicyPreset, + create_flow_policy, + serialize_policy_preset, +) + +__all__ = [ + "FlowPolicyPreset", + "create_flow_policy", + "serialize_policy_preset", +] diff --git a/ngraph/model/path.py b/ngraph/model/path.py index 203b3a9..1ad9bef 100644 --- a/ngraph/model/path.py +++ b/ngraph/model/path.py @@ -166,14 +166,15 @@ def get_sub_path( of `dst_node` and ensuring that the final element has an empty tuple of edges. Note: With EdgeRef-based paths, cost recalculation requires graph lookup. - The graph parameter is reserved for future implementation. Currently, cost - is set to infinity to explicitly indicate it needs recalculation. Check for - `math.isinf(sub_path.cost)` if you need the actual cost. + The graph and cost_attr parameters are accepted for interface compatibility + but not currently used. Cost is set to infinity to explicitly indicate + recalculation is needed. Check for `math.isinf(sub_path.cost)` if you need + the actual cost. Args: dst_node: The node at which to truncate the path. - graph: Reserved for future cost recalculation (currently unused). - cost_attr: Reserved for future cost recalculation (currently unused). + graph: Graph for cost recalculation (currently unused). + cost_attr: Edge attribute for cost lookup (currently unused). Returns: A new Path instance representing the sub-path from the original source @@ -182,7 +183,7 @@ def get_sub_path( Raises: ValueError: If `dst_node` is not found in the current path. """ - # Suppress unused parameter warnings - reserved for future cost recalculation + # Suppress unused parameter warnings - accepted for interface compatibility _ = graph, cost_attr new_elements = [] diff --git a/ngraph/results/__init__.py b/ngraph/results/__init__.py index c81c8ec..1108de9 100644 --- a/ngraph/results/__init__.py +++ b/ngraph/results/__init__.py @@ -6,6 +6,18 @@ from __future__ import annotations +from .artifacts import CapacityEnvelope +from .flow import FlowEntry, FlowIterationResult, FlowSummary from .store import Results, WorkflowStepMetadata -__all__ = ["Results", "WorkflowStepMetadata"] +__all__ = [ + # Store + "Results", + "WorkflowStepMetadata", + # Flow results + "FlowEntry", + "FlowIterationResult", + "FlowSummary", + # Artifacts + "CapacityEnvelope", +] diff --git a/ngraph/scenario.py b/ngraph/scenario.py index cc7253e..4fc4250 100644 --- a/ngraph/scenario.py +++ b/ngraph/scenario.py @@ -7,9 +7,9 @@ from ngraph.dsl.blueprints.expand import expand_network_dsl from ngraph.dsl.loader import load_scenario_yaml -from ngraph.exec.demand.builder import build_traffic_matrix_set from ngraph.logging import get_logger from ngraph.model.components import ComponentsLibrary +from ngraph.model.demand.builder import build_traffic_matrix_set from ngraph.model.demand.matrix import TrafficMatrixSet from ngraph.model.failure.generate import generate_risk_groups, parse_generate_spec from ngraph.model.failure.membership import resolve_membership_rules @@ -73,6 +73,10 @@ def run(self) -> None: Each step may modify scenario data or store outputs in scenario.results. """ + # Reset execution counter to ensure step ordering is local to this run + import ngraph.workflow.base as workflow_base + + workflow_base._execution_counter = 0 for step in self.workflow: step.execute(self) diff --git a/ngraph/types/__init__.py b/ngraph/types/__init__.py index 1204741..34f9684 100644 --- a/ngraph/types/__init__.py +++ b/ngraph/types/__init__.py @@ -4,3 +4,21 @@ to describe nodes, edges, demands, and workflow interfaces. It centralizes typing to improve readability and static analysis and contains no runtime logic. """ + +from ngraph.types.base import MIN_CAP, MIN_FLOW, Cost, EdgeSelect, FlowPlacement, Mode +from ngraph.types.dto import EdgeDir, EdgeRef, MaxFlowResult + +__all__ = [ + # Enums + "Mode", + "FlowPlacement", + "EdgeSelect", + # Type aliases and constants + "Cost", + "MIN_CAP", + "MIN_FLOW", + "EdgeDir", + # DTOs + "EdgeRef", + "MaxFlowResult", +] diff --git a/ngraph/workflow/max_flow_step.py b/ngraph/workflow/max_flow_step.py index 92514d3..7f3f5d3 100644 --- a/ngraph/workflow/max_flow_step.py +++ b/ngraph/workflow/max_flow_step.py @@ -32,7 +32,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, Union -from ngraph.exec.failure.manager import FailureManager +from ngraph.analysis.failure_manager import FailureManager from ngraph.logging import get_logger from ngraph.results.flow import FlowIterationResult from ngraph.types.base import FlowPlacement diff --git a/ngraph/workflow/maximum_supported_demand_step.py b/ngraph/workflow/maximum_supported_demand_step.py index 2e7c6a0..291f7c5 100644 --- a/ngraph/workflow/maximum_supported_demand_step.py +++ b/ngraph/workflow/maximum_supported_demand_step.py @@ -21,10 +21,11 @@ import netgraph_core import numpy as np -from ngraph.exec.demand.expand import ExpandedDemand, expand_demands +from ngraph.analysis.demand import ExpandedDemand, expand_demands +from ngraph.analysis.placement import place_demands from ngraph.logging import get_logger from ngraph.model.demand.spec import TrafficDemand -from ngraph.model.flow.policy_config import FlowPolicyPreset, create_flow_policy +from ngraph.model.flow.policy_config import FlowPolicyPreset from ngraph.workflow.base import WorkflowStep, register_workflow_step if TYPE_CHECKING: @@ -42,12 +43,14 @@ class _MSDCache: node_mask: Pre-built node mask (no exclusions during MSD). edge_mask: Pre-built edge mask (no exclusions during MSD). base_expanded: Expanded demands with base volumes. + resolved_ids: Pre-resolved (src_id, dst_id) pairs. """ ctx: "AnalysisContext" node_mask: np.ndarray edge_mask: np.ndarray base_expanded: list[ExpandedDemand] + resolved_ids: list[tuple[int, int]] @dataclass @@ -271,11 +274,18 @@ def _build_cache(scenario: Any, matrix_name: str) -> _MSDCache: node_mask = ctx._build_node_mask(excluded_nodes=None) edge_mask = ctx._build_edge_mask(excluded_links=None) + # Pre-resolve node IDs once + resolved_ids = [ + (ctx.node_mapper.to_id(d.src_name), ctx.node_mapper.to_id(d.dst_name)) + for d in expansion.demands + ] + return _MSDCache( ctx=ctx, node_mask=node_mask, edge_mask=edge_mask, base_expanded=expansion.demands, + resolved_ids=resolved_ids, ) @staticmethod @@ -289,51 +299,34 @@ def _evaluate_alpha( Uses pre-built cache; only scales demand volumes by alpha. """ ctx = cache.ctx - node_mask = cache.node_mask - edge_mask = cache.edge_mask - decisions: list[bool] = [] min_ratios: list[float] = [] + # Pre-compute scaled volumes once per alpha + volumes = [d.volume * alpha for d in cache.base_expanded] + for _ in range(max(1, int(seeds))): flow_graph = netgraph_core.FlowGraph(ctx.multidigraph) - total_demand = 0.0 - total_placed = 0.0 - - for base_demand in cache.base_expanded: - scaled_volume = base_demand.volume * alpha - src_id = ctx.node_mapper.to_id(base_demand.src_name) - dst_id = ctx.node_mapper.to_id(base_demand.dst_name) - - policy = create_flow_policy( - ctx.algorithms, - ctx.handle, - base_demand.policy_preset, - node_mask=node_mask, - edge_mask=edge_mask, - ) - - placed, _ = policy.place_demand( - flow_graph, - src_id, - dst_id, - base_demand.priority, - scaled_volume, - ) - total_demand += scaled_volume - total_placed += placed + result = place_demands( + cache.base_expanded, + volumes, + flow_graph, + ctx, + cache.node_mask, + cache.edge_mask, + resolved_ids=cache.resolved_ids, + collect_entries=False, + ) - if total_demand == 0.0: + if result.summary.total_demand == 0.0: raise ValueError( f"Cannot evaluate feasibility for alpha={alpha:.6g}: " "total demand is zero." ) - ratio = total_placed / total_demand - is_feasible = ratio >= 1.0 - 1e-12 - decisions.append(is_feasible) - min_ratios.append(ratio) + decisions.append(result.summary.is_feasible) + min_ratios.append(result.summary.ratio) yes = sum(1 for d in decisions if d) required = (len(decisions) // 2) + 1 diff --git a/ngraph/workflow/traffic_matrix_placement_step.py b/ngraph/workflow/traffic_matrix_placement_step.py index 9afa171..1109159 100644 --- a/ngraph/workflow/traffic_matrix_placement_step.py +++ b/ngraph/workflow/traffic_matrix_placement_step.py @@ -13,7 +13,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any -from ngraph.exec.failure.manager import FailureManager +from ngraph.analysis.failure_manager import FailureManager from ngraph.logging import get_logger from ngraph.results.flow import FlowIterationResult from ngraph.workflow.base import ( diff --git a/pyproject.toml b/pyproject.toml index a40e4ed..2df487f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" # --------------------------------------------------------------------- [project] name = "ngraph" -version = "0.15.0" +version = "0.16.0" description = "A tool and a library for network modeling and analysis." readme = "README.md" authors = [{ name = "Andrey Golovanov" }] diff --git a/tests/analysis/__init__.py b/tests/analysis/__init__.py new file mode 100644 index 0000000..ce23e31 --- /dev/null +++ b/tests/analysis/__init__.py @@ -0,0 +1 @@ +"""Tests for ngraph.analysis module.""" diff --git a/tests/adapters/test_adapters.py b/tests/analysis/test_context.py similarity index 100% rename from tests/adapters/test_adapters.py rename to tests/analysis/test_context.py diff --git a/tests/exec/demand/test_expand.py b/tests/analysis/test_demand.py similarity index 99% rename from tests/exec/demand/test_expand.py rename to tests/analysis/test_demand.py index a14ef76..1c6a653 100644 --- a/tests/exec/demand/test_expand.py +++ b/tests/analysis/test_demand.py @@ -2,7 +2,7 @@ import pytest -from ngraph.exec.demand.expand import expand_demands +from ngraph.analysis.demand import expand_demands from ngraph.model.demand.spec import TrafficDemand from ngraph.model.network import Link, Network, Node diff --git a/tests/exec/failure/test_manager.py b/tests/analysis/test_failure_manager.py similarity index 98% rename from tests/exec/failure/test_manager.py rename to tests/analysis/test_failure_manager.py index 2503092..54542d2 100644 --- a/tests/exec/failure/test_manager.py +++ b/tests/analysis/test_failure_manager.py @@ -9,7 +9,7 @@ import pytest -from ngraph.exec.failure.manager import FailureManager +from ngraph.analysis.failure_manager import FailureManager from ngraph.model.failure.policy import ( FailureCondition, FailureMode, @@ -273,7 +273,7 @@ def mock_analysis_func(*args: Any, **kwargs: Any) -> dict[str, Any]: class TestFailureManagerConvenienceMethods: """Test convenience methods for specific analysis types.""" - @patch("ngraph.exec.failure.manager.FailureManager.run_monte_carlo_analysis") + @patch("ngraph.analysis.failure_manager.FailureManager.run_monte_carlo_analysis") def test_run_max_flow_monte_carlo_delegates( self, mock_mc_analysis: MagicMock, failure_manager: FailureManager ) -> None: @@ -294,7 +294,7 @@ def test_run_max_flow_monte_carlo_delegates( assert mock_mc_analysis.called assert result == mock_mc_analysis.return_value - @patch("ngraph.exec.failure.manager.FailureManager.run_monte_carlo_analysis") + @patch("ngraph.analysis.failure_manager.FailureManager.run_monte_carlo_analysis") def test_run_demand_placement_monte_carlo_delegates( self, mock_mc_analysis: MagicMock, failure_manager: FailureManager ) -> None: @@ -370,7 +370,7 @@ def test_case_insensitive_flow_placement_conversion( class TestFailureManagerErrorHandling: """Test error handling and edge cases.""" - @patch("ngraph.exec.failure.manager.ThreadPoolExecutor") + @patch("ngraph.analysis.failure_manager.ThreadPoolExecutor") def test_parallel_execution_error_propagation( self, mock_pool_executor: MagicMock, failure_manager: FailureManager ) -> None: diff --git a/tests/exec/failure/test_manager_integration.py b/tests/analysis/test_failure_manager_integration.py similarity index 99% rename from tests/exec/failure/test_manager_integration.py rename to tests/analysis/test_failure_manager_integration.py index 5119004..895fc9c 100644 --- a/tests/exec/failure/test_manager_integration.py +++ b/tests/analysis/test_failure_manager_integration.py @@ -2,8 +2,8 @@ import pytest -from ngraph.exec.analysis.flow import max_flow_analysis -from ngraph.exec.failure.manager import FailureManager +from ngraph.analysis.failure_manager import FailureManager +from ngraph.analysis.functions import max_flow_analysis from ngraph.model.failure.policy import FailurePolicy, FailureRule from ngraph.model.failure.policy_set import FailurePolicySet from ngraph.model.network import Network diff --git a/tests/solver/test_flow_placement_semantics.py b/tests/analysis/test_flow_placement_semantics.py similarity index 100% rename from tests/solver/test_flow_placement_semantics.py rename to tests/analysis/test_flow_placement_semantics.py diff --git a/tests/exec/analysis/test_functions.py b/tests/analysis/test_functions.py similarity index 96% rename from tests/exec/analysis/test_functions.py rename to tests/analysis/test_functions.py index d384510..90721d7 100644 --- a/tests/exec/analysis/test_functions.py +++ b/tests/analysis/test_functions.py @@ -1,8 +1,8 @@ -"""Tests for analysis.flow module.""" +"""Tests for analysis.functions module.""" import pytest -from ngraph.exec.analysis.flow import ( +from ngraph.analysis.functions import ( demand_placement_analysis, max_flow_analysis, sensitivity_analysis, @@ -217,7 +217,7 @@ def diamond_network(self) -> Network: def test_context_caching_pairwise_mode(self, diamond_network: Network) -> None: """Context caching works with pairwise mode.""" - from ngraph.exec.analysis.flow import build_demand_context + from ngraph.analysis.functions import build_demand_context demands_config = [ { @@ -246,7 +246,7 @@ def test_context_caching_pairwise_mode(self, diamond_network: Network) -> None: def test_context_caching_combine_mode(self, diamond_network: Network) -> None: """Context caching works with combine mode (uses pseudo nodes).""" - from ngraph.exec.analysis.flow import build_demand_context + from ngraph.analysis.functions import build_demand_context demands_config = [ { @@ -277,7 +277,7 @@ def test_context_caching_combine_multiple_iterations( self, diamond_network: Network ) -> None: """Context can be reused for multiple analysis iterations.""" - from ngraph.exec.analysis.flow import build_demand_context + from ngraph.analysis.functions import build_demand_context demands_config = [ { @@ -304,7 +304,7 @@ def test_context_caching_combine_multiple_iterations( def test_context_caching_without_id_raises(self, diamond_network: Network) -> None: """Context caching without stable ID raises KeyError for combine mode.""" - from ngraph.exec.analysis.flow import build_demand_context + from ngraph.analysis.functions import build_demand_context # Config without explicit ID - each reconstruction generates new ID demands_config = [ @@ -319,8 +319,8 @@ def test_context_caching_without_id_raises(self, diamond_network: Network) -> No # Build context - creates pseudo nodes with auto-generated ID (uuid1) ctx = build_demand_context(diamond_network, demands_config) - # Analysis reconstructs TrafficDemand without ID → generates new ID (uuid2) - # Tries to find pseudo nodes _src_...|uuid2 which don't exist → KeyError + # Analysis reconstructs TrafficDemand without ID -> generates new ID (uuid2) + # Tries to find pseudo nodes _src_...|uuid2 which don't exist -> KeyError with pytest.raises(KeyError): demand_placement_analysis( network=diamond_network, diff --git a/tests/exec/analysis/test_functions_details.py b/tests/analysis/test_functions_details.py similarity index 97% rename from tests/exec/analysis/test_functions_details.py rename to tests/analysis/test_functions_details.py index 68f3134..101ed03 100644 --- a/tests/exec/analysis/test_functions_details.py +++ b/tests/analysis/test_functions_details.py @@ -1,6 +1,6 @@ from __future__ import annotations -from ngraph.exec.analysis.flow import demand_placement_analysis +from ngraph.analysis.functions import demand_placement_analysis from ngraph.model.network import Link, Network, Node diff --git a/tests/solver/test_maxflow_api.py b/tests/analysis/test_maxflow_api.py similarity index 100% rename from tests/solver/test_maxflow_api.py rename to tests/analysis/test_maxflow_api.py diff --git a/tests/solver/test_maxflow_cache.py b/tests/analysis/test_maxflow_cache.py similarity index 100% rename from tests/solver/test_maxflow_cache.py rename to tests/analysis/test_maxflow_cache.py diff --git a/tests/solver/test_maxflow_cost_distribution.py b/tests/analysis/test_maxflow_cost_distribution.py similarity index 100% rename from tests/solver/test_maxflow_cost_distribution.py rename to tests/analysis/test_maxflow_cost_distribution.py diff --git a/tests/solver/test_paths.py b/tests/analysis/test_paths.py similarity index 100% rename from tests/solver/test_paths.py rename to tests/analysis/test_paths.py diff --git a/tests/exec/analysis/test_spf_caching.py b/tests/analysis/test_placement.py similarity index 93% rename from tests/exec/analysis/test_spf_caching.py rename to tests/analysis/test_placement.py index c6d356d..cdcb63a 100644 --- a/tests/exec/analysis/test_spf_caching.py +++ b/tests/analysis/test_placement.py @@ -1,4 +1,4 @@ -"""Tests for SPF caching in demand_placement_analysis. +"""Tests for SPF caching in demand placement. This module tests the SPF caching optimization that reduces redundant shortest path computations when placing demands from the same source nodes. @@ -10,13 +10,11 @@ import pytest -from ngraph.exec.analysis.flow import ( - _CACHEABLE_PRESETS, - _CACHEABLE_SIMPLE, - _CACHEABLE_TE, - _get_placement_for_preset, - _get_selection_for_preset, - demand_placement_analysis, +from ngraph.analysis.functions import demand_placement_analysis +from ngraph.analysis.placement import ( + CACHEABLE_PRESETS, + _get_edge_selection, + _get_flow_placement, ) from ngraph.model.flow.policy_config import FlowPolicyPreset from ngraph.model.network import Link, Network, Node @@ -28,73 +26,57 @@ class TestHelperFunctions: def test_get_selection_for_ecmp(self) -> None: """Test EdgeSelection for ECMP preset.""" - selection = _get_selection_for_preset(FlowPolicyPreset.SHORTEST_PATHS_ECMP) + selection = _get_edge_selection(FlowPolicyPreset.SHORTEST_PATHS_ECMP) assert selection.multi_edge is True assert selection.require_capacity is False def test_get_selection_for_wcmp(self) -> None: """Test EdgeSelection for WCMP preset.""" - selection = _get_selection_for_preset(FlowPolicyPreset.SHORTEST_PATHS_WCMP) + selection = _get_edge_selection(FlowPolicyPreset.SHORTEST_PATHS_WCMP) assert selection.multi_edge is True assert selection.require_capacity is False def test_get_selection_for_te_wcmp_unlim(self) -> None: """Test EdgeSelection for TE_WCMP_UNLIM preset.""" - selection = _get_selection_for_preset(FlowPolicyPreset.TE_WCMP_UNLIM) + selection = _get_edge_selection(FlowPolicyPreset.TE_WCMP_UNLIM) assert selection.multi_edge is True assert selection.require_capacity is True - def test_get_selection_for_invalid_preset(self) -> None: - """Test that invalid preset raises ValueError.""" - with pytest.raises(ValueError, match="not cacheable"): - _get_selection_for_preset(FlowPolicyPreset.TE_ECMP_16_LSP) - def test_get_placement_for_ecmp(self) -> None: """Test FlowPlacement for ECMP preset.""" import netgraph_core - placement = _get_placement_for_preset(FlowPolicyPreset.SHORTEST_PATHS_ECMP) + placement = _get_flow_placement(FlowPolicyPreset.SHORTEST_PATHS_ECMP) assert placement == netgraph_core.FlowPlacement.EQUAL_BALANCED def test_get_placement_for_wcmp(self) -> None: """Test FlowPlacement for WCMP preset.""" import netgraph_core - placement = _get_placement_for_preset(FlowPolicyPreset.SHORTEST_PATHS_WCMP) + placement = _get_flow_placement(FlowPolicyPreset.SHORTEST_PATHS_WCMP) assert placement == netgraph_core.FlowPlacement.PROPORTIONAL def test_get_placement_for_te_wcmp_unlim(self) -> None: """Test FlowPlacement for TE_WCMP_UNLIM preset.""" import netgraph_core - placement = _get_placement_for_preset(FlowPolicyPreset.TE_WCMP_UNLIM) + placement = _get_flow_placement(FlowPolicyPreset.TE_WCMP_UNLIM) assert placement == netgraph_core.FlowPlacement.PROPORTIONAL class TestCacheablePresets: """Test that cacheable preset sets are correctly defined.""" - def test_cacheable_simple_presets(self) -> None: - """Test that simple cacheable presets are defined correctly.""" - assert FlowPolicyPreset.SHORTEST_PATHS_ECMP in _CACHEABLE_SIMPLE - assert FlowPolicyPreset.SHORTEST_PATHS_WCMP in _CACHEABLE_SIMPLE - # TE policies should not be in simple set - assert FlowPolicyPreset.TE_WCMP_UNLIM not in _CACHEABLE_SIMPLE - - def test_cacheable_te_presets(self) -> None: - """Test that TE cacheable presets are defined correctly.""" - assert FlowPolicyPreset.TE_WCMP_UNLIM in _CACHEABLE_TE - # Simple policies should not be in TE set - assert FlowPolicyPreset.SHORTEST_PATHS_ECMP not in _CACHEABLE_TE - - def test_cacheable_presets_is_union(self) -> None: - """Test that _CACHEABLE_PRESETS is the union of simple and TE.""" - assert _CACHEABLE_PRESETS == _CACHEABLE_SIMPLE | _CACHEABLE_TE + def test_cacheable_presets_contains_expected(self) -> None: + """Test that cacheable presets contain expected policies.""" + assert FlowPolicyPreset.SHORTEST_PATHS_ECMP in CACHEABLE_PRESETS + assert FlowPolicyPreset.SHORTEST_PATHS_WCMP in CACHEABLE_PRESETS + assert FlowPolicyPreset.TE_WCMP_UNLIM in CACHEABLE_PRESETS def test_lsp_policies_not_cacheable(self) -> None: """Test that LSP policies are not in cacheable set.""" - assert FlowPolicyPreset.TE_ECMP_16_LSP not in _CACHEABLE_PRESETS - assert FlowPolicyPreset.TE_ECMP_UP_TO_256_LSP not in _CACHEABLE_PRESETS + assert FlowPolicyPreset.TE_ECMP_16_LSP not in CACHEABLE_PRESETS + assert FlowPolicyPreset.TE_ECMP_UP_TO_256_LSP not in CACHEABLE_PRESETS class TestSPFCachingBasic: @@ -277,7 +259,7 @@ def _run_demand_placement_without_cache( import netgraph_core from ngraph.analysis import AnalysisContext - from ngraph.exec.demand.expand import expand_demands + from ngraph.analysis.demand import expand_demands from ngraph.model.demand.spec import TrafficDemand from ngraph.model.flow.policy_config import ( FlowPolicyPreset, diff --git a/tests/exec/analysis/__init__.py b/tests/exec/analysis/__init__.py deleted file mode 100644 index 87949d7..0000000 --- a/tests/exec/analysis/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Monte Carlo analysis tests diff --git a/tests/exec/failure/__init__.py b/tests/exec/failure/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/exec/demand/test_builder.py b/tests/model/demand/test_builder.py similarity index 99% rename from tests/exec/demand/test_builder.py rename to tests/model/demand/test_builder.py index e656483..7acd792 100644 --- a/tests/exec/demand/test_builder.py +++ b/tests/model/demand/test_builder.py @@ -2,7 +2,7 @@ import pytest -from ngraph.exec.demand.builder import ( +from ngraph.model.demand.builder import ( _coerce_flow_policy_config, build_traffic_matrix_set, ) diff --git a/tests/model/failure/test_failure_trace.py b/tests/model/failure/test_failure_trace.py index 6a377fb..6d29b74 100644 --- a/tests/model/failure/test_failure_trace.py +++ b/tests/model/failure/test_failure_trace.py @@ -2,7 +2,7 @@ import pytest -from ngraph.exec.failure.manager import FailureManager +from ngraph.analysis.failure_manager import FailureManager from ngraph.model.failure.policy import ( FailureCondition, FailureMode, diff --git a/tests/solver/__init__.py b/tests/solver/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/workflow/test_maximum_supported_demand.py b/tests/workflow/test_maximum_supported_demand.py index 3cace34..4cd489f 100644 --- a/tests/workflow/test_maximum_supported_demand.py +++ b/tests/workflow/test_maximum_supported_demand.py @@ -94,7 +94,7 @@ def _eval(cache, alpha, seeds): def test_msd_end_to_end_single_link() -> None: """Test MSD end-to-end with a simple single-link scenario.""" - from ngraph.exec.analysis.flow import demand_placement_analysis + from ngraph.analysis.functions import demand_placement_analysis from ngraph.workflow.maximum_supported_demand_step import ( MaximumSupportedDemand as MSD, ) From a62b6ec50716e41aae9ff7155aec4d1e7e287bc8 Mon Sep 17 00:00:00 2001 From: Andrey Golovanov Date: Mon, 22 Dec 2025 01:22:10 +0000 Subject: [PATCH 2/2] Refactor execution counter handling for thread safety --- docs/reference/api-full.md | 3 ++- ngraph/scenario.py | 8 ++++---- ngraph/workflow/base.py | 13 +++++-------- tests/workflow/test_base.py | 1 + tests/workflow/test_cost_power.py | 3 +++ tests/workflow/test_msd_perf_safety.py | 1 + tests/workflow/test_tm_analysis_perf_safety.py | 1 + 7 files changed, 17 insertions(+), 13 deletions(-) diff --git a/docs/reference/api-full.md b/docs/reference/api-full.md index 1fc68ce..ed0eeda 100644 --- a/docs/reference/api-full.md +++ b/docs/reference/api-full.md @@ -12,7 +12,7 @@ Quick links: - [CLI Reference](cli.md) - [DSL Reference](dsl.md) -Generated from source code on: December 22, 2025 at 00:44 UTC +Generated from source code on: December 22, 2025 at 01:21 UTC Modules auto-discovered: 53 @@ -265,6 +265,7 @@ Typical usage example: - `results` (Results) = Results(_store={}, _metadata={}, _active_step=None, _scenario={}) - `components_library` (ComponentsLibrary) = ComponentsLibrary(components={}) - `seed` (Optional[int]) +- `_execution_counter` (int) = 0 **Methods:** diff --git a/ngraph/scenario.py b/ngraph/scenario.py index 4fc4250..b1a3042 100644 --- a/ngraph/scenario.py +++ b/ngraph/scenario.py @@ -54,6 +54,8 @@ class Scenario: results: Results = field(default_factory=Results) components_library: ComponentsLibrary = field(default_factory=ComponentsLibrary) seed: Optional[int] = None + # Per-instance execution counter for thread-safe step ordering + _execution_counter: int = field(default=0, init=False, repr=False) # Module-level logger _logger = get_logger(__name__) @@ -73,10 +75,8 @@ def run(self) -> None: Each step may modify scenario data or store outputs in scenario.results. """ - # Reset execution counter to ensure step ordering is local to this run - import ngraph.workflow.base as workflow_base - - workflow_base._execution_counter = 0 + # Reset instance execution counter for this run + self._execution_counter = 0 for step in self.workflow: step.execute(self) diff --git a/ngraph/workflow/base.py b/ngraph/workflow/base.py index 11fb189..79ee552 100644 --- a/ngraph/workflow/base.py +++ b/ngraph/workflow/base.py @@ -24,10 +24,6 @@ # Registry for workflow step classes WORKFLOW_STEP_REGISTRY: Dict[str, Type["WorkflowStep"]] = {} -# Per-process execution counter for tracking step order. -# Reset to zero at the start of each scenario run to keep ordering local. -_execution_counter = 0 - def register_workflow_step(step_type: str): """Return a decorator that registers a `WorkflowStep` subclass. @@ -105,8 +101,6 @@ def execute(self, scenario: "Scenario") -> None: Exception: Re-raises any exception raised by `run()` after logging duration and context. """ - global _execution_counter - step_type = self.__class__.__name__ # Guarantee a stable results namespace even when name is not provided step_name = self.name or step_type @@ -133,18 +127,21 @@ def execute(self, scenario: "Scenario") -> None: seed_source = "none" active_seed = None + # Get execution order from scenario instance (thread-safe) + execution_order = scenario._execution_counter + scenario._execution_counter += 1 + # Enter step scope and store workflow metadata scenario.results.enter_step(step_name) scenario.results.put_step_metadata( step_name=step_name, step_type=step_type, - execution_order=_execution_counter, + execution_order=execution_order, scenario_seed=scenario_seed, step_seed=step_seed, seed_source=seed_source, active_seed=active_seed, ) - _execution_counter += 1 if self.seed is not None: logger.debug( diff --git a/tests/workflow/test_base.py b/tests/workflow/test_base.py index 6579b1c..3ffd048 100644 --- a/tests/workflow/test_base.py +++ b/tests/workflow/test_base.py @@ -65,6 +65,7 @@ def run(self, scenario) -> None: scen = MagicMock(spec=Scenario) scen.results = Results() scen.seed = 1010 + scen._execution_counter = 0 step = Dummy(name="d1") step.execute(scen) diff --git a/tests/workflow/test_cost_power.py b/tests/workflow/test_cost_power.py index 14baec4..7abb677 100644 --- a/tests/workflow/test_cost_power.py +++ b/tests/workflow/test_cost_power.py @@ -66,6 +66,7 @@ class _Scenario: network = net components_library = comps results = Results() + _execution_counter = 0 scenario = _Scenario() @@ -124,6 +125,7 @@ class _Scenario: network = net components_library = comps results = Results() + _execution_counter = 0 scenario = _Scenario() @@ -160,6 +162,7 @@ class _Scenario: network = net components_library = comps results = Results() + _execution_counter = 0 scenario = _Scenario() step = CostPower(name="cp3", include_disabled=False, aggregation_level=0) diff --git a/tests/workflow/test_msd_perf_safety.py b/tests/workflow/test_msd_perf_safety.py index cdba8eb..33aefa4 100644 --- a/tests/workflow/test_msd_perf_safety.py +++ b/tests/workflow/test_msd_perf_safety.py @@ -10,6 +10,7 @@ def __init__(self, network: Any, tmset: Any, results: Any) -> None: self.network = network self.traffic_matrix_set = tmset self.results = results + self._execution_counter = 0 def test_msd_reuse_tm_across_seeds_is_behaviorally_identical(monkeypatch): diff --git a/tests/workflow/test_tm_analysis_perf_safety.py b/tests/workflow/test_tm_analysis_perf_safety.py index 16806f3..af88589 100644 --- a/tests/workflow/test_tm_analysis_perf_safety.py +++ b/tests/workflow/test_tm_analysis_perf_safety.py @@ -15,6 +15,7 @@ def __init__( self.traffic_matrix_set = tmset self.results = results self.failure_policy_set = failure_policy_set + self._execution_counter = 0 def test_tm_basic_behavior_unchanged(monkeypatch):