diff --git a/docs/adr/ADR-0009-kernel-execution-messaging.md b/docs/adr/ADR-0009-kernel-execution-messaging.md index 91ca443..eb99650 100644 --- a/docs/adr/ADR-0009-kernel-execution-messaging.md +++ b/docs/adr/ADR-0009-kernel-execution-messaging.md @@ -67,6 +67,51 @@ Completion semantics: --- +### D5. Launch timing is endpoint-synchronized + +All PEs targeted by a single kernel launch MUST begin executing the kernel +body at the same simulated time, regardless of their dispatch path length +from the launch entry point. + +Rationale. The dispatch tree Host → IO_CPU → M_CPU → PE_CPU has variable +latency at every level. PEs near their M_CPU receive the launch earlier +than PEs farther away; cubes near an IO_CPU receive it earlier than cubes +farther away. Without synchronization, each PE's kernel begins at a +different `env.now`, making per-PE metrics such as `pe_exec_ns` a function +of dispatch-path geometry rather than of the kernel's behavior — +producing measurement artifacts in benchmarks that time kernel-internal +waits (for example `tl.recv` on cross-cube or cross-SIP hops). + +Mechanism. + +- `KernelLaunchMsg` carries an optional `target_start_ns: float | None`. +- **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it + computes `target_start_ns = env.now + max_latency` where `max_latency` + is the maximum `ComponentContext.compute_path_latency_ns(path)` across + every target (sip, cube, pe) tuple — `path = find_node_path(io_cpu, + pe_cpu_id)`. The stamped value is placed on the request carried by + every fanned-out sub-Transaction. +- **M_CPU** passes an already-stamped `target_start_ns` through + unchanged. Only when the value is absent (e.g. a direct + launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier + `env.now + max(local command-path latency)`. +- **PE_CPU** yields `env.timeout(target_start_ns - env.now)` at the top + of `_execute_kernel`, before recording `pe_exec_start` and invoking + the kernel body. +- When `target_start_ns is None`, PE_CPU falls through to the legacy + unsynchronized behavior — preserving backward compatibility. + +IO_CPU-level stamping guarantees every PE across every targeted cube +uses the same barrier sim-time, eliminating both the within-cube +dispatch-offset artifact *and* the cross-cube offset artifact in +multi-cube launches. Models a real-hardware timed-broadcast launch +(latency-equalized dispatch tree). + +The synchronization is internal to the engine / IO_CPU / M_CPU / PE_CPU +control plane — runtime API and application kernels are unchanged. + +--- + ## Links - SPEC R1, R2, R7, R8 diff --git a/docs/adr/ADR-0023-ipcq-pe-collective.en.md b/docs/adr/ADR-0023-ipcq-pe-collective.en.md index c97f20e..8c79f07 100644 --- a/docs/adr/ADR-0023-ipcq-pe-collective.en.md +++ b/docs/adr/ADR-0023-ipcq-pe-collective.en.md @@ -420,11 +420,21 @@ fan-out (see `IpcqInitMsg` in D12). #### PE_DMA's added responsibility When `vc_comm` receives a token, PE_DMA processes it as the following -**atomic** sequence. **No SimPy yield is allowed between the two steps** -(invariant I6): +sequence: pay the Transaction's terminal BW drain, then atomically +write data and forward metadata. **No SimPy yield is allowed between +the data write and the metadata forward** (invariant I6). The drain +yield must sit before the atomic block, not inside it: ```python -def _on_vc_comm_recv(self, env, token): +def _on_vc_comm_recv(self, env, txn): + # Pay the terminal BW drain (nbytes / bottleneck_bw stamped by the + # sender PE_DMA). MUST happen before the atomic block so recv only + # wakes after the bytes have "landed". + drain = getattr(txn, "drain_ns", 0.0) + if drain > 0: + yield env.timeout(drain) + + token = txn.request # ── ATOMIC: no yield between these two operations ── data = self._memory_store.read(token.src_space, token.src_addr, shape=..., dtype=...) @@ -439,6 +449,33 @@ The final `put` is yieldable but uses an unbounded internal store, so it completes in a single step. That `put` is the closing call of the atomic block; nothing may be inserted before it. +#### Drain-at-inbound semantics (D9 timing model) + +The Transaction carries `drain_ns = nbytes / bottleneck_bw_on_path` +stamped at send-side PE_DMA. In this simulator per-hop `overhead_ns` +is paid at each forwarding component via `run()`, and the remaining +BW drain is paid once at the Transaction's terminal. Every non-IPCQ +Transaction (raw DMA, kernel-launch fanout, etc.) pays this drain via +`ComponentBase._forward_txn` at the terminal node. For IPCQ the +destination PE_DMA intercepts the Transaction with `_handle_ipcq_inbound` +(so IPCQ-specific data write + metadata forward can happen), so **the +drain MUST be paid explicitly at the top of that handler** to keep +IPCQ's timing model on par with every other fabric Transaction. + +Side-effects of paying drain here: + +- **SRC `tl.send`** is unchanged — fire-and-forget semantics are + preserved because the sender PE_DMA does not `yield sub_done`. The + `sub_done.succeed()` call (made after metadata forward below) is an + event with no listener on the sender side. +- **DST `tl.recv`** unblocks `drain_ns` later. Since recv wakes only + when `IpcqMetaArrival` reaches its local PE_IPCQ, and the metadata + forward now happens after the drain, recv observes the full fabric + transfer time including bandwidth cost. + +Matches the physical picture: send dispatches and leaves; recv waits +until the bytes have actually been drained into its inbox. + ### D9.5. ADR-0020 (2-pass) integration `tl.send` / `tl.recv` integrates with ADR-0020's two-pass model. Phase diff --git a/docs/adr/ADR-0023-ipcq-pe-collective.md b/docs/adr/ADR-0023-ipcq-pe-collective.md index 77369dd..f2afa6b 100644 --- a/docs/adr/ADR-0023-ipcq-pe-collective.md +++ b/docs/adr/ADR-0023-ipcq-pe-collective.md @@ -426,11 +426,22 @@ backend init에서 IpcqInitMsg fan-out 시 양방향 fast path channel을 함께 #### PE_DMA의 책임 추가 -PE_DMA(vc_comm)는 token 수신 시 다음 atomic 시퀀스로 처리한다. -**두 동작 사이에 SimPy yield를 두어서는 안 된다** (I6 MUST 규칙 참조): +PE_DMA(vc_comm)는 token 수신 시 다음 시퀀스로 처리한다: Transaction +terminal의 BW drain을 먼저 지불하고, 이어서 atomic하게 data write + +metadata forward 수행. **data write와 metadata forward 사이에는 SimPy +yield를 두어서는 안 된다** (I6 MUST 규칙 참조). drain yield는 atomic +구간 안이 아니라 그 앞에 위치해야 한다: ```python -def _on_vc_comm_recv(self, env, token): +def _on_vc_comm_recv(self, env, txn): + # Sender PE_DMA가 찍어 둔 drain_ns (= nbytes / bottleneck_bw) 를 + # 여기서 지불. atomic 구간보다 앞이어야 한다 — recv는 bytes가 + # "도착"한 이후에만 깨어나야 하므로. + drain = getattr(txn, "drain_ns", 0.0) + if drain > 0: + yield env.timeout(drain) + + token = txn.request # ── ATOMIC: 두 동작 사이에 yield 금지 ── # 1. data를 dst_addr에 write (dst의 메모리 공간은 token.dst_endpoint.buffer_kind) data = self._memory_store.read(token.src_space, token.src_addr, @@ -446,6 +457,32 @@ wire로 capacity가 unbounded인 store를 사용하므로 즉시 완료된다 ( single-step). 이 최종 put이 atomic 구간의 끝이며, 그 이전에 다른 yield가 삽입되면 안 된다. +#### Drain-at-inbound semantics (D9 timing model) + +Transaction은 sender PE_DMA가 `drain_ns = nbytes / bottleneck_bw_on_path` +를 찍어 둔 상태로 fabric에 들어간다. 이 simulator에서 per-hop `overhead_ns` +는 각 forwarding component의 `run()` 에서 지불되고, 남은 BW drain은 +Transaction의 terminal node에서 한 번 지불된다. IPCQ가 아닌 모든 +Transaction (raw DMA, kernel-launch fanout 등) 은 +`ComponentBase._forward_txn` 이 terminal에서 이 drain을 지불한다. IPCQ의 +경우 목적지 PE_DMA가 `_handle_ipcq_inbound` 핸들러로 Transaction을 +가로채서 (IPCQ 전용 data write + metadata forward를 해야 하므로) +**이 핸들러 최상단에서 drain을 명시적으로 지불해야 한다** — 그래야 IPCQ의 +timing model이 다른 모든 fabric Transaction과 동일선상에 놓인다. + +여기서 drain을 지불할 때의 side-effect: + +- **SRC `tl.send`**: 동작 불변. sender PE_DMA가 `sub_done` 을 `yield` + 하지 않으므로 fire-and-forget 의미가 보존된다. metadata forward 이후 + 호출되는 `sub_done.succeed()` 는 sender 입장에서 listener가 없는 이벤트. +- **DST `tl.recv`**: `drain_ns` 만큼 늦게 깨어난다. recv는 local PE_IPCQ + 의 `IpcqMetaArrival` 수신 시에만 wake되며, metadata forward가 drain + 이후로 이동했으므로 recv는 bandwidth까지 포함한 전체 fabric transfer + 시간을 관측하게 된다. + +물리적 그림과 일치: send는 dispatch하고 바로 반환; recv는 bytes가 실제로 +자신의 inbox로 drain될 때까지 대기. + #### Backpressure latency 정확도 backpressure 해제까지 걸리는 시간: diff --git a/src/kernbench/components/builtin/io_cpu.py b/src/kernbench/components/builtin/io_cpu.py index 83f2b8a..a98c227 100644 --- a/src/kernbench/components/builtin/io_cpu.py +++ b/src/kernbench/components/builtin/io_cpu.py @@ -58,7 +58,18 @@ class IoCpuComponent(ComponentBase): self._pending[key] = (expected, received, parent_done) def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator: - """Fan out sub-Transactions to target cube M_CPUs, wait for responses.""" + """Fan out sub-Transactions to target cube M_CPUs, wait for responses. + + ADR-0009 D5 (extended): for KernelLaunchMsg, stamp a single global + target_start_ns = env.now + max(IO_CPU → any target PE_CPU path + latency across all target cubes). M_CPU passes this value through + unchanged; every PE in every cube yields until the same sim-time + before beginning kernel execution. Without this, cross-cube + launches would have each cube's M_CPU compute its own per-cube + barrier relative to its local env.now, leaving PEs on different + cubes out of sync (the "h3/h4 dispatch-offset artifact"). + """ + import dataclasses from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg request = txn.request @@ -72,6 +83,36 @@ class IoCpuComponent(ComponentBase): txn.done.succeed() return + # For KernelLaunchMsg, compute the global barrier once here so + # every downstream PE_CPU uses the same target_start_ns. + if isinstance(request, KernelLaunchMsg): + global_max_latency = 0.0 + pe_ids = self._resolve_pe_ids( + getattr(request, "target_pe", "all") + ) + for sip, cube in cube_targets: + for pe_id in pe_ids: + pe_cpu_id = ( + f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu" + ) + try: + path = self.ctx.router.find_node_path( + self.node.id, pe_cpu_id, + ) + except Exception: + continue + if len(path) < 2: + continue + latency = self.ctx.compute_path_latency_ns( + path, nbytes=0, + ) + if latency > global_max_latency: + global_max_latency = latency + request = dataclasses.replace( + request, + target_start_ns=float(env.now) + global_max_latency, + ) + # Setup aggregation self._pending[request.request_id] = (len(cube_targets), 0, txn.done) @@ -91,6 +132,19 @@ class IoCpuComponent(ComponentBase): ) yield self.out_ports[path[1]].put(sub_txn.advance()) + def _resolve_pe_ids(self, target_pe: Any) -> list[int]: + """Resolve target_pe → list of PE indices (mirrors M_CPU logic).""" + if isinstance(target_pe, int): + return [target_pe] + if isinstance(target_pe, tuple): + return list(target_pe) + # "all": all PEs in a cube + n_slices = 8 + if self.ctx and self.ctx.spec: + mm = self.ctx.spec.get("cube", {}).get("memory_map", {}) + n_slices = mm.get("hbm_slices_per_cube", 8) + return list(range(n_slices)) + def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]: """Return list of (sip, cube) pairs to fan out to.""" from kernbench.runtime_api.kernel import ( diff --git a/src/kernbench/components/builtin/m_cpu.py b/src/kernbench/components/builtin/m_cpu.py index 496de9c..740a272 100644 --- a/src/kernbench/components/builtin/m_cpu.py +++ b/src/kernbench/components/builtin/m_cpu.py @@ -162,7 +162,11 @@ class MCpuComponent(ComponentBase): Routes through find_node_path (M_CPU → NOC → PE_CPU command edges). PE_CPU sends ResponseMsg back via NOC → M_CPU on completion. Then sends aggregate ResponseMsg back to IO_CPU on the reverse path. + + ADR-0009 D5: stamps target_start_ns so every PE in this fanout + starts executing at the same env.now regardless of dispatch path. """ + import dataclasses request = txn.request target_pe = getattr(request, "target_pe", "all") cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0" @@ -172,9 +176,13 @@ class MCpuComponent(ComponentBase): txn.done.succeed() return - # Fan out to each PE_CPU, using response-based aggregation - sub_txns: list[Transaction] = [] - n_dispatched = 0 + # Resolve per-PE paths. If IO_CPU already stamped a global + # target_start_ns (ADR-0009 D5 extended), pass it through + # unchanged so every PE across every cube uses the same barrier. + # Otherwise (e.g. direct-to-M_CPU launch in a unit test) compute + # a per-cube barrier from env.now. + per_pe: list[tuple[int, list[str], float]] = [] + max_latency = 0.0 for pe_id in pe_ids: pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu" try: @@ -183,8 +191,24 @@ class MCpuComponent(ComponentBase): continue if len(path) < 2: continue + latency = self.ctx.compute_path_latency_ns(path, nbytes=0) + per_pe.append((pe_id, path, latency)) + if latency > max_latency: + max_latency = latency + + if getattr(request, "target_start_ns", None) is not None: + stamped_request = request + else: + stamped_request = dataclasses.replace( + request, target_start_ns=float(env.now) + max_latency, + ) + + # Fan out to each PE_CPU, using response-based aggregation + sub_txns: list[Transaction] = [] + n_dispatched = 0 + for pe_id, path, _lat in per_pe: sub_txn = Transaction( - request=request, path=path, step=0, + request=stamped_request, path=path, step=0, nbytes=0, done=env.event(), ) yield self.out_ports[path[1]].put(sub_txn.advance()) diff --git a/src/kernbench/components/builtin/pe_cpu.py b/src/kernbench/components/builtin/pe_cpu.py index d5f615a..ca0512e 100644 --- a/src/kernbench/components/builtin/pe_cpu.py +++ b/src/kernbench/components/builtin/pe_cpu.py @@ -95,6 +95,13 @@ class PeCpuComponent(ComponentBase): request = txn.request yield from self.run(env, 0) + # ADR-0009 D5: synchronized launch barrier. If M_CPU stamped a + # target_start_ns, wait until then so every PE in this launch + # begins pe_exec measurement at the same simulated time. + target_start = getattr(request, "target_start_ns", None) + if target_start is not None and target_start > env.now: + yield env.timeout(float(target_start) - env.now) + kernel_fn = get_kernel(request.kernel_ref.name) num_programs = self._derive_num_programs(request) kernel_args = self._unpack_kernel_args(request) diff --git a/src/kernbench/components/builtin/pe_dma.py b/src/kernbench/components/builtin/pe_dma.py index fecd56b..84988c3 100644 --- a/src/kernbench/components/builtin/pe_dma.py +++ b/src/kernbench/components/builtin/pe_dma.py @@ -186,13 +186,37 @@ class PeDmaComponent(PeEngineBase): # ── IPCQ inbound (fabric → PE_DMA → MemoryStore + PE_IPCQ) ────── def _handle_ipcq_inbound(self, env: simpy.Environment, txn: Any) -> Generator: - """At destination PE_DMA: atomically write data and forward metadata. + """At destination PE_DMA: pay terminal drain, then atomically write + data and forward metadata. + + ADR-0023 D9 (drain at inbound terminal): the Transaction carries + ``drain_ns = nbytes / bottleneck_bw_on_path`` stamped by the sender + PE_DMA. Like every other Transaction terminal in the simulator (see + ``ComponentBase._forward_txn``), this drain must be paid when the + Transaction reaches its destination. SRC-side ``tl.send`` is + fire-and-forget — it never yields on ``sub_done`` — so paying the + drain here does NOT delay the sender. What it DOES delay is the + IpcqMetaArrival forwarded below: that delay is the only signal + ``tl.recv`` on DST blocks on, which is exactly the desired + semantics — "send dispatches and returns; recv waits until the + bytes have actually landed in its inbox". + + The drain MUST be paid before the atomic block — inserting a yield + inside would break invariant I6. I6 (MUST): no SimPy yield between MemoryStore.write and the IpcqMetaArrival put into PE_IPCQ. """ from kernbench.common.ipcq_types import IpcqMetaArrival + # Pay terminal BW drain before the atomic write/metadata forward. + # Without this, IPCQ effectively got fabric bandwidth for free at + # the terminal (only intermediate-hop overhead_ns was charged), + # making IPCQ lower than raw DMA at large sizes in benchmarks. + drain = getattr(txn, "drain_ns", 0.0) + if drain > 0: + yield env.timeout(drain) + token = txn.request # ── ATOMIC: do not introduce yield between these two operations ── diff --git a/src/kernbench/components/context.py b/src/kernbench/components/context.py index 14c84c3..bd93aa4 100644 --- a/src/kernbench/components/context.py +++ b/src/kernbench/components/context.py @@ -26,6 +26,9 @@ class ComponentContext: spec: dict = field(default_factory=dict) # topology spec (cube layout, PE count, etc.) memory_store: Any = None # MemoryStore for Phase 1 data-aware execution (ADR-0020) op_logger: Any = None # OpLogger for Phase 1 op recording (ADR-0020) + # node_id -> overhead_ns (ADR-0009 D5: used by M_CPU to compute per-PE + # dispatch latency when stamping target_start_ns on KernelLaunchMsg). + node_overhead_ns: dict[str, float] = field(default_factory=dict) def get_shared_resource( self, env: simpy.Environment, key: str, capacity: int = 1, @@ -52,3 +55,19 @@ class ComponentContext: if min_bw == float("inf"): return 0.0 return nbytes / min_bw + + def compute_path_latency_ns(self, path: list[str], nbytes: int = 0) -> float: + """Formula latency along path: wire + per-node overhead + drain. + + ADR-0009 D5: M_CPU uses this to compute per-PE dispatch latency + when stamping target_start_ns on KernelLaunchMsg fanout. + """ + total = 0.0 + for i in range(len(path) - 1): + edge = self.edge_map.get((path[i], path[i + 1])) + if edge: + total += edge.distance_mm * self.ns_per_mm + for node_id in path: + total += self.node_overhead_ns.get(node_id, 0.0) + total += self.compute_drain_ns(path, nbytes) + return total diff --git a/src/kernbench/components/legacy/builtin/io_cpu.py b/src/kernbench/components/legacy/builtin/io_cpu.py index 83f2b8a..3cff07b 100644 --- a/src/kernbench/components/legacy/builtin/io_cpu.py +++ b/src/kernbench/components/legacy/builtin/io_cpu.py @@ -58,7 +58,13 @@ class IoCpuComponent(ComponentBase): self._pending[key] = (expected, received, parent_done) def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator: - """Fan out sub-Transactions to target cube M_CPUs, wait for responses.""" + """Fan out sub-Transactions to target cube M_CPUs, wait for responses. + + ADR-0009 D5 (extended): stamp a global target_start_ns on + KernelLaunchMsg so every PE across every target cube starts at + the same env.now. See the non-legacy builtin for full rationale. + """ + import dataclasses from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg request = txn.request @@ -72,6 +78,34 @@ class IoCpuComponent(ComponentBase): txn.done.succeed() return + if isinstance(request, KernelLaunchMsg): + global_max_latency = 0.0 + pe_ids = self._resolve_pe_ids( + getattr(request, "target_pe", "all") + ) + for sip, cube in cube_targets: + for pe_id in pe_ids: + pe_cpu_id = ( + f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu" + ) + try: + path = self.ctx.router.find_node_path( + self.node.id, pe_cpu_id, + ) + except Exception: + continue + if len(path) < 2: + continue + latency = self.ctx.compute_path_latency_ns( + path, nbytes=0, + ) + if latency > global_max_latency: + global_max_latency = latency + request = dataclasses.replace( + request, + target_start_ns=float(env.now) + global_max_latency, + ) + # Setup aggregation self._pending[request.request_id] = (len(cube_targets), 0, txn.done) @@ -91,6 +125,18 @@ class IoCpuComponent(ComponentBase): ) yield self.out_ports[path[1]].put(sub_txn.advance()) + def _resolve_pe_ids(self, target_pe: Any) -> list[int]: + """Resolve target_pe → list of PE indices (mirrors M_CPU logic).""" + if isinstance(target_pe, int): + return [target_pe] + if isinstance(target_pe, tuple): + return list(target_pe) + n_slices = 8 + if self.ctx and self.ctx.spec: + mm = self.ctx.spec.get("cube", {}).get("memory_map", {}) + n_slices = mm.get("hbm_slices_per_cube", 8) + return list(range(n_slices)) + def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]: """Return list of (sip, cube) pairs to fan out to.""" from kernbench.runtime_api.kernel import ( diff --git a/src/kernbench/components/legacy/builtin/m_cpu.py b/src/kernbench/components/legacy/builtin/m_cpu.py index 496de9c..b8e928c 100644 --- a/src/kernbench/components/legacy/builtin/m_cpu.py +++ b/src/kernbench/components/legacy/builtin/m_cpu.py @@ -162,7 +162,11 @@ class MCpuComponent(ComponentBase): Routes through find_node_path (M_CPU → NOC → PE_CPU command edges). PE_CPU sends ResponseMsg back via NOC → M_CPU on completion. Then sends aggregate ResponseMsg back to IO_CPU on the reverse path. + + ADR-0009 D5: stamps target_start_ns so every PE in this fanout + starts executing at the same env.now regardless of dispatch path. """ + import dataclasses request = txn.request target_pe = getattr(request, "target_pe", "all") cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0" @@ -172,9 +176,10 @@ class MCpuComponent(ComponentBase): txn.done.succeed() return - # Fan out to each PE_CPU, using response-based aggregation - sub_txns: list[Transaction] = [] - n_dispatched = 0 + # Resolve per-PE paths. If IO_CPU already stamped a global + # target_start_ns (ADR-0009 D5 extended), pass it through. + per_pe: list[tuple[int, list[str], float]] = [] + max_latency = 0.0 for pe_id in pe_ids: pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu" try: @@ -183,8 +188,24 @@ class MCpuComponent(ComponentBase): continue if len(path) < 2: continue + latency = self.ctx.compute_path_latency_ns(path, nbytes=0) + per_pe.append((pe_id, path, latency)) + if latency > max_latency: + max_latency = latency + + if getattr(request, "target_start_ns", None) is not None: + stamped_request = request + else: + stamped_request = dataclasses.replace( + request, target_start_ns=float(env.now) + max_latency, + ) + + # Fan out to each PE_CPU, using response-based aggregation + sub_txns: list[Transaction] = [] + n_dispatched = 0 + for pe_id, path, _lat in per_pe: sub_txn = Transaction( - request=request, path=path, step=0, + request=stamped_request, path=path, step=0, nbytes=0, done=env.event(), ) yield self.out_ports[path[1]].put(sub_txn.advance()) diff --git a/src/kernbench/components/legacy/builtin/pe_cpu.py b/src/kernbench/components/legacy/builtin/pe_cpu.py index 4947b9d..0fc8c18 100644 --- a/src/kernbench/components/legacy/builtin/pe_cpu.py +++ b/src/kernbench/components/legacy/builtin/pe_cpu.py @@ -71,6 +71,13 @@ class PeCpuComponent(ComponentBase): request = txn.request yield from self.run(env, 0) + # ADR-0009 D5: synchronized launch barrier. If M_CPU stamped a + # target_start_ns, wait until then so every PE in this launch + # begins pe_exec measurement at the same simulated time. + target_start = getattr(request, "target_start_ns", None) + if target_start is not None and target_start > env.now: + yield env.timeout(float(target_start) - env.now) + kernel_fn = get_kernel(request.kernel_ref.name) num_programs = self._derive_num_programs(request) kernel_args = self._unpack_kernel_args(request) diff --git a/src/kernbench/runtime_api/kernel.py b/src/kernbench/runtime_api/kernel.py index 27fe732..2967e64 100644 --- a/src/kernbench/runtime_api/kernel.py +++ b/src/kernbench/runtime_api/kernel.py @@ -90,6 +90,11 @@ class KernelLaunchMsg: args: tuple[KernelArg, ...] target_cubes: tuple[int, ...] | Literal["all"] = "all" target_pe: int | tuple[int, ...] | Literal["all"] = "all" + # ADR-0009 D5: synchronized kernel start. When set, each PE_CPU yields + # until env.now >= target_start_ns before beginning kernel execution, + # so every PE in a launch starts at the same simulated time regardless + # of its M_CPU dispatch path length. Stamped by M_CPU fan-out. + target_start_ns: float | None = None msg_type: Literal["kernel_launch"] = "kernel_launch" diff --git a/src/kernbench/sim_engine/engine.py b/src/kernbench/sim_engine/engine.py index 0995397..62904a8 100644 --- a/src/kernbench/sim_engine/engine.py +++ b/src/kernbench/sim_engine/engine.py @@ -67,6 +67,10 @@ class GraphEngine: spec=graph.spec, memory_store=self._memory_store, op_logger=self._op_logger, + node_overhead_ns={ + nid: float(n.attrs.get("overhead_ns", 0.0)) + for nid, n in graph.nodes.items() + }, ) self._components: dict[str, ComponentBase] = { node_id: ComponentRegistry.create(node, overrides, ctx) diff --git a/tests/test_kernel_launch_sync.py b/tests/test_kernel_launch_sync.py new file mode 100644 index 0000000..5c88e7d --- /dev/null +++ b/tests/test_kernel_launch_sync.py @@ -0,0 +1,62 @@ +"""ADR-0009 D5: synchronized launch barrier. + +M_CPU stamps KernelLaunchMsg with target_start_ns = env.now + max path +latency; PE_CPU yields until that time before recording pe_exec_start. +Every PE in a single launch MUST begin kernel execution at the same +env.now regardless of its dispatch path length. + +We verify this indirectly: for a no-op kernel, pe_exec_ns = env.now - +pe_exec_start. If every PE's pe_exec_start is identical and every PE +runs the same no-op body, every pe_exec_ns value must be identical. +Without D5, pe_exec_start varies by dispatch-path length and so does +pe_exec_ns. +""" +from __future__ import annotations + +from pathlib import Path + +import numpy as np + +from kernbench.policy.placement.dp import DPPolicy +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" + + +def test_kernel_launch_sync_all_pes_have_equal_exec_time(): + """No-op kernel: every PE's pe_exec_ns must be identical under D5.""" + topo = resolve_topology(str(TOPOLOGY_PATH)) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + + with RuntimeContext(engine=engine, target_device=DeviceSelector("all"), + correlation_id="sync_test", spec=spec) as ctx: + dp = DPPolicy(cube="row_wise", pe="column_wise", + num_cubes=16, num_pes=8) + + def kernel(t_ptr, n_elem, tl): + pass # no-op + + ctx.ahbm.set_device(0) + t = ctx.zeros((16, 8 * 64), dtype="f16", dp=dp, name="probe") + t.copy_(ctx.from_numpy(np.zeros((16, 8 * 64), dtype=np.float16))) + + pending = ctx.launch("sync_probe", kernel, t, 64, _defer_wait=True) + for h, _sip, meta in pending: + ctx.wait(h, _meta=meta) + + pe_exec_vals = [] + for h, _sip, _meta in pending: + _, trace = engine.get_completion(h) + if trace and trace.get("pe_exec_ns") is not None: + pe_exec_vals.append(float(trace["pe_exec_ns"])) + + assert pe_exec_vals, "expected completion traces with pe_exec_ns" + spread = max(pe_exec_vals) - min(pe_exec_vals) + assert spread < 1e-6, ( + f"ADR-0009 D5 violated: pe_exec_ns spread across PEs = " + f"{spread:.6f} ns (expected 0). Values: {pe_exec_vals}" + )