Skip to content

Commit ed81719

Browse files
committed
Add profiling support for parallel workflows
1 parent 8a24d9a commit ed81719

File tree

4 files changed

+148
-8
lines changed

4 files changed

+148
-8
lines changed

ngraph/cli.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import argparse
66
import json
77
import logging
8+
import os
89
import sys
910
from pathlib import Path
1011
from typing import Any, Dict, List, Optional
@@ -421,6 +422,12 @@ def _run_scenario(
421422

422423
logger.info("Starting scenario execution with profiling")
423424

425+
# Enable child-process profiling for parallel workflows
426+
child_profile_dir = Path("worker_profiles")
427+
child_profile_dir.mkdir(exist_ok=True)
428+
os.environ["NGRAPH_PROFILE_DIR"] = str(child_profile_dir.resolve())
429+
logger.info(f"Worker profiles will be saved to: {child_profile_dir}")
430+
424431
# Manual execution of workflow steps with profiling
425432
for step in scenario.workflow:
426433
step_name = step.name or step.__class__.__name__
@@ -429,12 +436,33 @@ def _run_scenario(
429436
with profiler.profile_step(step_name, step_type):
430437
step.execute(scenario)
431438

439+
# Merge any worker profiles generated by this step
440+
if child_profile_dir.exists():
441+
profiler.merge_child_profiles(child_profile_dir, step_name)
442+
432443
logger.info("Scenario execution completed successfully")
433444

434445
# End scenario profiling and analyze results
435446
profiler.end_scenario()
436447
profiler.analyze_performance()
437448

449+
# Clean up any remaining worker profile files
450+
if child_profile_dir.exists():
451+
remaining_files = list(child_profile_dir.glob("*.pstats"))
452+
if remaining_files:
453+
logger.debug(
454+
f"Cleaning up {len(remaining_files)} remaining profile files"
455+
)
456+
for f in remaining_files:
457+
try:
458+
f.unlink()
459+
except Exception:
460+
pass
461+
try:
462+
child_profile_dir.rmdir() # Remove dir if empty
463+
except Exception:
464+
pass
465+
438466
# Generate and display performance report
439467
reporter = PerformanceReporter(profiler.results)
440468
performance_report = reporter.generate_report()

ngraph/profiling.py

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class StepProfile:
3131
function_calls: Number of function calls during execution.
3232
memory_peak: Peak memory usage during step (if available).
3333
cprofile_stats: Detailed cProfile statistics object.
34+
worker_profiles_merged: Number of worker profiles merged into this step.
3435
"""
3536

3637
step_name: str
@@ -40,6 +41,7 @@ class StepProfile:
4041
function_calls: int
4142
memory_peak: Optional[float] = None
4243
cprofile_stats: Optional[pstats.Stats] = None
44+
worker_profiles_merged: int = 0
4345

4446

4547
@dataclass
@@ -168,6 +170,61 @@ def profile_step(
168170
f"({wall_time:.3f}s wall, {cpu_time:.3f}s CPU, {function_calls:,} calls)"
169171
)
170172

173+
def merge_child_profiles(self, profile_dir: Path, step_name: str) -> None:
    """Fold worker-process profile dumps into the matching step profile.

    Finds the step named ``step_name`` in ``self.results.step_profiles``,
    merges every ``*_worker_*.pstats`` file found in ``profile_dir`` into
    its cProfile stats, refreshes the aggregate call count, and removes
    the worker dump files on success (best effort).

    Args:
        profile_dir: Directory containing worker profile files.
        step_name: Name of the workflow step these workers belong to.
    """
    # Locate the parent step profile to merge into.
    step_profile = next(
        (p for p in self.results.step_profiles if p.step_name == step_name),
        None,
    )
    if step_profile is None or step_profile.cprofile_stats is None:
        logger.warning(f"No parent profile found for step: {step_name}")
        return

    # Collect the per-worker dump files produced for this step.
    worker_files = list(profile_dir.glob("*_worker_*.pstats"))
    if not worker_files:
        logger.debug(f"No worker profiles found in {profile_dir}")
        return

    logger.debug(f"Found {len(worker_files)} worker profiles to merge")

    try:
        merged_count = 0
        for worker_file in worker_files:
            step_profile.cprofile_stats.add(str(worker_file))
            logger.debug(f"Merged worker profile: {worker_file.name}")
            merged_count += 1

        # Recompute the call total now that worker stats are folded in.
        stats_data = getattr(step_profile.cprofile_stats, "stats", {})
        step_profile.function_calls = sum(
            stat_tuple[0] for stat_tuple in stats_data.values()
        )
        step_profile.worker_profiles_merged = merged_count

        logger.info(
            f"Merged {len(worker_files)} worker profiles into step '{step_name}'"
        )

        # Best-effort removal of the now-merged worker dumps.
        for worker_file in worker_files:
            try:
                worker_file.unlink()
            except Exception:
                pass  # Best effort cleanup

    except Exception as e:
        logger.warning(f"Failed to merge worker profiles: {type(e).__name__}: {e}")
227+
171228
def analyze_performance(self) -> None:
172229
"""Analyze profiling results and identify bottlenecks.
173230
@@ -332,8 +389,8 @@ def generate_report(self) -> str:
332389
["=" * 80, "NETGRAPH PERFORMANCE PROFILING REPORT", "=" * 80, ""]
333390
)
334391

335-
# Executive summary
336-
report_lines.extend(self._generate_executive_summary())
392+
# Summary
393+
report_lines.extend(self._generate_summary())
337394

338395
# Step-by-step timing analysis
339396
report_lines.extend(self._generate_timing_analysis())
@@ -350,8 +407,8 @@ def generate_report(self) -> str:
350407

351408
return "\n".join(report_lines)
352409

353-
def _generate_executive_summary(self) -> List[str]:
354-
"""Generate executive summary section of the report."""
410+
def _generate_summary(self) -> List[str]:
411+
"""Generate summary section of the report."""
355412
summary = self.results.analysis_summary
356413

357414
lines = [
@@ -385,7 +442,15 @@ def _generate_timing_analysis(self) -> List[str]:
385442
)
386443

387444
# Create formatted table
388-
headers = ["Step Name", "Type", "Wall Time", "CPU Time", "Calls", "% Total"]
445+
headers = [
446+
"Step Name",
447+
"Type",
448+
"Wall Time",
449+
"CPU Time",
450+
"Calls",
451+
"% Total",
452+
"Workers",
453+
]
389454

390455
# Calculate column widths
391456
col_widths = [len(h) for h in headers]
@@ -404,6 +469,9 @@ def _generate_timing_analysis(self) -> List[str]:
404469
f"{step.cpu_time:.3f}s",
405470
f"{step.function_calls:,}",
406471
f"{percentage:.1f}%",
472+
f"{step.worker_profiles_merged}"
473+
if step.worker_profiles_merged > 0
474+
else "-",
407475
]
408476
table_data.append(row)
409477

ngraph/workflow/capacity_envelope_analysis.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def _worker(args: tuple[Any, ...]) -> tuple[list[tuple[str, str, float]], float]
2929
3030
Args:
3131
args: Tuple containing (base_network, base_policy, source_regex, sink_regex,
32-
mode, shortest_path, flow_placement, seed_offset, is_baseline)
32+
mode, shortest_path, flow_placement, seed_offset, is_baseline, step_name)
3333
3434
Returns:
3535
Tuple of (flow_results, total_capacity) where:
@@ -49,8 +49,20 @@ def _worker(args: tuple[Any, ...]) -> tuple[list[tuple[str, str, float]], float]
4949
flow_placement,
5050
seed_offset,
5151
is_baseline,
52+
step_name,
5253
) = args
5354

55+
# Optional per-worker profiling -------------------------------------------------
56+
profile_dir_env = os.getenv("NGRAPH_PROFILE_DIR")
57+
collect_profile: bool = bool(profile_dir_env)
58+
59+
profiler: "cProfile.Profile | None" = None # Lazy init to avoid overhead
60+
if collect_profile:
61+
import cProfile # Local import to avoid cost when profiling disabled
62+
63+
profiler = cProfile.Profile()
64+
profiler.enable()
65+
5466
worker_pid = os.getpid()
5567
worker_logger.debug(f"Worker {worker_pid} started with seed_offset={seed_offset}")
5668

@@ -126,6 +138,28 @@ def _worker(args: tuple[Any, ...]) -> tuple[list[tuple[str, str, float]], float]
126138
worker_logger.debug(f"Worker {worker_pid} computed {len(result)} flow results")
127139
worker_logger.debug(f"Worker {worker_pid} total capacity: {total_capacity:.2f}")
128140

141+
# Dump profile if enabled ------------------------------------------------------
142+
if profiler is not None:
143+
profiler.disable()
144+
try:
145+
import pstats
146+
import uuid
147+
from pathlib import Path
148+
149+
profile_dir = Path(profile_dir_env) if profile_dir_env else None
150+
if profile_dir is not None:
151+
profile_dir.mkdir(parents=True, exist_ok=True)
152+
unique_id = uuid.uuid4().hex[:8]
153+
profile_path = (
154+
profile_dir / f"{step_name}_worker_{worker_pid}_{unique_id}.pstats"
155+
)
156+
pstats.Stats(profiler).dump_stats(profile_path)
157+
worker_logger.debug("Saved worker profile to %s", profile_path.name)
158+
except Exception as exc: # pragma: no cover – best-effort profiling
159+
worker_logger.warning(
160+
"Failed to save worker profile: %s: %s", type(exc).__name__, exc
161+
)
162+
129163
return result, total_capacity
130164

131165

@@ -172,6 +206,7 @@ def _run_single_iteration(
172206
flow_placement,
173207
seed_offset,
174208
is_baseline,
209+
"", # step_name not available in serial execution
175210
)
176211
)
177212
logger.debug(
@@ -463,6 +498,7 @@ def _run_parallel_analysis(
463498
self.flow_placement,
464499
seed_offset,
465500
is_baseline,
501+
self.name or self.__class__.__name__,
466502
)
467503
)
468504

tests/workflow/test_capacity_envelope_analysis.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ def test_initialization_with_parameters(self):
100100
def test_string_flow_placement_conversion(self):
101101
"""Test automatic conversion of string flow_placement to enum."""
102102
step = CapacityEnvelopeAnalysis(
103-
source_path="^A", sink_path="^C", flow_placement="EQUAL_BALANCED"
103+
source_path="^A",
104+
sink_path="^C",
105+
flow_placement="EQUAL_BALANCED", # type: ignore[arg-type]
104106
)
105107
assert step.flow_placement == FlowPlacement.EQUAL_BALANCED
106108

@@ -121,7 +123,9 @@ def test_validation_errors(self):
121123
# Test invalid flow_placement string
122124
with pytest.raises(ValueError, match="Invalid flow_placement"):
123125
CapacityEnvelopeAnalysis(
124-
source_path="^A", sink_path="^C", flow_placement="INVALID"
126+
source_path="^A",
127+
sink_path="^C",
128+
flow_placement="INVALID", # type: ignore[arg-type]
125129
)
126130

127131
def test_validation_iterations_without_failure_policy(self):
@@ -406,6 +410,7 @@ def test_worker_no_failures(self, simple_network):
406410
FlowPlacement.PROPORTIONAL,
407411
42, # seed
408412
False, # is_baseline
413+
"test_step", # step_name
409414
)
410415

411416
flow_results, total_capacity = _worker(args)
@@ -435,6 +440,7 @@ def test_worker_with_failures(self, simple_network, simple_failure_policy):
435440
FlowPlacement.PROPORTIONAL,
436441
42, # seed
437442
False, # is_baseline
443+
"test_step", # step_name
438444
)
439445

440446
flow_results, total_capacity = _worker(args)
@@ -656,6 +662,7 @@ def test_worker_baseline_iteration(self, simple_network, simple_failure_policy):
656662
FlowPlacement.PROPORTIONAL,
657663
42, # seed
658664
True, # is_baseline - should skip failures
665+
"test_step", # step_name
659666
)
660667

661668
flow_results, total_capacity = _worker(args)
@@ -675,6 +682,7 @@ def test_worker_baseline_iteration(self, simple_network, simple_failure_policy):
675682
FlowPlacement.PROPORTIONAL,
676683
42, # seed
677684
False, # is_baseline
685+
"test_step", # step_name
678686
)
679687

680688
baseline_results, baseline_capacity = _worker(args)

0 commit comments

Comments (0)