PE-to-PE latency test + supporting fixes
Adds tests/test_pe_to_pe_latency.py: a sweep that measures PE-to-PE transfer latency for five hop types (intra-cube horizontal/vertical, inter-cube horizontal/vertical, inter-SIP) across data sizes 128 B to 10 KB, on both the IPCQ (tl.send/tl.recv) and raw-DMA (tl.load+tl.store) paths. Emits per-hop PNG plots, an overview PNG, and a CSV summary into tests/pe2pe_latency_plots/. Latency is reported as max(pe_exec_ns) across participating PEs, read from engine.get_completion(), so the measurement captures the SRC/DST PE's kernel body time rather than the full launch+ response-aggregation envelope. Two simulator fixes were needed to make this measurement meaningful: - PeMMU now stores a list of (start, end, pa) sub-regions per page rather than a single PA. DPPolicy layouts with shards smaller than page_size (e.g. 128 B payloads with 4 KB pages) used to silently overwrite each other through last-write-wins, causing DMAs intended for cube0 to physically route to cube3 - inflating latency by ~170 ns per DMA at small sizes. STOPGAP: real MMUs don't support sub-page regions; long-term fix is either smaller MMU page size or DPPolicy validation that refuses sub-page shards. - M_CPU's per-PE metrics aggregation (pe_exec_ns, dma_ns, compute_ns) now max-merges against the existing value in result_data rather than overwriting. Multi-cube workloads share one result_data dict via IO_CPU fanout; the previous overwrite caused whichever cube's M_CPU finished last to clobber others' values, so multi-cube pe_exec_ns was racy and frequently 0. Same fix applied in legacy/builtin/m_cpu.py. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,358 @@
|
||||
"""PE-to-PE latency sweep across hop types and data sizes.
|
||||
|
||||
Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for five
|
||||
hop types:
|
||||
|
||||
H1 Intra-cube horizontal pe0 → pe1
|
||||
H2 Intra-cube vertical pe0 → pe4
|
||||
H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0
|
||||
H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0
|
||||
H5 Inter-SIP sip0.cube0.pe0 → sip1.cube0.pe0 (IPCQ only —
|
||||
raw needs
|
||||
cross-SIP MMU)
|
||||
|
||||
Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
|
||||
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
|
||||
from kernbench.policy.placement.dp import DPPolicy
|
||||
from kernbench.runtime_api.context import RuntimeContext
|
||||
from kernbench.runtime_api.types import DeviceSelector
|
||||
from kernbench.sim_engine.engine import GraphEngine
|
||||
from kernbench.topology.builder import resolve_topology
|
||||
|
||||
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||
PLOT_DIR = Path(__file__).parent / "pe2pe_latency_plots"
|
||||
|
||||
SIZES = [128, 256, 384, 512, 768, 1024, 2048, 4096, 8192, 10240]
|
||||
|
||||
N_CUBES = 16
|
||||
N_PES = 8
|
||||
ELEM_BYTES = 2 # f16
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Hop:
|
||||
id: str
|
||||
label: str
|
||||
src: tuple[int, int, int]
|
||||
dst: tuple[int, int, int]
|
||||
send_dir: str
|
||||
recv_dir: str
|
||||
supports_raw: bool # False for cross-SIP (DPPolicy intra-device only)
|
||||
|
||||
|
||||
HOPS = [
|
||||
Hop("h1_intra_horizontal", "Intra-cube horizontal (pe0 to pe1)",
|
||||
(0, 0, 0), (0, 0, 1), "intra_E", "intra_W", True),
|
||||
Hop("h2_intra_vertical", "Intra-cube vertical (pe0 to pe4)",
|
||||
(0, 0, 0), (0, 0, 4), "intra_S", "intra_N", True),
|
||||
Hop("h3_inter_cube_horizontal", "Inter-cube horizontal (cube0 to cube1)",
|
||||
(0, 0, 0), (0, 1, 0), "E", "W", True),
|
||||
Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)",
|
||||
(0, 0, 0), (0, 4, 0), "S", "N", True),
|
||||
Hop("h5_inter_sip", "Inter-SIP (sip0 to sip1, same cube/pe)",
|
||||
(0, 0, 0), (1, 0, 0), "global_E", "global_W", False),
|
||||
]
|
||||
|
||||
|
||||
def _make_engine():
|
||||
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||
return engine, topo.topology_obj.spec
|
||||
|
||||
|
||||
# ── IPCQ path ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _measure_ipcq(hop: Hop, nbytes: int) -> float:
|
||||
engine, spec = _make_engine()
|
||||
|
||||
cfg = load_ccl_config()
|
||||
merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
|
||||
merged["slot_size"] = max(int(merged.get("slot_size", 4096)), nbytes)
|
||||
|
||||
n_elem = nbytes // ELEM_BYTES
|
||||
src_sip, src_cube, src_pe = hop.src
|
||||
dst_sip, dst_cube, dst_pe = hop.dst
|
||||
send_dir, recv_dir = hop.send_dir, hop.recv_dir
|
||||
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id=f"ipcq_{hop.id}_{nbytes}",
|
||||
spec=spec,
|
||||
) as ctx:
|
||||
configure_sfr_intercube_multisip(engine, spec, merged)
|
||||
|
||||
dp = DPPolicy(
|
||||
cube="row_wise", pe="column_wise",
|
||||
num_cubes=N_CUBES, num_pes=N_PES,
|
||||
)
|
||||
|
||||
def kernel(t_ptr, n_elem, tl):
|
||||
pe_id = tl.program_id(axis=0)
|
||||
cube_id = tl.program_id(axis=1)
|
||||
if cube_id == src_cube and pe_id == src_pe:
|
||||
data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
|
||||
tl.send(dir=send_dir, src=data)
|
||||
elif cube_id == dst_cube and pe_id == dst_pe:
|
||||
tl.recv(dir=recv_dir, shape=(n_elem,), dtype="f16")
|
||||
|
||||
tensors = []
|
||||
for s in sorted({src_sip, dst_sip}):
|
||||
ctx.ahbm.set_device(s)
|
||||
t = ctx.zeros(
|
||||
(N_CUBES, N_PES * n_elem), dtype="f16",
|
||||
dp=dp, name=f"sip{s}",
|
||||
)
|
||||
t.copy_(ctx.from_numpy(
|
||||
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
|
||||
))
|
||||
tensors.append(t)
|
||||
|
||||
all_pending = []
|
||||
for t in tensors:
|
||||
pending = ctx.launch(
|
||||
f"{hop.id}_ipcq", kernel, t, n_elem, _defer_wait=True,
|
||||
)
|
||||
all_pending.extend(pending)
|
||||
for h, sip_id, meta in all_pending:
|
||||
ctx.wait(h, _meta=meta)
|
||||
|
||||
# Per-PE kernel execution time (excludes launch dispatch and
|
||||
# response aggregation). IPCQ: DST blocks on tl.recv until the
|
||||
# send arrives, so max across SIPs = DST's transfer time.
|
||||
pe_exec_vals = []
|
||||
for h, _sip, _meta in all_pending:
|
||||
_, trace = engine.get_completion(h)
|
||||
if trace and trace.get("pe_exec_ns") is not None:
|
||||
pe_exec_vals.append(float(trace["pe_exec_ns"]))
|
||||
|
||||
return max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||
|
||||
|
||||
# ── Raw DMA path (intra-SIP only) ────────────────────────────────────
|
||||
|
||||
|
||||
def _measure_raw(hop: Hop, nbytes: int) -> float:
|
||||
"""tl.load from source slice + tl.store to destination slice. The VA
|
||||
mapping spans the cube mesh within one SIP (MmuMapMsg broadcasts to all
|
||||
cubes of the SIP), so the store goes through the fabric to the
|
||||
destination PE's HBM. No IPCQ protocol involved.
|
||||
"""
|
||||
if not hop.supports_raw:
|
||||
raise RuntimeError(f"hop {hop.id} does not support raw path")
|
||||
|
||||
engine, spec = _make_engine()
|
||||
|
||||
n_elem = nbytes // ELEM_BYTES
|
||||
src_sip, src_cube, src_pe = hop.src
|
||||
dst_sip, dst_cube, dst_pe = hop.dst
|
||||
assert src_sip == dst_sip
|
||||
|
||||
# Slice offsets in the (N_CUBES, N_PES * n_elem) tensor:
|
||||
# row = cube, slice within row = pe * n_elem .. (pe+1)*n_elem
|
||||
# Byte offsets from va_base:
|
||||
src_off = (src_cube * N_PES + src_pe) * n_elem * ELEM_BYTES
|
||||
dst_off = (dst_cube * N_PES + dst_pe) * n_elem * ELEM_BYTES
|
||||
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id=f"raw_{hop.id}_{nbytes}",
|
||||
spec=spec,
|
||||
) as ctx:
|
||||
dp = DPPolicy(
|
||||
cube="row_wise", pe="column_wise",
|
||||
num_cubes=N_CUBES, num_pes=N_PES,
|
||||
)
|
||||
ctx.ahbm.set_device(src_sip)
|
||||
t = ctx.zeros(
|
||||
(N_CUBES, N_PES * n_elem), dtype="f16",
|
||||
dp=dp, name="raw_tensor",
|
||||
)
|
||||
t.copy_(ctx.from_numpy(
|
||||
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
|
||||
))
|
||||
|
||||
def kernel(t_ptr, n_elem, tl):
|
||||
pe_id = tl.program_id(axis=0)
|
||||
cube_id = tl.program_id(axis=1)
|
||||
if cube_id == src_cube and pe_id == src_pe:
|
||||
data = tl.load(
|
||||
t_ptr + src_off, shape=(n_elem,), dtype="f16",
|
||||
)
|
||||
tl.store(t_ptr + dst_off, data)
|
||||
|
||||
pending = ctx.launch(
|
||||
f"{hop.id}_raw", kernel, t, n_elem, _defer_wait=True,
|
||||
)
|
||||
for h, sip_id, meta in pending:
|
||||
ctx.wait(h, _meta=meta)
|
||||
|
||||
# Per-PE kernel execution time. Raw: only SRC does real work
|
||||
# (tl.load + tl.store, store is blocking), so max across all PEs
|
||||
# = SRC's transfer time. Idle PEs contribute only overhead_ns.
|
||||
pe_exec_vals = []
|
||||
for h, _sip, _meta in pending:
|
||||
_, trace = engine.get_completion(h)
|
||||
if trace and trace.get("pe_exec_ns") is not None:
|
||||
pe_exec_vals.append(float(trace["pe_exec_ns"]))
|
||||
|
||||
return max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||
|
||||
|
||||
# ── CSV + plotting ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _write_csv(records, path: Path) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, "w", newline="", encoding="utf-8") as f:
|
||||
w = csv.DictWriter(
|
||||
f, fieldnames=["hop", "label", "size_bytes", "path", "total_ns"],
|
||||
)
|
||||
w.writeheader()
|
||||
for r in records:
|
||||
w.writerow(r)
|
||||
|
||||
|
||||
def _plot_per_hop(records, hop: Hop, path: Path) -> None:
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
ipcq = sorted(
|
||||
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
|
||||
key=lambda r: r["size_bytes"],
|
||||
)
|
||||
raw = sorted(
|
||||
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
|
||||
key=lambda r: r["size_bytes"],
|
||||
)
|
||||
|
||||
fig, ax = plt.subplots(figsize=(8, 5))
|
||||
if ipcq:
|
||||
ax.plot(
|
||||
[r["size_bytes"] for r in ipcq],
|
||||
[r["total_ns"] for r in ipcq],
|
||||
marker="o", label="IPCQ (send/recv)", color="tab:blue",
|
||||
)
|
||||
if raw:
|
||||
ax.plot(
|
||||
[r["size_bytes"] for r in raw],
|
||||
[r["total_ns"] for r in raw],
|
||||
marker="s", label="Raw DMA (load+store)", color="tab:orange",
|
||||
)
|
||||
else:
|
||||
ax.text(
|
||||
0.98, 0.02, "(Raw DMA unavailable for cross-SIP)",
|
||||
transform=ax.transAxes, ha="right", va="bottom",
|
||||
fontsize=9, color="gray",
|
||||
)
|
||||
ax.set_xlabel("Data size (bytes)")
|
||||
ax.set_ylabel("Latency (ns)")
|
||||
ax.set_title(hop.label)
|
||||
ax.grid(True, alpha=0.3)
|
||||
ax.legend()
|
||||
fig.tight_layout()
|
||||
fig.savefig(path, dpi=120)
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def _plot_overview(records, path: Path) -> None:
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
fig, axes = plt.subplots(2, 3, figsize=(16, 9))
|
||||
axes = axes.flatten()
|
||||
for i, hop in enumerate(HOPS):
|
||||
ax = axes[i]
|
||||
ipcq = sorted(
|
||||
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
|
||||
key=lambda r: r["size_bytes"],
|
||||
)
|
||||
raw = sorted(
|
||||
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
|
||||
key=lambda r: r["size_bytes"],
|
||||
)
|
||||
if ipcq:
|
||||
ax.plot(
|
||||
[r["size_bytes"] for r in ipcq],
|
||||
[r["total_ns"] for r in ipcq],
|
||||
marker="o", label="IPCQ", color="tab:blue",
|
||||
)
|
||||
if raw:
|
||||
ax.plot(
|
||||
[r["size_bytes"] for r in raw],
|
||||
[r["total_ns"] for r in raw],
|
||||
marker="s", label="Raw", color="tab:orange",
|
||||
)
|
||||
ax.set_title(hop.label, fontsize=10)
|
||||
ax.set_xlabel("bytes")
|
||||
ax.set_ylabel("ns")
|
||||
ax.grid(True, alpha=0.3)
|
||||
ax.legend(fontsize=8)
|
||||
for j in range(len(HOPS), len(axes)):
|
||||
axes[j].axis("off")
|
||||
fig.suptitle(
|
||||
"PE-to-PE latency: IPCQ vs raw DMA",
|
||||
fontsize=14,
|
||||
)
|
||||
fig.tight_layout()
|
||||
fig.savefig(path, dpi=120)
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
# ── Test entry ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_pe_to_pe_latency_sweep():
|
||||
records: list[dict] = []
|
||||
|
||||
for hop in HOPS:
|
||||
for size in SIZES:
|
||||
# IPCQ path
|
||||
ipcq_ns = _measure_ipcq(hop, size)
|
||||
records.append({
|
||||
"hop": hop.id, "label": hop.label,
|
||||
"size_bytes": size, "path": "ipcq",
|
||||
"total_ns": ipcq_ns,
|
||||
})
|
||||
|
||||
raw_s = "n/a"
|
||||
if hop.supports_raw:
|
||||
raw_ns = _measure_raw(hop, size)
|
||||
records.append({
|
||||
"hop": hop.id, "label": hop.label,
|
||||
"size_bytes": size, "path": "raw",
|
||||
"total_ns": raw_ns,
|
||||
})
|
||||
raw_s = f"{raw_ns:7.1f}ns"
|
||||
|
||||
print(
|
||||
f"[{hop.id}] size={size:5d} "
|
||||
f"ipcq={ipcq_ns:7.1f}ns raw={raw_s}"
|
||||
)
|
||||
|
||||
PLOT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
_write_csv(records, PLOT_DIR / "summary.csv")
|
||||
for hop in HOPS:
|
||||
_plot_per_hop(records, hop, PLOT_DIR / f"{hop.id}.png")
|
||||
_plot_overview(records, PLOT_DIR / "overview.png")
|
||||
|
||||
for hop in HOPS:
|
||||
rs = sorted(
|
||||
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
|
||||
key=lambda r: r["size_bytes"],
|
||||
)
|
||||
for r in rs:
|
||||
assert r["total_ns"] > 0, f"{hop.id}: total_ns must be > 0"
|
||||
|
||||
print(f"\n Plots + CSV written to {PLOT_DIR}")
|
||||
Reference in New Issue
Block a user