Files
ywkang 81cc32c46b ADR-0001 Rev 2: 51-bit PhysAddr layout with concrete sub-unit tables
Remove rack_id (4 bits), rename sip_seg→die_id, shift fields to enable
42-bit local_offset (4 TB per die). Define PE_LOCAL/MCPU_LOCAL/CUBE_SRAM
sub-unit tables for AHBM dies and IOCPU sub-unit table for IOCHIPLET
dies (1 TB window). Supersedes ADR-0031.

Also fixes latent VA/PA confusion in pe_dma pipeline DMA path where
virtual addresses were decoded as physical addresses without MMU
translation — previously masked by coincidental bit-position alignment.

529 passed (+6 recovered), 10 pre-existing failures unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 15:52:29 -07:00

409 lines
13 KiB
Python

import pytest
from pathlib import Path
from kernbench.common.types import Completion, RequestHandle
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.runtime_api.kernel import (
KernelLaunchMsg,
KernelRef,
MemoryReadMsg,
MemoryWriteMsg,
ScalarArg,
TensorArg,
TensorArgShard,
)
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def _engine():
graph = load_topology(TOPOLOGY_PATH)
return GraphEngine(graph)
def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0) -> int:
"""Create an HBM physical address targeting a specific PE's HBM slice."""
# 48 GB / 8 slices = 6 GB per slice
slice_bytes = 48 * (1 << 30) // 8
pa = PhysAddr.pe_hbm_addr(
sip_id=sip, die_id=cube, pe_id=pe_id,
pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes,
)
return pa.encode()
def _sram_pa(sip: int = 0, cube: int = 0) -> int:
"""Create an SRAM physical address."""
pa = PhysAddr.cube_sram_addr(sip_id=sip, die_id=cube, sram_offset=0x800)
return pa.encode()
# ── 1. submit returns handle ────────────────────────────────────────
def test_engine_submit_returns_handle():
"""submit() must return a RequestHandle (non-empty string)."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r0",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
handle = engine.submit(msg)
assert isinstance(handle, str)
assert len(handle) > 0
# ── 2. memory write completion ──────────────────────────────────────
def test_engine_memory_write_completion():
"""MemoryWrite must complete with ok=True."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r1",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
# ── 3. memory read completion ───────────────────────────────────────
def test_engine_memory_read_completion():
"""MemoryRead must complete with ok=True."""
engine = _engine()
msg = MemoryReadMsg(
correlation_id="c0", request_id="r2",
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(), nbytes=4096,
)
h = engine.submit(msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
# ── 4. latency positive ────────────────────────────────────────────
def test_engine_latency_positive():
"""Trace total_ns must be > 0 (ADR-0002 D4)."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r3",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
assert trace["total_ns"] > 0
# ── 5. trace has total_ns and nbytes ───────────────────────────────
def test_engine_trace_has_total_ns_and_nbytes():
"""Trace must contain 'total_ns' and 'nbytes'."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r4",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
assert "total_ns" in trace
assert "nbytes" in trace
assert trace["nbytes"] == 4096
# ── 6. latency includes node overhead_ns ────────────────────────────
def test_engine_latency_includes_node_overhead_ns():
"""Path traverses components with overhead_ns > 0, so total >= some minimum."""
engine = _engine()
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r7",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
# pcie_ep (5.0) + io_cpu (10.0) + m_cpu (5.0) = at least 20 ns
assert trace["total_ns"] >= 20.0
# ── 7. concurrent requests ─────────────────────────────────────────
def test_engine_concurrent_requests():
"""Two requests submitted before wait must both complete with traces."""
engine = _engine()
msg1 = MemoryWriteMsg(
correlation_id="c0", request_id="r9a",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
msg2 = MemoryWriteMsg(
correlation_id="c0", request_id="r9b",
dst_sip=0, dst_cube=0, dst_pe=1,
dst_pa=_hbm_pa(pe_id=1), nbytes=4096, pattern="zero",
)
h1 = engine.submit(msg1)
h2 = engine.submit(msg2)
engine.wait(h1)
engine.wait(h2)
comp1, trace1 = engine.get_completion(h1)
comp2, trace2 = engine.get_completion(h2)
assert comp1.ok is True
assert comp2.ok is True
assert trace1["total_ns"] > 0
assert trace2["total_ns"] > 0
# ── 8. kernel launch ───────────────────────────────────────────────
def test_engine_kernel_launch_simplified():
"""KernelLaunch returns latency > 0."""
from kernbench.triton_emu.registry import clear_registry, register_kernel
clear_registry()
hbm_pa = _hbm_pa(pe_id=0)
def gemm_kernel(a_ptr, tl):
a = tl.load(a_ptr, shape=(4, 4), dtype="f16")
tl.store(a_ptr, a)
register_kernel("gemm", gemm_kernel)
engine = _engine()
shard0 = TensorArgShard(
sip=0, cube=0, pe=0,
pa=_hbm_pa(pe_id=0), nbytes=4096, offset_bytes=0,
)
shard1 = TensorArgShard(
sip=0, cube=0, pe=1,
pa=_hbm_pa(pe_id=1), nbytes=4096, offset_bytes=4096,
)
msg = KernelLaunchMsg(
correlation_id="c0", request_id="r10",
kernel_ref=KernelRef(name="gemm", kind="builtin"),
args=(TensorArg(shards=(shard0, shard1)),),
)
h = engine.submit(msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
assert trace["total_ns"] > 0
clear_registry()
# ── 9. deterministic ───────────────────────────────────────────────
def test_engine_deterministic():
"""Same request on two engines must produce identical latency."""
msg = MemoryWriteMsg(
correlation_id="c0", request_id="r11",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
e1 = _engine()
h1 = e1.submit(msg)
e1.wait(h1)
_, t1 = e1.get_completion(h1)
e2 = _engine()
h2 = e2.submit(msg)
e2.wait(h2)
_, t2 = e2.get_completion(h2)
assert t1["total_ns"] == t2["total_ns"]
# ── 10. remote cube access succeeds with higher latency ────────────
def test_dma_capacity_serializes_concurrent():
"""Two concurrent DMA writes to the same cube must contend at DMA capacity=1.
When two MemoryWrite requests target the same cube's M_CPU simultaneously,
the DMA engine (capacity=1) serializes them. The slower request must take
longer than a single isolated request (ADR-0014 D4, ADR-0015 D5).
"""
# Single isolated write baseline
engine_single = _engine()
msg_single = MemoryWriteMsg(
correlation_id="c0", request_id="single",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=4096,
pattern="zero", target_pe=0,
)
h1 = engine_single.submit(msg_single)
engine_single.wait(h1)
_, t1 = engine_single.get_completion(h1)
single_ns = t1["total_ns"]
# Two concurrent writes to same cube (different PEs) → DMA contention
engine_conc = _engine()
msg_a = MemoryWriteMsg(
correlation_id="c0", request_id="conc-a",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=4096,
pattern="zero", target_pe=0,
)
msg_b = MemoryWriteMsg(
correlation_id="c0", request_id="conc-b",
dst_sip=0, dst_cube=0, dst_pe=1,
dst_pa=_hbm_pa(sip=0, cube=0, pe_id=1), nbytes=4096,
pattern="zero", target_pe=1,
)
ha = engine_conc.submit(msg_a)
hb = engine_conc.submit(msg_b)
engine_conc.wait(ha)
engine_conc.wait(hb)
_, ta = engine_conc.get_completion(ha)
_, tb = engine_conc.get_completion(hb)
# At least one must be delayed by DMA contention
max_ns = max(ta["total_ns"], tb["total_ns"])
assert max_ns > single_ns, (
f"concurrent max ({max_ns:.2f}ns) must > single ({single_ns:.2f}ns) "
f"due to DMA capacity=1 contention"
)
# ── 11. formula latency lower bound ──────────────────────────────
def test_formula_latency_lower_bound():
"""_formula_latency must be <= actual latency (ADR-0015 D7).
Uses PE DMA path which is fully known at engine level.
"""
from kernbench.policy.address.phyaddr import PhysAddr as PA
from kernbench.policy.routing.router import AddressResolver, PathRouter
from kernbench.topology.builder import load_topology as lt
graph = lt(TOPOLOGY_PATH)
engine = GraphEngine(graph)
resolver = AddressResolver(graph)
router = PathRouter(graph)
pa = _hbm_pa(sip=0, cube=0, pe_id=1)
pa_obj = PA.decode(pa)
dst_node = resolver.resolve(pa_obj)
pe_ref = "sip0.cube0.pe0"
path = router.find_path(pe_ref, dst_node)
formula = engine._formula_latency(path, 4096)
# Run actual simulation
msg = MemoryReadMsg(
correlation_id="c0", request_id="formula-lb",
src_sip=0, src_cube=0, src_pe=0,
src_pa=pa, nbytes=4096, target_pe=1,
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
actual = trace["total_ns"]
assert formula <= actual, (
f"formula ({formula:.2f}) must <= actual ({actual:.2f})"
)
assert formula > 0, "formula must be > 0"
def test_formula_latency_lower_bound_no_contention():
"""With no contention, formula is a lower bound for PE DMA.
PE DMA routes through NOC, which applies internal mesh traversal
latency (XY routing based on physical positions) not captured by the
formula (NOC edges have distance_mm=0 since NOC is distributed).
Formula <= actual is the invariant.
"""
from kernbench.runtime_api.kernel import PeDmaMsg
from kernbench.policy.address.phyaddr import PhysAddr as PA
from kernbench.policy.routing.router import AddressResolver, PathRouter
from kernbench.topology.builder import load_topology as lt
graph = lt(TOPOLOGY_PATH)
engine = GraphEngine(graph)
resolver = AddressResolver(graph)
router = PathRouter(graph)
pa = _hbm_pa(sip=0, cube=0, pe_id=0)
pa_obj = PA.decode(pa)
dst_node = resolver.resolve(pa_obj)
pe_ref = "sip0.cube0.pe0"
path = router.find_path(pe_ref, dst_node)
formula = engine._formula_latency(path, 4096)
msg = PeDmaMsg(
correlation_id="c0", request_id="formula-exact",
src_sip=0, src_cube=0, src_pe=0,
dst_pa=pa, nbytes=4096,
)
h = engine.submit(msg)
engine.wait(h)
_, trace = engine.get_completion(h)
actual = trace["total_ns"]
# Formula is a lower bound; NOC internal traversal adds latency
assert formula <= actual + 0.01, (
f"formula ({formula:.4f}) must be <= actual ({actual:.4f})"
)
assert actual > 0
# ── 10. remote cube access succeeds with higher latency ────────────
def test_engine_remote_cube_latency_higher():
"""Accessing a distant cube's HBM must have strictly higher latency than local.
Uses separate engines to avoid contention effects.
cube15 (far corner of 4x4 mesh) requires multiple UCIe + NOC hops
from IO chiplet compared to cube0 (directly connected).
"""
engine_local = _engine()
engine_remote = _engine()
msg_local = MemoryReadMsg(
correlation_id="c0", request_id="r14a",
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=4096,
)
msg_remote = MemoryReadMsg(
correlation_id="c0", request_id="r14b",
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(sip=0, cube=15, pe_id=0), nbytes=4096,
)
h_local = engine_local.submit(msg_local)
engine_local.wait(h_local)
_, t_local = engine_local.get_completion(h_local)
h_remote = engine_remote.submit(msg_remote)
engine_remote.wait(h_remote)
comp_remote, t_remote = engine_remote.get_completion(h_remote)
assert comp_remote.ok is True
assert t_remote is not None and t_local is not None
assert t_remote["total_ns"] > t_local["total_ns"], (
f"remote cube {t_remote['total_ns']:.2f} must > local {t_local['total_ns']:.2f}"
)