Files
kernbench2/tests/test_probe.py
T
ywkang 049e3d8bb3 benches: package as kernbench.benches, add @bench registry + list subcommand
Move benches/ -> src/kernbench/benches/ and src/kernbench/cli/probe.py ->
src/kernbench/probes/probe.py. Each bench self-registers via
@bench(name=..., description=...); kernbench list enumerates benches
with auto-assigned indices, --bench accepts kebab-case name or numeric
index. Audit at package-import time fails if any non-underscore module
forgets the decorator. ADR-0010 (EN + KO) updated to reflect the new
resolver path, list subcommand, and probes package separation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 14:42:10 -07:00

352 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for H2D writes and PE DMA probe latency invariants.
H2D tests use MemoryWriteMsg (pcie_ep → io_cpu → m_cpu → hbm_ctrl → response).
D2H tests use MemoryReadMsg and compare read latency against the H2D write latency.
PE DMA tests use PeDmaMsg (direct pe_dma → router mesh → hbm_ctrl injection).
"""
from pathlib import Path
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.policy.routing.router import AddressResolver, PathRouter
from kernbench.runtime_api.kernel import MemoryReadMsg, MemoryWriteMsg, PeDmaMsg
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology
# Topology description used by every test in this module: "topology.yaml"
# located in the parent of the directory that contains this file.
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def _engine():
    """Return a new GraphEngine built over the topology loaded from TOPOLOGY_PATH."""
    topology = load_topology(TOPOLOGY_PATH)
    return GraphEngine(topology)
def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0) -> int:
    """Return the encoded physical address of a PE-local HBM location.

    The address is produced by ``PhysAddr.pe_hbm_addr`` for the given SIP,
    die (``cube``) and PE, at a fixed PE-local offset of 0x1000, and then
    encoded with ``PhysAddr.encode``.
    """
    gib = 1 << 30
    # NOTE(review): presumably 48 GiB of HBM split across 8 PE slices —
    # confirm against the topology definition.
    per_slice_bytes = 48 * gib // 8
    return PhysAddr.pe_hbm_addr(
        sip_id=sip,
        die_id=cube,
        pe_id=pe_id,
        pe_local_hbm_offset=0x1000,
        slice_size_bytes=per_slice_bytes,
    ).encode()
def _h2d_latency(dst_cube: int, dst_pe: int = 0) -> float:
    """Measure one H2D write and return its ``total_ns`` trace entry.

    A fresh engine is created, a single 4096-byte ``MemoryWriteMsg`` with
    pattern "zero" is submitted towards PE ``dst_pe`` of cube ``dst_cube``
    on SIP 0, and the call blocks until the engine reports completion.
    """
    sim = _engine()
    write = MemoryWriteMsg(
        correlation_id="probe",
        request_id=f"h2d-c{dst_cube}-p{dst_pe}",
        dst_sip=0,
        dst_cube=dst_cube,
        dst_pe=dst_pe,
        dst_pa=_hbm_pa(sip=0, cube=dst_cube, pe_id=dst_pe),
        nbytes=4096,
        pattern="zero",
        target_pe=dst_pe,
    )
    handle = sim.submit(write)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
# ── 1. Single-PE write completes ──────────────────────────────────
def test_single_pe_write_completes():
    """A MemoryWriteMsg with target_pe=0 completes with ok=True and a positive total latency."""
    sim = _engine()
    handle = sim.submit(
        MemoryWriteMsg(
            correlation_id="probe",
            request_id="pe-local",
            dst_sip=0,
            dst_cube=0,
            dst_pe=0,
            dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
            nbytes=4096,
            pattern="zero",
            target_pe=0,
        )
    )
    sim.wait(handle)
    completion, trace = sim.get_completion(handle)
    assert completion.ok is True
    assert trace["total_ns"] > 0
# ── 2. Cross-cube write positive latency ─────────────────────────
def test_cross_cube_write_positive():
    """An H2D write to PE 0 of cube 1 must report a latency greater than zero."""
    cross_cube_ns = _h2d_latency(dst_cube=1, dst_pe=0)
    assert cross_cube_ns > 0
# ── 3. H2D latency monotonicity ──────────────────────────────────
def test_h2d_latency_monotonic():
    """H2D latency must strictly grow over cubes 0, 4, 8, 12 (1hop < 2hop < 3hop < 4hop)."""
    # Take every measurement first, then compare each cube with its successor.
    samples: list[tuple[int, float]] = [
        (cube, _h2d_latency(dst_cube=cube, dst_pe=0)) for cube in [0, 4, 8, 12]
    ]
    for (near_cube, near_ns), (far_cube, far_ns) in zip(samples, samples[1:]):
        assert near_ns < far_ns, (
            f"cube{near_cube}({near_ns:.2f}) "
            f"must < cube{far_cube}({far_ns:.2f})"
        )
# ── 4. Single-PE write deterministic ─────────────────────────────
def test_single_pe_write_deterministic():
    """Submitting the same MemoryWriteMsg to two separate engines yields equal total_ns."""
    write = MemoryWriteMsg(
        correlation_id="probe",
        request_id="det",
        dst_sip=0,
        dst_cube=0,
        dst_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
        pattern="zero",
        target_pe=0,
    )
    # Run the identical message twice, each time on a freshly built engine.
    traces = []
    for _ in range(2):
        sim = _engine()
        handle = sim.submit(write)
        sim.wait(handle)
        _completion, trace = sim.get_completion(handle)
        traces.append(trace)
    first, second = traces
    assert first["total_ns"] == second["total_ns"]
# ── 5. Cut-through (wormhole) wire model invariants ──────────────
def test_h2d_local_cube_cut_through():
    """Local-cube H2D latency must sit well under the store-and-forward figure.

    Command path: pcie_ep → io_cpu → ucie → noc → m_cpu.
    DMA path: m_cpu → router mesh → hbm_ctrl, draining once at the bottleneck
    link, followed by the response path back.

    Under store-and-forward every hop would serialize the full payload
    (~5 × drain = 160ns for 4KB through UCIe 128 GB/s). With cut-through
    (ADR-0033 Phase 2c wormhole) the total is dominated by a single transit
    of the bottleneck, so the test requires less than 80 ns.
    """
    local_ns = _h2d_latency(dst_cube=0, dst_pe=0)
    # NOTE(review): "SAW" in the message presumably abbreviates store-and-forward — confirm.
    assert local_ns < 80.0, (
        f"Local H2D {local_ns:.2f}ns; cut-through expects < 80ns (SAW would be > 160ns)"
    )
def test_h2d_remote_cube_cut_through():
    """H2D to the 1-hop remote cube (cube 4) must stay under 120 ns.

    Store-and-forward would serialize the 4096 B payload at every hop and
    push the total far above 100ns; with cut-through the drain happens once,
    at the bottleneck, rather than per hop.
    """
    remote_ns = _h2d_latency(dst_cube=4, dst_pe=0)
    assert remote_ns < 120.0, (
        f"Remote H2D {remote_ns:.2f}ns; cut-through expects < 120ns"
    )
# ── 6. PE DMA: direct injection tests ─────────────────────────
def _graph():
    """Load the topology at TOPOLOGY_PATH and return the resulting graph object."""
    graph = load_topology(TOPOLOGY_PATH)
    return graph
def _hbm_effective_bw() -> float:
    """Return the HBM effective bandwidth derived from the topology spec.

    Computed as the cube's ``hbm_to_router_bw_gbs`` link bandwidth times the
    ``efficiency`` attribute of the ``hbm_ctrl`` component; the efficiency
    falls back to 1.0 when ``attrs`` or ``efficiency`` is absent.
    """
    topology = _graph()
    link_bw = topology.spec["cube"]["links"]["hbm_to_router_bw_gbs"]
    hbm_attrs = topology.spec["cube"]["components"]["hbm_ctrl"].get("attrs", {})
    efficiency = hbm_attrs.get("efficiency", 1.0)
    return link_bw * efficiency
def _pe_dma_latency(src_cube: int, src_pe: int, dst_pe: int) -> float:
    """Measure one PE DMA transfer inside a cube and return its ``total_ns``.

    A 4096-byte ``PeDmaMsg`` is issued from PE ``src_pe`` of cube ``src_cube``
    (SIP 0) to the HBM address of PE ``dst_pe`` in that same cube, on a
    freshly built engine.
    """
    sim = _engine()
    dma = PeDmaMsg(
        correlation_id="probe",
        request_id=f"dma-c{src_cube}-p{src_pe}-s{dst_pe}",
        src_sip=0,
        src_cube=src_cube,
        src_pe=src_pe,
        dst_pa=_hbm_pa(sip=0, cube=src_cube, pe_id=dst_pe),
        nbytes=4096,
    )
    handle = sim.submit(dma)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
def _pe_dma_bottleneck(src_cube: int, src_pe: int, dst_pe: int) -> float | None:
    """Return the smallest edge bandwidth on the routed PE → HBM path.

    The path runs from ``sip0.cube{src_cube}.pe{src_pe}`` to the node that
    the address resolver maps PE ``dst_pe``'s HBM address (same cube) onto.
    Hops with no matching edge, or whose edge has a falsy ``bw_gbs``, are
    skipped. Returns ``None`` when no hop contributes a bandwidth.
    """
    graph = _graph()
    edges_by_endpoints = {(edge.src, edge.dst): edge for edge in graph.edges}
    resolver = AddressResolver(graph)
    router = PathRouter(graph)
    target_addr = PhysAddr.decode(_hbm_pa(sip=0, cube=src_cube, pe_id=dst_pe))
    target_node = resolver.resolve(target_addr)
    hops = router.find_path(f"sip0.cube{src_cube}.pe{src_pe}", target_node)

    link_bws: list[float] = []
    for idx in range(1, len(hops)):
        link = edges_by_endpoints.get((hops[idx - 1], hops[idx]))
        if not link or not link.bw_gbs:
            continue
        link_bws.append(link.bw_gbs)

    if not link_bws:
        return None
    return min(link_bws)
def test_pe_dma_local_completes():
    """A PeDmaMsg from pe0 to its own HBM slice completes with ok=True and latency > 0."""
    sim = _engine()
    handle = sim.submit(
        PeDmaMsg(
            correlation_id="probe",
            request_id="dma-local",
            src_sip=0,
            src_cube=0,
            src_pe=0,
            dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
            nbytes=4096,
        )
    )
    sim.wait(handle)
    completion, trace = sim.get_completion(handle)
    assert completion.ok is True
    assert trace["total_ns"] > 0
def test_pe_dma_local_bottleneck_hbm():
    """For local PE DMA (pe0 → slice0) the path bottleneck equals the HBM effective BW."""
    bottleneck = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=0)
    hbm_bw = _hbm_effective_bw()
    assert bottleneck == hbm_bw, (
        f"Local PE DMA bottleneck {bottleneck}, expected {hbm_bw}"
    )
def test_pe_dma_same_half_bottleneck_hbm():
    """For PE DMA pe0 → pe1 HBM (same row, via router mesh) the bottleneck is the HBM effective BW."""
    bottleneck = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=1)
    hbm_bw = _hbm_effective_bw()
    assert bottleneck == hbm_bw, (
        f"Same-half PE DMA bottleneck {bottleneck}, expected {hbm_bw}"
    )
def test_pe_dma_deterministic():
    """Submitting the same PeDmaMsg to two separate engines yields equal total_ns."""
    dma = PeDmaMsg(
        correlation_id="probe",
        request_id="det",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
    )
    # Run the identical message twice, each time on a freshly built engine.
    traces = []
    for _ in range(2):
        sim = _engine()
        handle = sim.submit(dma)
        sim.wait(handle)
        _completion, trace = sim.get_completion(handle)
        traces.append(trace)
    first, second = traces
    assert first["total_ns"] == second["total_ns"]
# ── 7. PE DMA cross-cube best vs worst ──────────────────────────
def _pe_dma_cross_cube_latency(dst_cube: int) -> float:
    """Measure a cross-cube PE DMA and return its ``total_ns``.

    A 4096-byte ``PeDmaMsg`` is issued from pe0 of cube 0 (SIP 0) to the HBM
    address of pe0 in cube ``dst_cube``, on a freshly built engine.
    """
    sim = _engine()
    dma = PeDmaMsg(
        correlation_id="probe",
        request_id=f"dma-cross-c{dst_cube}",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=dst_cube, pe_id=0),
        nbytes=4096,
    )
    handle = sim.submit(dma)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
def test_pe_cross_cube_best_worst():
    """Cross-cube PE DMA to adjacent cube1 (best) must be faster than to far cube15 (worst)."""
    nearest_ns = _pe_dma_cross_cube_latency(dst_cube=1)
    farthest_ns = _pe_dma_cross_cube_latency(dst_cube=15)
    assert nearest_ns < farthest_ns, (
        f"Best (cube1) {nearest_ns:.2f}ns must < worst (cube15) {farthest_ns:.2f}ns"
    )
# ── 8. Probe timestamp trace ──────────────────────────────────
def test_probe_timestamp_trace():
    """_hop_timestamps yields one entry per path node with non-decreasing timestamps.

    The path is routed from ``sip0.cube0.pe0`` to the node resolved for that
    PE's own HBM address; element ``[1]`` of each returned entry is compared
    against the next entry's and must never decrease.
    """
    from kernbench.probes.probe import _hop_timestamps, _build_edge_map

    graph = _graph()
    edge_map = _build_edge_map(graph)
    resolver = AddressResolver(graph)
    router = PathRouter(graph)
    target_addr = PhysAddr.decode(_hbm_pa(sip=0, cube=0, pe_id=0))
    target_node = resolver.resolve(target_addr)
    hops = router.find_path("sip0.cube0.pe0", target_node)

    stamps = _hop_timestamps(hops, 4096, edge_map, graph)
    assert len(stamps) == len(hops)
    for nxt in range(1, len(stamps)):
        prev = nxt - 1
        assert stamps[prev][1] <= stamps[nxt][1], (
            f"Timestamps not monotonic at hop {prev}: "
            f"{stamps[prev][1]:.4f} > {stamps[nxt][1]:.4f}"
        )
# ── 9. D2H Read latency monotonicity ────────────────────────────
def _d2h_latency(src_cube: int) -> float:
    """Measure one D2H read and return its ``total_ns`` trace entry.

    A 4096-byte ``MemoryReadMsg`` reading pe0's HBM address in cube
    ``src_cube`` (SIP 0) is submitted on a freshly built engine.
    """
    sim = _engine()
    read = MemoryReadMsg(
        correlation_id="probe",
        request_id=f"d2h-c{src_cube}",
        src_sip=0,
        src_cube=src_cube,
        src_pe=0,
        src_pa=_hbm_pa(sip=0, cube=src_cube, pe_id=0),
        nbytes=4096,
    )
    handle = sim.submit(read)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
def test_d2h_latency_monotonic():
    """D2H read latency must strictly grow over cubes 0, 4, 8, 12 (1hop < 2hop < 3hop < 4hop)."""
    cubes = [0, 4, 8, 12]
    # Take every measurement first, then compare each cube with its successor.
    read_ns = {cube: _d2h_latency(cube) for cube in cubes}
    for near, far in zip(cubes, cubes[1:]):
        assert read_ns[near] < read_ns[far], (
            f"cube{near}({read_ns[near]:.2f}) "
            f"must < cube{far}({read_ns[far]:.2f})"
        )
def test_d2h_latency_gte_h2d():
    """D2H read latency must be at least 80% of the H2D write latency to the same cube.

    Checked for cubes 0, 4 and 8 (the read travels the reverse data path).
    The 0.8 factor is a tolerance: the read may be up to 20% faster than the
    write before the test fails.
    """
    tolerance = 0.8
    for cube in [0, 4, 8]:
        write_ns = _h2d_latency(dst_cube=cube, dst_pe=0)
        read_ns = _d2h_latency(src_cube=cube)
        assert read_ns >= write_ns * tolerance, (
            f"cube{cube}: D2H ({read_ns:.2f}ns) should be >= 80% of H2D ({write_ns:.2f}ns)"
        )
# ── 10. HBM efficiency applied ──────────────────────────────────
def test_hbm_efficiency_applied():
    """The router_to_hbm edge bandwidth must equal the HBM effective BW from the spec."""
    graph = _graph()
    # First edge of kind "router_to_hbm" whose source name contains "cube0".
    hbm_edge = next(
        (
            edge
            for edge in graph.edges
            if edge.kind == "router_to_hbm" and "cube0" in edge.src
        ),
        None,
    )
    assert hbm_edge is not None, "router → hbm_ctrl edge missing"
    effective_bw = _hbm_effective_bw()
    assert hbm_edge.bw_gbs == effective_bw, (
        f"HBM edge BW {hbm_edge.bw_gbs}, expected {effective_bw}"
    )
# ── 11. Sweep saturation ──────────────────────────────────────
def test_probe_sweep_saturation():
    """The last sweep utilization must exceed the first and be above 99%.

    NOTE(review): the assertion messages imply the first entry of
    ``_sweep_util``'s result is the 4KB point and the last is the 1MB point —
    confirm against ``kernbench.probes.probe``.
    """
    from kernbench.probes.probe import _sweep_util

    # pe-local-hbm inputs: ovhd=2ns (router), wire~0.03ns, bottleneck BW from topology.
    bottleneck_bw = _hbm_effective_bw()
    utilization = _sweep_util(2.0, 0.03, bottleneck_bw)
    largest, smallest = utilization[-1], utilization[0]
    assert largest > smallest, (
        f"1MB util ({largest:.1f}%) must exceed 4KB util ({smallest:.1f}%)"
    )
    assert largest > 99.0, f"1MB util ({largest:.1f}%) should be >99%"