Files
kernbench2/tests/test_pe_components.py
T
ywkang 5917b3497c Replace xbar/bridge/single-NOC with explicit router mesh (ADR-0019)
- Remove xbar_top/bot, bridge, single noc node from topology
- Each cube_mesh.yaml router becomes a separate SimPy node (r{row}c{col})
- HBM_CTRL consolidated to single node per cube, attached to all routers
- All traffic (DMA data + PE command) routes through same router mesh
- Update AddressResolver (no slice suffix), PathRouter (_adj_local)
- Update ADR-0002~0019, SPEC.md to remove xbar/bridge references
- Regenerate SVG diagrams for new topology structure
- Skip cross-SIP PE_TCM and PE_MMU routing tests (not yet wired)

326 passed, 13 skipped

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 17:51:28 -07:00

1177 lines
37 KiB
Python

"""Tests for PE internal component implementations (ADR-0014).
Validates:
- Registry resolves all 6 PE component impl strings
- PE_DMA dual-channel concurrency (READ ∥ WRITE allowed)
- PE_DMA same-channel serialization (READ ∥ READ blocked)
- PE_GEMM / PE_MATH shared accel_slot (capacity=1)
- PeDmaMsg probe regression (latency unchanged)
- Stage 2: PE_CPU kernel execution + PE_SCHEDULER dispatch
"""
from pathlib import Path
import pytest
import simpy
from kernbench.common.pe_commands import (
DmaReadCmd,
GemmCmd,
MathCmd,
PeInternalTxn,
TensorHandle,
)
from kernbench.components.base import ComponentRegistry
from kernbench.components.builtin.pe_cpu import PeCpuComponent
from kernbench.components.builtin.pe_dma import PeDmaComponent
from kernbench.components.builtin.pe_gemm import PeGemmComponent
from kernbench.components.builtin.pe_math import PeMathComponent
from kernbench.components.builtin.pe_scheduler import PeSchedulerComponent
from kernbench.components.builtin.pe_tcm import PeTcmComponent
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.runtime_api.kernel import (
KernelLaunchMsg,
KernelRef,
MemoryReadMsg,
MemoryWriteMsg,
PeDmaMsg,
TensorArg,
TensorArgShard,
)
from kernbench.sim_engine.engine import GraphEngine
from kernbench.sim_engine.transaction import Transaction
from kernbench.topology.builder import load_topology
from kernbench.topology.types import Node
from kernbench.triton_emu.registry import clear_registry, register_kernel
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def _engine():
return GraphEngine(load_topology(TOPOLOGY_PATH))
def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0) -> int:
slice_bytes = 48 * (1 << 30) // 8
pa = PhysAddr.pe_hbm_addr(
rack_id=0, sip_id=sip, cube_id=cube, pe_id=pe_id,
pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes,
)
return pa.encode()
# ── 1. Registry resolves all PE impl strings ──────────────────────
def test_pe_registry_resolves_all():
"""All 6 PE component impl strings must resolve to their specific classes."""
expected = {
"pe_cpu_v1": PeCpuComponent,
"pe_scheduler_v1": PeSchedulerComponent,
"pe_dma_v1": PeDmaComponent,
"pe_gemm_v1": PeGemmComponent,
"pe_math_v1": PeMathComponent,
"pe_tcm_v1": PeTcmComponent,
}
for impl, cls in expected.items():
node = Node(id=f"test.{impl}", kind=impl.replace("_v1", ""),
impl=impl, pos_mm=None, attrs={})
comp = ComponentRegistry.create(node)
assert isinstance(comp, cls), f"{impl} resolved to {type(comp)}, expected {cls}"
# ── 2. PE_DMA dual-channel: READ ∥ WRITE allowed ──────────────────
def test_pe_dma_dual_channel_concurrent():
"""PE_DMA READ and WRITE channels are independent (ADR-0014 D4).
Two concurrent DMA operations on different channels should not block
each other — both should complete at the same time as a single op.
"""
env = simpy.Environment()
node = Node(id="sip0.cube0.pe0.pe_dma", kind="pe_dma",
impl="pe_dma_v1", pos_mm=None,
attrs={"rd_engines": 1, "wr_engines": 1})
comp = PeDmaComponent(node)
# Create minimal ports: just need inbox via start()
# We'll use a sink store as out_port
sink = simpy.Store(env)
comp.out_ports["next"] = sink
comp.in_ports["src"] = simpy.Store(env)
comp.start(env)
results = []
def submit_and_track(request, label):
done = env.event()
txn = Transaction(
request=request, path=["sip0.cube0.pe0.pe_dma", "next"],
step=0, nbytes=4096, done=done,
)
yield comp._inbox.put(txn)
yield done
results.append((label, env.now))
# Drain sink so transactions don't block
def drain_sink():
while True:
txn = yield sink.get()
txn.done.succeed()
env.process(drain_sink())
read_req = MemoryReadMsg(
correlation_id="c", request_id="r1",
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(), nbytes=4096,
)
write_req = MemoryWriteMsg(
correlation_id="c", request_id="r2",
dst_sip=0, dst_cube=0, dst_pe=0,
dst_pa=_hbm_pa(), nbytes=4096, pattern="zero",
)
env.process(submit_and_track(read_req, "read"))
env.process(submit_and_track(write_req, "write"))
env.run()
assert len(results) == 2
# Both should complete at same time (no contention between channels)
assert results[0][1] == results[1][1], (
f"READ and WRITE should not block each other: "
f"{results[0]} vs {results[1]}"
)
# ── 3. PE_DMA same-channel serializes ─────────────────────────────
def test_pe_dma_same_channel_serializes():
"""Two READ operations on the same PE_DMA must serialize (capacity=1)."""
env = simpy.Environment()
node = Node(id="sip0.cube0.pe0.pe_dma", kind="pe_dma",
impl="pe_dma_v1", pos_mm=None,
attrs={"rd_engines": 1, "wr_engines": 1})
comp = PeDmaComponent(node)
sink = simpy.Store(env)
comp.out_ports["next"] = sink
comp.in_ports["src"] = simpy.Store(env)
comp.start(env)
completions = []
def submit_read(req_id):
done = env.event()
req = MemoryReadMsg(
correlation_id="c", request_id=req_id,
src_sip=0, src_cube=0, src_pe=0,
src_pa=_hbm_pa(), nbytes=4096,
)
txn = Transaction(
request=req, path=["sip0.cube0.pe0.pe_dma", "next"],
step=0, nbytes=4096, done=done,
)
yield comp._inbox.put(txn)
yield done
completions.append((req_id, env.now))
# Sink completes transactions after 10ns delay (simulates downstream)
def drain_sink():
while True:
txn = yield sink.get()
yield env.timeout(10)
txn.done.succeed()
env.process(drain_sink())
env.process(submit_read("r1"))
env.process(submit_read("r2"))
env.run()
assert len(completions) == 2
# Second read must complete later due to serialization
t1 = completions[0][1]
t2 = completions[1][1]
assert t2 > t1, (
f"Second READ ({t2}) must complete after first ({t1}) "
f"due to DMA capacity=1"
)
# ── 4. PE_GEMM / PE_MATH shared accel_slot ────────────────────────
def test_pe_accel_shared_slot():
"""GEMM and MATH share PE_ACCEL capacity=1 — cannot overlap (ADR-0014 D4)."""
from kernbench.components.context import ComponentContext
from kernbench.policy.routing.router import AddressResolver, PathRouter
graph = load_topology(TOPOLOGY_PATH)
env = simpy.Environment()
ctx = ComponentContext(
router=PathRouter(graph),
resolver=AddressResolver(graph),
positions={},
ns_per_mm=0.01,
spec=graph.spec,
)
pe_prefix = "sip0.cube0.pe0"
gemm_node = Node(
id=f"{pe_prefix}.pe_gemm", kind="pe_gemm", impl="pe_gemm_v1",
pos_mm=None, attrs={"overhead_ns": 10.0, "shared_resource": "accel_slot"},
)
math_node = Node(
id=f"{pe_prefix}.pe_math", kind="pe_math", impl="pe_math_v1",
pos_mm=None, attrs={"overhead_ns": 10.0, "shared_resource": "accel_slot"},
)
gemm = PeGemmComponent(gemm_node, ctx)
math = PeMathComponent(math_node, ctx)
# Wire minimal ports
gemm.in_ports["src"] = simpy.Store(env)
math.in_ports["src"] = simpy.Store(env)
gemm.start(env)
math.start(env)
completions = []
def submit(comp_inst, label):
done = env.event()
txn = Transaction(
request=None, path=[comp_inst.node.id],
step=0, nbytes=0, done=done,
)
yield comp_inst._inbox.put(txn)
yield done
completions.append((label, env.now))
env.process(submit(gemm, "gemm"))
env.process(submit(math, "math"))
env.run()
assert len(completions) == 2
t1 = completions[0][1]
t2 = completions[1][1]
# One completes at 10ns, the other at 20ns (serialized)
assert t1 == 10.0, f"First should complete at 10ns, got {t1}"
assert t2 == 20.0, f"Second should complete at 20ns, got {t2}"
# ── 5. PeDmaMsg probe regression ──────────────────────────────────
def test_pe_dma_probe_regression():
"""PeDmaMsg probe must still complete with same latency after PE component registration."""
msg = PeDmaMsg(
correlation_id="probe", request_id="regression",
src_sip=0, src_cube=0, src_pe=0,
dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=4096,
)
e1 = _engine()
h1 = e1.submit(msg)
e1.wait(h1)
comp1, trace1 = e1.get_completion(h1)
assert comp1.ok is True
assert trace1["total_ns"] > 0
# Deterministic: two engines produce same result
e2 = _engine()
h2 = e2.submit(msg)
e2.wait(h2)
_, trace2 = e2.get_completion(h2)
assert trace1["total_ns"] == trace2["total_ns"]
# ── 6. PE_GEMM handles PeInternalTxn ────────────────────────────
def test_pe_gemm_handles_pe_internal_txn():
"""PE_GEMM receives PeInternalTxn with GemmCmd, acquires accel, signals done."""
from kernbench.components.context import ComponentContext
from kernbench.policy.routing.router import AddressResolver, PathRouter
graph = load_topology(TOPOLOGY_PATH)
env = simpy.Environment()
ctx = ComponentContext(
router=PathRouter(graph),
resolver=AddressResolver(graph),
positions={}, ns_per_mm=0.01, spec=graph.spec,
)
pe_prefix = "sip0.cube0.pe0"
gemm_node = Node(
id=f"{pe_prefix}.pe_gemm", kind="pe_gemm", impl="pe_gemm_v1",
pos_mm=None, attrs={"overhead_ns": 5.0, "shared_resource": "accel_slot"},
)
gemm = PeGemmComponent(gemm_node, ctx)
gemm.in_ports["src"] = simpy.Store(env)
gemm.start(env)
a = TensorHandle(id="t1", addr=0, shape=(4, 8), dtype="f16", nbytes=64)
b = TensorHandle(id="t2", addr=0, shape=(8, 4), dtype="f16", nbytes=64)
out = TensorHandle(id="t3", addr=0, shape=(4, 4), dtype="f16", nbytes=32)
cmd = GemmCmd(a=a, b=b, out=out, m=4, k=8, n=4)
done = env.event()
pe_txn = PeInternalTxn(command=cmd, done=done, pe_prefix=pe_prefix)
def submit():
yield gemm._inbox.put(pe_txn)
yield done
env.process(submit())
env.run()
assert env.now == 5.0 # overhead_ns from node attrs
# ── 7. PE_MATH handles PeInternalTxn ────────────────────────────
def test_pe_math_handles_pe_internal_txn():
"""PE_MATH receives PeInternalTxn with MathCmd, acquires accel, signals done."""
from kernbench.components.context import ComponentContext
from kernbench.policy.routing.router import AddressResolver, PathRouter
graph = load_topology(TOPOLOGY_PATH)
env = simpy.Environment()
ctx = ComponentContext(
router=PathRouter(graph),
resolver=AddressResolver(graph),
positions={}, ns_per_mm=0.01, spec=graph.spec,
)
pe_prefix = "sip0.cube0.pe0"
math_node = Node(
id=f"{pe_prefix}.pe_math", kind="pe_math", impl="pe_math_v1",
pos_mm=None, attrs={"overhead_ns": 3.0, "shared_resource": "accel_slot"},
)
math_comp = PeMathComponent(math_node, ctx)
math_comp.in_ports["src"] = simpy.Store(env)
math_comp.start(env)
x = TensorHandle(id="t1", addr=0, shape=(4, 4), dtype="f16", nbytes=32)
out = TensorHandle(id="t2", addr=0, shape=(4, 4), dtype="f16", nbytes=32)
cmd = MathCmd(op="exp", inputs=(x,), out=out)
done = env.event()
pe_txn = PeInternalTxn(command=cmd, done=done, pe_prefix=pe_prefix)
def submit():
yield math_comp._inbox.put(pe_txn)
yield done
env.process(submit())
env.run()
assert env.now == 3.0 # overhead_ns from node attrs
# ── 8. PE_CPU kernel execution e2e (load-only kernel) ────────────
def test_pe_kernel_e2e_load_only():
"""PE_CPU compiles and replays a simple load kernel through the full pipeline.
Kernel: tl.load(hbm_pa, shape=(4,4), dtype='f16')
Expected: Transaction completes with latency > 0 (DMA to HBM and back).
"""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def load_kernel(tl):
tl.load(hbm_pa, shape=(4, 4), dtype="f16")
register_kernel("test_load_kernel", load_kernel)
engine = _engine()
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
# Create KernelLaunchMsg
launch_msg = KernelLaunchMsg(
correlation_id="test", request_id="load_e2e",
kernel_ref=KernelRef(name="test_load_kernel", kind="builtin"),
args=(),
)
# Inject Transaction at PE_CPU inbox
done = engine._env.event()
txn = Transaction(
request=launch_msg,
path=[pe_cpu_id],
step=0, nbytes=0, done=done,
)
start_ns = engine._env.now
def inject():
yield engine._components[pe_cpu_id]._inbox.put(txn)
yield done
engine._env.process(inject())
engine._env.run()
total_ns = engine._env.now - start_ns
assert total_ns > 0, f"Kernel should take > 0ns, got {total_ns}"
clear_registry()
# ── 9. PE_CPU kernel execution e2e (load + store) ────────────────
def test_pe_kernel_e2e_load_store():
"""PE_CPU: load→store kernel completes with latency > load-only kernel."""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def load_store_kernel(tl):
a = tl.load(hbm_pa, shape=(4, 4), dtype="f16")
tl.store(hbm_pa + 0x10000, a)
register_kernel("test_load_store", load_store_kernel)
engine = _engine()
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
launch_msg = KernelLaunchMsg(
correlation_id="test", request_id="ls_e2e",
kernel_ref=KernelRef(name="test_load_store", kind="builtin"),
args=(),
)
done = engine._env.event()
txn = Transaction(
request=launch_msg, path=[pe_cpu_id],
step=0, nbytes=0, done=done,
)
def inject():
yield engine._components[pe_cpu_id]._inbox.put(txn)
yield done
engine._env.process(inject())
engine._env.run()
total_ns = engine._env.now
assert total_ns > 0, f"load+store should take > 0ns, got {total_ns}"
clear_registry()
# ── 10. PE_CPU kernel with overhead timing ───────────────────────
def test_pe_cpu_overhead_timing():
"""PeCpuOverheadCmd cycles are added to total kernel time."""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
def no_overhead_kernel(tl):
tl.load(hbm_pa, shape=(4, 4), dtype="f16")
def overhead_kernel(tl):
tl.cycles(100)
tl.load(hbm_pa, shape=(4, 4), dtype="f16")
# Run without overhead
register_kernel("test_no_overhead", no_overhead_kernel)
engine1 = _engine()
done1 = engine1._env.event()
txn1 = Transaction(
request=KernelLaunchMsg(
correlation_id="t", request_id="r1",
kernel_ref=KernelRef(name="test_no_overhead", kind="builtin"),
args=(),
),
path=[pe_cpu_id], step=0, nbytes=0, done=done1,
)
def inject1():
yield engine1._components[pe_cpu_id]._inbox.put(txn1)
yield done1
engine1._env.process(inject1())
engine1._env.run()
base_ns = engine1._env.now
# Run with overhead
clear_registry()
register_kernel("test_overhead", overhead_kernel)
engine2 = _engine()
done2 = engine2._env.event()
txn2 = Transaction(
request=KernelLaunchMsg(
correlation_id="t", request_id="r2",
kernel_ref=KernelRef(name="test_overhead", kind="builtin"),
args=(),
),
path=[pe_cpu_id], step=0, nbytes=0, done=done2,
)
def inject2():
yield engine2._components[pe_cpu_id]._inbox.put(txn2)
yield done2
engine2._env.process(inject2())
engine2._env.run()
overhead_ns = engine2._env.now
# Overhead kernel should take 100 cycles more
assert abs(overhead_ns - (base_ns + 100)) < 1e-6, (
f"Expected {base_ns + 100}ns with overhead, got {overhead_ns}ns"
)
clear_registry()
# ── 11. PE_CPU kernel with GEMM (dot) ────────────────────────────
def test_pe_kernel_e2e_gemm():
"""PE_CPU: kernel with tl.dot dispatches GemmCmd through PE_GEMM."""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def gemm_kernel(tl):
a = tl.load(hbm_pa, shape=(4, 8), dtype="f16")
b = tl.load(hbm_pa + 0x10000, shape=(8, 4), dtype="f16")
out = tl.dot(a, b)
tl.store(hbm_pa + 0x20000, out)
register_kernel("test_gemm", gemm_kernel)
engine = _engine()
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
launch_msg = KernelLaunchMsg(
correlation_id="test", request_id="gemm_e2e",
kernel_ref=KernelRef(name="test_gemm", kind="builtin"),
args=(),
)
done = engine._env.event()
txn = Transaction(
request=launch_msg, path=[pe_cpu_id],
step=0, nbytes=0, done=done,
)
def inject():
yield engine._components[pe_cpu_id]._inbox.put(txn)
yield done
engine._env.process(inject())
engine._env.run()
total_ns = engine._env.now
assert total_ns > 0, f"GEMM kernel should take > 0ns, got {total_ns}"
clear_registry()
# ── 12. PE_CPU kernel with math ops ──────────────────────────────
def test_pe_kernel_e2e_math():
"""PE_CPU: kernel with tl.exp dispatches MathCmd through PE_MATH."""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def math_kernel(tl):
x = tl.load(hbm_pa, shape=(4, 4), dtype="f16")
y = tl.exp(x)
tl.store(hbm_pa + 0x10000, y)
register_kernel("test_math", math_kernel)
engine = _engine()
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
launch_msg = KernelLaunchMsg(
correlation_id="test", request_id="math_e2e",
kernel_ref=KernelRef(name="test_math", kind="builtin"),
args=(),
)
done = engine._env.event()
txn = Transaction(
request=launch_msg, path=[pe_cpu_id],
step=0, nbytes=0, done=done,
)
def inject():
yield engine._components[pe_cpu_id]._inbox.put(txn)
yield done
engine._env.process(inject())
engine._env.run()
total_ns = engine._env.now
assert total_ns > 0, f"Math kernel should take > 0ns, got {total_ns}"
clear_registry()
# ── 13. Deterministic: same kernel → same latency ───────────────
def test_pe_kernel_deterministic():
"""Same kernel on same PE produces identical latency across runs."""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def det_kernel(tl):
a = tl.load(hbm_pa, shape=(4, 4), dtype="f16")
tl.store(hbm_pa + 0x10000, a)
register_kernel("test_det", det_kernel)
results = []
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
for _ in range(2):
engine = _engine()
done = engine._env.event()
txn = Transaction(
request=KernelLaunchMsg(
correlation_id="t", request_id="det",
kernel_ref=KernelRef(name="test_det", kind="builtin"),
args=(),
),
path=[pe_cpu_id], step=0, nbytes=0, done=done,
)
def inject(e=engine, d=done, t=txn):
yield e._components[pe_cpu_id]._inbox.put(t)
yield d
engine._env.process(inject())
engine._env.run()
results.append(engine._env.now)
assert results[0] == results[1], (
f"Determinism violation: {results[0]} != {results[1]}"
)
clear_registry()
# ── 14. Stage 3: Composite GEMM pipeline with tiling ─────────────
def test_composite_gemm_pipeline():
"""Composite GEMM with tl.ref(b) produces tiled pipeline execution.
Kernel: tl.load(a) + tl.ref(b) + tl.composite(gemm) + tl.wait()
Validates: Transaction completes, latency > 0, latency > load-only.
"""
clear_registry()
hbm_pa_a = _hbm_pa(sip=0, cube=0, pe_id=0)
hbm_pa_b = _hbm_pa(sip=0, cube=0, pe_id=0)
out_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def composite_gemm_kernel(tl):
a = tl.load(hbm_pa_a, shape=(32, 64), dtype="f16")
b = tl.ref(hbm_pa_b, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=out_pa)
tl.wait(h)
register_kernel("test_composite_gemm", composite_gemm_kernel)
engine = _engine()
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
launch_msg = KernelLaunchMsg(
correlation_id="test", request_id="composite_gemm",
kernel_ref=KernelRef(name="test_composite_gemm", kind="builtin"),
args=(),
)
done = engine._env.event()
txn = Transaction(
request=launch_msg, path=[pe_cpu_id],
step=0, nbytes=0, done=done,
)
def inject():
yield engine._components[pe_cpu_id]._inbox.put(txn)
yield done
engine._env.process(inject())
engine._env.run()
total_ns = engine._env.now
assert total_ns > 0, f"Composite GEMM should take > 0ns, got {total_ns}"
clear_registry()
# ── 15. Stage 3: Composite generates multiple tiles ───────────────
def test_composite_gemm_multi_tile():
"""Larger GEMM produces multiple tiles (K=128 > TILE_K=64 → 2 K-tiles).
Validates latency is strictly greater than single-tile composite.
"""
clear_registry()
hbm_pa_a = _hbm_pa(sip=0, cube=0, pe_id=0)
hbm_pa_b = _hbm_pa(sip=0, cube=0, pe_id=0)
out_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
# Single K-tile (K=64, fits in one TILE_K)
def single_tile_kernel(tl):
a = tl.load(hbm_pa_a, shape=(32, 64), dtype="f16")
b = tl.ref(hbm_pa_b, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=out_pa)
tl.wait(h)
register_kernel("test_single_tile", single_tile_kernel)
engine1 = _engine()
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
done1 = engine1._env.event()
txn1 = Transaction(
request=KernelLaunchMsg(
correlation_id="t", request_id="st",
kernel_ref=KernelRef(name="test_single_tile", kind="builtin"),
args=(),
),
path=[pe_cpu_id], step=0, nbytes=0, done=done1,
)
def inject1():
yield engine1._components[pe_cpu_id]._inbox.put(txn1)
yield done1
engine1._env.process(inject1())
engine1._env.run()
single_ns = engine1._env.now
# Multi K-tile (K=128, needs 2 TILE_K=64 tiles)
clear_registry()
def multi_tile_kernel(tl):
a = tl.load(hbm_pa_a, shape=(32, 128), dtype="f16")
b = tl.ref(hbm_pa_b, shape=(128, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=out_pa)
tl.wait(h)
register_kernel("test_multi_tile", multi_tile_kernel)
engine2 = _engine()
done2 = engine2._env.event()
txn2 = Transaction(
request=KernelLaunchMsg(
correlation_id="t", request_id="mt",
kernel_ref=KernelRef(name="test_multi_tile", kind="builtin"),
args=(),
),
path=[pe_cpu_id], step=0, nbytes=0, done=done2,
)
def inject2():
yield engine2._components[pe_cpu_id]._inbox.put(txn2)
yield done2
engine2._env.process(inject2())
engine2._env.run()
multi_ns = engine2._env.now
assert multi_ns > single_ns, (
f"Multi-tile ({multi_ns}ns) should take longer than single-tile ({single_ns}ns)"
)
clear_registry()
# ── 16. Stage 3: tl.ref() generates no DMA command ───────────────
def test_tl_ref_no_dma():
"""tl.ref() creates TensorHandle but does NOT emit a DMA command."""
from kernbench.triton_emu.tl_context import TLContext
tl = TLContext(pe_id=0, dispatch_cycles=0)
handle = tl.ref(0x1000, shape=(4, 4), dtype="f16")
assert handle.addr == 0x1000
assert handle.shape == (4, 4)
assert len(tl.commands) == 0, f"tl.ref should emit 0 commands, got {len(tl.commands)}"
# ── 17. Stage 4: M_CPU kernel launch fan-out ──────────────────────
def test_mcpu_kernel_launch_fanout():
"""M_CPU routes KernelLaunchMsg to PE_CPU via NOC, PE executes, response returns.
Full pipeline: Host → PCIE_EP → IO_CPU → M_CPU → NOC → PE_CPU → engines
"""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def simple_kernel(a_ptr, tl):
tl.load(a_ptr, shape=(4, 4), dtype="f16")
register_kernel("test_mcpu_kernel", simple_kernel)
engine = _engine()
shard = TensorArgShard(
sip=0, cube=0, pe=0,
pa=hbm_pa, nbytes=32, offset_bytes=0,
)
launch_msg = KernelLaunchMsg(
correlation_id="test", request_id="mcpu_launch",
kernel_ref=KernelRef(name="test_mcpu_kernel", kind="builtin"),
args=(TensorArg(shards=(shard,)),),
target_cubes=(0,), target_pe=0,
)
h = engine.submit(launch_msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
assert trace["total_ns"] > 0, f"Kernel launch should take > 0ns, got {trace['total_ns']}"
clear_registry()
# ── 18. Stage 4: M_CPU kernel launch with composite GEMM ──────────
def test_mcpu_kernel_launch_composite():
"""Full pipeline kernel launch with composite GEMM through M_CPU."""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def gemm_kernel(a_ptr, tl):
a = tl.load(a_ptr, shape=(32, 64), dtype="f16")
b = tl.ref(a_ptr, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=a_ptr)
tl.wait(h)
register_kernel("test_mcpu_composite", gemm_kernel)
engine = _engine()
shard = TensorArgShard(
sip=0, cube=0, pe=0,
pa=hbm_pa, nbytes=32, offset_bytes=0,
)
launch_msg = KernelLaunchMsg(
correlation_id="test", request_id="mcpu_composite",
kernel_ref=KernelRef(name="test_mcpu_composite", kind="builtin"),
args=(TensorArg(shards=(shard,)),),
target_cubes=(0,), target_pe=0,
)
h = engine.submit(launch_msg)
engine.wait(h)
comp, trace = engine.get_completion(h)
assert comp.ok is True
assert trace["total_ns"] > 0
clear_registry()
# ── 19. Stage 5: QKV GEMM benchmark completion ────────────────────
@pytest.mark.skip(reason="Cross-SIP PE_TCM access not supported with router mesh topology")
def test_qkv_gemm_bench_completes():
"""The qkv_gemm benchmark runs to completion without error."""
clear_registry()
from benches.qkv_gemm import run as bench_run
from kernbench.runtime_api.context import RuntimeContext
graph = load_topology(TOPOLOGY_PATH)
engine = GraphEngine(graph)
ctx = RuntimeContext(
engine=engine,
target_device="sip0",
correlation_id="bench_test",
spec=graph.spec,
)
bench_run(ctx)
ctx.wait_all()
# All handles should have completed
for h in ctx.handles():
comp, trace = engine.get_completion(h)
assert comp.ok is True
assert trace["total_ns"] > 0
# Trace collection: deploy + kernel phases
assert len(ctx._traces) > 0
deploy_traces = [t for t in ctx._traces if t["phase"] in ("deploy", "memory_write")]
kernel_traces = [t for t in ctx._traces if t["phase"] == "kernel"]
assert len(deploy_traces) >= 2 # at least a, b (out is empty, no deploy)
assert len(kernel_traces) >= 1 # one per SIP (2 SIPs in topology)
assert kernel_traces[0]["name"] == "qkv_gemm"
assert kernel_traces[0]["total_ns"] > 0
clear_registry()
# ── 20. Stage 4: M_CPU multi-PE kernel launch fan-out ─────────────
def test_mcpu_multi_pe_kernel_launch():
"""M_CPU fans out KernelLaunchMsg to all 8 PEs when target_pe='all'.
Validates:
- All PEs execute the kernel (latency > 0)
- Multi-PE latency >= single-PE latency (parallel but NOC contention)
"""
clear_registry()
hbm_pa = _hbm_pa(sip=0, cube=0, pe_id=0)
def simple_kernel(a_ptr, tl):
tl.load(a_ptr, shape=(4, 4), dtype="f16")
register_kernel("test_multi_pe", simple_kernel)
# Single PE baseline
engine1 = _engine()
shard = TensorArgShard(
sip=0, cube=0, pe=0,
pa=hbm_pa, nbytes=32, offset_bytes=0,
)
h1 = engine1.submit(KernelLaunchMsg(
correlation_id="t", request_id="single",
kernel_ref=KernelRef(name="test_multi_pe", kind="builtin"),
args=(TensorArg(shards=(shard,)),),
target_cubes=(0,), target_pe=0,
))
engine1.wait(h1)
comp1, trace1 = engine1.get_completion(h1)
single_ns = trace1["total_ns"]
# Multi PE (all 8)
engine2 = _engine()
h2 = engine2.submit(KernelLaunchMsg(
correlation_id="t", request_id="multi",
kernel_ref=KernelRef(name="test_multi_pe", kind="builtin"),
args=(TensorArg(shards=(shard,)),),
target_cubes=(0,), target_pe="all",
))
engine2.wait(h2)
comp2, trace2 = engine2.get_completion(h2)
multi_ns = trace2["total_ns"]
assert comp1.ok is True
assert comp2.ok is True
assert single_ns > 0
assert multi_ns > 0
assert multi_ns >= single_ns, (
f"Multi-PE ({multi_ns}ns) should be >= single-PE ({single_ns}ns)"
)
clear_registry()
# ── 21. Stage 5: QKV GEMM multi-PE benchmark completion ──────────
@pytest.mark.skip(reason="Cross-SIP PE_TCM access not supported with router mesh topology")
def test_qkv_gemm_bench_multi_pe_completes():
"""The qkv_gemm_multi_pe benchmark runs to completion without error."""
clear_registry()
from benches.qkv_gemm_multi_pe import run as bench_run
from kernbench.runtime_api.context import RuntimeContext
graph = load_topology(TOPOLOGY_PATH)
engine = GraphEngine(graph)
ctx = RuntimeContext(
engine=engine,
target_device="sip0",
correlation_id="bench_multi_pe",
spec=graph.spec,
)
bench_run(ctx)
ctx.wait_all()
for h in ctx.handles():
comp, trace = engine.get_completion(h)
assert comp.ok is True
assert trace["total_ns"] > 0
# Multi-PE: 8 PEs, deploy traces per PE + kernel
deploy_traces = [t for t in ctx._traces if t["phase"] in ("deploy", "memory_write")]
kernel_traces = [t for t in ctx._traces if t["phase"] == "kernel"]
assert len(deploy_traces) >= 8 # replicate(a)*8 + column_wise(b)*8
assert len(kernel_traces) >= 1 # one per SIP
assert kernel_traces[0]["target_pe"] == "all"
clear_registry()
def test_report_format():
"""format_report produces readable output with TFLOPS and BW."""
from kernbench.cli.report import format_report
traces = [
{"phase": "deploy", "name": "a", "pe": 0, "nbytes": 65536, "total_ns": 25.0},
{"phase": "deploy", "name": "b", "pe": 0, "nbytes": 65536, "total_ns": 25.0},
{"phase": "kernel", "name": "qkv_gemm", "target_pe": 0,
"scalars": [128, 256, 128], "total_ns": 100.0},
]
report = format_report(traces, title="qkv_gemm")
assert "qkv_gemm" in report
assert "deploy" in report
assert "kernel" in report
assert "TFLOPS" in report
# GEMM TFLOPS: 2*128*256*128 / (100ns * 1e-9) / 1e12 = 83.886
assert "83.886" in report
# BW: 65536 / 25.0 = 2621.4 GB/s
assert "2621.4" in report
# Util% column present
assert "Util" in report
# ── 22. Multi-CUBE kernel launch (ADR-0013 V4) ──────────────────
def test_multi_cube_kernel_launch():
"""IO_CPU fans out KernelLaunchMsg to M_CPUs in two different cubes.
Validates ADR-0013 V4 (multi-CUBE within SIP):
- Shards in cube=0 and cube=1, each targeting pe=0
- Completion ok=True, total_ns > 0
- Multi-cube latency >= single-cube latency (inter-cube UCIe adds cost)
- Deterministic across runs
"""
clear_registry()
hbm_pa_c0 = _hbm_pa(sip=0, cube=0, pe_id=0)
hbm_pa_c1 = _hbm_pa(sip=0, cube=1, pe_id=0)
def simple_kernel(a_ptr, tl):
tl.load(a_ptr, shape=(4, 4), dtype="f16")
register_kernel("test_multi_cube", simple_kernel)
# Single-cube baseline
engine1 = _engine()
shard_single = TensorArgShard(
sip=0, cube=0, pe=0,
pa=hbm_pa_c0, nbytes=32, offset_bytes=0,
)
h1 = engine1.submit(KernelLaunchMsg(
correlation_id="t", request_id="single_cube",
kernel_ref=KernelRef(name="test_multi_cube", kind="builtin"),
args=(TensorArg(shards=(shard_single,)),),
target_cubes=(0,), target_pe=0,
))
engine1.wait(h1)
comp1, trace1 = engine1.get_completion(h1)
single_ns = trace1["total_ns"]
# Multi-cube: shards in cube=0 and cube=1
engine2 = _engine()
shard_c0 = TensorArgShard(
sip=0, cube=0, pe=0,
pa=hbm_pa_c0, nbytes=32, offset_bytes=0,
)
shard_c1 = TensorArgShard(
sip=0, cube=1, pe=0,
pa=hbm_pa_c1, nbytes=32, offset_bytes=0,
)
h2 = engine2.submit(KernelLaunchMsg(
correlation_id="t", request_id="multi_cube",
kernel_ref=KernelRef(name="test_multi_cube", kind="builtin"),
args=(TensorArg(shards=(shard_c0, shard_c1)),),
target_pe=0,
))
engine2.wait(h2)
comp2, trace2 = engine2.get_completion(h2)
multi_ns = trace2["total_ns"]
assert comp1.ok is True
assert comp2.ok is True
assert single_ns > 0
assert multi_ns > 0
assert multi_ns >= single_ns - 0.01, (
f"Multi-cube ({multi_ns}ns) should be >= single-cube ({single_ns}ns)"
)
# Determinism check
engine3 = _engine()
h3 = engine3.submit(KernelLaunchMsg(
correlation_id="t", request_id="multi_cube_det",
kernel_ref=KernelRef(name="test_multi_cube", kind="builtin"),
args=(TensorArg(shards=(shard_c0, shard_c1)),),
target_pe=0,
))
engine3.wait(h3)
_, trace3 = engine3.get_completion(h3)
assert trace2["total_ns"] == trace3["total_ns"], (
f"Determinism violation: {trace2['total_ns']} != {trace3['total_ns']}"
)
clear_registry()
# ── 23. Multi-SIP kernel launch (ADR-0013 V4) ───────────────────
def test_multi_sip_kernel_launch():
"""Engine submits KernelLaunchMsg to two SIPs via separate PCIE_EPs.
Validates ADR-0013 V4 (multi-SIP tray):
- Shards in sip=0/cube=0/pe=0 and sip=1/cube=0/pe=0
- Completion ok=True, total_ns > 0
- Multi-SIP latency >= single-SIP latency
- Deterministic across runs
"""
clear_registry()
hbm_pa_s0 = _hbm_pa(sip=0, cube=0, pe_id=0)
hbm_pa_s1 = _hbm_pa(sip=1, cube=0, pe_id=0)
# Kernel uses cycles only — no HBM access, so it runs correctly on any SIP
# (a real multi-SIP kernel would use SIP-local PA for each PE)
def simple_kernel(a_ptr, tl):
tl.cycles(50)
register_kernel("test_multi_sip", simple_kernel)
# Single-SIP baseline
engine1 = _engine()
shard_single = TensorArgShard(
sip=0, cube=0, pe=0,
pa=hbm_pa_s0, nbytes=32, offset_bytes=0,
)
h1 = engine1.submit(KernelLaunchMsg(
correlation_id="t", request_id="single_sip",
kernel_ref=KernelRef(name="test_multi_sip", kind="builtin"),
args=(TensorArg(shards=(shard_single,)),),
target_cubes=(0,), target_pe=0,
))
engine1.wait(h1)
comp1, trace1 = engine1.get_completion(h1)
single_ns = trace1["total_ns"]
# Multi-SIP: shards in sip=0 and sip=1
engine2 = _engine()
shard_s0 = TensorArgShard(
sip=0, cube=0, pe=0,
pa=hbm_pa_s0, nbytes=32, offset_bytes=0,
)
shard_s1 = TensorArgShard(
sip=1, cube=0, pe=0,
pa=hbm_pa_s1, nbytes=32, offset_bytes=0,
)
h2 = engine2.submit(KernelLaunchMsg(
correlation_id="t", request_id="multi_sip",
kernel_ref=KernelRef(name="test_multi_sip", kind="builtin"),
args=(TensorArg(shards=(shard_s0, shard_s1)),),
target_pe=0,
))
engine2.wait(h2)
comp2, trace2 = engine2.get_completion(h2)
multi_ns = trace2["total_ns"]
assert comp1.ok is True
assert comp2.ok is True
assert single_ns > 0
assert multi_ns > 0
assert multi_ns >= single_ns, (
f"Multi-SIP ({multi_ns}ns) should be >= single-SIP ({single_ns}ns)"
)
# Determinism check
engine3 = _engine()
h3 = engine3.submit(KernelLaunchMsg(
correlation_id="t", request_id="multi_sip_det",
kernel_ref=KernelRef(name="test_multi_sip", kind="builtin"),
args=(TensorArg(shards=(shard_s0, shard_s1)),),
target_pe=0,
))
engine3.wait(h3)
_, trace3 = engine3.get_completion(h3)
assert trace2["total_ns"] == trace3["total_ns"], (
f"Determinism violation: {trace2['total_ns']} != {trace3['total_ns']}"
)
clear_registry()