Files
kernbench2/tests/test_allreduce_buffer_kind_sweep.py
T
mukesh 84a1325e5c ADR-0023 D9.7: IPCQ slot-memory latency model (TCM/SRAM/HBM)
Charge per-tier bandwidth + setup overhead at IPCQ slot WRITE
(receiver inbound DMA, in pe_dma._handle_ipcq_inbound) and slot
READ (recv consume, in pe_ipcq._handle_recv). Tier table
(common/ipcq_types.py):
  tcm  : 512 GB/s, 0 ns
  sram : 128 GB/s, 2 ns
  hbm  :  32 GB/s, 6 ns

Before this change, slot read/write was free regardless of
buffer_kind, making memory-tier choice invisible in simulated
latency. After the change, swapping buffer_kind in ccl.yaml
produces measurable per-tier separation in allreduce latency.

Tests:
  test_ipcq_buffer_kind_latency.py — three micro-tests asserting
    tcm < sram < hbm ordering, payload-scaling, and that
    buffer_kind sensitivity grows with payload (credit-only path
    stays fabric-bound).
  test_allreduce_buffer_kind_sweep.py — 12-config parametrized
    sweep emitting buffer_kind_sweep.png (3 lines, torus_2d).

conftest sessionfinish hook generalised to dispatch multiple
sweep aggregators (allreduce + buffer-kind).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 21:28:34 -07:00

197 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase 1 buffer-kind allreduce sweep — torus_2d 6 SIPs.
Parametrized over (buffer_kind, n_elem). Each case runs the standard
config-driven allreduce app and writes a JSON row to a shared staging
dir; the conftest sessionfinish hook (added in Phase 1) aggregates
rows into ``docs/diagrams/allreduce_latency_plots/buffer_kind_sweep.png``.
Pre-Phase-2: the three buffer-kind lines overlap exactly because slot
access is latency-free today. Post-Phase-2 they spread out (tcm
fastest, hbm slowest).
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
import yaml
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
# Reuse the allreduce app helpers.
from tests.test_allreduce_multidevice import (
_write_temp_configs,
run_allreduce,
)
_BUFFER_KINDS = ["tcm", "sram", "hbm"]
_N_ELEM_GRID = [128, 1024, 8192, 32768] # 256 B → 64 KB per slot
_ELEM_BYTES_F16 = 2
_OUT_DIR = (Path(__file__).parent.parent / "docs" / "diagrams"
/ "allreduce_latency_plots")
_ROWS_DIR = _OUT_DIR / "_buffer_kind_rows"
def _bk_params():
out = []
for bk in _BUFFER_KINDS:
for n_elem in _N_ELEM_GRID:
out.append(pytest.param(bk, n_elem, id=f"{bk}-n_elem{n_elem}"))
return out
@pytest.mark.parametrize("buffer_kind,n_elem", _bk_params())
def test_buffer_kind_allreduce_one(tmp_path, buffer_kind, n_elem):
"""One config of the buffer-kind sweep. xdist parallelizes."""
sub = tmp_path / f"{buffer_kind}_{n_elem}"
sub.mkdir()
topo_path, ccl_path = _write_temp_configs(
sub,
sip_topology="torus_2d",
n_sips=6,
algorithm="intercube_allreduce",
sip_w=3, sip_h=2,
n_elem_override=n_elem,
)
# Override buffer_kind in the temp ccl.yaml.
with open(ccl_path) as f:
ccl_cfg = yaml.safe_load(f)
ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind
ccl_cfg.setdefault("algorithms", {}).setdefault(
"intercube_allreduce", {},
)["buffer_kind"] = buffer_kind
with open(ccl_path, "w") as f:
yaml.dump(ccl_cfg, f, default_flow_style=False)
topo = resolve_topology(topo_path)
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"bk_sweep_{buffer_kind}_{n_elem}",
spec=spec,
) as ctx:
result = run_allreduce(
ctx, engine, spec,
algorithm="intercube_allreduce", ccl_yaml=ccl_path,
)
assert result["ok_cubes"] > 0
pe_exec_vals = [
float(tr.get("pe_exec_ns", 0.0) or 0.0)
for _, (_, tr) in engine._results.items()
if isinstance(tr, dict)
]
crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0
bytes_per_pe = n_elem * _ELEM_BYTES_F16
record = {
"buffer_kind": buffer_kind,
"sip_topology": "torus_2d",
"n_sips": 6,
"n_elem": n_elem,
"bytes_per_pe": bytes_per_pe,
"latency_ns": crit_ns,
}
_ROWS_DIR.mkdir(parents=True, exist_ok=True)
row_path = _ROWS_DIR / f"{buffer_kind}_{n_elem}.json"
with open(row_path, "w", encoding="utf-8") as f:
json.dump(record, f)
def aggregate_buffer_kind_plot() -> bool:
"""Read per-config rows and emit buffer_kind_sweep.png + CSV.
Called from conftest.pytest_sessionfinish (controller-only).
Returns True if rows were aggregated.
"""
import csv
if not _ROWS_DIR.exists():
return False
row_files = sorted(_ROWS_DIR.glob("*.json"))
if not row_files:
return False
records = []
for p in row_files:
with open(p, encoding="utf-8") as f:
records.append(json.load(f))
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
def _fmt_bytes(x, _pos):
if x <= 0:
return "0"
if x >= 1024 * 1024:
return f"{x / (1024 * 1024):.0f} MB"
if x >= 1024:
return f"{x / 1024:.0f} KB"
return f"{x:.0f} B"
_bytes_fmt = FuncFormatter(_fmt_bytes)
_OUT_DIR.mkdir(parents=True, exist_ok=True)
with open(_OUT_DIR / "buffer_kind_sweep.csv", "w",
newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=[
"buffer_kind", "sip_topology", "n_sips", "n_elem",
"bytes_per_pe", "latency_ns",
])
w.writeheader()
for r in sorted(records, key=lambda r: (
r["buffer_kind"], r["bytes_per_pe"],
)):
w.writerow(r)
colors = {"tcm": "tab:blue", "sram": "tab:orange", "hbm": "tab:red"}
fig, ax = plt.subplots(figsize=(10, 6))
for bk in ["tcm", "sram", "hbm"]:
rs = sorted(
[r for r in records if r["buffer_kind"] == bk],
key=lambda r: r["bytes_per_pe"],
)
if not rs:
continue
ax.plot(
[r["bytes_per_pe"] for r in rs],
[r["latency_ns"] for r in rs],
marker="o", lw=2.0,
color=colors[bk], label=f"buffer_kind = {bk}",
)
ax.set_xscale("log", base=2)
ax.set_xlabel("Bytes per PE (log scale)")
ax.set_ylabel("Time (ns)")
ax.set_title(
"Allreduce torus_2d (6 SIPs, 3×2) — IPCQ slot memory tier"
)
ax.grid(True, alpha=0.3)
ax.legend()
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(_OUT_DIR / "buffer_kind_sweep.png", dpi=130)
plt.close(fig)
for p in row_files:
try:
p.unlink()
except OSError:
pass
try:
_ROWS_DIR.rmdir()
except OSError:
pass
print(f"\nWrote {_OUT_DIR / 'buffer_kind_sweep.png'} "
f"from {len(records)} rows")
return True