Files
kernbench2/tests/test_pe_to_pe_latency.py
T
mukesh a563169e89 Add tl.recv_no_consume diagnostic API for apples-to-apples pe2pe plot
The pe2pe overview compared IPCQ (tl.send + tl.recv) against raw DMA
(tl.load + tl.store), but DMA is one-sided — DST never reads — while
tl.recv pays a slot-read on DST. The comparison was unfair: IPCQ
looked slower partly because it does more work.

Adds tl.recv_no_consume() — a separate, diagnostic-only entry point
that blocks for slot arrival but skips the slot-read (and bank-hop)
charge on DST. Production tl.recv is unchanged (no `consume` kwarg
on the public API), so the diagnostic flag can never accidentally
leak into real workloads.

Updates test_pe_to_pe_latency to call tl.recv_no_consume so the
overview.png shows IPCQ no-consume vs raw DMA on equal footing.
Also fixes PLOT_DIR back to docs/diagrams/pe2pe_latency_plots/
(was lost in a merge). Adds scripts/replot_pe2pe.py for label-only
re-renders without re-measuring.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 18:20:44 -07:00

362 lines
12 KiB
Python

"""PE-to-PE latency sweep across hop types and data sizes.
Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for four
hop types. The IPCQ path uses ``tl.recv_no_consume(...)`` so that DST
does not pay the slot-read latency — apples-to-apples with the DMA
path, which is a one-sided write that has no read on DST.
``tl.recv_no_consume`` is a DIAGNOSTIC-only entry point that exists
solely to draw this graph; production kernels use ``tl.recv``.
H1 Intra-cube horizontal pe0 → pe1
H2 Intra-cube vertical pe0 → pe4
H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0
H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0
Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pytest
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
from kernbench.policy.placement.dp import DPPolicy
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
PLOT_DIR = (
Path(__file__).parent.parent / "docs" / "diagrams" / "pe2pe_latency_plots"
)
SIZES = [128, 256, 384, 512, 768, 1024, 2048, 4096, 8192, 10240]
N_CUBES = 16
N_PES = 8
ELEM_BYTES = 2 # f16
@dataclass(frozen=True)
class Hop:
id: str
label: str
src: tuple[int, int, int]
dst: tuple[int, int, int]
send_dir: str
recv_dir: str
supports_raw: bool
HOPS = [
Hop("h1_intra_horizontal", "Intra-cube horizontal (pe0 to pe1)",
(0, 0, 0), (0, 0, 1), "intra_E", "intra_W", True),
Hop("h2_intra_vertical", "Intra-cube vertical (pe0 to pe4)",
(0, 0, 0), (0, 0, 4), "intra_S", "intra_N", True),
Hop("h3_inter_cube_horizontal", "Inter-cube horizontal (cube0 to cube1)",
(0, 0, 0), (0, 1, 0), "E", "W", True),
Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)",
(0, 0, 0), (0, 4, 0), "S", "N", True),
]
def _make_engine():
topo = resolve_topology(str(TOPOLOGY_PATH))
engine = GraphEngine(topo.topology_obj, enable_data=True)
return engine, topo.topology_obj.spec
# ── IPCQ path ────────────────────────────────────────────────────────
def _measure_ipcq(hop: Hop, nbytes: int) -> float:
engine, spec = _make_engine()
cfg = load_ccl_config()
merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
merged["slot_size"] = max(int(merged.get("slot_size", 4096)), nbytes)
n_elem = nbytes // ELEM_BYTES
src_sip, src_cube, src_pe = hop.src
dst_sip, dst_cube, dst_pe = hop.dst
send_dir, recv_dir = hop.send_dir, hop.recv_dir
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"ipcq_{hop.id}_{nbytes}",
spec=spec,
) as ctx:
configure_sfr_intercube_multisip(engine, spec, merged)
dp = DPPolicy(
cube="row_wise", pe="column_wise",
num_cubes=N_CUBES, num_pes=N_PES,
)
def kernel(t_ptr, n_elem, tl):
pe_id = tl.program_id(axis=0)
cube_id = tl.program_id(axis=1)
if cube_id == src_cube and pe_id == src_pe:
data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
tl.send(dir=send_dir, src=data)
elif cube_id == dst_cube and pe_id == dst_pe:
# tl.recv_no_consume: DST blocks until bytes land in
# slot but skips slot-read latency. Apples-to-apples
# with the raw-DMA path below, which has no DST read.
# Diagnostic-only — production kernels use tl.recv.
tl.recv_no_consume(dir=recv_dir,
shape=(n_elem,), dtype="f16")
tensors = []
for s in sorted({src_sip, dst_sip}):
ctx.ahbm.set_device(s)
t = ctx.zeros(
(N_CUBES, N_PES * n_elem), dtype="f16",
dp=dp, name=f"sip{s}",
)
t.copy_(ctx.from_numpy(
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
))
tensors.append(t)
all_pending = []
for t in tensors:
pending = ctx.launch(
f"{hop.id}_ipcq", kernel, t, n_elem, _defer_wait=True,
)
all_pending.extend(pending)
for h, sip_id, meta in all_pending:
ctx.wait(h, _meta=meta)
# Per-PE kernel execution time (excludes launch dispatch and
# response aggregation). IPCQ: DST blocks on tl.recv until the
# send arrives, so max across SIPs = DST's transfer time.
pe_exec_vals = []
for h, _sip, _meta in all_pending:
_, trace = engine.get_completion(h)
if trace and trace.get("pe_exec_ns") is not None:
pe_exec_vals.append(float(trace["pe_exec_ns"]))
return max(pe_exec_vals) if pe_exec_vals else 0.0
# ── Raw DMA path (intra-SIP only) ────────────────────────────────────
def _measure_raw(hop: Hop, nbytes: int) -> float:
"""tl.load from source slice + tl.store to destination slice. The VA
mapping spans the cube mesh within one SIP (MmuMapMsg broadcasts to all
cubes of the SIP), so the store goes through the fabric to the
destination PE's HBM. No IPCQ protocol involved.
"""
if not hop.supports_raw:
raise RuntimeError(f"hop {hop.id} does not support raw path")
engine, spec = _make_engine()
n_elem = nbytes // ELEM_BYTES
src_sip, src_cube, src_pe = hop.src
dst_sip, dst_cube, dst_pe = hop.dst
assert src_sip == dst_sip
# Slice offsets in the (N_CUBES, N_PES * n_elem) tensor:
# row = cube, slice within row = pe * n_elem .. (pe+1)*n_elem
# Byte offsets from va_base:
src_off = (src_cube * N_PES + src_pe) * n_elem * ELEM_BYTES
dst_off = (dst_cube * N_PES + dst_pe) * n_elem * ELEM_BYTES
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"raw_{hop.id}_{nbytes}",
spec=spec,
) as ctx:
dp = DPPolicy(
cube="row_wise", pe="column_wise",
num_cubes=N_CUBES, num_pes=N_PES,
)
ctx.ahbm.set_device(src_sip)
t = ctx.zeros(
(N_CUBES, N_PES * n_elem), dtype="f16",
dp=dp, name="raw_tensor",
)
t.copy_(ctx.from_numpy(
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
))
def kernel(t_ptr, n_elem, tl):
pe_id = tl.program_id(axis=0)
cube_id = tl.program_id(axis=1)
if cube_id == src_cube and pe_id == src_pe:
data = tl.load(
t_ptr + src_off, shape=(n_elem,), dtype="f16",
)
tl.store(t_ptr + dst_off, data)
pending = ctx.launch(
f"{hop.id}_raw", kernel, t, n_elem, _defer_wait=True,
)
for h, sip_id, meta in pending:
ctx.wait(h, _meta=meta)
# Per-PE kernel execution time. Raw: only SRC does real work
# (tl.load + tl.store, store is blocking), so max across all PEs
# = SRC's transfer time. Idle PEs contribute only overhead_ns.
pe_exec_vals = []
for h, _sip, _meta in pending:
_, trace = engine.get_completion(h)
if trace and trace.get("pe_exec_ns") is not None:
pe_exec_vals.append(float(trace["pe_exec_ns"]))
return max(pe_exec_vals) if pe_exec_vals else 0.0
# ── CSV + plotting ───────────────────────────────────────────────────
def _write_csv(records, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(
f, fieldnames=["hop", "label", "size_bytes", "path", "total_ns"],
)
w.writeheader()
for r in records:
w.writerow(r)
def _plot_per_hop(records, hop: Hop, path: Path) -> None:
import matplotlib.pyplot as plt
ipcq = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
key=lambda r: r["size_bytes"],
)
raw = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
key=lambda r: r["size_bytes"],
)
fig, ax = plt.subplots(figsize=(8, 5))
if ipcq:
ax.plot(
[r["size_bytes"] for r in ipcq],
[r["total_ns"] for r in ipcq],
marker="o", label="IPCQ no-consume (send/recv, no slot read)",
color="tab:blue",
)
if raw:
ax.plot(
[r["size_bytes"] for r in raw],
[r["total_ns"] for r in raw],
marker="s", label="Raw DMA (load+store)", color="tab:orange",
)
ax.set_xlabel("Data size (bytes)")
ax.set_ylabel("Latency (ns)")
ax.set_title(hop.label)
ax.grid(True, alpha=0.3)
ax.legend()
fig.tight_layout()
fig.savefig(path, dpi=120)
plt.close(fig)
def _plot_overview(records, path: Path) -> None:
import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 2, figsize=(13, 9))
axes = axes.flatten()
for i, hop in enumerate(HOPS):
ax = axes[i]
ipcq = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
key=lambda r: r["size_bytes"],
)
raw = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
key=lambda r: r["size_bytes"],
)
if ipcq:
ax.plot(
[r["size_bytes"] for r in ipcq],
[r["total_ns"] for r in ipcq],
marker="o", label="IPCQ no-consume", color="tab:blue",
)
if raw:
ax.plot(
[r["size_bytes"] for r in raw],
[r["total_ns"] for r in raw],
marker="s", label="Raw DMA", color="tab:orange",
)
ax.set_title(hop.label, fontsize=10)
ax.set_xlabel("bytes")
ax.set_ylabel("ns")
ax.grid(True, alpha=0.3)
ax.legend(fontsize=8)
for j in range(len(HOPS), len(axes)):
axes[j].axis("off")
fig.suptitle(
"PE-to-PE latency: IPCQ no-consume vs raw DMA",
fontsize=14,
)
fig.tight_layout()
fig.savefig(path, dpi=120)
plt.close(fig)
# ── Test entry ───────────────────────────────────────────────────────
def test_pe_to_pe_latency_sweep():
records: list[dict] = []
for hop in HOPS:
for size in SIZES:
# IPCQ path uses tl.recv(consume=False) — apples-to-apples
# with the raw-DMA path, which has no DST read either.
ipcq_ns = _measure_ipcq(hop, size)
records.append({
"hop": hop.id, "label": hop.label,
"size_bytes": size, "path": "ipcq",
"total_ns": ipcq_ns,
})
raw_s = "n/a"
if hop.supports_raw:
raw_ns = _measure_raw(hop, size)
records.append({
"hop": hop.id, "label": hop.label,
"size_bytes": size, "path": "raw",
"total_ns": raw_ns,
})
raw_s = f"{raw_ns:7.1f}ns"
print(
f"[{hop.id}] size={size:5d} "
f"ipcq={ipcq_ns:7.1f}ns raw={raw_s}"
)
PLOT_DIR.mkdir(parents=True, exist_ok=True)
_write_csv(records, PLOT_DIR / "summary.csv")
for hop in HOPS:
_plot_per_hop(records, hop, PLOT_DIR / f"{hop.id}.png")
_plot_overview(records, PLOT_DIR / "overview.png")
for hop in HOPS:
rs = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
key=lambda r: r["size_bytes"],
)
for r in rs:
assert r["total_ns"] > 0, f"{hop.id}: total_ns must be > 0"
print(f"\n Plots + CSV written to {PLOT_DIR}")