From 6918e6e906814af1469151bf294b34d945a45d29 Mon Sep 17 00:00:00 2001 From: Mukesh Garg Date: Wed, 22 Apr 2026 21:04:31 -0700 Subject: [PATCH] PE-to-PE latency test + supporting fixes Adds tests/test_pe_to_pe_latency.py: a sweep that measures PE-to-PE transfer latency for five hop types (intra-cube horizontal/vertical, inter-cube horizontal/vertical, inter-SIP) across data sizes 128 B to 10 KB, on both the IPCQ (tl.send/tl.recv) and raw-DMA (tl.load+tl.store) paths. Emits per-hop PNG plots, an overview PNG, and a CSV summary into tests/pe2pe_latency_plots/. Latency is reported as max(pe_exec_ns) across participating PEs, read from engine.get_completion(), so the measurement captures the SRC/DST PE's kernel body time rather than the full launch+ response-aggregation envelope. Two simulator fixes were needed to make this measurement meaningful: - PeMMU now stores a list of (start, end, pa) sub-regions per page rather than a single PA. DPPolicy layouts with shards smaller than page_size (e.g. 128 B payloads with 4 KB pages) used to silently overwrite each other through last-write-wins, causing DMAs intended for cube0 to physically route to cube3 - inflating latency by ~170 ns per DMA at small sizes. STOPGAP: real MMUs don't support sub-page regions; long-term fix is either smaller MMU page size or DPPolicy validation that refuses sub-page shards. - M_CPU's per-PE metrics aggregation (pe_exec_ns, dma_ns, compute_ns) now max-merges against the existing value in result_data rather than overwriting. Multi-cube workloads share one result_data dict via IO_CPU fanout; the previous overwrite caused whichever cube's M_CPU finished last to clobber others' values, so multi-cube pe_exec_ns was racy and frequently 0. Same fix applied in legacy/builtin/m_cpu.py. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/kernbench/components/builtin/m_cpu.py | 13 +- .../components/legacy/builtin/m_cpu.py | 13 +- src/kernbench/policy/address/pe_mmu.py | 83 +++- tests/test_pe_to_pe_latency.py | 358 ++++++++++++++++++ 4 files changed, 446 insertions(+), 21 deletions(-) create mode 100644 tests/test_pe_to_pe_latency.py diff --git a/src/kernbench/components/builtin/m_cpu.py b/src/kernbench/components/builtin/m_cpu.py index 4fb9a12..496de9c 100644 --- a/src/kernbench/components/builtin/m_cpu.py +++ b/src/kernbench/components/builtin/m_cpu.py @@ -204,16 +204,21 @@ class MCpuComponent(ComponentBase): yield all_done del self._parent_txns[request.request_id] - # Aggregate PE-internal metrics (max across PEs) + # Aggregate PE-internal metrics (max across PEs and across cubes). + # Multiple M_CPUs share the same result_data dict via IO_CPU fanout; + # merge against the existing value so cubes don't clobber each other. pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns] if pe_exec_values: - txn.result_data["pe_exec_ns"] = max(pe_exec_values) + cur = txn.result_data.get("pe_exec_ns", 0.0) or 0.0 + txn.result_data["pe_exec_ns"] = max(cur, max(pe_exec_values)) dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns] if dma_values: - txn.result_data["dma_ns"] = max(dma_values) + cur = txn.result_data.get("dma_ns", 0.0) or 0.0 + txn.result_data["dma_ns"] = max(cur, max(dma_values)) compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns] if compute_values: - txn.result_data["compute_ns"] = max(compute_values) + cur = txn.result_data.get("compute_ns", 0.0) or 0.0 + txn.result_data["compute_ns"] = max(cur, max(compute_values)) # Send aggregate response on reverse command path back to IO_CPU reverse_path = list(reversed(txn.path)) diff --git a/src/kernbench/components/legacy/builtin/m_cpu.py b/src/kernbench/components/legacy/builtin/m_cpu.py index 4fb9a12..496de9c 100644 --- a/src/kernbench/components/legacy/builtin/m_cpu.py +++ b/src/kernbench/components/legacy/builtin/m_cpu.py @@ -204,16 +204,21 @@ class MCpuComponent(ComponentBase): yield all_done del self._parent_txns[request.request_id] - # Aggregate PE-internal metrics (max across PEs) + # Aggregate PE-internal metrics (max across PEs and across cubes). + # Multiple M_CPUs share the same result_data dict via IO_CPU fanout; + # merge against the existing value so cubes don't clobber each other. pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns] if pe_exec_values: - txn.result_data["pe_exec_ns"] = max(pe_exec_values) + cur = txn.result_data.get("pe_exec_ns", 0.0) or 0.0 + txn.result_data["pe_exec_ns"] = max(cur, max(pe_exec_values)) dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns] if dma_values: - txn.result_data["dma_ns"] = max(dma_values) + cur = txn.result_data.get("dma_ns", 0.0) or 0.0 + txn.result_data["dma_ns"] = max(cur, max(dma_values)) compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns] if compute_values: - txn.result_data["compute_ns"] = max(compute_values) + cur = txn.result_data.get("compute_ns", 0.0) or 0.0 + txn.result_data["compute_ns"] = max(cur, max(compute_values)) # Send aggregate response on reverse command path back to IO_CPU reverse_path = list(reversed(txn.path)) diff --git a/src/kernbench/policy/address/pe_mmu.py b/src/kernbench/policy/address/pe_mmu.py index 6080b24..125d009 100644 --- a/src/kernbench/policy/address/pe_mmu.py +++ b/src/kernbench/policy/address/pe_mmu.py @@ -19,7 +19,14 @@ class PageFault(Exception): class PeMMU: - """Per-PE MMU with page-aligned VA→PA translation table. + """Per-PE MMU with sub-page-capable VA→PA translation table. + + Each page-table entry is a list of (start_in_page, end_in_page, + pa_at_offset_zero) regions. This is a SIMULATOR STOPGAP — real MMUs + store one PA per page-table entry. Sub-page regions exist here so + DPPolicy layouts that shard below page granularity (e.g. 128 B + payloads with 4 KB pages) don't silently mis-route through last- + write-wins overwrites. Memory note: project_mmu_subpage_stopgap.md. Args: page_size: Page size in bytes (default 2 MB). @@ -34,7 +41,11 @@ class PeMMU: self._page_size = page_size self._page_shift = (page_size - 1).bit_length() self._page_mask = page_size - 1 - self._table: dict[int, int] = {} # va_page_number → pa_page_base + # vpn → list of (start_in_page, end_in_page, pa_at_offset_zero). + # pa_at_offset_zero is the PA that offset 0 of the page would map + # to under this region — i.e. translate(off) = pa_at_offset_zero + # + off when start <= off < end. + self._table: dict[int, list[tuple[int, int, int]]] = {} self._overhead_ns = overhead_ns @property @@ -46,21 +57,67 @@ class PeMMU: return len(self._table) def map(self, va: int, pa: int, size: int) -> None: - """Register VA→PA mapping for a contiguous range.""" - for off in range(0, size, self._page_size): - vpn = (va + off) >> self._page_shift - self._table[vpn] = pa + off + """Register VA→PA mapping for a contiguous range. + + Sub-page-aware: a single page can hold multiple disjoint regions, + each pointing to a different PA. Later map() calls APPEND a new + region; on overlap with an existing region, the new region wins + for the overlapping offsets (translate iterates in reverse so the + last write takes precedence — matches legacy single-PA behavior + when a full page is re-mapped). + """ + end_va = va + size + cur = va + while cur < end_va: + vpn = cur >> self._page_shift + page_base_va = vpn << self._page_shift + page_end_va = page_base_va + self._page_size + region_start = cur - page_base_va + region_end = min(end_va, page_end_va) - page_base_va + # PA seen at offset 0 of page if this region's mapping covered it + pa_at_offset_zero = pa + (cur - va) - region_start + self._table.setdefault(vpn, []).append( + (region_start, region_end, pa_at_offset_zero) + ) + cur = page_base_va + region_end def unmap(self, va: int, size: int) -> None: - """Remove VA mapping for a contiguous range.""" - for off in range(0, size, self._page_size): - vpn = (va + off) >> self._page_shift - self._table.pop(vpn, None) + """Remove VA mapping for a contiguous range. + + Drops any region whose extent is contained within the unmapped + range. Partial overlaps (region straddles the range boundary) + are left in place — caller is expected to unmap on the same + boundaries it mapped on. + """ + end_va = va + size + cur = va + while cur < end_va: + vpn = cur >> self._page_shift + page_base_va = vpn << self._page_shift + page_end_va = page_base_va + self._page_size + unmap_start = cur - page_base_va + unmap_end = min(end_va, page_end_va) - page_base_va + regions = self._table.get(vpn) + if regions is not None: + kept = [ + r for r in regions + if not (r[0] >= unmap_start and r[1] <= unmap_end) + ] + if kept: + self._table[vpn] = kept + else: + del self._table[vpn] + cur = page_base_va + unmap_end def translate(self, va: int) -> int: """Translate VA to PA. Raises PageFault if unmapped.""" vpn = va >> self._page_shift - pa_page_base = self._table.get(vpn) - if pa_page_base is None: + regions = self._table.get(vpn) + if regions is None: raise PageFault(va) - return pa_page_base + (va & self._page_mask) + offset = va & self._page_mask + # Iterate latest-first so newer map() calls win on overlap + for start, end, pa_at_offset_zero in reversed(regions): + if start <= offset < end: + return pa_at_offset_zero + offset + raise PageFault(va) diff --git a/tests/test_pe_to_pe_latency.py b/tests/test_pe_to_pe_latency.py new file mode 100644 index 0000000..b1d0afc --- /dev/null +++ b/tests/test_pe_to_pe_latency.py @@ -0,0 +1,358 @@ +"""PE-to-PE latency sweep across hop types and data sizes. + +Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for five +hop types: + + H1 Intra-cube horizontal pe0 → pe1 + H2 Intra-cube vertical pe0 → pe4 + H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0 + H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0 + H5 Inter-SIP sip0.cube0.pe0 → sip1.cube0.pe0 (IPCQ only — + raw needs + cross-SIP MMU) + +Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV. +""" +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path + +import numpy as np +import pytest + +from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config +from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip +from kernbench.policy.placement.dp import DPPolicy +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" +PLOT_DIR = Path(__file__).parent / "pe2pe_latency_plots" + +SIZES = [128, 256, 384, 512, 768, 1024, 2048, 4096, 8192, 10240] + +N_CUBES = 16 +N_PES = 8 +ELEM_BYTES = 2 # f16 + + +@dataclass(frozen=True) +class Hop: + id: str + label: str + src: tuple[int, int, int] + dst: tuple[int, int, int] + send_dir: str + recv_dir: str + supports_raw: bool # False for cross-SIP (DPPolicy intra-device only) + + +HOPS = [ + Hop("h1_intra_horizontal", "Intra-cube horizontal (pe0 to pe1)", + (0, 0, 0), (0, 0, 1), "intra_E", "intra_W", True), + Hop("h2_intra_vertical", "Intra-cube vertical (pe0 to pe4)", + (0, 0, 0), (0, 0, 4), "intra_S", "intra_N", True), + Hop("h3_inter_cube_horizontal", "Inter-cube horizontal (cube0 to cube1)", + (0, 0, 0), (0, 1, 0), "E", "W", True), + Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)", + (0, 0, 0), (0, 4, 0), "S", "N", True), + Hop("h5_inter_sip", "Inter-SIP (sip0 to sip1, same cube/pe)", + (0, 0, 0), (1, 0, 0), "global_E", "global_W", False), +] + + +def _make_engine(): + topo = resolve_topology(str(TOPOLOGY_PATH)) + engine = GraphEngine(topo.topology_obj, enable_data=True) + return engine, topo.topology_obj.spec + + +# ── IPCQ path ──────────────────────────────────────────────────────── + + +def _measure_ipcq(hop: Hop, nbytes: int) -> float: + engine, spec = _make_engine() + + cfg = load_ccl_config() + merged = resolve_algorithm_config(cfg, name="intercube_allreduce") + merged["slot_size"] = max(int(merged.get("slot_size", 4096)), nbytes) + + n_elem = nbytes // ELEM_BYTES + src_sip, src_cube, src_pe = hop.src + dst_sip, dst_cube, dst_pe = hop.dst + send_dir, recv_dir = hop.send_dir, hop.recv_dir + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id=f"ipcq_{hop.id}_{nbytes}", + spec=spec, + ) as ctx: + configure_sfr_intercube_multisip(engine, spec, merged) + + dp = DPPolicy( + cube="row_wise", pe="column_wise", + num_cubes=N_CUBES, num_pes=N_PES, + ) + + def kernel(t_ptr, n_elem, tl): + pe_id = tl.program_id(axis=0) + cube_id = tl.program_id(axis=1) + if cube_id == src_cube and pe_id == src_pe: + data = tl.load(t_ptr, shape=(n_elem,), dtype="f16") + tl.send(dir=send_dir, src=data) + elif cube_id == dst_cube and pe_id == dst_pe: + tl.recv(dir=recv_dir, shape=(n_elem,), dtype="f16") + + tensors = [] + for s in sorted({src_sip, dst_sip}): + ctx.ahbm.set_device(s) + t = ctx.zeros( + (N_CUBES, N_PES * n_elem), dtype="f16", + dp=dp, name=f"sip{s}", + ) + t.copy_(ctx.from_numpy( + np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16), + )) + tensors.append(t) + + all_pending = [] + for t in tensors: + pending = ctx.launch( + f"{hop.id}_ipcq", kernel, t, n_elem, _defer_wait=True, + ) + all_pending.extend(pending) + for h, sip_id, meta in all_pending: + ctx.wait(h, _meta=meta) + + # Per-PE kernel execution time (excludes launch dispatch and + # response aggregation). IPCQ: DST blocks on tl.recv until the + # send arrives, so max across SIPs = DST's transfer time. + pe_exec_vals = [] + for h, _sip, _meta in all_pending: + _, trace = engine.get_completion(h) + if trace and trace.get("pe_exec_ns") is not None: + pe_exec_vals.append(float(trace["pe_exec_ns"])) + + return max(pe_exec_vals) if pe_exec_vals else 0.0 + + +# ── Raw DMA path (intra-SIP only) ──────────────────────────────────── + + +def _measure_raw(hop: Hop, nbytes: int) -> float: + """tl.load from source slice + tl.store to destination slice. The VA + mapping spans the cube mesh within one SIP (MmuMapMsg broadcasts to all + cubes of the SIP), so the store goes through the fabric to the + destination PE's HBM. No IPCQ protocol involved. + """ + if not hop.supports_raw: + raise RuntimeError(f"hop {hop.id} does not support raw path") + + engine, spec = _make_engine() + + n_elem = nbytes // ELEM_BYTES + src_sip, src_cube, src_pe = hop.src + dst_sip, dst_cube, dst_pe = hop.dst + assert src_sip == dst_sip + + # Slice offsets in the (N_CUBES, N_PES * n_elem) tensor: + # row = cube, slice within row = pe * n_elem .. (pe+1)*n_elem + # Byte offsets from va_base: + src_off = (src_cube * N_PES + src_pe) * n_elem * ELEM_BYTES + dst_off = (dst_cube * N_PES + dst_pe) * n_elem * ELEM_BYTES + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id=f"raw_{hop.id}_{nbytes}", + spec=spec, + ) as ctx: + dp = DPPolicy( + cube="row_wise", pe="column_wise", + num_cubes=N_CUBES, num_pes=N_PES, + ) + ctx.ahbm.set_device(src_sip) + t = ctx.zeros( + (N_CUBES, N_PES * n_elem), dtype="f16", + dp=dp, name="raw_tensor", + ) + t.copy_(ctx.from_numpy( + np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16), + )) + + def kernel(t_ptr, n_elem, tl): + pe_id = tl.program_id(axis=0) + cube_id = tl.program_id(axis=1) + if cube_id == src_cube and pe_id == src_pe: + data = tl.load( + t_ptr + src_off, shape=(n_elem,), dtype="f16", + ) + tl.store(t_ptr + dst_off, data) + + pending = ctx.launch( + f"{hop.id}_raw", kernel, t, n_elem, _defer_wait=True, + ) + for h, sip_id, meta in pending: + ctx.wait(h, _meta=meta) + + # Per-PE kernel execution time. Raw: only SRC does real work + # (tl.load + tl.store, store is blocking), so max across all PEs + # = SRC's transfer time. Idle PEs contribute only overhead_ns. + pe_exec_vals = [] + for h, _sip, _meta in pending: + _, trace = engine.get_completion(h) + if trace and trace.get("pe_exec_ns") is not None: + pe_exec_vals.append(float(trace["pe_exec_ns"])) + + return max(pe_exec_vals) if pe_exec_vals else 0.0 + + +# ── CSV + plotting ─────────────────────────────────────────────────── + + +def _write_csv(records, path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", newline="", encoding="utf-8") as f: + w = csv.DictWriter( + f, fieldnames=["hop", "label", "size_bytes", "path", "total_ns"], + ) + w.writeheader() + for r in records: + w.writerow(r) + + +def _plot_per_hop(records, hop: Hop, path: Path) -> None: + import matplotlib.pyplot as plt + + ipcq = sorted( + [r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"], + key=lambda r: r["size_bytes"], + ) + raw = sorted( + [r for r in records if r["hop"] == hop.id and r["path"] == "raw"], + key=lambda r: r["size_bytes"], + ) + + fig, ax = plt.subplots(figsize=(8, 5)) + if ipcq: + ax.plot( + [r["size_bytes"] for r in ipcq], + [r["total_ns"] for r in ipcq], + marker="o", label="IPCQ (send/recv)", color="tab:blue", + ) + if raw: + ax.plot( + [r["size_bytes"] for r in raw], + [r["total_ns"] for r in raw], + marker="s", label="Raw DMA (load+store)", color="tab:orange", + ) + else: + ax.text( + 0.98, 0.02, "(Raw DMA unavailable for cross-SIP)", + transform=ax.transAxes, ha="right", va="bottom", + fontsize=9, color="gray", + ) + ax.set_xlabel("Data size (bytes)") + ax.set_ylabel("Latency (ns)") + ax.set_title(hop.label) + ax.grid(True, alpha=0.3) + ax.legend() + fig.tight_layout() + fig.savefig(path, dpi=120) + plt.close(fig) + + +def _plot_overview(records, path: Path) -> None: + import matplotlib.pyplot as plt + + fig, axes = plt.subplots(2, 3, figsize=(16, 9)) + axes = axes.flatten() + for i, hop in enumerate(HOPS): + ax = axes[i] + ipcq = sorted( + [r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"], + key=lambda r: r["size_bytes"], + ) + raw = sorted( + [r for r in records if r["hop"] == hop.id and r["path"] == "raw"], + key=lambda r: r["size_bytes"], + ) + if ipcq: + ax.plot( + [r["size_bytes"] for r in ipcq], + [r["total_ns"] for r in ipcq], + marker="o", label="IPCQ", color="tab:blue", + ) + if raw: + ax.plot( + [r["size_bytes"] for r in raw], + [r["total_ns"] for r in raw], + marker="s", label="Raw", color="tab:orange", + ) + ax.set_title(hop.label, fontsize=10) + ax.set_xlabel("bytes") + ax.set_ylabel("ns") + ax.grid(True, alpha=0.3) + ax.legend(fontsize=8) + for j in range(len(HOPS), len(axes)): + axes[j].axis("off") + fig.suptitle( + "PE-to-PE latency: IPCQ vs raw DMA", + fontsize=14, + ) + fig.tight_layout() + fig.savefig(path, dpi=120) + plt.close(fig) + + +# ── Test entry ─────────────────────────────────────────────────────── + + +def test_pe_to_pe_latency_sweep(): + records: list[dict] = [] + + for hop in HOPS: + for size in SIZES: + # IPCQ path + ipcq_ns = _measure_ipcq(hop, size) + records.append({ + "hop": hop.id, "label": hop.label, + "size_bytes": size, "path": "ipcq", + "total_ns": ipcq_ns, + }) + + raw_s = "n/a" + if hop.supports_raw: + raw_ns = _measure_raw(hop, size) + records.append({ + "hop": hop.id, "label": hop.label, + "size_bytes": size, "path": "raw", + "total_ns": raw_ns, + }) + raw_s = f"{raw_ns:7.1f}ns" + + print( + f"[{hop.id}] size={size:5d} " + f"ipcq={ipcq_ns:7.1f}ns raw={raw_s}" + ) + + PLOT_DIR.mkdir(parents=True, exist_ok=True) + _write_csv(records, PLOT_DIR / "summary.csv") + for hop in HOPS: + _plot_per_hop(records, hop, PLOT_DIR / f"{hop.id}.png") + _plot_overview(records, PLOT_DIR / "overview.png") + + for hop in HOPS: + rs = sorted( + [r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"], + key=lambda r: r["size_bytes"], + ) + for r in rs: + assert r["total_ns"] > 0, f"{hop.id}: total_ns must be > 0" + + print(f"\n Plots + CSV written to {PLOT_DIR}")