3 Commits

Author SHA1 Message Date
mukesh 19dfc86dc3 Allreduce latency sweep across topologies and data sizes
Adds test_allreduce_latency_sweep that runs the existing intercube
allreduce kernel under three SIP topologies (ring_1d, torus_2d,
mesh_2d_no_wrap, all at n_sips=4) across 11 data sizes from 256 B/SIP
up to 1 MB/SIP. For each point, captures max(pe_exec_ns) — the
critical-path kernel time — and emits CSV plus log-x and linear-x
plots, both per-topology and combined overview, with KB/MB-formatted
tick labels. Reuses run_allreduce + _write_temp_configs and adds a
slot_size auto-bump when n_elem*2 exceeds the default IPCQ slot.

Sweep skips n_elem=16 because the runtime's dim_map scalar-arg
remapping (context.py:761) collides any int-valued kernel scalar that
matches a global tensor dim with its local shard size.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 10:16:29 -07:00
mukesh 14d800b0ae Kernel-launch sync (ADR-0009 D5) and IPCQ drain at inbound (ADR-0023)
- KernelLaunchMsg gains target_start_ns: IO_CPU stamps a global barrier
  (max path latency across every target PE), M_CPU passes it through,
  PE_CPU yields until it before recording pe_exec_start. Every PE in a
  launch begins kernel execution at the same env.now regardless of its
  dispatch path length — eliminates per-PE dispatch-offset artifact in
  cross-PE and cross-cube latency measurements.

- PE_DMA._handle_ipcq_inbound now pays Transaction.drain_ns at the top,
  matching the terminal-drain behavior of ComponentBase._forward_txn for
  every non-IPCQ Transaction. SRC-side tl.send stays fire-and-forget
  (sender doesn't yield on sub_done); tl.recv now blocks until bytes
  have actually drained into its inbox.

- ComponentContext: new compute_path_latency_ns helper + node_overhead_ns
  field populated by GraphEngine.

- tests/test_kernel_launch_sync.py: asserts all PEs in one launch
  produce identical pe_exec_ns for a no-op kernel (zero spread).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 15:30:29 -07:00
mukesh 6918e6e906 PE-to-PE latency test + supporting fixes
Adds tests/test_pe_to_pe_latency.py: a sweep that measures PE-to-PE
transfer latency for five hop types (intra-cube horizontal/vertical,
inter-cube horizontal/vertical, inter-SIP) across data sizes 128 B to
10 KB, on both the IPCQ (tl.send/tl.recv) and raw-DMA (tl.load+tl.store)
paths. Emits per-hop PNG plots, an overview PNG, and a CSV summary into
tests/pe2pe_latency_plots/. Latency is reported as max(pe_exec_ns) across
participating PEs, read from engine.get_completion(), so the measurement
captures the SRC/DST PE's kernel body time rather than the full launch+
response-aggregation envelope.

Two simulator fixes were needed to make this measurement meaningful:

- PeMMU now stores a list of (start, end, pa) sub-regions per page
  rather than a single PA. DPPolicy layouts with shards smaller than
  page_size (e.g. 128 B payloads with 4 KB pages) used to silently
  overwrite each other through last-write-wins, causing DMAs intended
  for cube0 to physically route to cube3 - inflating latency by ~170 ns
  per DMA at small sizes. STOPGAP: real MMUs don't support sub-page
  regions; long-term fix is either smaller MMU page size or DPPolicy
  validation that refuses sub-page shards.

- M_CPU's per-PE metrics aggregation (pe_exec_ns, dma_ns, compute_ns)
  now max-merges against the existing value in result_data rather than
  overwriting. Multi-cube workloads share one result_data dict via
  IO_CPU fanout; the previous overwrite caused whichever cube's M_CPU
  finished last to clobber others' values, so multi-cube pe_exec_ns was
  racy and frequently 0. Same fix applied in legacy/builtin/m_cpu.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 21:04:31 -07:00
17 changed files with 1055 additions and 39 deletions
@@ -67,6 +67,51 @@ Completion semantics:
--- ---
### D5. Launch timing is endpoint-synchronized
All PEs targeted by a single kernel launch MUST begin executing the kernel
body at the same simulated time, regardless of their dispatch path length
from the launch entry point.
Rationale. The dispatch tree Host → IO_CPU → M_CPU → PE_CPU has variable
latency at every level. PEs near their M_CPU receive the launch earlier
than PEs farther away; cubes near an IO_CPU receive it earlier than cubes
farther away. Without synchronization, each PE's kernel begins at a
different `env.now`, making per-PE metrics such as `pe_exec_ns` a function
of dispatch-path geometry rather than of the kernel's behavior —
producing measurement artifacts in benchmarks that time kernel-internal
waits (for example `tl.recv` on cross-cube or cross-SIP hops).
Mechanism.
- `KernelLaunchMsg` carries an optional `target_start_ns: float | None`.
- **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it
computes `target_start_ns = env.now + max_latency` where `max_latency`
is the maximum `ComponentContext.compute_path_latency_ns(path)` across
every target (sip, cube, pe) tuple — `path = find_node_path(io_cpu,
pe_cpu_id)`. The stamped value is placed on the request carried by
every fanned-out sub-Transaction.
- **M_CPU** passes an already-stamped `target_start_ns` through
unchanged. Only when the value is absent (e.g. a direct
launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier
`env.now + max(local command-path latency)`.
- **PE_CPU** yields `env.timeout(target_start_ns - env.now)` at the top
of `_execute_kernel`, before recording `pe_exec_start` and invoking
the kernel body.
- When `target_start_ns is None`, PE_CPU falls through to the legacy
unsynchronized behavior — preserving backward compatibility.
IO_CPU-level stamping guarantees every PE across every targeted cube
uses the same barrier sim-time, eliminating both the within-cube
dispatch-offset artifact *and* the cross-cube offset artifact in
multi-cube launches. Models a real-hardware timed-broadcast launch
(latency-equalized dispatch tree).
The synchronization is internal to the engine / IO_CPU / M_CPU / PE_CPU
control plane — runtime API and application kernels are unchanged.
---
## Links ## Links
- SPEC R1, R2, R7, R8 - SPEC R1, R2, R7, R8
+40 -3
View File
@@ -420,11 +420,21 @@ fan-out (see `IpcqInitMsg` in D12).
#### PE_DMA's added responsibility #### PE_DMA's added responsibility
When `vc_comm` receives a token, PE_DMA processes it as the following When `vc_comm` receives a token, PE_DMA processes it as the following
**atomic** sequence. **No SimPy yield is allowed between the two steps** sequence: pay the Transaction's terminal BW drain, then atomically
(invariant I6): write data and forward metadata. **No SimPy yield is allowed between
the data write and the metadata forward** (invariant I6). The drain
yield must sit before the atomic block, not inside it:
```python ```python
def _on_vc_comm_recv(self, env, token): def _on_vc_comm_recv(self, env, txn):
# Pay the terminal BW drain (nbytes / bottleneck_bw stamped by the
# sender PE_DMA). MUST happen before the atomic block so recv only
# wakes after the bytes have "landed".
drain = getattr(txn, "drain_ns", 0.0)
if drain > 0:
yield env.timeout(drain)
token = txn.request
# ── ATOMIC: no yield between these two operations ── # ── ATOMIC: no yield between these two operations ──
data = self._memory_store.read(token.src_space, token.src_addr, data = self._memory_store.read(token.src_space, token.src_addr,
shape=..., dtype=...) shape=..., dtype=...)
@@ -439,6 +449,33 @@ The final `put` is yieldable but uses an unbounded internal store, so
it completes in a single step. That `put` is the closing call of the it completes in a single step. That `put` is the closing call of the
atomic block; nothing may be inserted before it. atomic block; nothing may be inserted before it.
#### Drain-at-inbound semantics (D9 timing model)
The Transaction carries `drain_ns = nbytes / bottleneck_bw_on_path`
stamped at send-side PE_DMA. In this simulator per-hop `overhead_ns`
is paid at each forwarding component via `run()`, and the remaining
BW drain is paid once at the Transaction's terminal. Every non-IPCQ
Transaction (raw DMA, kernel-launch fanout, etc.) pays this drain via
`ComponentBase._forward_txn` at the terminal node. For IPCQ the
destination PE_DMA intercepts the Transaction with `_handle_ipcq_inbound`
(so IPCQ-specific data write + metadata forward can happen), so **the
drain MUST be paid explicitly at the top of that handler** to keep
IPCQ's timing model on par with every other fabric Transaction.
Side-effects of paying drain here:
- **SRC `tl.send`** is unchanged — fire-and-forget semantics are
preserved because the sender PE_DMA does not `yield sub_done`. The
`sub_done.succeed()` call (made after metadata forward below) is an
event with no listener on the sender side.
- **DST `tl.recv`** unblocks `drain_ns` later. Since recv wakes only
when `IpcqMetaArrival` reaches its local PE_IPCQ, and the metadata
forward now happens after the drain, recv observes the full fabric
transfer time including bandwidth cost.
Matches the physical picture: send dispatches and leaves; recv waits
until the bytes have actually been drained into its inbox.
### D9.5. ADR-0020 (2-pass) integration ### D9.5. ADR-0020 (2-pass) integration
`tl.send` / `tl.recv` integrates with ADR-0020's two-pass model. Phase `tl.send` / `tl.recv` integrates with ADR-0020's two-pass model. Phase
+40 -3
View File
@@ -426,11 +426,22 @@ backend init에서 IpcqInitMsg fan-out 시 양방향 fast path channel을 함께
#### PE_DMA의 책임 추가 #### PE_DMA의 책임 추가
PE_DMA(vc_comm)는 token 수신 시 다음 atomic 시퀀스로 처리한다. PE_DMA(vc_comm)는 token 수신 시 다음 시퀀스로 처리한다: Transaction
**두 동작 사이에 SimPy yield를 두어서는 안 된다** (I6 MUST 규칙 참조): terminal의 BW drain을 먼저 지불하고, 이어서 atomic하게 data write +
metadata forward 수행. **data write와 metadata forward 사이에는 SimPy
yield를 두어서는 안 된다** (I6 MUST 규칙 참조). drain yield는 atomic
구간 안이 아니라 그 앞에 위치해야 한다:
```python ```python
def _on_vc_comm_recv(self, env, token): def _on_vc_comm_recv(self, env, txn):
# Sender PE_DMA가 찍어 둔 drain_ns (= nbytes / bottleneck_bw) 를
# 여기서 지불. atomic 구간보다 앞이어야 한다 — recv는 bytes가
# "도착"한 이후에만 깨어나야 하므로.
drain = getattr(txn, "drain_ns", 0.0)
if drain > 0:
yield env.timeout(drain)
token = txn.request
# ── ATOMIC: 두 동작 사이에 yield 금지 ── # ── ATOMIC: 두 동작 사이에 yield 금지 ──
# 1. data를 dst_addr에 write (dst의 메모리 공간은 token.dst_endpoint.buffer_kind) # 1. data를 dst_addr에 write (dst의 메모리 공간은 token.dst_endpoint.buffer_kind)
data = self._memory_store.read(token.src_space, token.src_addr, data = self._memory_store.read(token.src_space, token.src_addr,
@@ -446,6 +457,32 @@ wire로 capacity가 unbounded인 store를 사용하므로 즉시 완료된다 (
single-step). 이 최종 put이 atomic 구간의 끝이며, 그 이전에 다른 yield가 single-step). 이 최종 put이 atomic 구간의 끝이며, 그 이전에 다른 yield가
삽입되면 안 된다. 삽입되면 안 된다.
#### Drain-at-inbound semantics (D9 timing model)
Transaction은 sender PE_DMA가 `drain_ns = nbytes / bottleneck_bw_on_path`
를 찍어 둔 상태로 fabric에 들어간다. 이 simulator에서 per-hop `overhead_ns`
는 각 forwarding component의 `run()` 에서 지불되고, 남은 BW drain은
Transaction의 terminal node에서 한 번 지불된다. IPCQ가 아닌 모든
Transaction (raw DMA, kernel-launch fanout 등) 은
`ComponentBase._forward_txn` 이 terminal에서 이 drain을 지불한다. IPCQ의
경우 목적지 PE_DMA가 `_handle_ipcq_inbound` 핸들러로 Transaction을
가로채서 (IPCQ 전용 data write + metadata forward를 해야 하므로)
**이 핸들러 최상단에서 drain을 명시적으로 지불해야 한다** — 그래야 IPCQ의
timing model이 다른 모든 fabric Transaction과 동일선상에 놓인다.
여기서 drain을 지불할 때의 side-effect:
- **SRC `tl.send`**: 동작 불변. sender PE_DMA가 `sub_done``yield`
하지 않으므로 fire-and-forget 의미가 보존된다. metadata forward 이후
호출되는 `sub_done.succeed()` 는 sender 입장에서 listener가 없는 이벤트.
- **DST `tl.recv`**: `drain_ns` 만큼 늦게 깨어난다. recv는 local PE_IPCQ
`IpcqMetaArrival` 수신 시에만 wake되며, metadata forward가 drain
이후로 이동했으므로 recv는 bandwidth까지 포함한 전체 fabric transfer
시간을 관측하게 된다.
물리적 그림과 일치: send는 dispatch하고 바로 반환; recv는 bytes가 실제로
자신의 inbox로 drain될 때까지 대기.
#### Backpressure latency 정확도 #### Backpressure latency 정확도
backpressure 해제까지 걸리는 시간: backpressure 해제까지 걸리는 시간:
+55 -1
View File
@@ -58,7 +58,18 @@ class IoCpuComponent(ComponentBase):
self._pending[key] = (expected, received, parent_done) self._pending[key] = (expected, received, parent_done)
def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator: def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator:
"""Fan out sub-Transactions to target cube M_CPUs, wait for responses.""" """Fan out sub-Transactions to target cube M_CPUs, wait for responses.
ADR-0009 D5 (extended): for KernelLaunchMsg, stamp a single global
target_start_ns = env.now + max(IO_CPU → any target PE_CPU path
latency across all target cubes). M_CPU passes this value through
unchanged; every PE in every cube yields until the same sim-time
before beginning kernel execution. Without this, cross-cube
launches would have each cube's M_CPU compute its own per-cube
barrier relative to its local env.now, leaving PEs on different
cubes out of sync (the "h3/h4 dispatch-offset artifact").
"""
import dataclasses
from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg
request = txn.request request = txn.request
@@ -72,6 +83,36 @@ class IoCpuComponent(ComponentBase):
txn.done.succeed() txn.done.succeed()
return return
# For KernelLaunchMsg, compute the global barrier once here so
# every downstream PE_CPU uses the same target_start_ns.
if isinstance(request, KernelLaunchMsg):
global_max_latency = 0.0
pe_ids = self._resolve_pe_ids(
getattr(request, "target_pe", "all")
)
for sip, cube in cube_targets:
for pe_id in pe_ids:
pe_cpu_id = (
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
)
try:
path = self.ctx.router.find_node_path(
self.node.id, pe_cpu_id,
)
except Exception:
continue
if len(path) < 2:
continue
latency = self.ctx.compute_path_latency_ns(
path, nbytes=0,
)
if latency > global_max_latency:
global_max_latency = latency
request = dataclasses.replace(
request,
target_start_ns=float(env.now) + global_max_latency,
)
# Setup aggregation # Setup aggregation
self._pending[request.request_id] = (len(cube_targets), 0, txn.done) self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
@@ -91,6 +132,19 @@ class IoCpuComponent(ComponentBase):
) )
yield self.out_ports[path[1]].put(sub_txn.advance()) yield self.out_ports[path[1]].put(sub_txn.advance())
def _resolve_pe_ids(self, target_pe: Any) -> list[int]:
"""Resolve target_pe → list of PE indices (mirrors M_CPU logic)."""
if isinstance(target_pe, int):
return [target_pe]
if isinstance(target_pe, tuple):
return list(target_pe)
# "all": all PEs in a cube
n_slices = 8
if self.ctx and self.ctx.spec:
mm = self.ctx.spec.get("cube", {}).get("memory_map", {})
n_slices = mm.get("hbm_slices_per_cube", 8)
return list(range(n_slices))
def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]: def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]:
"""Return list of (sip, cube) pairs to fan out to.""" """Return list of (sip, cube) pairs to fan out to."""
from kernbench.runtime_api.kernel import ( from kernbench.runtime_api.kernel import (
+37 -8
View File
@@ -162,7 +162,11 @@ class MCpuComponent(ComponentBase):
Routes through find_node_path (M_CPU → NOC → PE_CPU command edges). Routes through find_node_path (M_CPU → NOC → PE_CPU command edges).
PE_CPU sends ResponseMsg back via NOC → M_CPU on completion. PE_CPU sends ResponseMsg back via NOC → M_CPU on completion.
Then sends aggregate ResponseMsg back to IO_CPU on the reverse path. Then sends aggregate ResponseMsg back to IO_CPU on the reverse path.
ADR-0009 D5: stamps target_start_ns so every PE in this fanout
starts executing at the same env.now regardless of dispatch path.
""" """
import dataclasses
request = txn.request request = txn.request
target_pe = getattr(request, "target_pe", "all") target_pe = getattr(request, "target_pe", "all")
cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0" cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0"
@@ -172,9 +176,13 @@ class MCpuComponent(ComponentBase):
txn.done.succeed() txn.done.succeed()
return return
# Fan out to each PE_CPU, using response-based aggregation # Resolve per-PE paths. If IO_CPU already stamped a global
sub_txns: list[Transaction] = [] # target_start_ns (ADR-0009 D5 extended), pass it through
n_dispatched = 0 # unchanged so every PE across every cube uses the same barrier.
# Otherwise (e.g. direct-to-M_CPU launch in a unit test) compute
# a per-cube barrier from env.now.
per_pe: list[tuple[int, list[str], float]] = []
max_latency = 0.0
for pe_id in pe_ids: for pe_id in pe_ids:
pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu" pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu"
try: try:
@@ -183,8 +191,24 @@ class MCpuComponent(ComponentBase):
continue continue
if len(path) < 2: if len(path) < 2:
continue continue
latency = self.ctx.compute_path_latency_ns(path, nbytes=0)
per_pe.append((pe_id, path, latency))
if latency > max_latency:
max_latency = latency
if getattr(request, "target_start_ns", None) is not None:
stamped_request = request
else:
stamped_request = dataclasses.replace(
request, target_start_ns=float(env.now) + max_latency,
)
# Fan out to each PE_CPU, using response-based aggregation
sub_txns: list[Transaction] = []
n_dispatched = 0
for pe_id, path, _lat in per_pe:
sub_txn = Transaction( sub_txn = Transaction(
request=request, path=path, step=0, request=stamped_request, path=path, step=0,
nbytes=0, done=env.event(), nbytes=0, done=env.event(),
) )
yield self.out_ports[path[1]].put(sub_txn.advance()) yield self.out_ports[path[1]].put(sub_txn.advance())
@@ -204,16 +228,21 @@ class MCpuComponent(ComponentBase):
yield all_done yield all_done
del self._parent_txns[request.request_id] del self._parent_txns[request.request_id]
# Aggregate PE-internal metrics (max across PEs) # Aggregate PE-internal metrics (max across PEs and across cubes).
# Multiple M_CPUs share the same result_data dict via IO_CPU fanout;
# merge against the existing value so cubes don't clobber each other.
pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns] pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns]
if pe_exec_values: if pe_exec_values:
txn.result_data["pe_exec_ns"] = max(pe_exec_values) cur = txn.result_data.get("pe_exec_ns", 0.0) or 0.0
txn.result_data["pe_exec_ns"] = max(cur, max(pe_exec_values))
dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns] dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns]
if dma_values: if dma_values:
txn.result_data["dma_ns"] = max(dma_values) cur = txn.result_data.get("dma_ns", 0.0) or 0.0
txn.result_data["dma_ns"] = max(cur, max(dma_values))
compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns] compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns]
if compute_values: if compute_values:
txn.result_data["compute_ns"] = max(compute_values) cur = txn.result_data.get("compute_ns", 0.0) or 0.0
txn.result_data["compute_ns"] = max(cur, max(compute_values))
# Send aggregate response on reverse command path back to IO_CPU # Send aggregate response on reverse command path back to IO_CPU
reverse_path = list(reversed(txn.path)) reverse_path = list(reversed(txn.path))
@@ -95,6 +95,13 @@ class PeCpuComponent(ComponentBase):
request = txn.request request = txn.request
yield from self.run(env, 0) yield from self.run(env, 0)
# ADR-0009 D5: synchronized launch barrier. If M_CPU stamped a
# target_start_ns, wait until then so every PE in this launch
# begins pe_exec measurement at the same simulated time.
target_start = getattr(request, "target_start_ns", None)
if target_start is not None and target_start > env.now:
yield env.timeout(float(target_start) - env.now)
kernel_fn = get_kernel(request.kernel_ref.name) kernel_fn = get_kernel(request.kernel_ref.name)
num_programs = self._derive_num_programs(request) num_programs = self._derive_num_programs(request)
kernel_args = self._unpack_kernel_args(request) kernel_args = self._unpack_kernel_args(request)
+25 -1
View File
@@ -186,13 +186,37 @@ class PeDmaComponent(PeEngineBase):
# ── IPCQ inbound (fabric → PE_DMA → MemoryStore + PE_IPCQ) ────── # ── IPCQ inbound (fabric → PE_DMA → MemoryStore + PE_IPCQ) ──────
def _handle_ipcq_inbound(self, env: simpy.Environment, txn: Any) -> Generator: def _handle_ipcq_inbound(self, env: simpy.Environment, txn: Any) -> Generator:
"""At destination PE_DMA: atomically write data and forward metadata. """At destination PE_DMA: pay terminal drain, then atomically write
data and forward metadata.
ADR-0023 D9 (drain at inbound terminal): the Transaction carries
``drain_ns = nbytes / bottleneck_bw_on_path`` stamped by the sender
PE_DMA. Like every other Transaction terminal in the simulator (see
``ComponentBase._forward_txn``), this drain must be paid when the
Transaction reaches its destination. SRC-side ``tl.send`` is
fire-and-forget — it never yields on ``sub_done`` — so paying the
drain here does NOT delay the sender. What it DOES delay is the
IpcqMetaArrival forwarded below: that delay is the only signal
``tl.recv`` on DST blocks on, which is exactly the desired
semantics — "send dispatches and returns; recv waits until the
bytes have actually landed in its inbox".
The drain MUST be paid before the atomic block — inserting a yield
inside would break invariant I6.
I6 (MUST): no SimPy yield between MemoryStore.write and the I6 (MUST): no SimPy yield between MemoryStore.write and the
IpcqMetaArrival put into PE_IPCQ. IpcqMetaArrival put into PE_IPCQ.
""" """
from kernbench.common.ipcq_types import IpcqMetaArrival from kernbench.common.ipcq_types import IpcqMetaArrival
# Pay terminal BW drain before the atomic write/metadata forward.
# Without this, IPCQ effectively got fabric bandwidth for free at
# the terminal (only intermediate-hop overhead_ns was charged),
# making IPCQ lower than raw DMA at large sizes in benchmarks.
drain = getattr(txn, "drain_ns", 0.0)
if drain > 0:
yield env.timeout(drain)
token = txn.request token = txn.request
# ── ATOMIC: do not introduce yield between these two operations ── # ── ATOMIC: do not introduce yield between these two operations ──
+19
View File
@@ -26,6 +26,9 @@ class ComponentContext:
spec: dict = field(default_factory=dict) # topology spec (cube layout, PE count, etc.) spec: dict = field(default_factory=dict) # topology spec (cube layout, PE count, etc.)
memory_store: Any = None # MemoryStore for Phase 1 data-aware execution (ADR-0020) memory_store: Any = None # MemoryStore for Phase 1 data-aware execution (ADR-0020)
op_logger: Any = None # OpLogger for Phase 1 op recording (ADR-0020) op_logger: Any = None # OpLogger for Phase 1 op recording (ADR-0020)
# node_id -> overhead_ns (ADR-0009 D5: used by M_CPU to compute per-PE
# dispatch latency when stamping target_start_ns on KernelLaunchMsg).
node_overhead_ns: dict[str, float] = field(default_factory=dict)
def get_shared_resource( def get_shared_resource(
self, env: simpy.Environment, key: str, capacity: int = 1, self, env: simpy.Environment, key: str, capacity: int = 1,
@@ -52,3 +55,19 @@ class ComponentContext:
if min_bw == float("inf"): if min_bw == float("inf"):
return 0.0 return 0.0
return nbytes / min_bw return nbytes / min_bw
def compute_path_latency_ns(self, path: list[str], nbytes: int = 0) -> float:
"""Formula latency along path: wire + per-node overhead + drain.
ADR-0009 D5: M_CPU uses this to compute per-PE dispatch latency
when stamping target_start_ns on KernelLaunchMsg fanout.
"""
total = 0.0
for i in range(len(path) - 1):
edge = self.edge_map.get((path[i], path[i + 1]))
if edge:
total += edge.distance_mm * self.ns_per_mm
for node_id in path:
total += self.node_overhead_ns.get(node_id, 0.0)
total += self.compute_drain_ns(path, nbytes)
return total
@@ -58,7 +58,13 @@ class IoCpuComponent(ComponentBase):
self._pending[key] = (expected, received, parent_done) self._pending[key] = (expected, received, parent_done)
def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator: def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator:
"""Fan out sub-Transactions to target cube M_CPUs, wait for responses.""" """Fan out sub-Transactions to target cube M_CPUs, wait for responses.
ADR-0009 D5 (extended): stamp a global target_start_ns on
KernelLaunchMsg so every PE across every target cube starts at
the same env.now. See the non-legacy builtin for full rationale.
"""
import dataclasses
from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg
request = txn.request request = txn.request
@@ -72,6 +78,34 @@ class IoCpuComponent(ComponentBase):
txn.done.succeed() txn.done.succeed()
return return
if isinstance(request, KernelLaunchMsg):
global_max_latency = 0.0
pe_ids = self._resolve_pe_ids(
getattr(request, "target_pe", "all")
)
for sip, cube in cube_targets:
for pe_id in pe_ids:
pe_cpu_id = (
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
)
try:
path = self.ctx.router.find_node_path(
self.node.id, pe_cpu_id,
)
except Exception:
continue
if len(path) < 2:
continue
latency = self.ctx.compute_path_latency_ns(
path, nbytes=0,
)
if latency > global_max_latency:
global_max_latency = latency
request = dataclasses.replace(
request,
target_start_ns=float(env.now) + global_max_latency,
)
# Setup aggregation # Setup aggregation
self._pending[request.request_id] = (len(cube_targets), 0, txn.done) self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
@@ -91,6 +125,18 @@ class IoCpuComponent(ComponentBase):
) )
yield self.out_ports[path[1]].put(sub_txn.advance()) yield self.out_ports[path[1]].put(sub_txn.advance())
def _resolve_pe_ids(self, target_pe: Any) -> list[int]:
"""Resolve target_pe → list of PE indices (mirrors M_CPU logic)."""
if isinstance(target_pe, int):
return [target_pe]
if isinstance(target_pe, tuple):
return list(target_pe)
n_slices = 8
if self.ctx and self.ctx.spec:
mm = self.ctx.spec.get("cube", {}).get("memory_map", {})
n_slices = mm.get("hbm_slices_per_cube", 8)
return list(range(n_slices))
def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]: def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]:
"""Return list of (sip, cube) pairs to fan out to.""" """Return list of (sip, cube) pairs to fan out to."""
from kernbench.runtime_api.kernel import ( from kernbench.runtime_api.kernel import (
@@ -162,7 +162,11 @@ class MCpuComponent(ComponentBase):
Routes through find_node_path (M_CPU → NOC → PE_CPU command edges). Routes through find_node_path (M_CPU → NOC → PE_CPU command edges).
PE_CPU sends ResponseMsg back via NOC → M_CPU on completion. PE_CPU sends ResponseMsg back via NOC → M_CPU on completion.
Then sends aggregate ResponseMsg back to IO_CPU on the reverse path. Then sends aggregate ResponseMsg back to IO_CPU on the reverse path.
ADR-0009 D5: stamps target_start_ns so every PE in this fanout
starts executing at the same env.now regardless of dispatch path.
""" """
import dataclasses
request = txn.request request = txn.request
target_pe = getattr(request, "target_pe", "all") target_pe = getattr(request, "target_pe", "all")
cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0" cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0"
@@ -172,9 +176,10 @@ class MCpuComponent(ComponentBase):
txn.done.succeed() txn.done.succeed()
return return
# Fan out to each PE_CPU, using response-based aggregation # Resolve per-PE paths. If IO_CPU already stamped a global
sub_txns: list[Transaction] = [] # target_start_ns (ADR-0009 D5 extended), pass it through.
n_dispatched = 0 per_pe: list[tuple[int, list[str], float]] = []
max_latency = 0.0
for pe_id in pe_ids: for pe_id in pe_ids:
pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu" pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu"
try: try:
@@ -183,8 +188,24 @@ class MCpuComponent(ComponentBase):
continue continue
if len(path) < 2: if len(path) < 2:
continue continue
latency = self.ctx.compute_path_latency_ns(path, nbytes=0)
per_pe.append((pe_id, path, latency))
if latency > max_latency:
max_latency = latency
if getattr(request, "target_start_ns", None) is not None:
stamped_request = request
else:
stamped_request = dataclasses.replace(
request, target_start_ns=float(env.now) + max_latency,
)
# Fan out to each PE_CPU, using response-based aggregation
sub_txns: list[Transaction] = []
n_dispatched = 0
for pe_id, path, _lat in per_pe:
sub_txn = Transaction( sub_txn = Transaction(
request=request, path=path, step=0, request=stamped_request, path=path, step=0,
nbytes=0, done=env.event(), nbytes=0, done=env.event(),
) )
yield self.out_ports[path[1]].put(sub_txn.advance()) yield self.out_ports[path[1]].put(sub_txn.advance())
@@ -204,16 +225,21 @@ class MCpuComponent(ComponentBase):
yield all_done yield all_done
del self._parent_txns[request.request_id] del self._parent_txns[request.request_id]
# Aggregate PE-internal metrics (max across PEs) # Aggregate PE-internal metrics (max across PEs and across cubes).
# Multiple M_CPUs share the same result_data dict via IO_CPU fanout;
# merge against the existing value so cubes don't clobber each other.
pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns] pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns]
if pe_exec_values: if pe_exec_values:
txn.result_data["pe_exec_ns"] = max(pe_exec_values) cur = txn.result_data.get("pe_exec_ns", 0.0) or 0.0
txn.result_data["pe_exec_ns"] = max(cur, max(pe_exec_values))
dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns] dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns]
if dma_values: if dma_values:
txn.result_data["dma_ns"] = max(dma_values) cur = txn.result_data.get("dma_ns", 0.0) or 0.0
txn.result_data["dma_ns"] = max(cur, max(dma_values))
compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns] compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns]
if compute_values: if compute_values:
txn.result_data["compute_ns"] = max(compute_values) cur = txn.result_data.get("compute_ns", 0.0) or 0.0
txn.result_data["compute_ns"] = max(cur, max(compute_values))
# Send aggregate response on reverse command path back to IO_CPU # Send aggregate response on reverse command path back to IO_CPU
reverse_path = list(reversed(txn.path)) reverse_path = list(reversed(txn.path))
@@ -71,6 +71,13 @@ class PeCpuComponent(ComponentBase):
request = txn.request request = txn.request
yield from self.run(env, 0) yield from self.run(env, 0)
# ADR-0009 D5: synchronized launch barrier. If M_CPU stamped a
# target_start_ns, wait until then so every PE in this launch
# begins pe_exec measurement at the same simulated time.
target_start = getattr(request, "target_start_ns", None)
if target_start is not None and target_start > env.now:
yield env.timeout(float(target_start) - env.now)
kernel_fn = get_kernel(request.kernel_ref.name) kernel_fn = get_kernel(request.kernel_ref.name)
num_programs = self._derive_num_programs(request) num_programs = self._derive_num_programs(request)
kernel_args = self._unpack_kernel_args(request) kernel_args = self._unpack_kernel_args(request)
+70 -13
View File
@@ -19,7 +19,14 @@ class PageFault(Exception):
class PeMMU: class PeMMU:
"""Per-PE MMU with page-aligned VA→PA translation table. """Per-PE MMU with sub-page-capable VA→PA translation table.
Each page-table entry is a list of (start_in_page, end_in_page,
pa_at_offset_zero) regions. This is a SIMULATOR STOPGAP — real MMUs
store one PA per page-table entry. Sub-page regions exist here so
DPPolicy layouts that shard below page granularity (e.g. 128 B
payloads with 4 KB pages) don't silently mis-route through last-
write-wins overwrites. Memory note: project_mmu_subpage_stopgap.md.
Args: Args:
page_size: Page size in bytes (default 2 MB). page_size: Page size in bytes (default 2 MB).
@@ -34,7 +41,11 @@ class PeMMU:
self._page_size = page_size self._page_size = page_size
self._page_shift = (page_size - 1).bit_length() self._page_shift = (page_size - 1).bit_length()
self._page_mask = page_size - 1 self._page_mask = page_size - 1
self._table: dict[int, int] = {} # va_page_number → pa_page_base # vpn → list of (start_in_page, end_in_page, pa_at_offset_zero).
# pa_at_offset_zero is the PA that offset 0 of the page would map
# to under this region — i.e. translate(off) = pa_at_offset_zero
# + off when start <= off < end.
self._table: dict[int, list[tuple[int, int, int]]] = {}
self._overhead_ns = overhead_ns self._overhead_ns = overhead_ns
@property @property
@@ -46,21 +57,67 @@ class PeMMU:
return len(self._table) return len(self._table)
def map(self, va: int, pa: int, size: int) -> None: def map(self, va: int, pa: int, size: int) -> None:
"""Register VA→PA mapping for a contiguous range.""" """Register VA→PA mapping for a contiguous range.
for off in range(0, size, self._page_size):
vpn = (va + off) >> self._page_shift Sub-page-aware: a single page can hold multiple disjoint regions,
self._table[vpn] = pa + off each pointing to a different PA. Later map() calls APPEND a new
region; on overlap with an existing region, the new region wins
for the overlapping offsets (translate iterates in reverse so the
last write takes precedence — matches legacy single-PA behavior
when a full page is re-mapped).
"""
end_va = va + size
cur = va
while cur < end_va:
vpn = cur >> self._page_shift
page_base_va = vpn << self._page_shift
page_end_va = page_base_va + self._page_size
region_start = cur - page_base_va
region_end = min(end_va, page_end_va) - page_base_va
# PA seen at offset 0 of page if this region's mapping covered it
pa_at_offset_zero = pa + (cur - va) - region_start
self._table.setdefault(vpn, []).append(
(region_start, region_end, pa_at_offset_zero)
)
cur = page_base_va + region_end
def unmap(self, va: int, size: int) -> None: def unmap(self, va: int, size: int) -> None:
"""Remove VA mapping for a contiguous range.""" """Remove VA mapping for a contiguous range.
for off in range(0, size, self._page_size):
vpn = (va + off) >> self._page_shift Drops any region whose extent is contained within the unmapped
self._table.pop(vpn, None) range. Partial overlaps (region straddles the range boundary)
are left in place — caller is expected to unmap on the same
boundaries it mapped on.
"""
end_va = va + size
cur = va
while cur < end_va:
vpn = cur >> self._page_shift
page_base_va = vpn << self._page_shift
page_end_va = page_base_va + self._page_size
unmap_start = cur - page_base_va
unmap_end = min(end_va, page_end_va) - page_base_va
regions = self._table.get(vpn)
if regions is not None:
kept = [
r for r in regions
if not (r[0] >= unmap_start and r[1] <= unmap_end)
]
if kept:
self._table[vpn] = kept
else:
del self._table[vpn]
cur = page_base_va + unmap_end
def translate(self, va: int) -> int: def translate(self, va: int) -> int:
"""Translate VA to PA. Raises PageFault if unmapped.""" """Translate VA to PA. Raises PageFault if unmapped."""
vpn = va >> self._page_shift vpn = va >> self._page_shift
pa_page_base = self._table.get(vpn) regions = self._table.get(vpn)
if pa_page_base is None: if regions is None:
raise PageFault(va)
offset = va & self._page_mask
# Iterate latest-first so newer map() calls win on overlap
for start, end, pa_at_offset_zero in reversed(regions):
if start <= offset < end:
return pa_at_offset_zero + offset
raise PageFault(va) raise PageFault(va)
return pa_page_base + (va & self._page_mask)
+5
View File
@@ -90,6 +90,11 @@ class KernelLaunchMsg:
args: tuple[KernelArg, ...] args: tuple[KernelArg, ...]
target_cubes: tuple[int, ...] | Literal["all"] = "all" target_cubes: tuple[int, ...] | Literal["all"] = "all"
target_pe: int | tuple[int, ...] | Literal["all"] = "all" target_pe: int | tuple[int, ...] | Literal["all"] = "all"
# ADR-0009 D5: synchronized kernel start. When set, each PE_CPU yields
# until env.now >= target_start_ns before beginning kernel execution,
# so every PE in a launch starts at the same simulated time regardless
# of its M_CPU dispatch path length. Stamped by M_CPU fan-out.
target_start_ns: float | None = None
msg_type: Literal["kernel_launch"] = "kernel_launch" msg_type: Literal["kernel_launch"] = "kernel_launch"
+4
View File
@@ -67,6 +67,10 @@ class GraphEngine:
spec=graph.spec, spec=graph.spec,
memory_store=self._memory_store, memory_store=self._memory_store,
op_logger=self._op_logger, op_logger=self._op_logger,
node_overhead_ns={
nid: float(n.attrs.get("overhead_ns", 0.0))
for nid, n in graph.nodes.items()
},
) )
self._components: dict[str, ComponentBase] = { self._components: dict[str, ComponentBase] = {
node_id: ComponentRegistry.create(node, overrides, ctx) node_id: ComponentRegistry.create(node, overrides, ctx)
+200 -1
View File
@@ -179,7 +179,9 @@ CONFIGS = [
] ]
def _write_temp_configs(tmp_path, sip_topology, n_sips, algorithm): def _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None,
):
"""Write temp topology.yaml and ccl.yaml with the given overrides.""" """Write temp topology.yaml and ccl.yaml with the given overrides."""
with open(TOPOLOGY_PATH) as f: with open(TOPOLOGY_PATH) as f:
topo_cfg = yaml.safe_load(f) topo_cfg = yaml.safe_load(f)
@@ -193,6 +195,15 @@ def _write_temp_configs(tmp_path, sip_topology, n_sips, algorithm):
with open(ccl_path) as f: with open(ccl_path) as f:
ccl_cfg = yaml.safe_load(f) ccl_cfg = yaml.safe_load(f)
ccl_cfg["defaults"]["algorithm"] = algorithm ccl_cfg["defaults"]["algorithm"] = algorithm
if n_elem_override is not None:
ccl_cfg.setdefault("algorithms", {}).setdefault(
algorithm, {},
)["n_elem"] = int(n_elem_override)
# Ensure IPCQ slot is big enough for the per-message payload.
per_msg_bytes = int(n_elem_override) * 2 # f16
default_slot = int(ccl_cfg["defaults"].get("slot_size", 4096))
if per_msg_bytes > default_slot:
ccl_cfg["defaults"]["slot_size"] = per_msg_bytes
tmp_ccl = tmp_path / "ccl.yaml" tmp_ccl = tmp_path / "ccl.yaml"
with open(tmp_ccl, "w") as f: with open(tmp_ccl, "w") as f:
yaml.dump(ccl_cfg, f, default_flow_style=False) yaml.dump(ccl_cfg, f, default_flow_style=False)
@@ -220,3 +231,191 @@ def test_allreduce(tmp_path, algorithm, sip_topology, n_sips):
algorithm=algorithm, ccl_yaml=ccl_path, algorithm=algorithm, ccl_yaml=ccl_path,
) )
assert result["ok_cubes"] > 0 assert result["ok_cubes"] > 0
# ── Latency sweep ─────────────────────────────────────────────────────
# avoid 16 (== n_cubes, dim_map collision). Goes up to 1 MB per SIP:
# bytes_per_sip = n_cubes * n_elem * 2 = 32 * n_elem.
_SWEEP_N_ELEM = [
8, 32, 64, 128, 512, 1024, 2048,
4096, 8192, 16384, 32768,
]
_ELEM_BYTES_F16 = 2
def test_allreduce_latency_sweep(tmp_path):
"""Sweep n_elem across each SIP topology; record max(pe_exec_ns)
as the critical-path kernel latency. Emits CSV + PNG plots to
tests/allreduce_latency_plots/.
"""
import csv
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
def _fmt_bytes(x, _pos):
"""Format tick as B / KB / MB."""
if x <= 0:
return "0"
if x >= 1024 * 1024:
return f"{x / (1024 * 1024):.0f} MB"
if x >= 1024:
return f"{x / 1024:.0f} KB"
return f"{x:.0f} B"
_bytes_fmt = FuncFormatter(_fmt_bytes)
out_dir = Path(__file__).parent / "allreduce_latency_plots"
out_dir.mkdir(parents=True, exist_ok=True)
records: list[dict] = []
# Apples-to-apples: same n_sips across all three topologies.
for algorithm, sip_topology, n_sips in [
("intercube_allreduce", "ring_1d", 4),
("intercube_allreduce", "torus_2d", 4),
("intercube_allreduce", "mesh_2d_no_wrap", 4),
]:
for n_elem in _SWEEP_N_ELEM:
sub = tmp_path / f"{sip_topology}_{n_elem}"
sub.mkdir()
topo_path, ccl_path = _write_temp_configs(
sub, sip_topology, n_sips, algorithm,
n_elem_override=n_elem,
)
topo = resolve_topology(topo_path)
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"sweep_{algorithm}_{sip_topology}_{n_elem}",
spec=spec,
) as ctx:
result = run_allreduce(
ctx, engine, spec,
algorithm=algorithm, ccl_yaml=ccl_path,
)
assert result["ok_cubes"] > 0
pe_exec_vals = [
float(tr.get("pe_exec_ns", 0.0) or 0.0)
for _, (_, tr) in engine._results.items()
if isinstance(tr, dict)
]
crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0
cm = spec["sip"]["cube_mesh"]
n_cubes = int(cm["w"]) * int(cm["h"])
bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16
# pe="replicate" + num_pes=1 → one active PE per cube owns
# the whole cube row. Per-PE bytes == per-cube-tile bytes ==
# per-message bytes over the IPCQ fabric.
bytes_per_pe = n_elem * _ELEM_BYTES_F16
records.append({
"algorithm": algorithm,
"sip_topology": sip_topology,
"n_sips": n_sips,
"n_elem": n_elem,
"bytes_per_pe": bytes_per_pe,
"bytes_per_sip": bytes_per_sip,
"latency_ns": crit_ns,
})
print(
f"[{sip_topology:<16} n_sips={n_sips} n_elem={n_elem:>5} "
f"bytes/pe={bytes_per_pe:>7} bytes/sip={bytes_per_sip:>9}] "
f"pe_exec_max = {crit_ns:8.1f} ns"
)
with open(out_dir / "summary.csv", "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=[
"algorithm", "sip_topology", "n_sips", "n_elem",
"bytes_per_pe", "bytes_per_sip", "latency_ns",
])
w.writeheader()
for r in records:
w.writerow(r)
topologies = sorted({r["sip_topology"] for r in records})
# Per-topology plots: log-scale + linear-scale side-by-side.
# X-axis = bytes per PE (per-message payload size).
for topo_name in topologies:
rs = sorted(
[r for r in records if r["sip_topology"] == topo_name],
key=lambda r: r["bytes_per_pe"],
)
xs = [r["bytes_per_pe"] for r in rs]
ys = [r["latency_ns"] for r in rs]
title = (
f"Allreduce latency — {topo_name} "
f"(n_sips={rs[0]['n_sips']})"
)
# Log-scale
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(xs, ys, marker="o", color="tab:blue")
ax.set_xscale("log", base=2)
ax.set_xlabel("Bytes per PE (log scale)")
ax.set_ylabel("max pe_exec_ns (critical path)")
ax.set_title(title)
ax.grid(True, alpha=0.3)
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(out_dir / f"{topo_name}.png", dpi=120)
plt.close(fig)
# Linear-scale companion
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(xs, ys, marker="o", color="tab:blue")
ax.set_xlabel("Bytes per PE")
ax.set_ylabel("max pe_exec_ns (critical path)")
ax.set_title(title + " [linear scale]")
ax.grid(True, alpha=0.3)
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(out_dir / f"{topo_name}_linear.png", dpi=120)
plt.close(fig)
# Combined overview — two variants: log-scale (overview.png) and
# linear-scale (overview_linear.png).
colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange",
"mesh_2d_no_wrap": "tab:green"}
def _draw_overview(log_x: bool, filename: str, title_suffix: str) -> None:
fig, ax = plt.subplots(figsize=(9, 6))
for topo_name in topologies:
rs = sorted(
[r for r in records if r["sip_topology"] == topo_name],
key=lambda r: r["bytes_per_pe"],
)
ax.plot(
[r["bytes_per_pe"] for r in rs],
[r["latency_ns"] for r in rs],
marker="o",
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
color=colors.get(topo_name),
)
if log_x:
ax.set_xscale("log", base=2)
ax.set_xlabel("Bytes per PE (log scale)")
else:
ax.set_xlabel("Bytes per PE")
ax.set_ylabel("max pe_exec_ns (critical path)")
ax.set_title("Multi-device allreduce latency by topology" + title_suffix)
ax.grid(True, alpha=0.3)
ax.legend()
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(out_dir / filename, dpi=120)
plt.close(fig)
_draw_overview(log_x=True, filename="overview.png", title_suffix="")
_draw_overview(
log_x=False, filename="overview_linear.png",
title_suffix=" [linear scale]",
)
print(
f"\nWrote {out_dir / 'overview.png'} + "
f"{out_dir / 'overview_linear.png'}"
)
+62
View File
@@ -0,0 +1,62 @@
"""ADR-0009 D5: synchronized launch barrier.
M_CPU stamps KernelLaunchMsg with target_start_ns = env.now + max path
latency; PE_CPU yields until that time before recording pe_exec_start.
Every PE in a single launch MUST begin kernel execution at the same
env.now regardless of its dispatch path length.
We verify this indirectly: for a no-op kernel, pe_exec_ns = env.now -
pe_exec_start. If every PE's pe_exec_start is identical and every PE
runs the same no-op body, every pe_exec_ns value must be identical.
Without D5, pe_exec_start varies by dispatch-path length and so does
pe_exec_ns.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
from kernbench.policy.placement.dp import DPPolicy
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def test_kernel_launch_sync_all_pes_have_equal_exec_time():
"""No-op kernel: every PE's pe_exec_ns must be identical under D5."""
topo = resolve_topology(str(TOPOLOGY_PATH))
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
with RuntimeContext(engine=engine, target_device=DeviceSelector("all"),
correlation_id="sync_test", spec=spec) as ctx:
dp = DPPolicy(cube="row_wise", pe="column_wise",
num_cubes=16, num_pes=8)
def kernel(t_ptr, n_elem, tl):
pass # no-op
ctx.ahbm.set_device(0)
t = ctx.zeros((16, 8 * 64), dtype="f16", dp=dp, name="probe")
t.copy_(ctx.from_numpy(np.zeros((16, 8 * 64), dtype=np.float16)))
pending = ctx.launch("sync_probe", kernel, t, 64, _defer_wait=True)
for h, _sip, meta in pending:
ctx.wait(h, _meta=meta)
pe_exec_vals = []
for h, _sip, _meta in pending:
_, trace = engine.get_completion(h)
if trace and trace.get("pe_exec_ns") is not None:
pe_exec_vals.append(float(trace["pe_exec_ns"]))
assert pe_exec_vals, "expected completion traces with pe_exec_ns"
spread = max(pe_exec_vals) - min(pe_exec_vals)
assert spread < 1e-6, (
f"ADR-0009 D5 violated: pe_exec_ns spread across PEs = "
f"{spread:.6f} ns (expected 0). Values: {pe_exec_vals}"
)
+358
View File
@@ -0,0 +1,358 @@
"""PE-to-PE latency sweep across hop types and data sizes.
Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for five
hop types:
H1 Intra-cube horizontal pe0 → pe1
H2 Intra-cube vertical pe0 → pe4
H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0
H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0
H5 Inter-SIP sip0.cube0.pe0 → sip1.cube0.pe0 (IPCQ only —
raw needs
cross-SIP MMU)
Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pytest
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
from kernbench.policy.placement.dp import DPPolicy
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
PLOT_DIR = Path(__file__).parent / "pe2pe_latency_plots"
SIZES = [128, 256, 384, 512, 768, 1024, 2048, 4096, 8192, 10240]
N_CUBES = 16
N_PES = 8
ELEM_BYTES = 2 # f16
@dataclass(frozen=True)
class Hop:
id: str
label: str
src: tuple[int, int, int]
dst: tuple[int, int, int]
send_dir: str
recv_dir: str
supports_raw: bool # False for cross-SIP (DPPolicy intra-device only)
HOPS = [
Hop("h1_intra_horizontal", "Intra-cube horizontal (pe0 to pe1)",
(0, 0, 0), (0, 0, 1), "intra_E", "intra_W", True),
Hop("h2_intra_vertical", "Intra-cube vertical (pe0 to pe4)",
(0, 0, 0), (0, 0, 4), "intra_S", "intra_N", True),
Hop("h3_inter_cube_horizontal", "Inter-cube horizontal (cube0 to cube1)",
(0, 0, 0), (0, 1, 0), "E", "W", True),
Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)",
(0, 0, 0), (0, 4, 0), "S", "N", True),
Hop("h5_inter_sip", "Inter-SIP (sip0 to sip1, same cube/pe)",
(0, 0, 0), (1, 0, 0), "global_E", "global_W", False),
]
def _make_engine():
topo = resolve_topology(str(TOPOLOGY_PATH))
engine = GraphEngine(topo.topology_obj, enable_data=True)
return engine, topo.topology_obj.spec
# ── IPCQ path ────────────────────────────────────────────────────────
def _measure_ipcq(hop: Hop, nbytes: int) -> float:
engine, spec = _make_engine()
cfg = load_ccl_config()
merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
merged["slot_size"] = max(int(merged.get("slot_size", 4096)), nbytes)
n_elem = nbytes // ELEM_BYTES
src_sip, src_cube, src_pe = hop.src
dst_sip, dst_cube, dst_pe = hop.dst
send_dir, recv_dir = hop.send_dir, hop.recv_dir
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"ipcq_{hop.id}_{nbytes}",
spec=spec,
) as ctx:
configure_sfr_intercube_multisip(engine, spec, merged)
dp = DPPolicy(
cube="row_wise", pe="column_wise",
num_cubes=N_CUBES, num_pes=N_PES,
)
def kernel(t_ptr, n_elem, tl):
pe_id = tl.program_id(axis=0)
cube_id = tl.program_id(axis=1)
if cube_id == src_cube and pe_id == src_pe:
data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
tl.send(dir=send_dir, src=data)
elif cube_id == dst_cube and pe_id == dst_pe:
tl.recv(dir=recv_dir, shape=(n_elem,), dtype="f16")
tensors = []
for s in sorted({src_sip, dst_sip}):
ctx.ahbm.set_device(s)
t = ctx.zeros(
(N_CUBES, N_PES * n_elem), dtype="f16",
dp=dp, name=f"sip{s}",
)
t.copy_(ctx.from_numpy(
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
))
tensors.append(t)
all_pending = []
for t in tensors:
pending = ctx.launch(
f"{hop.id}_ipcq", kernel, t, n_elem, _defer_wait=True,
)
all_pending.extend(pending)
for h, sip_id, meta in all_pending:
ctx.wait(h, _meta=meta)
# Per-PE kernel execution time (excludes launch dispatch and
# response aggregation). IPCQ: DST blocks on tl.recv until the
# send arrives, so max across SIPs = DST's transfer time.
pe_exec_vals = []
for h, _sip, _meta in all_pending:
_, trace = engine.get_completion(h)
if trace and trace.get("pe_exec_ns") is not None:
pe_exec_vals.append(float(trace["pe_exec_ns"]))
return max(pe_exec_vals) if pe_exec_vals else 0.0
# ── Raw DMA path (intra-SIP only) ────────────────────────────────────
def _measure_raw(hop: Hop, nbytes: int) -> float:
"""tl.load from source slice + tl.store to destination slice. The VA
mapping spans the cube mesh within one SIP (MmuMapMsg broadcasts to all
cubes of the SIP), so the store goes through the fabric to the
destination PE's HBM. No IPCQ protocol involved.
"""
if not hop.supports_raw:
raise RuntimeError(f"hop {hop.id} does not support raw path")
engine, spec = _make_engine()
n_elem = nbytes // ELEM_BYTES
src_sip, src_cube, src_pe = hop.src
dst_sip, dst_cube, dst_pe = hop.dst
assert src_sip == dst_sip
# Slice offsets in the (N_CUBES, N_PES * n_elem) tensor:
# row = cube, slice within row = pe * n_elem .. (pe+1)*n_elem
# Byte offsets from va_base:
src_off = (src_cube * N_PES + src_pe) * n_elem * ELEM_BYTES
dst_off = (dst_cube * N_PES + dst_pe) * n_elem * ELEM_BYTES
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"raw_{hop.id}_{nbytes}",
spec=spec,
) as ctx:
dp = DPPolicy(
cube="row_wise", pe="column_wise",
num_cubes=N_CUBES, num_pes=N_PES,
)
ctx.ahbm.set_device(src_sip)
t = ctx.zeros(
(N_CUBES, N_PES * n_elem), dtype="f16",
dp=dp, name="raw_tensor",
)
t.copy_(ctx.from_numpy(
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
))
def kernel(t_ptr, n_elem, tl):
pe_id = tl.program_id(axis=0)
cube_id = tl.program_id(axis=1)
if cube_id == src_cube and pe_id == src_pe:
data = tl.load(
t_ptr + src_off, shape=(n_elem,), dtype="f16",
)
tl.store(t_ptr + dst_off, data)
pending = ctx.launch(
f"{hop.id}_raw", kernel, t, n_elem, _defer_wait=True,
)
for h, sip_id, meta in pending:
ctx.wait(h, _meta=meta)
# Per-PE kernel execution time. Raw: only SRC does real work
# (tl.load + tl.store, store is blocking), so max across all PEs
# = SRC's transfer time. Idle PEs contribute only overhead_ns.
pe_exec_vals = []
for h, _sip, _meta in pending:
_, trace = engine.get_completion(h)
if trace and trace.get("pe_exec_ns") is not None:
pe_exec_vals.append(float(trace["pe_exec_ns"]))
return max(pe_exec_vals) if pe_exec_vals else 0.0
# ── CSV + plotting ───────────────────────────────────────────────────
def _write_csv(records, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(
f, fieldnames=["hop", "label", "size_bytes", "path", "total_ns"],
)
w.writeheader()
for r in records:
w.writerow(r)
def _plot_per_hop(records, hop: Hop, path: Path) -> None:
import matplotlib.pyplot as plt
ipcq = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
key=lambda r: r["size_bytes"],
)
raw = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
key=lambda r: r["size_bytes"],
)
fig, ax = plt.subplots(figsize=(8, 5))
if ipcq:
ax.plot(
[r["size_bytes"] for r in ipcq],
[r["total_ns"] for r in ipcq],
marker="o", label="IPCQ (send/recv)", color="tab:blue",
)
if raw:
ax.plot(
[r["size_bytes"] for r in raw],
[r["total_ns"] for r in raw],
marker="s", label="Raw DMA (load+store)", color="tab:orange",
)
else:
ax.text(
0.98, 0.02, "(Raw DMA unavailable for cross-SIP)",
transform=ax.transAxes, ha="right", va="bottom",
fontsize=9, color="gray",
)
ax.set_xlabel("Data size (bytes)")
ax.set_ylabel("Latency (ns)")
ax.set_title(hop.label)
ax.grid(True, alpha=0.3)
ax.legend()
fig.tight_layout()
fig.savefig(path, dpi=120)
plt.close(fig)
def _plot_overview(records, path: Path) -> None:
import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 3, figsize=(16, 9))
axes = axes.flatten()
for i, hop in enumerate(HOPS):
ax = axes[i]
ipcq = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
key=lambda r: r["size_bytes"],
)
raw = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
key=lambda r: r["size_bytes"],
)
if ipcq:
ax.plot(
[r["size_bytes"] for r in ipcq],
[r["total_ns"] for r in ipcq],
marker="o", label="IPCQ", color="tab:blue",
)
if raw:
ax.plot(
[r["size_bytes"] for r in raw],
[r["total_ns"] for r in raw],
marker="s", label="Raw", color="tab:orange",
)
ax.set_title(hop.label, fontsize=10)
ax.set_xlabel("bytes")
ax.set_ylabel("ns")
ax.grid(True, alpha=0.3)
ax.legend(fontsize=8)
for j in range(len(HOPS), len(axes)):
axes[j].axis("off")
fig.suptitle(
"PE-to-PE latency: IPCQ vs raw DMA",
fontsize=14,
)
fig.tight_layout()
fig.savefig(path, dpi=120)
plt.close(fig)
# ── Test entry ───────────────────────────────────────────────────────
def test_pe_to_pe_latency_sweep():
records: list[dict] = []
for hop in HOPS:
for size in SIZES:
# IPCQ path
ipcq_ns = _measure_ipcq(hop, size)
records.append({
"hop": hop.id, "label": hop.label,
"size_bytes": size, "path": "ipcq",
"total_ns": ipcq_ns,
})
raw_s = "n/a"
if hop.supports_raw:
raw_ns = _measure_raw(hop, size)
records.append({
"hop": hop.id, "label": hop.label,
"size_bytes": size, "path": "raw",
"total_ns": raw_ns,
})
raw_s = f"{raw_ns:7.1f}ns"
print(
f"[{hop.id}] size={size:5d} "
f"ipcq={ipcq_ns:7.1f}ns raw={raw_s}"
)
PLOT_DIR.mkdir(parents=True, exist_ok=True)
_write_csv(records, PLOT_DIR / "summary.csv")
for hop in HOPS:
_plot_per_hop(records, hop, PLOT_DIR / f"{hop.id}.png")
_plot_overview(records, PLOT_DIR / "overview.png")
for hop in HOPS:
rs = sorted(
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
key=lambda r: r["size_bytes"],
)
for r in rs:
assert r["total_ns"] > 0, f"{hop.id}: total_ns must be > 0"
print(f"\n Plots + CSV written to {PLOT_DIR}")