Lab 01: DDP across 2 CPU processes (gloo backend, local, free)
Goal: make a real torch.distributed all-reduce happen on Borja's laptop, between two MiniGPT-grammar training processes. See the wire protocol. Free.
Estimated time: 2–3 hours.
Prereq: Phase 18 training loop works; Phase 25 PyTorch internals done. Local PyTorch installed with the serve opt group (gloo backend is part of stock PyTorch).
What you produce
A working DDP run at experiments/35-ddp-cpu/ that:
- Spawns 2 Python processes on the local CPU using torchrun --nproc-per-node=2.
- Each process trains the same MiniGPT-grammar model on a disjoint shard of the English-verb-grammar corpus (Phase 12).
- Gradients are all-reduced via gloo at every step.
- After 100 steps, both processes' parameter tensors are byte-equivalent (modulo fp tolerance) — verifying DDP did its job.
- Per-step gradient all-reduce time is measured and logged.
- A scaling-curve plot (1 process vs 2 processes, tokens/sec on y-axis) committed.
Plus an extension to src/minitrain/:
- src/minitrain/ddp.py: a thin wrapper providing wrap_for_ddp(model), is_main_process(), barrier(), cleanup(). Per A12 plan, this is an extension of src/minitrain/, not a new top-level module (src/distributed/ is not created in this phase).
TODOs
Block A: implement the DDP wrapper
# src/minitrain/ddp.py: skeleton (Borja writes the body)
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as TorchDDP


def init_distributed(backend: str = "gloo") -> tuple[int, int]:
    """Initialize torch.distributed from torchrun env vars. Returns (rank, world_size)."""
    ...


def wrap_for_ddp(model: torch.nn.Module) -> torch.nn.Module:
    """Wraps with DDP. Assumes init_distributed already called."""
    ...


def is_main_process() -> bool:
    """True if rank == 0 (used to gate logging/checkpointing)."""
    ...


def cleanup() -> None:
    """dist.destroy_process_group(); call at the end of training."""
    ...
Constraints:
- gloo backend only. Borja's machine has no NVIDIA GPU; NCCL is unavailable. Lab 02 will use NCCL — but not here.
- No bucket-size tuning. Stock PyTorch DDP defaults. The point of this lab is mechanism, not optimization.
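- Determinism per rank. Seed = 35000 + rank. Different seeds per rank, so any per-rank randomness (for example shuffling order or dropout) differs across ranks; the shards themselves differ because of the index-based split in Block B. The shared seed base (35000) keeps runs reproducible.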
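As a rough illustration of two ingredients named above, the sketch below reads the rank and world size that torchrun exports as the RANK and WORLD_SIZE environment variables and derives the per-rank seed. The helper names read_torchrun_env and seed_for_rank are hypothetical and not part of src/minitrain/; the call to dist.init_process_group is deliberately left out, since that is the body Borja writes.

```python
import os
from typing import Mapping, Tuple

SEED_BASE = 35000  # shared seed base from the lab constraints


def read_torchrun_env(env: Mapping[str, str] = os.environ) -> Tuple[int, int]:
    """Return (rank, world_size) from the variables torchrun exports.

    Falls back to a single-process setup (rank 0 of 1) when they are absent,
    e.g. when the script is started with plain `python`.
    """
    rank = int(env.get("RANK", "0"))
    world_size = int(env.get("WORLD_SIZE", "1"))
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} out of range for world_size {world_size}")
    return rank, world_size


def seed_for_rank(rank: int, base: int = SEED_BASE) -> int:
    """Per-rank seed: the same base on every rank, offset by the rank."""
    return base + rank
```

Passing the environment as a mapping keeps the helper testable without launching torchrun.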
Block B: split the corpus into 2 shards
The English-verb-grammar corpus from Phase 12 produces a single train.jsonl. For DDP each rank reads its own shard:
- Shard by index % world_size == rank. No overlap, full coverage, no leakage.
- Sanity check: the sentence counts of rank 0 and rank 1 sum to the total.
- Log the first sentence each rank sees — verify they're different (so you know the sharding works).
The shard logic lives in src/minitrain/data.py (already exists per Phase 18 plan); add a make_shard(rank, world_size) function.
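The index rule and its sanity checks can be sketched on an in-memory list. This is an illustration only: shard_by_index is a hypothetical helper, and the real make_shard(rank, world_size) would read train.jsonl rather than a toy list.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def shard_by_index(items: Sequence[T], rank: int, world_size: int) -> List[T]:
    """Keep the items whose position satisfies index % world_size == rank."""
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} out of range for world_size {world_size}")
    return [item for index, item in enumerate(items) if index % world_size == rank]


# Toy stand-in for the rows of train.jsonl.
sentences = [f"sentence-{i}" for i in range(7)]
shard0 = shard_by_index(sentences, rank=0, world_size=2)
shard1 = shard_by_index(sentences, rank=1, world_size=2)

# Sanity checks: full coverage, no overlap, different first sentence.
assert len(shard0) + len(shard1) == len(sentences)
assert set(shard0).isdisjoint(shard1)
assert shard0[0] != shard1[0]
```

With an odd number of sentences the shards differ in size by one. That is worth keeping in mind because DDP expects every rank to run the same number of steps.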
Block C: write the training script
experiments/35-ddp-cpu/train.py:
# Skeleton: Borja writes the body
import time

import torch

# Assumed import paths; adjust to the actual package layout.
from minitrain.data import make_shard
from minitrain.ddp import cleanup, init_distributed, is_main_process, wrap_for_ddp
# build_minigpt, seed_everything and log_step come from the Phase 17/18 modules.


def main():
    rank, world_size = init_distributed("gloo")
    seed_everything(35000 + rank)
    model = build_minigpt(...)  # Phase 17 module
    model = wrap_for_ddp(model)
    dataset = make_shard(rank, world_size)  # Phase 18 module + extension
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
    for step in range(100):
        batch = next(dataset)
        t_compute_start = time.perf_counter()
        loss = model(batch).loss
        loss.backward()  # gradient all-reduce fires here
        t_compute_end = time.perf_counter()
        optimizer.step()
        optimizer.zero_grad()
        if is_main_process():
            log_step(step, loss.item(), t_compute_end - t_compute_start)
    cleanup()


if __name__ == "__main__":
    main()
Block D: measure all-reduce wall time
The trick: DDP launches the all-reduce inside loss.backward(), asynchronously and overlapped with gradient computation, so the timer in Block C measures both together. To isolate the comm cost:
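- Variant A: bracketed barrier. Surround loss.backward() with dist.barrier() calls; the second barrier returns only after every rank has finished its backward pass, all-reduces included, so the time difference covers backward compute plus all-reduce. Subtracting the backward time of a step without communication (for example one run under DDP's no_sync() context) gives an estimate of the comm cost.
- Variant B: DDP comm hooks. PyTorch's register_comm_hook lets you intercept the all-reduce; time the hook directly.
- Implement variant A (simpler); variant B is mentioned for completeness.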
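The bracketed-barrier idea can be sketched independently of PyTorch by passing the work and the barrier in as callables. The helper name time_bracketed is hypothetical; in the training loop the two arguments would presumably be loss.backward and dist.barrier.

```python
import time
from typing import Callable


def time_bracketed(work: Callable[[], None], barrier: Callable[[], None]) -> float:
    """Return the seconds spent in `work`, bracketed by two barrier calls.

    The first barrier lines the ranks up, so waiting for a slow peer from the
    previous step is not charged to this measurement. The second barrier
    returns only once every rank has finished `work`.
    """
    barrier()
    start = time.perf_counter()
    work()
    barrier()
    return time.perf_counter() - start
```

A call such as time_bracketed(loss.backward, dist.barrier) would then yield one sample per step. Note that the barriers themselves add a small synchronization cost to every sample.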
Log per-step times to experiments/35-ddp-cpu/timings.jsonl. Compute mean + median + p95 over the last 50 steps (the first 50 are warmup).
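One possible way to compute the summary statistics is sketched below. It assumes the per-step times have already been read from timings.jsonl into a list of milliseconds, and it uses a nearest-rank p95; summarize_timings and the key names are illustrative choices, not something the lab prescribes.

```python
import math
import statistics
from typing import Dict, List


def summarize_timings(times_ms: List[float], warmup: int = 50) -> Dict[str, float]:
    """Mean, median and p95 over the steps that follow the warmup window."""
    steady = sorted(times_ms[warmup:])
    if not steady:
        raise ValueError("no samples left after warmup")
    # Nearest-rank percentile: the smallest sample with at least 95% of the
    # samples at or below it.
    p95_index = math.ceil(0.95 * len(steady)) - 1
    return {
        "mean_ms": statistics.fmean(steady),
        "p50_ms": statistics.median(steady),
        "p95_ms": steady[p95_index],
    }
```

With 50 steady-state samples the p95 is simply the 48th smallest value, so a single slow step can move it noticeably.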
Block E: verify byte-equivalence
After 100 steps:
if is_main_process():
    state_main = {k: v.clone() for k, v in model.state_dict().items()}
dist.barrier()
# rank-0 saves; rank-1 also saves to a separate file; we compare
torch.save(model.state_dict(), f"experiments/35-ddp-cpu/state-rank-{rank}.pt")
Then in a tiny analysis script:
- Load both files.
- For each tensor, assert torch.allclose(s0, s1, atol=1e-5).
- Print the maximum absolute difference. Expected: << 1e-5. DDP broadcasts rank 0's parameters to every rank when the model is wrapped, so both ranks start from identical weights even though their seeds differ; each step then applies the same all-reduced gradient on both ranks, so numerical drift should be near machine epsilon.
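A minimal sketch of the comparison step, assuming both state dicts are already loaded (for instance with torch.load on the two state-rank-*.pt files). The function name max_abs_diff is hypothetical. Tensors are cast to float64 first so that any non-float buffers are compared the same way.

```python
from typing import Dict

import torch


def max_abs_diff(
    state0: Dict[str, torch.Tensor],
    state1: Dict[str, torch.Tensor],
    atol: float = 1e-5,
) -> float:
    """Assert both state dicts match within `atol`; return the largest gap."""
    if state0.keys() != state1.keys():
        raise KeyError("state dicts have different parameter names")
    worst = 0.0
    for name, tensor0 in state0.items():
        a = tensor0.double()
        b = state1[name].double()
        assert torch.allclose(a, b, atol=atol), f"{name} differs beyond atol={atol}"
        if a.numel() > 0:
            worst = max(worst, (a - b).abs().max().item())
    return worst
```

Printing the returned value gives the "maximum absolute difference" line the analysis script is asked to report.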
Block F: scaling curve
Run the script with --nproc-per-node=1 and --nproc-per-node=2. Plot:
- x-axis: number of processes.
- y-axis: tokens/sec.
- Expected shape: the 2-process run reaches fewer total tokens/sec than the 1-process run at this scale, because:
  - The model is tiny; all-reduce overhead dominates compute.
  - Two processes contend for the same CPU's cores.
- The lesson is in the negative result. Borja writes a one-paragraph "why 2 processes is slower here" in the manifest.
Commit the plot as experiments/35-ddp-cpu/scaling.png.
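The y-axis value can be derived from quantities the training loop already knows. The sketch below assumes every rank processes the same number of tokens per step; the function and parameter names are hypothetical.

```python
def tokens_per_sec(
    tokens_per_step_per_rank: int,
    world_size: int,
    steps: int,
    elapsed_sec: float,
) -> float:
    """Aggregate throughput: each rank consumes its own batch every step."""
    if elapsed_sec <= 0:
        raise ValueError("elapsed_sec must be positive")
    return tokens_per_step_per_rank * world_size * steps / elapsed_sec


def scaling_ratio(tps_1proc: float, tps_2proc: float) -> float:
    """Values below 1.0 mean the 2-process run is slower in aggregate."""
    if tps_1proc <= 0:
        raise ValueError("tps_1proc must be positive")
    return tps_2proc / tps_1proc
```

Multiplying by world_size matters here: comparing per-rank throughput alone would hide whether the second process adds any aggregate benefit.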
Block G: manifest
experiments/35-ddp-cpu/manifest.json:
{
  "seed": 35000,
  "lab": "01-ddp-on-cpu",
  "backend": "gloo",
  "world_size": 2,
  "model_params": "<filled in>",
  "steps": 100,
  "allreduce_p50_ms": "<filled in>",
  "allreduce_p95_ms": "<filled in>",
  "tokens_per_sec_1proc": "<filled in>",
  "tokens_per_sec_2proc": "<filled in>",
  "byte_equivalent": "<true/false>",
  "lesson_notes": "<the negative-result paragraph>"
}
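Filling the manifest from code rather than by hand avoids transcription slips. A minimal sketch, assuming the measured values are passed in by the caller; write_manifest is a hypothetical helper, and storing numbers and the boolean as native JSON types (rather than strings) is a choice made here, not a requirement stated by the lab.

```python
import json
from pathlib import Path
from typing import Any, Dict, Union


def write_manifest(
    path: Union[str, Path],
    *,
    model_params: int,
    allreduce_p50_ms: float,
    allreduce_p95_ms: float,
    tokens_per_sec_1proc: float,
    tokens_per_sec_2proc: float,
    byte_equivalent: bool,
    lesson_notes: str,
) -> Dict[str, Any]:
    """Write the manifest with the fixed lab fields plus measured values."""
    manifest = {
        "seed": 35000,
        "lab": "01-ddp-on-cpu",
        "backend": "gloo",
        "world_size": 2,
        "model_params": model_params,
        "steps": 100,
        "allreduce_p50_ms": allreduce_p50_ms,
        "allreduce_p95_ms": allreduce_p95_ms,
        "tokens_per_sec_1proc": tokens_per_sec_1proc,
        "tokens_per_sec_2proc": tokens_per_sec_2proc,
        "byte_equivalent": byte_equivalent,
        "lesson_notes": lesson_notes,
    }
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
    return manifest
```

Only rank 0 should call such a helper, in line with the is_main_process() gate used for logging.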
Constraints
- No NCCL. This lab is gloo only. Save NCCL for lab 02 where you have GPUs.
- No production-grade tuning. No find_unused_parameters=True, no gradient_as_bucket_view. Defaults. The point is the mechanism.
- Cost: $0. This lab must never touch a cloud instance. If you find yourself reaching for one, stop and reread the goal.
- Single-node only. torchrun --nnodes=1. Multi-node is out of scope (and out of budget, and irrelevant for educational DDP).
Stop conditions
You're done when:
- experiments/35-ddp-cpu/{train.py, timings.jsonl, scaling.png, manifest.json, state-rank-0.pt, state-rank-1.pt} all exist.
- state-rank-0.pt and state-rank-1.pt are byte-equivalent within atol 1e-5.
- manifest.json records the all-reduce p50/p95 wall times.
- The "why 2 processes is slower" paragraph is in the manifest and reads correctly to you.
- src/minitrain/ddp.py has tests under tests/minitrain/test_ddp.py that pass (mocked or pytest --forked to spawn the 2-process test).
Hint of last resort
If torchrun is fighting you (PyTorch versions move, env-var conventions shift): drop to torch.multiprocessing.spawn as a fallback. The mechanics are the same; the launcher differs. Document the choice in the manifest.
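If gloo errors out with "address in use", a port collision is the cause. Pick a free port other than the one in use (29500 is torchrun's default): pass it with torchrun --master-port, or, with the torch.multiprocessing.spawn fallback, set MASTER_PORT in the environment. PyTorch's docs cover this; check there before improvising.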
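When a port collides, one option is to let the operating system suggest an unused one. The sketch below binds to port 0, which asks the OS for any free port; find_free_port is a hypothetical helper, and there is a small race because the port is released before the training run claims it.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        # getsockname() returns (address, port) for an AF_INET socket.
        return sock.getsockname()[1]
```

The returned number can then be supplied as the master port for the run.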
When to consult solutions/
After you have committed the experiment. Solution lives in solutions/01-ddp-cpu-ref.md — written at phase open after Borja's Phase 18 training loop and Phase 25 PyTorch internals are in.
Next lab: lab/02-tp-inference-cloud.md.