From 1c8ddc2d034ffa2d06d91ef6e07a612ad6a6cb35 Mon Sep 17 00:00:00 2001 From: Yangwook Kang Date: Sun, 12 Apr 2026 23:02:19 -0700 Subject: [PATCH] Fix Phase 1 slot-overwrite race + PE_MATH latency model (n_slots=4 safe) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: In ring all-reduce, PE_IPCQ's recv handler advances my_tail and issues a credit return immediately. With tight credit latency (0.12ns intra-cube), the sender can refill the slot BEFORE the receiver's outbound PE_DMA reads from it for the next send. The outbound snapshot then captures stale data from a later round. Fix: Propagate TensorHandle.data (captured at recv-time, before credit return) through the entire send chain: tl.send(src=handle) → IpcqSendCmd.data → IpcqDmaToken.data PE_DMA outbound already prefers token.data over MemoryStore read, so the recv-time snapshot is used for the in-flight data. This eliminates the race: the snapshot is captured before the slot can be overwritten. Additional fixes: - PE_MATH handle_command: compute SIMD latency from output tensor element count via _compute_ns(), using max(overhead_ns, compute_ns). Previously used overhead_ns=0.0 for all standalone MathCmd, making math ops take 0ns in SimPy. - DataExecutor secondary sort: same-t_start ops sorted by op_kind (memory < gemm < math) so IPCQ slot writes execute before math reads. - ipcq_copy recorded at INBOUND time (receiver PE_DMA arrival) instead of outbound. Inbound time is after fabric propagation, so it sorts correctly relative to the receiver's math. - record_copy accepts explicit snapshot parameter (from token.data). Result: N_ELEM=32 + 256-rank + n_slots=4 + cross-SIP now passes. n_slots reverted to 4 (the deeper buffer was a workaround, not needed). 502 tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/kernbench/common/ipcq_types.py | 8 ++++ src/kernbench/components/builtin/pe_dma.py | 46 +++++++++++++-------- src/kernbench/components/builtin/pe_ipcq.py | 4 ++ src/kernbench/components/builtin/pe_math.py | 24 +++++++++-- src/kernbench/sim_engine/data_executor.py | 22 +++++++--- src/kernbench/sim_engine/op_log.py | 18 ++++---- src/kernbench/triton_emu/tl_context.py | 7 ++++ 7 files changed, 93 insertions(+), 36 deletions(-) diff --git a/src/kernbench/common/ipcq_types.py b/src/kernbench/common/ipcq_types.py index 0deb789..2cddead 100644 --- a/src/kernbench/common/ipcq_types.py +++ b/src/kernbench/common/ipcq_types.py @@ -93,6 +93,14 @@ class IpcqSendCmd: shape: tuple[int, ...] # data shape (op_log + MemoryStore use) dtype: str handle_id: str # completion tracking + # In-flight data snapshot captured at tl.send() time from the + # TensorHandle.data field. Carries the actual numpy array that was + # visible at recv-time (when handle.data was populated), avoiding a + # Phase 1 race where a later IPCQ inbound overwrites the sender's + # slot between recv and send. If None, PE_DMA outbound falls back to + # reading MemoryStore[src_addr] (correct for sources that are never + # overwritten, such as HBM tiles). + data: Any = None data_op: bool = True # ADR-0020 op_log recording flag diff --git a/src/kernbench/components/builtin/pe_dma.py b/src/kernbench/components/builtin/pe_dma.py index eae866f..fecd56b 100644 --- a/src/kernbench/components/builtin/pe_dma.py +++ b/src/kernbench/components/builtin/pe_dma.py @@ -154,23 +154,13 @@ class PeDmaComponent(PeEngineBase): except Exception: token.data = None - # Record the IPCQ copy in op_log at OUTBOUND time. ADR-0020 D6: - # Phase 2 replays the copy in t_start order; using outbound time - # (rather than inbound) ensures the copy executes before any later - # local op at the sender that might overwrite token.src_addr (e.g. - # a tl.store after a recv). - if self._op_logger is not None: - try: - self._op_logger.record_copy( - t_start=float(env.now), t_end=float(env.now), - component_id=self.node.id, - src_space=token.src_space, src_addr=token.src_addr, - dst_space=peer.buffer_kind, - dst_addr=token.dst_addr, - shape=token.shape, dtype=token.dtype, nbytes=token.nbytes, - ) - except Exception: - pass + # Note: ipcq_copy is recorded at INBOUND time (in _handle_ipcq_inbound), + # not here. Outbound time is too early — it precedes fabric propagation, + # so in Phase 2 a later round's copy can sort before the receiver's + # math for an earlier round, causing slot data corruption. + # The secondary sort in DataExecutor (memory ops before math at the + # same t_start) ensures the inbound copy runs before the local math + # that reads the slot. try: path = self.ctx.router.find_path(self._pe_prefix, peer_pe_dma) @@ -223,6 +213,28 @@ class PeDmaComponent(PeEngineBase): except Exception: pass + # Record the IPCQ copy at INBOUND time with embedded data snapshot. + # The snapshot (token.data) was captured by the sender's outbound + # PE_DMA at send time. Phase 2 writes the snapshot directly to + # dst — it does NOT re-read from MemoryStore[src_addr], which may + # have been mutated by a different PE's Phase 2 ops by that point. + # DataExecutor's secondary sort (memory before math at same + # t_start) ensures the write completes before the local math + # that reads the slot. + if self._op_logger is not None: + try: + self._op_logger.record_copy( + t_start=float(env.now), t_end=float(env.now), + component_id=self.node.id, + src_space=token.src_space, src_addr=token.src_addr, + dst_space=token.dst_endpoint.buffer_kind, + dst_addr=token.dst_addr, + shape=token.shape, dtype=token.dtype, nbytes=token.nbytes, + snapshot=token.data, + ) + except Exception: + pass + # 2. Forward IpcqMetaArrival to local PE_IPCQ ipcq_id = f"{self._pe_prefix}.pe_ipcq" if ipcq_id in self.out_ports: diff --git a/src/kernbench/components/builtin/pe_ipcq.py b/src/kernbench/components/builtin/pe_ipcq.py index 710786a..27b5d8c 100644 --- a/src/kernbench/components/builtin/pe_ipcq.py +++ b/src/kernbench/components/builtin/pe_ipcq.py @@ -221,6 +221,10 @@ class PeIpcqComponent(ComponentBase): handle_id=cmd.handle_id, shape=cmd.shape, dtype=cmd.dtype, + # Carry the handle's recv-time data snapshot so the outbound + # PE_DMA doesn't need to re-read from MemoryStore (which may + # have been overwritten by a later inbound in the meantime). + data=getattr(cmd, "data", None), sender_seq=qp["my_head"], src_sip=self._self_sip, src_cube=self._self_cube, diff --git a/src/kernbench/components/builtin/pe_math.py b/src/kernbench/components/builtin/pe_math.py index 664c545..ff9e142 100644 --- a/src/kernbench/components/builtin/pe_math.py +++ b/src/kernbench/components/builtin/pe_math.py @@ -92,13 +92,31 @@ class PeMathComponent(PeEngineBase): token.pipeline_ctx.complete_tile() def handle_command(self, env: simpy.Environment, pe_txn: PeInternalTxn) -> Generator: - """Legacy PeInternalTxn handling.""" + """PeInternalTxn handling for standalone MathCmd (CCL kernels). + + Latency = max(overhead_ns, _compute_ns(num_elements)): + - overhead_ns: fixed per-invocation setup cost (from node attrs). + - _compute_ns: SIMD cycle-based model (from vector_width + clock_freq). + The larger of the two dominates (setup-bound vs compute-bound). + """ + from kernbench.common.pe_commands import MathCmd + import math as _math + + cmd = pe_txn.command + num_elements = 0 + if isinstance(cmd, MathCmd) and cmd.out.shape: + num_elements = _math.prod(cmd.out.shape) + + overhead_ns = float(self.node.attrs.get("overhead_ns", 0.0)) + compute_ns = self._compute_ns(num_elements) + ns = max(overhead_ns, compute_ns) + if self._accel: with self._accel.request() as req: yield req - yield from self.run(env, 0) + yield env.timeout(ns) else: - yield from self.run(env, 0) + yield env.timeout(ns) pe_txn.done.succeed() def _forward_txn(self, env: simpy.Environment, txn: Any) -> Generator: diff --git a/src/kernbench/sim_engine/data_executor.py b/src/kernbench/sim_engine/data_executor.py index 74be788..72ab782 100644 --- a/src/kernbench/sim_engine/data_executor.py +++ b/src/kernbench/sim_engine/data_executor.py @@ -26,15 +26,27 @@ class DataExecutor: self._op_log = op_log self.store = store + # Ordering priority within the same t_start: memory copies must run + # before math/gemm so that slot data is populated before a consumer + # PE's math op reads it. With 0-ns PE_MATH overhead and tight SimPy + # scheduling, ipcq_copy and math ops from different PEs can collide + # at the exact same t_start. + _KIND_ORDER = {"memory": 0, "gemm": 1, "math": 2, "unknown": 3} + def run(self) -> None: """Execute all ops in op_log order. - Ops are processed sequentially in t_start order. The previous - ThreadPoolExecutor-based parallel execution was removed because - same-t_start groups are almost always size 1 (each PE processes - one command at a time), so the thread-pool overhead dominated. + Primary sort: t_start (ascending). + Secondary sort: op_kind priority — memory (ipcq_copy/dma_write) + before gemm before math. This ensures IPCQ slot data arrives + before a consumer PE's math op tries to read it, even when both + share the same SimPy timestamp. """ - for op in self._op_log: + ops = sorted( + self._op_log, + key=lambda r: (r.t_start, self._KIND_ORDER.get(r.op_kind, 3)), + ) + for op in ops: if op.op_kind != "memory" or op.op_name != "dma_read": self._execute_op(op) diff --git a/src/kernbench/sim_engine/op_log.py b/src/kernbench/sim_engine/op_log.py index f0accd8..ce0c69c 100644 --- a/src/kernbench/sim_engine/op_log.py +++ b/src/kernbench/sim_engine/op_log.py @@ -103,21 +103,17 @@ class OpLogger: src_space: str, src_addr: int, dst_space: str, dst_addr: int, shape: tuple[int, ...], dtype: str, nbytes: int, + snapshot: Any = None, ) -> None: """Record a memory copy op for Phase 2 replay (ADR-0023 + ADR-0020). - Used by PE_DMA at outbound (sender) time: the snapshot captures - the source data at the moment the send was issued, so Phase 2 - replay does not see later mutations of the source addr (e.g. a - tl.store that runs after the recv at the sender). - - For sources whose data is not yet materialized in Phase 1 (math - scratch outputs), the snapshot is None and Phase 2 falls back to - reading from MemoryStore — by which point the corresponding math - op has been replayed and the scratch addr is populated. + ``snapshot``: if provided (e.g. token.data from in-flight DMA), + used directly. Otherwise falls back to a fresh read from + MemoryStore[src_addr]. The snapshot is what Phase 2 writes into + dst_addr, avoiding stale-source races from cross-PE mutations. """ - snap = None - if self._memory_store is not None: + snap = snapshot + if snap is None and self._memory_store is not None: try: arr = self._memory_store.read( src_space, src_addr, shape=shape, dtype=dtype, diff --git a/src/kernbench/triton_emu/tl_context.py b/src/kernbench/triton_emu/tl_context.py index 22e43d6..64b65a8 100644 --- a/src/kernbench/triton_emu/tl_context.py +++ b/src/kernbench/triton_emu/tl_context.py @@ -421,12 +421,19 @@ class TLContext: space = getattr(src, "space", space) if src_addr is None or nbytes is None or shape is None: raise ValueError("tl.send: provide either a TensorHandle or src_addr/nbytes/shape") + # Carry the handle's .data snapshot (if available). When the source + # is a recv slot, .data holds the numpy array that was read from + # MemoryStore at recv-time. This prevents a Phase 1 race where a + # later IPCQ inbound overwrites the slot before the outbound + # PE_DMA reads it. + handle_data = getattr(src, "data", None) if src is not None else None self._emit_dispatch_overhead() cmd = IpcqSendCmd( direction=dir, src_addr=src_addr, src_space=space, nbytes=nbytes, shape=shape, dtype=dtype, handle_id=self._next_handle_id(), + data=handle_data, ) self._emit(cmd)