Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 287 additions & 46 deletions docs/reference/api-full.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ngraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from __future__ import annotations

from . import cli, config, logging
from .results.artifacts import CapacityEnvelope, PlacementResultSet, TrafficMatrixSet
from .demand.matrix import TrafficMatrixSet
from .results.artifacts import CapacityEnvelope, PlacementResultSet

__all__ = [
"cli",
Expand Down
44 changes: 40 additions & 4 deletions ngraph/demand/manager/builder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,43 @@
from __future__ import annotations

"""Builders for traffic matrices.

Public functions in this module assemble traffic matrices from higher-level
inputs. This is a placeholder for future matrix construction utilities.
Construct `TrafficMatrixSet` from raw dictionaries (e.g. parsed YAML).
This logic was previously embedded in `Scenario.from_yaml`.
"""

from __future__ import annotations

from typing import Dict, List

from ngraph.demand.matrix import TrafficMatrixSet
from ngraph.demand.spec import TrafficDemand
from ngraph.yaml_utils import normalize_yaml_dict_keys


def build_traffic_matrix_set(raw: Dict[str, List[dict]]) -> TrafficMatrixSet:
"""Build a `TrafficMatrixSet` from a mapping of name -> list of dicts.

Args:
raw: Mapping where each key is a matrix name and each value is a list of
dictionaries with `TrafficDemand` constructor fields.

Returns:
Initialized `TrafficMatrixSet` with constructed `TrafficDemand` objects.

Raises:
ValueError: If ``raw`` is not a mapping of name -> list[dict].
"""
if not isinstance(raw, dict):
raise ValueError(
"'traffic_matrix_set' must be a mapping of name -> list[TrafficDemand]"
)

normalized_raw = normalize_yaml_dict_keys(raw)
tms = TrafficMatrixSet()
for name, td_list in normalized_raw.items():
if not isinstance(td_list, list):
raise ValueError(
f"Matrix '{name}' must map to a list of TrafficDemand dicts"
)
tms.add(name, [TrafficDemand(**d) for d in td_list])

return tms
198 changes: 193 additions & 5 deletions ngraph/demand/manager/expand.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,196 @@
from __future__ import annotations

"""Expansion helpers for traffic demand specifications.
Functions in this module transform high-level demand specs into concrete
`Demand` objects. This is a placeholder module; expansion is currently
implemented in `TrafficManager`.
Public functions here convert user-facing `TrafficDemand` specifications into
concrete `Demand` objects that can be placed on a `StrictMultiDiGraph`.
This module provides the pure expansion logic that was previously embedded in
`TrafficManager`.
"""

from __future__ import annotations

from typing import Dict, List, Tuple, Union

from ngraph.algorithms.flow_init import init_flow_graph
from ngraph.demand import Demand
from ngraph.demand.spec import TrafficDemand
from ngraph.flows.policy import FlowPolicyConfig, get_flow_policy
from ngraph.graph.strict_multidigraph import StrictMultiDiGraph
from ngraph.model.network import Network, Node

try:
# Avoid importing at runtime if not needed while keeping type hints precise
from typing import TYPE_CHECKING

if TYPE_CHECKING: # pragma: no cover - typing only
from ngraph.model.view import NetworkView
except Exception: # pragma: no cover - defensive for environments without extras
TYPE_CHECKING = False


def expand_demands(
network: Union[Network, "NetworkView"],
graph: StrictMultiDiGraph | None,
traffic_demands: List[TrafficDemand],
default_flow_policy_config: FlowPolicyConfig,
) -> Tuple[List[Demand], Dict[str, List[Demand]]]:
"""Expand traffic demands into concrete `Demand` objects.
The result is a flat list of `Demand` plus a mapping from
``TrafficDemand.id`` to the list of expanded demands for that entry.
Args:
network: Network or NetworkView used for node group selection.
graph: Flow graph to operate on. If ``None``, expansion that requires
graph mutation (pseudo nodes/edges) is skipped.
traffic_demands: List of high-level traffic demand specifications.
default_flow_policy_config: Default policy to apply when a demand does
not specify an explicit `flow_policy`.
Returns:
A tuple ``(expanded, td_map)`` where:
- ``expanded`` is the flattened, sorted list of all expanded demands
(sorted by ascending ``demand_class``).
- ``td_map`` maps ``TrafficDemand.id`` to its expanded demands.
"""
td_to_demands: Dict[str, List[Demand]] = {}
expanded: List[Demand] = []

for td in traffic_demands:
# Gather node groups for source and sink
src_groups = network.select_node_groups_by_path(td.source_path)
snk_groups = network.select_node_groups_by_path(td.sink_path)

if not src_groups or not snk_groups:
td_to_demands[td.id] = []
continue

demands_of_td: List[Demand] = []
if td.mode == "combine":
_expand_combine(
demands_of_td,
td,
src_groups,
snk_groups,
graph,
default_flow_policy_config,
)
elif td.mode == "pairwise":
_expand_pairwise(
demands_of_td,
td,
src_groups,
snk_groups,
default_flow_policy_config,
)
else:
raise ValueError(f"Unknown mode: {td.mode}")

expanded.extend(demands_of_td)
td_to_demands[td.id] = demands_of_td

# Sort final demands by ascending demand_class (i.e., priority)
expanded.sort(key=lambda d: d.demand_class)
return expanded, td_to_demands


def _expand_combine(
expanded: List[Demand],
td: TrafficDemand,
src_groups: Dict[str, List[Node]],
snk_groups: Dict[str, List[Node]],
graph: StrictMultiDiGraph | None,
default_flow_policy_config: FlowPolicyConfig,
) -> None:
"""Expand a single demand using the ``combine`` mode.
Adds pseudo-source and pseudo-sink nodes, connects them to real nodes
with infinite-capacity, zero-cost edges, and creates one aggregate
`Demand` from pseudo-source to pseudo-sink with the full volume.
"""
# Flatten the source and sink node lists
src_nodes = [node for group_nodes in src_groups.values() for node in group_nodes]
dst_nodes = [node for group_nodes in snk_groups.values() for node in group_nodes]

if not src_nodes or not dst_nodes or graph is None:
return

# Create pseudo-source / pseudo-sink names
pseudo_source_name = f"combine_src::{td.id}"
pseudo_sink_name = f"combine_snk::{td.id}"

# Add pseudo nodes to the graph (no-op if they already exist)
graph.add_node(pseudo_source_name)
graph.add_node(pseudo_sink_name)

# Link pseudo-source to real sources, and real sinks to pseudo-sink
for s_node in src_nodes:
graph.add_edge(pseudo_source_name, s_node.name, capacity=float("inf"), cost=0)
for t_node in dst_nodes:
graph.add_edge(t_node.name, pseudo_sink_name, capacity=float("inf"), cost=0)

init_flow_graph(graph) # Re-initialize flow-related attributes

# Create a single Demand with the full volume
if td.flow_policy:
flow_policy = td.flow_policy.deep_copy()
else:
fp_config = td.flow_policy_config or default_flow_policy_config
flow_policy = get_flow_policy(fp_config)

expanded.append(
Demand(
src_node=pseudo_source_name,
dst_node=pseudo_sink_name,
volume=td.demand,
demand_class=td.priority,
flow_policy=flow_policy,
)
)


def _expand_pairwise(
expanded: List[Demand],
td: TrafficDemand,
src_groups: Dict[str, List[Node]],
snk_groups: Dict[str, List[Node]],
default_flow_policy_config: FlowPolicyConfig,
) -> None:
"""Expand a single demand using the ``pairwise`` mode.
Creates one `Demand` for each valid source-destination pair (excluding
self-pairs) and splits total volume evenly across pairs.
"""
# Flatten the source and sink node lists
src_nodes = [node for group_nodes in src_groups.values() for node in group_nodes]
dst_nodes = [node for group_nodes in snk_groups.values() for node in group_nodes]

# Generate all valid (src, dst) pairs
valid_pairs = [
(s_node, t_node)
for s_node in src_nodes
for t_node in dst_nodes
if s_node.name != t_node.name
]
pair_count = len(valid_pairs)
if pair_count == 0:
return

demand_per_pair = td.demand / float(pair_count)

for s_node, t_node in valid_pairs:
if td.flow_policy:
flow_policy = td.flow_policy.deep_copy()
else:
fp_config = td.flow_policy_config or default_flow_policy_config
flow_policy = get_flow_policy(fp_config)

expanded.append(
Demand(
src_node=s_node.name,
dst_node=t_node.name,
volume=demand_per_pair,
demand_class=td.priority,
flow_policy=flow_policy,
)
)
Loading