Files
kernbench2/tests/test_kernel_runner.py
ywkang 51004c311c Implement ADR-0020: 2-pass data execution with greenlet kernel runner
Step 1 — Foundation:
- OpRecord/OpLogger: op log infrastructure with t_start stable ordering
- MemoryStore: numpy ndarray tensor-granular storage (reference semantics)
- data_op=True flag on DmaReadCmd, DmaWriteCmd, GemmCmd, MathCmd, CompositeCmd
- numpy/greenlet dependencies added to pyproject.toml

Step 2 — ComponentBase hooks:
- _on_process_start/end hooks in _forward_txn (fabric messages)
- _handle_with_hooks in PeEngineBase (PE-internal commands)
- op_logger optional — zero overhead when disabled

Step 3 — KernelRunner + greenlet:
- KernelRunner: greenlet ↔ SimPy bridge in triton_emu/kernel_runner.py
- TLContext: _emit() method routes to greenlet switch or command list
- tl.load() returns real numpy data in greenlet mode
- Dynamic control flow supported (memory-read based branching)

Step 4 — PE_CPU integration:
- Greenlet mode when ctx.memory_store is set, legacy fallback otherwise
- Refactored into _execute_greenlet/_execute_legacy/_send_response
- ComponentContext gains memory_store and op_logger fields

Step 5 — DataExecutor:
- Phase 2 numpy execution for GEMM/Math ops from op_log
- _compute_math: all unary/binary/reduction ops
- verify(): compare MemoryStore against expected with dtype tolerance

28 new tests, 366 total passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 00:22:44 -07:00

141 lines
4.0 KiB
Python

"""Tests for KernelRunner greenlet-based execution (ADR-0020 D3)."""
import numpy as np
import simpy
from kernbench.sim_engine.memory_store import MemoryStore
from kernbench.triton_emu.kernel_runner import KernelRunner
def _make_runner(env, store=None):
"""Create a minimal KernelRunner with mock scheduler port."""
scheduler_id = "sip0.cube0.pe0.pe_scheduler"
out_ports = {scheduler_id: simpy.Store(env)}
runner = KernelRunner(
pe_prefix="sip0.cube0.pe0",
pe_idx=0, sip_idx=0, cube_idx=0,
scheduler_id=scheduler_id,
out_ports=out_ports,
store=store,
)
return runner, out_ports[scheduler_id]
def _mock_scheduler(env, inbox):
"""Consume PeInternalTxn from inbox and immediately succeed."""
while True:
pe_txn = yield inbox.get()
pe_txn.done.succeed()
def test_kernel_runner_basic_load():
"""Kernel with tl.load runs through greenlet without hanging."""
env = simpy.Environment()
store = MemoryStore()
data = np.ones((4, 4), dtype=np.float16)
store.write("hbm", 0x1000, data)
runner, sched_port = _make_runner(env, store)
env.process(_mock_scheduler(env, sched_port))
def kernel(a_ptr, tl):
a = tl.load(a_ptr, (4, 4), "f16")
assert a.data is not None
assert a.data.shape == (4, 4)
def run():
yield from runner.run(env, kernel, [0x1000], num_programs=1)
env.process(run())
env.run()
def test_kernel_runner_load_returns_data():
"""tl.load returns actual numpy data from MemoryStore."""
env = simpy.Environment()
store = MemoryStore()
data = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float16)
store.write("hbm", 0x2000, data)
runner, sched_port = _make_runner(env, store)
env.process(_mock_scheduler(env, sched_port))
results = {}
def kernel(ptr, tl):
a = tl.load(ptr, (2, 2), "f16")
results["data"] = a.data
def run():
yield from runner.run(env, kernel, [0x2000], num_programs=1)
env.process(run())
env.run()
assert results["data"] is data # reference equality
def test_kernel_runner_composite():
"""Composite commands pass through without blocking kernel."""
env = simpy.Environment()
runner, sched_port = _make_runner(env)
env.process(_mock_scheduler(env, sched_port))
def kernel(a_ptr, b_ptr, out_ptr, tl):
a = tl.ref(a_ptr, (4, 8), "f16")
b = tl.ref(b_ptr, (8, 4), "f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=out_ptr)
tl.wait(h)
def run():
yield from runner.run(env, kernel, [0, 64, 128], num_programs=1)
env.process(run())
env.run()
def test_kernel_runner_dynamic_branch():
"""Kernel can branch based on loaded data (ADR-0020 D3)."""
env = simpy.Environment()
store = MemoryStore()
store.write("hbm", 0x100, np.array([1.0], dtype=np.float32))
store.write("hbm", 0x200, np.array([0.0], dtype=np.float32))
runner, sched_port = _make_runner(env, store)
env.process(_mock_scheduler(env, sched_port))
results = {"branch": None}
def kernel(flag_ptr, tl):
flag = tl.load(flag_ptr, (1,), "f32")
if flag.data is not None and flag.data[0] > 0.5:
results["branch"] = "taken"
else:
results["branch"] = "not_taken"
# Test with flag=1.0 → branch taken
def run():
yield from runner.run(env, kernel, [0x100], num_programs=1)
env.process(run())
env.run()
assert results["branch"] == "taken"
def test_kernel_runner_no_store():
"""Without MemoryStore, tl.load returns handle with data=None."""
env = simpy.Environment()
runner, sched_port = _make_runner(env, store=None)
env.process(_mock_scheduler(env, sched_port))
results = {}
def kernel(ptr, tl):
a = tl.load(ptr, (4,), "f16")
results["data"] = a.data
def run():
yield from runner.run(env, kernel, [0], num_programs=1)
env.process(run())
env.run()
assert results["data"] is None