Files
kernbench2/tests/test_probe.py
T
ywkang d75da439c6 Add probe CLI improvements, D2H read, UCIe/HBM tuning, BW sweep
- Probe CLI: restructured output (tables first, routes below), per-hop
  timestamps, split cross-cube into best/worst cases, D2H read section
- UCIe overhead: 1ns -> 8ns per port (16ns per crossing) to fix
  cross-cube-best < cross-half latency inversion
- HBM efficiency: added efficiency=0.8 factor to hbm_ctrl, reducing
  effective BW from 256 to 204.8 GB/s
- Multi-size BW sweep: saturation tables (4KB-1MB) for all probe cases
- Probe default data size: 4KB -> 32KB for more realistic measurements
- IOChiplet NOC + D2H topology and tests
- NOC mesh, xbar, BW occupancy components and tests
- Cube mesh visualization diagram

278 tests pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 01:16:18 -07:00

334 lines
12 KiB
Python

"""Tests for H2D writes and PE DMA probe latency invariants.
H2D tests use MemoryWriteMsg (pcie_ep → io_cpu → m_cpu → hbm_ctrl → response).
PE DMA tests use PeDmaMsg (direct pe_dma → xbar → hbm_ctrl injection).
"""
from pathlib import Path
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.policy.routing.router import AddressResolver, PathRouter
from kernbench.runtime_api.kernel import MemoryReadMsg, MemoryWriteMsg, PeDmaMsg
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def _engine():
    """Build a fresh GraphEngine over the topology loaded from TOPOLOGY_PATH."""
    topology = load_topology(TOPOLOGY_PATH)
    return GraphEngine(topology)
def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0) -> int:
    """Return the encoded physical address inside one PE's HBM slice.

    The address is built by ``PhysAddr.pe_hbm_addr`` with ``rack_id=0``,
    a local offset of ``0x1000`` and a slice size of 48 GiB divided by 8.

    Args:
        sip: Passed through as ``sip_id``.
        cube: Passed through as ``cube_id``.
        pe_id: Passed through as ``pe_id``.

    Returns:
        The result of calling ``encode()`` on the constructed address.
    """
    gib = 1 << 30
    per_slice_bytes = (48 * gib) // 8
    address = PhysAddr.pe_hbm_addr(
        rack_id=0,
        sip_id=sip,
        cube_id=cube,
        pe_id=pe_id,
        pe_local_hbm_offset=0x1000,
        slice_size_bytes=per_slice_bytes,
    )
    return address.encode()
def _h2d_latency(dst_cube: int, dst_pe: int = 0) -> float:
    """Run one 4096-byte MemoryWriteMsg on a fresh engine and return its latency.

    Args:
        dst_cube: Destination cube on sip 0.
        dst_pe: Destination PE; also used as ``target_pe``.

    Returns:
        The ``"total_ns"`` entry of the completion trace.
    """
    sim = _engine()
    target_pa = _hbm_pa(sip=0, cube=dst_cube, pe_id=dst_pe)
    write_msg = MemoryWriteMsg(
        correlation_id="probe",
        request_id=f"h2d-c{dst_cube}-p{dst_pe}",
        dst_sip=0,
        dst_cube=dst_cube,
        dst_pe=dst_pe,
        dst_pa=target_pa,
        nbytes=4096,
        pattern="zero",
        target_pe=dst_pe,
    )
    handle = sim.submit(write_msg)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
# ── 1. Single-PE write completes ──────────────────────────────────
def test_single_pe_write_completes():
    """A MemoryWriteMsg with target_pe=0 must finish with ok=True and latency > 0."""
    sim = _engine()
    local_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
    write_msg = MemoryWriteMsg(
        correlation_id="probe",
        request_id="pe-local",
        dst_sip=0,
        dst_cube=0,
        dst_pe=0,
        dst_pa=local_pa,
        nbytes=4096,
        pattern="zero",
        target_pe=0,
    )
    handle = sim.submit(write_msg)
    sim.wait(handle)
    completion, trace = sim.get_completion(handle)
    assert completion.ok is True
    assert trace["total_ns"] > 0
# ── 2. Cross-cube write positive latency ─────────────────────────
def test_cross_cube_write_positive():
    """A MemoryWriteMsg aimed at cube 1 (target_pe=0) must report latency > 0."""
    assert _h2d_latency(dst_cube=1, dst_pe=0) > 0
# ── 3. H2D latency monotonicity ──────────────────────────────────
def test_h2d_latency_monotonic():
    """H2D latency must strictly increase over cubes 0, 4, 8, 12.

    Expected ordering: 1hop < 2hop < 3hop < 4hop.
    """
    samples = [
        (cube, _h2d_latency(dst_cube=cube, dst_pe=0)) for cube in (0, 4, 8, 12)
    ]
    # Compare each measurement with the next one in the sequence.
    for (near_cube, near_ns), (far_cube, far_ns) in zip(samples, samples[1:]):
        assert near_ns < far_ns, (
            f"cube{near_cube}({near_ns:.2f}) "
            f"must < cube{far_cube}({far_ns:.2f})"
        )
# ── 4. Single-PE write deterministic ─────────────────────────────
def test_single_pe_write_deterministic():
    """The same MemoryWriteMsg run on two fresh engines must give equal latency."""
    write_msg = MemoryWriteMsg(
        correlation_id="probe",
        request_id="det",
        dst_sip=0,
        dst_cube=0,
        dst_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
        pattern="zero",
        target_pe=0,
    )
    traces = []
    for _ in range(2):
        # Each run gets its own engine; only the message object is shared.
        sim = _engine()
        handle = sim.submit(write_msg)
        sim.wait(handle)
        _completion, trace = sim.get_completion(handle)
        traces.append(trace)
    first_trace, second_trace = traces
    assert first_trace["total_ns"] == second_trace["total_ns"]
# ── 5. Cut-through (wormhole) wire model invariants ──────────────
def test_h2d_local_cube_cut_through():
    """H2D to local cube with cut-through should be < 65ns for 4096B.

    Full command path: pcie_ep → io_cpu → ucie → noc → m_cpu
    DMA: m_cpu → noc → xbar → hbm_ctrl (drain once at terminal)
    Plus response path back.
    With store-and-forward each hop would serialize; cut-through keeps it low.
    """
    # Upper bound enforced by this test; the failure message quotes the same
    # number, so keep the two in sync if the budget is retuned.
    limit_ns = 65.0
    lat = _h2d_latency(dst_cube=0, dst_pe=0)
    assert lat < limit_ns, f"Local H2D {lat:.2f}ns; cut-through expects < 65ns"
def test_h2d_remote_cube_cut_through():
    """H2D to the 1-hop remote cube (cube 4) must stay below 80ns.

    With store-and-forward, each hop would serialize 4096B, total >> 100ns.
    With cut-through, the drain happens once at the bottleneck, so the drain
    dominates rather than per-hop serialization.
    """
    remote_ns = _h2d_latency(dst_cube=4, dst_pe=0)
    within_budget = remote_ns < 80.0
    assert within_budget, f"Remote H2D {remote_ns:.2f}ns; cut-through expects < 80ns"
# ── 6. PE DMA: direct injection tests ─────────────────────────
def _graph():
    """Load and return the topology graph described by TOPOLOGY_PATH."""
    topology = load_topology(TOPOLOGY_PATH)
    return topology
def _pe_dma_latency(src_cube: int, src_pe: int, dst_pe: int) -> float:
    """Run one 4096-byte PeDmaMsg within a cube and return its latency.

    Args:
        src_cube: Cube that holds both the source PE and the target HBM slice.
        src_pe: PE issuing the DMA.
        dst_pe: PE id whose HBM slice address is used as the destination.

    Returns:
        The ``"total_ns"`` entry of the completion trace.
    """
    sim = _engine()
    target_pa = _hbm_pa(sip=0, cube=src_cube, pe_id=dst_pe)
    dma_msg = PeDmaMsg(
        correlation_id="probe",
        request_id=f"dma-c{src_cube}-p{src_pe}-s{dst_pe}",
        src_sip=0,
        src_cube=src_cube,
        src_pe=src_pe,
        dst_pa=target_pa,
        nbytes=4096,
    )
    handle = sim.submit(dma_msg)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
def _pe_dma_bottleneck(src_cube: int, src_pe: int, dst_pe: int) -> float | None:
    """Return the smallest edge bandwidth along the routed PE-to-HBM path.

    The path runs from ``sip0.cube{src_cube}.pe{src_pe}`` to the node that the
    resolver maps the HBM address of ``dst_pe`` (same cube) onto.

    Returns:
        The minimum ``bw_gbs`` over the path's edges, ignoring hops that have
        no edge in the graph or a falsy ``bw_gbs``; ``None`` if no hop
        contributes a bandwidth.
    """
    topo = _graph()
    edges_by_endpoints = {(edge.src, edge.dst): edge for edge in topo.edges}
    resolver = AddressResolver(topo)
    router = PathRouter(topo)
    target_addr = PhysAddr.decode(_hbm_pa(sip=0, cube=src_cube, pe_id=dst_pe))
    target_node = resolver.resolve(target_addr)
    route = router.find_path(f"sip0.cube{src_cube}.pe{src_pe}", target_node)

    hop_bandwidths: list[float] = []
    for hop_src, hop_dst in zip(route, route[1:]):
        edge = edges_by_endpoints.get((hop_src, hop_dst))
        if edge and edge.bw_gbs:
            hop_bandwidths.append(edge.bw_gbs)
    if not hop_bandwidths:
        return None
    return min(hop_bandwidths)
def test_pe_dma_local_completes():
    """A PeDmaMsg from pe0 to its local slice0 must finish with ok=True, latency > 0."""
    sim = _engine()
    local_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
    dma_msg = PeDmaMsg(
        correlation_id="probe",
        request_id="dma-local",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=local_pa,
        nbytes=4096,
    )
    handle = sim.submit(dma_msg)
    sim.wait(handle)
    completion, trace = sim.get_completion(handle)
    assert completion.ok is True
    assert trace["total_ns"] > 0
def test_pe_dma_local_bottleneck_hbm():
    """PE DMA pe0→slice0 (local): bottleneck = HBM effective BW (256 * 0.8 = 204.8).

    The bandwidth is a derived float, so it is compared with a tight relative
    tolerance instead of exact equality; a missing bottleneck (``None``)
    still fails with the same message.
    """
    import math

    bn = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=0)
    # Guard None first: math.isclose would raise TypeError instead of failing
    # the assertion with a readable message.
    assert bn is not None and math.isclose(bn, 204.8, rel_tol=1e-9), (
        f"Local PE DMA bottleneck {bn}, expected 204.8"
    )
def test_pe_dma_same_half_bottleneck_hbm():
    """PE DMA pe0→slice1 (same half via xbar_top): bottleneck = HBM effective BW.

    The expected value is 204.8 GB/s. It is compared with a tight relative
    tolerance rather than exact float equality; a missing bottleneck
    (``None``) still fails with the same message.
    """
    import math

    bn = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=1)
    # Guard None first: math.isclose would raise TypeError instead of failing
    # the assertion with a readable message.
    assert bn is not None and math.isclose(bn, 204.8, rel_tol=1e-9), (
        f"Same-half PE DMA bottleneck {bn}, expected 204.8"
    )
def test_pe_dma_deterministic():
    """The same PeDmaMsg run on two fresh engines must give equal latency."""
    dma_msg = PeDmaMsg(
        correlation_id="probe",
        request_id="det",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
    )
    traces = []
    for _ in range(2):
        # Each run gets its own engine; only the message object is shared.
        sim = _engine()
        handle = sim.submit(dma_msg)
        sim.wait(handle)
        _completion, trace = sim.get_completion(handle)
        traces.append(trace)
    first_trace, second_trace = traces
    assert first_trace["total_ns"] == second_trace["total_ns"]
# ── 7. PE DMA cross-cube best vs worst ──────────────────────────
def _pe_dma_cross_cube_latency(dst_cube: int) -> float:
    """Run a 4096-byte PeDmaMsg from sip0/cube0/pe0 into another cube's HBM.

    Args:
        dst_cube: Cube whose pe0 HBM slice address is the destination.

    Returns:
        The ``"total_ns"`` entry of the completion trace.
    """
    sim = _engine()
    remote_pa = _hbm_pa(sip=0, cube=dst_cube, pe_id=0)
    dma_msg = PeDmaMsg(
        correlation_id="probe",
        request_id=f"dma-cross-c{dst_cube}",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=remote_pa,
        nbytes=4096,
    )
    handle = sim.submit(dma_msg)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
def test_pe_cross_cube_best_worst():
    """Cross-cube DMA to adjacent cube1 must be strictly faster than to far cube15."""
    adjacent_ns = _pe_dma_cross_cube_latency(dst_cube=1)
    far_ns = _pe_dma_cross_cube_latency(dst_cube=15)
    failure = f"Best (cube1) {adjacent_ns:.2f}ns must < worst (cube15) {far_ns:.2f}ns"
    assert adjacent_ns < far_ns, failure
# ── 8. Probe timestamp trace ──────────────────────────────────
def test_probe_timestamp_trace():
    """_hop_timestamps must yield one entry per path node, non-decreasing in time.

    The path is the pe0 → local HBM slice route on sip0/cube0, probed with a
    4096-byte transfer. Element ``[1]`` of each entry is the value compared.
    """
    from kernbench.cli.probe import _hop_timestamps, _build_edge_map

    topo = _graph()
    edges = _build_edge_map(topo)
    resolver = AddressResolver(topo)
    router = PathRouter(topo)
    target_addr = PhysAddr.decode(_hbm_pa(sip=0, cube=0, pe_id=0))
    target_node = resolver.resolve(target_addr)
    route = router.find_path("sip0.cube0.pe0", target_node)

    stamps = _hop_timestamps(route, 4096, edges, topo)
    assert len(stamps) == len(route)
    for hop, (current, following) in enumerate(zip(stamps, stamps[1:])):
        assert current[1] <= following[1], (
            f"Timestamps not monotonic at hop {hop}: "
            f"{current[1]:.4f} > {following[1]:.4f}"
        )
# ── 9. D2H Read latency monotonicity ────────────────────────────
def _d2h_latency(src_cube: int) -> float:
    """Run one 4096-byte MemoryReadMsg on a fresh engine and return its latency.

    Args:
        src_cube: Cube on sip 0 whose pe0 HBM slice is read.

    Returns:
        The ``"total_ns"`` entry of the completion trace.
    """
    sim = _engine()
    source_pa = _hbm_pa(sip=0, cube=src_cube, pe_id=0)
    read_msg = MemoryReadMsg(
        correlation_id="probe",
        request_id=f"d2h-c{src_cube}",
        src_sip=0,
        src_cube=src_cube,
        src_pe=0,
        src_pa=source_pa,
        nbytes=4096,
    )
    handle = sim.submit(read_msg)
    sim.wait(handle)
    _completion, trace = sim.get_completion(handle)
    return trace["total_ns"]
def test_d2h_latency_monotonic():
    """D2H read latency must strictly increase over cubes 0, 4, 8, 12.

    Expected ordering: 1hop < 2hop < 3hop < 4hop.
    """
    samples = []
    for cube in (0, 4, 8, 12):
        samples.append((cube, _d2h_latency(cube)))
    # Compare each measurement with the next one in the sequence.
    for (near_cube, near_ns), (far_cube, far_ns) in zip(samples, samples[1:]):
        assert near_ns < far_ns, (
            f"cube{near_cube}({near_ns:.2f}) "
            f"must < cube{far_cube}({far_ns:.2f})"
        )
def test_d2h_latency_gte_h2d():
    """D2H read latency must be at least 80% of H2D write latency per cube.

    Checked for cubes 0, 4 and 8 (D2H is the reverse data path of H2D).
    Despite the test name, this is not a strict ``D2H >= H2D`` check: the
    assertion allows D2H to come in up to 20% below the H2D figure.
    """
    # Fraction of the H2D latency that the D2H latency must reach.
    min_ratio = 0.8
    for cube in [0, 4, 8]:
        h2d = _h2d_latency(dst_cube=cube, dst_pe=0)
        d2h = _d2h_latency(src_cube=cube)
        assert d2h >= h2d * min_ratio, (
            f"cube{cube}: D2H ({d2h:.2f}ns) should be >= 80% of H2D ({h2d:.2f}ns)"
        )
# ── 10. HBM efficiency applied ──────────────────────────────────
def test_hbm_efficiency_applied():
    """HBM edge BW should reflect efficiency factor (256 * 0.8 = 204.8).

    Looks up the ``sip0.cube0.xbar_top`` → ``sip0.cube0.hbm_ctrl.slice0`` edge
    and checks its ``bw_gbs``. The value is a derived float, so it is compared
    with a tight relative tolerance instead of exact equality.
    """
    import math

    graph = _graph()
    edge_map = {(e.src, e.dst): e for e in graph.edges}
    e = edge_map.get(("sip0.cube0.xbar_top", "sip0.cube0.hbm_ctrl.slice0"))
    assert e is not None, "xbar_top -> hbm_ctrl.slice0 edge missing"
    # Guard None first: math.isclose would raise TypeError instead of failing
    # the assertion with a readable message.
    assert e.bw_gbs is not None and math.isclose(e.bw_gbs, 204.8, rel_tol=1e-9), (
        f"HBM edge BW {e.bw_gbs}, expected 204.8 (256*0.8)"
    )
# ── 11. Sweep saturation ──────────────────────────────────────
def test_probe_sweep_saturation():
    """Sweep utilization for pe-local-hbm must rise from 4KB to 1MB and exceed 99%.

    The first element of the ``_sweep_util`` result is treated as the 4KB
    point and the last element as the 1MB point.
    """
    from kernbench.cli.probe import _sweep_util

    # pe-local-hbm: ovhd=2ns (xbar), wire~0.03ns, bn=204.8 GB/s
    utilization = _sweep_util(2.0, 0.03, 204.8)
    largest = utilization[-1]
    smallest = utilization[0]
    assert largest > smallest, (
        f"1MB util ({largest:.1f}%) must exceed 4KB util ({smallest:.1f}%)"
    )
    assert largest > 99.0, f"1MB util ({largest:.1f}%) should be >99%"