English · Español
Lab 01 — DDP entre 2 procesos de CPU (backend gloo, local, gratis)¶
Objetivo: hacer que un all-reduce real de
torch.distributedocurra en el portátil de Borja, entre dos procesos de entrenamiento del MiniGPT-grammar. Ver el protocolo de wire. Gratis.Tiempo estimado: 2–3 horas.
Prerrequisito: el bucle de entrenamiento de la Fase 18 funciona; los internos de PyTorch de la Fase 25 hechos. PyTorch local instalado con el grupo opt
serve(el backend gloo es parte del PyTorch estándar).
Qué produces¶
Una corrida funcional de DDP en experiments/35-ddp-cpu/ que:
- Lanza 2 procesos Python sobre la CPU local usando
torchrun --nproc-per-node=2. - Cada proceso entrena el mismo modelo MiniGPT-grammar sobre un shard disjunto del corpus de gramática-de-verbos en inglés (Fase 12).
- Los gradientes se hacen all-reduce vía gloo en cada step.
- Tras 100 steps, los tensores de parámetros de ambos procesos son byte-equivalentes (módulo tolerancia fp) — verificando que DDP hizo su trabajo.
- El tiempo de all-reduce de gradiente por step se mide y se loguea.
- Se commitea un gráfico de curva de scaling (1 proceso vs 2 procesos, tokens/seg en el eje y).
Más una extensión a src/minitrain/:
src/minitrain/ddp.py— un wrapper fino que proveewrap_for_ddp(model),is_main_process(),barrier(),cleanup(). Según el plan A12, esto es una extensión desrc/minitrain/, no un nuevo módulo top-level (src/distributed/no se crea en esta fase).
TODOs¶
Bloque A — implementa el wrapper de DDP¶
# src/minitrain/ddp.py — esqueleto (Borja escribe el cuerpo)
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as TorchDDP
def init_distributed(backend: str = "gloo") -> tuple[int, int]:
"""Initialize torch.distributed from torchrun env vars. Returns (rank, world_size)."""
...
def wrap_for_ddp(model: torch.nn.Module) -> torch.nn.Module:
"""Wraps with DDP. Assumes init_distributed already called."""
...
def is_main_process() -> bool:
"""True if rank == 0 (used to gate logging/checkpointing)."""
...
def cleanup() -> None:
"""dist.destroy_process_group() — call at the end of training."""
...
Restricciones:
- Solo backend gloo. La máquina de Borja no tiene GPU NVIDIA; NCCL no está disponible. El lab 02 usará NCCL — pero aquí no.
- Sin tuning de bucket-size. Valores por defecto de DDP de PyTorch estándar. El objetivo de este lab es el mecanismo, no la optimización.
- Determinismo por rank. Seed =
35000 + rank. Seeds distintos por rank (para que los shards de datos difieran); misma seed-base global para reproducibilidad.
Bloque B — parte el corpus en 2 shards¶
El corpus de gramática-de-verbos en inglés de la Fase 12 produce un único train.jsonl. Para DDP cada rank lee su propio shard:
- Shard por
index % world_size == rank. Sin solapamiento, cobertura completa, sin leakage. - Sanity check: la suma de conteo de frases del rank 0 y rank 1 = total.
- Loguea la primera frase que cada rank ve — verifica que son distintas (así sabes que el sharding funciona).
La lógica de shard vive en src/minitrain/data.py (ya existe según el plan de la Fase 18); añade una función make_shard(rank, world_size).
Bloque C — escribe el script de entrenamiento¶
experiments/35-ddp-cpu/train.py:
# Skeleton — Borja writes the body
def main():
rank, world_size = init_distributed("gloo")
seed_everything(35000 + rank)
model = build_minigpt(...) # Phase 17 module
model = wrap_for_ddp(model)
dataset = make_shard(rank, world_size) # Phase 18 module + extension
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
for step in range(100):
batch = next(dataset)
t_compute_start = time.perf_counter()
loss = model(batch).loss
loss.backward() # gradient all-reduce fires here
t_compute_end = time.perf_counter()
optimizer.step()
optimizer.zero_grad()
if is_main_process():
log_step(step, loss.item(), t_compute_end - t_compute_start)
cleanup()
Bloque D — mide el wall time del all-reduce¶
El truco: loss.backward() incluye el all-reduce, asíncronamente. Para aislar el coste de comm:
- Variante A — barrier entre paréntesis. Rodea
loss.backward()con llamadas adist.barrier(); el segundo barrier espera a que los all-reduces acaben; la diferencia de tiempo incluye al all-reduce. - Variante B — DDP comm hooks. El
register_comm_hookde PyTorch te deja interceptar el all-reduce; mide el hook directamente. - Implementa la variante A (más simple); la B se menciona para completitud.
Loguea los tiempos por step a experiments/35-ddp-cpu/timings.jsonl. Calcula media + mediana + p95 sobre los últimos 50 steps (los primeros 50 son warmup).
Bloque E — verifica equivalencia byte-a-byte¶
Tras 100 steps:
if is_main_process():
state_main = {k: v.clone() for k, v in model.state_dict().items()}
dist.barrier()
# rank-0 saves; rank-1 also saves to a separate file; we compare
torch.save(model.state_dict(), f"experiments/35-ddp-cpu/state-rank-{rank}.pt")
Luego en un script de análisis pequeño:
- Carga ambos ficheros.
- Para cada tensor, assertea
torch.allclose(s0, s1, atol=1e-5). - Imprime la máxima diferencia absoluta. Esperado: << 1e-5 (un all-reduce de gloo sobre modelos con seeds idénticas y datos en shards, tras la init idéntica del paso 0, producirá params idénticos en el paso 1; el drift numérico debería estar cerca de epsilon de máquina).
Bloque F — curva de scaling¶
Ejecuta el script con --nproc-per-node=1 y --nproc-per-node=2. Grafica:
- eje x: número de procesos.
- eje y: tokens/seg.
- Forma esperada: 2-procesos es más lento en tokens/seg totales que 1-proceso a esta escala, porque:
- El modelo es diminuto; el overhead del all-reduce domina al cómputo.
- Dos procesos contienden por los mismos cores de CPU.
- La lección está en el resultado negativo. Borja escribe un párrafo de "por qué 2 procesos es más lento aquí" en el manifest.
Commitea el gráfico como experiments/35-ddp-cpu/scaling.png.
Bloque G — manifest¶
experiments/35-ddp-cpu/manifest.json:
{
"seed": 35000,
"lab": "01-ddp-on-cpu",
"backend": "gloo",
"world_size": 2,
"model_params": "<filled in>",
"steps": 100,
"allreduce_p50_ms": "<filled in>",
"allreduce_p95_ms": "<filled in>",
"tokens_per_sec_1proc": "<filled in>",
"tokens_per_sec_2proc": "<filled in>",
"byte_equivalent": "<true/false>",
"lesson_notes": "<the negative-result paragraph>"
}
Restricciones¶
- Sin NCCL. Este lab es solo gloo. Guarda NCCL para el lab 02 donde tienes GPUs.
- Sin tuning de calidad de producción. Nada de
find_unused_parameters=True, nada degradient_as_bucket_view. Por defecto. El objetivo es el mecanismo. - Coste: $0. Este lab nunca debe tocar una instancia cloud. Si te encuentras alargando la mano hacia una, para y relee el objetivo.
- Solo single-node.
torchrun --nnodes=1. Multi-nodo está fuera de alcance (y fuera de presupuesto, e irrelevante para DDP educativo).
Condiciones de parada¶
Has acabado cuando:
experiments/35-ddp-cpu/{train.py, timings.jsonl, scaling.png, manifest.json, state-rank-0.pt, state-rank-1.pt}existen todos.state-rank-0.ptystate-rank-1.ptson byte-equivalentes con atol 1e-5.manifest.jsonregistra los wall times p50/p95 del all-reduce.- El párrafo de "por qué 2 procesos es más lento" está en el manifest y te suena correcto.
src/minitrain/ddp.pytiene tests entests/minitrain/test_ddp.pyque pasan (mockeados opytest --forkedpara lanzar el test de 2 procesos).
Pista de último recurso¶
Si torchrun te está peleando (las versiones de PyTorch se mueven, las convenciones de env-vars cambian): baja a torch.multiprocessing.spawn como fallback. La mecánica es la misma; el launcher difiere. Documenta la elección en el manifest.
Si gloo da error de "address in use", las colisiones de puerto son la causa. Pon MASTER_PORT=29500 (o lo que sea) antes de torchrun. Los docs de PyTorch lo cubren; mira ahí antes de improvisar.
Cuándo consultar solutions/¶
Tras haber commiteado el experimento. La solución vive en solutions/01-ddp-cpu-ref.md — escrita en la apertura de la fase tras tener el bucle de entrenamiento de la Fase 18 de Borja y los internos de PyTorch de la Fase 25.
Siguiente lab: lab/02-tp-inference-cloud.md.