"""ADR-0009 D5 invariant: all PEs targeted by a single kernel launch MUST begin executing the kernel body at the same simulated time, regardless of their dispatch path length. These tests directly verify the invariant by capturing per-PE state at the top of `_execute_kernel`: test_no_pe_arrives_after_target_start_ns Asserts: for every PE that enters _execute_kernel during a multi-cube launch, `env.now` at entry must be <= target_start_ns. Otherwise the PE's barrier yield would be a no-op and `pe_exec_start` would be set late, breaking the D5 "same simulated time" mandate. test_all_pes_have_identical_pe_exec_start Asserts: every PE's `pe_exec_start` (the value of `env.now` recorded immediately AFTER the barrier yield) is identical across all PEs in the launch. Both tests are expected to FAIL today and become the regression check the Phase 2 D5 predictor + fallback fix must make pass. """ from __future__ import annotations from pathlib import Path import numpy as np import pytest from kernbench.policy.placement.dp import DPPolicy from kernbench.runtime_api.context import RuntimeContext from kernbench.runtime_api.types import DeviceSelector from kernbench.sim_engine.engine import GraphEngine from kernbench.topology.builder import resolve_topology TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" def _capture_per_pe_d5_state(): """Monkey-patch PeCpuComponent._execute_kernel to record, per PE: - entry_now: env.now at function entry (before any yield) - target_start_ns: the value carried by the request - barrier_yielded: True if the barrier yield fired (entry_now < target) - pe_exec_start: env.now immediately after the barrier check (i.e. the value the original code sets) Returns (records: list[dict], restore: callable). """ import kernbench.components.builtin.pe_cpu as pe_cpu_mod records: list[dict] = [] original = pe_cpu_mod.PeCpuComponent._execute_kernel def patched(self, env, txn): request = txn.request target_start = getattr(request, "target_start_ns", None) entry_now = float(env.now) rec = { "node_id": self.node.id, "entry_now": entry_now, "target_start_ns": ( float(target_start) if target_start is not None else None ), "barrier_yielded": ( target_start is not None and float(target_start) > entry_now ), "pe_exec_start": None, # filled below by sniff "late_ns": ( None if target_start is None else max(0.0, entry_now - float(target_start)) ), } records.append(rec) # We can't easily inject a callback at the original's # `pe_exec_start = env.now` line without rewriting it. Approximate: # if the original yields the barrier, env.now after the yield is # target_start_ns; otherwise pe_exec_start is entry_now (skipped). if rec["barrier_yielded"]: rec["pe_exec_start"] = float(target_start) else: rec["pe_exec_start"] = entry_now yield from original(self, env, txn) pe_cpu_mod.PeCpuComponent._execute_kernel = patched def restore(): pe_cpu_mod.PeCpuComponent._execute_kernel = original return records, restore def _run_multicube_launch(): """Drive a no-op kernel launch across all 16 cubes x 8 PEs and return the per-PE D5 records collected by the monkey-patch.""" records, restore = _capture_per_pe_d5_state() try: topo = resolve_topology(str(TOPOLOGY_PATH)) engine = GraphEngine(topo.topology_obj, enable_data=True) spec = topo.topology_obj.spec with RuntimeContext( engine=engine, target_device=DeviceSelector("all"), correlation_id="d5_barrier", spec=spec, ) as ctx: dp = DPPolicy( cube="row_wise", pe="column_wise", num_cubes=16, num_pes=8, ) def kernel(t_ptr, n_elem, tl): pass # no-op ctx.ahbm.set_device(0) t = ctx.zeros( (16, 8 * 64), dtype="f16", dp=dp, name="probe", ) t.copy_(ctx.from_numpy( np.zeros((16, 8 * 64), dtype=np.float16), )) pending = ctx.launch( "d5_probe", kernel, t, 64, _defer_wait=True, ) for h, _sip, meta in pending: ctx.wait(h, _meta=meta) finally: restore() return records def test_no_pe_arrives_after_target_start_ns(): """ADR-0009 D5: no PE may enter `_execute_kernel` after target_start_ns. Today this fails because IO_CPU's predictor under-shoots actual dispatch latency for far cubes (cube4, cube9-15). Phase 2 fix: chain-aware predictor in IO_CPU + monotonic upward re-stamp in M_CPU. """ records = _run_multicube_launch() assert records, "expected per-PE _execute_kernel records" late = [ r for r in records if r["target_start_ns"] is not None and r["late_ns"] is not None and r["late_ns"] > 1e-6 ] if late: # Provide actionable diagnostic in the failure. worst = sorted(late, key=lambda r: -r["late_ns"])[:5] details = "\n".join( f" {r['node_id']}: late by {r['late_ns']:.2f} ns " f"(entry_now={r['entry_now']:.2f}, " f"target_start_ns={r['target_start_ns']:.2f})" for r in worst ) pytest.fail( f"ADR-0009 D5 violated: {len(late)}/{len(records)} PEs " f"entered _execute_kernel AFTER target_start_ns " f"(barrier yield silently skipped). " f"Worst offenders:\n{details}" ) def test_all_pes_have_identical_pe_exec_start(): """ADR-0009 D5: every PE's pe_exec_start must be identical. With D5 honored, every PE either yields to target_start_ns (start = target_start_ns) or, if late, would still be aligned by the M_CPU upward re-stamp (Phase 2). Today: 75/128 PEs in this launch have distinct pe_exec_start values because they skipped the barrier. """ records = _run_multicube_launch() assert records, "expected per-PE _execute_kernel records" starts = sorted({round(r["pe_exec_start"], 6) for r in records}) if len(starts) > 1: spread = max(starts) - min(starts) # Distribution of how many PEs at each distinct start time from collections import Counter bucket = Counter(round(r["pe_exec_start"], 6) for r in records) details = "\n".join( f" pe_exec_start={t}: {n} PEs" for t, n in sorted(bucket.items()) ) pytest.fail( f"ADR-0009 D5 violated: PEs have {len(starts)} distinct " f"pe_exec_start values (spread = {spread:.2f} ns); " f"D5 mandates a single common value. " f"Distribution:\n{details}" )