From d2791bc7abf3bb1fdcf4f9b68d625415fef3459c Mon Sep 17 00:00:00 2001 From: garrett4wade Date: Tue, 23 Dec 2025 21:17:09 +0800 Subject: [PATCH 1/7] refactor: single-source task_id generation in submit methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change perf_tracer.register_task() to accept task_id parameter - Add _register_task() method with sequential counter to WorkflowExecutor and RolloutController - Add optional task_id parameter to submit() in InferenceEngine, RemoteInfEngine, RemoteSGLangEngine, RemotevLLMEngine, and RolloutController - Remove uuid-based task_id generation in favor of centralized sequential IDs 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- areal/controller/rollout_controller.py | 7 +++++++ areal/core/workflow_executor.py | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/areal/controller/rollout_controller.py b/areal/controller/rollout_controller.py index 56b000804..30ef8a230 100644 --- a/areal/controller/rollout_controller.py +++ b/areal/controller/rollout_controller.py @@ -71,6 +71,7 @@ def __init__( # State self._version_lock = Lock() self._version = 0 + self._task_cnt = 0 self._task_id_generator = TaskIdGenerator() @@ -366,6 +367,12 @@ async def _submit_then_wait() -> _RemoteRolloutResult | None: def get_capacity(self): return self.staleness_manager.get_capacity() + def _register_task(self) -> int: + with self._version_lock: + task_id = self._task_cnt + self._task_cnt += 1 + return task_id + def submit( self, data: dict[str, Any], diff --git a/areal/core/workflow_executor.py b/areal/core/workflow_executor.py index 0cd7f8c48..eca5c105d 100644 --- a/areal/core/workflow_executor.py +++ b/areal/core/workflow_executor.py @@ -1043,6 +1043,12 @@ async def _managed_workflow() -> _RolloutResult | None: return _managed_workflow + def _register_task(self) -> int: + with self._lock: + task_id = self._task_cnt + self._task_cnt += 1 + return task_id + def submit( self, data: dict[str, Any], From f56793ce074bba0739ea04d4da096deca224c70c Mon Sep 17 00:00:00 2001 From: garrett4wade Date: Wed, 24 Dec 2025 16:09:20 +0800 Subject: [PATCH 2/7] modify perf tracer for single-controller and add name_resolve for rpc server --- areal/controller/rollout_controller.py | 7 -- areal/core/workflow_executor.py | 6 -- areal/experimental/trainer/rl.py | 2 +- areal/scheduler/local.py | 59 +++++++++-- areal/scheduler/rpc/rpc_server.py | 64 ++++++++--- areal/tests/test_local_scheduler.py | 140 +++++++++++++++++++------ areal/tests/test_perf_tracer.py | 126 +++++++++++++++++++++- areal/tools/perf_trace_converter.py | 134 +++++++++++++++-------- areal/tools/plot_session_trace.py | 13 ++- areal/utils/names.py | 4 + areal/utils/perf_tracer.py | 48 +++++++-- 11 files changed, 474 insertions(+), 129 deletions(-) diff --git a/areal/controller/rollout_controller.py b/areal/controller/rollout_controller.py index 30ef8a230..56b000804 100644 --- a/areal/controller/rollout_controller.py +++ b/areal/controller/rollout_controller.py @@ -71,7 +71,6 @@ def __init__( # State self._version_lock = Lock() self._version = 0 - self._task_cnt = 0 self._task_id_generator = TaskIdGenerator() @@ -367,12 +366,6 @@ async def _submit_then_wait() -> _RemoteRolloutResult | None: def get_capacity(self): return self.staleness_manager.get_capacity() - def _register_task(self) -> int: - with self._version_lock: - task_id = self._task_cnt - self._task_cnt += 1 - return task_id - def submit( self, data: dict[str, Any], 
diff --git a/areal/core/workflow_executor.py b/areal/core/workflow_executor.py index eca5c105d..0cd7f8c48 100644 --- a/areal/core/workflow_executor.py +++ b/areal/core/workflow_executor.py @@ -1043,12 +1043,6 @@ async def _managed_workflow() -> _RolloutResult | None: return _managed_workflow - def _register_task(self) -> int: - with self._lock: - task_id = self._task_cnt - self._task_cnt += 1 - return task_id - def submit( self, data: dict[str, Any], diff --git a/areal/experimental/trainer/rl.py b/areal/experimental/trainer/rl.py index 3c5332a99..bd18f825e 100644 --- a/areal/experimental/trainer/rl.py +++ b/areal/experimental/trainer/rl.py @@ -60,7 +60,7 @@ def __init__( rank = int(os.getenv("RANK", "0")) # Configure performance tracer if config.perf_tracer is not None: - perf_tracer.configure(config.perf_tracer, rank=rank) + perf_tracer.configure(config.perf_tracer, rank=rank, role="master") self.config = config self.processor, self.tokenizer = load_hf_processor_and_tokenizer( diff --git a/areal/scheduler/local.py b/areal/scheduler/local.py index 98d8562d4..c102a37df 100644 --- a/areal/scheduler/local.py +++ b/areal/scheduler/local.py @@ -13,7 +13,7 @@ import orjson import requests -from areal.api.cli_args import BaseExperimentConfig +from areal.api.cli_args import BaseExperimentConfig, NameResolveConfig from areal.api.scheduler_api import Job, Scheduler, SchedulingSpec, Worker from areal.platforms import current_platform from areal.scheduler.exceptions import ( @@ -31,7 +31,7 @@ WorkerTimeoutError, ) from areal.scheduler.rpc.serialization import deserialize_value, serialize_value -from areal.utils import logging +from areal.utils import logging, name_resolve, names from areal.utils.http import get_default_connector from areal.utils.launcher import ( get_env_vars, @@ -90,24 +90,50 @@ def __init__( fileroot: str | None = None, experiment_name: str | None = None, trial_name: str | None = None, + name_resolve_type: str = "nfs", + nfs_record_root: str = "/tmp/areal/name_resolve", + etcd3_addr: str = "localhost:2379", exp_config: BaseExperimentConfig | None = None, ): self.gpu_devices = gpu_devices or self._detect_gpus() + + # Resolve experiment/trial names (exp_config overwrites direct params) + self.experiment_name = experiment_name + self.trial_name = trial_name + self.fileroot = fileroot + if exp_config is not None: + self.experiment_name = exp_config.experiment_name + self.trial_name = exp_config.trial_name + self.fileroot = exp_config.cluster.fileroot + + # name_resolve config (exp_config overwrites direct params) + self.name_resolve_config = NameResolveConfig( + type=name_resolve_type, + nfs_record_root=nfs_record_root, + etcd3_addr=etcd3_addr, + ) + if exp_config is not None: + self.name_resolve_config = exp_config.cluster.name_resolve + + # Reconfigure name_resolve and clear old entries + if self.experiment_name and self.trial_name: + name_resolve.reconfigure(self.name_resolve_config) + name_resolve.clear_subtree( + names.trial_root(self.experiment_name, self.trial_name) + ) + if log_dir is not None: self.log_dir = Path(log_dir) else: - experiment_name = experiment_name or exp_config.experiment_name - trial_name = trial_name or exp_config.trial_name - fileroot = fileroot or exp_config.cluster.fileroot - assert experiment_name is not None - assert trial_name is not None - assert fileroot is not None + assert self.experiment_name is not None + assert self.trial_name is not None + assert self.fileroot is not None self.log_dir = ( - Path(fileroot) + Path(self.fileroot) / "logs" / 
getpass.getuser() - / experiment_name - / trial_name + / self.experiment_name + / self.trial_name ) self.exp_config = exp_config @@ -168,6 +194,7 @@ def _get_colocated_gpus(self, target_role: str, worker_idx: int) -> list[int]: return target_workers[worker_idx].gpu_devices def _allocate_ports(self, count: int) -> list[int]: + # Workers are on the same node, so we can directly allocate ports in the scheduler try: ports = find_free_ports(count, exclude_ports=set(self._allocated_ports)) self._allocated_ports.update(ports) @@ -323,6 +350,16 @@ def create_workers(self, job: Job, *args, **kwargs) -> list[str]: ) cmd = shlex.split(scheduling.cmd) cmd.extend(["--port", str(ports[0])]) + # Add name_resolve and worker identity args + cmd.extend(["--experiment-name", self.experiment_name]) + cmd.extend(["--trial-name", self.trial_name]) + cmd.extend(["--role", role]) + cmd.extend(["--worker-index", str(idx)]) + cmd.extend(["--name-resolve-type", self.name_resolve_config.type]) + cmd.extend( + ["--nfs-record-root", self.name_resolve_config.nfs_record_root] + ) + cmd.extend(["--etcd3-addr", self.name_resolve_config.etcd3_addr]) logger.info(f"Starting worker {worker_id}: {' '.join(cmd)}") if cmd[0].startswith("python"): diff --git a/areal/scheduler/rpc/rpc_server.py b/areal/scheduler/rpc/rpc_server.py index 853c0f504..8eee6d9bd 100644 --- a/areal/scheduler/rpc/rpc_server.py +++ b/areal/scheduler/rpc/rpc_server.py @@ -1,7 +1,6 @@ import argparse import logging as stdlib_logging import os -import socket import traceback from collections.abc import Callable from concurrent.futures import Future @@ -11,8 +10,9 @@ import orjson from flask import Flask, Response, jsonify, request +from werkzeug.serving import make_server -from areal.api.cli_args import BaseExperimentConfig +from areal.api.cli_args import BaseExperimentConfig, NameResolveConfig from areal.api.engine_api import InferenceEngine, TrainEngine from areal.platforms import current_platform from areal.scheduler.rpc import rtensor @@ -21,12 +21,13 @@ deserialize_value, serialize_value, ) -from areal.utils import logging, name_resolve, seeding +from areal.utils import logging, name_resolve, names, perf_tracer, seeding from areal.utils.data import ( broadcast_tensor_container, tensor_container_to, ) from areal.utils.dynamic_import import import_from_string +from areal.utils.network import gethostip logger = logging.getLogger("SyncRPCServer") @@ -45,6 +46,7 @@ _server_host: str = "0.0.0.0" _server_port: int = 8000 + # Create Flask app app = Flask(__name__) @@ -129,7 +131,8 @@ def configure(): config: BaseExperimentConfig def execute_configure(): - name_resolve.reconfigure(config.cluster.name_resolve) + if config.perf_tracer is not None: + perf_tracer.configure(config.perf_tracer, rank=rank, role=role) seeding.set_random_seed(config.seed, key=f"{role}{rank}") return { "status": "success", @@ -526,7 +529,12 @@ def main(): parser = argparse.ArgumentParser( description="AReaL Sync RPC Server for TrainEngine/InferenceEngine" ) - parser.add_argument("--port", type=int, required=True, help="Port to serve on") + parser.add_argument( + "--port", + type=int, + default=0, + help="Port to serve on (default: 0 = auto-assign)", + ) parser.add_argument( "--host", type=str, default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)" ) @@ -537,6 +545,16 @@ def main(): choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], help="Log level for Werkzeug (Flask's WSGI server). 
Default: WARNING", ) + # name_resolve config + parser.add_argument("--experiment-name", type=str, required=True) + parser.add_argument("--trial-name", type=str, required=True) + parser.add_argument("--role", type=str, required=True) + parser.add_argument("--worker-index", type=int, required=True) + parser.add_argument("--name-resolve-type", type=str, default="nfs") + parser.add_argument( + "--nfs-record-root", type=str, default="/tmp/areal/name_resolve" + ) + parser.add_argument("--etcd3-addr", type=str, default="localhost:2379") args, _ = parser.parse_known_args() @@ -548,26 +566,40 @@ def main(): global _server_host, _server_port _server_host = args.host if _server_host == "0.0.0.0": - _server_host = socket.gethostbyname(socket.gethostname()) - _server_port = args.port + _server_host = gethostip() + + # Get worker identity + worker_id = f"{args.role}/{args.worker_index}" + + # Make a flask server + server = make_server(args.host, args.port, app, threaded=True) + _server_port = server.socket.getsockname()[1] - logger.info(f"Starting sync RPC server on {args.host}:{args.port}") + name_resolve.reconfigure( + NameResolveConfig( + type=args.name_resolve_type, + nfs_record_root=args.nfs_record_root, + etcd3_addr=args.etcd3_addr, + ) + ) + key = names.worker_discovery( + args.experiment_name, args.trial_name, args.role, args.worker_index + ) + name_resolve.add(key, f"{_server_host}:{_server_port}", replace=True) + + logger.info( + f"Starting sync RPC server on {_server_host}:{_server_port} for worker {worker_id}" + ) logger.info(f"Werkzeug log level: {args.werkzeug_log_level}") try: - app.run( - host=args.host, - port=args.port, - threaded=True, # Multi-threaded for concurrent /data/ request handling - processes=1, # Single process - debug=False, - use_reloader=False, - ) + server.serve_forever() except KeyboardInterrupt: logger.info("Shutting down sync RPC server") finally: cleanup_engine_thread() cleanup_engine() + server.shutdown() if __name__ == "__main__": diff --git a/areal/tests/test_local_scheduler.py b/areal/tests/test_local_scheduler.py index 082145630..6100029af 100644 --- a/areal/tests/test_local_scheduler.py +++ b/areal/tests/test_local_scheduler.py @@ -8,7 +8,6 @@ import pytest import requests -from areal.api.cli_args import BaseExperimentConfig from areal.api.scheduler_api import ( Job, SchedulingSpec, @@ -39,7 +38,10 @@ def scheduler(tmp_path): """Create a LocalScheduler instance with default configuration.""" return LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) @@ -47,7 +49,10 @@ def scheduler(tmp_path): def multi_gpu_scheduler(tmp_path): """Create a LocalScheduler instance with multiple GPUs.""" return LocalScheduler( - gpu_devices=[0, 1, 2], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0, 1, 2], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) @@ -144,7 +149,8 @@ def test_init_with_explicit_gpu_devices(self, tmp_path): log_dir=str(tmp_path), startup_timeout=60.0, health_check_interval=2.0, - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) assert scheduler.gpu_devices == [0, 1, 2] @@ -160,7 +166,9 @@ def test_init_without_gpu_devices_uses_cuda_visible_devices(self, tmp_path): """Should detect GPUs from CUDA_VISIBLE_DEVICES environment variable.""" with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1,3"}): scheduler = 
LocalScheduler( - log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) assert scheduler.gpu_devices == [0, 1, 3] @@ -168,7 +176,9 @@ def test_init_with_invalid_cuda_visible_devices(self, tmp_path): """Should fall back to default [0] when CUDA_VISIBLE_DEVICES is invalid.""" with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "invalid,gpu,ids"}): scheduler = LocalScheduler( - log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) assert scheduler.gpu_devices == [0] @@ -178,7 +188,9 @@ def test_init_creates_log_directory(self, tmp_path): assert not log_dir.exists() scheduler = LocalScheduler( - log_dir=str(log_dir), exp_config=BaseExperimentConfig() + log_dir=str(log_dir), + experiment_name="test_exp", + trial_name="test_trial", ) assert log_dir.exists() @@ -193,7 +205,8 @@ def test_allocate_gpus_round_robin(self, tmp_path): scheduler = LocalScheduler( gpu_devices=[0, 1, 2], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) # First allocation @@ -211,7 +224,10 @@ def test_allocate_gpus_round_robin(self, tmp_path): def test_allocate_gpus_exceeds_available(self, tmp_path): """Should raise GPUAllocationError when requesting more GPUs than available.""" scheduler = LocalScheduler( - gpu_devices=[0, 1], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0, 1], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) with pytest.raises(GPUAllocationError) as exc_info: @@ -276,7 +292,8 @@ def test_allocate_ports_success(self, tmp_path): scheduler = LocalScheduler( gpu_devices=[0], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) ports = scheduler._allocate_ports(3) @@ -295,7 +312,8 @@ def test_allocate_ports_excludes_already_allocated(self, tmp_path): scheduler = LocalScheduler( gpu_devices=[0], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) # First allocation @@ -320,7 +338,8 @@ def test_allocate_ports_failure(self, tmp_path): scheduler = LocalScheduler( gpu_devices=[0], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) with pytest.raises(PortAllocationError) as exc_info: @@ -352,7 +371,10 @@ def test_create_workers_with_default_spec( mock_popen.side_effect = [mock_process1, mock_process2] scheduler = LocalScheduler( - gpu_devices=[0, 1], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0, 1], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) with patch.object(scheduler, "_configure_worker", return_value=None): @@ -388,7 +410,8 @@ def test_create_workers_with_single_spec_for_all( scheduler = LocalScheduler( gpu_devices=[0, 1, 2], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job( @@ -434,7 +457,10 @@ def test_create_workers_with_per_worker_specs( mock_popen.side_effect = [mock_proc1, mock_proc2] scheduler = LocalScheduler( - gpu_devices=[0, 1], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0, 1], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job( @@ -480,7 +506,10 @@ def 
test_create_workers_with_custom_command( mock_popen.return_value = mock_proc scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job( @@ -528,7 +557,10 @@ def test_create_workers_with_environment_variables( mock_popen.return_value = mock_proc scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job( @@ -584,7 +616,8 @@ def test_create_workers_with_colocate_strategy( scheduler = LocalScheduler( gpu_devices=[0, 1, 2, 3], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) # Create target workers (actors) @@ -642,7 +675,10 @@ def test_create_workers_with_colocate_strategy( def test_create_workers_duplicate_role_error(self, tmp_path): """Should raise WorkerCreationError when attempting to create workers for existing role.""" scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) with ( @@ -671,7 +707,10 @@ def test_create_workers_duplicate_role_error(self, tmp_path): def test_create_workers_zero_replicas_error(self, tmp_path): """Should raise WorkerCreationError when replicas is 0.""" scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job(replicas=0, role="test") @@ -684,7 +723,10 @@ def test_create_workers_zero_replicas_error(self, tmp_path): def test_create_workers_invalid_specs_length(self, tmp_path): """Should raise WorkerCreationError when tasks length is invalid.""" scheduler = LocalScheduler( - gpu_devices=[0, 1], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0, 1], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job( @@ -731,7 +773,10 @@ def test_create_workers_subprocess_fails_immediately( log_file.write_text("Error: Failed to start server\n") scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job(replicas=1, role="test") @@ -764,7 +809,10 @@ def test_create_workers_cleanup_on_partial_failure( mock_popen.return_value = mock_proc1 scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job(replicas=2, role="test") @@ -780,7 +828,10 @@ def test_create_workers_cleanup_on_partial_failure( def test_create_workers_colocate_strategy_missing_target(self, tmp_path): """Should raise WorkerCreationError when colocation strategy is missing target role.""" scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job( @@ -1132,7 +1183,10 @@ class TestLogFileHandling: def test_read_log_tail_success(self, tmp_path): """Should read last N lines from log file.""" scheduler = 
LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) log_file = tmp_path / "test.log" @@ -1149,7 +1203,10 @@ def test_read_log_tail_success(self, tmp_path): def test_read_log_tail_file_not_found(self, tmp_path): """Should return error message when log file doesn't exist.""" scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) tail = scheduler._read_log_tail("/nonexistent/file.log") @@ -1159,7 +1216,10 @@ def test_read_log_tail_file_not_found(self, tmp_path): def test_read_log_tail_fewer_lines_than_requested(self, tmp_path): """Should return all lines when file has fewer lines than requested.""" scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) log_file = tmp_path / "test.log" @@ -1646,7 +1706,10 @@ class TestEdgeCases: def test_gpu_counter_wraps_correctly(self, tmp_path): """Should correctly wrap GPU counter for round-robin allocation.""" scheduler = LocalScheduler( - gpu_devices=[0, 1], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0, 1], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) # Allocate many times to ensure wrapping @@ -1667,7 +1730,8 @@ def test_port_allocation_accumulates_correctly(self, tmp_path): scheduler = LocalScheduler( gpu_devices=[0], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) scheduler._allocate_ports(2) @@ -1703,7 +1767,10 @@ def test_worker_id_format( mock_popen.side_effect = mock_processes scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) job = Job(replicas=5, role="worker") @@ -1721,7 +1788,10 @@ def test_worker_id_format( def test_empty_workers_dict_operations(self, tmp_path): """Should handle operations on empty workers dictionary gracefully.""" scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(tmp_path), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(tmp_path), + experiment_name="test_exp", + trial_name="test_trial", ) # Delete all workers when none exist @@ -1738,7 +1808,8 @@ def test_concurrent_gpu_allocations(self, tmp_path): scheduler = LocalScheduler( gpu_devices=[0, 1, 2], log_dir=str(tmp_path), - exp_config=BaseExperimentConfig(), + experiment_name="test_exp", + trial_name="test_trial", ) # Simulate multiple workers requesting GPUs simultaneously @@ -1754,7 +1825,10 @@ def test_log_directory_with_special_characters(self, tmp_path): """Should handle log directory paths with special characters.""" log_dir = tmp_path / "logs with spaces" / "special-chars_123" scheduler = LocalScheduler( - gpu_devices=[0], log_dir=str(log_dir), exp_config=BaseExperimentConfig() + gpu_devices=[0], + log_dir=str(log_dir), + experiment_name="test_exp", + trial_name="test_trial", ) assert log_dir.exists() diff --git a/areal/tests/test_perf_tracer.py b/areal/tests/test_perf_tracer.py index 212c2da8e..f56a5cba1 100644 --- a/areal/tests/test_perf_tracer.py +++ b/areal/tests/test_perf_tracer.py @@ -34,36 +34,42 @@ def 
_expected_trace_path( config: PerfTracerConfig, *, rank: int, + role: str | None = None, ) -> Path: base_dir = Path(os.path.expanduser(config.fileroot)) filename = f"traces-r{rank}.jsonl" - return ( + path = ( base_dir / "logs" / getpass.getuser() / config.experiment_name / config.trial_name / "perf_tracer" - / filename ) + if role: + path = path / role + return path / filename def _expected_session_trace_path( config: PerfTracerConfig, *, rank: int, + role: str | None = None, ) -> Path: base_dir = Path(os.path.expanduser(config.fileroot)) filename = f"sessions-r{rank}.jsonl" - return ( + path = ( base_dir / "logs" / getpass.getuser() / config.experiment_name / config.trial_name / "session_tracer" - / filename ) + if role: + path = path / role + return path / filename def _load_trace_events(path: Path) -> list[dict]: @@ -137,6 +143,41 @@ def test_perf_tracer_records_events_and_save(tmp_path): assert {"unit-block", "inner-mark", "outer-mark"}.issubset(event_names) +def test_perf_tracer_with_role(tmp_path): + """Test that role parameter is properly handled in tracer.""" + config = _make_config(tmp_path, experiment="unit", trial="role_test") + tracer = perf_tracer.PerfTracer(config, rank=0, role="actor") + assert tracer._rank == 0 # noqa: SLF001 + assert tracer._role == "actor" # noqa: SLF001 + + with tracer.trace_scope("role-block", category=Category.INSTR): + tracer.instant("role-mark") + + tracer.save() + saved_path = _expected_trace_path(config, rank=0, role="actor") + assert saved_path.exists() + + events = _load_trace_events(saved_path) + non_meta_events = [evt for evt in events if evt.get("ph") != "M"] + + # Verify role is in event args + for evt in non_meta_events: + assert evt["args"].get("role") == "actor" + assert evt["args"].get("rank") == 0 + + # Verify process metadata includes role in label + process_name_events = [ + evt + for evt in events + if evt.get("ph") == "M" and evt.get("name") == "process_name" + ] + assert len(process_name_events) > 0 + for evt in process_name_events: + process_name = evt["args"].get("name", "") + assert "[actor]" in process_name + assert "Rank 0" in process_name + + def test_perf_tracer_emits_separate_rank_logs(tmp_path): config0 = _make_config( tmp_path, @@ -237,6 +278,47 @@ async def test_global_tracer_configure_roundtrip(tmp_path): assert {"async-step", "inside-async", "sync-step"}.issubset(event_names) +@pytest.mark.asyncio +async def test_configure_with_role(tmp_path): + """Test that configure() function accepts role parameter.""" + config = _make_config(tmp_path, experiment="global", trial="role_roundtrip") + tracer = perf_tracer.configure( + config, + rank=1, + role="learner", + ) + + # Verify tracer has role stored + assert tracer._role == "learner" # noqa: SLF001 + + with perf_tracer.trace_scope("role-sync-step", category=Category.INSTR): + perf_tracer.instant("role-inside", args={"task": "train"}) + + tracer.save() + saved_path = _expected_trace_path(config, rank=1, role="learner") + assert saved_path.exists() + # Session tracer is not enabled in this config, so session_path doesn't exist + + events = _load_trace_events(saved_path) + + # Verify role in events + non_meta_events = [evt for evt in events if evt.get("ph") != "M"] + for evt in non_meta_events: + assert evt["args"].get("role") == "learner" + + # Verify process metadata + process_events = [ + evt + for evt in events + if evt.get("ph") == "M" and evt.get("name") == "process_name" + ] + assert len(process_events) > 0 + for evt in process_events: + pname = evt["args"].get("name", 
"") + assert "[learner]" in pname + assert "Rank 1" in pname + + @pytest.mark.asyncio async def test_async_multi_session_cross_phase_trace(tmp_path): config = _make_config(tmp_path, experiment="async", trial="sessions") @@ -327,6 +409,42 @@ def test_configure_rejects_repeated_calls(tmp_path): ) +def test_session_tracer_with_role(tmp_path): + """Test SessionTracer with role parameter produces correct output paths.""" + config = _make_config(tmp_path, experiment="session", trial="role_enabled") + config.session_tracer = SessionTracerConfig(enabled=True, flush_threshold=1) + tracer = perf_tracer.PerfTracer(config, rank=2, role="critic") + + session_tracer = tracer.session_tracer + assert session_tracer is not None + assert session_tracer._role == "critic" # noqa: SLF001 + + task_id = 456 + session_tracer.register_task(task_id) + session_id = session_tracer.register_session(task_id) + session_tracer.record_event( + session_id, + SessionTraceEvent.GENERATE_START, + ) + session_tracer.record_event( + session_id, + SessionTraceEvent.GENERATE_END, + ) + session_tracer.record_event( + session_id, SessionTraceEvent.FINALIZED, status="accepted" + ) + tracer.save(force=True) + + session_path = _expected_session_trace_path(config, rank=2, role="critic") + assert session_path.exists() + payload = [json.loads(line) for line in session_path.read_text().splitlines()] + assert any(entry["status"] == "accepted" for entry in payload) + + # Verify role field in session record + record = payload[0] + assert record.get("role") == "critic" + + def test_module_level_save_helper(tmp_path): config = _make_config(tmp_path, experiment="module", trial="helper") perf_tracer.configure( diff --git a/areal/tools/perf_trace_converter.py b/areal/tools/perf_trace_converter.py index 3bf3b8256..50a28d4f2 100644 --- a/areal/tools/perf_trace_converter.py +++ b/areal/tools/perf_trace_converter.py @@ -52,6 +52,18 @@ def _extract_rank(event: dict) -> str | int | None: return str(rank) +def _extract_role(event: dict) -> str | None: + """Best-effort extraction of the role identifier from a trace event.""" + + args = event.get("args") + if not isinstance(args, dict): + return None + role = args.get("role") + if role is None or not isinstance(role, str): + return None + return role.strip() or None + + def _format_rank(rank: str | int) -> str: return str(rank) @@ -64,6 +76,12 @@ def _rank_sort_key(rank: str | int | None) -> tuple[int, object]: return (1, str(rank)) +def _role_sort_key(role: str | None) -> tuple[int, str]: + if role is None: + return (1, "") + return (0, role) + + def _value_sort_key(value: object) -> tuple[int, object]: if isinstance(value, bool): return (0, int(value)) @@ -100,8 +118,10 @@ def _tid_sort_key(value: object) -> tuple[int, object]: def _remap_process_and_thread_ids( events: list[dict], - existing_process_names: dict[tuple[str | int, object], str] | None = None, - existing_thread_names: dict[tuple[str | int, object, object], str] | None = None, + existing_process_names: dict[tuple[str | int, str | None, object], str] + | None = None, + existing_thread_names: dict[tuple[str | int, str | None, object, object], str] + | None = None, ) -> list[dict]: """Remap pid/tid to be unique and return metadata events. 
@@ -113,69 +133,78 @@ def _remap_process_and_thread_ids( if existing_thread_names is None: existing_thread_names = {} - pid_keys: set[tuple[str | int, object]] = set() - tid_keys: set[tuple[str | int, object, object]] = set() + # pid_keys: (rank, role, original_pid) + pid_keys: set[tuple[str | int, str | None, object]] = set() + # tid_keys: (rank, role, original_pid, original_tid) + tid_keys: set[tuple[str | int, str | None, object, object]] = set() for event in events: rank = _extract_rank(event) if rank is None: continue + role = _extract_role(event) original_pid = event.get("pid") if original_pid is None: continue - pid_keys.add((rank, original_pid)) + pid_keys.add((rank, role, original_pid)) original_tid = event.get("tid") if original_tid is not None: - tid_keys.add((rank, original_pid, original_tid)) + tid_keys.add((rank, role, original_pid, original_tid)) sorted_pid_keys = sorted( pid_keys, - key=lambda item: (_rank_sort_key(item[0]), _value_sort_key(item[1])), + key=lambda item: ( + _rank_sort_key(item[0]), + _role_sort_key(item[1]), + _value_sort_key(item[2]), + ), ) - pid_map: dict[tuple[str | int, object], int] = {} - pid_labels: dict[int, tuple[str | int, object]] = {} + pid_map: dict[tuple[str | int, str | None, object], int] = {} + pid_labels: dict[int, tuple[str | int, str | None, object]] = {} for new_pid, key in enumerate(sorted_pid_keys): pid_map[key] = new_pid + 1 pid_labels[new_pid + 1] = key tid_counters: dict[int, int] = {} - tid_map: dict[tuple[str | int, object, object], int] = {} - tid_labels: dict[tuple[int, int], tuple[str | int, object]] = {} + tid_map: dict[tuple[str | int, str | None, object, object], int] = {} + tid_labels: dict[tuple[int, int], tuple[str | int, str | None, object]] = {} sorted_tid_keys = sorted( tid_keys, key=lambda item: ( _rank_sort_key(item[0]), - _value_sort_key(item[1]), - _tid_sort_key(item[2]), + _role_sort_key(item[1]), + _value_sort_key(item[2]), + _tid_sort_key(item[3]), ), ) for key in sorted_tid_keys: - rank, original_pid, original_tid = key - new_pid = pid_map[(rank, original_pid)] + rank, role, original_pid, original_tid = key + new_pid = pid_map[(rank, role, original_pid)] next_tid = tid_counters.get(new_pid, new_pid) tid_counters[new_pid] = next_tid + 1 tid_map[key] = next_tid - tid_labels[(new_pid, next_tid)] = (rank, original_tid) + tid_labels[(new_pid, next_tid)] = (rank, role, original_tid) for event in events: rank = _extract_rank(event) if rank is None: continue + role = _extract_role(event) original_pid = event.get("pid") if original_pid is None: continue - new_pid = pid_map[(rank, original_pid)] + new_pid = pid_map[(rank, role, original_pid)] event["pid"] = new_pid original_tid = event.get("tid") if original_tid is not None: - tid_key = (rank, original_pid, original_tid) + tid_key = (rank, role, original_pid, original_tid) if tid_key in tid_map: event["tid"] = tid_map[tid_key] else: @@ -183,51 +212,61 @@ def _remap_process_and_thread_ids( event["tid"] = None metadata_events: list[dict] = [] - for pid, (rank, original_pid) in pid_labels.items(): + for pid, (rank, role, original_pid) in pid_labels.items(): rank_text = _format_rank(rank) - process_name = existing_process_names.get((rank, original_pid)) + process_name = existing_process_names.get((rank, role, original_pid)) if process_name is None: - process_name = f"[Rank {rank_text}, Process {original_pid}]" + if role: + process_name = f"[{role}] Rank {rank_text}, Process {original_pid}" + else: + process_name = f"[Rank {rank_text}, Process {original_pid}]" + + args: 
dict = {"name": process_name, "rank": rank} + if role is not None: + args["role"] = role metadata_events.append( { "name": "process_name", "ph": "M", "pid": pid, - "args": { - "name": process_name, - "rank": rank, - }, + "args": args, } ) + sort_args: dict = {"sort_index": pid, "rank": rank} + if role is not None: + sort_args["role"] = role metadata_events.append( { "name": "process_sort_index", "ph": "M", "pid": pid, - "args": {"sort_index": pid, "rank": rank}, + "args": sort_args, } ) - for (pid, tid), (rank, original_tid) in tid_labels.items(): + for (pid, tid), (rank, role, original_tid) in tid_labels.items(): # Retrieve the correct original_pid for this new_pid - _, original_pid = pid_labels[pid] + _, _, original_pid = pid_labels[pid] rank_text = _format_rank(rank) - thread_name = existing_thread_names.get((rank, original_pid, original_tid)) + thread_name = existing_thread_names.get( + (rank, role, original_pid, original_tid) + ) if thread_name is None: thread_name = f"[Thread {original_tid}]" + thread_args: dict = {"name": thread_name, "rank": rank} + if role is not None: + thread_args["role"] = role + metadata_events.append( { "name": "thread_name", "ph": "M", "pid": pid, "tid": tid, - "args": { - "name": thread_name, - "rank": rank, - }, + "args": thread_args, } ) metadata_events.append( @@ -274,8 +313,8 @@ def convert_jsonl_to_chrome_trace( for path in sources: events.extend(_load_events(path)) - existing_process_names: dict[tuple[str | int, object], str] = {} - existing_thread_names: dict[tuple[str | int, object, object], str] = {} + existing_process_names: dict[tuple[str | int, str | None, object], str] = {} + existing_thread_names: dict[tuple[str | int, str | None, object, object], str] = {} filtered_events: list[dict] = [] ignored_metadata = { @@ -286,6 +325,7 @@ def convert_jsonl_to_chrome_trace( } for event in events: rank = _extract_rank(event) + role = _extract_role(event) if event.get("ph") == "M": name = event.get("name") args = event.get("args", {}) @@ -296,13 +336,13 @@ def convert_jsonl_to_chrome_trace( if name == "process_name" and isinstance(args, dict): pname = args.get("name") if pname: - existing_process_names[(rank, pid)] = str(pname) + existing_process_names[(rank, role, pid)] = str(pname) elif ( name == "thread_name" and tid is not None and isinstance(args, dict) ): tname = args.get("name") if tname: - existing_thread_names[(rank, pid, tid)] = str(tname) + existing_thread_names[(rank, role, pid, tid)] = str(tname) if name in ignored_metadata: continue @@ -311,25 +351,31 @@ def convert_jsonl_to_chrome_trace( events = filtered_events # Collect all unique flow IDs / correlations to remap them sequentially - flow_id_keys = set() + # flow_id_keys: (rank, role, flow_id) + flow_id_keys: set[tuple[str | int, str | None, object]] = set() for event in events: rank = _extract_rank(event) if rank is None: continue + role = _extract_role(event) # Collect from flow events if event.get("ph") in ("s", "t", "f") and "id" in event: - flow_id_keys.add((rank, event["id"])) + flow_id_keys.add((rank, role, event["id"])) # Collect from args.correlation args = event.get("args") if isinstance(args, dict) and "correlation" in args: - flow_id_keys.add((rank, args["correlation"])) + flow_id_keys.add((rank, role, args["correlation"])) # Sort and create mapping sorted_flow_keys = sorted( flow_id_keys, - key=lambda item: (_rank_sort_key(item[0]), _value_sort_key(item[1])), + key=lambda item: ( + _rank_sort_key(item[0]), + _role_sort_key(item[1]), + _value_sort_key(item[2]), + ), ) 
flow_id_map = {key: i for i, key in enumerate(sorted_flow_keys, start=1)} @@ -339,15 +385,16 @@ def convert_jsonl_to_chrome_trace( rank = _extract_rank(event) if rank is None: continue + role = _extract_role(event) if event.get("ph") in ("s", "t", "f") and "id" in event: - key = (rank, event["id"]) + key = (rank, role, event["id"]) if key in flow_id_map: event["id"] = flow_id_map[key] args = event.get("args") if isinstance(args, dict) and "correlation" in args: - key = (rank, args["correlation"]) + key = (rank, role, args["correlation"]) if key in flow_id_map: args["correlation"] = flow_id_map[key] @@ -360,6 +407,7 @@ def convert_jsonl_to_chrome_trace( metadata_events.sort( key=lambda event: ( _rank_sort_key(event.get("args", {}).get("rank")), + _role_sort_key(event.get("args", {}).get("role")), _metadata_name_sort_key(event.get("name")), _value_sort_key(event.get("pid")), _value_sort_key(event.get("tid")), diff --git a/areal/tools/plot_session_trace.py b/areal/tools/plot_session_trace.py index 016bd70c9..8b50f05cb 100644 --- a/areal/tools/plot_session_trace.py +++ b/areal/tools/plot_session_trace.py @@ -434,11 +434,15 @@ def _build_timeline( for idx, record_tuple in enumerate(timeline_df.itertuples()): rank_label = _format_index_label(getattr(record_tuple, "rank", math.nan)) + role_value = getattr(record_tuple, "role", None) task_label = _format_index_label(getattr(record_tuple, "task_id", math.nan)) session_label = _format_index_label( getattr(record_tuple, "session_id", math.nan) ) - label = f"r{rank_label}-t{task_label}-s{session_label}" + if role_value: + label = f"[{role_value}]r{rank_label}-t{task_label}-s{session_label}" + else: + label = f"r{rank_label}-t{task_label}-s{session_label}" submit_offset = getattr(record_tuple, "submit_ts_offset", math.nan) finalized_offset = getattr(record_tuple, "finalized_ts_offset", math.nan) @@ -612,6 +616,10 @@ def _subset_rank_label(value: Any) -> str: # Build customdata with available fields custom_cols: list[str] = ["rank", "task_id", "session_id", "status"] + # Add role if available + has_role = "role" in subset.columns and subset["role"].notna().any() + if has_role: + custom_cols.append("role") # Add optional duration fields if they exist for col_name in ["generate_s", "reward_s", "toolcall_s"]: if col_name in subset.columns: @@ -625,6 +633,9 @@ def _subset_rank_label(value: Any) -> str: "Status: %{customdata[3]}", ] custom_idx = 4 + if has_role: + hover_parts.insert(0, "Role: %{customdata[4]}") + custom_idx += 1 hover_parts.append("Total duration: %{y:.3f}s") if "generate_s" in custom_cols: hover_parts.append(f"Generate: %{{customdata[{custom_idx}]:.3f}}s") diff --git a/areal/utils/names.py b/areal/utils/names.py index 5479f427c..00773cf69 100644 --- a/areal/utils/names.py +++ b/areal/utils/names.py @@ -26,3 +26,7 @@ def gen_servers(experiment_name, trial_name): def update_weights_from_disk(experiment_name, trial_name, model_version): return f"{USER_NAMESPACE}/{experiment_name}/{trial_name}/update_weights_from_disk/{model_version}" + + +def worker_discovery(experiment_name, trial_name, role, task_id): + return f"{USER_NAMESPACE}/{experiment_name}/{trial_name}/workers/{role}/{task_id}" diff --git a/areal/utils/perf_tracer.py b/areal/utils/perf_tracer.py index 737774b25..39abbd9ee 100644 --- a/areal/utils/perf_tracer.py +++ b/areal/utils/perf_tracer.py @@ -198,6 +198,7 @@ def _default_trace_path( config: PerfTracerConfig, *, rank: int, + role: str | None = None, filename: str = _PERF_TRACE_FILENAME, subdir: str | None = None, ) -> str: @@ 
-212,6 +213,9 @@ def _default_trace_path( Configuration containing fileroot, experiment_name, and trial_name. rank : int Rank identifier to include in filename. + role : str | None, default=None + Optional role identifier (e.g., "actor", "learner"). When provided, + creates a subdirectory under the tracer directory. filename : str, default="traces.jsonl" Base filename before rank qualification. subdir : str | None, default=None @@ -232,6 +236,8 @@ def _default_trace_path( ) if subdir: base_dir = os.path.join(base_dir, subdir) + if role: + base_dir = os.path.join(base_dir, role) return os.path.join(base_dir, _rank_qualified_filename(filename, rank)) @@ -450,6 +456,8 @@ class SessionRecord: Unique identifier for this session. rank : int Rank identifier for the process that created this session. + role : str | None, default=None + Optional role identifier (e.g., "actor", "learner") for the process. submit_ts : float Timestamp when the session was submitted (from time.time(), wall-clock time). status : str, default="pending" @@ -469,6 +477,7 @@ class SessionRecord: task_id: int session_id: int rank: int + role: str | None submit_ts: float status: str = "pending" reason: str | None = None @@ -613,6 +622,7 @@ def default_field_specs(cls) -> tuple[FieldSpec, ...]: FieldSpec("task_id"), FieldSpec("session_id"), FieldSpec("rank"), + FieldSpec("role", omit_if_none=True), FieldSpec("status"), FieldSpec("reason", omit_if_none=True), FieldSpec("submit_ts"), @@ -937,6 +947,8 @@ class SessionTracer: Absolute path to the output JSONL file where session records will be written. rank : int Rank identifier for this process in distributed training. + role : str | None, default=None + Optional role identifier (e.g., "actor", "learner") for the process. See Also -------- @@ -951,9 +963,11 @@ def __init__( *, output_path: str, rank: int, + role: str | None = None, ) -> None: self._config = config self._rank = rank + self._role = role self._lock = threading.Lock() self._next_session_id = 0 # task id sequence and mapping from task_id -> set(session_id) @@ -986,6 +1000,7 @@ def register_session(self, task_id: int) -> int: task_id=task_id, session_id=session_id, rank=self._rank, + role=self._role, submit_ts=now, ) self._task_to_sessions.setdefault(task_id, set()).add(session_id) @@ -1273,6 +1288,8 @@ class PerfTracer: Configuration including enable flags, output paths, and session tracer settings. rank : int Rank identifier for this process in distributed training. + role : str | None, default=None + Optional role identifier (e.g., "actor", "learner") for the process. 
See Also -------- @@ -1280,12 +1297,15 @@ class PerfTracer: trace_session_event : Function to record session events """ - def __init__(self, config: PerfTracerConfig, *, rank: int) -> None: + def __init__( + self, config: PerfTracerConfig, *, rank: int, role: str | None = None + ) -> None: if rank < 0: raise ValueError("rank must be a non-negative integer") self._config = config self._enabled = config.enabled self._rank = rank + self._role = role self._events: list[dict[str, Any]] = [] self._lock = threading.Lock() self._pid = os.getpid() @@ -1298,12 +1318,13 @@ def __init__(self, config: PerfTracerConfig, *, rank: int) -> None: self._output_path = _default_trace_path( config, rank=rank, + role=role, subdir="perf_tracer", ) self._save_interval = _normalize_save_interval(config) self._profile_steps = self._normalize_profile_steps(config.profile_steps) self._session_tracer: SessionTracer | None = None - self._configure_session_tracer(config, rank=rank) + self._configure_session_tracer(config, rank=rank, role=role) # ------------------------------------------------------------------ # Configuration helpers @@ -1315,7 +1336,9 @@ def enabled(self) -> bool: def set_enabled(self, flag: bool) -> None: self._enabled = flag - def _configure_session_tracer(self, config: PerfTracerConfig, *, rank: int) -> None: + def _configure_session_tracer( + self, config: PerfTracerConfig, *, rank: int, role: str | None = None + ) -> None: session_cfg = getattr(config, "session_tracer", None) enabled = bool(session_cfg and getattr(session_cfg, "enabled", False)) if enabled: @@ -1324,6 +1347,7 @@ def _configure_session_tracer(self, config: PerfTracerConfig, *, rank: int) -> N config, filename=_SESSION_TRACE_FILENAME, rank=rank, + role=role, subdir="session_tracer", ) if self._session_tracer is None: @@ -1331,6 +1355,7 @@ def _configure_session_tracer(self, config: PerfTracerConfig, *, rank: int) -> N session_cfg, output_path=output_path, rank=rank, + role=role, ) else: raise RuntimeError("Session tracer is already configured") @@ -1562,12 +1587,14 @@ def merge_profiler_trace( if "ts" in new_event: new_event["ts"] = new_event["ts"] + time_offset_us - # Attach rank info + # Attach rank and role info event_args = new_event.get("args") if not isinstance(event_args, dict): event_args = {} new_event["args"] = event_args event_args["rank"] = self._rank + if self._role is not None: + event_args["role"] = self._role new_events.append(new_event) @@ -1667,6 +1694,8 @@ def _record_event(self, event: dict[str, Any]) -> None: if event.get("ph") != "M": args = event.setdefault("args", {}) args.setdefault("rank", self._rank) + if self._role is not None: + args.setdefault("role", self._role) with self._lock: self._events.append(event) @@ -1720,7 +1749,10 @@ def _ensure_process_metadata(self, pid: int) -> None: if pid in self._process_meta_emitted: return self._process_meta_emitted.add(pid) - rank_label = f"Rank {self._rank}, Process" + if self._role is not None: + rank_label = f"[{self._role}] Rank {self._rank}, Process" + else: + rank_label = f"Rank {self._rank}, Process" process_name_event = { "name": "process_name", "ph": "M", @@ -1787,6 +1819,7 @@ def configure( config: PerfTracerConfig, *, rank: int, + role: str | None = None, ) -> PerfTracer: global GLOBAL_TRACER with _GLOBAL_TRACER_LOCK: @@ -1795,12 +1828,13 @@ def configure( "PerfTracer has already been configured. Call perf_tracer.reset() " "before configuring again." 
) - GLOBAL_TRACER = PerfTracer(config, rank=rank) + GLOBAL_TRACER = PerfTracer(config, rank=rank, role=role) logger.info( - "Configured global PerfTracer: enabled=%s, session_tracing=%s, rank=%s", + "Configured global PerfTracer: enabled=%s, session_tracing=%s, rank=%s, role=%s", GLOBAL_TRACER.enabled, GLOBAL_TRACER.session_tracer is not None, rank, + role, ) return GLOBAL_TRACER From 1293e960c1d4a4639b69b209344735760d184a6f Mon Sep 17 00:00:00 2001 From: garrett4wade Date: Wed, 24 Dec 2025 21:31:00 +0800 Subject: [PATCH 3/7] fix perf_tracer in the single-controller mode --- areal/api/engine_api.py | 33 ++++++++++++ areal/controller/rollout_controller.py | 21 +++++++- areal/controller/train_controller.py | 21 +++++++- areal/engine/fsdp_engine.py | 19 ++++++- areal/engine/megatron_engine.py | 12 ++++- areal/engine/sglang_remote.py | 12 ++++- areal/engine/vllm_remote.py | 12 ++++- areal/experimental/trainer/rl.py | 31 +++++++++-- areal/experimental/trainer/sft.py | 17 ++++-- areal/scheduler/local.py | 2 +- areal/scheduler/rpc/rpc_server.py | 63 ++++++++++++++++++---- areal/tests/test_local_scheduler.py | 38 ++++++------- areal/tools/perf_trace_converter.py | 2 +- examples/single-controller/gsm8k_grpo.yaml | 34 ++++++++---- 14 files changed, 259 insertions(+), 58 deletions(-) diff --git a/areal/api/engine_api.py b/areal/api/engine_api.py index 9dc549e5f..ebc39b78c 100644 --- a/areal/api/engine_api.py +++ b/areal/api/engine_api.py @@ -10,6 +10,7 @@ from torchdata.stateful_dataloader import StatefulDataLoader from areal.api.alloc_mode import ParallelStrategy +from areal.api.cli_args import PerfTracerConfig from areal.api.io_struct import ( DeviceRuntimeInfo, LocalInfServerInfo, @@ -478,6 +479,22 @@ def offload(self) -> None: def get_device_stats(self) -> DeviceRuntimeInfo: raise NotImplementedError() + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + """Save performance tracer data. + + Parameters + ---------- + step : int, optional + The current training step number, by default None + force : bool, optional + If True, force save regardless of internal conditions, by default False + """ + + def config_perf_tracer( + self, config: PerfTracerConfig, rank: int, role: str + ) -> None: + """Configure performance tracer.""" + class InferenceEngine(abc.ABC): def initialize(self, *args, **kwargs): @@ -867,3 +884,19 @@ def export_stats(self) -> dict[str, float]: The recorded scalar statistics. """ raise NotImplementedError() + + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + """Save performance tracer data. 
+ + Parameters + ---------- + step : int, optional + The current training step number, by default None + force : bool, optional + If True, force save regardless of internal conditions, by default False + """ + + def config_perf_tracer( + self, config: PerfTracerConfig, rank: int, role: str + ) -> None: + """Configure performance tracer.""" diff --git a/areal/controller/rollout_controller.py b/areal/controller/rollout_controller.py index 56b000804..e3a8fba01 100644 --- a/areal/controller/rollout_controller.py +++ b/areal/controller/rollout_controller.py @@ -11,7 +11,7 @@ from torchdata.stateful_dataloader import StatefulDataLoader from areal.api.alloc_mode import AllocationMode -from areal.api.cli_args import InferenceEngineConfig, SchedulingSpec +from areal.api.cli_args import InferenceEngineConfig, PerfTracerConfig, SchedulingSpec from areal.api.engine_api import InferenceEngine from areal.api.io_struct import ( LocalInfServerInfo, @@ -551,6 +551,25 @@ def export_stats(self) -> dict[str, float]: final_stats[k] = v / counts[count_key] return final_stats + def config_perf_tracer(self, config: PerfTracerConfig, role: str) -> None: + async def _call(): + tasks = [ + self.scheduler.async_call_engine( + worker_id=worker.id, + method="config_perf_tracer", + rank=rank, + role=role, + config=config, + ) + for rank, worker in enumerate(self.workers) + ] + return await asyncio.gather(*tasks) + + asyncio.run(_call()) + + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + self._collective_rpc("save_perf_tracer", step=step, force=force) + @property def staleness_manager(self): return self._staleness_manager diff --git a/areal/controller/train_controller.py b/areal/controller/train_controller.py index d07f6637e..e6934dd94 100644 --- a/areal/controller/train_controller.py +++ b/areal/controller/train_controller.py @@ -9,7 +9,7 @@ from torchdata.stateful_dataloader import StatefulDataLoader from areal.api.alloc_mode import ParallelStrategy -from areal.api.cli_args import TrainEngineConfig +from areal.api.cli_args import PerfTracerConfig, TrainEngineConfig from areal.api.engine_api import TrainEngine from areal.api.io_struct import ( AllocationMode, @@ -490,6 +490,25 @@ def connect_engine(self, rollout: RolloutController, meta: WeightUpdateMeta): def get_device_stats(self): return self._custom_function_call("get_device_stats") + def config_perf_tracer(self, config: PerfTracerConfig, role: str) -> None: + async def _call(): + tasks = [ + self.scheduler.async_call_engine( + worker_id=worker.id, + method="config_perf_tracer", + rank=rank, + role=role, + config=config, + ) + for rank, worker in enumerate(self.workers) + ] + return await asyncio.gather(*tasks) + + self._run_async_task(_call()) + + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + self._custom_function_call("save_perf_tracer", step=step, force=force) + def prepare_batch( self, dataloader: StatefulDataLoader, diff --git a/areal/engine/fsdp_engine.py b/areal/engine/fsdp_engine.py index 2ffdd76eb..135b776ae 100644 --- a/areal/engine/fsdp_engine.py +++ b/areal/engine/fsdp_engine.py @@ -41,7 +41,7 @@ ) from areal.api.alloc_mode import FSDPParallelStrategy, ParallelStrategy -from areal.api.cli_args import TrainEngineConfig +from areal.api.cli_args import PerfTracerConfig, TrainEngineConfig from areal.api.engine_api import InferenceEngine, TrainEngine from areal.api.io_struct import ( DeviceRuntimeInfo, @@ -59,7 +59,14 @@ ) from areal.models.transformers.ulyssess_patch import 
apply_monkey_patch from areal.platforms import current_platform -from areal.utils import logging, name_resolve, names, pkg_version, stats_tracker +from areal.utils import ( + logging, + name_resolve, + names, + perf_tracer, + pkg_version, + stats_tracker, +) from areal.utils.constants import DIST_GROUP_DEFAULT_TIMEOUT from areal.utils.data import ( MicroBatchItem, @@ -616,6 +623,14 @@ def clear_batches(self, *args): def get_device_stats(self) -> DeviceRuntimeInfo: return DeviceRuntimeInfo.get_current() + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + perf_tracer.save(step=step, force=force) + + def config_perf_tracer( + self, config: PerfTracerConfig, rank: int, role: str + ) -> None: + perf_tracer.configure(config, rank=rank, role=role) + def _make_parallel_strategy( self, parallel_strategy: ParallelStrategy ) -> FSDPParallelStrategy: diff --git a/areal/engine/megatron_engine.py b/areal/engine/megatron_engine.py index e463f915e..b85839faa 100644 --- a/areal/engine/megatron_engine.py +++ b/areal/engine/megatron_engine.py @@ -27,7 +27,7 @@ from transformers import PretrainedConfig from areal.api.alloc_mode import MegatronParallelStrategy, ParallelStrategy -from areal.api.cli_args import MicroBatchSpec, TrainEngineConfig +from areal.api.cli_args import MicroBatchSpec, PerfTracerConfig, TrainEngineConfig from areal.api.engine_api import InferenceEngine, TrainEngine from areal.api.io_struct import ( DeviceRuntimeInfo, @@ -47,7 +47,7 @@ from areal.models.mcore.hf_save import save_weights_to_hf_with_mbridge_fast from areal.models.mcore.registry import make_hf_and_mcore_config, make_mcore_model from areal.platforms import current_platform -from areal.utils import logging, name_resolve, names, stats_tracker +from areal.utils import logging, name_resolve, names, perf_tracer, stats_tracker from areal.utils.constants import DIST_GROUP_DEFAULT_TIMEOUT from areal.utils.data import ( MicroBatchItem, @@ -695,6 +695,14 @@ def clear_batches(self, *args): def get_device_stats(self) -> DeviceRuntimeInfo: return DeviceRuntimeInfo.get_current() + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + perf_tracer.save(step=step, force=force) + + def config_perf_tracer( + self, config: PerfTracerConfig, rank: int, role: str + ) -> None: + perf_tracer.configure(config, rank=rank, role=role) + def _make_parallel_strategy( self, parallel_strategy: ParallelStrategy ) -> MegatronParallelStrategy: diff --git a/areal/engine/sglang_remote.py b/areal/engine/sglang_remote.py index 6846c0230..c49b5c0f6 100644 --- a/areal/engine/sglang_remote.py +++ b/areal/engine/sglang_remote.py @@ -8,7 +8,7 @@ from torchdata.stateful_dataloader import StatefulDataLoader -from areal.api.cli_args import InferenceEngineConfig, SGLangConfig +from areal.api.cli_args import InferenceEngineConfig, PerfTracerConfig, SGLangConfig from areal.api.engine_api import InferenceEngine from areal.api.io_struct import ( HttpGenerationResult, @@ -26,7 +26,7 @@ from areal.core import RemoteInfEngine from areal.core.workflow_executor import WorkflowExecutor from areal.platforms import current_platform -from areal.utils import stats_tracker +from areal.utils import perf_tracer, stats_tracker from areal.utils.launcher import TRITON_CACHE_PATH @@ -364,3 +364,11 @@ def as_controller( def clear_batches(self, *args): """Placeholder method of single-controller API.""" + + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + perf_tracer.save(step=step, force=force) + + 
def config_perf_tracer( + self, config: PerfTracerConfig, rank: int, role: str + ) -> None: + perf_tracer.configure(config, rank=rank, role=role) diff --git a/areal/engine/vllm_remote.py b/areal/engine/vllm_remote.py index 56c9686bd..5f10d181f 100644 --- a/areal/engine/vllm_remote.py +++ b/areal/engine/vllm_remote.py @@ -8,7 +8,7 @@ from torchdata.stateful_dataloader import StatefulDataLoader -from areal.api.cli_args import InferenceEngineConfig, vLLMConfig +from areal.api.cli_args import InferenceEngineConfig, PerfTracerConfig, vLLMConfig from areal.api.engine_api import InferenceEngine from areal.api.io_struct import ( HttpGenerationResult, @@ -26,7 +26,7 @@ from areal.core import RemoteInfEngine from areal.core.workflow_executor import WorkflowExecutor from areal.platforms import current_platform -from areal.utils import stats_tracker +from areal.utils import perf_tracer, stats_tracker from areal.utils.launcher import TRITON_CACHE_PATH @@ -406,3 +406,11 @@ def as_controller( def clear_batches(self, *args): """Placeholder method of single-controller API.""" + + def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None: + perf_tracer.save(step=step, force=force) + + def config_perf_tracer( + self, config: PerfTracerConfig, rank: int, role: str + ) -> None: + perf_tracer.configure(config, rank=rank, role=role) diff --git a/areal/experimental/trainer/rl.py b/areal/experimental/trainer/rl.py index bd18f825e..442caf6b3 100644 --- a/areal/experimental/trainer/rl.py +++ b/areal/experimental/trainer/rl.py @@ -58,9 +58,6 @@ def __init__( valid_dataset: Dataset | None = None, ): rank = int(os.getenv("RANK", "0")) - # Configure performance tracer - if config.perf_tracer is not None: - perf_tracer.configure(config.perf_tracer, rank=rank, role="master") self.config = config self.processor, self.tokenizer = load_hf_processor_and_tokenizer( @@ -175,6 +172,8 @@ def __init__( weight_update_meta=self.weight_update_meta, ) + self._config_perf_tracer() + def train( self, workflow: RolloutWorkflow | type[RolloutWorkflow] | str, @@ -378,7 +377,7 @@ def train( # Resume rollout self.rollout.resume() - perf_tracer.save(step=global_step) + self._save_perf_tracer(step=global_step) def close(self): self.stats_logger.close() @@ -391,6 +390,30 @@ def close(self): self.actor.destroy() perf_tracer.save(force=True) + def _config_perf_tracer(self): + rank = int(os.getenv("RANK", "0")) + if self.config.perf_tracer is not None: + perf_tracer.configure(self.config.perf_tracer, rank=rank, role="master") + self.actor.config_perf_tracer(self.config.perf_tracer, role="actor") + if self.critic is not None: + self.critic.config_perf_tracer(self.config.perf_tracer, role="critic") + if self.ref is not None: + self.ref.config_perf_tracer(self.config.perf_tracer, role="ref") + self.rollout.config_perf_tracer(self.config.perf_tracer, role="rollout") + self.eval_rollout.config_perf_tracer( + self.config.perf_tracer, role="eval-rollout" + ) + + def _save_perf_tracer(self, step: int): + self.actor.save_perf_tracer(step=step) + if self.ref is not None: + self.ref.save_perf_tracer(step=step) + if self.critic is not None: + self.critic.save_perf_tracer(step=step) + self.eval_rollout.save_perf_tracer(step=step) + self.rollout.save_perf_tracer(step=step) + perf_tracer.save(step=step) + def _init_scheduler(self) -> Scheduler: cfg = self.config.scheduler if cfg.type == "local": diff --git a/areal/experimental/trainer/sft.py b/areal/experimental/trainer/sft.py index 008edfeff..d33f61d79 100644 --- 
a/areal/experimental/trainer/sft.py +++ b/areal/experimental/trainer/sft.py @@ -45,9 +45,6 @@ def __init__( valid_dataset: Dataset | None = None, ): rank = int(os.getenv("RANK", "0")) - # Configure performance tracer - if config.perf_tracer is not None: - perf_tracer.configure(config.perf_tracer, rank=rank) self.config = config self.processor, self.tokenizer = load_hf_processor_and_tokenizer( @@ -119,6 +116,8 @@ def __init__( self.train_dataloader, ) + self._config_perf_tracer() + def train(self): config = self.config start_step = ( @@ -221,13 +220,23 @@ def train(self): epoch=epoch, epoch_step=step, global_step=global_step ) - perf_tracer.save(step=global_step) + self._save_perf_tracer(step=global_step) def close(self): self.stats_logger.close() self.actor.destroy() perf_tracer.save(force=True) + def _config_perf_tracer(self): + rank = int(os.getenv("RANK", "0")) + if self.config.perf_tracer is not None: + perf_tracer.configure(self.config.perf_tracer, rank=rank, role="master") + self.actor.config_perf_tracer(self.config.perf_tracer, role="actor") + + def _save_perf_tracer(self, step: int): + self.actor.save_perf_tracer(step=step) + perf_tracer.save(step=step) + def _init_scheduler(self) -> Scheduler: cfg = self.config.scheduler if cfg.type == "local": diff --git a/areal/scheduler/local.py b/areal/scheduler/local.py index c102a37df..936ad5317 100644 --- a/areal/scheduler/local.py +++ b/areal/scheduler/local.py @@ -331,7 +331,7 @@ def create_workers(self, job: Job, *args, **kwargs) -> list[str]: if scheduling.env_vars: env.update(scheduling.env_vars) - log_file = self.log_dir / f"{worker_id.replace('/', '_')}.log" + log_file = self.log_dir / f"{role}.log" if not scheduling.cmd: self._cleanup_workers(workers) diff --git a/areal/scheduler/rpc/rpc_server.py b/areal/scheduler/rpc/rpc_server.py index 8eee6d9bd..11170ad1f 100644 --- a/areal/scheduler/rpc/rpc_server.py +++ b/areal/scheduler/rpc/rpc_server.py @@ -131,8 +131,6 @@ def configure(): config: BaseExperimentConfig def execute_configure(): - if config.perf_tracer is not None: - perf_tracer.configure(config.perf_tracer, rank=rank, role=role) seeding.set_random_seed(config.seed, key=f"{role}{rank}") return { "status": "success", @@ -332,6 +330,17 @@ def execute_in_engine_thread(): f"Broadcasting data for TrainEngine method: {method_name}" ) + nonlocal raw_args, raw_kwargs + raw_args = broadcast_tensor_container( + raw_args, + src_rank=_engine.current_data_parallel_head(), + group=_engine.context_and_model_parallel_group, + ) + raw_kwargs = broadcast_tensor_container( + raw_kwargs, + src_rank=_engine.current_data_parallel_head(), + group=_engine.context_and_model_parallel_group, + ) args_bcast = tensor_container_to( args, current_platform.current_device() ) @@ -354,14 +363,49 @@ def execute_in_engine_thread(): kwargs_bcast = kwargs logger.debug(f"Calling engine method: {method_name}") - method = getattr(_engine, method_name) - result = method(*args_bcast, **kwargs_bcast) - # Handle update weights future - if isinstance(result, Future): - logger.debug("Waiting for update weights future") - result = result.result() - logger.debug("Update weights future done") + # Determine trace category based on method name + category = "misc" # Default category + method_lower = method_name.lower() + if any(keyword in method_lower for keyword in ["submit", "wait"]): + category = "scheduler" + elif any( + keyword in method_lower + for keyword in ["update_weights", "broadcast"] + ): + category = "comm" + elif any(keyword in method_lower for keyword in ["save", 
"load"]): + category = "io" + elif any( + keyword in method_lower + for keyword in [ + "train", + "eval", + "forward", + "compute", + "step", + "update", + "optimizer", + "zero_grad", + "lr_scheduler", + ] + ): + category = "compute" + + # Wrap engine method call with perf_tracer + with perf_tracer.trace_scope( + f"rpc.{method_name}", + category=category, + args={"method": method_name}, + ): + method = getattr(_engine, method_name) + result = method(*args_bcast, **kwargs_bcast) + + # Handle update weights future + if isinstance(result, Future): + logger.debug("Waiting for update weights future") + result = result.result() + logger.debug("Update weights future done") return result except AttributeError as e: @@ -597,6 +641,7 @@ def main(): except KeyboardInterrupt: logger.info("Shutting down sync RPC server") finally: + perf_tracer.save(force=True) cleanup_engine_thread() cleanup_engine() server.shutdown() diff --git a/areal/tests/test_local_scheduler.py b/areal/tests/test_local_scheduler.py index 6100029af..325d12786 100644 --- a/areal/tests/test_local_scheduler.py +++ b/areal/tests/test_local_scheduler.py @@ -769,7 +769,7 @@ def test_create_workers_subprocess_fails_immediately( mock_popen.return_value = mock_proc # Create log file with error message - log_file = tmp_path / "test_0.log" + log_file = tmp_path / "test.log" log_file.write_text("Error: Failed to start server\n") scheduler = LocalScheduler( @@ -872,10 +872,10 @@ def test_get_workers_success(self, mock_sleep, scheduler, tmp_path): """Should return workers when all are ready.""" # Create mock workers worker1 = create_worker_info( - worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test_0.log") + worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test.log") ) worker2 = create_worker_info( - worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test_1.log") + worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test.log") ) scheduler._workers["test"] = [worker1, worker2] @@ -894,7 +894,7 @@ def test_get_workers_timeout(self, mock_sleep, mock_time, scheduler, tmp_path): # Mock time progression - provide enough values mock_time.side_effect = [0.0] + [i for i in range(1, 20)] - worker = create_worker_info(log_file=str(tmp_path / "test_0.log")) + worker = create_worker_info(log_file=str(tmp_path / "test.log")) worker.created_at = 0.0 scheduler._workers["test"] = [worker] @@ -909,7 +909,7 @@ def test_get_workers_timeout(self, mock_sleep, mock_time, scheduler, tmp_path): def test_get_workers_process_died(self, scheduler, tmp_path): """Should raise WorkerFailedError when worker process dies during readiness check.""" - log_file = tmp_path / "test_0.log" + log_file = tmp_path / "test.log" log_file.write_text("Error: Connection refused\n") # Process dies after first check @@ -931,10 +931,10 @@ def test_get_workers_process_died(self, scheduler, tmp_path): def test_get_workers_gradual_readiness(self, mock_sleep, scheduler, tmp_path): """Should wait for all workers to become ready gradually.""" worker1 = create_worker_info( - worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test_0.log") + worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test.log") ) worker2 = create_worker_info( - worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test_1.log") + worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test.log") ) scheduler._workers["test"] = [worker1, worker2] @@ -982,10 +982,10 @@ def test_is_worker_ready_connection_error(self, scheduler, tmp_path): def 
test_check_worker_health_all_healthy(self, scheduler, tmp_path): """Should pass when all workers are healthy.""" worker1 = create_worker_info( - worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test_0.log") + worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test.log") ) worker2 = create_worker_info( - worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test_1.log") + worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test.log") ) scheduler._workers["test"] = [worker1, worker2] @@ -995,7 +995,7 @@ def test_check_worker_health_all_healthy(self, scheduler, tmp_path): def test_check_worker_health_worker_failed(self, scheduler, tmp_path): """Should raise WorkerFailedError when a worker has failed.""" - log_file = tmp_path / "test_0.log" + log_file = tmp_path / "test.log" log_file.write_text("Killed by signal\n") mock_proc = create_mock_process(is_alive=False, exit_code=137) @@ -1025,13 +1025,13 @@ def test_delete_workers_specific_role(self, scheduler, tmp_path): worker_id="role1/0", role="role1", ports=["8000"], - log_file=str(tmp_path / "role1_0.log"), + log_file=str(tmp_path / "role1.log"), ) worker2 = create_worker_info( worker_id="role2/0", role="role2", ports=["8001"], - log_file=str(tmp_path / "role2_0.log"), + log_file=str(tmp_path / "role2.log"), ) scheduler._workers["role1"] = [worker1] @@ -1052,13 +1052,13 @@ def test_delete_workers_all_roles(self, scheduler, tmp_path): worker_id="role1/0", role="role1", ports=["8000"], - log_file=str(tmp_path / "role1_0.log"), + log_file=str(tmp_path / "role1.log"), ) worker2 = create_worker_info( worker_id="role2/0", role="role2", ports=["8001"], - log_file=str(tmp_path / "role2_0.log"), + log_file=str(tmp_path / "role2.log"), ) scheduler._workers["role1"] = [worker1] @@ -1093,10 +1093,10 @@ def test_cleanup_workers_handles_errors( ): """Should continue cleanup even if terminating a process fails.""" worker1 = create_worker_info( - worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test_0.log") + worker_id="test/0", ports=["8000"], log_file=str(tmp_path / "test.log") ) worker2 = create_worker_info( - worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test_1.log") + worker_id="test/1", ports=["8001"], log_file=str(tmp_path / "test.log") ) # First termination fails, second succeeds @@ -1651,13 +1651,13 @@ def test_find_worker_by_id_success(self, scheduler, tmp_path): worker_id="role1/0", role="role1", ports=["8000"], - log_file=str(tmp_path / "role1_0.log"), + log_file=str(tmp_path / "role1.log"), ) worker2 = create_worker_info( worker_id="role2/0", role="role2", ports=["8001"], - log_file=str(tmp_path / "role2_0.log"), + log_file=str(tmp_path / "role2.log"), ) scheduler._workers["role1"] = [worker1] @@ -1671,7 +1671,7 @@ def test_find_worker_by_id_success(self, scheduler, tmp_path): def test_find_worker_by_id_not_found(self, scheduler, tmp_path): """Should return None when worker ID is not found.""" worker = create_worker_info( - worker_id="role1/0", role="role1", log_file=str(tmp_path / "role1_0.log") + worker_id="role1/0", role="role1", log_file=str(tmp_path / "role1.log") ) scheduler._workers["role1"] = [worker] diff --git a/areal/tools/perf_trace_converter.py b/areal/tools/perf_trace_converter.py index 50a28d4f2..f1b739f1a 100644 --- a/areal/tools/perf_trace_converter.py +++ b/areal/tools/perf_trace_converter.py @@ -286,7 +286,7 @@ def _resolve_trace_files(source: Path) -> list[Path]: if source.is_file(): return [source] if source.is_dir(): - return sorted(p for p in 
source.glob("*.jsonl") if p.is_file()) + return sorted(p for p in source.rglob("*.jsonl") if p.is_file()) matches = [Path(p) for p in glob(str(source), recursive=True)] files = [p for p in matches if p.is_file()] return sorted(files) diff --git a/examples/single-controller/gsm8k_grpo.yaml b/examples/single-controller/gsm8k_grpo.yaml index 7e37bd340..64c75a357 100644 --- a/examples/single-controller/gsm8k_grpo.yaml +++ b/examples/single-controller/gsm8k_grpo.yaml @@ -24,11 +24,7 @@ rollout: consumer_batch_size: ${train_dataset.batch_size} max_head_offpolicyness: 2 enable_rollout_tracing: false - scheduling_spec: - - task_type: worker - port_count: 2 - gpu: 1 - cmd: python3 -m areal.scheduler.rpc.rpc_server + scheduling_spec: ${actor.scheduling_spec} gconfig: n_samples: 4 @@ -82,6 +78,20 @@ actor: port_count: 2 gpu: 1 cmd: python3 -m areal.scheduler.rpc.rpc_server + env_vars: + NCCL_DEBUG: "WARN" + NCCL_IB_DISABLE: "0" + NCCL_SOCKET_IFNAME: "bond0" + NCCL_NET: "IB" + NCCL_NET_PLUGIN: "" + NCCL_IB_GID_INDEX: "3" + NCCL_IB_TIMEOUT: "22" + NCCL_IB_RETRY_CNT: "7" + NCCL_IB_SL: "5" + NCCL_IB_TC: "136" + NCCL_IB_HCA: "mlx5_bond" + NCCL_IB_QPS_PER_CONNECTION: "8" + NCCL_SET_THREAD_NAME: "1" ref: experiment_name: ${experiment_name} @@ -96,11 +106,7 @@ ref: scheduling_strategy: type: colocation target: actor - scheduling_spec: - - task_type: worker - port_count: 2 - gpu: 1 - cmd: python3 -m areal.scheduler.rpc.rpc_server + scheduling_spec: ${actor.scheduling_spec} # SGLang sglang: @@ -171,6 +177,14 @@ stats_logger: wandb: mode: disabled +perf_tracer: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + enabled: false + session_tracer: + enabled: false + launcher: inference_server_cpus_per_gpu: 4 inference_server_mem_per_gpu: 32768 From 77be099c8e4989d71bddf4c9dafbcd9003fe2255 Mon Sep 17 00:00:00 2001 From: garrett4wade Date: Wed, 24 Dec 2025 21:43:21 +0800 Subject: [PATCH 4/7] resolve perf_tracer config None issue --- areal/experimental/trainer/rl.py | 5 +++-- areal/experimental/trainer/sft.py | 7 +++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/areal/experimental/trainer/rl.py b/areal/experimental/trainer/rl.py index 442caf6b3..43863df13 100644 --- a/areal/experimental/trainer/rl.py +++ b/areal/experimental/trainer/rl.py @@ -392,8 +392,9 @@ def close(self): def _config_perf_tracer(self): rank = int(os.getenv("RANK", "0")) - if self.config.perf_tracer is not None: - perf_tracer.configure(self.config.perf_tracer, rank=rank, role="master") + if self.config.perf_tracer is None: + return + perf_tracer.configure(self.config.perf_tracer, rank=rank, role="master") self.actor.config_perf_tracer(self.config.perf_tracer, role="actor") if self.critic is not None: self.critic.config_perf_tracer(self.config.perf_tracer, role="critic") diff --git a/areal/experimental/trainer/sft.py b/areal/experimental/trainer/sft.py index d33f61d79..7d37f98ee 100644 --- a/areal/experimental/trainer/sft.py +++ b/areal/experimental/trainer/sft.py @@ -229,11 +229,14 @@ def close(self): def _config_perf_tracer(self): rank = int(os.getenv("RANK", "0")) - if self.config.perf_tracer is not None: - perf_tracer.configure(self.config.perf_tracer, rank=rank, role="master") + if self.config.perf_tracer is None: + return + perf_tracer.configure(self.config.perf_tracer, rank=rank, role="master") self.actor.config_perf_tracer(self.config.perf_tracer, role="actor") def _save_perf_tracer(self, step: int): + if self.config.perf_tracer is None: + return 
self.actor.save_perf_tracer(step=step) perf_tracer.save(step=step) From 3ac4557f9b2ce5286238bfd8dcba5f544e3e48bc Mon Sep 17 00:00:00 2001 From: garrett4wade Date: Wed, 24 Dec 2025 22:46:10 +0800 Subject: [PATCH 5/7] minor update docstring --- areal/api/engine_api.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/areal/api/engine_api.py b/areal/api/engine_api.py index ebc39b78c..e6c571db3 100644 --- a/areal/api/engine_api.py +++ b/areal/api/engine_api.py @@ -899,4 +899,15 @@ def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None def config_perf_tracer( self, config: PerfTracerConfig, rank: int, role: str ) -> None: - """Configure performance tracer.""" + """Configure performance tracer. + + Parameters + ---------- + config : PerfTracerConfig + Configuration for the performance tracer. + rank : int + Rank of the current process within its role. + role : str + Role of this process. "master" by default or "actor", + "ref", "rollout", etc. in RPC workers. + """ From 27055e8b63c72cd54df0730523a3e7514e7da8ab Mon Sep 17 00:00:00 2001 From: garrett4wade Date: Wed, 24 Dec 2025 22:56:21 +0800 Subject: [PATCH 6/7] fix test --- areal/scheduler/local.py | 6 +++--- areal/scheduler/rpc/rpc_server.py | 11 +++++------ areal/tests/test_rollout_controller.py | 6 +++++- areal/tests/test_rtensor.py | 12 ++++++++++-- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/areal/scheduler/local.py b/areal/scheduler/local.py index 936ad5317..d2e7be1be 100644 --- a/areal/scheduler/local.py +++ b/areal/scheduler/local.py @@ -87,9 +87,9 @@ def __init__( startup_timeout: float = 30.0, health_check_interval: float = 1.0, *, - fileroot: str | None = None, experiment_name: str | None = None, trial_name: str | None = None, + fileroot: str | None = None, name_resolve_type: str = "nfs", nfs_record_root: str = "/tmp/areal/name_resolve", etcd3_addr: str = "localhost:2379", @@ -351,8 +351,8 @@ def create_workers(self, job: Job, *args, **kwargs) -> list[str]: cmd = shlex.split(scheduling.cmd) cmd.extend(["--port", str(ports[0])]) # Add name_resolve and worker identity args - cmd.extend(["--experiment-name", self.experiment_name]) - cmd.extend(["--trial-name", self.trial_name]) + cmd.extend(["--experiment-name", str(self.experiment_name)]) + cmd.extend(["--trial-name", str(self.trial_name)]) cmd.extend(["--role", role]) cmd.extend(["--worker-index", str(idx)]) cmd.extend(["--name-resolve-type", self.name_resolve_config.type]) diff --git a/areal/scheduler/rpc/rpc_server.py b/areal/scheduler/rpc/rpc_server.py index 11170ad1f..1b0c3aaf2 100644 --- a/areal/scheduler/rpc/rpc_server.py +++ b/areal/scheduler/rpc/rpc_server.py @@ -34,6 +34,7 @@ # Global engine instance - must be TrainEngine or InferenceEngine _engine: TrainEngine | InferenceEngine | None = None +_role: str | None = None # Engine thread for executing all engine-related endpoints serially # This ensures NCCL compatibility by running engine operations in a single thread, @@ -119,10 +120,6 @@ def configure(): if config is None: return jsonify({"detail": "Missing 'config' field in request"}), 400 - role = data.get("role") - if role is None: - return jsonify({"detail": "Missing 'role' field in request"}), 400 - rank = data.get("rank") if rank is None: return jsonify({"detail": "Missing 'rank' field in request"}), 400 @@ -131,7 +128,8 @@ def configure(): config: BaseExperimentConfig def execute_configure(): - seeding.set_random_seed(config.seed, key=f"{role}{rank}") + global _role + 
seeding.set_random_seed(config.seed, key=f"{_role}{rank}") return { "status": "success", "message": "Worker configured successful.", @@ -607,10 +605,11 @@ def main(): werkzeug_logger.setLevel(getattr(stdlib_logging, args.werkzeug_log_level)) # Set global server address variables - global _server_host, _server_port + global _server_host, _server_port, _role _server_host = args.host if _server_host == "0.0.0.0": _server_host = gethostip() + _role = args.role # Get worker identity worker_id = f"{args.role}/{args.worker_index}" diff --git a/areal/tests/test_rollout_controller.py b/areal/tests/test_rollout_controller.py index 3a9bde2cf..20afcffdc 100644 --- a/areal/tests/test_rollout_controller.py +++ b/areal/tests/test_rollout_controller.py @@ -967,7 +967,11 @@ def test_parametrized_capacity_settings( @pytest.mark.ci def test_rollout_controller_integration(tmp_path, model_path): tokenizer = load_hf_tokenizer(model_path) - scheduler = LocalScheduler(log_dir=tmp_path) + scheduler = LocalScheduler( + log_dir=tmp_path, + experiment_name="test_rollout_controller_integration", + trial_name="trial0", + ) rollout = RolloutController( inf_engine=RemoteSGLangEngine, config=InferenceEngineConfig( diff --git a/areal/tests/test_rtensor.py b/areal/tests/test_rtensor.py index 73d798fb9..6a9a7ce38 100644 --- a/areal/tests/test_rtensor.py +++ b/areal/tests/test_rtensor.py @@ -29,9 +29,17 @@ def rpc_server(): "localhost", "--port", str(RPC_SERVER_PORT), + "--experiment-name", + "test-rtensor", + "--trial-name", + "trial0", + "--role", + "master", + "--worker-index", + "0", ], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, + stdout=sys.stdout, + stderr=sys.stdout, ) # Wait for server to be ready From feb76466162c28ae820cfe6c37612e96ac6132e3 Mon Sep 17 00:00:00 2001 From: garrett4wade Date: Wed, 24 Dec 2025 23:55:07 +0800 Subject: [PATCH 7/7] minor change docstring --- areal/api/engine_api.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/areal/api/engine_api.py b/areal/api/engine_api.py index e6c571db3..3decc7b04 100644 --- a/areal/api/engine_api.py +++ b/areal/api/engine_api.py @@ -493,7 +493,18 @@ def save_perf_tracer(self, step: int | None = None, force: bool = False) -> None def config_perf_tracer( self, config: PerfTracerConfig, rank: int, role: str ) -> None: - """Configure performance tracer.""" + """Configure performance tracer. + + Parameters + ---------- + config : PerfTracerConfig + Configuration for the performance tracer. + rank : int + Rank of the current process within its role. + role : str + Role of this process. "master" by default or "actor", + "ref", "rollout", etc. in RPC workers. + """ class InferenceEngine(abc.ABC):
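
Note (illustrative sketch, not part of this patch series): the snippet below shows how a trainer-side caller could drive the per-role tracer hooks added above. The helper names `configure_tracers`/`save_tracers` and the `actor`/`rollout` arguments are hypothetical stand-ins for the controllers wired up in areal/experimental/trainer/; only perf_tracer.configure, perf_tracer.save, config_perf_tracer, and save_perf_tracer come from the patches themselves.

    # Illustrative sketch only -- not part of this patch series.
    import os

    from areal.utils import perf_tracer


    def configure_tracers(config, actor, rollout):
        # Tracing is optional: the trainers return early when no tracer config is set.
        if config.perf_tracer is None:
            return
        rank = int(os.getenv("RANK", "0"))
        # The launcher/trainer process traces under the "master" role.
        perf_tracer.configure(config.perf_tracer, rank=rank, role="master")
        # Controllers fan the config out to their RPC workers, each of which then
        # calls perf_tracer.configure(config, rank=<worker rank>, role=<given role>).
        actor.config_perf_tracer(config.perf_tracer, role="actor")
        rollout.config_perf_tracer(config.perf_tracer, role="rollout")


    def save_tracers(config, actor, rollout, global_step):
        # Flush worker-side traces first, then the master-side trace for this step.
        if config.perf_tracer is None:
            return
        actor.save_perf_tracer(step=global_step)
        rollout.save_perf_tracer(step=global_step)
        perf_tracer.save(step=global_step)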