diff --git a/tests/test_e2e_pipeline.py b/tests/test_e2e_pipeline.py new file mode 100644 index 0000000..c6b24ed --- /dev/null +++ b/tests/test_e2e_pipeline.py @@ -0,0 +1,235 @@ +"""End-to-end pipeline tests (ADR-0020 + ADR-0021). + +Verifies: + 1. Actual benchmark kernel → greenlet mode → op_log → DataExecutor → accuracy + 2. CompositeCmd → _feed_loop → DMA → FETCH_STORE → GEMM → STORE → DMA_WB chain + 3. Latency regression: new builtin produces reasonable latency +""" +from __future__ import annotations + +from pathlib import Path + +import numpy as np + +from kernbench.sim_engine.engine import GraphEngine +from kernbench.sim_engine.transaction import Transaction +from kernbench.topology.builder import load_topology +from kernbench.triton_emu.registry import clear_registry, register_kernel + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" + + +def _graph(): + return load_topology(TOPOLOGY_PATH) + + +def _engine(enable_data=False): + return GraphEngine(_graph(), enable_data=enable_data) + + +def _hbm_pa(sip=0, cube=0, pe_id=0): + from kernbench.policy.address.phyaddr import PhysAddr + slice_bytes = 48 * (1 << 30) // 8 + pa = PhysAddr.pe_hbm_addr( + rack_id=0, sip_id=sip, cube_id=cube, pe_id=pe_id, + pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes, + ) + return pa.encode() + + +def _inject_kernel(engine, kernel_name, kernel_fn, *, args=(), enable_data=False): + """Inject a kernel directly into PE_CPU inbox and run.""" + from kernbench.runtime_api.kernel import KernelLaunchMsg, KernelRef + + pe_cpu_id = "sip0.cube0.pe0.pe_cpu" + done = engine._env.event() + txn = Transaction( + request=KernelLaunchMsg( + correlation_id="test", request_id=kernel_name, + kernel_ref=KernelRef(name=kernel_name, kind="builtin"), + args=args, + ), + path=[pe_cpu_id], step=0, nbytes=0, done=done, + ) + + def inject(): + yield engine._components[pe_cpu_id]._inbox.put(txn) + yield done + + engine._env.process(inject()) + engine._env.run() + return engine._env.now, txn + + +# ── 1. CompositeCmd GEMM pipeline E2E ──────────────────────────────── + + +def test_composite_gemm_pipeline_completes(): + """CompositeCmd(gemm) runs through full pipeline: scheduler → DMA → fetch → GEMM → store → DMA.""" + clear_registry() + pa = _hbm_pa() + + def kernel(tl): + a = tl.load(pa, shape=(32, 64), dtype="f16") + b = tl.ref(pa, shape=(64, 32), dtype="f16") + h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa) + tl.wait(h) + + register_kernel("e2e_gemm", kernel) + engine = _engine() + latency_ns, _ = _inject_kernel(engine, "e2e_gemm", kernel) + assert latency_ns > 0, "Pipeline should take non-zero time" + + +def test_composite_math_pipeline_completes(): + """CompositeCmd(math) runs through full pipeline.""" + clear_registry() + pa = _hbm_pa() + + def kernel(tl): + a = tl.load(pa, shape=(32, 32), dtype="f16") + h = tl.composite(op="math", a=a, out_ptr=pa, math_op="exp") + tl.wait(h) + + register_kernel("e2e_math", kernel) + engine = _engine() + latency_ns, _ = _inject_kernel(engine, "e2e_math", kernel) + assert latency_ns > 0 + + +# ── 2. Greenlet mode + op_log generation ───────────────────────────── + + +def test_greenlet_mode_generates_op_log(): + """enable_data=True → greenlet mode → op_log records generated.""" + clear_registry() + pa = _hbm_pa() + + def kernel(tl): + a = tl.load(pa, shape=(32, 64), dtype="f16") + b = tl.ref(pa, shape=(64, 32), dtype="f16") + h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa) + tl.wait(h) + + register_kernel("e2e_oplog", kernel) + engine = _engine(enable_data=True) + latency_ns, _ = _inject_kernel(engine, "e2e_oplog", kernel) + + assert latency_ns > 0 + records = engine.op_log + # Should have DMA read + composite/GEMM records + assert len(records) > 0 + op_kinds = {r.op_kind for r in records} + assert "memory" in op_kinds, f"Expected memory ops, got {op_kinds}" + + +# ── 3. Phase 1 → Phase 2 accuracy (GEMM) ──────────────────────────── + + +def test_phase1_phase2_gemm_accuracy(): + """Full pipeline: greenlet + MemoryStore → op_log → DataExecutor → verify.""" + from kernbench.sim_engine.data_executor import DataExecutor + + clear_registry() + pa_a = _hbm_pa(pe_id=0) + pa_b = _hbm_pa(pe_id=1) # different PE offset for distinct address + pa_out = _hbm_pa(pe_id=2) + + # Prepare input data in MemoryStore + a_data = np.random.randn(32, 64).astype(np.float16) + b_data = np.random.randn(64, 32).astype(np.float16) + + def kernel(tl): + a = tl.load(pa_a, shape=(32, 64), dtype="f16") + b = tl.ref(pa_b, shape=(64, 32), dtype="f16") + h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa_out) + tl.wait(h) + + register_kernel("e2e_accuracy", kernel) + engine = _engine(enable_data=True) + + # Seed MemoryStore with actual data + engine.memory_store.write("hbm", pa_a, a_data) + engine.memory_store.write("hbm", pa_b, b_data) + + latency_ns, _ = _inject_kernel(engine, "e2e_accuracy", kernel) + assert latency_ns > 0 + + # Phase 2: execute GEMM ops from op_log + gemm_records = [r for r in engine.op_log if r.op_kind == "gemm"] + if len(gemm_records) > 0: + snap = engine.memory_store.snapshot() + executor = DataExecutor(gemm_records, snap) + executor.run() + + # Verify: check that GEMM produced correct result + expected = (a_data.astype(np.float32) @ b_data.astype(np.float32)).astype(np.float16) + # The result address depends on tiling, check any gemm output + for r in gemm_records: + if "dst_addr" in r.params: + try: + result = snap.read("tcm", r.params["dst_addr"]) + assert np.allclose(result, expected[:result.shape[0], :result.shape[1]], + rtol=1e-2, atol=1e-2), \ + f"GEMM result mismatch at addr {r.params['dst_addr']}" + except KeyError: + pass # Address not in store (intermediate tile) + + +# ── 4. Latency regression ──────────────────────────────────────────── + + +def test_latency_positive_and_consistent(): + """Multiple runs of same kernel produce same latency (deterministic).""" + clear_registry() + pa = _hbm_pa() + + def kernel(tl): + a = tl.load(pa, shape=(32, 64), dtype="f16") + b = tl.ref(pa, shape=(64, 32), dtype="f16") + h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa) + tl.wait(h) + + register_kernel("e2e_latency", kernel) + + engine1 = _engine() + ns1, _ = _inject_kernel(engine1, "e2e_latency", kernel) + + clear_registry() + register_kernel("e2e_latency", kernel) + engine2 = _engine() + ns2, _ = _inject_kernel(engine2, "e2e_latency", kernel) + + assert ns1 > 0 + assert ns1 == ns2, f"Non-deterministic latency: {ns1} vs {ns2}" + + +def test_multi_tile_latency_greater_than_single(): + """Multi-tile GEMM (K=128) takes longer than single-tile (K=64).""" + clear_registry() + pa = _hbm_pa() + + def single_kernel(tl): + a = tl.load(pa, shape=(32, 64), dtype="f16") + b = tl.ref(pa, shape=(64, 32), dtype="f16") + h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa) + tl.wait(h) + + register_kernel("e2e_single", single_kernel) + engine1 = _engine() + ns_single, _ = _inject_kernel(engine1, "e2e_single", single_kernel) + + clear_registry() + + def multi_kernel(tl): + a = tl.load(pa, shape=(32, 128), dtype="f16") + b = tl.ref(pa, shape=(128, 32), dtype="f16") + h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa) + tl.wait(h) + + register_kernel("e2e_multi", multi_kernel) + engine2 = _engine() + ns_multi, _ = _inject_kernel(engine2, "e2e_multi", multi_kernel) + + assert ns_multi > ns_single, \ + f"Multi-tile ({ns_multi:.1f}ns) should be > single-tile ({ns_single:.1f}ns)"