"""PE-to-PE latency sweep across hop types and data sizes. Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for four hop types: H1 Intra-cube horizontal pe0 → pe1 H2 Intra-cube vertical pe0 → pe4 H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0 H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0 Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV. """ from __future__ import annotations import csv from dataclasses import dataclass from pathlib import Path import numpy as np import pytest from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip from kernbench.policy.placement.dp import DPPolicy from kernbench.runtime_api.context import RuntimeContext from kernbench.runtime_api.types import DeviceSelector from kernbench.sim_engine.engine import GraphEngine from kernbench.topology.builder import resolve_topology TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" PLOT_DIR = Path(__file__).parent / "pe2pe_latency_plots" SIZES = [128, 256, 384, 512, 768, 1024, 2048, 4096, 8192, 10240] N_CUBES = 16 N_PES = 8 ELEM_BYTES = 2 # f16 @dataclass(frozen=True) class Hop: id: str label: str src: tuple[int, int, int] dst: tuple[int, int, int] send_dir: str recv_dir: str supports_raw: bool HOPS = [ Hop("h1_intra_horizontal", "Intra-cube horizontal (pe0 to pe1)", (0, 0, 0), (0, 0, 1), "intra_E", "intra_W", True), Hop("h2_intra_vertical", "Intra-cube vertical (pe0 to pe4)", (0, 0, 0), (0, 0, 4), "intra_S", "intra_N", True), Hop("h3_inter_cube_horizontal", "Inter-cube horizontal (cube0 to cube1)", (0, 0, 0), (0, 1, 0), "E", "W", True), Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)", (0, 0, 0), (0, 4, 0), "S", "N", True), ] def _make_engine(): topo = resolve_topology(str(TOPOLOGY_PATH)) engine = GraphEngine(topo.topology_obj, enable_data=True) return engine, topo.topology_obj.spec # ── IPCQ path 
# ── IPCQ path ────────────────────────────────────────────────────────


def _elem_count(nbytes: int) -> int:
    """Validate *nbytes* and return the number of f16 elements it holds.

    Raises:
        ValueError: if *nbytes* is not a positive multiple of ELEM_BYTES.
            Floor division would otherwise silently transfer fewer bytes
            than the size recorded for the measurement.
    """
    if nbytes <= 0 or nbytes % ELEM_BYTES:
        raise ValueError(
            f"nbytes must be a positive multiple of {ELEM_BYTES}, got {nbytes}"
        )
    return nbytes // ELEM_BYTES


def _alloc_ones(ctx, dp, n_elem: int, name: str):
    """Allocate an (N_CUBES, N_PES * n_elem) f16 tensor and fill it with 1.0.

    The caller is expected to have selected the target device via
    ``ctx.ahbm.set_device`` beforehand.
    """
    shape = (N_CUBES, N_PES * n_elem)
    tensor = ctx.zeros(shape, dtype="f16", dp=dp, name=name)
    tensor.copy_(ctx.from_numpy(np.full(shape, 1.0, dtype=np.float16)))
    return tensor


def _max_pe_exec_ns(engine, pending) -> float:
    """Return the largest ``pe_exec_ns`` over the given launches.

    ``pending`` is an iterable of ``(handle, sip_id, meta)`` tuples as
    returned by ``ctx.launch(..., _defer_wait=True)``. Launches whose trace
    is missing or has no ``pe_exec_ns`` entry are ignored; 0.0 is returned
    when no launch reported a value.
    """
    values = []
    for handle, _sip, _meta in pending:
        _, trace = engine.get_completion(handle)
        if trace and trace.get("pe_exec_ns") is not None:
            values.append(float(trace["pe_exec_ns"]))
    return max(values, default=0.0)


def _measure_ipcq(hop: Hop, nbytes: int) -> float:
    """Measure one IPCQ send/recv transfer of *nbytes* over *hop*.

    A fresh engine is built, the ``intercube_allreduce`` CCL config is
    resolved (with ``slot_size`` raised to at least *nbytes*), and a kernel
    is launched in which the source PE loads and sends while the
    destination PE receives.

    Returns:
        The maximum ``pe_exec_ns`` across all launches, or 0.0 if none was
        reported.

    Raises:
        ValueError: if *nbytes* is not a positive multiple of ELEM_BYTES.
    """
    n_elem = _elem_count(nbytes)
    engine, spec = _make_engine()
    cfg = load_ccl_config()
    merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
    # A single slot must be able to hold the whole payload.
    merged["slot_size"] = max(int(merged.get("slot_size", 4096)), nbytes)

    src_sip, src_cube, src_pe = hop.src
    dst_sip, dst_cube, dst_pe = hop.dst
    send_dir, recv_dir = hop.send_dir, hop.recv_dir

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id=f"ipcq_{hop.id}_{nbytes}",
        spec=spec,
    ) as ctx:
        configure_sfr_intercube_multisip(engine, spec, merged)
        dp = DPPolicy(
            cube="row_wise",
            pe="column_wise",
            num_cubes=N_CUBES,
            num_pes=N_PES,
        )

        def kernel(t_ptr, n_elem, tl):
            pe_id = tl.program_id(axis=0)
            cube_id = tl.program_id(axis=1)
            if cube_id == src_cube and pe_id == src_pe:
                data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
                tl.send(dir=send_dir, src=data)
            elif cube_id == dst_cube and pe_id == dst_pe:
                tl.recv(dir=recv_dir, shape=(n_elem,), dtype="f16")

        # One tensor per distinct SIP touched by the hop.
        tensors = []
        for sip in sorted({src_sip, dst_sip}):
            ctx.ahbm.set_device(sip)
            tensors.append(_alloc_ones(ctx, dp, n_elem, f"sip{sip}"))

        # Launch everything first, then wait, so launches are not serialized.
        all_pending = []
        for tensor in tensors:
            all_pending.extend(
                ctx.launch(
                    f"{hop.id}_ipcq", kernel, tensor, n_elem,
                    _defer_wait=True,
                )
            )
        for handle, _sip, meta in all_pending:
            ctx.wait(handle, _meta=meta)

        # Per-PE kernel execution time (excludes launch dispatch and
        # response aggregation). IPCQ: DST blocks on tl.recv until the
        # send arrives, so max across SIPs = DST's transfer time.
        # Completions are read while the runtime context is still open.
        return _max_pe_exec_ns(engine, all_pending)


# ── Raw DMA path (intra-SIP only) ────────────────────────────────────


def _measure_raw(hop: Hop, nbytes: int) -> float:
    """tl.load from source slice + tl.store to destination slice.

    The VA mapping spans the cube mesh within one SIP (MmuMapMsg broadcasts
    to all cubes of the SIP), so the store goes through the fabric to the
    destination PE's HBM. No IPCQ protocol involved.

    Returns:
        The maximum ``pe_exec_ns`` across all launches, or 0.0 if none was
        reported.

    Raises:
        RuntimeError: if *hop* does not support the raw path or its
            endpoints are on different SIPs.
        ValueError: if *nbytes* is not a positive multiple of ELEM_BYTES.
    """
    if not hop.supports_raw:
        raise RuntimeError(f"hop {hop.id} does not support raw path")

    src_sip, src_cube, src_pe = hop.src
    dst_sip, dst_cube, dst_pe = hop.dst
    # Explicit check instead of `assert`: must also hold under `python -O`,
    # and is done before the (comparatively costly) engine construction.
    if src_sip != dst_sip:
        raise RuntimeError(
            f"hop {hop.id}: raw path is intra-SIP only "
            f"(src sip {src_sip} != dst sip {dst_sip})"
        )
    n_elem = _elem_count(nbytes)

    engine, spec = _make_engine()

    # Slice offsets in the (N_CUBES, N_PES * n_elem) tensor:
    #   row = cube, slice within row = pe * n_elem .. (pe+1)*n_elem
    # Byte offsets from va_base:
    src_off = (src_cube * N_PES + src_pe) * n_elem * ELEM_BYTES
    dst_off = (dst_cube * N_PES + dst_pe) * n_elem * ELEM_BYTES

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id=f"raw_{hop.id}_{nbytes}",
        spec=spec,
    ) as ctx:
        dp = DPPolicy(
            cube="row_wise",
            pe="column_wise",
            num_cubes=N_CUBES,
            num_pes=N_PES,
        )
        ctx.ahbm.set_device(src_sip)
        tensor = _alloc_ones(ctx, dp, n_elem, "raw_tensor")

        def kernel(t_ptr, n_elem, tl):
            pe_id = tl.program_id(axis=0)
            cube_id = tl.program_id(axis=1)
            if cube_id == src_cube and pe_id == src_pe:
                data = tl.load(
                    t_ptr + src_off, shape=(n_elem,), dtype="f16",
                )
                tl.store(t_ptr + dst_off, data)

        pending = ctx.launch(
            f"{hop.id}_raw", kernel, tensor, n_elem,
            _defer_wait=True,
        )
        for handle, _sip, meta in pending:
            ctx.wait(handle, _meta=meta)

        # Per-PE kernel execution time. Raw: only SRC does real work
        # (tl.load + tl.store, store is blocking), so max across all PEs
        # = SRC's transfer time. Idle PEs contribute only overhead_ns.
        # Completions are read while the runtime context is still open.
        return _max_pe_exec_ns(engine, pending)


# ── CSV + plotting ───────────────────────────────────────────────────

# (record "path" value, matplotlib marker, matplotlib color) per plotted line.
_SERIES_STYLE = (
    ("ipcq", "o", "tab:blue"),
    ("raw", "s", "tab:orange"),
)


def _series(records, hop_id: str, path_kind: str) -> list[dict]:
    """Return the records of one hop and path kind, ordered by size_bytes."""
    return sorted(
        (r for r in records if r["hop"] == hop_id and r["path"] == path_kind),
        key=lambda r: r["size_bytes"],
    )


def _draw_hop(ax, records, hop: Hop, labels: dict[str, str]) -> bool:
    """Plot the IPCQ and raw lines of *hop* on *ax*.

    ``labels`` maps the record path kind ("ipcq" / "raw") to its legend
    label. Returns True if at least one line was drawn.
    """
    drawn = False
    for kind, marker, color in _SERIES_STYLE:
        rows = _series(records, hop.id, kind)
        if not rows:
            continue
        ax.plot(
            [r["size_bytes"] for r in rows],
            [r["total_ns"] for r in rows],
            marker=marker,
            label=labels[kind],
            color=color,
        )
        drawn = True
    return drawn


def _write_csv(records, path: Path) -> None:
    """Write *records* to *path* as CSV, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    columns = ["hop", "label", "size_bytes", "path", "total_ns"]
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        writer.writerows(records)


def _plot_per_hop(records, hop: Hop, path: Path) -> None:
    """Save a latency-vs-size PNG for a single hop to *path*."""
    # Imported lazily so the module can be imported without matplotlib.
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(8, 5))
    try:
        has_lines = _draw_hop(
            ax, records, hop,
            {"ipcq": "IPCQ (send/recv)", "raw": "Raw DMA (load+store)"},
        )
        ax.set_xlabel("Data size (bytes)")
        ax.set_ylabel("Latency (ns)")
        ax.set_title(hop.label)
        ax.grid(True, alpha=0.3)
        if has_lines:
            # legend() with no labelled artists only emits a warning.
            ax.legend()
        fig.tight_layout()
        fig.savefig(path, dpi=120)
    finally:
        # Release the figure even if drawing or saving failed.
        plt.close(fig)


def _plot_overview(records, path: Path) -> None:
    """Save one PNG to *path* with a subplot per entry of HOPS."""
    # Imported lazily so the module can be imported without matplotlib.
    import matplotlib.pyplot as plt

    # Two columns; as many rows as HOPS needs (2x2, 13x9 in for four hops).
    n_cols = 2
    n_rows = max(1, (len(HOPS) + n_cols - 1) // n_cols)
    fig, grid = plt.subplots(
        n_rows, n_cols, figsize=(13, 4.5 * n_rows), squeeze=False,
    )
    try:
        axes = grid.flatten()
        for ax, hop in zip(axes, HOPS):
            has_lines = _draw_hop(
                ax, records, hop, {"ipcq": "IPCQ", "raw": "Raw"},
            )
            ax.set_title(hop.label, fontsize=10)
            ax.set_xlabel("bytes")
            ax.set_ylabel("ns")
            ax.grid(True, alpha=0.3)
            if has_lines:
                ax.legend(fontsize=8)
        # Hide grid cells that have no hop assigned.
        for ax in axes[len(HOPS):]:
            ax.axis("off")
        fig.suptitle(
            "PE-to-PE latency: IPCQ vs raw DMA", fontsize=14,
        )
        fig.tight_layout()
        fig.savefig(path, dpi=120)
    finally:
        # Release the figure even if drawing or saving failed.
        plt.close(fig)


# ── Test entry ───────────────────────────────────────────────────────


def _record(hop: Hop, size: int, path_kind: str, total_ns: float) -> dict:
    """Build one result row in the schema written by ``_write_csv``."""
    return {
        "hop": hop.id,
        "label": hop.label,
        "size_bytes": size,
        "path": path_kind,
        "total_ns": total_ns,
    }


def test_pe_to_pe_latency_sweep():
    """Sweep every hop × size, emit CSV + plots, and check IPCQ latencies.

    Artifacts are written before the assertions run so they are available
    for inspection when a latency check fails.
    """
    records: list[dict] = []

    for hop in HOPS:
        for size in SIZES:
            # IPCQ path
            ipcq_ns = _measure_ipcq(hop, size)
            records.append(_record(hop, size, "ipcq", ipcq_ns))

            raw_s = "n/a"
            if hop.supports_raw:
                raw_ns = _measure_raw(hop, size)
                records.append(_record(hop, size, "raw", raw_ns))
                raw_s = f"{raw_ns:7.1f}ns"

            print(
                f"[{hop.id}] size={size:5d} "
                f"ipcq={ipcq_ns:7.1f}ns raw={raw_s}"
            )

    PLOT_DIR.mkdir(parents=True, exist_ok=True)
    _write_csv(records, PLOT_DIR / "summary.csv")
    for hop in HOPS:
        _plot_per_hop(records, hop, PLOT_DIR / f"{hop.id}.png")
    _plot_overview(records, PLOT_DIR / "overview.png")

    for hop in HOPS:
        for r in _series(records, hop.id, "ipcq"):
            assert r["total_ns"] > 0, f"{hop.id}: total_ns must be > 0"

    print(f"\n Plots + CSV written to {PLOT_DIR}")