Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 19dfc86dc3 | |||
| 14d800b0ae | |||
| 6918e6e906 |
@@ -67,6 +67,51 @@ Completion semantics:
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
### D5. Launch timing is endpoint-synchronized
|
||||||
|
|
||||||
|
All PEs targeted by a single kernel launch MUST begin executing the kernel
|
||||||
|
body at the same simulated time, regardless of their dispatch path length
|
||||||
|
from the launch entry point.
|
||||||
|
|
||||||
|
Rationale. The dispatch tree Host → IO_CPU → M_CPU → PE_CPU has variable
|
||||||
|
latency at every level. PEs near their M_CPU receive the launch earlier
|
||||||
|
than PEs farther away; cubes near an IO_CPU receive it earlier than cubes
|
||||||
|
farther away. Without synchronization, each PE's kernel begins at a
|
||||||
|
different `env.now`, making per-PE metrics such as `pe_exec_ns` a function
|
||||||
|
of dispatch-path geometry rather than of the kernel's behavior —
|
||||||
|
producing measurement artifacts in benchmarks that time kernel-internal
|
||||||
|
waits (for example `tl.recv` on cross-cube or cross-SIP hops).
|
||||||
|
|
||||||
|
Mechanism.
|
||||||
|
|
||||||
|
- `KernelLaunchMsg` carries an optional `target_start_ns: float | None`.
|
||||||
|
- **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it
|
||||||
|
computes `target_start_ns = env.now + max_latency` where `max_latency`
|
||||||
|
is the maximum `ComponentContext.compute_path_latency_ns(path)` across
|
||||||
|
every target (sip, cube, pe) tuple — `path = find_node_path(io_cpu,
|
||||||
|
pe_cpu_id)`. The stamped value is placed on the request carried by
|
||||||
|
every fanned-out sub-Transaction.
|
||||||
|
- **M_CPU** passes an already-stamped `target_start_ns` through
|
||||||
|
unchanged. Only when the value is absent (e.g. a direct
|
||||||
|
launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier
|
||||||
|
`env.now + max(local command-path latency)`.
|
||||||
|
- **PE_CPU** yields `env.timeout(target_start_ns - env.now)` at the top
|
||||||
|
of `_execute_kernel`, before recording `pe_exec_start` and invoking
|
||||||
|
the kernel body.
|
||||||
|
- When `target_start_ns is None`, PE_CPU falls through to the legacy
|
||||||
|
unsynchronized behavior — preserving backward compatibility.
|
||||||
|
|
||||||
|
IO_CPU-level stamping guarantees every PE across every targeted cube
|
||||||
|
uses the same barrier sim-time, eliminating both the within-cube
|
||||||
|
dispatch-offset artifact *and* the cross-cube offset artifact in
|
||||||
|
multi-cube launches. Models a real-hardware timed-broadcast launch
|
||||||
|
(latency-equalized dispatch tree).
|
||||||
|
|
||||||
|
The synchronization is internal to the engine / IO_CPU / M_CPU / PE_CPU
|
||||||
|
control plane — runtime API and application kernels are unchanged.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## Links
|
## Links
|
||||||
|
|
||||||
- SPEC R1, R2, R7, R8
|
- SPEC R1, R2, R7, R8
|
||||||
|
|||||||
@@ -420,11 +420,21 @@ fan-out (see `IpcqInitMsg` in D12).
|
|||||||
#### PE_DMA's added responsibility
|
#### PE_DMA's added responsibility
|
||||||
|
|
||||||
When `vc_comm` receives a token, PE_DMA processes it as the following
|
When `vc_comm` receives a token, PE_DMA processes it as the following
|
||||||
**atomic** sequence. **No SimPy yield is allowed between the two steps**
|
sequence: pay the Transaction's terminal BW drain, then atomically
|
||||||
(invariant I6):
|
write data and forward metadata. **No SimPy yield is allowed between
|
||||||
|
the data write and the metadata forward** (invariant I6). The drain
|
||||||
|
yield must sit before the atomic block, not inside it:
|
||||||
|
|
||||||
```python
|
```python
|
||||||
def _on_vc_comm_recv(self, env, token):
|
def _on_vc_comm_recv(self, env, txn):
|
||||||
|
# Pay the terminal BW drain (nbytes / bottleneck_bw stamped by the
|
||||||
|
# sender PE_DMA). MUST happen before the atomic block so recv only
|
||||||
|
# wakes after the bytes have "landed".
|
||||||
|
drain = getattr(txn, "drain_ns", 0.0)
|
||||||
|
if drain > 0:
|
||||||
|
yield env.timeout(drain)
|
||||||
|
|
||||||
|
token = txn.request
|
||||||
# ── ATOMIC: no yield between these two operations ──
|
# ── ATOMIC: no yield between these two operations ──
|
||||||
data = self._memory_store.read(token.src_space, token.src_addr,
|
data = self._memory_store.read(token.src_space, token.src_addr,
|
||||||
shape=..., dtype=...)
|
shape=..., dtype=...)
|
||||||
@@ -439,6 +449,33 @@ The final `put` is yieldable but uses an unbounded internal store, so
|
|||||||
it completes in a single step. That `put` is the closing call of the
|
it completes in a single step. That `put` is the closing call of the
|
||||||
atomic block; nothing may be inserted before it.
|
atomic block; nothing may be inserted before it.
|
||||||
|
|
||||||
|
#### Drain-at-inbound semantics (D9 timing model)
|
||||||
|
|
||||||
|
The Transaction carries `drain_ns = nbytes / bottleneck_bw_on_path`
|
||||||
|
stamped at send-side PE_DMA. In this simulator per-hop `overhead_ns`
|
||||||
|
is paid at each forwarding component via `run()`, and the remaining
|
||||||
|
BW drain is paid once at the Transaction's terminal. Every non-IPCQ
|
||||||
|
Transaction (raw DMA, kernel-launch fanout, etc.) pays this drain via
|
||||||
|
`ComponentBase._forward_txn` at the terminal node. For IPCQ the
|
||||||
|
destination PE_DMA intercepts the Transaction with `_handle_ipcq_inbound`
|
||||||
|
(so IPCQ-specific data write + metadata forward can happen), so **the
|
||||||
|
drain MUST be paid explicitly at the top of that handler** to keep
|
||||||
|
IPCQ's timing model on par with every other fabric Transaction.
|
||||||
|
|
||||||
|
Side-effects of paying drain here:
|
||||||
|
|
||||||
|
- **SRC `tl.send`** is unchanged — fire-and-forget semantics are
|
||||||
|
preserved because the sender PE_DMA does not `yield sub_done`. The
|
||||||
|
`sub_done.succeed()` call (made after metadata forward below) is an
|
||||||
|
event with no listener on the sender side.
|
||||||
|
- **DST `tl.recv`** unblocks `drain_ns` later. Since recv wakes only
|
||||||
|
when `IpcqMetaArrival` reaches its local PE_IPCQ, and the metadata
|
||||||
|
forward now happens after the drain, recv observes the full fabric
|
||||||
|
transfer time including bandwidth cost.
|
||||||
|
|
||||||
|
Matches the physical picture: send dispatches and leaves; recv waits
|
||||||
|
until the bytes have actually been drained into its inbox.
|
||||||
|
|
||||||
### D9.5. ADR-0020 (2-pass) integration
|
### D9.5. ADR-0020 (2-pass) integration
|
||||||
|
|
||||||
`tl.send` / `tl.recv` integrates with ADR-0020's two-pass model. Phase
|
`tl.send` / `tl.recv` integrates with ADR-0020's two-pass model. Phase
|
||||||
|
|||||||
@@ -426,11 +426,22 @@ backend init에서 IpcqInitMsg fan-out 시 양방향 fast path channel을 함께
|
|||||||
|
|
||||||
#### PE_DMA의 책임 추가
|
#### PE_DMA의 책임 추가
|
||||||
|
|
||||||
PE_DMA(vc_comm)는 token 수신 시 다음 atomic 시퀀스로 처리한다.
|
PE_DMA(vc_comm)는 token 수신 시 다음 시퀀스로 처리한다: Transaction
|
||||||
**두 동작 사이에 SimPy yield를 두어서는 안 된다** (I6 MUST 규칙 참조):
|
terminal의 BW drain을 먼저 지불하고, 이어서 atomic하게 data write +
|
||||||
|
metadata forward 수행. **data write와 metadata forward 사이에는 SimPy
|
||||||
|
yield를 두어서는 안 된다** (I6 MUST 규칙 참조). drain yield는 atomic
|
||||||
|
구간 안이 아니라 그 앞에 위치해야 한다:
|
||||||
|
|
||||||
```python
|
```python
|
||||||
def _on_vc_comm_recv(self, env, token):
|
def _on_vc_comm_recv(self, env, txn):
|
||||||
|
# Sender PE_DMA가 찍어 둔 drain_ns (= nbytes / bottleneck_bw) 를
|
||||||
|
# 여기서 지불. atomic 구간보다 앞이어야 한다 — recv는 bytes가
|
||||||
|
# "도착"한 이후에만 깨어나야 하므로.
|
||||||
|
drain = getattr(txn, "drain_ns", 0.0)
|
||||||
|
if drain > 0:
|
||||||
|
yield env.timeout(drain)
|
||||||
|
|
||||||
|
token = txn.request
|
||||||
# ── ATOMIC: 두 동작 사이에 yield 금지 ──
|
# ── ATOMIC: 두 동작 사이에 yield 금지 ──
|
||||||
# 1. data를 dst_addr에 write (dst의 메모리 공간은 token.dst_endpoint.buffer_kind)
|
# 1. data를 dst_addr에 write (dst의 메모리 공간은 token.dst_endpoint.buffer_kind)
|
||||||
data = self._memory_store.read(token.src_space, token.src_addr,
|
data = self._memory_store.read(token.src_space, token.src_addr,
|
||||||
@@ -446,6 +457,32 @@ wire로 capacity가 unbounded인 store를 사용하므로 즉시 완료된다 (
|
|||||||
single-step). 이 최종 put이 atomic 구간의 끝이며, 그 이전에 다른 yield가
|
single-step). 이 최종 put이 atomic 구간의 끝이며, 그 이전에 다른 yield가
|
||||||
삽입되면 안 된다.
|
삽입되면 안 된다.
|
||||||
|
|
||||||
|
#### Drain-at-inbound semantics (D9 timing model)
|
||||||
|
|
||||||
|
Transaction은 sender PE_DMA가 `drain_ns = nbytes / bottleneck_bw_on_path`
|
||||||
|
를 찍어 둔 상태로 fabric에 들어간다. 이 simulator에서 per-hop `overhead_ns`
|
||||||
|
는 각 forwarding component의 `run()` 에서 지불되고, 남은 BW drain은
|
||||||
|
Transaction의 terminal node에서 한 번 지불된다. IPCQ가 아닌 모든
|
||||||
|
Transaction (raw DMA, kernel-launch fanout 등) 은
|
||||||
|
`ComponentBase._forward_txn` 이 terminal에서 이 drain을 지불한다. IPCQ의
|
||||||
|
경우 목적지 PE_DMA가 `_handle_ipcq_inbound` 핸들러로 Transaction을
|
||||||
|
가로채서 (IPCQ 전용 data write + metadata forward를 해야 하므로)
|
||||||
|
**이 핸들러 최상단에서 drain을 명시적으로 지불해야 한다** — 그래야 IPCQ의
|
||||||
|
timing model이 다른 모든 fabric Transaction과 동일선상에 놓인다.
|
||||||
|
|
||||||
|
여기서 drain을 지불할 때의 side-effect:
|
||||||
|
|
||||||
|
- **SRC `tl.send`**: 동작 불변. sender PE_DMA가 `sub_done` 을 `yield`
|
||||||
|
하지 않으므로 fire-and-forget 의미가 보존된다. metadata forward 이후
|
||||||
|
호출되는 `sub_done.succeed()` 는 sender 입장에서 listener가 없는 이벤트.
|
||||||
|
- **DST `tl.recv`**: `drain_ns` 만큼 늦게 깨어난다. recv는 local PE_IPCQ
|
||||||
|
의 `IpcqMetaArrival` 수신 시에만 wake되며, metadata forward가 drain
|
||||||
|
이후로 이동했으므로 recv는 bandwidth까지 포함한 전체 fabric transfer
|
||||||
|
시간을 관측하게 된다.
|
||||||
|
|
||||||
|
물리적 그림과 일치: send는 dispatch하고 바로 반환; recv는 bytes가 실제로
|
||||||
|
자신의 inbox로 drain될 때까지 대기.
|
||||||
|
|
||||||
#### Backpressure latency 정확도
|
#### Backpressure latency 정확도
|
||||||
|
|
||||||
backpressure 해제까지 걸리는 시간:
|
backpressure 해제까지 걸리는 시간:
|
||||||
|
|||||||
@@ -58,7 +58,18 @@ class IoCpuComponent(ComponentBase):
|
|||||||
self._pending[key] = (expected, received, parent_done)
|
self._pending[key] = (expected, received, parent_done)
|
||||||
|
|
||||||
def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator:
|
def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator:
|
||||||
"""Fan out sub-Transactions to target cube M_CPUs, wait for responses."""
|
"""Fan out sub-Transactions to target cube M_CPUs, wait for responses.
|
||||||
|
|
||||||
|
ADR-0009 D5 (extended): for KernelLaunchMsg, stamp a single global
|
||||||
|
target_start_ns = env.now + max(IO_CPU → any target PE_CPU path
|
||||||
|
latency across all target cubes). M_CPU passes this value through
|
||||||
|
unchanged; every PE in every cube yields until the same sim-time
|
||||||
|
before beginning kernel execution. Without this, cross-cube
|
||||||
|
launches would have each cube's M_CPU compute its own per-cube
|
||||||
|
barrier relative to its local env.now, leaving PEs on different
|
||||||
|
cubes out of sync (the "h3/h4 dispatch-offset artifact").
|
||||||
|
"""
|
||||||
|
import dataclasses
|
||||||
from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg
|
from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg
|
||||||
|
|
||||||
request = txn.request
|
request = txn.request
|
||||||
@@ -72,6 +83,36 @@ class IoCpuComponent(ComponentBase):
|
|||||||
txn.done.succeed()
|
txn.done.succeed()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# For KernelLaunchMsg, compute the global barrier once here so
|
||||||
|
# every downstream PE_CPU uses the same target_start_ns.
|
||||||
|
if isinstance(request, KernelLaunchMsg):
|
||||||
|
global_max_latency = 0.0
|
||||||
|
pe_ids = self._resolve_pe_ids(
|
||||||
|
getattr(request, "target_pe", "all")
|
||||||
|
)
|
||||||
|
for sip, cube in cube_targets:
|
||||||
|
for pe_id in pe_ids:
|
||||||
|
pe_cpu_id = (
|
||||||
|
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
path = self.ctx.router.find_node_path(
|
||||||
|
self.node.id, pe_cpu_id,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if len(path) < 2:
|
||||||
|
continue
|
||||||
|
latency = self.ctx.compute_path_latency_ns(
|
||||||
|
path, nbytes=0,
|
||||||
|
)
|
||||||
|
if latency > global_max_latency:
|
||||||
|
global_max_latency = latency
|
||||||
|
request = dataclasses.replace(
|
||||||
|
request,
|
||||||
|
target_start_ns=float(env.now) + global_max_latency,
|
||||||
|
)
|
||||||
|
|
||||||
# Setup aggregation
|
# Setup aggregation
|
||||||
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
||||||
|
|
||||||
@@ -91,6 +132,19 @@ class IoCpuComponent(ComponentBase):
|
|||||||
)
|
)
|
||||||
yield self.out_ports[path[1]].put(sub_txn.advance())
|
yield self.out_ports[path[1]].put(sub_txn.advance())
|
||||||
|
|
||||||
|
def _resolve_pe_ids(self, target_pe: Any) -> list[int]:
|
||||||
|
"""Resolve target_pe → list of PE indices (mirrors M_CPU logic)."""
|
||||||
|
if isinstance(target_pe, int):
|
||||||
|
return [target_pe]
|
||||||
|
if isinstance(target_pe, tuple):
|
||||||
|
return list(target_pe)
|
||||||
|
# "all": all PEs in a cube
|
||||||
|
n_slices = 8
|
||||||
|
if self.ctx and self.ctx.spec:
|
||||||
|
mm = self.ctx.spec.get("cube", {}).get("memory_map", {})
|
||||||
|
n_slices = mm.get("hbm_slices_per_cube", 8)
|
||||||
|
return list(range(n_slices))
|
||||||
|
|
||||||
def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]:
|
def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]:
|
||||||
"""Return list of (sip, cube) pairs to fan out to."""
|
"""Return list of (sip, cube) pairs to fan out to."""
|
||||||
from kernbench.runtime_api.kernel import (
|
from kernbench.runtime_api.kernel import (
|
||||||
|
|||||||
@@ -162,7 +162,11 @@ class MCpuComponent(ComponentBase):
|
|||||||
Routes through find_node_path (M_CPU → NOC → PE_CPU command edges).
|
Routes through find_node_path (M_CPU → NOC → PE_CPU command edges).
|
||||||
PE_CPU sends ResponseMsg back via NOC → M_CPU on completion.
|
PE_CPU sends ResponseMsg back via NOC → M_CPU on completion.
|
||||||
Then sends aggregate ResponseMsg back to IO_CPU on the reverse path.
|
Then sends aggregate ResponseMsg back to IO_CPU on the reverse path.
|
||||||
|
|
||||||
|
ADR-0009 D5: stamps target_start_ns so every PE in this fanout
|
||||||
|
starts executing at the same env.now regardless of dispatch path.
|
||||||
"""
|
"""
|
||||||
|
import dataclasses
|
||||||
request = txn.request
|
request = txn.request
|
||||||
target_pe = getattr(request, "target_pe", "all")
|
target_pe = getattr(request, "target_pe", "all")
|
||||||
cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0"
|
cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0"
|
||||||
@@ -172,9 +176,13 @@ class MCpuComponent(ComponentBase):
|
|||||||
txn.done.succeed()
|
txn.done.succeed()
|
||||||
return
|
return
|
||||||
|
|
||||||
# Fan out to each PE_CPU, using response-based aggregation
|
# Resolve per-PE paths. If IO_CPU already stamped a global
|
||||||
sub_txns: list[Transaction] = []
|
# target_start_ns (ADR-0009 D5 extended), pass it through
|
||||||
n_dispatched = 0
|
# unchanged so every PE across every cube uses the same barrier.
|
||||||
|
# Otherwise (e.g. direct-to-M_CPU launch in a unit test) compute
|
||||||
|
# a per-cube barrier from env.now.
|
||||||
|
per_pe: list[tuple[int, list[str], float]] = []
|
||||||
|
max_latency = 0.0
|
||||||
for pe_id in pe_ids:
|
for pe_id in pe_ids:
|
||||||
pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu"
|
pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu"
|
||||||
try:
|
try:
|
||||||
@@ -183,8 +191,24 @@ class MCpuComponent(ComponentBase):
|
|||||||
continue
|
continue
|
||||||
if len(path) < 2:
|
if len(path) < 2:
|
||||||
continue
|
continue
|
||||||
|
latency = self.ctx.compute_path_latency_ns(path, nbytes=0)
|
||||||
|
per_pe.append((pe_id, path, latency))
|
||||||
|
if latency > max_latency:
|
||||||
|
max_latency = latency
|
||||||
|
|
||||||
|
if getattr(request, "target_start_ns", None) is not None:
|
||||||
|
stamped_request = request
|
||||||
|
else:
|
||||||
|
stamped_request = dataclasses.replace(
|
||||||
|
request, target_start_ns=float(env.now) + max_latency,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fan out to each PE_CPU, using response-based aggregation
|
||||||
|
sub_txns: list[Transaction] = []
|
||||||
|
n_dispatched = 0
|
||||||
|
for pe_id, path, _lat in per_pe:
|
||||||
sub_txn = Transaction(
|
sub_txn = Transaction(
|
||||||
request=request, path=path, step=0,
|
request=stamped_request, path=path, step=0,
|
||||||
nbytes=0, done=env.event(),
|
nbytes=0, done=env.event(),
|
||||||
)
|
)
|
||||||
yield self.out_ports[path[1]].put(sub_txn.advance())
|
yield self.out_ports[path[1]].put(sub_txn.advance())
|
||||||
@@ -204,16 +228,21 @@ class MCpuComponent(ComponentBase):
|
|||||||
yield all_done
|
yield all_done
|
||||||
del self._parent_txns[request.request_id]
|
del self._parent_txns[request.request_id]
|
||||||
|
|
||||||
# Aggregate PE-internal metrics (max across PEs)
|
# Aggregate PE-internal metrics (max across PEs and across cubes).
|
||||||
|
# Multiple M_CPUs share the same result_data dict via IO_CPU fanout;
|
||||||
|
# merge against the existing value so cubes don't clobber each other.
|
||||||
pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns]
|
pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns]
|
||||||
if pe_exec_values:
|
if pe_exec_values:
|
||||||
txn.result_data["pe_exec_ns"] = max(pe_exec_values)
|
cur = txn.result_data.get("pe_exec_ns", 0.0) or 0.0
|
||||||
|
txn.result_data["pe_exec_ns"] = max(cur, max(pe_exec_values))
|
||||||
dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns]
|
dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns]
|
||||||
if dma_values:
|
if dma_values:
|
||||||
txn.result_data["dma_ns"] = max(dma_values)
|
cur = txn.result_data.get("dma_ns", 0.0) or 0.0
|
||||||
|
txn.result_data["dma_ns"] = max(cur, max(dma_values))
|
||||||
compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns]
|
compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns]
|
||||||
if compute_values:
|
if compute_values:
|
||||||
txn.result_data["compute_ns"] = max(compute_values)
|
cur = txn.result_data.get("compute_ns", 0.0) or 0.0
|
||||||
|
txn.result_data["compute_ns"] = max(cur, max(compute_values))
|
||||||
|
|
||||||
# Send aggregate response on reverse command path back to IO_CPU
|
# Send aggregate response on reverse command path back to IO_CPU
|
||||||
reverse_path = list(reversed(txn.path))
|
reverse_path = list(reversed(txn.path))
|
||||||
|
|||||||
@@ -95,6 +95,13 @@ class PeCpuComponent(ComponentBase):
|
|||||||
request = txn.request
|
request = txn.request
|
||||||
yield from self.run(env, 0)
|
yield from self.run(env, 0)
|
||||||
|
|
||||||
|
# ADR-0009 D5: synchronized launch barrier. If M_CPU stamped a
|
||||||
|
# target_start_ns, wait until then so every PE in this launch
|
||||||
|
# begins pe_exec measurement at the same simulated time.
|
||||||
|
target_start = getattr(request, "target_start_ns", None)
|
||||||
|
if target_start is not None and target_start > env.now:
|
||||||
|
yield env.timeout(float(target_start) - env.now)
|
||||||
|
|
||||||
kernel_fn = get_kernel(request.kernel_ref.name)
|
kernel_fn = get_kernel(request.kernel_ref.name)
|
||||||
num_programs = self._derive_num_programs(request)
|
num_programs = self._derive_num_programs(request)
|
||||||
kernel_args = self._unpack_kernel_args(request)
|
kernel_args = self._unpack_kernel_args(request)
|
||||||
|
|||||||
@@ -186,13 +186,37 @@ class PeDmaComponent(PeEngineBase):
|
|||||||
# ── IPCQ inbound (fabric → PE_DMA → MemoryStore + PE_IPCQ) ──────
|
# ── IPCQ inbound (fabric → PE_DMA → MemoryStore + PE_IPCQ) ──────
|
||||||
|
|
||||||
def _handle_ipcq_inbound(self, env: simpy.Environment, txn: Any) -> Generator:
|
def _handle_ipcq_inbound(self, env: simpy.Environment, txn: Any) -> Generator:
|
||||||
"""At destination PE_DMA: atomically write data and forward metadata.
|
"""At destination PE_DMA: pay terminal drain, then atomically write
|
||||||
|
data and forward metadata.
|
||||||
|
|
||||||
|
ADR-0023 D9 (drain at inbound terminal): the Transaction carries
|
||||||
|
``drain_ns = nbytes / bottleneck_bw_on_path`` stamped by the sender
|
||||||
|
PE_DMA. Like every other Transaction terminal in the simulator (see
|
||||||
|
``ComponentBase._forward_txn``), this drain must be paid when the
|
||||||
|
Transaction reaches its destination. SRC-side ``tl.send`` is
|
||||||
|
fire-and-forget — it never yields on ``sub_done`` — so paying the
|
||||||
|
drain here does NOT delay the sender. What it DOES delay is the
|
||||||
|
IpcqMetaArrival forwarded below: that delay is the only signal
|
||||||
|
``tl.recv`` on DST blocks on, which is exactly the desired
|
||||||
|
semantics — "send dispatches and returns; recv waits until the
|
||||||
|
bytes have actually landed in its inbox".
|
||||||
|
|
||||||
|
The drain MUST be paid before the atomic block — inserting a yield
|
||||||
|
inside would break invariant I6.
|
||||||
|
|
||||||
I6 (MUST): no SimPy yield between MemoryStore.write and the
|
I6 (MUST): no SimPy yield between MemoryStore.write and the
|
||||||
IpcqMetaArrival put into PE_IPCQ.
|
IpcqMetaArrival put into PE_IPCQ.
|
||||||
"""
|
"""
|
||||||
from kernbench.common.ipcq_types import IpcqMetaArrival
|
from kernbench.common.ipcq_types import IpcqMetaArrival
|
||||||
|
|
||||||
|
# Pay terminal BW drain before the atomic write/metadata forward.
|
||||||
|
# Without this, IPCQ effectively got fabric bandwidth for free at
|
||||||
|
# the terminal (only intermediate-hop overhead_ns was charged),
|
||||||
|
# making IPCQ lower than raw DMA at large sizes in benchmarks.
|
||||||
|
drain = getattr(txn, "drain_ns", 0.0)
|
||||||
|
if drain > 0:
|
||||||
|
yield env.timeout(drain)
|
||||||
|
|
||||||
token = txn.request
|
token = txn.request
|
||||||
|
|
||||||
# ── ATOMIC: do not introduce yield between these two operations ──
|
# ── ATOMIC: do not introduce yield between these two operations ──
|
||||||
|
|||||||
@@ -26,6 +26,9 @@ class ComponentContext:
|
|||||||
spec: dict = field(default_factory=dict) # topology spec (cube layout, PE count, etc.)
|
spec: dict = field(default_factory=dict) # topology spec (cube layout, PE count, etc.)
|
||||||
memory_store: Any = None # MemoryStore for Phase 1 data-aware execution (ADR-0020)
|
memory_store: Any = None # MemoryStore for Phase 1 data-aware execution (ADR-0020)
|
||||||
op_logger: Any = None # OpLogger for Phase 1 op recording (ADR-0020)
|
op_logger: Any = None # OpLogger for Phase 1 op recording (ADR-0020)
|
||||||
|
# node_id -> overhead_ns (ADR-0009 D5: used by M_CPU to compute per-PE
|
||||||
|
# dispatch latency when stamping target_start_ns on KernelLaunchMsg).
|
||||||
|
node_overhead_ns: dict[str, float] = field(default_factory=dict)
|
||||||
|
|
||||||
def get_shared_resource(
|
def get_shared_resource(
|
||||||
self, env: simpy.Environment, key: str, capacity: int = 1,
|
self, env: simpy.Environment, key: str, capacity: int = 1,
|
||||||
@@ -52,3 +55,19 @@ class ComponentContext:
|
|||||||
if min_bw == float("inf"):
|
if min_bw == float("inf"):
|
||||||
return 0.0
|
return 0.0
|
||||||
return nbytes / min_bw
|
return nbytes / min_bw
|
||||||
|
|
||||||
|
def compute_path_latency_ns(self, path: list[str], nbytes: int = 0) -> float:
|
||||||
|
"""Formula latency along path: wire + per-node overhead + drain.
|
||||||
|
|
||||||
|
ADR-0009 D5: M_CPU uses this to compute per-PE dispatch latency
|
||||||
|
when stamping target_start_ns on KernelLaunchMsg fanout.
|
||||||
|
"""
|
||||||
|
total = 0.0
|
||||||
|
for i in range(len(path) - 1):
|
||||||
|
edge = self.edge_map.get((path[i], path[i + 1]))
|
||||||
|
if edge:
|
||||||
|
total += edge.distance_mm * self.ns_per_mm
|
||||||
|
for node_id in path:
|
||||||
|
total += self.node_overhead_ns.get(node_id, 0.0)
|
||||||
|
total += self.compute_drain_ns(path, nbytes)
|
||||||
|
return total
|
||||||
|
|||||||
@@ -58,7 +58,13 @@ class IoCpuComponent(ComponentBase):
|
|||||||
self._pending[key] = (expected, received, parent_done)
|
self._pending[key] = (expected, received, parent_done)
|
||||||
|
|
||||||
def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator:
|
def _dispatch_to_m_cpus(self, env: simpy.Environment, txn: Any) -> Generator:
|
||||||
"""Fan out sub-Transactions to target cube M_CPUs, wait for responses."""
|
"""Fan out sub-Transactions to target cube M_CPUs, wait for responses.
|
||||||
|
|
||||||
|
ADR-0009 D5 (extended): stamp a global target_start_ns on
|
||||||
|
KernelLaunchMsg so every PE across every target cube starts at
|
||||||
|
the same env.now. See the non-legacy builtin for full rationale.
|
||||||
|
"""
|
||||||
|
import dataclasses
|
||||||
from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg
|
from kernbench.runtime_api.kernel import KernelLaunchMsg, MemoryReadMsg, MemoryWriteMsg
|
||||||
|
|
||||||
request = txn.request
|
request = txn.request
|
||||||
@@ -72,6 +78,34 @@ class IoCpuComponent(ComponentBase):
|
|||||||
txn.done.succeed()
|
txn.done.succeed()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if isinstance(request, KernelLaunchMsg):
|
||||||
|
global_max_latency = 0.0
|
||||||
|
pe_ids = self._resolve_pe_ids(
|
||||||
|
getattr(request, "target_pe", "all")
|
||||||
|
)
|
||||||
|
for sip, cube in cube_targets:
|
||||||
|
for pe_id in pe_ids:
|
||||||
|
pe_cpu_id = (
|
||||||
|
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
path = self.ctx.router.find_node_path(
|
||||||
|
self.node.id, pe_cpu_id,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if len(path) < 2:
|
||||||
|
continue
|
||||||
|
latency = self.ctx.compute_path_latency_ns(
|
||||||
|
path, nbytes=0,
|
||||||
|
)
|
||||||
|
if latency > global_max_latency:
|
||||||
|
global_max_latency = latency
|
||||||
|
request = dataclasses.replace(
|
||||||
|
request,
|
||||||
|
target_start_ns=float(env.now) + global_max_latency,
|
||||||
|
)
|
||||||
|
|
||||||
# Setup aggregation
|
# Setup aggregation
|
||||||
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
||||||
|
|
||||||
@@ -91,6 +125,18 @@ class IoCpuComponent(ComponentBase):
|
|||||||
)
|
)
|
||||||
yield self.out_ports[path[1]].put(sub_txn.advance())
|
yield self.out_ports[path[1]].put(sub_txn.advance())
|
||||||
|
|
||||||
|
def _resolve_pe_ids(self, target_pe: Any) -> list[int]:
|
||||||
|
"""Resolve target_pe → list of PE indices (mirrors M_CPU logic)."""
|
||||||
|
if isinstance(target_pe, int):
|
||||||
|
return [target_pe]
|
||||||
|
if isinstance(target_pe, tuple):
|
||||||
|
return list(target_pe)
|
||||||
|
n_slices = 8
|
||||||
|
if self.ctx and self.ctx.spec:
|
||||||
|
mm = self.ctx.spec.get("cube", {}).get("memory_map", {})
|
||||||
|
n_slices = mm.get("hbm_slices_per_cube", 8)
|
||||||
|
return list(range(n_slices))
|
||||||
|
|
||||||
def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]:
|
def _resolve_cube_targets(self, request: Any) -> list[tuple[int, int]]:
|
||||||
"""Return list of (sip, cube) pairs to fan out to."""
|
"""Return list of (sip, cube) pairs to fan out to."""
|
||||||
from kernbench.runtime_api.kernel import (
|
from kernbench.runtime_api.kernel import (
|
||||||
|
|||||||
@@ -162,7 +162,11 @@ class MCpuComponent(ComponentBase):
|
|||||||
Routes through find_node_path (M_CPU → NOC → PE_CPU command edges).
|
Routes through find_node_path (M_CPU → NOC → PE_CPU command edges).
|
||||||
PE_CPU sends ResponseMsg back via NOC → M_CPU on completion.
|
PE_CPU sends ResponseMsg back via NOC → M_CPU on completion.
|
||||||
Then sends aggregate ResponseMsg back to IO_CPU on the reverse path.
|
Then sends aggregate ResponseMsg back to IO_CPU on the reverse path.
|
||||||
|
|
||||||
|
ADR-0009 D5: stamps target_start_ns so every PE in this fanout
|
||||||
|
starts executing at the same env.now regardless of dispatch path.
|
||||||
"""
|
"""
|
||||||
|
import dataclasses
|
||||||
request = txn.request
|
request = txn.request
|
||||||
target_pe = getattr(request, "target_pe", "all")
|
target_pe = getattr(request, "target_pe", "all")
|
||||||
cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0"
|
cube_prefix = self.node.id.rsplit(".", 1)[0] # e.g. "sip0.cube0"
|
||||||
@@ -172,9 +176,10 @@ class MCpuComponent(ComponentBase):
|
|||||||
txn.done.succeed()
|
txn.done.succeed()
|
||||||
return
|
return
|
||||||
|
|
||||||
# Fan out to each PE_CPU, using response-based aggregation
|
# Resolve per-PE paths. If IO_CPU already stamped a global
|
||||||
sub_txns: list[Transaction] = []
|
# target_start_ns (ADR-0009 D5 extended), pass it through.
|
||||||
n_dispatched = 0
|
per_pe: list[tuple[int, list[str], float]] = []
|
||||||
|
max_latency = 0.0
|
||||||
for pe_id in pe_ids:
|
for pe_id in pe_ids:
|
||||||
pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu"
|
pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu"
|
||||||
try:
|
try:
|
||||||
@@ -183,8 +188,24 @@ class MCpuComponent(ComponentBase):
|
|||||||
continue
|
continue
|
||||||
if len(path) < 2:
|
if len(path) < 2:
|
||||||
continue
|
continue
|
||||||
|
latency = self.ctx.compute_path_latency_ns(path, nbytes=0)
|
||||||
|
per_pe.append((pe_id, path, latency))
|
||||||
|
if latency > max_latency:
|
||||||
|
max_latency = latency
|
||||||
|
|
||||||
|
if getattr(request, "target_start_ns", None) is not None:
|
||||||
|
stamped_request = request
|
||||||
|
else:
|
||||||
|
stamped_request = dataclasses.replace(
|
||||||
|
request, target_start_ns=float(env.now) + max_latency,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fan out to each PE_CPU, using response-based aggregation
|
||||||
|
sub_txns: list[Transaction] = []
|
||||||
|
n_dispatched = 0
|
||||||
|
for pe_id, path, _lat in per_pe:
|
||||||
sub_txn = Transaction(
|
sub_txn = Transaction(
|
||||||
request=request, path=path, step=0,
|
request=stamped_request, path=path, step=0,
|
||||||
nbytes=0, done=env.event(),
|
nbytes=0, done=env.event(),
|
||||||
)
|
)
|
||||||
yield self.out_ports[path[1]].put(sub_txn.advance())
|
yield self.out_ports[path[1]].put(sub_txn.advance())
|
||||||
@@ -204,16 +225,21 @@ class MCpuComponent(ComponentBase):
|
|||||||
yield all_done
|
yield all_done
|
||||||
del self._parent_txns[request.request_id]
|
del self._parent_txns[request.request_id]
|
||||||
|
|
||||||
# Aggregate PE-internal metrics (max across PEs)
|
# Aggregate PE-internal metrics (max across PEs and across cubes).
|
||||||
|
# Multiple M_CPUs share the same result_data dict via IO_CPU fanout;
|
||||||
|
# merge against the existing value so cubes don't clobber each other.
|
||||||
pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns]
|
pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns]
|
||||||
if pe_exec_values:
|
if pe_exec_values:
|
||||||
txn.result_data["pe_exec_ns"] = max(pe_exec_values)
|
cur = txn.result_data.get("pe_exec_ns", 0.0) or 0.0
|
||||||
|
txn.result_data["pe_exec_ns"] = max(cur, max(pe_exec_values))
|
||||||
dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns]
|
dma_values = [st.result_data.get("dma_ns", 0.0) for st in sub_txns]
|
||||||
if dma_values:
|
if dma_values:
|
||||||
txn.result_data["dma_ns"] = max(dma_values)
|
cur = txn.result_data.get("dma_ns", 0.0) or 0.0
|
||||||
|
txn.result_data["dma_ns"] = max(cur, max(dma_values))
|
||||||
compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns]
|
compute_values = [st.result_data.get("compute_ns", 0.0) for st in sub_txns]
|
||||||
if compute_values:
|
if compute_values:
|
||||||
txn.result_data["compute_ns"] = max(compute_values)
|
cur = txn.result_data.get("compute_ns", 0.0) or 0.0
|
||||||
|
txn.result_data["compute_ns"] = max(cur, max(compute_values))
|
||||||
|
|
||||||
# Send aggregate response on reverse command path back to IO_CPU
|
# Send aggregate response on reverse command path back to IO_CPU
|
||||||
reverse_path = list(reversed(txn.path))
|
reverse_path = list(reversed(txn.path))
|
||||||
|
|||||||
@@ -71,6 +71,13 @@ class PeCpuComponent(ComponentBase):
|
|||||||
request = txn.request
|
request = txn.request
|
||||||
yield from self.run(env, 0)
|
yield from self.run(env, 0)
|
||||||
|
|
||||||
|
# ADR-0009 D5: synchronized launch barrier. If M_CPU stamped a
|
||||||
|
# target_start_ns, wait until then so every PE in this launch
|
||||||
|
# begins pe_exec measurement at the same simulated time.
|
||||||
|
target_start = getattr(request, "target_start_ns", None)
|
||||||
|
if target_start is not None and target_start > env.now:
|
||||||
|
yield env.timeout(float(target_start) - env.now)
|
||||||
|
|
||||||
kernel_fn = get_kernel(request.kernel_ref.name)
|
kernel_fn = get_kernel(request.kernel_ref.name)
|
||||||
num_programs = self._derive_num_programs(request)
|
num_programs = self._derive_num_programs(request)
|
||||||
kernel_args = self._unpack_kernel_args(request)
|
kernel_args = self._unpack_kernel_args(request)
|
||||||
|
|||||||
@@ -19,7 +19,14 @@ class PageFault(Exception):
|
|||||||
|
|
||||||
|
|
||||||
class PeMMU:
|
class PeMMU:
|
||||||
"""Per-PE MMU with page-aligned VA→PA translation table.
|
"""Per-PE MMU with sub-page-capable VA→PA translation table.
|
||||||
|
|
||||||
|
Each page-table entry is a list of (start_in_page, end_in_page,
|
||||||
|
pa_at_offset_zero) regions. This is a SIMULATOR STOPGAP — real MMUs
|
||||||
|
store one PA per page-table entry. Sub-page regions exist here so
|
||||||
|
DPPolicy layouts that shard below page granularity (e.g. 128 B
|
||||||
|
payloads with 4 KB pages) don't silently mis-route through last-
|
||||||
|
write-wins overwrites. Memory note: project_mmu_subpage_stopgap.md.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
page_size: Page size in bytes (default 2 MB).
|
page_size: Page size in bytes (default 2 MB).
|
||||||
@@ -34,7 +41,11 @@ class PeMMU:
|
|||||||
self._page_size = page_size
|
self._page_size = page_size
|
||||||
self._page_shift = (page_size - 1).bit_length()
|
self._page_shift = (page_size - 1).bit_length()
|
||||||
self._page_mask = page_size - 1
|
self._page_mask = page_size - 1
|
||||||
self._table: dict[int, int] = {} # va_page_number → pa_page_base
|
# vpn → list of (start_in_page, end_in_page, pa_at_offset_zero).
|
||||||
|
# pa_at_offset_zero is the PA that offset 0 of the page would map
|
||||||
|
# to under this region — i.e. translate(off) = pa_at_offset_zero
|
||||||
|
# + off when start <= off < end.
|
||||||
|
self._table: dict[int, list[tuple[int, int, int]]] = {}
|
||||||
self._overhead_ns = overhead_ns
|
self._overhead_ns = overhead_ns
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -46,21 +57,67 @@ class PeMMU:
|
|||||||
return len(self._table)
|
return len(self._table)
|
||||||
|
|
||||||
def map(self, va: int, pa: int, size: int) -> None:
|
def map(self, va: int, pa: int, size: int) -> None:
|
||||||
"""Register VA→PA mapping for a contiguous range."""
|
"""Register VA→PA mapping for a contiguous range.
|
||||||
for off in range(0, size, self._page_size):
|
|
||||||
vpn = (va + off) >> self._page_shift
|
Sub-page-aware: a single page can hold multiple disjoint regions,
|
||||||
self._table[vpn] = pa + off
|
each pointing to a different PA. Later map() calls APPEND a new
|
||||||
|
region; on overlap with an existing region, the new region wins
|
||||||
|
for the overlapping offsets (translate iterates in reverse so the
|
||||||
|
last write takes precedence — matches legacy single-PA behavior
|
||||||
|
when a full page is re-mapped).
|
||||||
|
"""
|
||||||
|
end_va = va + size
|
||||||
|
cur = va
|
||||||
|
while cur < end_va:
|
||||||
|
vpn = cur >> self._page_shift
|
||||||
|
page_base_va = vpn << self._page_shift
|
||||||
|
page_end_va = page_base_va + self._page_size
|
||||||
|
region_start = cur - page_base_va
|
||||||
|
region_end = min(end_va, page_end_va) - page_base_va
|
||||||
|
# PA seen at offset 0 of page if this region's mapping covered it
|
||||||
|
pa_at_offset_zero = pa + (cur - va) - region_start
|
||||||
|
self._table.setdefault(vpn, []).append(
|
||||||
|
(region_start, region_end, pa_at_offset_zero)
|
||||||
|
)
|
||||||
|
cur = page_base_va + region_end
|
||||||
|
|
||||||
def unmap(self, va: int, size: int) -> None:
|
def unmap(self, va: int, size: int) -> None:
|
||||||
"""Remove VA mapping for a contiguous range."""
|
"""Remove VA mapping for a contiguous range.
|
||||||
for off in range(0, size, self._page_size):
|
|
||||||
vpn = (va + off) >> self._page_shift
|
Drops any region whose extent is contained within the unmapped
|
||||||
self._table.pop(vpn, None)
|
range. Partial overlaps (region straddles the range boundary)
|
||||||
|
are left in place — caller is expected to unmap on the same
|
||||||
|
boundaries it mapped on.
|
||||||
|
"""
|
||||||
|
end_va = va + size
|
||||||
|
cur = va
|
||||||
|
while cur < end_va:
|
||||||
|
vpn = cur >> self._page_shift
|
||||||
|
page_base_va = vpn << self._page_shift
|
||||||
|
page_end_va = page_base_va + self._page_size
|
||||||
|
unmap_start = cur - page_base_va
|
||||||
|
unmap_end = min(end_va, page_end_va) - page_base_va
|
||||||
|
regions = self._table.get(vpn)
|
||||||
|
if regions is not None:
|
||||||
|
kept = [
|
||||||
|
r for r in regions
|
||||||
|
if not (r[0] >= unmap_start and r[1] <= unmap_end)
|
||||||
|
]
|
||||||
|
if kept:
|
||||||
|
self._table[vpn] = kept
|
||||||
|
else:
|
||||||
|
del self._table[vpn]
|
||||||
|
cur = page_base_va + unmap_end
|
||||||
|
|
||||||
def translate(self, va: int) -> int:
|
def translate(self, va: int) -> int:
|
||||||
"""Translate VA to PA. Raises PageFault if unmapped."""
|
"""Translate VA to PA. Raises PageFault if unmapped."""
|
||||||
vpn = va >> self._page_shift
|
vpn = va >> self._page_shift
|
||||||
pa_page_base = self._table.get(vpn)
|
regions = self._table.get(vpn)
|
||||||
if pa_page_base is None:
|
if regions is None:
|
||||||
raise PageFault(va)
|
raise PageFault(va)
|
||||||
return pa_page_base + (va & self._page_mask)
|
offset = va & self._page_mask
|
||||||
|
# Iterate latest-first so newer map() calls win on overlap
|
||||||
|
for start, end, pa_at_offset_zero in reversed(regions):
|
||||||
|
if start <= offset < end:
|
||||||
|
return pa_at_offset_zero + offset
|
||||||
|
raise PageFault(va)
|
||||||
|
|||||||
@@ -90,6 +90,11 @@ class KernelLaunchMsg:
|
|||||||
args: tuple[KernelArg, ...]
|
args: tuple[KernelArg, ...]
|
||||||
target_cubes: tuple[int, ...] | Literal["all"] = "all"
|
target_cubes: tuple[int, ...] | Literal["all"] = "all"
|
||||||
target_pe: int | tuple[int, ...] | Literal["all"] = "all"
|
target_pe: int | tuple[int, ...] | Literal["all"] = "all"
|
||||||
|
# ADR-0009 D5: synchronized kernel start. When set, each PE_CPU yields
|
||||||
|
# until env.now >= target_start_ns before beginning kernel execution,
|
||||||
|
# so every PE in a launch starts at the same simulated time regardless
|
||||||
|
# of its M_CPU dispatch path length. Stamped by M_CPU fan-out.
|
||||||
|
target_start_ns: float | None = None
|
||||||
msg_type: Literal["kernel_launch"] = "kernel_launch"
|
msg_type: Literal["kernel_launch"] = "kernel_launch"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -67,6 +67,10 @@ class GraphEngine:
|
|||||||
spec=graph.spec,
|
spec=graph.spec,
|
||||||
memory_store=self._memory_store,
|
memory_store=self._memory_store,
|
||||||
op_logger=self._op_logger,
|
op_logger=self._op_logger,
|
||||||
|
node_overhead_ns={
|
||||||
|
nid: float(n.attrs.get("overhead_ns", 0.0))
|
||||||
|
for nid, n in graph.nodes.items()
|
||||||
|
},
|
||||||
)
|
)
|
||||||
self._components: dict[str, ComponentBase] = {
|
self._components: dict[str, ComponentBase] = {
|
||||||
node_id: ComponentRegistry.create(node, overrides, ctx)
|
node_id: ComponentRegistry.create(node, overrides, ctx)
|
||||||
|
|||||||
@@ -179,7 +179,9 @@ CONFIGS = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def _write_temp_configs(tmp_path, sip_topology, n_sips, algorithm):
|
def _write_temp_configs(
|
||||||
|
tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None,
|
||||||
|
):
|
||||||
"""Write temp topology.yaml and ccl.yaml with the given overrides."""
|
"""Write temp topology.yaml and ccl.yaml with the given overrides."""
|
||||||
with open(TOPOLOGY_PATH) as f:
|
with open(TOPOLOGY_PATH) as f:
|
||||||
topo_cfg = yaml.safe_load(f)
|
topo_cfg = yaml.safe_load(f)
|
||||||
@@ -193,6 +195,15 @@ def _write_temp_configs(tmp_path, sip_topology, n_sips, algorithm):
|
|||||||
with open(ccl_path) as f:
|
with open(ccl_path) as f:
|
||||||
ccl_cfg = yaml.safe_load(f)
|
ccl_cfg = yaml.safe_load(f)
|
||||||
ccl_cfg["defaults"]["algorithm"] = algorithm
|
ccl_cfg["defaults"]["algorithm"] = algorithm
|
||||||
|
if n_elem_override is not None:
|
||||||
|
ccl_cfg.setdefault("algorithms", {}).setdefault(
|
||||||
|
algorithm, {},
|
||||||
|
)["n_elem"] = int(n_elem_override)
|
||||||
|
# Ensure IPCQ slot is big enough for the per-message payload.
|
||||||
|
per_msg_bytes = int(n_elem_override) * 2 # f16
|
||||||
|
default_slot = int(ccl_cfg["defaults"].get("slot_size", 4096))
|
||||||
|
if per_msg_bytes > default_slot:
|
||||||
|
ccl_cfg["defaults"]["slot_size"] = per_msg_bytes
|
||||||
tmp_ccl = tmp_path / "ccl.yaml"
|
tmp_ccl = tmp_path / "ccl.yaml"
|
||||||
with open(tmp_ccl, "w") as f:
|
with open(tmp_ccl, "w") as f:
|
||||||
yaml.dump(ccl_cfg, f, default_flow_style=False)
|
yaml.dump(ccl_cfg, f, default_flow_style=False)
|
||||||
@@ -220,3 +231,191 @@ def test_allreduce(tmp_path, algorithm, sip_topology, n_sips):
|
|||||||
algorithm=algorithm, ccl_yaml=ccl_path,
|
algorithm=algorithm, ccl_yaml=ccl_path,
|
||||||
)
|
)
|
||||||
assert result["ok_cubes"] > 0
|
assert result["ok_cubes"] > 0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Latency sweep ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# avoid 16 (== n_cubes, dim_map collision). Goes up to 1 MB per SIP:
|
||||||
|
# bytes_per_sip = n_cubes * n_elem * 2 = 32 * n_elem.
|
||||||
|
_SWEEP_N_ELEM = [
|
||||||
|
8, 32, 64, 128, 512, 1024, 2048,
|
||||||
|
4096, 8192, 16384, 32768,
|
||||||
|
]
|
||||||
|
_ELEM_BYTES_F16 = 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_allreduce_latency_sweep(tmp_path):
|
||||||
|
"""Sweep n_elem across each SIP topology; record max(pe_exec_ns)
|
||||||
|
as the critical-path kernel latency. Emits CSV + PNG plots to
|
||||||
|
tests/allreduce_latency_plots/.
|
||||||
|
"""
|
||||||
|
import csv
|
||||||
|
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from matplotlib.ticker import FuncFormatter
|
||||||
|
|
||||||
|
def _fmt_bytes(x, _pos):
|
||||||
|
"""Format tick as B / KB / MB."""
|
||||||
|
if x <= 0:
|
||||||
|
return "0"
|
||||||
|
if x >= 1024 * 1024:
|
||||||
|
return f"{x / (1024 * 1024):.0f} MB"
|
||||||
|
if x >= 1024:
|
||||||
|
return f"{x / 1024:.0f} KB"
|
||||||
|
return f"{x:.0f} B"
|
||||||
|
|
||||||
|
_bytes_fmt = FuncFormatter(_fmt_bytes)
|
||||||
|
|
||||||
|
out_dir = Path(__file__).parent / "allreduce_latency_plots"
|
||||||
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
records: list[dict] = []
|
||||||
|
|
||||||
|
# Apples-to-apples: same n_sips across all three topologies.
|
||||||
|
for algorithm, sip_topology, n_sips in [
|
||||||
|
("intercube_allreduce", "ring_1d", 4),
|
||||||
|
("intercube_allreduce", "torus_2d", 4),
|
||||||
|
("intercube_allreduce", "mesh_2d_no_wrap", 4),
|
||||||
|
]:
|
||||||
|
for n_elem in _SWEEP_N_ELEM:
|
||||||
|
sub = tmp_path / f"{sip_topology}_{n_elem}"
|
||||||
|
sub.mkdir()
|
||||||
|
topo_path, ccl_path = _write_temp_configs(
|
||||||
|
sub, sip_topology, n_sips, algorithm,
|
||||||
|
n_elem_override=n_elem,
|
||||||
|
)
|
||||||
|
topo = resolve_topology(topo_path)
|
||||||
|
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||||
|
spec = topo.topology_obj.spec
|
||||||
|
|
||||||
|
with RuntimeContext(
|
||||||
|
engine=engine,
|
||||||
|
target_device=DeviceSelector("all"),
|
||||||
|
correlation_id=f"sweep_{algorithm}_{sip_topology}_{n_elem}",
|
||||||
|
spec=spec,
|
||||||
|
) as ctx:
|
||||||
|
result = run_allreduce(
|
||||||
|
ctx, engine, spec,
|
||||||
|
algorithm=algorithm, ccl_yaml=ccl_path,
|
||||||
|
)
|
||||||
|
assert result["ok_cubes"] > 0
|
||||||
|
|
||||||
|
pe_exec_vals = [
|
||||||
|
float(tr.get("pe_exec_ns", 0.0) or 0.0)
|
||||||
|
for _, (_, tr) in engine._results.items()
|
||||||
|
if isinstance(tr, dict)
|
||||||
|
]
|
||||||
|
crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||||
|
|
||||||
|
cm = spec["sip"]["cube_mesh"]
|
||||||
|
n_cubes = int(cm["w"]) * int(cm["h"])
|
||||||
|
bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16
|
||||||
|
# pe="replicate" + num_pes=1 → one active PE per cube owns
|
||||||
|
# the whole cube row. Per-PE bytes == per-cube-tile bytes ==
|
||||||
|
# per-message bytes over the IPCQ fabric.
|
||||||
|
bytes_per_pe = n_elem * _ELEM_BYTES_F16
|
||||||
|
|
||||||
|
records.append({
|
||||||
|
"algorithm": algorithm,
|
||||||
|
"sip_topology": sip_topology,
|
||||||
|
"n_sips": n_sips,
|
||||||
|
"n_elem": n_elem,
|
||||||
|
"bytes_per_pe": bytes_per_pe,
|
||||||
|
"bytes_per_sip": bytes_per_sip,
|
||||||
|
"latency_ns": crit_ns,
|
||||||
|
})
|
||||||
|
print(
|
||||||
|
f"[{sip_topology:<16} n_sips={n_sips} n_elem={n_elem:>5} "
|
||||||
|
f"bytes/pe={bytes_per_pe:>7} bytes/sip={bytes_per_sip:>9}] "
|
||||||
|
f"pe_exec_max = {crit_ns:8.1f} ns"
|
||||||
|
)
|
||||||
|
|
||||||
|
with open(out_dir / "summary.csv", "w", newline="", encoding="utf-8") as f:
|
||||||
|
w = csv.DictWriter(f, fieldnames=[
|
||||||
|
"algorithm", "sip_topology", "n_sips", "n_elem",
|
||||||
|
"bytes_per_pe", "bytes_per_sip", "latency_ns",
|
||||||
|
])
|
||||||
|
w.writeheader()
|
||||||
|
for r in records:
|
||||||
|
w.writerow(r)
|
||||||
|
|
||||||
|
topologies = sorted({r["sip_topology"] for r in records})
|
||||||
|
# Per-topology plots: log-scale + linear-scale side-by-side.
|
||||||
|
# X-axis = bytes per PE (per-message payload size).
|
||||||
|
for topo_name in topologies:
|
||||||
|
rs = sorted(
|
||||||
|
[r for r in records if r["sip_topology"] == topo_name],
|
||||||
|
key=lambda r: r["bytes_per_pe"],
|
||||||
|
)
|
||||||
|
xs = [r["bytes_per_pe"] for r in rs]
|
||||||
|
ys = [r["latency_ns"] for r in rs]
|
||||||
|
title = (
|
||||||
|
f"Allreduce latency — {topo_name} "
|
||||||
|
f"(n_sips={rs[0]['n_sips']})"
|
||||||
|
)
|
||||||
|
# Log-scale
|
||||||
|
fig, ax = plt.subplots(figsize=(8, 5))
|
||||||
|
ax.plot(xs, ys, marker="o", color="tab:blue")
|
||||||
|
ax.set_xscale("log", base=2)
|
||||||
|
ax.set_xlabel("Bytes per PE (log scale)")
|
||||||
|
ax.set_ylabel("max pe_exec_ns (critical path)")
|
||||||
|
ax.set_title(title)
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||||
|
fig.tight_layout()
|
||||||
|
fig.savefig(out_dir / f"{topo_name}.png", dpi=120)
|
||||||
|
plt.close(fig)
|
||||||
|
# Linear-scale companion
|
||||||
|
fig, ax = plt.subplots(figsize=(8, 5))
|
||||||
|
ax.plot(xs, ys, marker="o", color="tab:blue")
|
||||||
|
ax.set_xlabel("Bytes per PE")
|
||||||
|
ax.set_ylabel("max pe_exec_ns (critical path)")
|
||||||
|
ax.set_title(title + " [linear scale]")
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||||
|
fig.tight_layout()
|
||||||
|
fig.savefig(out_dir / f"{topo_name}_linear.png", dpi=120)
|
||||||
|
plt.close(fig)
|
||||||
|
|
||||||
|
# Combined overview — two variants: log-scale (overview.png) and
|
||||||
|
# linear-scale (overview_linear.png).
|
||||||
|
colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange",
|
||||||
|
"mesh_2d_no_wrap": "tab:green"}
|
||||||
|
|
||||||
|
def _draw_overview(log_x: bool, filename: str, title_suffix: str) -> None:
|
||||||
|
fig, ax = plt.subplots(figsize=(9, 6))
|
||||||
|
for topo_name in topologies:
|
||||||
|
rs = sorted(
|
||||||
|
[r for r in records if r["sip_topology"] == topo_name],
|
||||||
|
key=lambda r: r["bytes_per_pe"],
|
||||||
|
)
|
||||||
|
ax.plot(
|
||||||
|
[r["bytes_per_pe"] for r in rs],
|
||||||
|
[r["latency_ns"] for r in rs],
|
||||||
|
marker="o",
|
||||||
|
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
|
||||||
|
color=colors.get(topo_name),
|
||||||
|
)
|
||||||
|
if log_x:
|
||||||
|
ax.set_xscale("log", base=2)
|
||||||
|
ax.set_xlabel("Bytes per PE (log scale)")
|
||||||
|
else:
|
||||||
|
ax.set_xlabel("Bytes per PE")
|
||||||
|
ax.set_ylabel("max pe_exec_ns (critical path)")
|
||||||
|
ax.set_title("Multi-device allreduce latency by topology" + title_suffix)
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
ax.legend()
|
||||||
|
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||||
|
fig.tight_layout()
|
||||||
|
fig.savefig(out_dir / filename, dpi=120)
|
||||||
|
plt.close(fig)
|
||||||
|
|
||||||
|
_draw_overview(log_x=True, filename="overview.png", title_suffix="")
|
||||||
|
_draw_overview(
|
||||||
|
log_x=False, filename="overview_linear.png",
|
||||||
|
title_suffix=" [linear scale]",
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"\nWrote {out_dir / 'overview.png'} + "
|
||||||
|
f"{out_dir / 'overview_linear.png'}"
|
||||||
|
)
|
||||||
|
|||||||
@@ -0,0 +1,62 @@
|
|||||||
|
"""ADR-0009 D5: synchronized launch barrier.
|
||||||
|
|
||||||
|
M_CPU stamps KernelLaunchMsg with target_start_ns = env.now + max path
|
||||||
|
latency; PE_CPU yields until that time before recording pe_exec_start.
|
||||||
|
Every PE in a single launch MUST begin kernel execution at the same
|
||||||
|
env.now regardless of its dispatch path length.
|
||||||
|
|
||||||
|
We verify this indirectly: for a no-op kernel, pe_exec_ns = env.now -
|
||||||
|
pe_exec_start. If every PE's pe_exec_start is identical and every PE
|
||||||
|
runs the same no-op body, every pe_exec_ns value must be identical.
|
||||||
|
Without D5, pe_exec_start varies by dispatch-path length and so does
|
||||||
|
pe_exec_ns.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from kernbench.policy.placement.dp import DPPolicy
|
||||||
|
from kernbench.runtime_api.context import RuntimeContext
|
||||||
|
from kernbench.runtime_api.types import DeviceSelector
|
||||||
|
from kernbench.sim_engine.engine import GraphEngine
|
||||||
|
from kernbench.topology.builder import resolve_topology
|
||||||
|
|
||||||
|
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||||
|
|
||||||
|
|
||||||
|
def test_kernel_launch_sync_all_pes_have_equal_exec_time():
|
||||||
|
"""No-op kernel: every PE's pe_exec_ns must be identical under D5."""
|
||||||
|
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||||
|
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||||
|
spec = topo.topology_obj.spec
|
||||||
|
|
||||||
|
with RuntimeContext(engine=engine, target_device=DeviceSelector("all"),
|
||||||
|
correlation_id="sync_test", spec=spec) as ctx:
|
||||||
|
dp = DPPolicy(cube="row_wise", pe="column_wise",
|
||||||
|
num_cubes=16, num_pes=8)
|
||||||
|
|
||||||
|
def kernel(t_ptr, n_elem, tl):
|
||||||
|
pass # no-op
|
||||||
|
|
||||||
|
ctx.ahbm.set_device(0)
|
||||||
|
t = ctx.zeros((16, 8 * 64), dtype="f16", dp=dp, name="probe")
|
||||||
|
t.copy_(ctx.from_numpy(np.zeros((16, 8 * 64), dtype=np.float16)))
|
||||||
|
|
||||||
|
pending = ctx.launch("sync_probe", kernel, t, 64, _defer_wait=True)
|
||||||
|
for h, _sip, meta in pending:
|
||||||
|
ctx.wait(h, _meta=meta)
|
||||||
|
|
||||||
|
pe_exec_vals = []
|
||||||
|
for h, _sip, _meta in pending:
|
||||||
|
_, trace = engine.get_completion(h)
|
||||||
|
if trace and trace.get("pe_exec_ns") is not None:
|
||||||
|
pe_exec_vals.append(float(trace["pe_exec_ns"]))
|
||||||
|
|
||||||
|
assert pe_exec_vals, "expected completion traces with pe_exec_ns"
|
||||||
|
spread = max(pe_exec_vals) - min(pe_exec_vals)
|
||||||
|
assert spread < 1e-6, (
|
||||||
|
f"ADR-0009 D5 violated: pe_exec_ns spread across PEs = "
|
||||||
|
f"{spread:.6f} ns (expected 0). Values: {pe_exec_vals}"
|
||||||
|
)
|
||||||
@@ -0,0 +1,358 @@
|
|||||||
|
"""PE-to-PE latency sweep across hop types and data sizes.
|
||||||
|
|
||||||
|
Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for five
|
||||||
|
hop types:
|
||||||
|
|
||||||
|
H1 Intra-cube horizontal pe0 → pe1
|
||||||
|
H2 Intra-cube vertical pe0 → pe4
|
||||||
|
H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0
|
||||||
|
H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0
|
||||||
|
H5 Inter-SIP sip0.cube0.pe0 → sip1.cube0.pe0 (IPCQ only —
|
||||||
|
raw needs
|
||||||
|
cross-SIP MMU)
|
||||||
|
|
||||||
|
Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import csv
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
|
||||||
|
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
|
||||||
|
from kernbench.policy.placement.dp import DPPolicy
|
||||||
|
from kernbench.runtime_api.context import RuntimeContext
|
||||||
|
from kernbench.runtime_api.types import DeviceSelector
|
||||||
|
from kernbench.sim_engine.engine import GraphEngine
|
||||||
|
from kernbench.topology.builder import resolve_topology
|
||||||
|
|
||||||
|
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||||
|
PLOT_DIR = Path(__file__).parent / "pe2pe_latency_plots"
|
||||||
|
|
||||||
|
SIZES = [128, 256, 384, 512, 768, 1024, 2048, 4096, 8192, 10240]
|
||||||
|
|
||||||
|
N_CUBES = 16
|
||||||
|
N_PES = 8
|
||||||
|
ELEM_BYTES = 2 # f16
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Hop:
|
||||||
|
id: str
|
||||||
|
label: str
|
||||||
|
src: tuple[int, int, int]
|
||||||
|
dst: tuple[int, int, int]
|
||||||
|
send_dir: str
|
||||||
|
recv_dir: str
|
||||||
|
supports_raw: bool # False for cross-SIP (DPPolicy intra-device only)
|
||||||
|
|
||||||
|
|
||||||
|
HOPS = [
|
||||||
|
Hop("h1_intra_horizontal", "Intra-cube horizontal (pe0 to pe1)",
|
||||||
|
(0, 0, 0), (0, 0, 1), "intra_E", "intra_W", True),
|
||||||
|
Hop("h2_intra_vertical", "Intra-cube vertical (pe0 to pe4)",
|
||||||
|
(0, 0, 0), (0, 0, 4), "intra_S", "intra_N", True),
|
||||||
|
Hop("h3_inter_cube_horizontal", "Inter-cube horizontal (cube0 to cube1)",
|
||||||
|
(0, 0, 0), (0, 1, 0), "E", "W", True),
|
||||||
|
Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)",
|
||||||
|
(0, 0, 0), (0, 4, 0), "S", "N", True),
|
||||||
|
Hop("h5_inter_sip", "Inter-SIP (sip0 to sip1, same cube/pe)",
|
||||||
|
(0, 0, 0), (1, 0, 0), "global_E", "global_W", False),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _make_engine():
|
||||||
|
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||||
|
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||||
|
return engine, topo.topology_obj.spec
|
||||||
|
|
||||||
|
|
||||||
|
# ── IPCQ path ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _measure_ipcq(hop: Hop, nbytes: int) -> float:
|
||||||
|
engine, spec = _make_engine()
|
||||||
|
|
||||||
|
cfg = load_ccl_config()
|
||||||
|
merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
|
||||||
|
merged["slot_size"] = max(int(merged.get("slot_size", 4096)), nbytes)
|
||||||
|
|
||||||
|
n_elem = nbytes // ELEM_BYTES
|
||||||
|
src_sip, src_cube, src_pe = hop.src
|
||||||
|
dst_sip, dst_cube, dst_pe = hop.dst
|
||||||
|
send_dir, recv_dir = hop.send_dir, hop.recv_dir
|
||||||
|
|
||||||
|
with RuntimeContext(
|
||||||
|
engine=engine,
|
||||||
|
target_device=DeviceSelector("all"),
|
||||||
|
correlation_id=f"ipcq_{hop.id}_{nbytes}",
|
||||||
|
spec=spec,
|
||||||
|
) as ctx:
|
||||||
|
configure_sfr_intercube_multisip(engine, spec, merged)
|
||||||
|
|
||||||
|
dp = DPPolicy(
|
||||||
|
cube="row_wise", pe="column_wise",
|
||||||
|
num_cubes=N_CUBES, num_pes=N_PES,
|
||||||
|
)
|
||||||
|
|
||||||
|
def kernel(t_ptr, n_elem, tl):
|
||||||
|
pe_id = tl.program_id(axis=0)
|
||||||
|
cube_id = tl.program_id(axis=1)
|
||||||
|
if cube_id == src_cube and pe_id == src_pe:
|
||||||
|
data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
|
||||||
|
tl.send(dir=send_dir, src=data)
|
||||||
|
elif cube_id == dst_cube and pe_id == dst_pe:
|
||||||
|
tl.recv(dir=recv_dir, shape=(n_elem,), dtype="f16")
|
||||||
|
|
||||||
|
tensors = []
|
||||||
|
for s in sorted({src_sip, dst_sip}):
|
||||||
|
ctx.ahbm.set_device(s)
|
||||||
|
t = ctx.zeros(
|
||||||
|
(N_CUBES, N_PES * n_elem), dtype="f16",
|
||||||
|
dp=dp, name=f"sip{s}",
|
||||||
|
)
|
||||||
|
t.copy_(ctx.from_numpy(
|
||||||
|
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
|
||||||
|
))
|
||||||
|
tensors.append(t)
|
||||||
|
|
||||||
|
all_pending = []
|
||||||
|
for t in tensors:
|
||||||
|
pending = ctx.launch(
|
||||||
|
f"{hop.id}_ipcq", kernel, t, n_elem, _defer_wait=True,
|
||||||
|
)
|
||||||
|
all_pending.extend(pending)
|
||||||
|
for h, sip_id, meta in all_pending:
|
||||||
|
ctx.wait(h, _meta=meta)
|
||||||
|
|
||||||
|
# Per-PE kernel execution time (excludes launch dispatch and
|
||||||
|
# response aggregation). IPCQ: DST blocks on tl.recv until the
|
||||||
|
# send arrives, so max across SIPs = DST's transfer time.
|
||||||
|
pe_exec_vals = []
|
||||||
|
for h, _sip, _meta in all_pending:
|
||||||
|
_, trace = engine.get_completion(h)
|
||||||
|
if trace and trace.get("pe_exec_ns") is not None:
|
||||||
|
pe_exec_vals.append(float(trace["pe_exec_ns"]))
|
||||||
|
|
||||||
|
return max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Raw DMA path (intra-SIP only) ────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _measure_raw(hop: Hop, nbytes: int) -> float:
|
||||||
|
"""tl.load from source slice + tl.store to destination slice. The VA
|
||||||
|
mapping spans the cube mesh within one SIP (MmuMapMsg broadcasts to all
|
||||||
|
cubes of the SIP), so the store goes through the fabric to the
|
||||||
|
destination PE's HBM. No IPCQ protocol involved.
|
||||||
|
"""
|
||||||
|
if not hop.supports_raw:
|
||||||
|
raise RuntimeError(f"hop {hop.id} does not support raw path")
|
||||||
|
|
||||||
|
engine, spec = _make_engine()
|
||||||
|
|
||||||
|
n_elem = nbytes // ELEM_BYTES
|
||||||
|
src_sip, src_cube, src_pe = hop.src
|
||||||
|
dst_sip, dst_cube, dst_pe = hop.dst
|
||||||
|
assert src_sip == dst_sip
|
||||||
|
|
||||||
|
# Slice offsets in the (N_CUBES, N_PES * n_elem) tensor:
|
||||||
|
# row = cube, slice within row = pe * n_elem .. (pe+1)*n_elem
|
||||||
|
# Byte offsets from va_base:
|
||||||
|
src_off = (src_cube * N_PES + src_pe) * n_elem * ELEM_BYTES
|
||||||
|
dst_off = (dst_cube * N_PES + dst_pe) * n_elem * ELEM_BYTES
|
||||||
|
|
||||||
|
with RuntimeContext(
|
||||||
|
engine=engine,
|
||||||
|
target_device=DeviceSelector("all"),
|
||||||
|
correlation_id=f"raw_{hop.id}_{nbytes}",
|
||||||
|
spec=spec,
|
||||||
|
) as ctx:
|
||||||
|
dp = DPPolicy(
|
||||||
|
cube="row_wise", pe="column_wise",
|
||||||
|
num_cubes=N_CUBES, num_pes=N_PES,
|
||||||
|
)
|
||||||
|
ctx.ahbm.set_device(src_sip)
|
||||||
|
t = ctx.zeros(
|
||||||
|
(N_CUBES, N_PES * n_elem), dtype="f16",
|
||||||
|
dp=dp, name="raw_tensor",
|
||||||
|
)
|
||||||
|
t.copy_(ctx.from_numpy(
|
||||||
|
np.full((N_CUBES, N_PES * n_elem), 1.0, dtype=np.float16),
|
||||||
|
))
|
||||||
|
|
||||||
|
def kernel(t_ptr, n_elem, tl):
|
||||||
|
pe_id = tl.program_id(axis=0)
|
||||||
|
cube_id = tl.program_id(axis=1)
|
||||||
|
if cube_id == src_cube and pe_id == src_pe:
|
||||||
|
data = tl.load(
|
||||||
|
t_ptr + src_off, shape=(n_elem,), dtype="f16",
|
||||||
|
)
|
||||||
|
tl.store(t_ptr + dst_off, data)
|
||||||
|
|
||||||
|
pending = ctx.launch(
|
||||||
|
f"{hop.id}_raw", kernel, t, n_elem, _defer_wait=True,
|
||||||
|
)
|
||||||
|
for h, sip_id, meta in pending:
|
||||||
|
ctx.wait(h, _meta=meta)
|
||||||
|
|
||||||
|
# Per-PE kernel execution time. Raw: only SRC does real work
|
||||||
|
# (tl.load + tl.store, store is blocking), so max across all PEs
|
||||||
|
# = SRC's transfer time. Idle PEs contribute only overhead_ns.
|
||||||
|
pe_exec_vals = []
|
||||||
|
for h, _sip, _meta in pending:
|
||||||
|
_, trace = engine.get_completion(h)
|
||||||
|
if trace and trace.get("pe_exec_ns") is not None:
|
||||||
|
pe_exec_vals.append(float(trace["pe_exec_ns"]))
|
||||||
|
|
||||||
|
return max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── CSV + plotting ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _write_csv(records, path: Path) -> None:
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with open(path, "w", newline="", encoding="utf-8") as f:
|
||||||
|
w = csv.DictWriter(
|
||||||
|
f, fieldnames=["hop", "label", "size_bytes", "path", "total_ns"],
|
||||||
|
)
|
||||||
|
w.writeheader()
|
||||||
|
for r in records:
|
||||||
|
w.writerow(r)
|
||||||
|
|
||||||
|
|
||||||
|
def _plot_per_hop(records, hop: Hop, path: Path) -> None:
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
ipcq = sorted(
|
||||||
|
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
|
||||||
|
key=lambda r: r["size_bytes"],
|
||||||
|
)
|
||||||
|
raw = sorted(
|
||||||
|
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
|
||||||
|
key=lambda r: r["size_bytes"],
|
||||||
|
)
|
||||||
|
|
||||||
|
fig, ax = plt.subplots(figsize=(8, 5))
|
||||||
|
if ipcq:
|
||||||
|
ax.plot(
|
||||||
|
[r["size_bytes"] for r in ipcq],
|
||||||
|
[r["total_ns"] for r in ipcq],
|
||||||
|
marker="o", label="IPCQ (send/recv)", color="tab:blue",
|
||||||
|
)
|
||||||
|
if raw:
|
||||||
|
ax.plot(
|
||||||
|
[r["size_bytes"] for r in raw],
|
||||||
|
[r["total_ns"] for r in raw],
|
||||||
|
marker="s", label="Raw DMA (load+store)", color="tab:orange",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
ax.text(
|
||||||
|
0.98, 0.02, "(Raw DMA unavailable for cross-SIP)",
|
||||||
|
transform=ax.transAxes, ha="right", va="bottom",
|
||||||
|
fontsize=9, color="gray",
|
||||||
|
)
|
||||||
|
ax.set_xlabel("Data size (bytes)")
|
||||||
|
ax.set_ylabel("Latency (ns)")
|
||||||
|
ax.set_title(hop.label)
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
ax.legend()
|
||||||
|
fig.tight_layout()
|
||||||
|
fig.savefig(path, dpi=120)
|
||||||
|
plt.close(fig)
|
||||||
|
|
||||||
|
|
||||||
|
def _plot_overview(records, path: Path) -> None:
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
fig, axes = plt.subplots(2, 3, figsize=(16, 9))
|
||||||
|
axes = axes.flatten()
|
||||||
|
for i, hop in enumerate(HOPS):
|
||||||
|
ax = axes[i]
|
||||||
|
ipcq = sorted(
|
||||||
|
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
|
||||||
|
key=lambda r: r["size_bytes"],
|
||||||
|
)
|
||||||
|
raw = sorted(
|
||||||
|
[r for r in records if r["hop"] == hop.id and r["path"] == "raw"],
|
||||||
|
key=lambda r: r["size_bytes"],
|
||||||
|
)
|
||||||
|
if ipcq:
|
||||||
|
ax.plot(
|
||||||
|
[r["size_bytes"] for r in ipcq],
|
||||||
|
[r["total_ns"] for r in ipcq],
|
||||||
|
marker="o", label="IPCQ", color="tab:blue",
|
||||||
|
)
|
||||||
|
if raw:
|
||||||
|
ax.plot(
|
||||||
|
[r["size_bytes"] for r in raw],
|
||||||
|
[r["total_ns"] for r in raw],
|
||||||
|
marker="s", label="Raw", color="tab:orange",
|
||||||
|
)
|
||||||
|
ax.set_title(hop.label, fontsize=10)
|
||||||
|
ax.set_xlabel("bytes")
|
||||||
|
ax.set_ylabel("ns")
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
ax.legend(fontsize=8)
|
||||||
|
for j in range(len(HOPS), len(axes)):
|
||||||
|
axes[j].axis("off")
|
||||||
|
fig.suptitle(
|
||||||
|
"PE-to-PE latency: IPCQ vs raw DMA",
|
||||||
|
fontsize=14,
|
||||||
|
)
|
||||||
|
fig.tight_layout()
|
||||||
|
fig.savefig(path, dpi=120)
|
||||||
|
plt.close(fig)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Test entry ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_pe_to_pe_latency_sweep():
|
||||||
|
records: list[dict] = []
|
||||||
|
|
||||||
|
for hop in HOPS:
|
||||||
|
for size in SIZES:
|
||||||
|
# IPCQ path
|
||||||
|
ipcq_ns = _measure_ipcq(hop, size)
|
||||||
|
records.append({
|
||||||
|
"hop": hop.id, "label": hop.label,
|
||||||
|
"size_bytes": size, "path": "ipcq",
|
||||||
|
"total_ns": ipcq_ns,
|
||||||
|
})
|
||||||
|
|
||||||
|
raw_s = "n/a"
|
||||||
|
if hop.supports_raw:
|
||||||
|
raw_ns = _measure_raw(hop, size)
|
||||||
|
records.append({
|
||||||
|
"hop": hop.id, "label": hop.label,
|
||||||
|
"size_bytes": size, "path": "raw",
|
||||||
|
"total_ns": raw_ns,
|
||||||
|
})
|
||||||
|
raw_s = f"{raw_ns:7.1f}ns"
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"[{hop.id}] size={size:5d} "
|
||||||
|
f"ipcq={ipcq_ns:7.1f}ns raw={raw_s}"
|
||||||
|
)
|
||||||
|
|
||||||
|
PLOT_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
_write_csv(records, PLOT_DIR / "summary.csv")
|
||||||
|
for hop in HOPS:
|
||||||
|
_plot_per_hop(records, hop, PLOT_DIR / f"{hop.id}.png")
|
||||||
|
_plot_overview(records, PLOT_DIR / "overview.png")
|
||||||
|
|
||||||
|
for hop in HOPS:
|
||||||
|
rs = sorted(
|
||||||
|
[r for r in records if r["hop"] == hop.id and r["path"] == "ipcq"],
|
||||||
|
key=lambda r: r["size_bytes"],
|
||||||
|
)
|
||||||
|
for r in rs:
|
||||||
|
assert r["total_ns"] > 0, f"{hop.id}: total_ns must be > 0"
|
||||||
|
|
||||||
|
print(f"\n Plots + CSV written to {PLOT_DIR}")
|
||||||
Reference in New Issue
Block a user