Files
kernbench2/tests/test_e2e_pipeline.py
ywkang 687c98086d ADR housekeeping: category prefixes, lifecycle folders, retroactive 0034-0037
Filename + lifecycle:
- ADR rename to ADR-NNNN-<cat>-title.md with 8 3-letter category prefixes
  (dev / mem / lat / prog / algo / par / api / ver). Numbers stay immutable.
- ADR Lifecycle split into 3 folders, documented in CLAUDE.md Part 2:
  docs/adr/ (Accepted), docs/adr-proposed/ (Proposed/Stub/Draft),
  docs/adr-history/ (Superseded/Merged). Status field gains "Draft" for
  retroactive docs pending verification.

Merges (one ADR per topic, no change-history annotations):
- ADR-0017 absorbs ADR-0019 (Cube NOC + per-PE HBM connectivity, 10 D-items)
- ADR-0014 absorbs ADR-0021 (PE pipeline execution model, 8 D-items incl.
  TileToken self-routing and multi-op composite epilogue scope)
- ADR-0023 absorbs docs/ipcq-dma-codesign-hw.md as new "HW Realization
  Notes (Informative)" section (D16-D23 + Open HW Questions). codesign-hw.md
  deleted; ADR-0019/0021 moved to adr-history with one-line stub status

Retroactive documentation (G4 closures, code-verified):
- ADR-0037 forwarding component (TransitComponent: first-flit overhead,
  serial worker, path-based routing, single impl/multiple names)
- ADR-0036 IO_CPU component (target_start_ns global barrier stamping,
  per-cube fan-out, response aggregation)
- ADR-0035 M_CPU & M_CPU.DMA component (3 fan-out paths, DMA Resources,
  target_start_ns passthrough)
- ADR-0034 HBM controller internal design (per-PC state, address-based
  selection, flit-aware per-flit commit, async finalize, command-only
  fallback path)

Content updates:
- ADR-0010 expanded to full CLI surface (run/probe/web), retitled
  "Command Line Interface and Execution Semantics"
- ADR-0007 D2 rewritten to current state; ADR-0015 supersession notes pruned
- ADR-0005 wrapped in Decision header with D1-D5; ADR-0022 metadata
  block replaced with standard Status header
- ADR-0024 trimmed to rank=SIP launcher essentials (D1-D4);
  ADR-0027 cleaned of supersession history
- ADR-0033 D6 cleanup: address-based PC selection moved out of future-work
  (now documented in ADR-0034 D3); related D1/D3 wording realigned
- Cross-references back-filled in 5 ADRs (G3 gaps closed)

Onboarding docs split:
- docs/onboarding/ created
- moved: hw-architecture-overview.md, latency-model.md, di-presentation.md,
  ccl-author-guide{,.en}.md
- references updated in README, ADR-0023{,.en}, src/kernbench/ccl/__init__.py

Source / test / yaml: ADR-NNNN cross-references in docstrings and YAML
comments updated after the merges (ADR-0021->0014 D6, ADR-0019->0017 D8).
No behavior change.

Tooling:
- tools/verify_adr_lang_pairs.py + tests/test_verify_adr_lang_pairs.py
  (ADR EN/KO pair invariant checker)
- .claude/commands/report.md tracked (/report slash command)
- .gitignore: allow .claude/commands/*.md while keeping settings files ignored

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 01:15:55 -07:00

236 lines
7.9 KiB
Python

"""End-to-end pipeline tests (ADR-0020 + ADR-0014).
Verifies:
1. Actual benchmark kernel → greenlet mode → op_log → DataExecutor → accuracy
2. CompositeCmd → _feed_loop → DMA → FETCH_STORE → GEMM → STORE → DMA_WB chain
3. Latency regression: new builtin produces reasonable latency
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
from kernbench.sim_engine.engine import GraphEngine
from kernbench.sim_engine.transaction import Transaction
from kernbench.topology.builder import load_topology
from kernbench.triton_emu.registry import clear_registry, register_kernel
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def _graph():
return load_topology(TOPOLOGY_PATH)
def _engine(enable_data=False):
return GraphEngine(_graph(), enable_data=enable_data)
def _hbm_pa(sip=0, cube=0, pe_id=0):
from kernbench.policy.address.phyaddr import PhysAddr
slice_bytes = 48 * (1 << 30) // 8
pa = PhysAddr.pe_hbm_addr(
sip_id=sip, die_id=cube, pe_id=pe_id,
pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes,
)
return pa.encode()
def _inject_kernel(engine, kernel_name, kernel_fn, *, args=(), enable_data=False):
"""Inject a kernel directly into PE_CPU inbox and run."""
from kernbench.runtime_api.kernel import KernelLaunchMsg, KernelRef
pe_cpu_id = "sip0.cube0.pe0.pe_cpu"
done = engine._env.event()
txn = Transaction(
request=KernelLaunchMsg(
correlation_id="test", request_id=kernel_name,
kernel_ref=KernelRef(name=kernel_name, kind="builtin"),
args=args,
),
path=[pe_cpu_id], step=0, nbytes=0, done=done,
)
def inject():
yield engine._components[pe_cpu_id]._inbox.put(txn)
yield done
engine._env.process(inject())
engine._env.run()
return engine._env.now, txn
# ── 1. CompositeCmd GEMM pipeline E2E ────────────────────────────────
def test_composite_gemm_pipeline_completes():
"""CompositeCmd(gemm) runs through full pipeline: scheduler → DMA → fetch → GEMM → store → DMA."""
clear_registry()
pa = _hbm_pa()
def kernel(tl):
a = tl.load(pa, shape=(32, 64), dtype="f16")
b = tl.ref(pa, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa)
tl.wait(h)
register_kernel("e2e_gemm", kernel)
engine = _engine()
latency_ns, _ = _inject_kernel(engine, "e2e_gemm", kernel)
assert latency_ns > 0, "Pipeline should take non-zero time"
def test_composite_math_pipeline_completes():
"""CompositeCmd(math) runs through full pipeline."""
clear_registry()
pa = _hbm_pa()
def kernel(tl):
a = tl.load(pa, shape=(32, 32), dtype="f16")
h = tl.composite(op="math", a=a, out_ptr=pa, math_op="exp")
tl.wait(h)
register_kernel("e2e_math", kernel)
engine = _engine()
latency_ns, _ = _inject_kernel(engine, "e2e_math", kernel)
assert latency_ns > 0
# ── 2. Greenlet mode + op_log generation ─────────────────────────────
def test_greenlet_mode_generates_op_log():
"""enable_data=True → greenlet mode → op_log records generated."""
clear_registry()
pa = _hbm_pa()
def kernel(tl):
a = tl.load(pa, shape=(32, 64), dtype="f16")
b = tl.ref(pa, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa)
tl.wait(h)
register_kernel("e2e_oplog", kernel)
engine = _engine(enable_data=True)
latency_ns, _ = _inject_kernel(engine, "e2e_oplog", kernel)
assert latency_ns > 0
records = engine.op_log
# Should have DMA read + composite/GEMM records
assert len(records) > 0
op_kinds = {r.op_kind for r in records}
assert "memory" in op_kinds, f"Expected memory ops, got {op_kinds}"
# ── 3. Phase 1 → Phase 2 accuracy (GEMM) ────────────────────────────
def test_phase1_phase2_gemm_accuracy():
"""Full pipeline: greenlet + MemoryStore → op_log → DataExecutor → verify."""
from kernbench.sim_engine.data_executor import DataExecutor
clear_registry()
pa_a = _hbm_pa(pe_id=0)
pa_b = _hbm_pa(pe_id=1) # different PE offset for distinct address
pa_out = _hbm_pa(pe_id=2)
# Prepare input data in MemoryStore
a_data = np.random.randn(32, 64).astype(np.float16)
b_data = np.random.randn(64, 32).astype(np.float16)
def kernel(tl):
a = tl.load(pa_a, shape=(32, 64), dtype="f16")
b = tl.ref(pa_b, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa_out)
tl.wait(h)
register_kernel("e2e_accuracy", kernel)
engine = _engine(enable_data=True)
# Seed MemoryStore with actual data
engine.memory_store.write("hbm", pa_a, a_data)
engine.memory_store.write("hbm", pa_b, b_data)
latency_ns, _ = _inject_kernel(engine, "e2e_accuracy", kernel)
assert latency_ns > 0
# Phase 2: execute GEMM ops from op_log
gemm_records = [r for r in engine.op_log if r.op_kind == "gemm"]
if len(gemm_records) > 0:
snap = engine.memory_store.snapshot()
executor = DataExecutor(gemm_records, snap)
executor.run()
# Verify: check that GEMM produced correct result
expected = (a_data.astype(np.float32) @ b_data.astype(np.float32)).astype(np.float16)
# The result address depends on tiling, check any gemm output
for r in gemm_records:
if "dst_addr" in r.params:
try:
result = snap.read("tcm", r.params["dst_addr"])
assert np.allclose(result, expected[:result.shape[0], :result.shape[1]],
rtol=1e-2, atol=1e-2), \
f"GEMM result mismatch at addr {r.params['dst_addr']}"
except KeyError:
pass # Address not in store (intermediate tile)
# ── 4. Latency regression ────────────────────────────────────────────
def test_latency_positive_and_consistent():
"""Multiple runs of same kernel produce same latency (deterministic)."""
clear_registry()
pa = _hbm_pa()
def kernel(tl):
a = tl.load(pa, shape=(32, 64), dtype="f16")
b = tl.ref(pa, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa)
tl.wait(h)
register_kernel("e2e_latency", kernel)
engine1 = _engine()
ns1, _ = _inject_kernel(engine1, "e2e_latency", kernel)
clear_registry()
register_kernel("e2e_latency", kernel)
engine2 = _engine()
ns2, _ = _inject_kernel(engine2, "e2e_latency", kernel)
assert ns1 > 0
assert ns1 == ns2, f"Non-deterministic latency: {ns1} vs {ns2}"
def test_multi_tile_latency_greater_than_single():
"""Multi-tile GEMM (K=128) takes longer than single-tile (K=64)."""
clear_registry()
pa = _hbm_pa()
def single_kernel(tl):
a = tl.load(pa, shape=(32, 64), dtype="f16")
b = tl.ref(pa, shape=(64, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa)
tl.wait(h)
register_kernel("e2e_single", single_kernel)
engine1 = _engine()
ns_single, _ = _inject_kernel(engine1, "e2e_single", single_kernel)
clear_registry()
def multi_kernel(tl):
a = tl.load(pa, shape=(32, 128), dtype="f16")
b = tl.ref(pa, shape=(128, 32), dtype="f16")
h = tl.composite(op="gemm", a=a, b=b, out_ptr=pa)
tl.wait(h)
register_kernel("e2e_multi", multi_kernel)
engine2 = _engine()
ns_multi, _ = _inject_kernel(engine2, "e2e_multi", multi_kernel)
assert ns_multi > ns_single, \
f"Multi-tile ({ns_multi:.1f}ns) should be > single-tile ({ns_single:.1f}ns)"