"""Config-driven multi-device allreduce test application. Reads ``ccl.yaml`` + ``topology.yaml``, dynamically loads the kernel module from ``ccl.yaml → module``, and picks the inter-SIP exchange pattern from ``topology.yaml → system.sips.topology``. Run directly:: python -m pytest tests/allreduce_app.py -v -s """ from __future__ import annotations import importlib import math from pathlib import Path from typing import Any import numpy as np from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip from kernbench.policy.placement.dp import DPPolicy def _sip_topo_dims(sip_topo: str, n_sips: int) -> tuple[int, int]: if sip_topo == "ring_1d": return (0, 0) side = int(round(math.sqrt(n_sips))) if side * side != n_sips: raise ValueError( f"SIP topology '{sip_topo}' requires square n_sips, got {n_sips}" ) return (side, side) def run_allreduce( ctx: Any, engine: Any, spec: dict, *, algorithm: str | None = None, ccl_yaml: str | None = None, ) -> dict: """Config-driven allreduce: read yaml, load kernel, run. Everything is resolved from config — no hardcoded kernel imports. """ cfg_all = load_ccl_config(ccl_yaml) cfg = resolve_algorithm_config(cfg_all, algorithm) # Dynamic import from ccl.yaml → module algo_module = importlib.import_module(cfg["module"]) kernel_fn = algo_module.kernel topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND n_elem = int(cfg.get("n_elem", 8)) n_sips = int(spec.get("system", {}).get("sips", {}).get("count", 1)) sip_topo = str( spec.get("system", {}).get("sips", {}).get("topology", "ring_1d") ) cm = spec["sip"]["cube_mesh"] cube_w = int(cm["w"]) cube_h = int(cm["h"]) n_cubes = cube_w * cube_h sip_topo_kind = topo_name_to_kind.get(sip_topo, 0) sip_topo_w, sip_topo_h = _sip_topo_dims(sip_topo, n_sips) algo_name = cfg.get("algorithm", "allreduce") print(f"\n{'=' * 60}") print(f"algorithm: {algo_name}") print(f"module: {cfg['module']}") print(f"sip_topology: {sip_topo}") print(f"kernel: {kernel_fn.__name__}") print(f"n_sips: {n_sips}") print(f"n_cubes: {n_cubes}") print(f"n_elem: {n_elem}") print(f"{'=' * 60}") configure_sfr_intercube_multisip(engine, spec, cfg) dp = DPPolicy( cube="row_wise", pe="replicate", num_pes=1, num_cubes=n_cubes, ) tensors = [] for sip in range(n_sips): ctx.ahbm.set_device(sip) t = ctx.zeros( (n_cubes, n_elem), dtype="f16", dp=dp, name=f"sip{sip}", ) t.copy_(ctx.from_numpy( np.full((n_cubes, n_elem), float(sip + 1), dtype=np.float16) )) tensors.append(t) for sip in range(n_sips): arr = tensors[sip].numpy() print(f"[SIP {sip}] input cube0[:4] = {arr[0][:4].tolist()} " f"cube{n_cubes - 1}[:4] = {arr[-1][:4].tolist()}") t_start = engine._env.now all_pending = [] for sip_rank, t in enumerate(tensors): pending = ctx.launch( algo_name, kernel_fn, t, n_elem, cube_w, cube_h, n_sips, sip_rank, sip_topo_kind, sip_topo_w, sip_topo_h, _defer_wait=True, ) all_pending.extend(pending) for h, sip_id, meta in all_pending: ctx.wait(h, _meta=meta) t_end = engine._env.now latency_ns = t_end - t_start print(f"\n[{algo_name} ws={n_sips}] sim latency = " f"{latency_ns:.1f} ns ({latency_ns / 1000:.3f} us)") for key, (_, trace) in engine._results.items(): if not isinstance(trace, dict): continue total = trace.get("total_ns", 0.0) pe_exec = trace.get("pe_exec_ns", 0.0) or 0.0 network = total - pe_exec print(f" [{key}] total={total:.1f} ns " f"pe_exec={pe_exec:.1f} ns network={network:.1f} ns") expected = float(n_cubes * sum(range(1, n_sips + 1))) print() for sip in range(n_sips): arr = tensors[sip].numpy() print(f"[SIP {sip}] output cube0[:4] = {arr[0][:4].tolist()}") print(f"[SIP {sip}] output cube{n_cubes - 1}[:4] = {arr[-1][:4].tolist()}") ok_cubes = 0 for sip in range(n_sips): arr = tensors[sip].numpy() for cube_id in range(n_cubes): assert np.allclose( arr[cube_id], expected, rtol=1e-1, atol=1e-1, ), ( f"SIP{sip} cube {cube_id}: " f"got {arr[cube_id][:4]}, expected {expected}" ) ok_cubes += 1 print(f"\n {algo_name} (ws={n_sips}): {ok_cubes} OK") return { "expected": expected, "latency_ns": latency_ns, "ok_cubes": ok_cubes, } # ── pytest entry point ─────────────────────────────────────────────── import pytest import yaml from kernbench.runtime_api.context import RuntimeContext from kernbench.runtime_api.types import DeviceSelector from kernbench.sim_engine.engine import GraphEngine from kernbench.topology.builder import resolve_topology TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" CONFIGS = [ pytest.param("intercube_allreduce", "ring_1d", 2, id="ring_2sip"), pytest.param("intercube_allreduce", "torus_2d", 4, id="torus_4sip"), pytest.param("intercube_allreduce", "mesh_2d_no_wrap", 4, id="mesh_4sip"), ] def _write_temp_configs(tmp_path, sip_topology, n_sips, algorithm): """Write temp topology.yaml and ccl.yaml with the given overrides.""" with open(TOPOLOGY_PATH) as f: topo_cfg = yaml.safe_load(f) topo_cfg["system"]["sips"]["count"] = n_sips topo_cfg["system"]["sips"]["topology"] = sip_topology topo_path = tmp_path / "topology.yaml" with open(topo_path, "w") as f: yaml.dump(topo_cfg, f, default_flow_style=False) ccl_path = Path(__file__).parent.parent / "ccl.yaml" with open(ccl_path) as f: ccl_cfg = yaml.safe_load(f) ccl_cfg["defaults"]["algorithm"] = algorithm tmp_ccl = tmp_path / "ccl.yaml" with open(tmp_ccl, "w") as f: yaml.dump(ccl_cfg, f, default_flow_style=False) return str(topo_path), str(tmp_ccl) @pytest.mark.parametrize("algorithm,sip_topology,n_sips", CONFIGS) def test_allreduce(tmp_path, algorithm, sip_topology, n_sips): topo_path, ccl_path = _write_temp_configs( tmp_path, sip_topology, n_sips, algorithm, ) topo = resolve_topology(topo_path) engine = GraphEngine(topo.topology_obj, enable_data=True) spec = topo.topology_obj.spec with RuntimeContext( engine=engine, target_device=DeviceSelector("all"), correlation_id=f"test_{algorithm}_{sip_topology}", spec=spec, ) as ctx: result = run_allreduce( ctx, engine, spec, algorithm=algorithm, ccl_yaml=ccl_path, ) assert result["ok_cubes"] > 0