ff7d727ddd
Rename the intercube all-reduce identity to lrab_hierarchical_allreduce (module, config key, distributed test) so the name reflects both levels it implements: LRAB intra-SIP (local reduce to center root + broadcast) and the hierarchical inter-SIP topology exchange (ring/torus/mesh). ADR-0032 slug kept as the stable decision id; pure rename, no logic change. Also in this batch: - ADR-0032 (EN+KO): document the shipped center-root bidirectional reduce (doc was stale corner-root); annotate ccl.yaml root_cube as a placeholder. - Rename allreduce + pe2pe latency plots to descriptive, title-matching filenames and retitle the in-plot headings; drop overview/overview_log. - Point the PPTX image refs at the new plot names. Doc + derived-artifact + rename only; no simulation behavior changed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
366 lines
13 KiB
Python
366 lines
13 KiB
Python
"""PE-to-PE latency sweep across hop types and data sizes.
|
|
|
|
Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for four
|
|
hop types. The IPCQ path uses ``tl.recv_no_consume(...)`` so that DST
|
|
does not pay the slot-read latency — apples-to-apples with the DMA
|
|
path, which is a one-sided write that has no read on DST.
|
|
|
|
``tl.recv_no_consume`` is a DIAGNOSTIC-only entry point that exists
|
|
solely to draw this graph; production kernels use ``tl.recv``.
|
|
|
|
H1 Intra-cube horizontal pe0 → pe1
|
|
H2 Intra-cube vertical pe0 → pe4
|
|
H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0
|
|
H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0
|
|
|
|
Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
|
|
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
|
|
from kernbench.policy.placement.dp import DPPolicy
|
|
from kernbench.runtime_api.context import RuntimeContext
|
|
from kernbench.runtime_api.types import DeviceSelector
|
|
from kernbench.sim_engine.engine import GraphEngine
|
|
from kernbench.topology.builder import resolve_topology
|
|
|
|
# Directory two levels above this file; both paths below hang off it.
_BASE_DIR = Path(__file__).parent.parent

# Topology description handed to ``resolve_topology``.
TOPOLOGY_PATH = _BASE_DIR / "topology.yaml"
# Where the per-hop PNGs, the overview PNG and summary.csv are written.
PLOT_DIR = _BASE_DIR / "docs" / "diagrams" / "pe2pe_latency_plots"

# Payload sizes (bytes) swept for every hop.
SIZES = [128, 256, 384, 512, 768, 1024, 2048, 4096, 8192, 10240]

# Tensor layout shared by both measurement paths: the test tensors are
# shaped (N_CUBES, N_PES * n_elem).
N_CUBES = 16
N_PES = 8
ELEM_BYTES = 2  # f16
|
|
|
|
|
|
@dataclass(frozen=True)
class Hop:
    """Immutable description of one PE-to-PE hop to be measured.

    ``src`` and ``dst`` are unpacked by the measurement helpers as
    ``(sip, cube, pe)`` triples.
    """

    # Identifier: used in correlation ids, kernel names, CSV rows and as
    # the stem of the per-hop PNG file name.
    id: str
    # Human-readable text used as the plot title and in the CSV.
    label: str
    # Source endpoint as (sip, cube, pe).
    src: tuple[int, int, int]
    # Destination endpoint as (sip, cube, pe).
    dst: tuple[int, int, int]
    # Direction passed to ``tl.send`` on the source PE.
    send_dir: str
    # Direction passed to ``tl.recv_no_consume`` on the destination PE.
    recv_dir: str
    # Whether the raw-DMA (load + store) path is measured for this hop.
    supports_raw: bool
|
|
|
|
|
|
# The hops swept by the test. Each ``id`` doubles as the PNG file stem and
# each ``label`` as the plot title.
HOPS = [
    Hop(
        id="latency_intracube_PE0_to_PE1_horizontal",
        label="Intra-cube PE-to-PE latency: PE0 → PE1 (horizontal)",
        src=(0, 0, 0),
        dst=(0, 0, 1),
        send_dir="intra_E",
        recv_dir="intra_W",
        supports_raw=True,
    ),
    Hop(
        id="latency_intracube_PE0_to_PE4_vertical",
        label="Intra-cube PE-to-PE latency: PE0 → PE4 (vertical)",
        src=(0, 0, 0),
        dst=(0, 0, 4),
        send_dir="intra_S",
        recv_dir="intra_N",
        supports_raw=True,
    ),
    Hop(
        id="latency_intercube_C0PE0_to_C1PE0_horizontal",
        label="Inter-cube PE-to-PE latency: Cube0.PE0 → Cube1.PE0 (horizontal)",
        src=(0, 0, 0),
        dst=(0, 1, 0),
        send_dir="E",
        recv_dir="W",
        supports_raw=True,
    ),
    Hop(
        id="latency_intercube_C0PE0_to_C4PE0_vertical",
        label="Inter-cube PE-to-PE latency: Cube0.PE0 → Cube4.PE0 (vertical)",
        src=(0, 0, 0),
        dst=(0, 4, 0),
        send_dir="S",
        recv_dir="N",
        supports_raw=True,
    ),
]
|
|
|
|
|
|
def _make_engine():
    """Build a fresh data-enabled engine from ``TOPOLOGY_PATH``.

    Returns:
        A ``(engine, spec)`` pair: the ``GraphEngine`` built on the resolved
        topology object, and that topology object's ``spec``.
    """
    topology_obj = resolve_topology(str(TOPOLOGY_PATH)).topology_obj
    engine = GraphEngine(topology_obj, enable_data=True)
    return engine, topology_obj.spec
|
|
|
|
|
|
# ── IPCQ path ────────────────────────────────────────────────────────
|
|
|
|
|
|
def _measure_ipcq(hop: Hop, nbytes: int) -> float:
    """Measure one IPCQ ``tl.send`` / ``tl.recv_no_consume`` transfer.

    A fresh engine is built for every call. One tensor is created and one
    kernel launched per distinct SIP named by ``hop``; inside the kernel
    only the source PE (load + send) and the destination PE
    (recv_no_consume) do any work.

    Args:
        hop: Endpoints and send/recv directions of the transfer.
        nbytes: Payload size in bytes; ``nbytes // ELEM_BYTES`` f16
            elements are transferred.

    Returns:
        The largest ``pe_exec_ns`` reported in the completion traces of
        the launches, or ``0.0`` if no trace carries that field.
    """
    engine, spec = _make_engine()

    # Take the configured algorithm settings, then raise ``slot_size`` to
    # at least the payload size.
    algo_cfg = resolve_algorithm_config(
        load_ccl_config(), name="lrab_hierarchical_allreduce",
    )
    algo_cfg["slot_size"] = max(int(algo_cfg.get("slot_size", 4096)), nbytes)

    n_elem = nbytes // ELEM_BYTES
    src_sip, src_cube, src_pe = hop.src
    dst_sip, dst_cube, dst_pe = hop.dst
    send_dir = hop.send_dir
    recv_dir = hop.recv_dir
    tensor_shape = (N_CUBES, N_PES * n_elem)

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id=f"ipcq_{hop.id}_{nbytes}",
        spec=spec,
    ) as ctx:
        configure_sfr_intercube_multisip(engine, spec, algo_cfg)

        dp = DPPolicy(
            cube="row_wise",
            pe="column_wise",
            num_cubes=N_CUBES,
            num_pes=N_PES,
        )

        def kernel(t_ptr, n_elem, tl):
            pe_id = tl.program_id(axis=0)
            cube_id = tl.program_id(axis=1)
            if cube_id == src_cube and pe_id == src_pe:
                data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
                tl.send(dir=send_dir, src=data)
            elif cube_id == dst_cube and pe_id == dst_pe:
                # tl.recv_no_consume: DST blocks until the bytes land in
                # the slot but skips the slot-read latency, which keeps
                # the comparison apples-to-apples with the raw-DMA path
                # (no DST read there either). Diagnostic-only entry
                # point; production kernels use tl.recv.
                tl.recv_no_consume(
                    dir=recv_dir, shape=(n_elem,), dtype="f16",
                )

        # Allocate and fill every tensor first; launches start only after
        # all SIPs have their tensor.
        per_sip_tensors = []
        for sip in sorted({src_sip, dst_sip}):
            ctx.ahbm.set_device(sip)
            tensor = ctx.zeros(
                tensor_shape, dtype="f16", dp=dp, name=f"sip{sip}",
            )
            tensor.copy_(
                ctx.from_numpy(np.full(tensor_shape, 1.0, dtype=np.float16)),
            )
            per_sip_tensors.append(tensor)

        # Launch everything with deferred waits, then wait on each handle.
        handles = []
        for tensor in per_sip_tensors:
            handles.extend(
                ctx.launch(
                    f"{hop.id}_ipcq", kernel, tensor, n_elem,
                    _defer_wait=True,
                )
            )
        for handle, _sip, meta in handles:
            ctx.wait(handle, _meta=meta)

        # Per-PE kernel execution time (excludes launch dispatch and
        # response aggregation). DST blocks in tl.recv_no_consume until
        # the send arrives, so the max across launches is DST's transfer
        # time.
        exec_ns = []
        for handle, _sip, _meta in handles:
            _, trace = engine.get_completion(handle)
            value = trace.get("pe_exec_ns") if trace else None
            if value is not None:
                exec_ns.append(float(value))

    return max(exec_ns, default=0.0)
|
|
|
|
|
|
# ── Raw DMA path (intra-SIP only) ────────────────────────────────────
|
|
|
|
|
|
def _measure_raw(hop: Hop, nbytes: int) -> float:
    """Measure one raw-DMA (``tl.load`` + ``tl.store``) transfer.

    tl.load from the source slice + tl.store to the destination slice. The
    VA mapping spans the cube mesh within one SIP (MmuMapMsg broadcasts to
    all cubes of the SIP), so the store goes through the fabric to the
    destination PE's HBM. No IPCQ protocol involved.

    Args:
        hop: Endpoints of the transfer; source and destination must be on
            the same SIP and ``hop.supports_raw`` must be true.
        nbytes: Payload size in bytes; ``nbytes // ELEM_BYTES`` f16
            elements are transferred.

    Returns:
        The largest ``pe_exec_ns`` reported in the completion traces of
        the launch, or ``0.0`` if no trace carries that field.

    Raises:
        RuntimeError: If the hop does not support the raw path, or if its
            source and destination sit on different SIPs.
    """
    if not hop.supports_raw:
        raise RuntimeError(f"hop {hop.id} does not support raw path")

    src_sip, src_cube, src_pe = hop.src
    dst_sip, dst_cube, dst_pe = hop.dst
    # Explicit raise instead of ``assert``: asserts are stripped under
    # ``python -O``, which would let a cross-SIP hop be measured silently.
    # Checked before the engine is built so an invalid hop fails cheaply.
    if src_sip != dst_sip:
        raise RuntimeError(
            f"hop {hop.id} crosses SIPs ({src_sip} -> {dst_sip}); "
            "raw path is intra-SIP only"
        )

    engine, spec = _make_engine()

    n_elem = nbytes // ELEM_BYTES

    # Slice offsets in the (N_CUBES, N_PES * n_elem) tensor:
    #   row = cube, slice within row = pe * n_elem .. (pe + 1) * n_elem
    # expressed as byte offsets from the tensor base.
    src_off = (src_cube * N_PES + src_pe) * n_elem * ELEM_BYTES
    dst_off = (dst_cube * N_PES + dst_pe) * n_elem * ELEM_BYTES

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id=f"raw_{hop.id}_{nbytes}",
        spec=spec,
    ) as ctx:
        dp = DPPolicy(
            cube="row_wise", pe="column_wise",
            num_cubes=N_CUBES, num_pes=N_PES,
        )
        ctx.ahbm.set_device(src_sip)
        t = ctx.zeros(
            (N_CUBES, N_PES * n_elem), dtype="f16",
            dp=dp, name="raw_tensor",
        )
        t.copy_(ctx.from_numpy(
            np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
        ))

        def kernel(t_ptr, n_elem, tl):
            pe_id = tl.program_id(axis=0)
            cube_id = tl.program_id(axis=1)
            # Only the source PE acts: it reads its own slice and writes
            # it into the destination PE's slice.
            if cube_id == src_cube and pe_id == src_pe:
                data = tl.load(
                    t_ptr + src_off, shape=(n_elem,), dtype="f16",
                )
                tl.store(t_ptr + dst_off, data)

        pending = ctx.launch(
            f"{hop.id}_raw", kernel, t, n_elem, _defer_wait=True,
        )
        for handle, _sip, meta in pending:
            ctx.wait(handle, _meta=meta)

        # Per-PE kernel execution time. Raw: only SRC does real work
        # (tl.load + tl.store, store is blocking), so max across all PEs
        # = SRC's transfer time. Idle PEs contribute only overhead_ns.
        pe_exec_vals = []
        for handle, _sip, _meta in pending:
            _, trace = engine.get_completion(handle)
            if trace and trace.get("pe_exec_ns") is not None:
                pe_exec_vals.append(float(trace["pe_exec_ns"]))

    return max(pe_exec_vals) if pe_exec_vals else 0.0
|
|
|
|
|
|
# ── CSV + plotting ───────────────────────────────────────────────────
|
|
|
|
|
|
def _write_csv(records, path: Path) -> None:
    """Write the measurement records to ``path`` as a CSV file.

    Missing parent directories are created. The file holds one header row
    followed by one row per record, with the columns ``hop``, ``label``,
    ``size_bytes``, ``path`` and ``total_ns``.

    Args:
        records: Iterable of dicts keyed by the column names above.
        path: Destination file; overwritten if it already exists.
    """
    columns = ["hop", "label", "size_bytes", "path", "total_ns"]
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="", encoding="utf-8") as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        writer.writerows(records)
|
|
|
|
|
|
def _plot_per_hop(records, hop: Hop, path: Path) -> None:
    """Render the latency-vs-size figure for a single hop and save it.

    Draws up to two lines (IPCQ no-consume and raw DMA) from the records
    whose ``hop`` equals ``hop.id``; a line is skipped when it has no
    records.

    Args:
        records: Measurement dicts with ``hop``, ``path``, ``size_bytes``
            and ``total_ns`` keys.
        hop: Hop to plot; ``hop.label`` becomes the title.
        path: Output image path.
    """
    import matplotlib.pyplot as plt

    def rows_for(kind):
        # Records of this hop for one measurement path, ordered by size.
        selected = [
            r for r in records
            if r["hop"] == hop.id and r["path"] == kind
        ]
        selected.sort(key=lambda r: r["size_bytes"])
        return selected

    # (record path, marker, legend text, colour) for each line.
    line_styles = (
        ("ipcq", "o", "IPCQ no-consume (send/recv, no slot read)", "tab:blue"),
        ("raw", "s", "Raw DMA (load+store)", "tab:orange"),
    )
    series = {kind: rows_for(kind) for kind, *_ in line_styles}

    fig, ax = plt.subplots(figsize=(8, 5))
    for kind, marker, legend, color in line_styles:
        rows = series[kind]
        if not rows:
            continue
        sizes = [r["size_bytes"] for r in rows]
        latencies = [r["total_ns"] for r in rows]
        ax.plot(sizes, latencies, marker=marker, label=legend, color=color)

    ax.set_xlabel("Data size (bytes)")
    ax.set_ylabel("Latency (ns)")
    ax.set_title(hop.label)
    ax.grid(True, alpha=0.3)
    ax.legend()
    fig.tight_layout()
    fig.savefig(path, dpi=120)
    plt.close(fig)
|
|
|
|
|
|
def _plot_overview(records, path: Path) -> None:
    """Render one figure with a subplot per hop in ``HOPS`` and save it.

    Each subplot shows the IPCQ no-consume and raw-DMA latency lines for
    one hop; a line is skipped when it has no records. The grid is two
    columns wide and grows by rows with ``len(HOPS)`` (never fewer than
    two rows), so up to four hops give the original 2x2, 13x9 inch
    layout and more hops no longer overflow the axes array. Unused cells
    are switched off.

    Args:
        records: Measurement dicts with ``hop``, ``path``, ``size_bytes``
            and ``total_ns`` keys.
        path: Output image path.
    """
    import matplotlib.pyplot as plt

    def rows_for(hop_id, kind):
        # Records of one hop for one measurement path, ordered by size.
        selected = [
            r for r in records
            if r["hop"] == hop_id and r["path"] == kind
        ]
        return sorted(selected, key=lambda r: r["size_bytes"])

    n_cols = 2
    # Ceiling division; the floor of 2 rows keeps the historical layout.
    n_rows = max(2, (len(HOPS) + n_cols - 1) // n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(13, 4.5 * n_rows))
    axes = axes.flatten()

    for ax, hop in zip(axes, HOPS):
        ipcq = rows_for(hop.id, "ipcq")
        raw = rows_for(hop.id, "raw")
        if ipcq:
            ax.plot(
                [r["size_bytes"] for r in ipcq],
                [r["total_ns"] for r in ipcq],
                marker="o", label="IPCQ no-consume", color="tab:blue",
            )
        if raw:
            ax.plot(
                [r["size_bytes"] for r in raw],
                [r["total_ns"] for r in raw],
                marker="s", label="Raw DMA", color="tab:orange",
            )
        ax.set_title(hop.label, fontsize=10)
        ax.set_xlabel("bytes")
        ax.set_ylabel("ns")
        ax.grid(True, alpha=0.3)
        ax.legend(fontsize=8)

    # Hide grid cells that have no hop assigned.
    for spare in axes[len(HOPS):]:
        spare.axis("off")

    fig.suptitle(
        "PE-to-PE latency: IPCQ no-consume vs raw DMA",
        fontsize=14,
    )
    fig.tight_layout()
    fig.savefig(path, dpi=120)
    plt.close(fig)
|
|
|
|
|
|
# ── Test entry ───────────────────────────────────────────────────────
|
|
|
|
|
|
def test_pe_to_pe_latency_sweep():
    """Sweep every hop and size, write the CSV and plots, check IPCQ > 0.

    For each hop in ``HOPS`` and each size in ``SIZES`` the IPCQ latency
    is measured, plus the raw-DMA latency when the hop supports it. All
    results are written to ``PLOT_DIR`` before any assertion runs.
    """
    records: list[dict] = []

    for hop in HOPS:
        for size in SIZES:
            # The IPCQ kernel receives with tl.recv_no_consume, which is
            # apples-to-apples with the raw-DMA path: neither performs a
            # read on DST.
            ipcq_ns = _measure_ipcq(hop, size)
            records.append(
                dict(
                    hop=hop.id,
                    label=hop.label,
                    size_bytes=size,
                    path="ipcq",
                    total_ns=ipcq_ns,
                )
            )

            if hop.supports_raw:
                raw_ns = _measure_raw(hop, size)
                records.append(
                    dict(
                        hop=hop.id,
                        label=hop.label,
                        size_bytes=size,
                        path="raw",
                        total_ns=raw_ns,
                    )
                )
                raw_s = f"{raw_ns:7.1f}ns"
            else:
                raw_s = "n/a"

            print(
                f"[{hop.id}] size={size:5d} "
                f"ipcq={ipcq_ns:7.1f}ns raw={raw_s}"
            )

    # Emit every artifact first so the data is on disk even if a check
    # below fails.
    PLOT_DIR.mkdir(parents=True, exist_ok=True)
    _write_csv(records, PLOT_DIR / "summary.csv")
    for hop in HOPS:
        _plot_per_hop(records, hop, PLOT_DIR / f"{hop.id}.png")
    _plot_overview(records, PLOT_DIR / "overview.png")

    # Every IPCQ measurement must be positive; checked per hop in
    # ascending size order.
    for hop in HOPS:
        ipcq_rows = [
            r for r in records
            if r["hop"] == hop.id and r["path"] == "ipcq"
        ]
        for row in sorted(ipcq_rows, key=lambda r: r["size_bytes"]):
            assert row["total_ns"] > 0, f"{hop.id}: total_ns must be > 0"

    print(f"\n Plots + CSV written to {PLOT_DIR}")
|