Files
kernbench2/tests/test_ipcq_buffer_kind_latency.py
mukesh b610cb0d9a sccl: drive allreduce tests via torch.distributed; reorganize into tests/sccl/
Convert the multidevice allreduce correctness + latency/buffer-kind sweeps
to run through the real PyTorch-distributed path
(init_process_group(backend="ahbm") -> mp.spawn -> dist.all_reduce) instead
of direct ctx.launch, and reorganize the CCL/allreduce tests into a
tests/sccl/ package split one test per file.

Production change (required for the distributed path on non-square SIP grids):
- AhbmCCLBackend now reads explicit system.sips.w/h from the spec, with a
  square-only sqrt fallback that raises on ambiguity, instead of silently
  guessing round(sqrt(count)). This fixes the 2x3 / 3x2 torus + mesh cases,
  which previously resolved to a wrong 2x2 grid. Mirrors the test helper's
  _sip_topo_dims precedence (explicit w/h > square fallback > raise).

Test reorganization (tests/sccl/):
- _allreduce_helpers.py: shared plumbing (distributed driver, config writers,
  direct-launch run_allreduce parity reference, sweep/buffer-kind constants,
  plot aggregators, topology-diagram + FSIM-comparison emitters).
- test_allreduce_ring_torus_mesh.py: correctness across ring/torus/mesh.
- test_distributed_default_topology.py: full distributed path on topology.yaml.
- test_plot_latency_sweep.py / test_plot_buffer_kind_sweep.py: sweep rows.
- test_plot_topology_diagram.py / test_plot_comparison_fsim.py: plot emitters.
- test_intercube_root_center.py: moved in (ADR-0032 center-root latency guard).

Also:
- Move the FSIM comparison plot generator out of scripts/ into the sccl suite.
- Delete superseded test files (test_allreduce_multidevice,
  test_distributed_lrab_hierarchical_allreduce, test_allreduce_buffer_kind_sweep)
  and repoint conftest aggregators + the ipcq buffer-kind importers.
- Regenerate the allreduce_latency_plots derived artifacts from the full sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 22:24:43 -07:00

238 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase 1 micro-tests for IPCQ slot-memory latency model.
These tests assert the TARGET behavior expected after Phase 2 wires
``buffer_kind`` (tcm/sram/hbm) into the IPCQ slot read/write latency
charges. They are written BEFORE the production change and are
EXPECTED TO FAIL today.
Failure semantics today:
- Slot access is latency-free, so the tcm/sram/hbm runs produce
identical pe_exec_ns. The ordering assertion therefore fails with
"tcm == sram == hbm" — proving the test harness is wired and that
Phase 2 production work is what makes them pass.
Reference (Phase 2 will edit these):
- src/kernbench/components/builtin/pe_dma.py — _handle_ipcq_inbound
- src/kernbench/components/builtin/pe_ipcq.py — _handle_recv,
_BUFFER_KIND_BW table
- src/kernbench/runtime_api/kernel.py — IpcqDmaToken adds
buffer_kind field
- ccl.yaml — algorithm.buffer_kind
The tests reuse the existing config-driven allreduce app
(``run_allreduce`` in tests/sccl/_allreduce_helpers.py) with a 6-SIP
(3x2) ``torus_2d`` topology and a SMALL n_elem so they finish fast
(~3-5 s each).
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import pytest
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
# Reuse the test app's helpers so this micro-test file does not
# duplicate the run-allreduce + write-temp-configs plumbing.
from tests.sccl._allreduce_helpers import (
_write_temp_configs,
run_allreduce,
)
# Expected latency-model parameters per buffer tier, one row per tier:
#   (tier, slot intrinsic BW [GB/s], fixed overhead [ns], PE↔bank hop BW [GB/s])
# Slot intrinsic BW mirrors _BUFFER_KIND_BW in src/kernbench/common/ipcq_types.py.
# The PE↔bank hop BW reflects the topology.yaml link bandwidths:
#   - TCM is per-PE local → no hop, encoded as inf.
#   - SRAM bank sits on the cube NoC behind sram_to_router_bw_gbs = 128 GB/s.
#   - HBM ctrl sits on the cube NoC behind hbm_to_router_bw_gbs = 256 GB/s.
_EXPECTED_TIER = {
    kind: {
        "slot_bw_gbs": slot_bw,
        "overhead_ns": overhead,
        "bank_hop_bw_gbs": hop_bw,
    }
    for kind, slot_bw, overhead, hop_bw in (
        ("tcm", 512.0, 0.0, float("inf")),
        ("sram", 512.0, 2.0, 128.0),
        ("hbm", 256.0, 6.0, 256.0),
    )
}
def _expected_slot_io_ns(buffer_kind: str, nbytes: int) -> float:
    """Return the latency the model should add for ONE slot access.

    The same figure applies to a slot write or a slot read. It is the sum
    of two parts taken from ``_EXPECTED_TIER[buffer_kind]``:

    * the PE↔bank fabric hop, ``nbytes / bank_hop_bw_gbs``; and
    * the intrinsic slot IO, ``nbytes / slot_bw_gbs + overhead_ns``.

    Bandwidths are in GB/s, so bytes divided by bandwidth is in ns.

    Args:
        buffer_kind: Tier name; must be a key of ``_EXPECTED_TIER``.
        nbytes: Payload size of the slot access in bytes.

    Returns:
        Expected added latency in nanoseconds.

    Raises:
        KeyError: If ``buffer_kind`` is not a key of ``_EXPECTED_TIER``.
    """
    tier = _EXPECTED_TIER[buffer_kind]
    hop_bw, cell_bw, fixed_ns = (
        tier[field]
        for field in ("bank_hop_bw_gbs", "slot_bw_gbs", "overhead_ns")
    )
    # An infinite hop bandwidth (the TCM row) makes the first term 0.0.
    return nbytes / hop_bw + (nbytes / cell_bw + fixed_ns)
def _run_torus_allreduce(
    tmp_path: Path, *, buffer_kind: str, n_elem: int,
) -> float:
    """Run one 6-SIP ``torus_2d`` allreduce and return its latency figure.

    Temp configs are written into ``tmp_path / f"{buffer_kind}_{n_elem}"``,
    the generated ccl.yaml is patched so both the ``defaults`` section and
    the ``lrab_hierarchical_allreduce`` algorithm entry carry
    ``buffer_kind``, and the allreduce is then executed via
    ``run_allreduce``.

    Args:
        tmp_path: Base directory; a fresh sub-directory is created in it.
        buffer_kind: Value written to the ``buffer_kind`` keys of ccl.yaml.
        n_elem: Element count forwarded as ``n_elem_override``.

    Returns:
        The largest ``pe_exec_ns`` found among the dict-valued traces in
        ``engine._results`` (used by the tests as the critical-path
        latency), or ``0.0`` when there is no such trace.

    Raises:
        FileExistsError: If the sub-directory already exists.
        AssertionError: If ``result["ok_cubes"]`` is not positive.
    """
    algo_name = "lrab_hierarchical_allreduce"

    work_dir = tmp_path / f"{buffer_kind}_{n_elem}"
    work_dir.mkdir()
    topo_path, ccl_path = _write_temp_configs(
        work_dir,
        sip_topology="torus_2d",
        n_sips=6,
        algorithm=algo_name,
        sip_w=3,
        sip_h=2,
        n_elem_override=n_elem,
    )

    # Rewrite ccl.yaml in place so the algorithm picks up buffer_kind.
    import yaml

    with open(ccl_path) as src:
        cfg = yaml.safe_load(src)
    defaults_section = cfg.setdefault("defaults", {})
    defaults_section["buffer_kind"] = buffer_kind
    algo_section = cfg.setdefault("algorithms", {}).setdefault(algo_name, {})
    algo_section["buffer_kind"] = buffer_kind
    with open(ccl_path, "w") as dst:
        yaml.dump(cfg, dst, default_flow_style=False)

    resolved = resolve_topology(topo_path)
    topology = resolved.topology_obj
    engine = GraphEngine(topology, enable_data=True)
    spec = topology.spec

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id=f"bk_{buffer_kind}_{n_elem}",
        spec=spec,
    ) as ctx:
        result = run_allreduce(
            ctx,
            engine,
            spec,
            algorithm=algo_name,
            ccl_yaml=ccl_path,
        )

    # NOTE(review): the engine results are read after the RuntimeContext
    # has exited — confirm the context exit does not reset them.
    assert result["ok_cubes"] > 0, "allreduce did not validate"

    # Each result value is a pair whose second item is the trace; only
    # dict traces are considered, and a missing/None/zero pe_exec_ns
    # counts as 0.0.
    exec_times = (
        float(trace.get("pe_exec_ns", 0.0) or 0.0)
        for _, trace in engine._results.values()
        if isinstance(trace, dict)
    )
    return max(exec_times, default=0.0)
# ── Phase 1 assertions ───────────────────────────────────────────────
def test_slot_write_latency_orders_tcm_hbm_sram(tmp_path):
    """Check the tier ordering tcm < hbm < sram at 8192 B per send.

    The ordering comes from the PE↔bank hop bandwidths rather than the
    intrinsic slot cell rates: in ``_EXPECTED_TIER`` the SRAM hop
    (128 GB/s) is narrower than the HBM hop (256 GB/s), so once the hop
    is charged SRAM becomes the slowest tier even though its intrinsic
    slot BW equals TCM's.

    A model that omits the PE↔bank hop yields tcm < sram < hbm and makes
    this test FAIL; a model that charges the hop makes it PASS.
    """
    n_elem = 4096
    slot_bytes = n_elem * 2  # 8192 B per slot
    kinds = ("tcm", "sram", "hbm")

    # Measured latencies; runs execute in the order tcm, sram, hbm.
    lat_tcm, lat_sram, lat_hbm = (
        _run_torus_allreduce(tmp_path, buffer_kind=kind, n_elem=n_elem)
        for kind in kinds
    )

    # Modelled cost of one write plus one read (2× the per-access value).
    exp_tcm, exp_sram, exp_hbm = (
        2 * _expected_slot_io_ns(kind, slot_bytes) for kind in kinds
    )

    # Required gap: half of the write+read difference between two tiers.
    # An implementation that charges only one side of the access still
    # passes, while a clearly observable gap remains mandatory.
    margin_hbm_tcm = 0.5 * (exp_hbm - exp_tcm)
    margin_sram_hbm = 0.5 * (exp_sram - exp_hbm)

    assert lat_hbm > lat_tcm + margin_hbm_tcm, (
        f"hbm should be slower than tcm by ≥ {margin_hbm_tcm:.1f} ns "
        f"per allreduce, got hbm={lat_hbm:.1f} tcm={lat_tcm:.1f} "
        f"(delta={lat_hbm - lat_tcm:.1f})"
    )
    assert lat_sram > lat_hbm + margin_sram_hbm, (
        f"sram should be slower than hbm by ≥ {margin_sram_hbm:.1f} ns "
        f"per allreduce (sram bank link 128 GB/s is narrower than hbm "
        f"link 256 GB/s), got sram={lat_sram:.1f} hbm={lat_hbm:.1f} "
        f"(delta={lat_sram - lat_hbm:.1f})"
    )
def test_slot_io_scales_linearly_with_nbytes(tmp_path):
    """Check that HBM latency grows when the payload doubles.

    Two ``buffer_kind="hbm"`` allreduces are run, at n_elem=2048 and
    n_elem=4096. The larger run must be slower by more than half of
    ``4096 / 256`` ns, i.e. by more than 8 ns. This is a deliberately
    conservative lower bound (a single slot-IO event on the critical
    path), not an exact slope check.

    Before the slot-memory model is wired in, latency responds to the
    payload size only through fabric drain, not through memory BW.
    """
    lat_4k, lat_8k = (
        _run_torus_allreduce(tmp_path, buffer_kind="hbm", n_elem=n_elem)
        for n_elem in (2048, 4096)
    )

    # Going from 4k to 8k adds 4096 B per slot access; at 256 GB/s that
    # is 16 ns per access. Only half of one access is required here;
    # several slot accesses on the critical path should make the
    # observed delta noticeably larger.
    added_bytes = 4096
    hbm_bw_gbs = 256.0
    expected_min_delta = 0.5 * (added_bytes / hbm_bw_gbs)  # ≈ 8 ns

    observed_delta = lat_8k - lat_4k
    assert observed_delta > expected_min_delta, (
        f"doubling nbytes on hbm should add ≥ {expected_min_delta:.1f} ns "
        f"of slot-IO latency, got delta={observed_delta:.1f} ns "
        f"(lat_4k={lat_4k:.1f}, lat_8k={lat_8k:.1f})"
    )
def test_buffer_kind_sensitivity_grows_with_payload(tmp_path):
    """Check that the tcm-vs-hbm gap scales with payload size.

    Credit-return cost is fabric-only by design (16 B packet); only the
    data slot-IO charge depends on ``buffer_kind``. The gap between the
    tcm and hbm runs must therefore be large for a large payload and
    only a small fraction of that for a tiny payload.

    Invariants asserted:

        gap_large > 1000 ns
        gap_small / gap_large < 0.10

    Before the slot-memory model is wired in, both gaps are 0, so the
    first assertion fails before the ratio is ever evaluated. Afterwards
    the small-payload charge is dominated by the constant
    ``overhead_ns`` term and the large-payload charge by the
    ``nbytes / bw_gbs`` term, so gap_large grows linearly while
    gap_small stays small.
    """
    n_elem_small = 8  # 16 B per slot — overhead-bound
    n_elem_large = 16384  # 32 KB per slot — bandwidth-bound

    def _tcm_then_hbm(n_elem):
        # One run per tier at this payload size; tcm first, then hbm.
        return tuple(
            _run_torus_allreduce(tmp_path, buffer_kind=kind, n_elem=n_elem)
            for kind in ("tcm", "hbm")
        )

    lat_tcm_small, lat_hbm_small = _tcm_then_hbm(n_elem_small)
    lat_tcm_large, lat_hbm_large = _tcm_then_hbm(n_elem_large)

    gap_small = abs(lat_hbm_small - lat_tcm_small)
    gap_large = abs(lat_hbm_large - lat_tcm_large)

    # Checked first so the ratio below is never computed with a zero
    # denominator.
    assert gap_large > 1000.0, (
        f"large-payload buffer_kind gap must be observably large "
        f"(this is the sweep's whole point). got gap_large={gap_large:.1f} ns "
        f"(lat_tcm_large={lat_tcm_large:.1f}, lat_hbm_large={lat_hbm_large:.1f})"
    )
    assert gap_small / gap_large < 0.10, (
        f"buffer_kind sensitivity should grow with payload — "
        f"small-payload gap should be < 10% of large-payload gap. "
        f"got gap_small={gap_small:.1f} ns, gap_large={gap_large:.1f} ns, "
        f"ratio={gap_small / gap_large:.3f}"
    )