"""End-to-end distributed test for intercube allreduce. Exercises the full process-group path: dist.init_process_group(backend="ahbm") → mp.spawn(nprocs=n_sips) → each worker: set_device → allocate → fill → dist.all_reduce → verify This is the same flow a real DDP training script would use. """ from __future__ import annotations import os import textwrap from pathlib import Path import numpy as np import pytest TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" N_CUBES = 16 N_ELEM = 8 def _write_ccl_yaml(tmp_path) -> str: body = textwrap.dedent("""\ defaults: algorithm: lrab_hierarchical_allreduce buffer_kind: tcm backpressure: sleep n_slots: 4 slot_size: 4096 vc_chunk_size: 256 ipcq_credit_size_bytes: 16 algorithms: lrab_hierarchical_allreduce: module: kernbench.ccl.algorithms.lrab_hierarchical_allreduce topology: none buffer_kind: tcm n_elem: 8 root_cube: 15 """) (tmp_path / "ccl.yaml").write_text(body) return str(tmp_path) def _worker(rank: int, n_sips: int, torch) -> None: """Per-SIP worker: allocate, fill, all_reduce, verify.""" from kernbench.policy.placement.dp import DPPolicy torch.ahbm.set_device(rank) dp = DPPolicy( cube="row_wise", pe="replicate", num_pes=1, num_cubes=N_CUBES, ) tensor = torch.zeros( (N_CUBES, N_ELEM), dtype="f16", dp=dp, name=f"sip{rank}", ) init_arr = np.full((N_CUBES, N_ELEM), float(rank + 1), dtype=np.float16) tensor.copy_(torch.from_numpy(init_arr)) print(f"[SIP {rank}] input cube0[:4] = {tensor.numpy()[0][:4].tolist()}") torch.distributed.all_reduce(tensor, op="sum") arr = tensor.numpy() expected = float(N_CUBES * sum(range(1, n_sips + 1))) print(f"[SIP {rank}] output cube0[:4] = {arr[0][:4].tolist()}") print(f"[SIP {rank}] output cube15[:4] = {arr[15][:4].tolist()}") for cube_id in range(N_CUBES): assert np.allclose(arr[cube_id], expected, rtol=1e-1, atol=1e-1), ( f"SIP{rank} cube {cube_id}: " f"got {arr[cube_id][:4]}, expected {expected}" ) if rank == 0: print(f"\n lrab_hierarchical_allreduce (ws={n_sips}): " f"{n_sips * N_CUBES} OK") def test_distributed_lrab_hierarchical_allreduce(tmp_path, monkeypatch): """Full distributed path: init_process_group → mp.spawn → all_reduce.""" from kernbench.runtime_api.context import RuntimeContext from kernbench.runtime_api.types import DeviceSelector from kernbench.sim_engine.engine import GraphEngine from kernbench.topology.builder import resolve_topology monkeypatch.chdir(_write_ccl_yaml(tmp_path)) topo = resolve_topology(str(TOPOLOGY_PATH)) engine = GraphEngine(topo.topology_obj, enable_data=True) spec = topo.topology_obj.spec n_sips = int(spec["system"]["sips"]["count"]) with RuntimeContext( engine=engine, target_device=DeviceSelector("all"), correlation_id="dist_intercube_ar", spec=spec, ) as ctx: ctx.distributed.init_process_group(backend="ahbm") assert ctx.distributed.get_world_size() == n_sips t_start = engine._env.now ctx.multiprocessing.spawn( _worker, args=(n_sips, ctx), nprocs=n_sips, ) t_end = engine._env.now print(f"\n[distributed] sim latency = " f"{t_end - t_start:.1f} ns ({(t_end - t_start) / 1000:.3f} us)")