From c1a5cf3a2ae88d98777113f06e60d3cd3c144031 Mon Sep 17 00:00:00 2001 From: Mukesh Garg Date: Mon, 27 Apr 2026 15:12:58 -0700 Subject: [PATCH] ADR-0009 D5: chain-aware target_start_ns + zero-byte launch fanout The single-walk predictor (find_node_path(io_cpu, pe_cpu) + compute_path_latency_ns) under-shot actual dispatch latency for far cubes -- the routing graph could pick a path bypassing M_CPU, and non-zero-nbytes launch sub-txns serialized on shared first hops. Far PEs arrived at _execute_kernel after target_start_ns, silently skipped the barrier yield, and started pe_exec_start late. Their reported pe_exec_ns under-counted by exactly the late_ns amount (63 ns observed at h4 cube4.pe0 in the IPCQ test, up to 113 ns worst case for cubes 9-11), producing the suspicious flat region in the h4 IPCQ curve at 8192/10240 bytes. Fix: - IO_CPU predictor uses the explicit two-leg chain (IO_CPU->M_CPU + M_CPU->PE_CPU - io.overhead - m.overhead), so every PE on every targeted cube has a barrier >= its real dispatch arrival. - Kernel-launch fanout sub-txns carry nbytes=0 (control-plane, not data-plane), removing the per-cube fanout serialization that pushed far M_CPUs past the predictor. - Legacy io_cpu mirror updated. ADR-0009 D5 mechanism updated to specify the two-leg formula and the nbytes=0 requirement. New tests/test_d5_barrier_invariant.py asserts (a) no PE enters _execute_kernel after target_start_ns and (b) every PE in a multi-cube launch has identical pe_exec_start -- both regressions silently pass on the existing tests/test_kernel_launch_sync.py because that test only inspects post-aggregation max(pe_exec_ns). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ADR-0009-kernel-execution-messaging.md | 35 +++- src/kernbench/components/builtin/io_cpu.py | 35 +++- .../components/legacy/builtin/io_cpu.py | 30 ++- tests/test_d5_barrier_invariant.py | 194 ++++++++++++++++++ 4 files changed, 275 insertions(+), 19 deletions(-) create mode 100644 tests/test_d5_barrier_invariant.py diff --git a/docs/adr/ADR-0009-kernel-execution-messaging.md b/docs/adr/ADR-0009-kernel-execution-messaging.md index eb99650..fae01b5 100644 --- a/docs/adr/ADR-0009-kernel-execution-messaging.md +++ b/docs/adr/ADR-0009-kernel-execution-messaging.md @@ -86,11 +86,36 @@ Mechanism. - `KernelLaunchMsg` carries an optional `target_start_ns: float | None`. - **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it - computes `target_start_ns = env.now + max_latency` where `max_latency` - is the maximum `ComponentContext.compute_path_latency_ns(path)` across - every target (sip, cube, pe) tuple — `path = find_node_path(io_cpu, - pe_cpu_id)`. The stamped value is placed on the request carried by - every fanned-out sub-Transaction. + computes `target_start_ns = env.now + max_latency` where + `max_latency` is the maximum, over every target (sip, cube, pe) + tuple, of the **two-leg dispatch chain**: + + ``` + max_latency(sip, cube, pe) = + compute_path_latency_ns(find_node_path(io_cpu, m_cpu(sip, cube))) + + compute_path_latency_ns(find_node_path(m_cpu(sip, cube), pe_cpu)) + - io_cpu.overhead_ns + - m_cpu.overhead_ns + ``` + + This models the actual dispatch as **two sequential Transactions** + (IO_CPU → M_CPU, then M_CPU → PE_CPU). Each leg's + `compute_path_latency_ns` adds its endpoints' `overhead_ns`; + `io_cpu.overhead_ns` is subtracted because IO_CPU has already + paid it before this method runs, and `m_cpu.overhead_ns` is + subtracted once because it appears as endpoint of leg1 *and* + start of leg2 but is paid only once at run time. 
A single + `find_node_path(io_cpu, pe_cpu)` walk is **not** equivalent — + it can pick a graph path that bypasses M_CPU and silently + under-shoots the prediction for far cubes, breaking the D5 + invariant. + + The fanned-out sub-Transactions carry **`nbytes = 0`** for + `KernelLaunchMsg` (control message only). Without this, + large kernel-launch payloads would occupy fabric BW on the + shared first hop and serialize the per-cube dispatch, pushing + far M_CPUs past `target_start_ns` and re-introducing the + late-arrival violation. - **M_CPU** passes an already-stamped `target_start_ns` through unchanged. Only when the value is absent (e.g. a direct launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier diff --git a/src/kernbench/components/builtin/io_cpu.py b/src/kernbench/components/builtin/io_cpu.py index a98c227..4c21945 100644 --- a/src/kernbench/components/builtin/io_cpu.py +++ b/src/kernbench/components/builtin/io_cpu.py @@ -86,26 +86,41 @@ class IoCpuComponent(ComponentBase): # For KernelLaunchMsg, compute the global barrier once here so # every downstream PE_CPU uses the same target_start_ns. 
if isinstance(request, KernelLaunchMsg): + io_overhead = self.ctx.node_overhead_ns.get(self.node.id, 0.0) global_max_latency = 0.0 pe_ids = self._resolve_pe_ids( getattr(request, "target_pe", "all") ) for sip, cube in cube_targets: + try: + m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube) + io_to_m_path = self.ctx.router.find_node_path( + self.node.id, m_cpu_id, + ) + except Exception: + continue + if len(io_to_m_path) < 2: + continue + leg1 = self.ctx.compute_path_latency_ns( + io_to_m_path, nbytes=0, + ) + m_overhead = self.ctx.node_overhead_ns.get(m_cpu_id, 0.0) for pe_id in pe_ids: pe_cpu_id = ( f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu" ) try: - path = self.ctx.router.find_node_path( - self.node.id, pe_cpu_id, + m_to_pe_path = self.ctx.router.find_node_path( + m_cpu_id, pe_cpu_id, ) except Exception: continue - if len(path) < 2: + if len(m_to_pe_path) < 2: continue - latency = self.ctx.compute_path_latency_ns( - path, nbytes=0, + leg2 = self.ctx.compute_path_latency_ns( + m_to_pe_path, nbytes=0, ) + latency = leg1 + leg2 - io_overhead - m_overhead if latency > global_max_latency: global_max_latency = latency request = dataclasses.replace( @@ -116,7 +131,12 @@ class IoCpuComponent(ComponentBase): # Setup aggregation self._pending[request.request_id] = (len(cube_targets), 0, txn.done) - # Fan out to each target cube's M_CPU + # Fan out to each target cube's M_CPU. Kernel-launch fanout + # carries control metadata only; nbytes is forced to 0 for + # KernelLaunchMsg so the launch sub-txns do not occupy data-fabric + # BW (would otherwise serialize 16 cubes worth of fanout on the + # shared first hop and break ADR-0009 D5's barrier prediction). 
+ is_kernel_launch = isinstance(request, KernelLaunchMsg) for sip, cube in cube_targets: try: m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube) @@ -127,7 +147,8 @@ class IoCpuComponent(ComponentBase): continue sub_txn = Transaction( request=request, path=path, step=0, - nbytes=txn.nbytes, done=env.event(), + nbytes=0 if is_kernel_launch else txn.nbytes, + done=env.event(), result_data=txn.result_data, ) yield self.out_ports[path[1]].put(sub_txn.advance()) diff --git a/src/kernbench/components/legacy/builtin/io_cpu.py b/src/kernbench/components/legacy/builtin/io_cpu.py index 3cff07b..2583c62 100644 --- a/src/kernbench/components/legacy/builtin/io_cpu.py +++ b/src/kernbench/components/legacy/builtin/io_cpu.py @@ -79,26 +79,41 @@ class IoCpuComponent(ComponentBase): return if isinstance(request, KernelLaunchMsg): + io_overhead = self.ctx.node_overhead_ns.get(self.node.id, 0.0) global_max_latency = 0.0 pe_ids = self._resolve_pe_ids( getattr(request, "target_pe", "all") ) for sip, cube in cube_targets: + try: + m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube) + io_to_m_path = self.ctx.router.find_node_path( + self.node.id, m_cpu_id, + ) + except Exception: + continue + if len(io_to_m_path) < 2: + continue + leg1 = self.ctx.compute_path_latency_ns( + io_to_m_path, nbytes=0, + ) + m_overhead = self.ctx.node_overhead_ns.get(m_cpu_id, 0.0) for pe_id in pe_ids: pe_cpu_id = ( f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu" ) try: - path = self.ctx.router.find_node_path( - self.node.id, pe_cpu_id, + m_to_pe_path = self.ctx.router.find_node_path( + m_cpu_id, pe_cpu_id, ) except Exception: continue - if len(path) < 2: + if len(m_to_pe_path) < 2: continue - latency = self.ctx.compute_path_latency_ns( - path, nbytes=0, + leg2 = self.ctx.compute_path_latency_ns( + m_to_pe_path, nbytes=0, ) + latency = leg1 + leg2 - io_overhead - m_overhead if latency > global_max_latency: global_max_latency = latency request = dataclasses.replace( @@ -109,7 +124,7 @@ class IoCpuComponent(ComponentBase): 
# Setup aggregation self._pending[request.request_id] = (len(cube_targets), 0, txn.done) - # Fan out to each target cube's M_CPU + is_kernel_launch = isinstance(request, KernelLaunchMsg) for sip, cube in cube_targets: try: m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube) @@ -120,7 +135,8 @@ class IoCpuComponent(ComponentBase): continue sub_txn = Transaction( request=request, path=path, step=0, - nbytes=txn.nbytes, done=env.event(), + nbytes=0 if is_kernel_launch else txn.nbytes, + done=env.event(), result_data=txn.result_data, ) yield self.out_ports[path[1]].put(sub_txn.advance()) diff --git a/tests/test_d5_barrier_invariant.py b/tests/test_d5_barrier_invariant.py new file mode 100644 index 0000000..8898478 --- /dev/null +++ b/tests/test_d5_barrier_invariant.py @@ -0,0 +1,194 @@ +"""ADR-0009 D5 invariant: all PEs targeted by a single kernel launch MUST +begin executing the kernel body at the same simulated time, regardless of +their dispatch path length. + +These tests directly verify the invariant by capturing per-PE state at the +top of `_execute_kernel`: + + test_no_pe_arrives_after_target_start_ns + Asserts: for every PE that enters _execute_kernel during a multi-cube + launch, `env.now` at entry must be <= target_start_ns. Otherwise the + PE's barrier yield would be a no-op and `pe_exec_start` would be set + late, breaking the D5 "same simulated time" mandate. + + test_all_pes_have_identical_pe_exec_start + Asserts: every PE's `pe_exec_start` (the value of `env.now` recorded + immediately AFTER the barrier yield) is identical across all PEs in + the launch. + +Both tests failed before the chain-aware IO_CPU predictor and zero-byte +kernel-launch fanout landed; they are the regression check for that fix. 
+""" +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pytest + +from kernbench.policy.placement.dp import DPPolicy +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" + + +def _capture_per_pe_d5_state(): + """Monkey-patch PeCpuComponent._execute_kernel to record, per PE: + + - entry_now: env.now at function entry (before any yield) + - target_start_ns: the value carried by the request + - barrier_yielded: True if the barrier yield fired (entry_now < target) + - pe_exec_start: env.now immediately after the barrier check + (i.e. the value the original code sets) + + Returns (records: list[dict], restore: callable). + """ + import kernbench.components.builtin.pe_cpu as pe_cpu_mod + + records: list[dict] = [] + original = pe_cpu_mod.PeCpuComponent._execute_kernel + + def patched(self, env, txn): + request = txn.request + target_start = getattr(request, "target_start_ns", None) + entry_now = float(env.now) + rec = { + "node_id": self.node.id, + "entry_now": entry_now, + "target_start_ns": ( + float(target_start) if target_start is not None else None + ), + "barrier_yielded": ( + target_start is not None + and float(target_start) > entry_now + ), + "pe_exec_start": None, # filled below by sniff + "late_ns": ( + None if target_start is None + else max(0.0, entry_now - float(target_start)) + ), + } + records.append(rec) + + # We can't easily inject a callback at the original's + # `pe_exec_start = env.now` line without rewriting it. Approximate: + # if the original yields the barrier, env.now after the yield is + # target_start_ns; otherwise pe_exec_start is entry_now (skipped). 
+ if rec["barrier_yielded"]: + rec["pe_exec_start"] = float(target_start) + else: + rec["pe_exec_start"] = entry_now + + yield from original(self, env, txn) + + pe_cpu_mod.PeCpuComponent._execute_kernel = patched + + def restore(): + pe_cpu_mod.PeCpuComponent._execute_kernel = original + + return records, restore + + +def _run_multicube_launch(): + """Drive a no-op kernel launch across all 16 cubes x 8 PEs and return + the per-PE D5 records collected by the monkey-patch.""" + records, restore = _capture_per_pe_d5_state() + try: + topo = resolve_topology(str(TOPOLOGY_PATH)) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + + with RuntimeContext( + engine=engine, target_device=DeviceSelector("all"), + correlation_id="d5_barrier", spec=spec, + ) as ctx: + dp = DPPolicy( + cube="row_wise", pe="column_wise", + num_cubes=16, num_pes=8, + ) + + def kernel(t_ptr, n_elem, tl): + pass # no-op + + ctx.ahbm.set_device(0) + t = ctx.zeros( + (16, 8 * 64), dtype="f16", dp=dp, name="probe", + ) + t.copy_(ctx.from_numpy( + np.zeros((16, 8 * 64), dtype=np.float16), + )) + + pending = ctx.launch( + "d5_probe", kernel, t, 64, _defer_wait=True, + ) + for h, _sip, meta in pending: + ctx.wait(h, _meta=meta) + finally: + restore() + return records + + +def test_no_pe_arrives_after_target_start_ns(): + """ADR-0009 D5: no PE may enter `_execute_kernel` after target_start_ns. + + Before the fix this failed because IO_CPU's predictor under-shot actual + dispatch latency for far cubes (cube4, cube9-15). The fix is the + chain-aware predictor in IO_CPU plus zero-byte kernel-launch fanout. + """ + records = _run_multicube_launch() + assert records, "expected per-PE _execute_kernel records" + + late = [ + r for r in records + if r["target_start_ns"] is not None + and r["late_ns"] is not None + and r["late_ns"] > 1e-6 + ] + + if late: + # Provide actionable diagnostic in the failure. 
+ worst = sorted(late, key=lambda r: -r["late_ns"])[:5] + details = "\n".join( + f" {r['node_id']}: late by {r['late_ns']:.2f} ns " + f"(entry_now={r['entry_now']:.2f}, " + f"target_start_ns={r['target_start_ns']:.2f})" + for r in worst + ) + pytest.fail( + f"ADR-0009 D5 violated: {len(late)}/{len(records)} PEs " + f"entered _execute_kernel AFTER target_start_ns " + f"(barrier yield silently skipped). " + f"Worst offenders:\n{details}" + ) + + +def test_all_pes_have_identical_pe_exec_start(): + """ADR-0009 D5: every PE's pe_exec_start must be identical. + + With D5 honored, every PE arrives before the barrier and yields to + target_start_ns, so its start equals target_start_ns. Before the + chain-aware predictor fix, 75/128 PEs in this launch had distinct + pe_exec_start values because they skipped the barrier. + """ + records = _run_multicube_launch() + assert records, "expected per-PE _execute_kernel records" + + starts = sorted({round(r["pe_exec_start"], 6) for r in records}) + if len(starts) > 1: + spread = max(starts) - min(starts) + # Distribution of how many PEs at each distinct start time + from collections import Counter + bucket = Counter(round(r["pe_exec_start"], 6) for r in records) + details = "\n".join( + f" pe_exec_start={t}: {n} PEs" + for t, n in sorted(bucket.items()) + ) + pytest.fail( + f"ADR-0009 D5 violated: PEs have {len(starts)} distinct " + f"pe_exec_start values (spread = {spread:.2f} ns); " + f"D5 mandates a single common value. " + f"Distribution:\n{details}" + )