Fix Phase 1 slot-overwrite race + PE_MATH latency model (n_slots=4 safe)

Root cause: In ring all-reduce, PE_IPCQ's recv handler advances my_tail
and issues a credit return immediately. With tight credit latency
(0.12ns intra-cube), the sender can refill the slot BEFORE the
receiver's outbound PE_DMA reads from it for the next send. The
outbound snapshot then captures stale data from a later round.

Fix: Propagate TensorHandle.data (captured at recv-time, before credit
return) through the entire send chain:
  tl.send(src=handle) → IpcqSendCmd.data → IpcqDmaToken.data
PE_DMA outbound already prefers token.data over MemoryStore read, so
the recv-time snapshot is used for the in-flight data. This eliminates
the race: the snapshot is captured before the slot can be overwritten.

Additional fixes:
- PE_MATH handle_command: compute SIMD latency from output tensor
  element count via _compute_ns(), using max(overhead_ns, compute_ns).
  Previously used overhead_ns=0.0 for all standalone MathCmd, making
  math ops take 0ns in SimPy.
- DataExecutor secondary sort: same-t_start ops sorted by op_kind
  (memory < gemm < math) so IPCQ slot writes execute before math reads.
- ipcq_copy recorded at INBOUND time (receiver PE_DMA arrival) instead
  of outbound. Inbound time is after fabric propagation, so it sorts
  correctly relative to the receiver's math.
- record_copy accepts explicit snapshot parameter (from token.data).

Result: N_ELEM=32 + 256-rank + n_slots=4 + cross-SIP now passes.
n_slots reverted to 4 (the deeper buffer was a workaround, not needed).
502 tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-12 23:02:19 -07:00
parent 74f5f5cf08
commit 1c8ddc2d03
7 changed files with 93 additions and 36 deletions
+8
View File
@@ -93,6 +93,14 @@ class IpcqSendCmd:
shape: tuple[int, ...] # data shape (op_log + MemoryStore use) shape: tuple[int, ...] # data shape (op_log + MemoryStore use)
dtype: str dtype: str
handle_id: str # completion tracking handle_id: str # completion tracking
# In-flight data snapshot captured at tl.send() time from the
# TensorHandle.data field. Carries the actual numpy array that was
# visible at recv-time (when handle.data was populated), avoiding a
# Phase 1 race where a later IPCQ inbound overwrites the sender's
# slot between recv and send. If None, PE_DMA outbound falls back to
# reading MemoryStore[src_addr] (correct for sources that are never
# overwritten, such as HBM tiles).
data: Any = None
data_op: bool = True # ADR-0020 op_log recording flag data_op: bool = True # ADR-0020 op_log recording flag
+29 -17
View File
@@ -154,23 +154,13 @@ class PeDmaComponent(PeEngineBase):
except Exception: except Exception:
token.data = None token.data = None
# Record the IPCQ copy in op_log at OUTBOUND time. ADR-0020 D6: # Note: ipcq_copy is recorded at INBOUND time (in _handle_ipcq_inbound),
# Phase 2 replays the copy in t_start order; using outbound time # not here. Outbound time is too early — it precedes fabric propagation,
# (rather than inbound) ensures the copy executes before any later # so in Phase 2 a later round's copy can sort before the receiver's
# local op at the sender that might overwrite token.src_addr (e.g. # math for an earlier round, causing slot data corruption.
# a tl.store after a recv). # The secondary sort in DataExecutor (memory ops before math at the
if self._op_logger is not None: # same t_start) ensures the inbound copy runs before the local math
try: # that reads the slot.
self._op_logger.record_copy(
t_start=float(env.now), t_end=float(env.now),
component_id=self.node.id,
src_space=token.src_space, src_addr=token.src_addr,
dst_space=peer.buffer_kind,
dst_addr=token.dst_addr,
shape=token.shape, dtype=token.dtype, nbytes=token.nbytes,
)
except Exception:
pass
try: try:
path = self.ctx.router.find_path(self._pe_prefix, peer_pe_dma) path = self.ctx.router.find_path(self._pe_prefix, peer_pe_dma)
@@ -223,6 +213,28 @@ class PeDmaComponent(PeEngineBase):
except Exception: except Exception:
pass pass
# Record the IPCQ copy at INBOUND time with embedded data snapshot.
# The snapshot (token.data) was captured by the sender's outbound
# PE_DMA at send time. Phase 2 writes the snapshot directly to
# dst — it does NOT re-read from MemoryStore[src_addr], which may
# have been mutated by a different PE's Phase 2 ops by that point.
# DataExecutor's secondary sort (memory before math at same
# t_start) ensures the write completes before the local math
# that reads the slot.
if self._op_logger is not None:
try:
self._op_logger.record_copy(
t_start=float(env.now), t_end=float(env.now),
component_id=self.node.id,
src_space=token.src_space, src_addr=token.src_addr,
dst_space=token.dst_endpoint.buffer_kind,
dst_addr=token.dst_addr,
shape=token.shape, dtype=token.dtype, nbytes=token.nbytes,
snapshot=token.data,
)
except Exception:
pass
# 2. Forward IpcqMetaArrival to local PE_IPCQ # 2. Forward IpcqMetaArrival to local PE_IPCQ
ipcq_id = f"{self._pe_prefix}.pe_ipcq" ipcq_id = f"{self._pe_prefix}.pe_ipcq"
if ipcq_id in self.out_ports: if ipcq_id in self.out_ports:
@@ -221,6 +221,10 @@ class PeIpcqComponent(ComponentBase):
handle_id=cmd.handle_id, handle_id=cmd.handle_id,
shape=cmd.shape, shape=cmd.shape,
dtype=cmd.dtype, dtype=cmd.dtype,
# Carry the handle's recv-time data snapshot so the outbound
# PE_DMA doesn't need to re-read from MemoryStore (which may
# have been overwritten by a later inbound in the meantime).
data=getattr(cmd, "data", None),
sender_seq=qp["my_head"], sender_seq=qp["my_head"],
src_sip=self._self_sip, src_sip=self._self_sip,
src_cube=self._self_cube, src_cube=self._self_cube,
+21 -3
View File
@@ -92,13 +92,31 @@ class PeMathComponent(PeEngineBase):
token.pipeline_ctx.complete_tile() token.pipeline_ctx.complete_tile()
def handle_command(self, env: simpy.Environment, pe_txn: PeInternalTxn) -> Generator: def handle_command(self, env: simpy.Environment, pe_txn: PeInternalTxn) -> Generator:
"""Legacy PeInternalTxn handling.""" """PeInternalTxn handling for standalone MathCmd (CCL kernels).
Latency = max(overhead_ns, _compute_ns(num_elements)):
- overhead_ns: fixed per-invocation setup cost (from node attrs).
- _compute_ns: SIMD cycle-based model (from vector_width + clock_freq).
The larger of the two dominates (setup-bound vs compute-bound).
"""
from kernbench.common.pe_commands import MathCmd
import math as _math
cmd = pe_txn.command
num_elements = 0
if isinstance(cmd, MathCmd) and cmd.out.shape:
num_elements = _math.prod(cmd.out.shape)
overhead_ns = float(self.node.attrs.get("overhead_ns", 0.0))
compute_ns = self._compute_ns(num_elements)
ns = max(overhead_ns, compute_ns)
if self._accel: if self._accel:
with self._accel.request() as req: with self._accel.request() as req:
yield req yield req
yield from self.run(env, 0) yield env.timeout(ns)
else: else:
yield from self.run(env, 0) yield env.timeout(ns)
pe_txn.done.succeed() pe_txn.done.succeed()
def _forward_txn(self, env: simpy.Environment, txn: Any) -> Generator: def _forward_txn(self, env: simpy.Environment, txn: Any) -> Generator:
+17 -5
View File
@@ -26,15 +26,27 @@ class DataExecutor:
self._op_log = op_log self._op_log = op_log
self.store = store self.store = store
# Ordering priority within the same t_start: memory copies must run
# before math/gemm so that slot data is populated before a consumer
# PE's math op reads it. With 0-ns PE_MATH overhead and tight SimPy
# scheduling, ipcq_copy and math ops from different PEs can collide
# at the exact same t_start.
_KIND_ORDER = {"memory": 0, "gemm": 1, "math": 2, "unknown": 3}
def run(self) -> None: def run(self) -> None:
"""Execute all ops in op_log order. """Execute all ops in op_log order.
Ops are processed sequentially in t_start order. The previous Primary sort: t_start (ascending).
ThreadPoolExecutor-based parallel execution was removed because Secondary sort: op_kind priority — memory (ipcq_copy/dma_write)
same-t_start groups are almost always size 1 (each PE processes before gemm before math. This ensures IPCQ slot data arrives
one command at a time), so the thread-pool overhead dominated. before a consumer PE's math op tries to read it, even when both
share the same SimPy timestamp.
""" """
for op in self._op_log: ops = sorted(
self._op_log,
key=lambda r: (r.t_start, self._KIND_ORDER.get(r.op_kind, 3)),
)
for op in ops:
if op.op_kind != "memory" or op.op_name != "dma_read": if op.op_kind != "memory" or op.op_name != "dma_read":
self._execute_op(op) self._execute_op(op)
+7 -11
View File
@@ -103,21 +103,17 @@ class OpLogger:
src_space: str, src_addr: int, src_space: str, src_addr: int,
dst_space: str, dst_addr: int, dst_space: str, dst_addr: int,
shape: tuple[int, ...], dtype: str, nbytes: int, shape: tuple[int, ...], dtype: str, nbytes: int,
snapshot: Any = None,
) -> None: ) -> None:
"""Record a memory copy op for Phase 2 replay (ADR-0023 + ADR-0020). """Record a memory copy op for Phase 2 replay (ADR-0023 + ADR-0020).
Used by PE_DMA at outbound (sender) time: the snapshot captures ``snapshot``: if provided (e.g. token.data from in-flight DMA),
the source data at the moment the send was issued, so Phase 2 used directly. Otherwise falls back to a fresh read from
replay does not see later mutations of the source addr (e.g. a MemoryStore[src_addr]. The snapshot is what Phase 2 writes into
tl.store that runs after the recv at the sender). dst_addr, avoiding stale-source races from cross-PE mutations.
For sources whose data is not yet materialized in Phase 1 (math
scratch outputs), the snapshot is None and Phase 2 falls back to
reading from MemoryStore — by which point the corresponding math
op has been replayed and the scratch addr is populated.
""" """
snap = None snap = snapshot
if self._memory_store is not None: if snap is None and self._memory_store is not None:
try: try:
arr = self._memory_store.read( arr = self._memory_store.read(
src_space, src_addr, shape=shape, dtype=dtype, src_space, src_addr, shape=shape, dtype=dtype,
+7
View File
@@ -421,12 +421,19 @@ class TLContext:
space = getattr(src, "space", space) space = getattr(src, "space", space)
if src_addr is None or nbytes is None or shape is None: if src_addr is None or nbytes is None or shape is None:
raise ValueError("tl.send: provide either a TensorHandle or src_addr/nbytes/shape") raise ValueError("tl.send: provide either a TensorHandle or src_addr/nbytes/shape")
# Carry the handle's .data snapshot (if available). When the source
# is a recv slot, .data holds the numpy array that was read from
# MemoryStore at recv-time. This prevents a Phase 1 race where a
# later IPCQ inbound overwrites the slot before the outbound
# PE_DMA reads it.
handle_data = getattr(src, "data", None) if src is not None else None
self._emit_dispatch_overhead() self._emit_dispatch_overhead()
cmd = IpcqSendCmd( cmd = IpcqSendCmd(
direction=dir, direction=dir,
src_addr=src_addr, src_space=space, src_addr=src_addr, src_space=space,
nbytes=nbytes, shape=shape, dtype=dtype, nbytes=nbytes, shape=shape, dtype=dtype,
handle_id=self._next_handle_id(), handle_id=self._next_handle_id(),
data=handle_data,
) )
self._emit(cmd) self._emit(cmd)