From f53cfb53304357b21803abc0bbba4345255f5eda Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 9 Feb 2026 19:00:56 -0800 Subject: [PATCH 01/18] init tutorial --- .../simple_distributed_training_tutorial.py | 613 ++++++++++++++++++ 1 file changed, 613 insertions(+) create mode 100644 beginner_source/simple_distributed_training_tutorial.py diff --git a/beginner_source/simple_distributed_training_tutorial.py b/beginner_source/simple_distributed_training_tutorial.py new file mode 100644 index 0000000000..0ad1bbc879 --- /dev/null +++ b/beginner_source/simple_distributed_training_tutorial.py @@ -0,0 +1,613 @@ +""" +Distributed training at scale with PyTorch and Ray Train +========================================================= + +**Author:** `Ricardo Decal `__ + +This tutorial shows how to distribute PyTorch training across multiple GPUs +using Ray Train and Ray Data for scalable, production-ready model training. + +.. grid:: 2 + + .. grid-item-card:: :octicon:`mortar-board;1em;` You will learn how to: + :class-card: card-prerequisites + + * Pre-train a ~117M-parameter decoder-only transformer language model + using PyTorch. + * Distribute training across multiple GPUs with Ray Train. + * Stream training data from Hugging Face datasets with Ray Data. + * Save and load distributed checkpoints. + * Scale from a single node to a multi-node cluster with minimal code changes. + * Monitor training with the Ray dashboard. + + .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites + :class-card: card-prerequisites + + * PyTorch v2.9+. + * Ray Train (``ray[train]``) v2.52.1+. + * ``tiktoken`` and ``datasets`` (Hugging Face). + * One or more GPUs are recommended but not required. + +`Ray Train `__ is a +scalable framework for distributed deep learning. +Ray Train builds on top of `Ray `_, a +unified framework for scaling AI and Python applications that +simplifies the complexities of distributed computing. Ray is also open source +and part of the PyTorch Foundation. + +Ray Train enables you to +scale from a single GPU to hundreds of GPUs without rewriting your training +loop. Combined with `Ray Data `__ +for streaming data ingestion, you get an end-to-end distributed training +pipeline that handles data loading, sharding, gradient synchronization, +checkpointing, and fault tolerance. + +Setup +----- + +To install the dependencies, run ``pip install "ray[train]" torch tiktoken datasets``. + +Then, import the required libraries: +""" + +############################################################################### + +import math +import os +import tempfile + +import numpy as np +import ray +import ray.train +import tiktoken +import torch +import torch.nn as nn +from datasets import load_dataset +from ray.train import CheckpointConfig, RunConfig, ScalingConfig +from ray.train.torch import TorchTrainer + +############################################################################### +# Load the dataset with Ray Data +# ------------------------------ +# +# This tutorial uses the `Wikitext-103 `__ +# dataset, a collection of over 100 million tokens from verified Good and +# Featured articles on Wikipedia. +# +# The ``ray.data.from_huggingface()`` function converts a Hugging Face +# dataset into a Ray Dataset, enabling distributed streaming and +# preprocessing across all available nodes. 
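+#
+# Ray starts a local cluster automatically the first time the data API is
+# used, so no explicit setup is required here. If you want to connect to an
+# existing cluster instead, you can call ``ray.init()`` yourself before
+# loading any data. A minimal sketch (the address below is a placeholder for
+# your own cluster, not something this tutorial requires):
+#
+# .. code-block:: python
+#
+#    import ray
+#
+#    # Connect to a running cluster; call ray.init() with no arguments to
+#    # start a local, single-machine cluster instead.
+#    ray.init(address="ray://<head-node-host>:10001")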
+ +hf_ds = load_dataset("Salesforce/wikitext", "wikitext-103-raw-v1") +train_ds = ray.data.from_huggingface(hf_ds["train"]) +val_ds = ray.data.from_huggingface(hf_ds["validation"]) +print(train_ds) + +############################################################################### +# Ray divides the data into **blocks** and dispatches them to workers. +# This block-based architecture enables **streaming execution**: as soon as +# a stage outputs a block, the next stage can begin processing it +# immediately without waiting for previous stages to finish the entire +# dataset. +# TODO: move the above text elsewhere; we should discuss .schema() lets you inspect the data + +print(train_ds.schema()) + +# TODO discuss the output of schema. + +############################################################################### +# Tokenize and chunk the data +# ---------------------------- +# +# Language models consume fixed-length sequences of token IDs. The +# preprocessing step converts raw text into overlapping input/target pairs +# for next-token prediction. +# +# This tutorial uses ``tiktoken`` with the GPT-2 encoding (vocabulary size +# 50,257). ``tiktoken`` is a fast, standalone tokenizer that has no +# dependency on the Hugging Face ``transformers`` library. +# +# The ``tokenize_and_chunk`` function: +# +# 1. Tokenizes each batch of text. +# 2. Concatenates all tokens into a single stream. +# 3. Splits the stream into fixed-length blocks of ``block_size + 1`` +# tokens. +# 4. Returns ``input_ids`` (the first ``block_size`` tokens) and +# ``labels`` (shifted by one position for next-token prediction). + +BLOCK_SIZE = 256 +VOCAB_SIZE = 50257 + +encoding = tiktoken.get_encoding("gpt2") + + +def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: + """Tokenize text and split into fixed-length chunks for language modeling.""" + # Tokenize all texts in the batch and concatenate + all_tokens: list[int] = [] + for text in batch["text"]: + if text.strip(): # skip empty lines + all_tokens.extend(encoding.encode_ordinary(text)) + + # Split into chunks of block_size + 1 (input + 1 shifted target) + chunk_len = BLOCK_SIZE + 1 + num_chunks = len(all_tokens) // chunk_len + all_tokens = all_tokens[: num_chunks * chunk_len] + + if num_chunks == 0: + return {"input_ids": np.array([], dtype=np.int64).reshape(0, BLOCK_SIZE), + "labels": np.array([], dtype=np.int64).reshape(0, BLOCK_SIZE)} + + tokens_array = np.array(all_tokens, dtype=np.int64).reshape(num_chunks, chunk_len) + return { + "input_ids": tokens_array[:, :-1], + "labels": tokens_array[:, 1:], + } + + + +############################################################################### +# Apply the tokenization with ``map_batches()``. This operation is **lazy**, +# meaning that Ray Data defers execution until a downstream consumer requests the +# results. Lazy execution lets Ray optimize the entire pipeline before any +# work begins. + +train_ds = train_ds.map_batches(tokenize_and_chunk, batch_format="numpy") +val_ds = val_ds.map_batches(tokenize_and_chunk, batch_format="numpy") +print(train_ds.schema()) + + +############################################################################### +# Define the transformer model +# ---------------------------- +# +# The model is a decoder-only transformer language model, similar to GPT-2, +# built entirely from standard PyTorch modules. It has approximately 117 +# million parameters. +# +# The architecture: +# +# * **Token embedding** maps token IDs to dense vectors. 
+# * **Positional embedding** encodes position information. +# * **Transformer encoder** with a causal (triangular) attention mask ensures +# that each token can only attend to preceding tokens. Note: PyTorch's +# ``TransformerEncoder`` with a causal mask is functionally equivalent to a +# decoder-only transformer. +# * **Output head** projects the hidden states back to the vocabulary. + +class TransformerLM(nn.Module): + """Decoder-only transformer language model (~117M parameters).""" + + def __init__( + self, + vocab_size: int = VOCAB_SIZE, + d_model: int = 768, + nhead: int = 12, + num_layers: int = 12, + dim_feedforward: int = 3072, + max_seq_len: int = BLOCK_SIZE, + dropout: float = 0.1, + ): + super().__init__() + self.d_model = d_model + self.token_embedding = nn.Embedding(vocab_size, d_model) + self.position_embedding = nn.Embedding(max_seq_len, d_model) + + encoder_layer = nn.TransformerEncoderLayer( + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + dropout=dropout, + batch_first=True, + norm_first=True, # Pre-norm architecture for training stability + ) + self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) + self.ln_f = nn.LayerNorm(d_model) + self.output_head = nn.Linear(d_model, vocab_size, bias=False) + + # Weight tying: share weights between token embedding and output head + self.output_head.weight = self.token_embedding.weight + + self._init_weights() + + def _init_weights(self): + """Initialize weights with small values for stable training.""" + nn.init.normal_(self.token_embedding.weight, mean=0.0, std=0.02) + nn.init.normal_(self.position_embedding.weight, mean=0.0, std=0.02) + + def forward(self, input_ids: torch.Tensor) -> torch.Tensor: + """ + Args: + input_ids: Token IDs of shape (batch_size, seq_len). + + Returns: + Logits of shape (batch_size, seq_len, vocab_size). + """ + batch_size, seq_len = input_ids.shape + positions = torch.arange(seq_len, device=input_ids.device).unsqueeze(0) + + x = self.token_embedding(input_ids) + self.position_embedding(positions) + + # Create causal mask so each token only attends to previous tokens + causal_mask = nn.Transformer.generate_square_subsequent_mask( + seq_len, device=input_ids.device + ) + x = self.transformer(x, mask=causal_mask, is_causal=True) + x = self.ln_f(x) + logits = self.output_head(x) + return logits + + + +############################################################################### +# Verify the model size: + +model = TransformerLM() +num_params = sum(p.numel() for p in model.parameters()) +print(f"Model parameters: {num_params:,} ({num_params / 1e6:.1f}M)") +del model # Free memory before distributed training + + +############################################################################### +# Define the distributed training function +# ----------------------------------------- +# +# The training function runs on each worker process. Ray Train +# manages the distributed setup: it wraps the model in +# ``DistributedDataParallel``, shards the data across workers, and +# synchronizes gradients automatically. +# +# The key Ray Train integration points are: +# +# 1. **``ray.train.get_dataset_shard("train")``** retrieves the +# worker's portion of the data. Ray Data automatically splits the +# dataset across all workers. +# 2. **``ray.train.torch.prepare_model(model)``** wraps the model in +# ``DistributedDataParallel`` and moves it to the correct GPU. +# 3. 
**``shard.iter_torch_batches(batch_size=...)``** returns an iterator +# of ``dict[str, torch.Tensor]`` batches, with tensors automatically +# placed on the worker's GPU. +# 4. **``ray.train.report(metrics, checkpoint=...)``** reports metrics +# to the driver and optionally saves a checkpoint. + +def train_func_per_worker(config: dict): + """Training function executed by each distributed worker.""" + lr = config["lr"] + epochs = config["epochs"] + batch_size = config["batch_size_per_worker"] + + # --- Data ----------------------------------------------------------- + # Each worker gets an automatic shard of the dataset. + train_data_shard = ray.train.get_dataset_shard("train") + val_data_shard = ray.train.get_dataset_shard("validation") + + # --- Model ---------------------------------------------------------- + model = TransformerLM() + # prepare_model wraps the model in DistributedDataParallel and places + # it on the correct device. + model = ray.train.torch.prepare_model(model) + + optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.1) + loss_fn = nn.CrossEntropyLoss() + + # --- Training loop -------------------------------------------------- + for epoch in range(epochs): + model.train() + train_loss_sum = 0.0 + train_batches = 0 + + # iter_torch_batches returns dicts of tensors already on the GPU. + for batch in train_data_shard.iter_torch_batches( + batch_size=batch_size, dtypes=torch.long + ): + input_ids = batch["input_ids"] + labels = batch["labels"] + + logits = model(input_ids) + # Flatten for cross-entropy: (batch * seq_len, vocab_size) vs (batch * seq_len,) + loss = loss_fn( + logits.view(-1, logits.size(-1)), + labels.view(-1), + ) + + optimizer.zero_grad() + loss.backward() + # Gradient clipping for training stability + torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) + optimizer.step() + + train_loss_sum += loss.item() + train_batches += 1 + + avg_train_loss = train_loss_sum / max(train_batches, 1) + + # --- Validation ------------------------------------------------- + model.eval() + val_loss_sum = 0.0 + val_batches = 0 + + with torch.no_grad(): + for batch in val_data_shard.iter_torch_batches( + batch_size=batch_size, dtypes=torch.long + ): + input_ids = batch["input_ids"] + labels = batch["labels"] + + logits = model(input_ids) + loss = loss_fn( + logits.view(-1, logits.size(-1)), + labels.view(-1), + ) + val_loss_sum += loss.item() + val_batches += 1 + + avg_val_loss = val_loss_sum / max(val_batches, 1) + val_perplexity = math.exp(min(avg_val_loss, 20)) # cap to avoid overflow + + # --- Checkpointing ---------------------------------------------- + # Save a checkpoint at the end of each epoch. + with tempfile.TemporaryDirectory() as tmp_dir: + torch.save( + model.module.state_dict(), # .module unwraps DDP + os.path.join(tmp_dir, "model.pt"), + ) + checkpoint = ray.train.Checkpoint.from_directory(tmp_dir) + + ray.train.report( + metrics={ + "train_loss": avg_train_loss, + "val_loss": avg_val_loss, + "val_perplexity": val_perplexity, + "epoch": epoch, + }, + checkpoint=checkpoint, + ) + + if ray.train.get_context().get_world_rank() == 0: + print( + f"Epoch {epoch}: " + f"train_loss={avg_train_loss:.4f}, " + f"val_loss={avg_val_loss:.4f}, " + f"val_perplexity={val_perplexity:.2f}" + ) + + + +############################################################################### +# Configure and launch distributed training +# ------------------------------------------ +# +# The ``TorchTrainer`` brings everything together. 
It accepts: +# +# * **``train_func_per_worker``**: the function each worker executes. +# * **``train_loop_config``**: a dictionary of hyperparameters forwarded +# to the training function. +# * **``datasets``**: a dictionary of Ray Datasets. Ray Train automatically +# splits each dataset across workers. +# * **``scaling_config``**: specifies the number of workers and whether to +# use GPUs. +# +# Setting ``num_workers=8`` launches 8 parallel workers, one per GPU. Ray +# Train handles ``torch.distributed`` initialization, NCCL backend setup, +# and ``DistributedDataParallel`` wrapping behind the scenes. + +NUM_WORKERS = 8 # One worker per GPU on this machine +NUM_EPOCHS = 2 +BATCH_SIZE_PER_WORKER = 16 + +trainer = TorchTrainer( + train_loop_per_worker=train_func_per_worker, + train_loop_config={ + "lr": 3e-4, + "epochs": NUM_EPOCHS, + "batch_size_per_worker": BATCH_SIZE_PER_WORKER, + }, + datasets={"train": train_ds, "validation": val_ds}, + scaling_config=ScalingConfig( + num_workers=NUM_WORKERS, + use_gpu=True, + ), + run_config=RunConfig( + # Keep the best 2 checkpoints by validation loss + checkpoint_config=CheckpointConfig( + num_to_keep=2, + ), + ), +) + +result = trainer.fit() + +############################################################################### +# Inspect results +# --------------- +# +# After training, the ``Result`` object contains the final metrics and +# the path to the best checkpoint. + +print(f"\nTraining finished!") +print(f"Final metrics: {result.metrics}") +print(f"Best checkpoint path: {result.checkpoint.path}") + + +############################################################################### +# Load a checkpoint and generate text +# ------------------------------------ +# +# As a sanity check, load the trained model from the checkpoint and +# generate a few tokens using greedy decoding: + +def generate(model: TransformerLM, prompt_tokens: list[int], max_new_tokens: int = 50) -> list[int]: + """Generate tokens autoregressively using greedy decoding.""" + model.eval() + device = next(model.parameters()).device + tokens = prompt_tokens[:] + + with torch.no_grad(): + for _ in range(max_new_tokens): + # Only use the last BLOCK_SIZE tokens if the sequence is too long + input_ids = torch.tensor([tokens[-BLOCK_SIZE:]], device=device) + logits = model(input_ids) + next_token = logits[0, -1, :].argmax().item() + tokens.append(next_token) + + return tokens + + +# Load the model from the checkpoint +checkpoint_path = os.path.join(result.checkpoint.path, "model.pt") +trained_model = TransformerLM() +trained_model.load_state_dict(torch.load(checkpoint_path, weights_only=True)) +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +trained_model = trained_model.to(device) + +# Generate text from a prompt +prompt = "The history of science" +prompt_tokens = encoding.encode_ordinary(prompt) +generated_tokens = generate(trained_model, prompt_tokens, max_new_tokens=50) +generated_text = encoding.decode(generated_tokens) +print(f"Prompt: {prompt}") +print(f"Generated: {generated_text}") + +############################################################################### +# .. note:: +# +# With only 2 epochs of training on Wikitext-103, the generated text +# will be mostly incoherent. This is expected for a tutorial that +# prioritizes demonstrating the distributed training workflow over +# producing a fully-trained model. In a real pre-training run, you would +# train for many more epochs with a learning rate schedule and a larger +# dataset. 
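+#
+# As a sketch of what such a schedule could look like, the snippet below adds
+# linear warmup followed by cosine decay using ``LambdaLR``. The step counts
+# are assumed values for illustration, and the scheduler would be created and
+# stepped inside ``train_func_per_worker`` next to the optimizer; this is one
+# common recipe, not the only reasonable choice:
+#
+# .. code-block:: python
+#
+#    import math
+#    import torch
+#
+#    # ``optimizer`` is the AdamW instance created earlier in the training
+#    # function; warmup_steps and total_steps are illustrative assumptions.
+#    warmup_steps, total_steps = 1_000, 100_000
+#
+#    def lr_lambda(step: int) -> float:
+#        if step < warmup_steps:
+#            return step / max(warmup_steps, 1)
+#        progress = (step - warmup_steps) / max(total_steps - warmup_steps, 1)
+#        return 0.5 * (1.0 + math.cos(math.pi * min(progress, 1.0)))
+#
+#    scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
+#    # ...then call scheduler.step() after each optimizer.step().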
+# +# Scaling to a multi-node cluster +# ------------------------------- +# +# The code above runs on a single 8-GPU machine. Scaling to a multi-node +# cluster requires only two changes: +# +# 1. **Increase ``num_workers``** to match the total number of GPUs across +# all nodes. +# 2. **Set a shared storage path** so that all nodes can access checkpoints. +# +# For example, to train on a cluster of 4 nodes with 8 GPUs each +# (32 GPUs total): +# +# .. code-block:: python +# +# trainer = TorchTrainer( +# train_loop_per_worker=train_func_per_worker, +# train_loop_config={...}, +# datasets={"train": train_ds, "validation": val_ds}, +# scaling_config=ScalingConfig( +# num_workers=32, # 4 nodes x 8 GPUs +# use_gpu=True, +# ), +# run_config=RunConfig( +# # Shared storage accessible from all nodes +# storage_path="s3://my-bucket/ray-checkpoints", +# checkpoint_config=CheckpointConfig(num_to_keep=2), +# ), +# ) +# +# Ray Train automatically: +# +# * Launches workers across all available nodes. +# * Initializes ``torch.distributed`` with the NCCL backend. +# * Configures ``DistributedDataParallel`` across nodes. +# * Shards data across all workers. +# +# No changes to the training function are needed. The same +# ``train_func_per_worker`` runs identically whether on 1 GPU or 256 GPUs. + +############################################################################### +# Fault tolerance +# --------------- +# +# Long-running distributed training jobs are vulnerable to hardware +# failures. Ray Train provides fault tolerance so that training can +# recover from failures without restarting from scratch. +# +# Ray Train's fault tolerance mechanisms include: +# +# * **Worker restart**: If a worker process crashes, Ray Train +# automatically restarts it and resumes training from the last +# checkpoint. +# * **Checkpoint recovery**: Ray Train saves checkpoints to persistent +# storage. When recovering from a failure, training resumes from the +# latest checkpoint rather than starting over. +# * **Node failure handling**: If an entire node goes down, Ray +# redistributes work to surviving nodes and replaces the failed node +# when new resources become available. +# +# To enable automatic failure recovery, configure ``FailureConfig`` in your ``RunConfig``: +# +# .. code-block:: python +# +# from ray.train import FailureConfig +# +# run_config = RunConfig( +# storage_path="s3://my-bucket/ray-checkpoints", +# failure_config=FailureConfig(max_failures=3), +# checkpoint_config=CheckpointConfig(num_to_keep=2), +# ) + +############################################################################### +# Monitor your training jobs +# -------------------------- +# +# Monitoring is critical when running distributed training. +# The `Ray dashboard `__ +# displays real-time metrics including: +# +# * Training loss and validation metrics per epoch +# * GPU utilization and memory usage per worker +# * Data loading throughput +# * Worker status and error logs +# +# To view the dashboard, open the link printed in the logs after Ray +# initializes. Typically, this link is ``http://localhost:8265``. +# +# The dashboard lets you: +# +# * Monitor training progress across all workers +# * Inspect logs from individual workers +# * Identify data loading or communication bottlenecks +# * View resource utilization for CPU, GPU, and memory per worker +# * Debug failures with detailed error messages and stack traces +# +# For more information, see the `Ray Train monitoring +# documentation `__. 
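+#
+# If you lose track of the dashboard link in the logs, you can also read it
+# from the context object that ``ray.init()`` returns. A small sketch (in
+# recent Ray versions the returned context exposes a ``dashboard_url``
+# attribute; the exact field name can differ between versions):
+#
+# .. code-block:: python
+#
+#    import ray
+#
+#    context = ray.init(ignore_reinit_error=True)
+#    print(f"Ray dashboard: http://{context.dashboard_url}")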
+ +############################################################################### +# Conclusion +# ---------- +# +# In this tutorial, you: +# +# * Built a ~117M-parameter decoder-only transformer language model +# using pure PyTorch. +# * Loaded and preprocessed the Wikitext-103 dataset using Ray Data +# with distributed streaming. +# * Distributed training across 8 GPUs using Ray Train's +# ``TorchTrainer`` with only minimal changes to a standard PyTorch +# training loop. +# * Saved and loaded distributed checkpoints for model recovery. +# * Learned how to scale to multi-node clusters by changing +# ``ScalingConfig`` and ``RunConfig``. +# * Learned about Ray Train's **fault tolerance** mechanisms for +# production training jobs. +# * Monitored training with the Ray dashboard. +# +# Ray Train handles the complexity of distributed systems, gradient +# synchronization, and resource allocation so that you can focus on +# your model and data. + +############################################################################### +# Further reading +# --------------- +# +# * `Ray Train documentation `__ +# * `Ray Data for training `__ +# * `PyTorch DistributedDataParallel `__ +# * `Ray Train fault tolerance `__ +# * `Ray cluster setup `__ From 01a19e368beb3c2f6d41585200d0639d4ead3a33 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 9 Feb 2026 20:21:01 -0800 Subject: [PATCH 02/18] switch to gpt-2 --- .../simple_distributed_training_tutorial.py | 397 ++++++++++-------- 1 file changed, 215 insertions(+), 182 deletions(-) diff --git a/beginner_source/simple_distributed_training_tutorial.py b/beginner_source/simple_distributed_training_tutorial.py index 0ad1bbc879..bac4418ef0 100644 --- a/beginner_source/simple_distributed_training_tutorial.py +++ b/beginner_source/simple_distributed_training_tutorial.py @@ -12,8 +12,8 @@ .. grid-item-card:: :octicon:`mortar-board;1em;` You will learn how to: :class-card: card-prerequisites - * Pre-train a ~117M-parameter decoder-only transformer language model - using PyTorch. + * Pre-train a GPT-2 (~124M-parameter) language model using PyTorch + and Hugging Face Transformers. * Distribute training across multiple GPUs with Ray Train. * Stream training data from Hugging Face datasets with Ray Data. * Save and load distributed checkpoints. @@ -25,7 +25,7 @@ * PyTorch v2.9+. * Ray Train (``ray[train]``) v2.52.1+. - * ``tiktoken`` and ``datasets`` (Hugging Face). + * ``tiktoken``, ``datasets``, and ``transformers`` (Hugging Face). * One or more GPUs are recommended but not required. `Ray Train `__ is a @@ -45,7 +45,7 @@ Setup ----- -To install the dependencies, run ``pip install "ray[train]" torch tiktoken datasets``. +To install the dependencies, run ``pip install "ray[train]" torch tiktoken datasets transformers``. 
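+
+If you want to double-check that your environment matches the prerequisites
+above, a quick version check looks like this (the version numbers are the
+ones listed in the prerequisites, so treat them as guidance rather than hard
+requirements):
+
+.. code-block:: python
+
+    import ray
+    import torch
+
+    print(ray.__version__)    # expecting 2.52.1 or newer
+    print(torch.__version__)  # expecting 2.9 or newer
+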
Then, import the required libraries: """ @@ -65,6 +65,7 @@ from datasets import load_dataset from ray.train import CheckpointConfig, RunConfig, ScalingConfig from ray.train.torch import TorchTrainer +from transformers import GPT2Config, GPT2LMHeadModel ############################################################################### # Load the dataset with Ray Data @@ -81,21 +82,65 @@ hf_ds = load_dataset("Salesforce/wikitext", "wikitext-103-raw-v1") train_ds = ray.data.from_huggingface(hf_ds["train"]) val_ds = ray.data.from_huggingface(hf_ds["validation"]) -print(train_ds) + +print(f"Dataset schema:\n{train_ds.schema()}") ############################################################################### -# Ray divides the data into **blocks** and dispatches them to workers. -# This block-based architecture enables **streaming execution**: as soon as -# a stage outputs a block, the next stage can begin processing it -# immediately without waiting for previous stages to finish the entire -# dataset. -# TODO: move the above text elsewhere; we should discuss .schema() lets you inspect the data +# The schema should look like this: +# ```text` +# Schema: Column Type +# ------ ---- +# text string +# ``` +# +# This means that the dataset has one column called "text" and it is a string. +# +# Inspect raw data +# ~~~~~~~~~~~~~~~~ +# +# Use ``take(n)`` to fetch a small number of rows for inspection. +# Each row is a dictionary with the column names as keys. +print("--- Raw data sample (train_ds.take(2)) ---") +sample = train_ds.take(2) +for i, row in enumerate(sample): + text_preview = row["text"][:120] + "..." if len(row["text"]) > 120 else row["text"] + print(f" Row {i}: {text_preview!r}") + +############################################################################### +# You'll see output like: +# +# .. code-block:: text +# +# Row 0: '' +# Row 1: ' = Valkyria Chronicles III = \n' +# +# The raw dataset evidently contains empty lines and short headers that would +# produce zero tokens after chunking. Use ``filter()`` to keep only rows +# with at least 20 words, which removes noise and avoids wasted work in +# downstream stages. + +MIN_WORDS = 20 +train_ds = train_ds.filter(lambda row: len(row["text"].split()) >= MIN_WORDS) +val_ds = val_ds.filter(lambda row: len(row["text"].split()) >= MIN_WORDS) -print(train_ds.schema()) +# DEBUG: limit dataset size for fast iteration +train_ds = train_ds.limit(100) +val_ds = val_ds.limit(100) -# TODO discuss the output of schema. +print("--- After filtering short rows (train_ds.take(2)) ---") +filtered_sample = train_ds.take(2) +for i, row in enumerate(filtered_sample): + text_preview = row["text"][:120] + "..." if len(row["text"]) > 120 else row["text"] + print(f" Row {i}: {text_preview!r}") ############################################################################### +# After filtering, only substantive paragraphs remain: +# +# .. code-block:: text +# +# Row 0: ' Senjō no Valkyria 3 : Unrecorded Chronicles ( Japanese : ...' +# Row 1: ' The game began development in 2010 , carrying over a large ...' 
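+#
+# If you want to see how many rows survive the filter, you can call
+# ``count()``; just be aware that it triggers execution of the lazy pipeline
+# built so far, so on the full dataset it does real work rather than
+# returning instantly:
+#
+# .. code-block:: python
+#
+#    print(f"Training rows after filtering: {train_ds.count()}")
+#    print(f"Validation rows after filtering: {val_ds.count()}")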
+# # Tokenize and chunk the data # ---------------------------- # @@ -136,13 +181,14 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: all_tokens = all_tokens[: num_chunks * chunk_len] if num_chunks == 0: - return {"input_ids": np.array([], dtype=np.int64).reshape(0, BLOCK_SIZE), - "labels": np.array([], dtype=np.int64).reshape(0, BLOCK_SIZE)} + return {"input_ids": [], "labels": []} tokens_array = np.array(all_tokens, dtype=np.int64).reshape(num_chunks, chunk_len) + input_ids = tokens_array[:, :-1] + labels = tokens_array[:, 1:] return { - "input_ids": tokens_array[:, :-1], - "labels": tokens_array[:, 1:], + "input_ids": input_ids, + "labels": labels, } @@ -155,98 +201,96 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: train_ds = train_ds.map_batches(tokenize_and_chunk, batch_format="numpy") val_ds = val_ds.map_batches(tokenize_and_chunk, batch_format="numpy") -print(train_ds.schema()) +############################################################################### +# Inspect the tokenized output with ``take(2)``: + +print("--- Tokenized data sample (train_ds.take(2)) ---") +tokenized_sample = train_ds.take(2) +for i, row in enumerate(tokenized_sample): + ids = row["input_ids"] + print(f" Row {i}: input_ids shape={ids.shape}, first 10 tokens={ids[:10].tolist()}") + print(f" Decoded: {encoding.decode(ids[:30].tolist())!r}...") + +############################################################################### +# Each row now contains a fixed-length ``input_ids`` array of 256 tokens and +# a corresponding ``labels`` array shifted by one position. These are the +# input/target pairs for next-token prediction. +# +# Streaming execution +# ~~~~~~~~~~~~~~~~~~~ +# +# Under the hood, Ray divides the data into **blocks** and dispatches them to +# workers. This block-based architecture enables **streaming execution**: as +# soon as a stage outputs a block, the next stage can begin processing it +# immediately without waiting for previous stages to finish the entire +# dataset. This means the ``map_batches`` tokenization above runs in a +# streaming pipeline with the training loop, so the full dataset never needs +# to fit in memory at once. +# +# When training starts, Ray Data logs the execution plan. For this tutorial +# it looks like: +# +# .. code-block:: text +# +# Execution plan: InputDataBuffer[Input] +# -> TaskPoolMapOperator[Filter] +# -> TaskPoolMapOperator[MapBatches(tokenize_and_chunk)] +# -> OutputSplitter[split(8, equal=True)] +# +# This tells you exactly how Ray Data will stream through filter, tokenize, +# and split the data across 8 workers. ############################################################################### # Define the transformer model # ---------------------------- # -# The model is a decoder-only transformer language model, similar to GPT-2, -# built entirely from standard PyTorch modules. It has approximately 117 -# million parameters. -# -# The architecture: -# -# * **Token embedding** maps token IDs to dense vectors. -# * **Positional embedding** encodes position information. -# * **Transformer encoder** with a causal (triangular) attention mask ensures -# that each token can only attend to preceding tokens. Note: PyTorch's -# ``TransformerEncoder`` with a causal mask is functionally equivalent to a -# decoder-only transformer. -# * **Output head** projects the hidden states back to the vocabulary. 
- -class TransformerLM(nn.Module): - """Decoder-only transformer language model (~117M parameters).""" - - def __init__( - self, - vocab_size: int = VOCAB_SIZE, - d_model: int = 768, - nhead: int = 12, - num_layers: int = 12, - dim_feedforward: int = 3072, - max_seq_len: int = BLOCK_SIZE, - dropout: float = 0.1, - ): - super().__init__() - self.d_model = d_model - self.token_embedding = nn.Embedding(vocab_size, d_model) - self.position_embedding = nn.Embedding(max_seq_len, d_model) - - encoder_layer = nn.TransformerEncoderLayer( - d_model=d_model, - nhead=nhead, - dim_feedforward=dim_feedforward, - dropout=dropout, - batch_first=True, - norm_first=True, # Pre-norm architecture for training stability - ) - self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) - self.ln_f = nn.LayerNorm(d_model) - self.output_head = nn.Linear(d_model, vocab_size, bias=False) - - # Weight tying: share weights between token embedding and output head - self.output_head.weight = self.token_embedding.weight - - self._init_weights() - - def _init_weights(self): - """Initialize weights with small values for stable training.""" - nn.init.normal_(self.token_embedding.weight, mean=0.0, std=0.02) - nn.init.normal_(self.position_embedding.weight, mean=0.0, std=0.02) - - def forward(self, input_ids: torch.Tensor) -> torch.Tensor: - """ - Args: - input_ids: Token IDs of shape (batch_size, seq_len). - - Returns: - Logits of shape (batch_size, seq_len, vocab_size). - """ - batch_size, seq_len = input_ids.shape - positions = torch.arange(seq_len, device=input_ids.device).unsqueeze(0) +# The model is a decoder-only transformer language model using Hugging Face's +# ``GPT2LMHeadModel``. +# +# The GPT-2 "small" architecture: +# +# * 12 transformer layers, 12 attention heads, 768 hidden size +# * ~124M parameters +# * Built-in causal attention masking and weight tying - x = self.token_embedding(input_ids) + self.position_embedding(positions) +MODEL_CONFIG = GPT2Config( + vocab_size=VOCAB_SIZE, + n_positions=BLOCK_SIZE, + n_embd=768, + n_layer=12, + n_head=12, +) - # Create causal mask so each token only attends to previous tokens - causal_mask = nn.Transformer.generate_square_subsequent_mask( - seq_len, device=input_ids.device - ) - x = self.transformer(x, mask=causal_mask, is_causal=True) - x = self.ln_f(x) - logits = self.output_head(x) - return logits +def create_model(): + """Create a fresh GPT-2 model from config (random weights).""" + model = GPT2LMHeadModel(MODEL_CONFIG) + model.loss_type = "ForCausalLM" + return model ############################################################################### # Verify the model size: -model = TransformerLM() +model = create_model() num_params = sum(p.numel() for p in model.parameters()) print(f"Model parameters: {num_params:,} ({num_params / 1e6:.1f}M)") -del model # Free memory before distributed training + +############################################################################### +# You should see approximately **123.8M parameters** — the standard GPT-2 +# "small" size. 
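+#
+# If you are curious where that number comes from, a rough breakdown adds up
+# to the same total (assuming Hugging Face's GPT-2 layout with a fused QKV
+# projection, tied input/output embeddings, and the 256-token context
+# configured above):
+#
+# .. code-block:: python
+#
+#    d, ff, layers, vocab, ctx = 768, 3072, 12, 50257, 256
+#
+#    embeddings = vocab * d + ctx * d               # token + position tables
+#    attention = (d * 3 * d + 3 * d) + (d * d + d)  # fused QKV + output projection
+#    mlp = (d * ff + ff) + (ff * d + d)             # up and down projections
+#    norms = 2 * 2 * d                              # two LayerNorms per block (weight + bias)
+#    per_block = attention + mlp + norms
+#
+#    total = embeddings + layers * per_block + 2 * d  # plus the final LayerNorm
+#    print(f"{total / 1e6:.1f}M")                     # ~123.8M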
+# +# Quick smoke test: run a forward pass on CPU to verify the model produces the +# expected output shape before launching distributed training: + +model.eval() +dummy_input = torch.randint(0, VOCAB_SIZE, (2, BLOCK_SIZE)) +with torch.no_grad(): + out = model(dummy_input) +print(f"Smoke test passed — logits shape: {out.logits.shape}") +del model, dummy_input, out # Free memory before distributed training + ############################################################################### @@ -276,6 +320,7 @@ def train_func_per_worker(config: dict): lr = config["lr"] epochs = config["epochs"] batch_size = config["batch_size_per_worker"] + max_steps_per_epoch = config.get("max_steps_per_epoch") # DEBUG: cap steps # --- Data ----------------------------------------------------------- # Each worker gets an automatic shard of the dataset. @@ -283,13 +328,12 @@ def train_func_per_worker(config: dict): val_data_shard = ray.train.get_dataset_shard("validation") # --- Model ---------------------------------------------------------- - model = TransformerLM() + model = create_model() # prepare_model wraps the model in DistributedDataParallel and places # it on the correct device. model = ray.train.torch.prepare_model(model) optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.1) - loss_fn = nn.CrossEntropyLoss() # --- Training loop -------------------------------------------------- for epoch in range(epochs): @@ -304,12 +348,10 @@ def train_func_per_worker(config: dict): input_ids = batch["input_ids"] labels = batch["labels"] - logits = model(input_ids) - # Flatten for cross-entropy: (batch * seq_len, vocab_size) vs (batch * seq_len,) - loss = loss_fn( - logits.view(-1, logits.size(-1)), - labels.view(-1), - ) + # GPT2LMHeadModel computes cross-entropy loss internally + # when labels are provided. + out = model(input_ids=input_ids, labels=labels) + loss = out.loss optimizer.zero_grad() loss.backward() @@ -320,6 +362,9 @@ def train_func_per_worker(config: dict): train_loss_sum += loss.item() train_batches += 1 + if max_steps_per_epoch and train_batches >= max_steps_per_epoch: + break # DEBUG: early stop + avg_train_loss = train_loss_sum / max(train_batches, 1) # --- Validation ------------------------------------------------- @@ -334,35 +379,27 @@ def train_func_per_worker(config: dict): input_ids = batch["input_ids"] labels = batch["labels"] - logits = model(input_ids) - loss = loss_fn( - logits.view(-1, logits.size(-1)), - labels.view(-1), - ) + out = model(input_ids=input_ids, labels=labels) + loss = out.loss val_loss_sum += loss.item() val_batches += 1 + if max_steps_per_epoch and val_batches >= max_steps_per_epoch: + break # DEBUG: early stop + avg_val_loss = val_loss_sum / max(val_batches, 1) val_perplexity = math.exp(min(avg_val_loss, 20)) # cap to avoid overflow - # --- Checkpointing ---------------------------------------------- - # Save a checkpoint at the end of each epoch. 
- with tempfile.TemporaryDirectory() as tmp_dir: - torch.save( - model.module.state_dict(), # .module unwraps DDP - os.path.join(tmp_dir, "model.pt"), - ) - checkpoint = ray.train.Checkpoint.from_directory(tmp_dir) - - ray.train.report( - metrics={ - "train_loss": avg_train_loss, - "val_loss": avg_val_loss, - "val_perplexity": val_perplexity, - "epoch": epoch, - }, - checkpoint=checkpoint, - ) + # --- Report metrics ----------------------------------------------- + ray.train.report( + metrics={ + "train_loss": avg_train_loss, + "val_loss": avg_val_loss, + "val_perplexity": val_perplexity, + "epoch": epoch, + }, + checkpoint=None, # If we were checkpointing, we'd pass checkpoint to Ray Train here + ) if ray.train.get_context().get_world_rank() == 0: print( @@ -390,10 +427,20 @@ def train_func_per_worker(config: dict): # # Setting ``num_workers=8`` launches 8 parallel workers, one per GPU. Ray # Train handles ``torch.distributed`` initialization, NCCL backend setup, -# and ``DistributedDataParallel`` wrapping behind the scenes. +# and ``DistributedDataParallel`` wrapping behind the scenes. In the logs +# you will see each worker being assigned a rank and device: +# +# .. code-block:: text +# +# Started training worker group of size 8: +# - (ip=10.0.176.183, pid=25636) world_rank=0, local_rank=0, node_rank=0 +# - (ip=10.0.176.183, pid=25637) world_rank=1, local_rank=1, node_rank=0 +# ... +# Moving model to device: cuda:0 +# Wrapping provided model in DistributedDataParallel. NUM_WORKERS = 8 # One worker per GPU on this machine -NUM_EPOCHS = 2 +NUM_EPOCHS = 1 # DEBUG: reduced from 2 BATCH_SIZE_PER_WORKER = 16 trainer = TorchTrainer( @@ -402,18 +449,14 @@ def train_func_per_worker(config: dict): "lr": 3e-4, "epochs": NUM_EPOCHS, "batch_size_per_worker": BATCH_SIZE_PER_WORKER, + "max_steps_per_epoch": 5, # DEBUG: cap at 5 steps for fast iteration }, datasets={"train": train_ds, "validation": val_ds}, scaling_config=ScalingConfig( num_workers=NUM_WORKERS, use_gpu=True, ), - run_config=RunConfig( - # Keep the best 2 checkpoints by validation loss - checkpoint_config=CheckpointConfig( - num_to_keep=2, - ), - ), + # run_config=RunConfig(), ) result = trainer.fit() @@ -422,62 +465,52 @@ def train_func_per_worker(config: dict): # Inspect results # --------------- # -# After training, the ``Result`` object contains the final metrics and -# the path to the best checkpoint. +# After training, the ``Result`` object contains the final metrics reported +# by the workers. 
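+#
+# Besides ``metrics``, which is printed below, the ``Result`` object carries a
+# few other fields that are handy when scripting around training runs. A short
+# sketch (the exact attribute set can vary between Ray versions, so treat this
+# as a guide rather than a complete list):
+#
+# .. code-block:: python
+#
+#    print(result.checkpoint)  # latest Checkpoint, or None if none were saved
+#    print(result.path)        # where Ray Train stored this run's outputs
+#    print(result.error)       # the exception if the run failed, else None
+#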
print(f"\nTraining finished!") print(f"Final metrics: {result.metrics}") -print(f"Best checkpoint path: {result.checkpoint.path}") - ############################################################################### -# Load a checkpoint and generate text -# ------------------------------------ -# -# As a sanity check, load the trained model from the checkpoint and -# generate a few tokens using greedy decoding: - -def generate(model: TransformerLM, prompt_tokens: list[int], max_new_tokens: int = 50) -> list[int]: - """Generate tokens autoregressively using greedy decoding.""" - model.eval() - device = next(model.parameters()).device - tokens = prompt_tokens[:] - - with torch.no_grad(): - for _ in range(max_new_tokens): - # Only use the last BLOCK_SIZE tokens if the sequence is too long - input_ids = torch.tensor([tokens[-BLOCK_SIZE:]], device=device) - logits = model(input_ids) - next_token = logits[0, -1, :].argmax().item() - tokens.append(next_token) - - return tokens - - -# Load the model from the checkpoint -checkpoint_path = os.path.join(result.checkpoint.path, "model.pt") -trained_model = TransformerLM() -trained_model.load_state_dict(torch.load(checkpoint_path, weights_only=True)) -device = torch.device("cuda" if torch.cuda.is_available() else "cpu") -trained_model = trained_model.to(device) - -# Generate text from a prompt -prompt = "The history of science" -prompt_tokens = encoding.encode_ordinary(prompt) -generated_tokens = generate(trained_model, prompt_tokens, max_new_tokens=50) -generated_text = encoding.decode(generated_tokens) -print(f"Prompt: {prompt}") -print(f"Generated: {generated_text}") +# The logs from each worker show training and validation metrics per epoch. +# With random weights and only a few steps, expect a high loss (~10–11) +# and perplexity in the tens of thousands — this is normal. +# +# .. code-block:: text +# +# Epoch 0: train_loss=10.9492, val_loss=10.0157, val_perplexity=22374.06 +# +# In a real training run with more epochs and the full dataset, you would +# see these values steadily decrease. ############################################################################### -# .. note:: +# Checkpointing +# ~~~~~~~~~~~~~ +# +# In a production training run you would enable checkpointing so that +# training can resume from the last saved state after a failure. This +# requires a **shared storage path** (e.g. an S3 bucket or NFS mount) +# accessible from all nodes: +# +# .. code-block:: python +# +# trainer = TorchTrainer( +# ..., +# run_config=RunConfig( +# storage_path="s3://my-bucket/ray-checkpoints", +# checkpoint_config=CheckpointConfig(num_to_keep=2), +# ), +# ) +# +# Inside the training function, save a checkpoint with +# ``ray.train.report()``: +# +# .. code-block:: python # -# With only 2 epochs of training on Wikitext-103, the generated text -# will be mostly incoherent. This is expected for a tutorial that -# prioritizes demonstrating the distributed training workflow over -# producing a fully-trained model. In a real pre-training run, you would -# train for many more epochs with a learning rate schedule and a larger -# dataset. 
+# with tempfile.TemporaryDirectory() as tmp_dir: +# model.module.save_pretrained(tmp_dir) # .module unwraps DDP +# checkpoint = ray.train.Checkpoint.from_directory(tmp_dir) +# ray.train.report(metrics={...}, checkpoint=checkpoint) # # Scaling to a multi-node cluster # ------------------------------- @@ -584,8 +617,8 @@ def generate(model: TransformerLM, prompt_tokens: list[int], max_new_tokens: int # # In this tutorial, you: # -# * Built a ~117M-parameter decoder-only transformer language model -# using pure PyTorch. +# * Pre-trained a GPT-2 (~124M-parameter) language model using +# Hugging Face Transformers and PyTorch. # * Loaded and preprocessed the Wikitext-103 dataset using Ray Data # with distributed streaming. # * Distributed training across 8 GPUs using Ray Train's From a20b52215dc64b4b7f1de40e540d389574709243 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 9 Feb 2026 22:08:19 -0800 Subject: [PATCH 03/18] another pass --- .../simple_distributed_training_tutorial.py | 222 +++++++++--------- 1 file changed, 114 insertions(+), 108 deletions(-) diff --git a/beginner_source/simple_distributed_training_tutorial.py b/beginner_source/simple_distributed_training_tutorial.py index bac4418ef0..b8acb05f4a 100644 --- a/beginner_source/simple_distributed_training_tutorial.py +++ b/beginner_source/simple_distributed_training_tutorial.py @@ -52,21 +52,25 @@ ############################################################################### -import math -import os -import tempfile +import time import numpy as np import ray import ray.train import tiktoken import torch -import torch.nn as nn from datasets import load_dataset from ray.train import CheckpointConfig, RunConfig, ScalingConfig from ray.train.torch import TorchTrainer from transformers import GPT2Config, GPT2LMHeadModel +# Enable smoke test to run this tutorial quickly. +SMOKE_TEST = True + +# Reduce Ray Data verbosity +ray.data.DataContext.get_current().enable_progress_bars = False +ray.data.DataContext.get_current().print_on_execution_start = False + ############################################################################### # Load the dataset with Ray Data # ------------------------------ @@ -87,23 +91,24 @@ ############################################################################### # The schema should look like this: -# ```text` -# Schema: Column Type -# ------ ---- -# text string -# ``` # -# This means that the dataset has one column called "text" and it is a string. +# .. code-block:: text +# +# Column Type +# ------ ---- +# text string +# +# This means that the dataset has one column called ``text`` and it is a string. # # Inspect raw data # ~~~~~~~~~~~~~~~~ # # Use ``take(n)`` to fetch a small number of rows for inspection. # Each row is a dictionary with the column names as keys. -print("--- Raw data sample (train_ds.take(2)) ---") +print("--- Raw data sample ---") sample = train_ds.take(2) for i, row in enumerate(sample): - text_preview = row["text"][:120] + "..." if len(row["text"]) > 120 else row["text"] + text_preview = (row["text"][:120] + "...") if len(row["text"]) > 120 else row["text"] print(f" Row {i}: {text_preview!r}") ############################################################################### @@ -114,39 +119,25 @@ # Row 0: '' # Row 1: ' = Valkyria Chronicles III = \n' # -# The raw dataset evidently contains empty lines and short headers that would -# produce zero tokens after chunking. 
Use ``filter()`` to keep only rows -# with at least 20 words, which removes noise and avoids wasted work in -# downstream stages. - -MIN_WORDS = 20 -train_ds = train_ds.filter(lambda row: len(row["text"].split()) >= MIN_WORDS) -val_ds = val_ds.filter(lambda row: len(row["text"].split()) >= MIN_WORDS) - -# DEBUG: limit dataset size for fast iteration -train_ds = train_ds.limit(100) -val_ds = val_ds.limit(100) +# Each row in Wikitext-103 is a single line from a Wikipedia article. +# Consecutive rows belong to the same article, with empty rows separating +# paragraphs. New articles begin with a title line like +# ``= Article Title =``. The tokenization step below inserts an +# ``<|endoftext|>`` separator token before each title line so the model +# learns to reset context at article boundaries. -print("--- After filtering short rows (train_ds.take(2)) ---") -filtered_sample = train_ds.take(2) -for i, row in enumerate(filtered_sample): - text_preview = row["text"][:120] + "..." if len(row["text"]) > 120 else row["text"] - print(f" Row {i}: {text_preview!r}") +# Limit dataset size for fast iteration during smoke tests. +if SMOKE_TEST: + train_ds = train_ds.limit(1000) + val_ds = val_ds.limit(1000) ############################################################################### -# After filtering, only substantive paragraphs remain: -# -# .. code-block:: text -# -# Row 0: ' Senjō no Valkyria 3 : Unrecorded Chronicles ( Japanese : ...' -# Row 1: ' The game began development in 2010 , carrying over a large ...' -# # Tokenize and chunk the data # ---------------------------- # # Language models consume fixed-length sequences of token IDs. The -# preprocessing step converts raw text into overlapping input/target pairs -# for next-token prediction. +# preprocessing step converts raw text into input/target pairs for +# next-token prediction. # # This tutorial uses ``tiktoken`` with the GPT-2 encoding (vocabulary size # 50,257). ``tiktoken`` is a fast, standalone tokenizer that has no @@ -154,26 +145,38 @@ # # The ``tokenize_and_chunk`` function: # -# 1. Tokenizes each batch of text. -# 2. Concatenates all tokens into a single stream. -# 3. Splits the stream into fixed-length blocks of ``block_size + 1`` +# 1. Tokenizes each batch of text, concatenating into a single stream. +# Article title lines (e.g. ``= Article Title =``) trigger an +# ``<|endoftext|>`` separator so the model resets context at article +# boundaries. +# 2. Splits the stream into fixed-length blocks of ``block_size + 1`` # tokens. -# 4. Returns ``input_ids`` (the first ``block_size`` tokens) and +# 3. Returns ``input_ids`` (the first ``block_size`` tokens) and # ``labels`` (shifted by one position for next-token prediction). BLOCK_SIZE = 256 VOCAB_SIZE = 50257 encoding = tiktoken.get_encoding("gpt2") +EOT_TOKEN = encoding.eot_token # <|endoftext|> token ID (50256) + + +def _is_article_title(text: str) -> bool: + """Detect Wikitext article title lines like ' = Some Title = '.""" + stripped = text.strip() + return stripped.startswith("= ") and stripped.endswith(" =") and not stripped.startswith("= =") def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Tokenize text and split into fixed-length chunks for language modeling.""" - # Tokenize all texts in the batch and concatenate + # Reconstruct the original text stream by joining rows with newlines. + # Article title lines signal new articles, so we insert an + # <|endoftext|> separator before them. 
all_tokens: list[int] = [] for text in batch["text"]: - if text.strip(): # skip empty lines - all_tokens.extend(encoding.encode_ordinary(text)) + if _is_article_title(text): + all_tokens.append(EOT_TOKEN) + all_tokens.extend(encoding.encode_ordinary(text + "\n")) # Split into chunks of block_size + 1 (input + 1 shifted target) chunk_len = BLOCK_SIZE + 1 @@ -192,7 +195,6 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: } - ############################################################################### # Apply the tokenization with ``map_batches()``. This operation is **lazy**, # meaning that Ray Data defers execution until a downstream consumer requests the @@ -205,7 +207,7 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: ############################################################################### # Inspect the tokenized output with ``take(2)``: -print("--- Tokenized data sample (train_ds.take(2)) ---") +print("--- After tokenization ---") tokenized_sample = train_ds.take(2) for i, row in enumerate(tokenized_sample): ids = row["input_ids"] @@ -254,18 +256,16 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # * ~124M parameters # * Built-in causal attention masking and weight tying -MODEL_CONFIG = GPT2Config( - vocab_size=VOCAB_SIZE, - n_positions=BLOCK_SIZE, - n_embd=768, - n_layer=12, - n_head=12, -) - def create_model(): """Create a fresh GPT-2 model from config (random weights).""" - model = GPT2LMHeadModel(MODEL_CONFIG) + model = GPT2LMHeadModel(GPT2Config( + vocab_size=VOCAB_SIZE, + n_positions=BLOCK_SIZE, + n_embd=768, + n_layer=12, + n_head=12, + )) model.loss_type = "ForCausalLM" return model @@ -277,21 +277,11 @@ def create_model(): num_params = sum(p.numel() for p in model.parameters()) print(f"Model parameters: {num_params:,} ({num_params / 1e6:.1f}M)") +del model # Free memory before training + ############################################################################### # You should see approximately **123.8M parameters** — the standard GPT-2 # "small" size. -# -# Quick smoke test: run a forward pass on CPU to verify the model produces the -# expected output shape before launching distributed training: - -model.eval() -dummy_input = torch.randint(0, VOCAB_SIZE, (2, BLOCK_SIZE)) -with torch.no_grad(): - out = model(dummy_input) -print(f"Smoke test passed — logits shape: {out.logits.shape}") -del model, dummy_input, out # Free memory before distributed training - - ############################################################################### # Define the distributed training function @@ -315,12 +305,15 @@ def create_model(): # 4. **``ray.train.report(metrics, checkpoint=...)``** reports metrics # to the driver and optionally saves a checkpoint. + def train_func_per_worker(config: dict): """Training function executed by each distributed worker.""" lr = config["lr"] + weight_decay = config["weight_decay"] + max_grad_norm = config["max_grad_norm"] epochs = config["epochs"] batch_size = config["batch_size_per_worker"] - max_steps_per_epoch = config.get("max_steps_per_epoch") # DEBUG: cap steps + max_steps_per_epoch = config.get("max_steps_per_epoch") # --- Data ----------------------------------------------------------- # Each worker gets an automatic shard of the dataset. @@ -333,13 +326,15 @@ def train_func_per_worker(config: dict): # it on the correct device. 
model = ray.train.torch.prepare_model(model) - optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.1) + optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay) # --- Training loop -------------------------------------------------- for epoch in range(epochs): model.train() train_loss_sum = 0.0 train_batches = 0 + train_items = 0 + epoch_start = time.perf_counter() # iter_torch_batches returns dicts of tensors already on the GPU. for batch in train_data_shard.iter_torch_batches( @@ -356,18 +351,20 @@ def train_func_per_worker(config: dict): optimizer.zero_grad() loss.backward() # Gradient clipping for training stability - torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) + torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_grad_norm) optimizer.step() train_loss_sum += loss.item() train_batches += 1 + train_items += batch_size if max_steps_per_epoch and train_batches >= max_steps_per_epoch: - break # DEBUG: early stop + break + train_elapsed = time.perf_counter() - epoch_start avg_train_loss = train_loss_sum / max(train_batches, 1) - # --- Validation ------------------------------------------------- + # --- Validation ----------------------------------------------------- model.eval() val_loss_sum = 0.0 val_batches = 0 @@ -385,31 +382,23 @@ def train_func_per_worker(config: dict): val_batches += 1 if max_steps_per_epoch and val_batches >= max_steps_per_epoch: - break # DEBUG: early stop + break avg_val_loss = val_loss_sum / max(val_batches, 1) - val_perplexity = math.exp(min(avg_val_loss, 20)) # cap to avoid overflow - # --- Report metrics ----------------------------------------------- + # --- Report metrics ------------------------------------------------- + metrics = { + "train_loss": round(avg_train_loss, 4), + "val_loss": round(avg_val_loss, 4), + "epoch": epoch, + "batches_per_sec": round(train_batches / max(train_elapsed, 1e-6), 2), + "items_per_sec": round(train_items / max(train_elapsed, 1e-6), 2), + } ray.train.report( - metrics={ - "train_loss": avg_train_loss, - "val_loss": avg_val_loss, - "val_perplexity": val_perplexity, - "epoch": epoch, - }, - checkpoint=None, # If we were checkpointing, we'd pass checkpoint to Ray Train here + metrics=metrics, + checkpoint=None, # If we were checkpointing, we'd pass a Checkpoint here ) - if ray.train.get_context().get_world_rank() == 0: - print( - f"Epoch {epoch}: " - f"train_loss={avg_train_loss:.4f}, " - f"val_loss={avg_val_loss:.4f}, " - f"val_perplexity={val_perplexity:.2f}" - ) - - ############################################################################### # Configure and launch distributed training @@ -440,23 +429,27 @@ def train_func_per_worker(config: dict): # Wrapping provided model in DistributedDataParallel. 
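+#
+# If you want to log this placement information yourself from inside
+# ``train_func_per_worker`` (for example, to tag per-worker metrics), the
+# train context exposes the same rank and world-size details; these accessors
+# are part of Ray Train's ``get_context()`` API:
+#
+# .. code-block:: python
+#
+#    ctx = ray.train.get_context()
+#    print(
+#        f"world_rank={ctx.get_world_rank()} "
+#        f"local_rank={ctx.get_local_rank()} "
+#        f"world_size={ctx.get_world_size()}"
+#    )
+#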
NUM_WORKERS = 8 # One worker per GPU on this machine -NUM_EPOCHS = 1 # DEBUG: reduced from 2 +NUM_EPOCHS = 20 BATCH_SIZE_PER_WORKER = 16 +LR = 3e-4 +WEIGHT_DECAY = 0.1 +MAX_GRAD_NORM = 1.0 trainer = TorchTrainer( train_loop_per_worker=train_func_per_worker, train_loop_config={ - "lr": 3e-4, + "lr": LR, + "weight_decay": WEIGHT_DECAY, + "max_grad_norm": MAX_GRAD_NORM, "epochs": NUM_EPOCHS, "batch_size_per_worker": BATCH_SIZE_PER_WORKER, - "max_steps_per_epoch": 5, # DEBUG: cap at 5 steps for fast iteration + "max_steps_per_epoch": 5 if SMOKE_TEST else None, }, datasets={"train": train_ds, "validation": val_ds}, scaling_config=ScalingConfig( num_workers=NUM_WORKERS, use_gpu=True, ), - # run_config=RunConfig(), ) result = trainer.fit() @@ -465,23 +458,27 @@ def train_func_per_worker(config: dict): # Inspect results # --------------- # -# After training, the ``Result`` object contains the final metrics reported -# by the workers. +# After training, the ``Result`` object contains the final metrics and +# checkpoint. ``result.metrics`` is populated from the last +# ``ray.train.report()`` call. ``result.checkpoint`` is ``None`` here +# because this tutorial does not save checkpoints. -print(f"\nTraining finished!") -print(f"Final metrics: {result.metrics}") +print("\nTraining finished!") ############################################################################### -# The logs from each worker show training and validation metrics per epoch. -# With random weights and only a few steps, expect a high loss (~10–11) -# and perplexity in the tens of thousands — this is normal. +# ``result.metrics`` contains the metrics dict from the last +# ``ray.train.report()`` call: # # .. code-block:: text # -# Epoch 0: train_loss=10.9492, val_loss=10.0157, val_perplexity=22374.06 +# {'train_loss': 10.95, 'val_loss': 10.02, 'epoch': 0, +# 'batches_per_sec': 1.23, 'items_per_sec': 19.68} # -# In a real training run with more epochs and the full dataset, you would -# see these values steadily decrease. +# The per-worker logs show training loss, validation loss, and throughput +# metrics for each epoch. With random weights and only a few steps, expect +# a high loss (~10–11) — this is normal. In a real training run with more +# epochs and the full dataset, you would see loss steadily decrease and +# throughput stabilize. ############################################################################### # Checkpointing @@ -551,6 +548,15 @@ def train_func_per_worker(config: dict): # # No changes to the training function are needed. The same # ``train_func_per_worker`` runs identically whether on 1 GPU or 256 GPUs. +# +# .. note:: +# +# This tutorial uses ``DistributedDataParallel`` (DDP), which replicates +# the full model on every GPU. For larger models that don't fit on a +# single GPU, you can switch to +# `FullyShardedDataParallel `__ +# (FSDP) to shard parameters, gradients, and optimizer states across +# workers by setting ``prepare_model(parallel_strategy="fsdp")``. ############################################################################### # Fault tolerance @@ -621,10 +627,10 @@ def train_func_per_worker(config: dict): # Hugging Face Transformers and PyTorch. # * Loaded and preprocessed the Wikitext-103 dataset using Ray Data # with distributed streaming. -# * Distributed training across 8 GPUs using Ray Train's +# * Ran distributed training across 8 GPUs using Ray Train's # ``TorchTrainer`` with only minimal changes to a standard PyTorch # training loop. 
-# * Saved and loaded distributed checkpoints for model recovery. +# * Learned how to save distributed checkpoints for model recovery. # * Learned how to scale to multi-node clusters by changing # ``ScalingConfig`` and ``RunConfig``. # * Learned about Ray Train's **fault tolerance** mechanisms for From 4e134f8225796ffe5bf40462cfaf3cd4a50d05db Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 9 Feb 2026 22:30:16 -0800 Subject: [PATCH 04/18] another pass --- .../simple_distributed_training_tutorial.py | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/beginner_source/simple_distributed_training_tutorial.py b/beginner_source/simple_distributed_training_tutorial.py index b8acb05f4a..cc393303eb 100644 --- a/beginner_source/simple_distributed_training_tutorial.py +++ b/beginner_source/simple_distributed_training_tutorial.py @@ -126,10 +126,10 @@ # ``<|endoftext|>`` separator token before each title line so the model # learns to reset context at article boundaries. -# Limit dataset size for fast iteration during smoke tests. +# Limit dataset size for fast iteration during smoke tests.= if SMOKE_TEST: - train_ds = train_ds.limit(1000) - val_ds = val_ds.limit(1000) + train_ds = train_ds.limit(2500) + val_ds = val_ds.limit(2500) ############################################################################### # Tokenize and chunk the data @@ -275,7 +275,7 @@ def create_model(): model = create_model() num_params = sum(p.numel() for p in model.parameters()) -print(f"Model parameters: {num_params:,} ({num_params / 1e6:.1f}M)") +print(f"Model parameters: {num_params / 1e6:.1f}M") del model # Free memory before training @@ -333,7 +333,7 @@ def train_func_per_worker(config: dict): model.train() train_loss_sum = 0.0 train_batches = 0 - train_items = 0 + train_tokens = 0 epoch_start = time.perf_counter() # iter_torch_batches returns dicts of tensors already on the GPU. @@ -356,7 +356,7 @@ def train_func_per_worker(config: dict): train_loss_sum += loss.item() train_batches += 1 - train_items += batch_size + train_tokens += input_ids.numel() if max_steps_per_epoch and train_batches >= max_steps_per_epoch: break @@ -385,14 +385,16 @@ def train_func_per_worker(config: dict): break avg_val_loss = val_loss_sum / max(val_batches, 1) + epoch_elapsed = time.perf_counter() - epoch_start # --- Report metrics ------------------------------------------------- metrics = { "train_loss": round(avg_train_loss, 4), "val_loss": round(avg_val_loss, 4), "epoch": epoch, - "batches_per_sec": round(train_batches / max(train_elapsed, 1e-6), 2), - "items_per_sec": round(train_items / max(train_elapsed, 1e-6), 2), + "epoch_time_sec": round(epoch_elapsed, 2), + "epoch_tokens": train_tokens, + "tokens_per_sec": round(train_tokens / max(train_elapsed, 1e-6), 2), } ray.train.report( metrics=metrics, @@ -427,9 +429,14 @@ def train_func_per_worker(config: dict): # ... # Moving model to device: cuda:0 # Wrapping provided model in DistributedDataParallel. +# +# ``batch_size_per_worker`` is the number of sequences each worker +# processes per gradient step. With 8 workers and a per-worker batch size +# of 16, the **effective global batch size** is 8 × 16 = 128 sequences, +# or 128 × 256 = 32,768 tokens per optimizer step. NUM_WORKERS = 8 # One worker per GPU on this machine -NUM_EPOCHS = 20 +NUM_EPOCHS = 5 BATCH_SIZE_PER_WORKER = 16 LR = 3e-4 WEIGHT_DECAY = 0.1 @@ -471,8 +478,8 @@ def train_func_per_worker(config: dict): # # .. 
code-block:: text # -# {'train_loss': 10.95, 'val_loss': 10.02, 'epoch': 0, -# 'batches_per_sec': 1.23, 'items_per_sec': 19.68} +# {'train_loss': 7.0646, 'val_loss': 7.6051, 'epoch': 4, +# 'epoch_time_sec': 12.34, 'epoch_tokens': 20480, 'tokens_per_sec': 1759.8} # # The per-worker logs show training loss, validation loss, and throughput # metrics for each epoch. With random weights and only a few steps, expect From 00ab23ad2a73658cbb1dd8902ad97417dce818de Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 9 Feb 2026 23:25:30 -0800 Subject: [PATCH 05/18] another pass --- .../simple_distributed_training_tutorial.py | 167 ++++++++++++------ 1 file changed, 116 insertions(+), 51 deletions(-) diff --git a/beginner_source/simple_distributed_training_tutorial.py b/beginner_source/simple_distributed_training_tutorial.py index cc393303eb..be7a2e842b 100644 --- a/beginner_source/simple_distributed_training_tutorial.py +++ b/beginner_source/simple_distributed_training_tutorial.py @@ -14,10 +14,11 @@ * Pre-train a GPT-2 (~124M-parameter) language model using PyTorch and Hugging Face Transformers. - * Distribute training across multiple GPUs with Ray Train. - * Stream training data from Hugging Face datasets with Ray Data. + * Distribute training across multiple GPUs with Ray Train with minimal code changes. + * Stream training data from Hugging Face datasets with Ray Data's distributed workers. * Save and load distributed checkpoints. * Scale from a single node to a multi-node cluster with minimal code changes. + * Optimize cost and performance with heterogeneous clusters. * Monitor training with the Ray dashboard. .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites @@ -50,8 +51,6 @@ Then, import the required libraries: """ -############################################################################### - import time import numpy as np @@ -87,6 +86,11 @@ train_ds = ray.data.from_huggingface(hf_ds["train"]) val_ds = ray.data.from_huggingface(hf_ds["validation"]) +# Limit dataset size for fast iteration during smoke tests.= +if SMOKE_TEST: + train_ds = train_ds.limit(2500) + val_ds = val_ds.limit(2500) + print(f"Dataset schema:\n{train_ds.schema()}") ############################################################################### @@ -101,10 +105,12 @@ # This means that the dataset has one column called ``text`` and it is a string. # # Inspect raw data +# # ~~~~~~~~~~~~~~~~ # # Use ``take(n)`` to fetch a small number of rows for inspection. # Each row is a dictionary with the column names as keys. + print("--- Raw data sample ---") sample = train_ds.take(2) for i, row in enumerate(sample): @@ -117,7 +123,7 @@ # .. code-block:: text # # Row 0: '' -# Row 1: ' = Valkyria Chronicles III = \n' +# Row 1: ' = Valkyria Chronicles III = ' # # Each row in Wikitext-103 is a single line from a Wikipedia article. # Consecutive rows belong to the same article, with empty rows separating @@ -125,13 +131,7 @@ # ``= Article Title =``. The tokenization step below inserts an # ``<|endoftext|>`` separator token before each title line so the model # learns to reset context at article boundaries. 
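#
# As a quick, self-contained illustration of that separator convention, the
# short sketch below builds the token sequence for one title line by hand.
# It assumes only ``tiktoken``; the GPT-2 encoding exposes the
# ``<|endoftext|>`` id as ``encoding.eot_token`` (50256):
#
# .. code-block:: python
#
#    import tiktoken
#
#    enc = tiktoken.get_encoding("gpt2")
#    eot = enc.eot_token  # 50256, the <|endoftext|> token id
#    tokens = [eot] + enc.encode_ordinary(" = Valkyria Chronicles III = \n")
#    print(tokens[:6])        # token ids, starting with the separator
#    print(enc.decode(tokens[:6]))
#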
- -# Limit dataset size for fast iteration during smoke tests.= -if SMOKE_TEST: - train_ds = train_ds.limit(2500) - val_ds = val_ds.limit(2500) - -############################################################################### +# # Tokenize and chunk the data # ---------------------------- # @@ -195,12 +195,14 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: } + ############################################################################### # Apply the tokenization with ``map_batches()``. This operation is **lazy**, # meaning that Ray Data defers execution until a downstream consumer requests the # results. Lazy execution lets Ray optimize the entire pipeline before any # work begins. +# These do not trigger execution. train_ds = train_ds.map_batches(tokenize_and_chunk, batch_format="numpy") val_ds = val_ds.map_batches(tokenize_and_chunk, batch_format="numpy") @@ -236,29 +238,23 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # .. code-block:: text # # Execution plan: InputDataBuffer[Input] -# -> TaskPoolMapOperator[Filter] # -> TaskPoolMapOperator[MapBatches(tokenize_and_chunk)] # -> OutputSplitter[split(8, equal=True)] # -# This tells you exactly how Ray Data will stream through filter, tokenize, -# and split the data across 8 workers. +# This tells you exactly how Ray Data will stream through tokenization +# and split the data across 8 trainer workers. ############################################################################### # Define the transformer model # ---------------------------- # # The model is a decoder-only transformer language model using Hugging Face's -# ``GPT2LMHeadModel``. +# ``GPT2LMHeadModel``. The hyperparameters below are the standard GPT-2 "small" architecture. # -# The GPT-2 "small" architecture: -# -# * 12 transformer layers, 12 attention heads, 768 hidden size -# * ~124M parameters -# * Built-in causal attention masking and weight tying def create_model(): - """Create a fresh GPT-2 model from config (random weights).""" + """Create a GPT-2 small model with random weights.""" model = GPT2LMHeadModel(GPT2Config( vocab_size=VOCAB_SIZE, n_positions=BLOCK_SIZE, @@ -270,6 +266,7 @@ def create_model(): return model + ############################################################################### # Verify the model size: @@ -280,8 +277,7 @@ def create_model(): del model # Free memory before training ############################################################################### -# You should see approximately **123.8M parameters** — the standard GPT-2 -# "small" size. +# You should see approximately 123.8M parameters. ############################################################################### # Define the distributed training function @@ -294,15 +290,15 @@ def create_model(): # # The key Ray Train integration points are: # -# 1. **``ray.train.get_dataset_shard("train")``** retrieves the +# 1. ``ray.train.get_dataset_shard("train")`` retrieves the # worker's portion of the data. Ray Data automatically splits the # dataset across all workers. -# 2. **``ray.train.torch.prepare_model(model)``** wraps the model in +# 2. ``ray.train.torch.prepare_model(model)`` wraps the model in # ``DistributedDataParallel`` and moves it to the correct GPU. -# 3. **``shard.iter_torch_batches(batch_size=...)``** returns an iterator +# 3. ``shard.iter_torch_batches(batch_size=...)`` returns an iterator # of ``dict[str, torch.Tensor]`` batches, with tensors automatically -# placed on the worker's GPU. -# 4. 
**``ray.train.report(metrics, checkpoint=...)``** reports metrics +# placed on the worker's GPU. Setting ``prefetch_batches=2`` opportunistically fetches 2 batches ahead of the current batch. +# 4. ``ray.train.report(metrics, checkpoint=...)`` reports metrics # to the driver and optionally saves a checkpoint. @@ -338,7 +334,7 @@ def train_func_per_worker(config: dict): # iter_torch_batches returns dicts of tensors already on the GPU. for batch in train_data_shard.iter_torch_batches( - batch_size=batch_size, dtypes=torch.long + batch_size=batch_size, dtypes=torch.long, prefetch_batches=2 ): input_ids = batch["input_ids"] labels = batch["labels"] @@ -371,7 +367,7 @@ def train_func_per_worker(config: dict): with torch.no_grad(): for batch in val_data_shard.iter_torch_batches( - batch_size=batch_size, dtypes=torch.long + batch_size=batch_size, dtypes=torch.long, prefetch_batches=2 ): input_ids = batch["input_ids"] labels = batch["labels"] @@ -402,18 +398,20 @@ def train_func_per_worker(config: dict): ) + ############################################################################### # Configure and launch distributed training # ------------------------------------------ # -# The ``TorchTrainer`` brings everything together. It accepts: +# The ``TorchTrainer`` brings everything together. Running ``trainer.fit()`` finally +# triggers the execution of the full data pipeline and training loop. The Trainer accepts: # -# * **``train_func_per_worker``**: the function each worker executes. -# * **``train_loop_config``**: a dictionary of hyperparameters forwarded +# * ``train_func_per_worker``: the function each worker executes. +# * ``train_loop_config``: a dictionary of hyperparameters forwarded # to the training function. -# * **``datasets``**: a dictionary of Ray Datasets. Ray Train automatically +# * ``datasets``: a dictionary of Ray Datasets. Ray Train automatically # splits each dataset across workers. -# * **``scaling_config``**: specifies the number of workers and whether to +# * ``scaling_config``: specifies the number of workers and whether to # use GPUs. # # Setting ``num_workers=8`` launches 8 parallel workers, one per GPU. Ray @@ -424,8 +422,9 @@ def train_func_per_worker(config: dict): # .. code-block:: text # # Started training worker group of size 8: -# - (ip=10.0.176.183, pid=25636) world_rank=0, local_rank=0, node_rank=0 -# - (ip=10.0.176.183, pid=25637) world_rank=1, local_rank=1, node_rank=0 +# +# * (ip=10.0.176.183, pid=25636) world_rank=0, local_rank=0, node_rank=0 +# * (ip=10.0.176.183, pid=25637) world_rank=1, local_rank=1, node_rank=0 # ... # Moving model to device: cuda:0 # Wrapping provided model in DistributedDataParallel. @@ -452,6 +451,7 @@ def train_func_per_worker(config: dict): "batch_size_per_worker": BATCH_SIZE_PER_WORKER, "max_steps_per_epoch": 5 if SMOKE_TEST else None, }, + # Register the datasets, datasets={"train": train_ds, "validation": val_ds}, scaling_config=ScalingConfig( num_workers=NUM_WORKERS, @@ -483,12 +483,11 @@ def train_func_per_worker(config: dict): # # The per-worker logs show training loss, validation loss, and throughput # metrics for each epoch. With random weights and only a few steps, expect -# a high loss (~10–11) — this is normal. In a real training run with more -# epochs and the full dataset, you would see loss steadily decrease and -# throughput stabilize. +# a high loss (~10–11). 
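#
# If you prefer a more interpretable number than raw cross-entropy, you can
# derive perplexity from the reported validation loss yourself. This small
# sketch uses only the ``result.metrics`` dictionary shown above; the
# ``min(..., 20)`` cap simply avoids overflow for a barely trained model:
#
# .. code-block:: python
#
#    import math
#
#    val_ppl = math.exp(min(result.metrics["val_loss"], 20))
#    print(f"Validation perplexity: {val_ppl:.1f}")
#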
############################################################################### # Checkpointing +# # ~~~~~~~~~~~~~ # # In a production training run you would enable checkpointing so that @@ -507,23 +506,70 @@ def train_func_per_worker(config: dict): # ) # # Inside the training function, save a checkpoint with -# ``ray.train.report()``: +# ``ray.train.report()``. Every worker must still call ``ray.train.report()``: # # .. code-block:: python # # with tempfile.TemporaryDirectory() as tmp_dir: -# model.module.save_pretrained(tmp_dir) # .module unwraps DDP -# checkpoint = ray.train.Checkpoint.from_directory(tmp_dir) +# checkpoint = None +# if ray.train.get_context().get_world_rank() == 0: +# torch.save(model.module.state_dict(), +# os.path.join(tmp_dir, "model.pt")) +# torch.save(optimizer.state_dict(), +# os.path.join(tmp_dir, "optimizer.pt")) +# torch.save({"epoch": epoch}, +# os.path.join(tmp_dir, "extra_state.pt")) +# checkpoint = ray.train.Checkpoint.from_directory(tmp_dir) # ray.train.report(metrics={...}, checkpoint=checkpoint) # +# Note that ``.module`` unwraps the ``DistributedDataParallel`` wrapper so +# you save the underlying model weights rather than the DDP wrapper. +# +# To **resume training from a checkpoint**, call +# ``ray.train.get_checkpoint()`` at the top of your training function. +# When Ray Train restarts workers (for example, after a failure), it +# automatically provides the latest checkpoint. If no checkpoint exists +# (i.e. this is a fresh run), the function returns ``None``: +# +# .. code-block:: python +# +# def train_func_per_worker(config: dict): +# model = create_model() +# model = ray.train.torch.prepare_model(model) +# optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"]) +# +# # Resume from the latest checkpoint if one exists. +# start_epoch = 0 +# checkpoint = ray.train.get_checkpoint() +# if checkpoint: +# with checkpoint.as_directory() as ckpt_dir: +# model.module.load_state_dict( +# torch.load(os.path.join(ckpt_dir, "model.pt")) +# ) +# optimizer.load_state_dict( +# torch.load(os.path.join(ckpt_dir, "optimizer.pt")) +# ) +# start_epoch = torch.load( +# os.path.join(ckpt_dir, "extra_state.pt") +# )["epoch"] + 1 +# +# for epoch in range(start_epoch, config["epochs"]): +# # ... training loop ... +# +# You can also call ``TorchTrainer.restore(path, datasets=...)`` to +# restore an entire interrupted experiment from its results directory +# without re-specifying the full trainer configuration. See the `Ray Train +# checkpointing guide +# `__ +# for more details. +# # Scaling to a multi-node cluster # ------------------------------- # # The code above runs on a single 8-GPU machine. Scaling to a multi-node # cluster requires only two changes: # -# 1. **Increase ``num_workers``** to match the total number of GPUs across -# all nodes. +# 1. **Increase ``num_workers``** to match the total number of GPUs in the cluster. # 2. **Set a shared storage path** so that all nodes can access checkpoints. # # For example, to train on a cluster of 4 nodes with 8 GPUs each @@ -549,8 +595,6 @@ def train_func_per_worker(config: dict): # Ray Train automatically: # # * Launches workers across all available nodes. -# * Initializes ``torch.distributed`` with the NCCL backend. -# * Configures ``DistributedDataParallel`` across nodes. # * Shards data across all workers. # # No changes to the training function are needed. 
The same @@ -564,6 +608,24 @@ def train_func_per_worker(config: dict): # `FullyShardedDataParallel `__ # (FSDP) to shard parameters, gradients, and optimizer states across # workers by setting ``prepare_model(parallel_strategy="fsdp")``. +# +# Heterogeneous clusters: separate data and training resources +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# Because Ray Data and Ray Train are separate systems, they don't have to +# share the same machines. By default, Ray Data preprocessing and training +# workers all run on the same nodes. However, you can optionally add +# **CPU-only nodes** to your cluster and Ray Data will automatically +# schedule preprocessing tasks on them, keeping your expensive GPU nodes +# free for training. +# +# This is useful when data preprocessing is a bottleneck. If you notice +# low GPU utilization because workers are waiting on data, you can add +# cheaper CPU-only nodes to the cluster and Ray Data scales out +# preprocessing to them. +# +# For more details, see `Configuring data ingest +# `__. ############################################################################### # Fault tolerance @@ -582,8 +644,7 @@ def train_func_per_worker(config: dict): # storage. When recovering from a failure, training resumes from the # latest checkpoint rather than starting over. # * **Node failure handling**: If an entire node goes down, Ray -# redistributes work to surviving nodes and replaces the failed node -# when new resources become available. +# replaces the failed node and resumes training. # # To enable automatic failure recovery, configure ``FailureConfig`` in your ``RunConfig``: # @@ -637,9 +698,13 @@ def train_func_per_worker(config: dict): # * Ran distributed training across 8 GPUs using Ray Train's # ``TorchTrainer`` with only minimal changes to a standard PyTorch # training loop. -# * Learned how to save distributed checkpoints for model recovery. +# * Learned how to save and load distributed checkpoints for model +# recovery. # * Learned how to scale to multi-node clusters by changing # ``ScalingConfig`` and ``RunConfig``. +# * Learned how heterogeneous clusters let you run data preprocessing +# on CPU nodes and training on GPU nodes for cost and performance +# optimization. # * Learned about Ray Train's **fault tolerance** mechanisms for # production training jobs. # * Monitored training with the Ray dashboard. From dadec17cb0f1c2051248f2fdbd53dad05eeada84 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 9 Feb 2026 23:28:40 -0800 Subject: [PATCH 06/18] rename tutorial and add to index --- ...torial.py => distributed_training_with_ray_tutorial.py} | 0 index.rst | 7 +++++++ 2 files changed, 7 insertions(+) rename beginner_source/{simple_distributed_training_tutorial.py => distributed_training_with_ray_tutorial.py} (100%) diff --git a/beginner_source/simple_distributed_training_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py similarity index 100% rename from beginner_source/simple_distributed_training_tutorial.py rename to beginner_source/distributed_training_with_ray_tutorial.py diff --git a/index.rst b/index.rst index 5a5e80abfb..301a8ad5e0 100644 --- a/index.rst +++ b/index.rst @@ -159,6 +159,13 @@ Welcome to PyTorch Tutorials :link: advanced/usb_semisup_learn.html :tags: Image/Video +.. customcarditem:: + :header: Distributed Training with Ray Train + :card_description: Pre-train a transformer language model across multiple GPUs using PyTorch and Ray Train. 
+ :image: _static/img/ray-data.png + :link: beginner/distributed_training_with_ray_tutorial.html + :tags: Text,Best-Practice,Ray-Distributed,Parallel-and-Distributed-Training + .. Audio .. customcarditem:: From 8e198bc38fc7042b3dea61034596872094572a14 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 9 Feb 2026 23:39:38 -0800 Subject: [PATCH 07/18] another pass --- .../distributed_training_with_ray_tutorial.py | 71 +++++++++---------- 1 file changed, 33 insertions(+), 38 deletions(-) diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py index be7a2e842b..967ba4c6f3 100644 --- a/beginner_source/distributed_training_with_ray_tutorial.py +++ b/beginner_source/distributed_training_with_ray_tutorial.py @@ -48,7 +48,7 @@ To install the dependencies, run ``pip install "ray[train]" torch tiktoken datasets transformers``. -Then, import the required libraries: +Then, import the required libraries. """ import time @@ -86,7 +86,7 @@ train_ds = ray.data.from_huggingface(hf_ds["train"]) val_ds = ray.data.from_huggingface(hf_ds["validation"]) -# Limit dataset size for fast iteration during smoke tests.= +# Limit dataset size for fast iteration during smoke tests. if SMOKE_TEST: train_ds = train_ds.limit(2500) val_ds = val_ds.limit(2500) @@ -94,7 +94,7 @@ print(f"Dataset schema:\n{train_ds.schema()}") ############################################################################### -# The schema should look like this: +# The schema can look like this: # # .. code-block:: text # @@ -105,7 +105,6 @@ # This means that the dataset has one column called ``text`` and it is a string. # # Inspect raw data -# # ~~~~~~~~~~~~~~~~ # # Use ``take(n)`` to fetch a small number of rows for inspection. @@ -143,16 +142,16 @@ # 50,257). ``tiktoken`` is a fast, standalone tokenizer that has no # dependency on the Hugging Face ``transformers`` library. # -# The ``tokenize_and_chunk`` function: +# The ``tokenize_and_chunk`` function does the following: # -# 1. Tokenizes each batch of text, concatenating into a single stream. -# Article title lines (e.g. ``= Article Title =``) trigger an -# ``<|endoftext|>`` separator so the model resets context at article -# boundaries. -# 2. Splits the stream into fixed-length blocks of ``block_size + 1`` -# tokens. -# 3. Returns ``input_ids`` (the first ``block_size`` tokens) and -# ``labels`` (shifted by one position for next-token prediction). +# * Tokenizes each batch of text, concatenating into a single stream. +# Article title lines (for example, ``= Article Title =``) trigger an +# ``<|endoftext|`` separator so the model resets context at article +# boundaries. +# * Splits the stream into fixed-length blocks of ``block_size + 1`` +# tokens. +# * Returns ``input_ids`` (the first ``block_size`` tokens) and +# ``labels`` (shifted by one position for next-token prediction). BLOCK_SIZE = 256 VOCAB_SIZE = 50257 @@ -224,7 +223,7 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Streaming execution # ~~~~~~~~~~~~~~~~~~~ # -# Under the hood, Ray divides the data into **blocks** and dispatches them to +# Internally, Ray divides the data into **blocks** and dispatches them to # workers. 
This block-based architecture enables **streaming execution**: as # soon as a stage outputs a block, the next stage can begin processing it # immediately without waiting for previous stages to finish the entire @@ -233,7 +232,7 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # to fit in memory at once. # # When training starts, Ray Data logs the execution plan. For this tutorial -# it looks like: +# one possible plan is: # # .. code-block:: text # @@ -277,7 +276,7 @@ def create_model(): del model # Free memory before training ############################################################################### -# You should see approximately 123.8M parameters. +# You can see approximately 123.8M parameters. ############################################################################### # Define the distributed training function @@ -417,7 +416,7 @@ def train_func_per_worker(config: dict): # Setting ``num_workers=8`` launches 8 parallel workers, one per GPU. Ray # Train handles ``torch.distributed`` initialization, NCCL backend setup, # and ``DistributedDataParallel`` wrapping behind the scenes. In the logs -# you will see each worker being assigned a rank and device: +# you see each worker assigned a rank and device: # # .. code-block:: text # @@ -466,9 +465,9 @@ def train_func_per_worker(config: dict): # --------------- # # After training, the ``Result`` object contains the final metrics and -# checkpoint. ``result.metrics`` is populated from the last +# checkpoint. ``result.metrics`` comes from the last # ``ray.train.report()`` call. ``result.checkpoint`` is ``None`` here -# because this tutorial does not save checkpoints. +# because this tutorial doesn't save checkpoints. print("\nTraining finished!") @@ -487,12 +486,11 @@ def train_func_per_worker(config: dict): ############################################################################### # Checkpointing -# # ~~~~~~~~~~~~~ # # In a production training run you would enable checkpointing so that # training can resume from the last saved state after a failure. This -# requires a **shared storage path** (e.g. an S3 bucket or NFS mount) +# requires a **shared storage path** (for example, an S3 bucket or NFS mount) # accessible from all nodes: # # .. code-block:: python @@ -528,8 +526,8 @@ def train_func_per_worker(config: dict): # To **resume training from a checkpoint**, call # ``ray.train.get_checkpoint()`` at the top of your training function. # When Ray Train restarts workers (for example, after a failure), it -# automatically provides the latest checkpoint. If no checkpoint exists -# (i.e. this is a fresh run), the function returns ``None``: +# automatically provides the most recent checkpoint. If no checkpoint exists +# (this is a fresh run), the function returns ``None``: # # .. code-block:: python # @@ -600,14 +598,12 @@ def train_func_per_worker(config: dict): # No changes to the training function are needed. The same # ``train_func_per_worker`` runs identically whether on 1 GPU or 256 GPUs. # -# .. note:: -# -# This tutorial uses ``DistributedDataParallel`` (DDP), which replicates -# the full model on every GPU. For larger models that don't fit on a -# single GPU, you can switch to -# `FullyShardedDataParallel `__ -# (FSDP) to shard parameters, gradients, and optimizer states across -# workers by setting ``prepare_model(parallel_strategy="fsdp")``. +# This tutorial uses ``DistributedDataParallel`` (DDP), which replicates +# the full model on every GPU. 
For larger models that don't fit on a +# single GPU, you can switch to +# `FullyShardedDataParallel `__ +# (FSDP) to shard parameters, gradients, and optimizer states across +# workers by setting ``prepare_model(parallel_strategy="fsdp")``. # # Heterogeneous clusters: separate data and training resources # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -615,12 +611,12 @@ def train_func_per_worker(config: dict): # Because Ray Data and Ray Train are separate systems, they don't have to # share the same machines. By default, Ray Data preprocessing and training # workers all run on the same nodes. However, you can optionally add -# **CPU-only nodes** to your cluster and Ray Data will automatically -# schedule preprocessing tasks on them, keeping your expensive GPU nodes +# **CPU-only nodes** to your cluster and Ray Data automatically +# schedules preprocessing tasks on them, keeping your expensive GPU nodes # free for training. # # This is useful when data preprocessing is a bottleneck. If you notice -# low GPU utilization because workers are waiting on data, you can add +# low GPU use because workers are waiting on data, you can add # cheaper CPU-only nodes to the cluster and Ray Data scales out # preprocessing to them. # @@ -638,12 +634,11 @@ def train_func_per_worker(config: dict): # Ray Train's fault tolerance mechanisms include: # # * **Worker restart**: If a worker process crashes, Ray Train -# automatically restarts it and resumes training from the last -# checkpoint. +# automatically restarts it and resumes training. # * **Checkpoint recovery**: Ray Train saves checkpoints to persistent # storage. When recovering from a failure, training resumes from the # latest checkpoint rather than starting over. -# * **Node failure handling**: If an entire node goes down, Ray +# * **Node failure handling**: If an entire node goes down, Ray Train # replaces the failed node and resumes training. 
# # To enable automatic failure recovery, configure ``FailureConfig`` in your ``RunConfig``: @@ -679,7 +674,7 @@ def train_func_per_worker(config: dict): # * Monitor training progress across all workers # * Inspect logs from individual workers # * Identify data loading or communication bottlenecks -# * View resource utilization for CPU, GPU, and memory per worker +# * View resource use for CPU, GPU, and memory per worker # * Debug failures with detailed error messages and stack traces # # For more information, see the `Ray Train monitoring From a5a7e8b1857600cef7ec9cb64f183c50ff889e0b Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 14:22:13 -0800 Subject: [PATCH 08/18] ignore localhost url for link checker --- .lycheeignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.lycheeignore b/.lycheeignore index fc1e3f1fa8..464315b47d 100644 --- a/.lycheeignore +++ b/.lycheeignore @@ -13,5 +13,8 @@ https://pytorch.org/tutorials/beginner/colab/n # Ignore local host link from intermediate_source/tensorboard_tutorial.rst http://localhost:6006 +# Ignore local host link for Ray Dashboard +http://localhost:8265 + # Ignore local host link from advanced_source/cpp_frontend.rst https://www.uber.com/blog/deep-neuroevolution/ From 5654b90e4c6c1ad9fb8eab04142f14313c181409 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 14:22:29 -0800 Subject: [PATCH 09/18] add url to distributed toctree --- distributed.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed.rst b/distributed.rst index 8fe636d725..ba52e487cc 100644 --- a/distributed.rst +++ b/distributed.rst @@ -213,3 +213,4 @@ Custom Extensions intermediate/monarch_distributed_tutorial advanced/rpc_ddp_tutorial advanced/generic_join + beginner/distributed_training_with_ray_tutorial From fc031135c6ff08c24871cb437867eeb7f55af23e Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 14:23:54 -0800 Subject: [PATCH 10/18] add ray[train] and tiktoken to reqs --- .ci/docker/requirements.txt | 3 ++- .devcontainer/requirements.txt | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.ci/docker/requirements.txt b/.ci/docker/requirements.txt index 851a2615ce..727bf9831f 100644 --- a/.ci/docker/requirements.txt +++ b/.ci/docker/requirements.txt @@ -32,7 +32,8 @@ bs4 awscliv2==2.1.1 flask spacy==3.4.1 -ray[tune]==2.52.1 +ray[train,tune]==2.52.1 +tiktoken tensorboard jinja2==3.1.3 pytorch-lightning diff --git a/.devcontainer/requirements.txt b/.devcontainer/requirements.txt index 2be1df895b..bc87c80e4a 100644 --- a/.devcontainer/requirements.txt +++ b/.devcontainer/requirements.txt @@ -14,7 +14,7 @@ bs4 awscli==1.16.35 flask spacy -ray[tune] +ray[train,tune] # PyTorch Theme -e git+https://github.com/pytorch/pytorch_sphinx_theme.git#egg=pytorch_sphinx_theme @@ -26,6 +26,7 @@ pandas scikit-image pillow==10.3.0 wget +tiktoken # for codespaces env pylint From a88a635326dcb7b44394fad252635a5ccc4938b7 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 14:59:07 -0800 Subject: [PATCH 11/18] add distributed training tutorial to ecosystem.rst --- ecosystem.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ecosystem.rst b/ecosystem.rst index e02d976ac0..2149bbc1fe 100644 --- a/ecosystem.rst +++ b/ecosystem.rst @@ -63,6 +63,13 @@ to production deployment. :link: beginner/mosaic_memory_profiling_tutorial.html :tags: Model-Optimization,Best-Practice,Profiling,Ecosystem +.. 
customcarditem:: + :header: Distributed Training with Ray Train + :card_description: Pre-train a transformer language model across multiple GPUs using PyTorch and Ray Train. + :image: _static/img/ray-data.png + :link: beginner/distributed_training_with_ray_tutorial.html + :tags: Text,Best-Practice,Ecosystem,Ray-Distributed,Parallel-and-Distributed-Training + .. End of tutorial card section .. ----------------------------------------- .. Page TOC @@ -76,3 +83,4 @@ to production deployment. intermediate/tensorboard_profiler_tutorial intermediate/realtime_rpi beginner/mosaic_memory_profiling_tutorial + beginner/distributed_training_with_ray_tutorial From 928421d1a54c24c934fe39f6a860c8ac70c54c15 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 17:35:11 -0800 Subject: [PATCH 12/18] Apply suggestion from @justinvyu Co-authored-by: Justin Yu --- beginner_source/distributed_training_with_ray_tutorial.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py index 967ba4c6f3..b8eb73b46c 100644 --- a/beginner_source/distributed_training_with_ray_tutorial.py +++ b/beginner_source/distributed_training_with_ray_tutorial.py @@ -592,7 +592,7 @@ def train_func_per_worker(config: dict): # # Ray Train automatically: # -# * Launches workers across all available nodes. +# * Launches workers across all available nodes, bringing up new nodes if needed in an autoscaling Ray cluster. # * Shards data across all workers. # # No changes to the training function are needed. The same From 1df47da2345f23f1ee28ad757b84f196731ec2b6 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 17:35:34 -0800 Subject: [PATCH 13/18] Apply suggestion from @justinvyu Co-authored-by: Justin Yu --- beginner_source/distributed_training_with_ray_tutorial.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py index b8eb73b46c..b8e286f76d 100644 --- a/beginner_source/distributed_training_with_ray_tutorial.py +++ b/beginner_source/distributed_training_with_ray_tutorial.py @@ -704,9 +704,6 @@ def train_func_per_worker(config: dict): # production training jobs. # * Monitored training with the Ray dashboard. # -# Ray Train handles the complexity of distributed systems, gradient -# synchronization, and resource allocation so that you can focus on -# your model and data. ############################################################################### # Further reading From 2f58a6d5c004857bdd17284a79de46a0d67970fb Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 19:46:45 -0800 Subject: [PATCH 14/18] rewrote checkpointing and fault tolerance sections; style edits --- .../distributed_training_with_ray_tutorial.py | 206 +++++++----------- 1 file changed, 76 insertions(+), 130 deletions(-) diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py index b8e286f76d..4d0554cb4f 100644 --- a/beginner_source/distributed_training_with_ray_tutorial.py +++ b/beginner_source/distributed_training_with_ray_tutorial.py @@ -17,7 +17,7 @@ * Distribute training across multiple GPUs with Ray Train with minimal code changes. * Stream training data from Hugging Face datasets with Ray Data's distributed workers. * Save and load distributed checkpoints. 
- * Scale from a single node to a multi-node cluster with minimal code changes. + * Scale from a single node to a multinode cluster with minimal code changes. * Optimize cost and performance with heterogeneous clusters. * Monitor training with the Ray dashboard. @@ -48,7 +48,7 @@ To install the dependencies, run ``pip install "ray[train]" torch tiktoken datasets transformers``. -Then, import the required libraries. +Then, import the required libraries: """ import time @@ -105,6 +105,7 @@ # This means that the dataset has one column called ``text`` and it is a string. # # Inspect raw data +# # ~~~~~~~~~~~~~~~~ # # Use ``take(n)`` to fetch a small number of rows for inspection. @@ -146,7 +147,7 @@ # # * Tokenizes each batch of text, concatenating into a single stream. # Article title lines (for example, ``= Article Title =``) trigger an -# ``<|endoftext|`` separator so the model resets context at article +# ``<|endoftext|>`` separator so the model resets context at article # boundaries. # * Splits the stream into fixed-length blocks of ``block_size + 1`` # tokens. @@ -242,13 +243,13 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # # This tells you exactly how Ray Data will stream through tokenization # and split the data across 8 trainer workers. - -############################################################################### +# +# # Define the transformer model # ---------------------------- # # The model is a decoder-only transformer language model using Hugging Face's -# ``GPT2LMHeadModel``. The hyperparameters below are the standard GPT-2 "small" architecture. +# ``GPT2LMHeadModel``. The hyperparameters below are for the standard GPT-2 "small" architecture. # @@ -289,16 +290,13 @@ def create_model(): # # The key Ray Train integration points are: # -# 1. ``ray.train.get_dataset_shard("train")`` retrieves the -# worker's portion of the data. Ray Data automatically splits the -# dataset across all workers. -# 2. ``ray.train.torch.prepare_model(model)`` wraps the model in -# ``DistributedDataParallel`` and moves it to the correct GPU. -# 3. ``shard.iter_torch_batches(batch_size=...)`` returns an iterator -# of ``dict[str, torch.Tensor]`` batches, with tensors automatically -# placed on the worker's GPU. Setting ``prefetch_batches=2`` opportunistically fetches 2 batches ahead of the current batch. -# 4. ``ray.train.report(metrics, checkpoint=...)`` reports metrics -# to the driver and optionally saves a checkpoint. +# - ``ray.train.get_dataset_shard("train")`` retrieves the worker's portion of the +# dataset, and Ray Data automatically splits the dataset across all workers. +# - ``ray.train.torch.prepare_model(model)`` wraps the model in +# ``DistributedDataParallel`` and moves it to the correct GPU. +# - ``shard.iter_torch_batches(batch_size=...)`` returns an iterator +# of ``dict[str, torch.Tensor]`` batches, with tensors automatically placed on the worker's GPU. Setting ``prefetch_batches=2`` opportunistically fetches 2 batches ahead of the current batch. +# - ``ray.train.report(metrics, checkpoint=...)`` reports metrics to the driver and optionally saves a checkpoint. def train_func_per_worker(config: dict): @@ -405,25 +403,25 @@ def train_func_per_worker(config: dict): # The ``TorchTrainer`` brings everything together. Running ``trainer.fit()`` finally # triggers the execution of the full data pipeline and training loop. The Trainer accepts: # -# * ``train_func_per_worker``: the function each worker executes. 
-# * ``train_loop_config``: a dictionary of hyperparameters forwarded +# - ``train_func_per_worker``: the function each worker executes. +# - ``train_loop_config``: a dictionary of hyperparameters forwarded # to the training function. -# * ``datasets``: a dictionary of Ray Datasets. Ray Train automatically +# - ``datasets``: a dictionary of Ray Datasets. Ray Train automatically # splits each dataset across workers. -# * ``scaling_config``: specifies the number of workers and whether to +# - ``scaling_config``: specifies the number of workers and whether to # use GPUs. # # Setting ``num_workers=8`` launches 8 parallel workers, one per GPU. Ray # Train handles ``torch.distributed`` initialization, NCCL backend setup, -# and ``DistributedDataParallel`` wrapping behind the scenes. In the logs +# and ``DistributedDataParallel`` wrapping behind the scenes. In the logs, # you see each worker assigned a rank and device: # # .. code-block:: text # # Started training worker group of size 8: # -# * (ip=10.0.176.183, pid=25636) world_rank=0, local_rank=0, node_rank=0 -# * (ip=10.0.176.183, pid=25637) world_rank=1, local_rank=1, node_rank=0 +# * (ip=10.0.176.183, pid=25636) world_rank=0, local_rank=0, node_rank=0 +# * (ip=10.0.176.183, pid=25637) world_rank=1, local_rank=1, node_rank=0 # ... # Moving model to device: cuda:0 # Wrapping provided model in DistributedDataParallel. @@ -482,84 +480,26 @@ def train_func_per_worker(config: dict): # # The per-worker logs show training loss, validation loss, and throughput # metrics for each epoch. With random weights and only a few steps, expect -# a high loss (~10–11). +# a high loss (~10-11). ############################################################################### # Checkpointing # ~~~~~~~~~~~~~ # -# In a production training run you would enable checkpointing so that -# training can resume from the last saved state after a failure. This -# requires a **shared storage path** (for example, an S3 bucket or NFS mount) -# accessible from all nodes: -# -# .. code-block:: python -# -# trainer = TorchTrainer( -# ..., -# run_config=RunConfig( -# storage_path="s3://my-bucket/ray-checkpoints", -# checkpoint_config=CheckpointConfig(num_to_keep=2), -# ), -# ) -# -# Inside the training function, save a checkpoint with -# ``ray.train.report()``. Every worker must still call ``ray.train.report()``: +# In production training, you can enable checkpointing to make +# your training jobs robust to unexpected failures. Checkpointing +# permits you to take advantage of Ray Train's fault tolerance mechanisms described in the +# `Fault tolerance`_ section. # -# .. code-block:: python +# Ray Train offers several checkpointing optimizations. Asynchronous +# uploading enables you to continue training while checkpoints stream to remote storage +# in the background. +# Distributed checkpointing uploads shards from each worker in parallel, avoiding +# a gather step into a single worker's memory that risks OOM errors for large models. 
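#
# Loading a checkpoint back after training finishes is the mirror image of
# saving one. The sketch below is illustrative only: it assumes each
# checkpoint directory holds a single ``checkpoint.pt`` file containing the
# model and optimizer state dicts, and it uses the ``result.checkpoint``
# returned by ``trainer.fit()``:
#
# .. code-block:: python
#
#    with result.checkpoint.as_directory() as ckpt_dir:
#        state = torch.load(
#            os.path.join(ckpt_dir, "checkpoint.pt"), map_location="cpu"
#        )
#
#    restored = create_model()
#    restored.load_state_dict(state["model_state_dict"])
#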
# -# with tempfile.TemporaryDirectory() as tmp_dir: -# checkpoint = None -# if ray.train.get_context().get_world_rank() == 0: -# torch.save(model.module.state_dict(), -# os.path.join(tmp_dir, "model.pt")) -# torch.save(optimizer.state_dict(), -# os.path.join(tmp_dir, "optimizer.pt")) -# torch.save({"epoch": epoch}, -# os.path.join(tmp_dir, "extra_state.pt")) -# checkpoint = ray.train.Checkpoint.from_directory(tmp_dir) -# ray.train.report(metrics={...}, checkpoint=checkpoint) -# -# Note that ``.module`` unwraps the ``DistributedDataParallel`` wrapper so -# you save the underlying model weights rather than the DDP wrapper. -# -# To **resume training from a checkpoint**, call -# ``ray.train.get_checkpoint()`` at the top of your training function. -# When Ray Train restarts workers (for example, after a failure), it -# automatically provides the most recent checkpoint. If no checkpoint exists -# (this is a fresh run), the function returns ``None``: -# -# .. code-block:: python -# -# def train_func_per_worker(config: dict): -# model = create_model() -# model = ray.train.torch.prepare_model(model) -# optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"]) -# -# # Resume from the latest checkpoint if one exists. -# start_epoch = 0 -# checkpoint = ray.train.get_checkpoint() -# if checkpoint: -# with checkpoint.as_directory() as ckpt_dir: -# model.module.load_state_dict( -# torch.load(os.path.join(ckpt_dir, "model.pt")) -# ) -# optimizer.load_state_dict( -# torch.load(os.path.join(ckpt_dir, "optimizer.pt")) -# ) -# start_epoch = torch.load( -# os.path.join(ckpt_dir, "extra_state.pt") -# )["epoch"] + 1 -# -# for epoch in range(start_epoch, config["epochs"]): -# # ... training loop ... -# -# You can also call ``TorchTrainer.restore(path, datasets=...)`` to -# restore an entire interrupted experiment from its results directory -# without re-specifying the full trainer configuration. See the `Ray Train -# checkpointing guide -# `__ -# for more details. +# For a full guide on checkpointing with Ray Train, see the +# `Ray Train checkpointing guide +# `__. # # Scaling to a multi-node cluster # ------------------------------- @@ -601,7 +541,7 @@ def train_func_per_worker(config: dict): # This tutorial uses ``DistributedDataParallel`` (DDP), which replicates # the full model on every GPU. For larger models that don't fit on a # single GPU, you can switch to -# `FullyShardedDataParallel `__ +# `FullyShardedDataParallel `__ # (FSDP) to shard parameters, gradients, and optimizer states across # workers by setting ``prepare_model(parallel_strategy="fsdp")``. # @@ -620,28 +560,30 @@ def train_func_per_worker(config: dict): # cheaper CPU-only nodes to the cluster and Ray Data scales out # preprocessing to them. # -# For more details, see `Configuring data ingest +# For more information, see `Configuring data ingest # `__. ############################################################################### +# .. _Fault tolerance: +# # Fault tolerance # --------------- # # Long-running distributed training jobs are vulnerable to hardware -# failures. Ray Train provides fault tolerance so that training can -# recover from failures without restarting from scratch. +# failures. These include hardware failures, network failures, or preemption. +# Without fault tolerance, any of these events can force you to restart +# training from scratch, wasting time and compute. # -# Ray Train's fault tolerance mechanisms include: +# Ray Train has features that handle these failures automatically. 
When a worker process +# crashes, Ray Train restarts it in place and resumes training. If an +# entire node goes down, Ray Train provisions a replacement and +# recovers from the most recent checkpoint so that only a small amount +# of work is lost. This makes it practical to interrupt training jobs and resume +# them later. # -# * **Worker restart**: If a worker process crashes, Ray Train -# automatically restarts it and resumes training. -# * **Checkpoint recovery**: Ray Train saves checkpoints to persistent -# storage. When recovering from a failure, training resumes from the -# latest checkpoint rather than starting over. -# * **Node failure handling**: If an entire node goes down, Ray Train -# replaces the failed node and resumes training. -# -# To enable automatic failure recovery, configure ``FailureConfig`` in your ``RunConfig``: +# To enable automatic failure recovery, configure ``FailureConfig`` in +# your ``RunConfig``. The ``max_failures`` parameter controls how many +# consecutive failures Ray Train tolerates before giving up: # # .. code-block:: python # @@ -652,6 +594,9 @@ def train_func_per_worker(config: dict): # failure_config=FailureConfig(max_failures=3), # checkpoint_config=CheckpointConfig(num_to_keep=2), # ) +# +# For more details, see the `Ray Train fault tolerance guide +# `__. ############################################################################### # Monitor your training jobs @@ -661,21 +606,21 @@ def train_func_per_worker(config: dict): # The `Ray dashboard `__ # displays real-time metrics including: # -# * Training loss and validation metrics per epoch -# * GPU utilization and memory usage per worker -# * Data loading throughput -# * Worker status and error logs +# - Training loss and validation metrics per epoch +# - GPU utilization and memory usage per worker +# - Data loading throughput +# - Worker status and error logs # # To view the dashboard, open the link printed in the logs after Ray # initializes. Typically, this link is ``http://localhost:8265``. # # The dashboard lets you: # -# * Monitor training progress across all workers -# * Inspect logs from individual workers -# * Identify data loading or communication bottlenecks -# * View resource use for CPU, GPU, and memory per worker -# * Debug failures with detailed error messages and stack traces +# - Monitor training progress across all workers +# - Inspect logs from individual workers +# - Identify data loading or communication bottlenecks +# - View resource use for CPU, GPU, and memory per worker +# - Debug failures with detailed error messages and stack traces # # For more information, see the `Ray Train monitoring # documentation `__. @@ -686,31 +631,32 @@ def train_func_per_worker(config: dict): # # In this tutorial, you: # -# * Pre-trained a GPT-2 (~124M-parameter) language model using +# - Pre-trained a GPT-2 (~124M-parameter) language model using # Hugging Face Transformers and PyTorch. -# * Loaded and preprocessed the Wikitext-103 dataset using Ray Data +# - Loaded and preprocessed the Wikitext-103 dataset using Ray Data # with distributed streaming. -# * Ran distributed training across 8 GPUs using Ray Train's +# - Ran distributed training across 8 GPUs using Ray Train's # ``TorchTrainer`` with only minimal changes to a standard PyTorch # training loop. -# * Learned how to save and load distributed checkpoints for model +# - Learned how to save and load distributed checkpoints for model # recovery. 
-# * Learned how to scale to multi-node clusters by changing +# - Learned how to scale to multi-node clusters by changing # ``ScalingConfig`` and ``RunConfig``. -# * Learned how heterogeneous clusters let you run data preprocessing +# - Learned how heterogeneous clusters let you run data preprocessing # on CPU nodes and training on GPU nodes for cost and performance # optimization. -# * Learned about Ray Train's **fault tolerance** mechanisms for +# - Learned about Ray Train's **fault tolerance** mechanisms for # production training jobs. -# * Monitored training with the Ray dashboard. +# - Monitored training with the Ray dashboard. # ############################################################################### # Further reading # --------------- # -# * `Ray Train documentation `__ -# * `Ray Data for training `__ -# * `PyTorch DistributedDataParallel `__ -# * `Ray Train fault tolerance `__ -# * `Ray cluster setup `__ +# - `Ray Train documentation `__ +# - `Ray Data for training `__ +# - `Saving and loading checkpoints `__ +# - `PyTorch DistributedDataParallel `__ +# - `Ray Train fault tolerance `__ +# - `Ray cluster setup `__ From 77e89dbbf8af5c71eab3ed343b847a6724a5e229 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Tue, 10 Feb 2026 19:57:13 -0800 Subject: [PATCH 15/18] fix double-shift labels bug --- .../distributed_training_with_ray_tutorial.py | 43 ++++++++----------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py index 4d0554cb4f..f1777118d6 100644 --- a/beginner_source/distributed_training_with_ray_tutorial.py +++ b/beginner_source/distributed_training_with_ray_tutorial.py @@ -136,7 +136,7 @@ # ---------------------------- # # Language models consume fixed-length sequences of token IDs. The -# preprocessing step converts raw text into input/target pairs for +# preprocessing step converts raw text into token ID sequences for # next-token prediction. # # This tutorial uses ``tiktoken`` with the GPT-2 encoding (vocabulary size @@ -149,10 +149,10 @@ # Article title lines (for example, ``= Article Title =``) trigger an # ``<|endoftext|>`` separator so the model resets context at article # boundaries. -# * Splits the stream into fixed-length blocks of ``block_size + 1`` -# tokens. -# * Returns ``input_ids`` (the first ``block_size`` tokens) and -# ``labels`` (shifted by one position for next-token prediction). +# * Splits the stream into fixed-length blocks of ``block_size`` tokens. +# * Returns ``input_ids`` for each block. During training, the same +# tensor serves as both input and label because ``GPT2LMHeadModel`` +# shifts the labels internally when computing the cross-entropy loss. BLOCK_SIZE = 256 VOCAB_SIZE = 50257 @@ -178,21 +178,15 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: all_tokens.append(EOT_TOKEN) all_tokens.extend(encoding.encode_ordinary(text + "\n")) - # Split into chunks of block_size + 1 (input + 1 shifted target) - chunk_len = BLOCK_SIZE + 1 - num_chunks = len(all_tokens) // chunk_len - all_tokens = all_tokens[: num_chunks * chunk_len] + # Split into fixed-length chunks of block_size tokens. 
+ num_chunks = len(all_tokens) // BLOCK_SIZE + all_tokens = all_tokens[: num_chunks * BLOCK_SIZE] if num_chunks == 0: - return {"input_ids": [], "labels": []} + return {"input_ids": []} - tokens_array = np.array(all_tokens, dtype=np.int64).reshape(num_chunks, chunk_len) - input_ids = tokens_array[:, :-1] - labels = tokens_array[:, 1:] - return { - "input_ids": input_ids, - "labels": labels, - } + tokens_array = np.array(all_tokens, dtype=np.int64).reshape(num_chunks, BLOCK_SIZE) + return {"input_ids": tokens_array} @@ -217,9 +211,7 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: print(f" Decoded: {encoding.decode(ids[:30].tolist())!r}...") ############################################################################### -# Each row now contains a fixed-length ``input_ids`` array of 256 tokens and -# a corresponding ``labels`` array shifted by one position. These are the -# input/target pairs for next-token prediction. +# Each row now contains a fixed-length ``input_ids`` array of 256 tokens. # # Streaming execution # ~~~~~~~~~~~~~~~~~~~ @@ -334,11 +326,11 @@ def train_func_per_worker(config: dict): batch_size=batch_size, dtypes=torch.long, prefetch_batches=2 ): input_ids = batch["input_ids"] - labels = batch["labels"] - # GPT2LMHeadModel computes cross-entropy loss internally - # when labels are provided. - out = model(input_ids=input_ids, labels=labels) + # GPT2LMHeadModel shifts labels internally to align each + # position with the next token, so we can use input_ids as + # both the input and the labels. + out = model(input_ids=input_ids, labels=input_ids) loss = out.loss optimizer.zero_grad() @@ -367,9 +359,8 @@ def train_func_per_worker(config: dict): batch_size=batch_size, dtypes=torch.long, prefetch_batches=2 ): input_ids = batch["input_ids"] - labels = batch["labels"] - out = model(input_ids=input_ids, labels=labels) + out = model(input_ids=input_ids, labels=input_ids) loss = out.loss val_loss_sum += loss.item() val_batches += 1 From 8d45236cf91c8e90513c94b896f934bf4ecb59cc Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Thu, 12 Feb 2026 12:03:52 -0800 Subject: [PATCH 16/18] enable checkpointing in the tutorial --- .../distributed_training_with_ray_tutorial.py | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py index f1777118d6..ff27c0ff1a 100644 --- a/beginner_source/distributed_training_with_ray_tutorial.py +++ b/beginner_source/distributed_training_with_ray_tutorial.py @@ -51,6 +51,8 @@ Then, import the required libraries: """ +import os +import tempfile import time import numpy as np @@ -288,7 +290,7 @@ def create_model(): # ``DistributedDataParallel`` and moves it to the correct GPU. # - ``shard.iter_torch_batches(batch_size=...)`` returns an iterator # of ``dict[str, torch.Tensor]`` batches, with tensors automatically placed on the worker's GPU. Setting ``prefetch_batches=2`` opportunistically fetches 2 batches ahead of the current batch. -# - ``ray.train.report(metrics, checkpoint=...)`` reports metrics to the driver and optionally saves a checkpoint. +# - ``ray.train.report(metrics, checkpoint=...)`` reports metrics to the driver and saves a checkpoint. 
def train_func_per_worker(config: dict): @@ -371,7 +373,7 @@ def train_func_per_worker(config: dict): avg_val_loss = val_loss_sum / max(val_batches, 1) epoch_elapsed = time.perf_counter() - epoch_start - # --- Report metrics ------------------------------------------------- + # --- Report metrics and save checkpoint ------------------------------ metrics = { "train_loss": round(avg_train_loss, 4), "val_loss": round(avg_val_loss, 4), @@ -380,10 +382,18 @@ def train_func_per_worker(config: dict): "epoch_tokens": train_tokens, "tokens_per_sec": round(train_tokens / max(train_elapsed, 1e-6), 2), } - ray.train.report( - metrics=metrics, - checkpoint=None, # If we were checkpointing, we'd pass a Checkpoint here - ) + + with tempfile.TemporaryDirectory() as temp_checkpoint_dir: + torch.save( + { + "epoch": epoch, + "model_state_dict": model.module.state_dict(), + "optimizer_state_dict": optimizer.state_dict(), + }, + os.path.join(temp_checkpoint_dir, "checkpoint.pt"), + ) + checkpoint = ray.train.Checkpoint.from_directory(temp_checkpoint_dir) + ray.train.report(metrics=metrics, checkpoint=checkpoint) @@ -445,6 +455,10 @@ def train_func_per_worker(config: dict): num_workers=NUM_WORKERS, use_gpu=True, ), + run_config=RunConfig( + name="gpt2-small-pretraining", + storage_path="/tmp/ray-train-checkpoints", + ), ) result = trainer.fit() @@ -455,8 +469,8 @@ def train_func_per_worker(config: dict): # # After training, the ``Result`` object contains the final metrics and # checkpoint. ``result.metrics`` comes from the last -# ``ray.train.report()`` call. ``result.checkpoint`` is ``None`` here -# because this tutorial doesn't save checkpoints. +# ``ray.train.report()`` call. ``result.checkpoint`` contains the +# checkpoint from the last ``ray.train.report()`` call. print("\nTraining finished!") From 98d935796f0169d705c7d7cd68f6ce34ff3c7b32 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Thu, 12 Feb 2026 12:20:15 -0800 Subject: [PATCH 17/18] reduced gpus required to 4; updated CI hardware --- .jenkins/metadata.json | 3 ++ .../distributed_training_with_ray_tutorial.py | 29 ++++++++++--------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/.jenkins/metadata.json b/.jenkins/metadata.json index 86bda8fa1e..933674ed1f 100644 --- a/.jenkins/metadata.json +++ b/.jenkins/metadata.json @@ -73,5 +73,8 @@ }, "prototype_source/gpu_quantization_torchao_tutorial.py": { "needs": "linux.g5.4xlarge.nvidia.gpu" + }, + "beginner_source/distributed_training_with_ray_tutorial.py": { + "needs": "linux.g4dn.12xlarge.nvidia.gpu" } } diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py index ff27c0ff1a..bc8cfc7f25 100644 --- a/beginner_source/distributed_training_with_ray_tutorial.py +++ b/beginner_source/distributed_training_with_ray_tutorial.py @@ -27,7 +27,7 @@ * PyTorch v2.9+. * Ray Train (``ray[train]``) v2.52.1+. * ``tiktoken``, ``datasets``, and ``transformers`` (Hugging Face). - * One or more GPUs are recommended but not required. + * One or more GPUs are recommended but not required. This tutorial is tested on a ``g4dn.12xlarge`` instance, which has 4 NVIDIA T4 GPUs (16GB of memory per GPU). `Ray Train `__ is a scalable framework for distributed deep learning. @@ -84,6 +84,7 @@ # dataset into a Ray Dataset, enabling distributed streaming and # preprocessing across all available nodes. +ray.init(log_to_driver=False) # Reduce verbosity. 
 hf_ds = load_dataset("Salesforce/wikitext", "wikitext-103-raw-v1")
 train_ds = ray.data.from_huggingface(hf_ds["train"])
 val_ds = ray.data.from_huggingface(hf_ds["validation"])
@@ -233,10 +234,10 @@ def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 #
 #     Execution plan: InputDataBuffer[Input]
 #     -> TaskPoolMapOperator[MapBatches(tokenize_and_chunk)]
-#     -> OutputSplitter[split(8, equal=True)]
+#     -> OutputSplitter[split(4, equal=True)]
 #
 # This tells you exactly how Ray Data will stream through tokenization
-# and split the data across 8 trainer workers.
+# and split the data across 4 trainer workers.
 #
 #
 # Define the transformer model
 # ----------------------------
@@ -412,14 +413,14 @@ def train_func_per_worker(config: dict):
 # - ``scaling_config``: specifies the number of workers and whether to
 #   use GPUs.
 #
-# Setting ``num_workers=8`` launches 8 parallel workers, one per GPU. Ray
+# Setting ``num_workers=4`` launches 4 parallel workers, one per GPU. Ray
 # Train handles ``torch.distributed`` initialization, NCCL backend setup,
 # and ``DistributedDataParallel`` wrapping behind the scenes. In the logs,
 # you see each worker assigned a rank and device:
 #
 # .. code-block:: text
 #
-#     Started training worker group of size 8:
+#     Started training worker group of size 4:
 #
 #     * (ip=10.0.176.183, pid=25636) world_rank=0, local_rank=0, node_rank=0
 #     * (ip=10.0.176.183, pid=25637) world_rank=1, local_rank=1, node_rank=0
@@ -428,11 +429,11 @@ def train_func_per_worker(config: dict):
 #     Wrapping provided model in DistributedDataParallel.
 #
 # ``batch_size_per_worker`` is the number of sequences each worker
-# processes per gradient step. With 8 workers and a per-worker batch size
-# of 16, the **effective global batch size** is 8 × 16 = 128 sequences,
-# or 128 × 256 = 32,768 tokens per optimizer step.
+# processes per gradient step. With 4 workers and a per-worker batch size
+# of 16, the **effective global batch size** is 4 × 16 = 64 sequences,
+# or 64 × 256 = 16,384 tokens per optimizer step.

-NUM_WORKERS = 8  # One worker per GPU on this machine
+NUM_WORKERS = 4  # One worker per GPU on this machine
 NUM_EPOCHS = 5
 BATCH_SIZE_PER_WORKER = 16
 LR = 3e-4
@@ -509,14 +510,14 @@ def train_func_per_worker(config: dict):
 # Scaling to a multi-node cluster
 # -------------------------------
 #
-# The code above runs on a single 8-GPU machine. Scaling to a multi-node
+# The code above runs on a single 4-GPU machine. Scaling to a multi-node
 # cluster requires only two changes:
 #
 # 1. **Increase ``num_workers``** to match the total number of GPUs in the cluster.
 # 2. **Set a shared storage path** so that all nodes can access checkpoints.
 #
-# For example, to train on a cluster of 4 nodes with 8 GPUs each
-# (32 GPUs total):
+# For example, to train on a cluster of 4 nodes with 4 GPUs each
+# (16 GPUs total):
 #
 # .. code-block:: python
 #
@@ -525,7 +526,7 @@ def train_func_per_worker(config: dict):
 #         train_loop_config={...},
 #         datasets={"train": train_ds, "validation": val_ds},
 #         scaling_config=ScalingConfig(
-#             num_workers=32,  # 4 nodes x 8 GPUs
+#             num_workers=16,  # 4 nodes x 4 GPUs
 #             use_gpu=True,
 #         ),
 #         run_config=RunConfig(
@@ -640,7 +641,7 @@ def train_func_per_worker(config: dict):
 #   Hugging Face Transformers and PyTorch.
 # - Loaded and preprocessed the Wikitext-103 dataset using Ray Data
 #   with distributed streaming.
-# - Ran distributed training across 8 GPUs using Ray Train's
+# - Ran distributed training across 4 GPUs using Ray Train's
 #   ``TorchTrainer`` with only minimal changes to a standard PyTorch
 #   training loop.
 # - Learned how to save and load distributed checkpoints for model

From d8b40fb8b076c389f2ab983a65389c0eb21bda3c Mon Sep 17 00:00:00 2001
From: Ricardo Decal
Date: Thu, 12 Feb 2026 12:44:25 -0800
Subject: [PATCH 18/18] revert log to driver

---
 beginner_source/distributed_training_with_ray_tutorial.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/beginner_source/distributed_training_with_ray_tutorial.py b/beginner_source/distributed_training_with_ray_tutorial.py
index bc8cfc7f25..c051f38504 100644
--- a/beginner_source/distributed_training_with_ray_tutorial.py
+++ b/beginner_source/distributed_training_with_ray_tutorial.py
@@ -84,7 +84,6 @@
 # dataset into a Ray Dataset, enabling distributed streaming and
 # preprocessing across all available nodes.

-ray.init(log_to_driver=False)  # Reduce verbosity.
 hf_ds = load_dataset("Salesforce/wikitext", "wikitext-103-raw-v1")
 train_ds = ray.data.from_huggingface(hf_ds["train"])
 val_ds = ray.data.from_huggingface(hf_ds["validation"])
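
The checkpoint written in PATCH 16 can later be restored from the ``Result`` that
``trainer.fit()`` returns. Below is a minimal sketch of that restore path, not part of
the patch series itself: it assumes the ``checkpoint.pt`` layout saved above, the
tutorial's ``create_model()`` helper, and ``Checkpoint.as_directory()`` from
``ray.train``; the ``load_pretrained_model`` name is illustrative.

.. code-block:: python

    import os

    import torch


    def load_pretrained_model(result):
        # result is the ray.train.Result returned by trainer.fit();
        # as_directory() materializes the final checkpoint as a local directory.
        with result.checkpoint.as_directory() as checkpoint_dir:
            state = torch.load(
                os.path.join(checkpoint_dir, "checkpoint.pt"), map_location="cpu"
            )

        # create_model() is the tutorial's model factory. The state dict was
        # saved from model.module (the model inside the DDP wrapper), so it
        # loads directly into a fresh, unwrapped model.
        model = create_model()
        model.load_state_dict(state["model_state_dict"])
        model.eval()
        return model, state["epoch"]

Because the ``RunConfig`` above points ``storage_path`` at ``/tmp/ray-train-checkpoints``,
the checkpoint lives on the machine that ran the trainer; on a multi-node cluster the same
code should work as long as ``storage_path`` is on shared storage reachable from the
calling process.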