"""End-to-end distributed all-reduce test on the unmodified ``topology.yaml``.

Drives the simulator through the same call sequence a real DDP training
script would use::

    init_process_group(backend="ahbm") -> mp.spawn -> dist.all_reduce

No topology overrides are applied; the topology file is resolved as-is.
"""

from __future__ import annotations

from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
from tests.sccl._allreduce_helpers import (
    DEFAULT_N_ELEM,
    TOPOLOGY_PATH,
    _worker,
    _write_ccl_yaml,
)


def test_distributed_lrab_hierarchical_allreduce(tmp_path, monkeypatch):
    """Run a distributed all-reduce over every SIP and report its sim latency.

    Builds a data-enabled ``GraphEngine`` from ``TOPOLOGY_PATH``, initializes
    the ``"ahbm"`` process group, checks the world size equals the SIP count
    from the topology spec, then spawns one ``_worker`` process per SIP.
    The simulated time elapsed across the spawn is printed.

    Args:
        tmp_path: pytest temporary directory handed to ``_write_ccl_yaml``.
        monkeypatch: pytest fixture used to change the working directory.
    """
    # Work from the directory returned by _write_ccl_yaml; presumably the
    # CCL config it writes is discovered relative to the CWD — confirm in
    # the helper.
    monkeypatch.chdir(_write_ccl_yaml(tmp_path))

    resolved = resolve_topology(str(TOPOLOGY_PATH))
    topology = resolved.topology_obj
    engine = GraphEngine(topology, enable_data=True)
    spec = topology.spec

    # Sizes pulled straight from the topology spec.
    world_size = int(spec["system"]["sips"]["count"])
    mesh = spec["sip"]["cube_mesh"]
    cube_count = int(mesh["w"]) * int(mesh["h"])

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id="dist_intercube_ar",
        spec=spec,
    ) as ctx:
        ctx.distributed.init_process_group(backend="ahbm")
        # The process group must contain exactly one rank per SIP.
        assert ctx.distributed.get_world_size() == world_size

        # engine._env.now is the simulator clock (private attribute); the
        # message below labels the difference as nanoseconds.
        started_at = engine._env.now
        ctx.multiprocessing.spawn(
            _worker,
            args=(cube_count, DEFAULT_N_ELEM, world_size, ctx),
            nprocs=world_size,
        )
        elapsed_ns = engine._env.now - started_at

        print(
            f"\n[distributed] sim latency = "
            f"{elapsed_ns:.1f} ns ({elapsed_ns / 1000:.3f} us)"
        )