Skip to content

Commit 4c1948b

Browse files
committed
bugfixing and logging
1 parent 74d1918 commit 4c1948b

16 files changed

+653
-75
lines changed

ngraph/demand/__init__.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,25 @@ def place(
108108
# If max_fraction <= 0, do not place any new volume (unless volume is infinite).
109109
to_place = self.volume if self.volume == float("inf") else 0.0
110110

111-
# Delegate flow placement
111+
# Ensure we request at least MIN_FLOW when there is meaningful leftover
112+
if 0.0 < to_place < MIN_FLOW:
113+
to_place = min(self.volume - self.placed_demand, MIN_FLOW)
114+
115+
# Delegate flow placement (do not force min_flow threshold here; policy handles it)
116+
# Use a demand-unique flow_class to avoid collisions across different
117+
# Demand instances that share the same numerical demand_class.
118+
demand_unique_flow_class = (
119+
self.demand_class,
120+
self.src_node,
121+
self.dst_node,
122+
id(self),
123+
)
124+
112125
self.flow_policy.place_demand(
113126
flow_graph,
114127
self.src_node,
115128
self.dst_node,
116-
self.demand_class,
129+
demand_unique_flow_class,
117130
to_place,
118131
)
119132

ngraph/demand/manager/expand.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,15 @@ def _expand_combine(
108108
with infinite-capacity, zero-cost edges, and creates one aggregate
109109
`Demand` from pseudo-source to pseudo-sink with the full volume.
110110
"""
111-
# Flatten the source and sink node lists
112-
src_nodes = [node for group_nodes in src_groups.values() for node in group_nodes]
113-
dst_nodes = [node for group_nodes in snk_groups.values() for node in group_nodes]
111+
# Flatten and sort source and sink node lists for deterministic order
112+
src_nodes = sorted(
113+
(node for group_nodes in src_groups.values() for node in group_nodes),
114+
key=lambda n: n.name,
115+
)
116+
dst_nodes = sorted(
117+
(node for group_nodes in snk_groups.values() for node in group_nodes),
118+
key=lambda n: n.name,
119+
)
114120

115121
if not src_nodes or not dst_nodes or graph is None:
116122
return
@@ -161,17 +167,22 @@ def _expand_pairwise(
161167
Creates one `Demand` for each valid source-destination pair (excluding
162168
self-pairs) and splits total volume evenly across pairs.
163169
"""
164-
# Flatten the source and sink node lists
165-
src_nodes = [node for group_nodes in src_groups.values() for node in group_nodes]
166-
dst_nodes = [node for group_nodes in snk_groups.values() for node in group_nodes]
167-
168-
# Generate all valid (src, dst) pairs
169-
valid_pairs = [
170-
(s_node, t_node)
171-
for s_node in src_nodes
172-
for t_node in dst_nodes
173-
if s_node.name != t_node.name
174-
]
170+
# Flatten and sort source and sink node lists for deterministic order
171+
src_nodes = sorted(
172+
(node for group_nodes in src_groups.values() for node in group_nodes),
173+
key=lambda n: n.name,
174+
)
175+
dst_nodes = sorted(
176+
(node for group_nodes in snk_groups.values() for node in group_nodes),
177+
key=lambda n: n.name,
178+
)
179+
180+
# Generate all valid (src, dst) pairs in deterministic lexicographic order
181+
valid_pairs = []
182+
for s_node in src_nodes:
183+
for t_node in dst_nodes:
184+
if s_node.name != t_node.name:
185+
valid_pairs.append((s_node, t_node))
175186
pair_count = len(valid_pairs)
176187
if pair_count == 0:
177188
return

ngraph/demand/manager/manager.py

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,13 @@ class TrafficManager:
6565
on a configurable 'mode' ("combine" or "pairwise").
6666
3) Each Demand is associated with a FlowPolicy, which handles how flows
6767
are placed (split across paths, balancing, etc.).
68-
4) Provides methods to place all demands incrementally with optional
69-
re-optimization, reset usage, and retrieve flow/usage summaries.
68+
4) Provides methods to place all demands incrementally with optional
69+
re-optimization, reset usage, and retrieve flow/usage summaries.
70+
71+
Auto rounds semantics:
72+
- placement_rounds="auto" performs up to a small number of fairness passes
73+
(at most 3), with early stop when diminishing returns are detected. Each
74+
pass asks the scheduler to place full leftovers without step splitting.
7075
7176
In particular:
7277
- 'combine' mode:
@@ -172,21 +177,57 @@ def place_all_demands(
172177
raise RuntimeError("Graph not built yet. Call build_graph() first.")
173178

174179
if isinstance(placement_rounds, str) and placement_rounds.lower() == "auto":
175-
placement_rounds = self._estimate_rounds()
180+
# Simple, reliable auto: up to 3 passes with early stop.
181+
from ngraph.algorithms.base import MIN_FLOW
182+
183+
total_placed = 0.0
184+
max_auto_rounds = 3
185+
for _ in range(max_auto_rounds):
186+
placed_now = place_demands_round_robin(
187+
graph=self.graph,
188+
demands=self.demands,
189+
placement_rounds=1,
190+
reoptimize_after_each_round=False,
191+
)
192+
total_placed += placed_now
176193

177-
# Ensure placement_rounds is an int for range() and arithmetic operations
178-
placement_rounds_int = (
179-
int(placement_rounds)
180-
if isinstance(placement_rounds, str)
181-
else placement_rounds
182-
)
194+
# Early stops: no progress or negligible leftover
195+
if placed_now < MIN_FLOW:
196+
break
183197

184-
total_placed = place_demands_round_robin(
185-
graph=self.graph,
186-
demands=self.demands,
187-
placement_rounds=placement_rounds_int,
188-
reoptimize_after_each_round=reoptimize_after_each_round,
189-
)
198+
leftover_total = sum(
199+
max(0.0, d.volume - d.placed_demand) for d in self.demands
200+
)
201+
if leftover_total < 0.05 * placed_now:
202+
break
203+
204+
# Fairness check: if served ratios are already close, stop
205+
served = [
206+
(d.placed_demand / d.volume)
207+
for d in self.demands
208+
if d.volume > 0.0 and (d.volume - d.placed_demand) >= MIN_FLOW
209+
]
210+
if served:
211+
s_min, s_max = min(served), max(served)
212+
if s_max <= 0.0 or s_min >= 0.8 * s_max:
213+
break
214+
else:
215+
# Ensure placement_rounds is an int for range() and arithmetic operations
216+
placement_rounds_int = (
217+
int(placement_rounds)
218+
if isinstance(placement_rounds, str)
219+
else placement_rounds
220+
)
221+
222+
if placement_rounds_int <= 0:
223+
raise ValueError("placement_rounds must be positive")
224+
225+
total_placed = place_demands_round_robin(
226+
graph=self.graph,
227+
demands=self.demands,
228+
placement_rounds=placement_rounds_int,
229+
reoptimize_after_each_round=reoptimize_after_each_round,
230+
)
190231

191232
# Update each TrafficDemand's placed volume
192233
for td in self._get_traffic_demands():

ngraph/demand/manager/schedule.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,31 +42,53 @@ def place_demands_round_robin(
4242

4343
for priority_class in sorted_priorities:
4444
demands_in_class = prio_map[priority_class]
45+
placed_before_class = sum(d.placed_demand for d in demands_in_class)
4546

47+
# Unified fairness loop: attempt to place full leftover per demand each round.
48+
# For rounds > 0, reorder by least-served ratio to improve fairness.
49+
reopt_attempted = False
4650
for round_idx in range(placement_rounds):
4751
placed_in_this_round = 0.0
48-
rounds_left = placement_rounds - round_idx
4952

50-
for demand in demands_in_class:
53+
if round_idx == 0:
54+
iteration_order = list(demands_in_class)
55+
else:
56+
iteration_order = sorted(
57+
demands_in_class,
58+
key=lambda d: (
59+
(d.placed_demand / d.volume) if d.volume > 0 else 1.0,
60+
d.placed_demand,
61+
),
62+
)
63+
64+
for demand in iteration_order:
5165
leftover = demand.volume - demand.placed_demand
5266
if leftover < base.MIN_FLOW:
5367
continue
5468

55-
step_to_place = leftover / float(rounds_left)
56-
placed_now, _remain = demand.place(
57-
flow_graph=graph,
58-
max_placement=step_to_place,
59-
)
60-
total_placed += placed_now
69+
placed_now, _remain = demand.place(flow_graph=graph)
6170
placed_in_this_round += placed_now
6271

6372
if reoptimize_after_each_round and placed_in_this_round > 0.0:
6473
_reoptimize_priority_demands(graph, demands_in_class)
6574

66-
# If no progress was made, no need to continue extra rounds
6775
if placed_in_this_round < base.MIN_FLOW:
76+
any_leftover = any(
77+
(d.volume - d.placed_demand) >= base.MIN_FLOW
78+
for d in demands_in_class
79+
)
80+
if not any_leftover:
81+
break
82+
if not reopt_attempted:
83+
_reoptimize_priority_demands(graph, demands_in_class)
84+
reopt_attempted = True
85+
continue
6886
break
6987

88+
# Add only the net increase for this class to avoid double counting
89+
placed_after_class = sum(d.placed_demand for d in demands_in_class)
90+
total_placed += max(0.0, placed_after_class - placed_before_class)
91+
7092
return total_placed
7193

7294

@@ -82,11 +104,18 @@ def _reoptimize_priority_demands(
82104
continue
83105
placed_volume = dmd.placed_demand
84106
dmd.flow_policy.remove_demand(graph)
107+
# Use a demand-unique flow_class key to keep policy flows consistent
108+
flow_class_key = (
109+
dmd.demand_class,
110+
dmd.src_node,
111+
dmd.dst_node,
112+
id(dmd),
113+
)
85114
dmd.flow_policy.place_demand(
86115
graph,
87116
dmd.src_node,
88117
dmd.dst_node,
89-
dmd.demand_class,
118+
flow_class_key,
90119
placed_volume,
91120
)
92121
dmd.placed_demand = dmd.flow_policy.placed_demand

0 commit comments

Comments
 (0)