diff --git a/docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.csv b/docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.csv new file mode 100644 index 0000000..c41cccf --- /dev/null +++ b/docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.csv @@ -0,0 +1,13 @@ +buffer_kind,sip_topology,n_sips,n_elem,bytes_per_pe,latency_ns +hbm,torus_2d,6,128,256,2002.0399999999827 +hbm,torus_2d,6,1024,2048,3541.0399999999827 +hbm,torus_2d,6,8192,16384,15889.03999999999 +hbm,torus_2d,6,32768,65536,58225.03999999998 +sram,torus_2d,6,128,256,1762.0399999999827 +sram,torus_2d,6,1024,2048,2293.0399999999827 +sram,torus_2d,6,8192,16384,6577.039999999986 +sram,torus_2d,6,32768,65536,21265.03999999992 +tcm,torus_2d,6,128,256,1678.0399999999827 +tcm,torus_2d,6,1024,2048,1957.0399999999827 +tcm,torus_2d,6,8192,16384,4225.039999999986 +tcm,torus_2d,6,32768,65536,12001.03999999992 diff --git a/docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.png b/docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.png new file mode 100644 index 0000000..194b975 Binary files /dev/null and b/docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.png differ diff --git a/src/kernbench/common/ipcq_types.py b/src/kernbench/common/ipcq_types.py index 578aaed..3dc6e0f 100644 --- a/src/kernbench/common/ipcq_types.py +++ b/src/kernbench/common/ipcq_types.py @@ -31,6 +31,26 @@ class IpcqInvalidDirection(ValueError): has no neighbor installed for this PE.""" +# ── ADR-0023 D9.7: IPCQ slot-memory latency model ─────────────────── +# +# Per-tier (bw_gbs, overhead_ns) used to charge the slot write (inbound) +# and slot read (recv consume). Mirrors topology.yaml component values. 
+_BUFFER_KIND_BW: dict[str, tuple[float, float]] = { + "tcm": (512.0, 0.0), + "sram": (128.0, 2.0), + "hbm": (32.0, 6.0), +} + + +def slot_io_latency_ns(buffer_kind: str, nbytes: int) -> float: + """Per-access latency for one slot read/write of ``nbytes`` against + the IPCQ backing memory tier (``buffer_kind``).""" + bw_gbs, overhead_ns = _BUFFER_KIND_BW.get( + buffer_kind, _BUFFER_KIND_BW["tcm"], + ) + return float(nbytes) / bw_gbs + overhead_ns + + # ── D2.5: IpcqEndpoint ─────────────────────────────────────────────── diff --git a/src/kernbench/components/builtin/pe_dma.py b/src/kernbench/components/builtin/pe_dma.py index 04c6129..35fca81 100644 --- a/src/kernbench/components/builtin/pe_dma.py +++ b/src/kernbench/components/builtin/pe_dma.py @@ -219,6 +219,16 @@ class PeDmaComponent(PeEngineBase): token = txn.request + # ADR-0023 D9.7: charge IPCQ slot-WRITE latency against the + # backing-memory tier (tcm/sram/hbm) before the atomic block. + # Must come BEFORE the atomic write→IpcqMetaArrival pair (I6). + from kernbench.common.ipcq_types import slot_io_latency_ns + slot_write_ns = slot_io_latency_ns( + token.dst_endpoint.buffer_kind, token.nbytes, + ) + if slot_write_ns > 0: + yield env.timeout(slot_write_ns) + # ── ATOMIC: do not introduce yield between these two operations ── # 1. Move data via MemoryStore (single-hop DMA write). # Prefer the in-flight snapshot stashed by the sender PE_DMA; diff --git a/src/kernbench/components/builtin/pe_ipcq.py b/src/kernbench/components/builtin/pe_ipcq.py index 8ef75dd..43a456c 100644 --- a/src/kernbench/components/builtin/pe_ipcq.py +++ b/src/kernbench/components/builtin/pe_ipcq.py @@ -329,6 +329,16 @@ class PeIpcqComponent(ComponentBase): qp["my_tail"] += 1 + # ADR-0023 D9.7: charge IPCQ slot-READ latency against the + # backing-memory tier (tcm/sram/hbm). Recv blocks for the + # kernel-side slot consume; pe_exec_ns reflects this cost. 
+ from kernbench.common.ipcq_types import slot_io_latency_ns + slot_read_ns = slot_io_latency_ns( + self._buffer_kind, req.result_data.get("nbytes", 0), + ) + if slot_read_ns > 0: + yield env.timeout(slot_read_ns) + # Diagnostics trace (D14) from kernbench.ccl import diagnostics if diagnostics.trace_enabled(): diff --git a/tests/conftest.py b/tests/conftest.py index 1c6cced..3d9725b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,23 +27,27 @@ def pytest_sessionfinish(session, exitstatus): import sys from pathlib import Path - mod_path = Path(__file__).parent / "test_allreduce_multidevice.py" - if not mod_path.exists(): - return - spec = importlib.util.spec_from_file_location( - "_test_allreduce_multidevice_for_aggregate", mod_path, - ) - if spec is None or spec.loader is None: - return - mod = importlib.util.module_from_spec(spec) - sys.modules[spec.name] = mod - try: - spec.loader.exec_module(mod) - agg = getattr(mod, "_aggregate_sweep_plots", None) - if agg is not None: - agg() - except Exception as e: - print(f"[conftest] sweep aggregation failed: {e}") + def _exec(name: str, attr: str) -> None: + mod_path = Path(__file__).parent / name + if not mod_path.exists(): + return + s = importlib.util.spec_from_file_location( + f"_{name.removesuffix('.py')}_for_aggregate", mod_path, + ) + if s is None or s.loader is None: + return + mod = importlib.util.module_from_spec(s) + sys.modules[s.name] = mod + try: + s.loader.exec_module(mod) + fn = getattr(mod, attr, None) + if fn is not None: + fn() + except Exception as e: + print(f"[conftest] aggregator {attr}() in {name} failed: {e}") + + _exec("test_allreduce_multidevice.py", "_aggregate_sweep_plots") + _exec("test_allreduce_buffer_kind_sweep.py", "aggregate_buffer_kind_plot") @pytest.fixture(scope="session") diff --git a/tests/test_allreduce_buffer_kind_sweep.py b/tests/test_allreduce_buffer_kind_sweep.py new file mode 100644 index 0000000..9e8aab9 --- /dev/null +++ 
b/tests/test_allreduce_buffer_kind_sweep.py @@ -0,0 +1,196 @@ +"""Phase 1 buffer-kind allreduce sweep — torus_2d 6 SIPs. + +Parametrized over (buffer_kind, n_elem). Each case runs the standard +config-driven allreduce app and writes a JSON row to a shared staging +dir; the conftest sessionfinish hook (added in Phase 1) aggregates +rows into ``docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.png``. + +Pre-Phase-2: the three buffer-kind lines overlap exactly because slot +access is latency-free today. Post-Phase-2 they spread out (tcm +fastest, hbm slowest). +""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +import yaml + +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +# Reuse the allreduce app helpers. +from tests.test_allreduce_multidevice import ( + _write_temp_configs, + run_allreduce, +) + + +_BUFFER_KINDS = ["tcm", "sram", "hbm"] +_N_ELEM_GRID = [128, 1024, 8192, 32768] # 256 B → 64 KB per slot +_ELEM_BYTES_F16 = 2 + +_OUT_DIR = (Path(__file__).parent.parent / "docs" / "diagrams" + / "allreduce_latency_plots") +_ROWS_DIR = _OUT_DIR / "_buffer_kind_rows" + + +def _bk_params(): + out = [] + for bk in _BUFFER_KINDS: + for n_elem in _N_ELEM_GRID: + out.append(pytest.param(bk, n_elem, id=f"{bk}-n_elem{n_elem}")) + return out + + +@pytest.mark.parametrize("buffer_kind,n_elem", _bk_params()) +def test_buffer_kind_allreduce_one(tmp_path, buffer_kind, n_elem): + """One config of the buffer-kind sweep. xdist parallelizes.""" + sub = tmp_path / f"{buffer_kind}_{n_elem}" + sub.mkdir() + topo_path, ccl_path = _write_temp_configs( + sub, + sip_topology="torus_2d", + n_sips=6, + algorithm="intercube_allreduce", + sip_w=3, sip_h=2, + n_elem_override=n_elem, + ) + # Override buffer_kind in the temp ccl.yaml. 
+ with open(ccl_path) as f: + ccl_cfg = yaml.safe_load(f) + ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind + ccl_cfg.setdefault("algorithms", {}).setdefault( + "intercube_allreduce", {}, + )["buffer_kind"] = buffer_kind + with open(ccl_path, "w") as f: + yaml.dump(ccl_cfg, f, default_flow_style=False) + + topo = resolve_topology(topo_path) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id=f"bk_sweep_{buffer_kind}_{n_elem}", + spec=spec, + ) as ctx: + result = run_allreduce( + ctx, engine, spec, + algorithm="intercube_allreduce", ccl_yaml=ccl_path, + ) + assert result["ok_cubes"] > 0 + + pe_exec_vals = [ + float(tr.get("pe_exec_ns", 0.0) or 0.0) + for _, (_, tr) in engine._results.items() + if isinstance(tr, dict) + ] + crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0 + + bytes_per_pe = n_elem * _ELEM_BYTES_F16 + record = { + "buffer_kind": buffer_kind, + "sip_topology": "torus_2d", + "n_sips": 6, + "n_elem": n_elem, + "bytes_per_pe": bytes_per_pe, + "latency_ns": crit_ns, + } + _ROWS_DIR.mkdir(parents=True, exist_ok=True) + row_path = _ROWS_DIR / f"{buffer_kind}_{n_elem}.json" + with open(row_path, "w", encoding="utf-8") as f: + json.dump(record, f) + + +def aggregate_buffer_kind_plot() -> bool: + """Read per-config rows and emit buffer_kind_sweep.png + CSV. + + Called from conftest.pytest_sessionfinish (controller-only). + Returns True if rows were aggregated. 
+ """ + import csv + + if not _ROWS_DIR.exists(): + return False + row_files = sorted(_ROWS_DIR.glob("*.json")) + if not row_files: + return False + + records = [] + for p in row_files: + with open(p, encoding="utf-8") as f: + records.append(json.load(f)) + + import matplotlib.pyplot as plt + from matplotlib.ticker import FuncFormatter + + def _fmt_bytes(x, _pos): + if x <= 0: + return "0" + if x >= 1024 * 1024: + return f"{x / (1024 * 1024):.0f} MB" + if x >= 1024: + return f"{x / 1024:.0f} KB" + return f"{x:.0f} B" + + _bytes_fmt = FuncFormatter(_fmt_bytes) + + _OUT_DIR.mkdir(parents=True, exist_ok=True) + with open(_OUT_DIR / "buffer_kind_sweep.csv", "w", + newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=[ + "buffer_kind", "sip_topology", "n_sips", "n_elem", + "bytes_per_pe", "latency_ns", + ]) + w.writeheader() + for r in sorted(records, key=lambda r: ( + r["buffer_kind"], r["bytes_per_pe"], + )): + w.writerow(r) + + colors = {"tcm": "tab:blue", "sram": "tab:orange", "hbm": "tab:red"} + fig, ax = plt.subplots(figsize=(10, 6)) + for bk in ["tcm", "sram", "hbm"]: + rs = sorted( + [r for r in records if r["buffer_kind"] == bk], + key=lambda r: r["bytes_per_pe"], + ) + if not rs: + continue + ax.plot( + [r["bytes_per_pe"] for r in rs], + [r["latency_ns"] for r in rs], + marker="o", lw=2.0, + color=colors[bk], label=f"buffer_kind = {bk}", + ) + ax.set_xscale("log", base=2) + ax.set_xlabel("Bytes per PE (log scale)") + ax.set_ylabel("Time (ns)") + ax.set_title( + "Allreduce torus_2d (6 SIPs, 3×2) — IPCQ slot memory tier" + ) + ax.grid(True, alpha=0.3) + ax.legend() + ax.xaxis.set_major_formatter(_bytes_fmt) + fig.tight_layout() + fig.savefig(_OUT_DIR / "buffer_kind_sweep.png", dpi=130) + plt.close(fig) + + for p in row_files: + try: + p.unlink() + except OSError: + pass + try: + _ROWS_DIR.rmdir() + except OSError: + pass + + print(f"\nWrote {_OUT_DIR / 'buffer_kind_sweep.png'} " + f"from {len(records)} rows") + return True diff --git 
a/tests/test_ipcq_buffer_kind_latency.py b/tests/test_ipcq_buffer_kind_latency.py new file mode 100644 index 0000000..b2aeaa0 --- /dev/null +++ b/tests/test_ipcq_buffer_kind_latency.py @@ -0,0 +1,219 @@ +"""Phase 1 micro-tests for IPCQ slot-memory latency model. + +These tests assert the TARGET behavior expected after Phase 2 wires +``buffer_kind`` (tcm/sram/hbm) into the IPCQ slot read/write latency +charges. They are written BEFORE the production change and are +EXPECTED TO FAIL today. + +Failure semantics today: + - Slot access is latency-free, so the tcm/sram/hbm runs produce + identical pe_exec_ns. The ordering assertion therefore fails with + "tcm == sram == hbm" — proving the test harness is wired and that + Phase 2 production work is what makes them pass. + +Reference (Phase 2 will edit these): + - src/kernbench/components/builtin/pe_dma.py — _handle_ipcq_inbound + - src/kernbench/components/builtin/pe_ipcq.py — _handle_recv + - src/kernbench/common/ipcq_types.py — _BUFFER_KIND_BW table + - src/kernbench/runtime_api/kernel.py — IpcqDmaToken adds + buffer_kind field + - ccl.yaml — algorithm.buffer_kind + +The tests reuse the existing config-driven allreduce app +(``run_allreduce`` in tests/test_allreduce_multidevice.py) with a 6-SIP +torus_2d (3×2) topology and a SMALL n_elem so they finish fast (~3-5 s each). +""" +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pytest + +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +# Reuse the test app's helpers so this micro-test file does not +# duplicate the run-allreduce + write-temp-configs plumbing. +from tests.test_allreduce_multidevice import ( + _write_temp_configs, + run_allreduce, +) + + +# Expected per-tier BW + overhead (Phase 2 will encode this in +# ipcq_types.py). Mirrors topology.yaml component values.
+_EXPECTED_BW = { + "tcm": (512.0, 0.0), + "sram": (128.0, 2.0), + "hbm": (32.0, 6.0), +} + + +def _expected_slot_io_ns(buffer_kind: str, nbytes: int) -> float: + """Per-access latency the model is expected to add (write OR read).""" + bw_gbs, overhead_ns = _EXPECTED_BW[buffer_kind] + # 1 GB/s = 1 byte/ns + return nbytes / bw_gbs + overhead_ns + + +def _run_torus_allreduce( + tmp_path: Path, *, buffer_kind: str, n_elem: int, +) -> float: + """Run one torus_2d 6-SIP allreduce and return critical-path + pe_exec_ns. The buffer_kind override is wired into ccl.yaml. + """ + sub = tmp_path / f"{buffer_kind}_{n_elem}" + sub.mkdir() + topo_path, ccl_path = _write_temp_configs( + sub, + sip_topology="torus_2d", + n_sips=6, + algorithm="intercube_allreduce", + sip_w=3, sip_h=2, + n_elem_override=n_elem, + ) + # Patch ccl.yaml in-place so the algorithm picks up buffer_kind. + import yaml + + with open(ccl_path) as f: + ccl_cfg = yaml.safe_load(f) + ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind + ccl_cfg.setdefault("algorithms", {}).setdefault( + "intercube_allreduce", {}, + )["buffer_kind"] = buffer_kind + with open(ccl_path, "w") as f: + yaml.dump(ccl_cfg, f, default_flow_style=False) + + topo = resolve_topology(topo_path) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id=f"bk_{buffer_kind}_{n_elem}", + spec=spec, + ) as ctx: + result = run_allreduce( + ctx, engine, spec, + algorithm="intercube_allreduce", ccl_yaml=ccl_path, + ) + assert result["ok_cubes"] > 0, "allreduce did not validate" + + pe_exec_vals = [ + float(tr.get("pe_exec_ns", 0.0) or 0.0) + for _, (_, tr) in engine._results.items() + if isinstance(tr, dict) + ] + return max(pe_exec_vals) if pe_exec_vals else 0.0 + + +# ── Phase 1 assertions ─────────────────────────────────────────────── + + +def test_slot_write_latency_orders_tcm_sram_hbm(tmp_path): + 
"""tcm < sram < hbm at 8192 B per send. + + Pre-Phase-2: all three return the same pe_exec_ns and this + assertion fails. Post-Phase-2: the per-tier BW + overhead make + hbm visibly slower than sram, which is slower than tcm. + """ + n_elem = 4096 # 8192 B per slot + lat_tcm = _run_torus_allreduce(tmp_path, buffer_kind="tcm", n_elem=n_elem) + lat_sram = _run_torus_allreduce(tmp_path, buffer_kind="sram", n_elem=n_elem) + lat_hbm = _run_torus_allreduce(tmp_path, buffer_kind="hbm", n_elem=n_elem) + + # Expected per-access deltas (write+read = 2× the per-access value). + exp_tcm = 2 * _expected_slot_io_ns("tcm", n_elem * 2) + exp_sram = 2 * _expected_slot_io_ns("sram", n_elem * 2) + exp_hbm = 2 * _expected_slot_io_ns("hbm", n_elem * 2) + # Floor margin: 50% of the raw expected per-access delta — lets Phase 2 + # implementation choose to charge only one side without breaking the test, + # but still requires a clearly observable gap. + margin_sram_tcm = 0.5 * (exp_sram - exp_tcm) + margin_hbm_sram = 0.5 * (exp_hbm - exp_sram) + + assert lat_sram > lat_tcm + margin_sram_tcm, ( + f"sram should be slower than tcm by ≥ {margin_sram_tcm:.1f} ns " + f"per allreduce, got sram={lat_sram:.1f} tcm={lat_tcm:.1f} " + f"(delta={lat_sram - lat_tcm:.1f})" + ) + assert lat_hbm > lat_sram + margin_hbm_sram, ( + f"hbm should be slower than sram by ≥ {margin_hbm_sram:.1f} ns " + f"per allreduce, got hbm={lat_hbm:.1f} sram={lat_sram:.1f} " + f"(delta={lat_hbm - lat_sram:.1f})" + ) + + +def test_slot_io_scales_linearly_with_nbytes(tmp_path): + """For buffer_kind=hbm, doubling nbytes should add ~nbytes/32 ns + of latency to each slot access. Sanity-checks the slope. + + Pre-Phase-2: latency does not respond to nbytes via memory BW + (only via fabric drain), so the observed slope is dominated by + fabric BW and does NOT match 1/32 ns/B. 
+ """ + lat_4k = _run_torus_allreduce(tmp_path, buffer_kind="hbm", n_elem=2048) + lat_8k = _run_torus_allreduce(tmp_path, buffer_kind="hbm", n_elem=4096) + + # Expected delta from doubling: at least one slot-IO event per cube + # in the critical path (very conservative). Per-access add = 4096/32 = 128 + # ns on HBM going from 4k → 8k. Multiple slot accesses on the critical + # path should make the observed delta meaningfully larger. + expected_min_delta = 0.5 * (4096 / 32.0) # = 64 ns + assert lat_8k - lat_4k > expected_min_delta, ( + f"doubling nbytes on hbm should add ≥ {expected_min_delta:.1f} ns " + f"of slot-IO latency, got delta={lat_8k - lat_4k:.1f} ns " + f"(lat_4k={lat_4k:.1f}, lat_8k={lat_8k:.1f})" + ) + + +def test_buffer_kind_sensitivity_grows_with_payload(tmp_path): + """Credit-return cost is fabric-only by design (16 B packet); only + the data slot-IO charge depends on ``buffer_kind``. Therefore the + tcm-vs-hbm gap must scale with payload size and be a small fraction + of the large-payload gap at small payloads. + + Concrete invariant the model must satisfy: + gap_small / gap_large < 0.10 + + Pre-Phase-2: gap_small == gap_large == 0 (ratio undefined → test + fails first because gap_large is required > 1000 ns). Post-Phase-2: at small + nbytes the slot-IO charge is dominated by the constant + ``overhead_ns`` term, while at large nbytes it is dominated by the + ``nbytes / bw_gbs`` term — so gap_large grows linearly while + gap_small stays small.
+ """ + n_elem_small = 8 # 16 B per slot — overhead-bound + n_elem_large = 16384 # 32 KB per slot — bandwidth-bound + + lat_tcm_small = _run_torus_allreduce( + tmp_path, buffer_kind="tcm", n_elem=n_elem_small, + ) + lat_hbm_small = _run_torus_allreduce( + tmp_path, buffer_kind="hbm", n_elem=n_elem_small, + ) + lat_tcm_large = _run_torus_allreduce( + tmp_path, buffer_kind="tcm", n_elem=n_elem_large, + ) + lat_hbm_large = _run_torus_allreduce( + tmp_path, buffer_kind="hbm", n_elem=n_elem_large, + ) + + gap_small = abs(lat_hbm_small - lat_tcm_small) + gap_large = abs(lat_hbm_large - lat_tcm_large) + + assert gap_large > 1000.0, ( + f"large-payload buffer_kind gap must be observably large " + f"(this is the sweep's whole point). got gap_large={gap_large:.1f} ns " + f"(lat_tcm_large={lat_tcm_large:.1f}, lat_hbm_large={lat_hbm_large:.1f})" + ) + assert gap_small / gap_large < 0.10, ( + f"buffer_kind sensitivity should grow with payload — " + f"small-payload gap should be < 10% of large-payload gap. " + f"got gap_small={gap_small:.1f} ns, gap_large={gap_large:.1f} ns, " + f"ratio={gap_small / gap_large:.3f}" + )