Skip to content

Commit 27874af

Browse files
committed
debug script
1 parent c67f3e0 commit 27874af

File tree

1 file changed

+349
-0
lines changed

1 file changed

+349
-0
lines changed

dev/manual_maxflow_clos.py

Lines changed: 349 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,349 @@
1+
"""Manual max-flow timing on the CLOS scenario graph.
2+
3+
Reads the serialized node-link graph under the ``build_graph`` section of a
4+
results JSON file (e.g., ``clos_scenario.json``), reconstructs a
5+
``StrictMultiDiGraph``, and runs ``calc_max_flow`` between two specified nodes
6+
while reporting timing, optional profiling, and summary diagnostics.
7+
8+
Run from repo root:
9+
10+
python -m dev.manual_maxflow_clos --json clos_scenario.json \
11+
--source "metro1/dc1/dc/dc" --sink "metro10/dc1/dc/dc"
12+
13+
The script prints: load times, node/edge counts, degree of endpoints,
14+
max-flow value, min-cut size, and a few top edges by placed flow.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
import argparse
20+
import cProfile
21+
import json
22+
import os
23+
import platform
24+
import pstats
25+
import statistics
26+
import sys
27+
import time
28+
import tracemalloc
29+
from pathlib import Path
30+
from pstats import SortKey
31+
from typing import Any, Iterable
32+
33+
from ngraph.algorithms.max_flow import calc_max_flow
34+
from ngraph.graph.io import node_link_to_graph
35+
from ngraph.graph.strict_multidigraph import StrictMultiDiGraph
36+
37+
38+
def _format_bytes(num_bytes: int) -> str:
39+
"""Return a human-friendly string for a byte count.
40+
41+
Args:
42+
num_bytes: Number of bytes.
43+
44+
Returns:
45+
Formatted size string.
46+
"""
47+
48+
units = ["B", "KB", "MB", "GB", "TB"]
49+
size = float(num_bytes)
50+
for unit in units:
51+
if size < 1024.0 or unit == units[-1]:
52+
return f"{size:.2f} {unit}"
53+
size /= 1024.0
54+
return f"{size:.2f} TB"
55+
56+
57+
def _top_k_by_flow(
58+
edge_flow_items: Iterable[tuple[tuple[Any, Any, Any], float]], k: int
59+
) -> list[tuple[tuple[Any, Any, Any], float]]:
60+
"""Return top-k edges by flow value.
61+
62+
Args:
63+
edge_flow_items: Iterable of ``((u, v, key), flow)`` items.
64+
k: Number of entries to return.
65+
66+
Returns:
67+
List of top-k items sorted by descending flow.
68+
"""
69+
70+
# Convert to list once since we'll sort
71+
items = list(edge_flow_items)
72+
items.sort(key=lambda p: p[1], reverse=True)
73+
return items[:k]
74+
75+
76+
def load_graph_from_results(json_path: Path) -> StrictMultiDiGraph:
    """Load ``StrictMultiDiGraph`` from a results JSON file.

    The file must contain ``{"build_graph": {"graph": { ... node-link ... }}}``.

    Args:
        json_path: Path to results JSON file.

    Returns:
        Reconstructed ``StrictMultiDiGraph``.

    Raises:
        FileNotFoundError: If the JSON file does not exist.
        KeyError: If required keys are missing.
        json.JSONDecodeError: If JSON cannot be parsed.
    """
    if not json_path.is_file():
        raise FileNotFoundError(f"JSON file not found: {json_path}")

    print(f"[load] Reading: {json_path} ({_format_bytes(json_path.stat().st_size)})")

    # Phase 1: parse the raw JSON, timed separately from graph construction.
    parse_start = time.perf_counter()
    with json_path.open("r", encoding="utf-8") as handle:
        data = json.load(handle)
    parse_elapsed = time.perf_counter() - parse_start
    print(f"[load] JSON parsed in {1000.0 * parse_elapsed:.2f} ms")

    # Phase 2: rebuild the graph from the serialized node-link payload.
    payload = data["build_graph"]["graph"]
    build_start = time.perf_counter()
    graph = node_link_to_graph(payload)
    build_elapsed = time.perf_counter() - build_start
    print(
        f"[load] Graph reconstructed in {1000.0 * build_elapsed:.2f} ms; "
        f"nodes={len(graph)}, edges={graph.number_of_edges()}"
    )
    return graph
114+
115+
116+
def _run_maxflow_once(
    graph: StrictMultiDiGraph, src: Any, dst: Any
) -> tuple[float, Any, float]:
    """Run one max-flow measurement.

    Args:
        graph: Graph instance.
        src: Source node ID.
        dst: Sink node ID.

    Returns:
        Tuple of (flow_value, summary, elapsed_ms).
    """
    # Time only the max-flow call itself, in wall-clock milliseconds.
    started = time.perf_counter()
    flow_value, summary = calc_max_flow(graph, src, dst, return_summary=True)
    elapsed_ms = 1000.0 * (time.perf_counter() - started)
    return flow_value, summary, elapsed_ms
134+
135+
136+
def _profile_maxflow(
    graph: StrictMultiDiGraph,
    src: Any,
    dst: Any,
    *,
    sort_by: str,
    top_n: int,
    save_path: Path | None,
) -> None:
    """Profile ``calc_max_flow`` with cProfile and print top entries.

    Args:
        graph: Graph instance.
        src: Source node ID.
        dst: Sink node ID.
        sort_by: Sort key for stats (e.g., 'cumulative', 'tottime').
        top_n: Number of entries to display.
        save_path: If provided, write raw profile data to this path.
    """
    print("[prof] cProfile starting...")
    profiler = cProfile.Profile()
    profiler.enable()
    # Scalar call (no summary) so the profile reflects only the algorithm.
    _ = calc_max_flow(graph, src, dst, return_summary=False)
    profiler.disable()

    if save_path is not None:
        profiler.dump_stats(str(save_path))
        print(f"[prof] raw stats saved to: {save_path}")

    # Map the CLI sort name onto pstats' SortKey; unknown names fall back
    # to cumulative time, matching the original behavior.
    key_map = {
        "cumulative": SortKey.CUMULATIVE,
        "tottime": SortKey.TIME,
        "ncalls": SortKey.CALLS,
        "file": SortKey.FILENAME,
        "name": SortKey.NAME,
        "line": SortKey.LINE,
    }
    stats = pstats.Stats(profiler)
    stats.sort_stats(key_map.get(sort_by, SortKey.CUMULATIVE))
    print(f"[prof] top {top_n} by {sort_by}:")
    stats.print_stats(top_n)
178+
179+
180+
def main(argv: list[str] | None = None) -> int:
    """Entry point.

    Parses CLI arguments, loads the graph from the results JSON, validates
    the endpoint node IDs, runs timed max-flow iterations, prints timing and
    memory statistics, and optionally profiles a scalar max-flow call.

    Args:
        argv: Optional argument vector.

    Returns:
        Process exit code (0 on success, non-zero on failure).
    """

    parser = argparse.ArgumentParser(
        description="Time and optionally profile calc_max_flow on a CLOS scenario results graph",
    )
    parser.add_argument(
        "--json",
        type=Path,
        required=True,
        help="Path to results JSON containing build_graph.graph (e.g., clos_scenario.json)",
    )
    parser.add_argument(
        "--source",
        type=str,
        default="metro1/dc1/dc/dc",
        help="Source node ID",
    )
    parser.add_argument(
        "--sink",
        type=str,
        default="metro10/dc1/dc/dc",
        help="Sink node ID",
    )
    parser.add_argument(
        "--top-k",
        type=int,
        default=10,
        help="Show top-K edges by placed flow in summary",
    )
    parser.add_argument(
        "--repeat",
        type=int,
        default=1,
        help="Repeat max-flow computation N times (report per-run and summary stats)",
    )
    parser.add_argument(
        "--cprofile",
        action="store_true",
        help="Run cProfile around a scalar max-flow call (no summary).",
    )
    parser.add_argument(
        "--profile-save",
        type=Path,
        default=None,
        help="If set, save raw cProfile stats to this path.",
    )
    parser.add_argument(
        "--profile-sort",
        type=str,
        choices=["cumulative", "tottime", "ncalls", "file", "name", "line"],
        default="cumulative",
        help="Sort key for cProfile stats printing.",
    )
    parser.add_argument(
        "--profile-top",
        type=int,
        default=30,
        help="Number of cProfile entries to print.",
    )

    args = parser.parse_args(argv)

    try:
        # Track allocations for the whole run; peak is reported after the loop.
        tracemalloc.start()

        # Load graph from JSON
        graph = load_graph_from_results(args.json)

        # Environment diagnostics
        print(
            f"[env ] Python {platform.python_version()} | pid={os.getpid()} | "
            f"cpus={os.cpu_count()}"
        )

        # Sanity checks on endpoints
        src = args.source
        dst = args.sink
        print(f"[info] Source: {src}")
        print(f"[info] Sink : {dst}")

        missing = [n for n in (src, dst) if n not in graph]
        if missing:
            print(f"[error] Missing nodes: {missing}")
            # Provide hints for nearby IDs by simple substring heuristic
            # (matches on the "metroN" prefix before the first "/").
            for node in (src, dst):
                if node not in graph:
                    candidates = [n for n in graph if node.split("/")[0] in str(n)]
                    sample = candidates[:10]
                    print(
                        f"[hint] Examples of nodes sharing metro prefix for {node!r}:"
                    )
                    for ex in sample:
                        print(f" {ex}")
            return 2

        # Endpoint degrees: quick sanity signal that the nodes are connected.
        src_out = graph.out_degree(src)
        dst_in = graph.in_degree(dst)
        print(f"[info] deg_out({src})={src_out}, deg_in({dst})={dst_in}")

        # Repeat runs
        times_ms: list[float] = []
        last_flow_value: float | None = None
        last_summary: Any | None = None

        for i in range(args.repeat):
            print(f"[run ] iteration {i + 1}/{args.repeat} starting...")
            flow_value, summary, elapsed_ms = _run_maxflow_once(graph, src, dst)
            times_ms.append(elapsed_ms)
            last_flow_value = flow_value
            last_summary = summary
            print(
                f"[done] iteration {i + 1}: {elapsed_ms:.2f} ms; flow={flow_value:.6f}"
            )

        current, peak = tracemalloc.get_traced_memory()
        if times_ms:
            print(
                f"[stat] time ms -> min={min(times_ms):.2f}, "
                f"mean={statistics.mean(times_ms):.2f}, "
                f"median={statistics.median(times_ms):.2f}, "
                f"max={max(times_ms):.2f}"
            )
        print(f"[mem ] current={_format_bytes(current)}, peak={_format_bytes(peak)}")

        # Diagnostics from last summary
        # NOTE(review): summary appears to expose min_cut, reachable,
        # cost_distribution and edge_flow attributes — defined by
        # calc_max_flow's return type, not visible here; confirm in ngraph.
        if last_summary is not None and last_flow_value is not None:
            summary = last_summary
            print(
                f"[sum ] min_cut_size={len(summary.min_cut)}, "
                f"reachable={len(summary.reachable)}, "
                f"cost_buckets={len(summary.cost_distribution)}"
            )
            # List a few top edges by placed flow
            top_k = _top_k_by_flow(summary.edge_flow.items(), args.top_k)
            print(f"[sum ] top {len(top_k)} edges by flow:")
            for (u, v, k), f in top_k:
                # Items are sorted descending, so the first non-positive flow
                # means all remaining entries are also zero/negative.
                if f <= 0:
                    break
                print(f" {u} -> {v} (key={k}) flow={f}")

        # Optional profiling (single scalar call for cleaner stats)
        if args.cprofile:
            _profile_maxflow(
                graph,
                src,
                dst,
                sort_by=args.profile_sort,
                top_n=args.profile_top,
                save_path=args.profile_save,
            )

        return 0
    except KeyboardInterrupt:
        # 130 is the conventional exit code for SIGINT termination.
        print("[abort] interrupted")
        return 130
    except Exception as exc:  # noqa: BLE001 - manual script diagnostics
        print(f"[error] {type(exc).__name__}: {exc}")
        return 1
346+
347+
348+
if __name__ == "__main__":
    # Delegate to main() and propagate its exit code to the shell.
    sys.exit(main())

0 commit comments

Comments
 (0)