Skip to content

English · Español

Lab 01 — DDP entre 2 procesos de CPU (backend gloo, local, gratis)

Objetivo: hacer que un all-reduce real de torch.distributed ocurra en el portátil de Borja, entre dos procesos de entrenamiento del MiniGPT-grammar. Ver el protocolo de wire. Gratis.

Tiempo estimado: 2–3 horas.

Prerrequisito: el bucle de entrenamiento de la Fase 18 funciona; los internos de PyTorch de la Fase 25 hechos. PyTorch local instalado con el grupo opt serve (el backend gloo es parte del PyTorch estándar).


Qué produces

Una corrida funcional de DDP en experiments/35-ddp-cpu/ que:

  1. Lanza 2 procesos Python sobre la CPU local usando torchrun --nproc-per-node=2.
  2. Cada proceso entrena el mismo modelo MiniGPT-grammar sobre un shard disjunto del corpus de gramática-de-verbos en inglés (Fase 12).
  3. Los gradientes se hacen all-reduce vía gloo en cada step.
  4. Tras 100 steps, los tensores de parámetros de ambos procesos son byte-equivalentes (módulo tolerancia fp) — verificando que DDP hizo su trabajo.
  5. El tiempo de all-reduce de gradiente por step se mide y se loguea.
  6. Se commitea un gráfico de curva de scaling (1 proceso vs 2 procesos, tokens/seg en el eje y).

Más una extensión a src/minitrain/:

  • src/minitrain/ddp.py — un wrapper fino que provee wrap_for_ddp(model), is_main_process(), barrier(), cleanup(). Según el plan A12, esto es una extensión de src/minitrain/, no un nuevo módulo top-level (src/distributed/ no se crea en esta fase).

TODOs

Bloque A — implementa el wrapper de DDP

# src/minitrain/ddp.py — esqueleto (Borja escribe el cuerpo)

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as TorchDDP

def init_distributed(backend: str = "gloo") -> tuple[int, int]:
    """Initialize torch.distributed from torchrun env vars. Returns (rank, world_size)."""
    ...

def wrap_for_ddp(model: torch.nn.Module) -> torch.nn.Module:
    """Wraps with DDP. Assumes init_distributed already called."""
    ...

def is_main_process() -> bool:
    """True if rank == 0 (used to gate logging/checkpointing)."""
    ...

def cleanup() -> None:
    """dist.destroy_process_group() — call at the end of training."""
    ...

Restricciones:

  • Solo backend gloo. La máquina de Borja no tiene GPU NVIDIA; NCCL no está disponible. El lab 02 usará NCCL — pero aquí no.
  • Sin tuning de bucket-size. Valores por defecto de DDP de PyTorch estándar. El objetivo de este lab es el mecanismo, no la optimización.
  • Determinismo por rank. Seed = 35000 + rank. Seeds distintos por rank (para que los shards de datos difieran); misma seed-base global para reproducibilidad.

Bloque B — parte el corpus en 2 shards

El corpus de gramática-de-verbos en inglés de la Fase 12 produce un único train.jsonl. Para DDP cada rank lee su propio shard:

  • Shard por index % world_size == rank. Sin solapamiento, cobertura completa, sin leakage.
  • Sanity check: la suma de conteo de frases del rank 0 y rank 1 = total.
  • Loguea la primera frase que cada rank ve — verifica que son distintas (así sabes que el sharding funciona).

La lógica de shard vive en src/minitrain/data.py (ya existe según el plan de la Fase 18); añade una función make_shard(rank, world_size).

Bloque C — escribe el script de entrenamiento

experiments/35-ddp-cpu/train.py:

# Skeleton — Borja writes the body

def main():
    rank, world_size = init_distributed("gloo")
    seed_everything(35000 + rank)

    model = build_minigpt(...)            # Phase 17 module
    model = wrap_for_ddp(model)

    dataset = make_shard(rank, world_size)  # Phase 18 module + extension
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

    for step in range(100):
        batch = next(dataset)
        t_compute_start = time.perf_counter()
        loss = model(batch).loss
        loss.backward()                   # gradient all-reduce fires here
        t_compute_end = time.perf_counter()
        optimizer.step()
        optimizer.zero_grad()

        if is_main_process():
            log_step(step, loss.item(), t_compute_end - t_compute_start)

    cleanup()

Bloque D — mide el wall time del all-reduce

El truco: loss.backward() incluye el all-reduce, asíncronamente. Para aislar el coste de comm:

  • Variante A — barrier entre paréntesis. Rodea loss.backward() con llamadas a dist.barrier(); el segundo barrier espera a que los all-reduces acaben; la diferencia de tiempo incluye al all-reduce.
  • Variante B — DDP comm hooks. El register_comm_hook de PyTorch te deja interceptar el all-reduce; mide el hook directamente.
  • Implementa la variante A (más simple); la B se menciona para completitud.

Loguea los tiempos por step a experiments/35-ddp-cpu/timings.jsonl. Calcula media + mediana + p95 sobre los últimos 50 steps (los primeros 50 son warmup).

Bloque E — verifica equivalencia byte-a-byte

Tras 100 steps:

if is_main_process():
    state_main = {k: v.clone() for k, v in model.state_dict().items()}
dist.barrier()
# rank-0 saves; rank-1 also saves to a separate file; we compare
torch.save(model.state_dict(), f"experiments/35-ddp-cpu/state-rank-{rank}.pt")

Luego en un script de análisis pequeño:

  • Carga ambos ficheros.
  • Para cada tensor, assertea torch.allclose(s0, s1, atol=1e-5).
  • Imprime la máxima diferencia absoluta. Esperado: << 1e-5 (un all-reduce de gloo sobre modelos con seeds idénticas y datos en shards, tras la init idéntica del paso 0, producirá params idénticos en el paso 1; el drift numérico debería estar cerca de epsilon de máquina).

Bloque F — curva de scaling

Ejecuta el script con --nproc-per-node=1 y --nproc-per-node=2. Grafica:

  • eje x: número de procesos.
  • eje y: tokens/seg.
  • Forma esperada: 2-procesos es más lento en tokens/seg totales que 1-proceso a esta escala, porque:
  • El modelo es diminuto; el overhead del all-reduce domina al cómputo.
  • Dos procesos contienden por los mismos cores de CPU.
  • La lección está en el resultado negativo. Borja escribe un párrafo de "por qué 2 procesos es más lento aquí" en el manifest.

Commitea el gráfico como experiments/35-ddp-cpu/scaling.png.

Bloque G — manifest

experiments/35-ddp-cpu/manifest.json:

{
  "seed": 35000,
  "lab": "01-ddp-on-cpu",
  "backend": "gloo",
  "world_size": 2,
  "model_params": "<filled in>",
  "steps": 100,
  "allreduce_p50_ms": "<filled in>",
  "allreduce_p95_ms": "<filled in>",
  "tokens_per_sec_1proc": "<filled in>",
  "tokens_per_sec_2proc": "<filled in>",
  "byte_equivalent": "<true/false>",
  "lesson_notes": "<the negative-result paragraph>"
}

Restricciones

  • Sin NCCL. Este lab es solo gloo. Guarda NCCL para el lab 02 donde tienes GPUs.
  • Sin tuning de calidad de producción. Nada de find_unused_parameters=True, nada de gradient_as_bucket_view. Por defecto. El objetivo es el mecanismo.
  • Coste: $0. Este lab nunca debe tocar una instancia cloud. Si te encuentras alargando la mano hacia una, para y relee el objetivo.
  • Solo single-node. torchrun --nnodes=1. Multi-nodo está fuera de alcance (y fuera de presupuesto, e irrelevante para DDP educativo).

Condiciones de parada

Has acabado cuando:

  1. experiments/35-ddp-cpu/{train.py, timings.jsonl, scaling.png, manifest.json, state-rank-0.pt, state-rank-1.pt} existen todos.
  2. state-rank-0.pt y state-rank-1.pt son byte-equivalentes con atol 1e-5.
  3. manifest.json registra los wall times p50/p95 del all-reduce.
  4. El párrafo de "por qué 2 procesos es más lento" está en el manifest y te suena correcto.
  5. src/minitrain/ddp.py tiene tests en tests/minitrain/test_ddp.py que pasan (mockeados o pytest --forked para lanzar el test de 2 procesos).

Pista de último recurso

Si torchrun te está peleando (las versiones de PyTorch se mueven, las convenciones de env-vars cambian): baja a torch.multiprocessing.spawn como fallback. La mecánica es la misma; el launcher difiere. Documenta la elección en el manifest.

Si gloo da error de "address in use", las colisiones de puerto son la causa. Pon MASTER_PORT=29500 (o lo que sea) antes de torchrun. Los docs de PyTorch lo cubren; mira ahí antes de improvisar.

Cuándo consultar solutions/

Tras haber commiteado el experimento. La solución vive en solutions/01-ddp-cpu-ref.md — escrita en la apertura de la fase tras tener el bucle de entrenamiento de la Fase 18 de Borja y los internos de PyTorch de la Fase 25.


Siguiente lab: lab/02-tp-inference-cloud.md.