Files
kernbench2/tests/test_engine.py
T
ywkang d75da439c6 Add probe CLI improvements, D2H read, UCIe/HBM tuning, BW sweep
- Probe CLI: restructured output (tables first, routes below), per-hop
  timestamps, split cross-cube into best/worst cases, D2H read section
- UCIe overhead: 1ns -> 8ns per port (16ns per crossing) to fix
  cross-cube-best < cross-half latency inversion
- HBM efficiency: added efficiency=0.8 factor to hbm_ctrl, reducing
  effective BW from 256 to 204.8 GB/s
- Multi-size BW sweep: saturation tables (4KB-1MB) for all probe cases
- Probe default data size: 4KB -> 32KB for more realistic measurements
- IOChiplet NOC + D2H topology and tests
- NOC mesh, xbar, BW occupancy components and tests
- Cube mesh visualization diagram

278 tests pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 01:16:18 -07:00

409 lines
13 KiB
Python

import pytest
from pathlib import Path
from kernbench.common.types import Completion, RequestHandle
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.runtime_api.kernel import (
KernelLaunchMsg,
KernelRef,
MemoryReadMsg,
MemoryWriteMsg,
ScalarArg,
TensorArg,
TensorArgShard,
)
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def _engine():
graph = load_topology(TOPOLOGY_PATH)
return GraphEngine(graph)
def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0) -> int:
"""Create an HBM physical address targeting a specific PE's HBM slice."""
# 48 GB / 8 slices = 6 GB per slice
slice_bytes = 48 * (1 << 30) // 8
pa = PhysAddr.pe_hbm_addr(
rack_id=0, sip_id=sip, cube_id=cube, pe_id=pe_id,
pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes,
)
return pa.encode()
def _sram_pa(sip: int = 0, cube: int = 0) -> int:
"""Create an SRAM physical address."""
pa = PhysAddr.cube_sram_addr(rack_id=0, sip_id=sip, cube_id=cube, sram_offset=0x800)
return pa.encode()
# ── 1. submit returns handle ────────────────────────────────────────
def test_engine_submit_returns_handle():
"""submit() must return a RequestHandle (non-empty string)."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r0",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
handle = engine.submit(msg)
assert isinstance(handle, str)
assert len(handle) > 0
# ── 2. memory write completion ──────────────────────────────────────
def test_engine_memory_write_completion():
"""MemoryWrite must complete with ok=True."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r1",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
# ── 3. memory read completion ───────────────────────────────────────
def test_engine_memory_read_completion():
"""MemoryRead must complete with ok=True."""
engine = _engine()
msg = MemoryReadMsg(
correlation_id="c0", request_id="r2",
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(), nbytes=4096,
)
h = engine.submit(msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
# ── 4. latency positive ────────────────────────────────────────────
def test_engine_latency_positive():
"""Trace total_ns must be > 0 (ADR-0002 D4)."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r3",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
assert trace["total_ns"] > 0
# ── 5. trace has total_ns and nbytes ───────────────────────────────
def test_engine_trace_has_total_ns_and_nbytes():
"""Trace must contain 'total_ns' and 'nbytes'."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r4",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
assert "total_ns" in trace
assert "nbytes" in trace
assert trace["nbytes"] == 4096
# ── 6. latency includes node overhead_ns ────────────────────────────
def test_engine_latency_includes_node_overhead_ns():
"""Path traverses components with overhead_ns > 0, so total >= some minimum."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r7",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
# pcie_ep (5.0) + io_cpu (10.0) + m_cpu (5.0) = at least 20 ns
assert trace["total_ns"] >= 20.0
# ── 7. concurrent requests ─────────────────────────────────────────
def test_engine_concurrent_requests():
"""Two requests submitted before wait must both complete with traces."""
engine = _engine()
msg1 = MemoryWriteMsg(
correlation_id="c0", request_id="r9a",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
msg2 = MemoryWriteMsg(
correlation_id="c0", request_id="r9b",
dst_sip=0, dst_cube=0, dst_pe=1,
dst_pa=_hbm_pa(pe_id=1), nbytes=4096, pattern="zero",
)
h1 = engine.submit(msg1)
h2 = engine.submit(msg2)
engine.wait(h1)
engine.wait(h2)
comp1, trace1 = engine.get_completion(h1)
comp2, trace2 = engine.get_completion(h2)
assert comp1.ok is True
assert comp2.ok is True
assert trace1["total_ns"] > 0
assert trace2["total_ns"] > 0
# ── 8. kernel launch ───────────────────────────────────────────────
def test_engine_kernel_launch_simplified():
"""KernelLaunch returns latency > 0."""
from kernbench.triton_emu.registry import clear_registry, register_kernel
clear_registry()
hbm_pa = _hbm_pa(pe_id=0)
def gemm_kernel(a_ptr, tl):
a = tl.load(a_ptr, shape=(4, 4), dtype="f16")
tl.store(a_ptr, a)
register_kernel("gemm", gemm_kernel)
engine = _engine()
shard0 = TensorArgShard(
sip=0, cube=0, pe=0,
pa=_hbm_pa(pe_id=0), nbytes=4096, offset_bytes=0,
)
shard1 = TensorArgShard(
sip=0, cube=0, pe=1,
pa=_hbm_pa(pe_id=1), nbytes=4096, offset_bytes=4096,
)
msg = KernelLaunchMsg(
correlation_id="c0", request_id="r10",
kernel_ref=KernelRef(name="gemm", kind="builtin"),
args=(TensorArg(shards=(shard0, shard1)),),
)
h = engine.submit(msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
assert trace["total_ns"] > 0
clear_registry()
# ── 9. deterministic ───────────────────────────────────────────────
def test_engine_deterministic():
"""Same request on two engines must produce identical latency."""
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r11",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
e1 = _engine()
h1 = e1.submit(msg)
e1.wait(h1)
_, t1 = e1.get_completion(h1)
e2 = _engine()
h2 = e2.submit(msg)
e2.wait(h2)
_, t2 = e2.get_completion(h2)
assert t1["total_ns"] == t2["total_ns"]
# ── 10. remote cube access succeeds with higher latency ────────────
def test_dma_capacity_serializes_concurrent():
"""Two concurrent DMA writes to the same cube must contend at DMA capacity=1.
When two MemoryWrite requests target the same cube's M_CPU simultaneously,
the DMA engine (capacity=1) serializes them. The slower request must take
longer than a single isolated request (ADR-0014 D4, ADR-0015 D5).
"""
# Single isolated write baseline
engine_single = _engine()
msg_single = MemoryWriteMsg(
correlation_id="c0", request_id="single",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=4096,
pattern="zero", target_pe=0,
)
h1 = engine_single.submit(msg_single)
engine_single.wait(h1)
_, t1 = engine_single.get_completion(h1)
single_ns = t1["total_ns"]
# Two concurrent writes to same cube (different PEs) → DMA contention
engine_conc = _engine()
msg_a = MemoryWriteMsg(
correlation_id="c0", request_id="conc-a",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=4096,
pattern="zero", target_pe=0,
)
msg_b = MemoryWriteMsg(
correlation_id="c0", request_id="conc-b",
dst_sip=0, dst_cube=0, dst_pe=1,
dst_pa=_hbm_pa(sip=0, cube=0, pe_id=1), nbytes=4096,
pattern="zero", target_pe=1,
)
ha = engine_conc.submit(msg_a)
hb = engine_conc.submit(msg_b)
engine_conc.wait(ha)
engine_conc.wait(hb)
_, ta = engine_conc.get_completion(ha)
_, tb = engine_conc.get_completion(hb)
# At least one must be delayed by DMA contention
max_ns = max(ta["total_ns"], tb["total_ns"])
assert max_ns > single_ns, (
f"concurrent max ({max_ns:.2f}ns) must > single ({single_ns:.2f}ns) "
f"due to DMA capacity=1 contention"
)
# ── 11. formula latency lower bound ──────────────────────────────
def test_formula_latency_lower_bound():
"""_formula_latency must be <= actual latency (ADR-0015 D7).
Uses PE DMA path which is fully known at engine level.
"""
from kernbench.policy.address.phyaddr import PhysAddr as PA
from kernbench.policy.routing.router import AddressResolver, PathRouter
from kernbench.topology.builder import load_topology as lt
graph = lt(TOPOLOGY_PATH)
engine = GraphEngine(graph)
resolver = AddressResolver(graph)
router = PathRouter(graph)
pa = _hbm_pa(sip=0, cube=0, pe_id=1)
pa_obj = PA.decode(pa)
dst_node = resolver.resolve(pa_obj)
pe_ref = "sip0.cube0.pe0"
path = router.find_path(pe_ref, dst_node)
formula = engine._formula_latency(path, 4096)
# Run actual simulation
msg = MemoryReadMsg(
correlation_id="c0", request_id="formula-lb",
src_sip=0, src_cube=0, src_pe=0,
src_pa=pa, nbytes=4096, target_pe=1,
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
actual = trace["total_ns"]
assert formula <= actual, (
f"formula ({formula:.2f}) must <= actual ({actual:.2f})"
)
assert formula > 0, "formula must be > 0"
def test_formula_latency_lower_bound_no_contention():
"""With no contention, formula is a lower bound for PE DMA.
PE DMA routes through NOC, which applies internal mesh traversal
latency (XY routing based on physical positions) not captured by the
formula (NOC edges have distance_mm=0 since NOC is distributed).
Formula <= actual is the invariant.
"""
from kernbench.runtime_api.kernel import PeDmaMsg
from kernbench.policy.address.phyaddr import PhysAddr as PA
from kernbench.policy.routing.router import AddressResolver, PathRouter
from kernbench.topology.builder import load_topology as lt
graph = lt(TOPOLOGY_PATH)
engine = GraphEngine(graph)
resolver = AddressResolver(graph)
router = PathRouter(graph)
pa = _hbm_pa(sip=0, cube=0, pe_id=0)
pa_obj = PA.decode(pa)
dst_node = resolver.resolve(pa_obj)
pe_ref = "sip0.cube0.pe0"
path = router.find_path(pe_ref, dst_node)
formula = engine._formula_latency(path, 4096)
msg = PeDmaMsg(
correlation_id="c0", request_id="formula-exact",
src_sip=0, src_cube=0, src_pe=0,
dst_pa=pa, nbytes=4096,
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
actual = trace["total_ns"]
# Formula is a lower bound; NOC internal traversal adds latency
assert formula <= actual + 0.01, (
f"formula ({formula:.4f}) must be <= actual ({actual:.4f})"
)
assert actual > 0
# ── 10. remote cube access succeeds with higher latency ────────────
def test_engine_remote_cube_latency_higher():
"""Accessing a distant cube's HBM must have strictly higher latency than local.
Uses separate engines to avoid contention effects.
cube15 (far corner of 4x4 mesh) requires multiple UCIe + NOC hops
from IO chiplet compared to cube0 (directly connected).
"""
engine_local = _engine()
engine_remote = _engine()
msg_local = MemoryReadMsg(
correlation_id="c0", request_id="r14a",
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=4096,
)
msg_remote = MemoryReadMsg(
correlation_id="c0", request_id="r14b",
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(sip=0, cube=15, pe_id=0), nbytes=4096,
)
h_local = engine_local.submit(msg_local)
engine_local.wait(h_local)
_, t_local = engine_local.get_completion(h_local)
h_remote = engine_remote.submit(msg_remote)
engine_remote.wait(h_remote)
comp_remote, t_remote = engine_remote.get_completion(h_remote)
assert comp_remote.ok is True
assert t_remote is not None and t_local is not None
assert t_remote["total_ns"] > t_local["total_ns"], (
f"remote cube {t_remote['total_ns']:.2f} must > local {t_local['total_ns']:.2f}"
)