

Lab 01 — DDP across 2 CPU processes (gloo backend, local, free)

Goal: make a real torch.distributed all-reduce happen on Borja's laptop, between two MiniGPT-grammar training processes. See the wire protocol. Free.

Estimated time: 2–3 hours.

Prereq: Phase 18 training loop works; Phase 25 PyTorch internals done. Local PyTorch installed with the serve opt group (gloo backend is part of stock PyTorch).


What you produce

A working DDP run at experiments/35-ddp-cpu/ that:

  1. Spawns 2 Python processes on the local CPU using torchrun --nproc-per-node=2.
  2. Each process trains the same MiniGPT-grammar model on a disjoint shard of the English-verb-grammar corpus (Phase 12).
  3. Gradients are all-reduced via gloo at every step.
  4. After 100 steps, both processes' parameter tensors match within floating-point tolerance (atol 1e-5), the "byte-equivalent" check that verifies DDP did its job.
  5. Per-step gradient all-reduce time is measured and logged.
  6. A committed scaling-curve plot (1 process vs. 2 processes, tokens/sec on the y-axis).

Plus an extension to src/minitrain/:

  • src/minitrain/ddp.py — a thin wrapper providing init_distributed(), wrap_for_ddp(model), is_main_process(), barrier(), cleanup(). Per the A12 plan, this is an extension of src/minitrain/, not a new top-level module (src/distributed/ is not created in this phase).

TODOs

Block A — implement the DDP wrapper

# src/minitrain/ddp.py — skeleton (Borja writes the body)

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as TorchDDP

def init_distributed(backend: str = "gloo") -> tuple[int, int]:
    """Initialize torch.distributed from torchrun env vars. Returns (rank, world_size)."""
    ...

def wrap_for_ddp(model: torch.nn.Module) -> torch.nn.Module:
    """Wraps with DDP. Assumes init_distributed already called."""
    ...

def is_main_process() -> bool:
    """True if rank == 0 (used to gate logging/checkpointing)."""
    ...

def barrier() -> None:
    """dist.barrier(): block until every rank reaches this point."""
    ...

def cleanup() -> None:
    """dist.destroy_process_group() — call at the end of training."""
    ...
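One part of init_distributed that is easy to get wrong is where the rank and world size come from. torchrun exports RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR and MASTER_PORT to every worker, and dist.init_process_group(backend) with its default env:// init method reads them itself. The helper below is a hypothetical name, not part of PyTorch: a minimal stdlib sketch of the same lookup, so init_distributed can return (rank, world_size) and fail with a readable message when the script was started with plain python instead of torchrun.

```python
import os
from typing import Mapping


def read_torchrun_env(env: Mapping[str, str] = os.environ) -> tuple[int, int]:
    """Return (rank, world_size) from the variables torchrun sets for each worker.

    Hypothetical helper: raises instead of guessing when the launcher is missing.
    """
    missing = [name for name in ("RANK", "WORLD_SIZE") if name not in env]
    if missing:
        raise RuntimeError(
            f"{missing} not set; launch with torchrun, e.g. torchrun --nproc-per-node=2 train.py"
        )
    rank, world_size = int(env["RANK"]), int(env["WORLD_SIZE"])
    if not 0 <= rank < world_size:
        raise ValueError(f"inconsistent launch env: RANK={rank}, WORLD_SIZE={world_size}")
    return rank, world_size
```

Inside init_distributed you would then call dist.init_process_group(backend=backend) and return this pair; after initialization, dist.get_rank() and dist.get_world_size() report the same values.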

Constraints:

  • gloo backend only. Borja's machine has no NVIDIA GPU; NCCL is unavailable. Lab 02 will use NCCL — but not here.
  • No bucket-size tuning. Stock PyTorch DDP defaults. The point of this lab is mechanism, not optimization.
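  • Determinism per rank. Seed = 35000 + rank: a shared seed base for reproducibility, offset per rank so each rank gets its own random stream (dropout masks, any within-shard shuffling). Seeds do not decide the data split (Block B's index rule does), and they do not desync the weights, because DDP broadcasts rank 0's parameters to every rank when the model is wrapped.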

Block B — split the corpus into 2 shards

The English-verb-grammar corpus from Phase 12 produces a single train.jsonl. For DDP each rank reads its own shard:

  • Shard by index % world_size == rank. No overlap, full coverage, no leakage.
  • Sanity check: the sentence counts of rank 0 and rank 1 sum to the total.
  • Log the first sentence each rank sees — verify they're different (so you know the sharding works).

The shard logic lives in src/minitrain/data.py (already exists per Phase 18 plan); add a make_shard(rank, world_size) function.
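A minimal sketch of the index-modulo rule, written over any iterable of JSONL lines so it can be checked without the Phase 12 corpus. shard_records is a hypothetical helper; make_shard could open train.jsonl, pass the file object in, and wrap the result (for instance with itertools.cycle) so that next(dataset) in Block C's loop never runs dry.

```python
import json
from typing import Iterable, Iterator


def shard_records(lines: Iterable[str], rank: int, world_size: int) -> Iterator[dict]:
    """Yield the records whose index i satisfies i % world_size == rank.

    Blank lines are skipped before indexing, so every non-empty record lands in
    exactly one shard: no overlap, full coverage.
    """
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} out of range for world_size {world_size}")
    records = (line for line in lines if line.strip())
    for i, line in enumerate(records):
        if i % world_size == rank:
            yield json.loads(line)
```

With world_size=2 the two shards' lengths sum to the corpus size, and rank 0 and rank 1 start at records 0 and 1 respectively, which makes the "first sentence differs" check straightforward.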

Block C — write the training script

experiments/35-ddp-cpu/train.py:

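# Skeleton — Borja writes the body
import time

import torch

from minitrain.data import make_shard  # Phase 18 module + extension (Block B)
from minitrain.ddp import cleanup, init_distributed, is_main_process, wrap_for_ddp
# seed_everything, build_minigpt and log_step come from the Phase 17/18 code;
# import them from wherever they live in your tree.

def main():
    rank, world_size = init_distributed("gloo")
    seed_everything(35000 + rank)

    model = build_minigpt(...)              # Phase 17 module
    model = wrap_for_ddp(model)             # DDP broadcasts rank 0's weights here

    dataset = make_shard(rank, world_size)  # must be an iterator: next() is called below
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

    for step in range(100):
        batch = next(dataset)
        t_start = time.perf_counter()
        loss = model(batch).loss
        loss.backward()              # DDP all-reduces gradient buckets during backward
                                     # and waits for them before backward() returns
        t_end = time.perf_counter()  # t_end - t_start = forward + backward + all-reduce
        optimizer.step()
        optimizer.zero_grad()

        if is_main_process():
            log_step(step, loss.item(), t_end - t_start)

    cleanup()

if __name__ == "__main__":
    main()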

Block D — measure all-reduce wall time

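The trick: loss.backward() already includes the all-reduce. DDP launches bucketed all-reduces asynchronously as gradients become ready, overlapping them with the rest of the backward pass, and backward() returns only after they complete. Timing backward() on its own therefore measures compute plus communication. To isolate the comm cost: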

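  • Variant A — bracketed barrier. Call dist.barrier() immediately before and after loss.backward() and time the span between them. The first barrier lines the ranks up so one rank's idle wait for the other is not counted; the second guarantees both ranks have finished backward and its all-reduces. The span still contains backward compute, so compare it with the same span from the 1-process run for a rough estimate of the communication share (rough because the two processes also slow each other's compute by sharing cores).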
  • Variant B — DDP comm hooks. PyTorch's register_comm_hook lets you intercept the all-reduce; time the hook directly.
  • Implement variant A (simpler); variant B is mentioned for completeness.

Log per-step times to experiments/35-ddp-cpu/timings.jsonl. Compute mean + median + p95 over the last 50 steps (the first 50 are warmup).
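A sketch of the summary statistics, assuming (hypothetically) that each line of timings.jsonl looks like {"step": 12, "allreduce_ms": 3.4}; adjust the key to whatever train.py actually writes. It sorts by step, drops the warmup steps, and reports mean, median (p50) and p95 using the stdlib statistics module, whose "inclusive" quantile method interpolates linearly between samples (the same convention as NumPy's default percentile).

```python
import json
import statistics
from typing import Iterable


def summarize_timings(lines: Iterable[str], warmup: int = 50,
                      key: str = "allreduce_ms") -> dict[str, float]:
    """Mean, p50 and p95 of per-step times, ignoring the first `warmup` steps."""
    records = sorted((json.loads(line) for line in lines if line.strip()),
                     key=lambda r: r["step"])
    times = [r[key] for r in records[warmup:]]
    if len(times) < 2:
        raise ValueError("need at least 2 post-warmup steps to estimate p95")
    return {
        "mean_ms": statistics.fmean(times),
        "p50_ms": statistics.median(times),
        # n=100 yields 99 cut points; index 94 is the 95th percentile.
        "p95_ms": statistics.quantiles(times, n=100, method="inclusive")[94],
    }
```

Usage: open experiments/35-ddp-cpu/timings.jsonl and pass the file object in; p50_ms and p95_ms map onto allreduce_p50_ms and allreduce_p95_ms in the manifest.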

Block E — verify byte-equivalence

After 100 steps:

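import torch
import torch.distributed as dist

# At the end of main(), after the 100-step loop and before cleanup():
dist.barrier()  # wait until every rank has finished its last optimizer.step()
# Each rank saves its own copy to a separate file; the analysis script compares them.
# model.module is the unwrapped MiniGPT, so the keys carry no "module." prefix.
torch.save(model.module.state_dict(), f"experiments/35-ddp-cpu/state-rank-{rank}.pt")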

Then in a tiny analysis script:

  • Load both files.
  • For each tensor, assert torch.allclose(s0, s1, atol=1e-5).
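  • Print the maximum absolute difference. Expected: << 1e-5. The ranks use different seeds (35000 + rank), so identical weights do not come from seeding: DDP broadcasts rank 0's parameters to every rank when the model is wrapped, and from then on each rank applies the same all-reduced (averaged) gradients through the same AdamW update. The parameters therefore stay in lockstep, and any numerical drift should be near machine epsilon.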
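A sketch of that analysis script. To keep the comparison logic testable without PyTorch, compare_states works on plain Python lists and only load_flat imports torch; all three function names are hypothetical. It applies a pure absolute tolerance, which is slightly stricter than torch.allclose(s0, s1, atol=1e-5), because allclose also adds a relative term (rtol defaults to 1e-5).

```python
import math


def compare_states(s0: dict[str, list[float]], s1: dict[str, list[float]],
                   atol: float = 1e-5) -> tuple[float, list[str]]:
    """Return (max absolute difference, names of tensors that exceed atol)."""
    if s0.keys() != s1.keys():
        raise KeyError(f"state dicts differ in keys: {sorted(s0.keys() ^ s1.keys())}")
    worst, bad = 0.0, []
    for name in sorted(s0):
        a, b = s0[name], s1[name]
        if len(a) != len(b):  # shape mismatch counts as a failure
            bad.append(name)
            continue
        diff = 0.0
        for x, y in zip(a, b):
            d = abs(x - y)
            diff = max(diff, math.inf if math.isnan(d) else d)  # NaN counts as a mismatch
        worst = max(worst, diff)
        if diff > atol:
            bad.append(name)
    return worst, bad


def load_flat(path: str) -> dict[str, list[float]]:
    import torch  # imported lazily so compare_states needs only the stdlib

    state = torch.load(path, map_location="cpu")
    return {k: v.detach().float().flatten().tolist() for k, v in state.items()}


def main(run_dir: str = "experiments/35-ddp-cpu") -> None:
    s0 = load_flat(f"{run_dir}/state-rank-0.pt")
    s1 = load_flat(f"{run_dir}/state-rank-1.pt")
    worst, bad = compare_states(s0, s1)
    print(f"max |diff| = {worst:.3e}")
    if bad:
        raise SystemExit(f"tensors outside atol: {bad}")
```

Call main() once both state files exist; the printed maximum is the number to record when setting byte_equivalent in the manifest.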

Block F — scaling curve

Run the script with --nproc-per-node=1 and --nproc-per-node=2. Plot:

  • x-axis: number of processes.
  • y-axis: tokens/sec.
  • Expected shape: at this scale the 2-process run delivers fewer aggregate tokens/sec than the 1-process run, because:
      • The model is tiny, so all-reduce overhead dominates compute.
      • The two processes contend for the same CPU's cores.
  • The lesson is in the negative result. Borja writes a one-paragraph "why 2 processes is slower here" in the manifest.

Commit the plot as experiments/35-ddp-cpu/scaling.png.
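Block F leaves the throughput definition open. One reasonable convention, assumed here, is aggregate tokens per second across all ranks, with wall time measured the same way in both runs (for example on rank 0 over the timed steps). The helper names are hypothetical.

```python
def aggregate_tokens_per_sec(world_size: int, batch_size: int, seq_len: int,
                             steps: int, wall_seconds: float) -> float:
    """Tokens processed by all ranks together per second of wall-clock time."""
    if wall_seconds <= 0:
        raise ValueError("wall_seconds must be positive")
    # Each rank consumes batch_size * seq_len tokens per step from its own shard,
    # so the whole job processes world_size times that per step.
    return world_size * batch_size * seq_len * steps / wall_seconds


def speedup(tps_multi: float, tps_single: float) -> float:
    """Ratio of aggregate throughputs; below 1.0 means adding a process slowed things down."""
    return tps_multi / tps_single
```

A speedup below 1.0 for the 2-process run is exactly the negative result the manifest paragraph should explain.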

Block G — manifest

experiments/35-ddp-cpu/manifest.json:

{
  "seed": 35000,
  "lab": "01-ddp-on-cpu",
  "backend": "gloo",
  "world_size": 2,
  "model_params": "<filled in>",
  "steps": 100,
  "allreduce_p50_ms": "<filled in>",
  "allreduce_p95_ms": "<filled in>",
  "tokens_per_sec_1proc": "<filled in>",
  "tokens_per_sec_2proc": "<filled in>",
  "byte_equivalent": "<true/false>",
  "lesson_notes": "<the negative-result paragraph>"
}
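Because the template uses "<...>" strings as placeholders, a small guard (hypothetical helper names) can stop an unfilled manifest from being committed. It also nudges you to store real JSON types, such as an integer for model_params and a boolean for byte_equivalent, rather than strings.

```python
import json


def unfilled_fields(fields: dict) -> list[str]:
    """Names of fields still holding a "<...>" placeholder string."""
    return [k for k, v in fields.items()
            if isinstance(v, str) and v.startswith("<") and v.endswith(">")]


def write_manifest(path: str, fields: dict) -> None:
    """Write manifest.json, refusing to save while placeholders remain."""
    missing = unfilled_fields(fields)
    if missing:
        raise ValueError(f"fill these manifest fields first: {missing}")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(fields, f, indent=2)
        f.write("\n")
```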

Constraints

  • No NCCL. This lab is gloo only. Save NCCL for lab 02 where you have GPUs.
  • No production-grade tuning. No find_unused_parameters=True, no gradient_as_bucket_view. Defaults. The point is the mechanism.
  • Cost: $0. This lab must never touch a cloud instance. If you find yourself reaching for one, stop and reread the goal.
  • Single-node only. torchrun --nnodes=1. Multi-node is out of scope (and out of budget, and irrelevant for educational DDP).

Stop conditions

You're done when:

  1. experiments/35-ddp-cpu/{train.py, timings.jsonl, scaling.png, manifest.json, state-rank-0.pt, state-rank-1.pt} all exist.
  2. state-rank-0.pt and state-rank-1.pt are byte-equivalent within atol 1e-5.
  3. manifest.json records the all-reduce p50/p95 wall times.
  4. The "why 2 processes is slower" paragraph is in the manifest and reads correctly to you.
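  5. src/minitrain/ddp.py has tests under tests/minitrain/test_ddp.py that pass: either mock torch.distributed, or run a real 2-process gloo test (for example via torch.multiprocessing.spawn) under pytest --forked so process-group state does not leak between tests.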

Hint of last resort

If torchrun is fighting you (PyTorch versions move, env-var conventions shift): drop to torch.multiprocessing.spawn as a fallback. The mechanics are the same; the launcher differs. Document the choice in the manifest.

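If gloo errors out with "address in use", a port collision is the cause: something else (often a previous run that did not exit cleanly) is holding the rendezvous port. torchrun defaults to port 29500, so pick a different free port and pass it with --master-port, or launch with --standalone, which lets torchrun pick a free port itself. With the torch.multiprocessing.spawn fallback, set the MASTER_PORT environment variable instead. PyTorch's docs cover this; check there before improvising.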
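For the torch.multiprocessing.spawn fallback, where you set MASTER_ADDR and MASTER_PORT yourself, a common stdlib trick is to let the OS choose an unused port by binding to port 0. This is a sketch, not a PyTorch API, and it has a small race: the port is released when the socket closes, so another program could take it before gloo binds it. On a single laptop that is rarely a problem.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port on `host`."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))  # port 0 means "any free port"
        return s.getsockname()[1]
```

Set os.environ["MASTER_ADDR"] = "127.0.0.1" and os.environ["MASTER_PORT"] = str(find_free_port()) in the parent before calling torch.multiprocessing.spawn; the children inherit the environment, and init_process_group's default env:// method reads both variables.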

When to consult solutions/

After you have committed the experiment. Solution lives in solutions/01-ddp-cpu-ref.md — written at phase open after Borja's Phase 18 training loop and Phase 25 PyTorch internals are in.


Next lab: lab/02-tp-inference-cloud.md.