Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e9cc40f74d | |||
| c1a5cf3a2a | |||
| 90874abbfe |
@@ -86,11 +86,36 @@ Mechanism.
|
|||||||
|
|
||||||
- `KernelLaunchMsg` carries an optional `target_start_ns: float | None`.
|
- `KernelLaunchMsg` carries an optional `target_start_ns: float | None`.
|
||||||
- **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it
|
- **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it
|
||||||
computes `target_start_ns = env.now + max_latency` where `max_latency`
|
computes `target_start_ns = env.now + max_latency` where
|
||||||
is the maximum `ComponentContext.compute_path_latency_ns(path)` across
|
`max_latency` is the maximum, over every target (sip, cube, pe)
|
||||||
every target (sip, cube, pe) tuple — `path = find_node_path(io_cpu,
|
tuple, of the **two-leg dispatch chain**:
|
||||||
pe_cpu_id)`. The stamped value is placed on the request carried by
|
|
||||||
every fanned-out sub-Transaction.
|
```
|
||||||
|
max_latency(sip, cube, pe) =
|
||||||
|
compute_path_latency_ns(find_node_path(io_cpu, m_cpu(sip, cube)))
|
||||||
|
+ compute_path_latency_ns(find_node_path(m_cpu(sip, cube), pe_cpu))
|
||||||
|
- io_cpu.overhead_ns
|
||||||
|
- m_cpu.overhead_ns
|
||||||
|
```
|
||||||
|
|
||||||
|
This models the actual dispatch as **two sequential Transactions**
|
||||||
|
(IO_CPU → M_CPU, then M_CPU → PE_CPU). Each leg's
|
||||||
|
`compute_path_latency_ns` adds its endpoints' `overhead_ns`;
|
||||||
|
`io_cpu.overhead_ns` is subtracted because IO_CPU has already
|
||||||
|
paid it before this method runs, and `m_cpu.overhead_ns` is
|
||||||
|
subtracted once because it appears as endpoint of leg1 *and*
|
||||||
|
start of leg2 but is paid only once at run time. A single
|
||||||
|
`find_node_path(io_cpu, pe_cpu)` walk is **not** equivalent —
|
||||||
|
it can pick a graph path that bypasses M_CPU and silently
|
||||||
|
under-shoots the prediction for far cubes, breaking the D5
|
||||||
|
invariant.
|
||||||
|
|
||||||
|
The fanned-out sub-Transactions carry **`nbytes = 0`** for
|
||||||
|
`KernelLaunchMsg` (control message only). Without this,
|
||||||
|
large kernel-launch payloads would occupy fabric BW on the
|
||||||
|
shared first hop and serialize the per-cube dispatch, pushing
|
||||||
|
far M_CPUs past `target_start_ns` and re-introducing the
|
||||||
|
late-arrival violation.
|
||||||
- **M_CPU** passes an already-stamped `target_start_ns` through
|
- **M_CPU** passes an already-stamped `target_start_ns` through
|
||||||
unchanged. Only when the value is absent (e.g. a direct
|
unchanged. Only when the value is absent (e.g. a direct
|
||||||
launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier
|
launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier
|
||||||
|
|||||||
@@ -372,24 +372,41 @@ When the receiver frees a slot, the sender must learn about it
|
|||||||
travel through general vc_comm fabric — it uses a **separate fast
|
travel through general vc_comm fabric — it uses a **separate fast
|
||||||
path**, an abstraction of the NVLink / UCIe credit-return wire.
|
path**, an abstraction of the NVLink / UCIe credit-return wire.
|
||||||
|
|
||||||
**Latency** is computed from the **bottleneck BW on the path**, not a
|
**Latency** is computed from the **full path latency** (per-node
|
||||||
magic constant:
|
overhead + edge propagation + drain), not a magic constant:
|
||||||
|
|
||||||
```
|
```
|
||||||
credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes)
|
credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes)
|
||||||
path = router.find_path(self_pe, peer_pe)
|
path = router.find_path(self_pe, peer_pe.pe_dma)
|
||||||
latency = compute_drain_ns(path, credit_size_bytes)
|
latency = compute_path_latency_ns(path, credit_size_bytes)
|
||||||
= credit_size_bytes / bottleneck_bw_on_path
|
= sum(edge.distance_mm * ns_per_mm)
|
||||||
|
+ sum(node_overhead_ns[n] for n in path)
|
||||||
|
+ credit_size_bytes / bottleneck_bw_on_path
|
||||||
```
|
```
|
||||||
|
|
||||||
|
The router auto-appends `.pe_dma` to the source only, so the
|
||||||
|
destination MUST be spelled with the explicit `.pe_dma` suffix or
|
||||||
|
`find_path` raises and the credit silently teleports at zero cost
|
||||||
|
(latent bug fixed alongside this update).
|
||||||
|
|
||||||
|
`tl.recv` blocks on the credit-emit completion (recv yields-from
|
||||||
|
`_delayed_credit_send` rather than spawning it as a fork). This puts
|
||||||
|
the credit-return cost on the receiver's `pe_exec_ns`, modeling the
|
||||||
|
IPCQ control-plane completing the consume-acknowledgement before
|
||||||
|
recv returns to the kernel — the protocol equivalent of a non-posted
|
||||||
|
`tl.store` waiting for an HBM ack on the raw DMA path.
|
||||||
|
|
||||||
That gives us:
|
That gives us:
|
||||||
|
|
||||||
- **Topology-proportional approximation**: an in-cube credit return is
|
- **Topology-proportional approximation**: an in-cube credit return is
|
||||||
automatically faster than a cross-SIP credit return.
|
automatically faster than a cross-SIP credit return.
|
||||||
- **No magic constants**: no arbitrary `ipcq_ctrl_latency_ns`.
|
- **No magic constants**: every nanosecond comes from
|
||||||
|
`compute_path_latency_ns` on the same edge_map and `node_overhead_ns`
|
||||||
|
as data traffic.
|
||||||
- **No deadlock risk**: unlike piggyback, B can issue credit even when
|
- **No deadlock risk**: unlike piggyback, B can issue credit even when
|
||||||
it has no data to send back.
|
it has no data to send back. `peer_credit_store.put` is unbounded.
|
||||||
- **Reuses existing utility**: `ComponentContext.compute_drain_ns`.
|
- **`IPCQ ≥ raw DMA`** for matched physical moves — the credit-emit
|
||||||
|
cost on recv balances the HBM ack-trip cost RAW pays on the sender.
|
||||||
|
|
||||||
#### Component coupling — SimPy Store channel
|
#### Component coupling — SimPy Store channel
|
||||||
|
|
||||||
|
|||||||
@@ -365,23 +365,39 @@ data 경로의 piggyback 모델과 달리, credit return은 일반 vc_comm fabri
|
|||||||
거치지 않고 **별도 fast path**로 처리한다. 이는 실제 HW의 NVLink/UCIe
|
거치지 않고 **별도 fast path**로 처리한다. 이는 실제 HW의 NVLink/UCIe
|
||||||
credit return fast path를 추상화한 것이다.
|
credit return fast path를 추상화한 것이다.
|
||||||
|
|
||||||
**Latency 계산**: magic constant가 아니라 **라우팅 경로의 bottleneck BW**
|
**Latency 계산**: magic constant가 아니라 **라우팅 경로의 full path
|
||||||
기준으로 산출한다.
|
latency** (per-node overhead + edge propagation + drain) 기준으로
|
||||||
|
산출한다.
|
||||||
|
|
||||||
```
|
```
|
||||||
credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes)
|
credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes)
|
||||||
path = router.find_path(self_pe, peer_pe)
|
path = router.find_path(self_pe, peer_pe.pe_dma)
|
||||||
latency = compute_drain_ns(path, credit_size_bytes)
|
latency = compute_path_latency_ns(path, credit_size_bytes)
|
||||||
= credit_size_bytes / bottleneck_bw_on_path
|
= sum(edge.distance_mm * ns_per_mm)
|
||||||
|
+ sum(node_overhead_ns[n] for n in path)
|
||||||
|
+ credit_size_bytes / bottleneck_bw_on_path
|
||||||
```
|
```
|
||||||
|
|
||||||
|
router는 source에만 `.pe_dma`를 자동 부여하므로 destination에는 반드시
|
||||||
|
`.pe_dma` suffix를 명시해야 한다. 그렇지 않으면 `find_path`가 raise하고
|
||||||
|
credit이 0 cost로 silently teleport되는 latent bug가 발생한다 (이번
|
||||||
|
업데이트에서 수정됨).
|
||||||
|
|
||||||
|
`tl.recv`는 credit-emit 완료를 yield-from으로 기다린다 (이전에는
|
||||||
|
`env.process`로 fork). 이로써 credit-return cost가 receiver의
|
||||||
|
`pe_exec_ns`에 반영되어, IPCQ control-plane이 consume-acknowledgement를
|
||||||
|
완료한 뒤에야 recv가 kernel에 반환된다 — RAW DMA의 non-posted `tl.store`가
|
||||||
|
HBM ack-trip을 기다리는 것의 protocol-level 등가물이다.
|
||||||
|
|
||||||
이로써:
|
이로써:
|
||||||
- **토폴로지 비례 approximation**: cube 내 credit return과 cross-SIP credit이
|
- **토폴로지 비례 approximation**: cube 내 credit return과 cross-SIP credit이
|
||||||
자동으로 다른 latency를 가짐 (정확한 값은 아니지만 magic constant보다 의미 있음)
|
자동으로 다른 latency를 가짐
|
||||||
- **Magic constant 없음**: 별도 `ipcq_ctrl_latency_ns` 같은 임의 값 불필요
|
- **Magic constant 없음**: 모든 ns 값이 데이터 트래픽과 동일한 edge_map
|
||||||
- **Deadlock 위험 없음**: piggyback과 달리 B가 A에게 보낼 데이터가 없어도
|
및 `node_overhead_ns`에서 산출되는 `compute_path_latency_ns`로부터 옴
|
||||||
credit이 자동 발행됨
|
- **Deadlock 위험 없음**: `peer_credit_store.put`은 unbounded, B가 A에게
|
||||||
- **기존 utility 재사용**: `ComponentContext.compute_drain_ns` 그대로 사용
|
보낼 데이터가 없어도 credit이 자동 발행됨
|
||||||
|
- **`IPCQ ≥ raw DMA`** 보장: matched physical move에 대해 credit-emit이
|
||||||
|
RAW의 ack-trip cost와 균형을 이룸
|
||||||
|
|
||||||
```
|
```
|
||||||
PE B: tl.recv(W) → 데이터 가져감 → my_tail++
|
PE B: tl.recv(W) → 데이터 가져감 → my_tail++
|
||||||
|
|||||||
@@ -1,22 +1,24 @@
|
|||||||
"""SFR configuration for intercube + inter-SIP IPCQ wiring.
|
"""SFR configuration for the full IPCQ hardware wiring.
|
||||||
|
|
||||||
Provides ``configure_sfr_intercube_multisip`` which programs PE_IPCQ
|
Installs PE_IPCQ neighbor tables modeling the physical hardware.
|
||||||
neighbor tables for:
|
Wiring is independent of DPPolicy / kernel choice — the kernel decides
|
||||||
|
at runtime which links to use.
|
||||||
|
|
||||||
1. Intercube within each SIP — pe0 of every cube connects to pe0 of
|
Direction label namespaces (disjoint):
|
||||||
its N/S/E/W mesh neighbors (no wrap-around).
|
|
||||||
2. Inter-SIP on ALL cubes — pe0 of cube_c on sip_A connects to pe0 of
|
|
||||||
cube_c on each peer SIP, using ``global_E``/``global_W`` (ring) or
|
|
||||||
``global_N``/``global_S``/``global_E``/``global_W`` (mesh/torus)
|
|
||||||
direction labels. Wiring all cubes allows the kernel to
|
|
||||||
dynamically elect the root cube at runtime.
|
|
||||||
|
|
||||||
SIP-level topology is read from ``topology.yaml`` →
|
- Intra-cube PE-to-PE: ``intra_N / intra_S / intra_E / intra_W``
|
||||||
``system.sips.topology`` (e.g. ``ring_1d``, ``mesh_2d``).
|
Logical 2×4 PE grid within a cube (no wrap):
|
||||||
Intercube mesh dimensions come from ``sip.cube_mesh.w/h``.
|
|
||||||
|
|
||||||
Internally delegates to ``install_ipcq`` with a computed ``rank_to_pe``
|
Row 0: pe0 pe1 pe2 pe3
|
||||||
(pe0-only) and a closure-captured ``neighbors()`` function.
|
Row 1: pe4 pe5 pe6 pe7
|
||||||
|
|
||||||
|
- Intercube same-lane: ``N / S / E / W``
|
||||||
|
``pe_i of cube_A ↔ pe_i of cube_B`` across the 4×4 cube mesh
|
||||||
|
(no wrap). Every PE i ∈ [0..7] wired independently.
|
||||||
|
|
||||||
|
- Inter-SIP same-(cube, pe): ``global_N / global_S / global_E / global_W``
|
||||||
|
``pe_i of cube_c on sip_A ↔ pe_i of cube_c on sip_B`` per
|
||||||
|
``topology.yaml → system.sips.topology``.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -27,12 +29,46 @@ from kernbench.ccl.install import install_ipcq
|
|||||||
from kernbench.ccl.topologies import _BUILTIN as _TOPO_BUILTINS
|
from kernbench.ccl.topologies import _BUILTIN as _TOPO_BUILTINS
|
||||||
|
|
||||||
|
|
||||||
|
# ── Intra-cube 2×4 PE grid ───────────────────────────────────────────
|
||||||
|
|
||||||
|
_PE_GRID_COLS = 4
|
||||||
|
_PE_GRID_ROWS = 2
|
||||||
|
_PES_PER_CUBE = _PE_GRID_COLS * _PE_GRID_ROWS # 8
|
||||||
|
|
||||||
|
|
||||||
|
def _intra_cube_neighbors(pe: int) -> dict[str, int]:
|
||||||
|
"""Logical 2×4 PE grid neighbors within a cube (no wrap).
|
||||||
|
|
||||||
|
Returns directions in the ``intra_*`` namespace.
|
||||||
|
"""
|
||||||
|
row, col = divmod(pe, _PE_GRID_COLS)
|
||||||
|
nbrs: dict[str, int] = {}
|
||||||
|
if col < _PE_GRID_COLS - 1:
|
||||||
|
nbrs["intra_E"] = row * _PE_GRID_COLS + (col + 1)
|
||||||
|
if col > 0:
|
||||||
|
nbrs["intra_W"] = row * _PE_GRID_COLS + (col - 1)
|
||||||
|
if row < _PE_GRID_ROWS - 1:
|
||||||
|
nbrs["intra_S"] = (row + 1) * _PE_GRID_COLS + col
|
||||||
|
if row > 0:
|
||||||
|
nbrs["intra_N"] = (row - 1) * _PE_GRID_COLS + col
|
||||||
|
return nbrs
|
||||||
|
|
||||||
|
|
||||||
|
# ── Public entry point ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
def configure_sfr_intercube_multisip(
|
def configure_sfr_intercube_multisip(
|
||||||
engine: Any,
|
engine: Any,
|
||||||
spec: dict,
|
spec: dict,
|
||||||
cfg: dict,
|
cfg: dict,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Wire IPCQ for intercube (pe0, mesh) + inter-SIP (pe0, all cubes).
|
"""Wire the full IPCQ hardware model.
|
||||||
|
|
||||||
|
Every PE on every cube on every SIP gets neighbor table entries for:
|
||||||
|
|
||||||
|
- intra-cube (2×4 grid) in the ``intra_*`` namespace
|
||||||
|
- intercube same-lane (4×4 cube mesh, no wrap) in ``N/S/E/W``
|
||||||
|
- inter-SIP same-(cube, pe) in ``global_*``
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
engine: GraphEngine with ``_components``.
|
engine: GraphEngine with ``_components``.
|
||||||
@@ -46,48 +82,71 @@ def configure_sfr_intercube_multisip(
|
|||||||
mesh_w = int(cm["w"])
|
mesh_w = int(cm["w"])
|
||||||
mesh_h = int(cm["h"])
|
mesh_h = int(cm["h"])
|
||||||
n_cubes = mesh_w * mesh_h
|
n_cubes = mesh_w * mesh_h
|
||||||
n_sips = int(spec.get("system", {}).get("sips", {}).get("count", 1))
|
sips_cfg = spec.get("system", {}).get("sips", {})
|
||||||
sip_topology = str(
|
n_sips = int(sips_cfg.get("count", 1))
|
||||||
spec.get("system", {}).get("sips", {}).get("topology", "ring_1d")
|
sip_topology = str(sips_cfg.get("topology", "ring_1d"))
|
||||||
)
|
sip_w = sips_cfg.get("w")
|
||||||
|
sip_h = sips_cfg.get("h")
|
||||||
|
sip_w = int(sip_w) if sip_w is not None else None
|
||||||
|
sip_h = int(sip_h) if sip_h is not None else None
|
||||||
|
|
||||||
if sip_topology not in _TOPO_BUILTINS:
|
if sip_topology not in _TOPO_BUILTINS:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Unknown sip topology '{sip_topology}'. "
|
f"Unknown sip topology '{sip_topology}'. "
|
||||||
f"Available: {list(_TOPO_BUILTINS)}"
|
f"Available: {list(_TOPO_BUILTINS)}"
|
||||||
)
|
)
|
||||||
sip_topo_fn = _TOPO_BUILTINS[sip_topology]
|
_sip_topo_fn_raw = _TOPO_BUILTINS[sip_topology]
|
||||||
|
|
||||||
world_size = n_sips * n_cubes
|
def sip_topo_fn(rank: int, ws: int) -> dict:
|
||||||
|
if sip_w is not None and sip_h is not None:
|
||||||
|
try:
|
||||||
|
return _sip_topo_fn_raw(rank, ws, w=sip_w, h=sip_h)
|
||||||
|
except TypeError:
|
||||||
|
pass
|
||||||
|
return _sip_topo_fn_raw(rank, ws)
|
||||||
|
|
||||||
|
pes_per_cube = _PES_PER_CUBE
|
||||||
|
world_size = n_sips * n_cubes * pes_per_cube
|
||||||
pe_idx_to_pe: list[tuple[int, int, int]] = [
|
pe_idx_to_pe: list[tuple[int, int, int]] = [
|
||||||
(sip, cube, 0)
|
(sip, cube, pe)
|
||||||
for sip in range(n_sips)
|
for sip in range(n_sips)
|
||||||
for cube in range(n_cubes)
|
for cube in range(n_cubes)
|
||||||
|
for pe in range(pes_per_cube)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
def _pe_idx(sip: int, cube: int, pe: int) -> int:
|
||||||
|
return (sip * n_cubes + cube) * pes_per_cube + pe
|
||||||
|
|
||||||
def _neighbors(pe_idx: int, ws: int, _base: dict) -> dict[str, int]:
|
def _neighbors(pe_idx: int, ws: int, _base: dict) -> dict[str, int]:
|
||||||
sip = pe_idx // n_cubes
|
tmp = pe_idx
|
||||||
cube = pe_idx % n_cubes
|
pe = tmp % pes_per_cube
|
||||||
|
tmp //= pes_per_cube
|
||||||
|
cube = tmp % n_cubes
|
||||||
|
sip = tmp // n_cubes
|
||||||
row = cube // mesh_w
|
row = cube // mesh_w
|
||||||
col = cube % mesh_w
|
col = cube % mesh_w
|
||||||
|
|
||||||
nbrs: dict[str, int] = {}
|
nbrs: dict[str, int] = {}
|
||||||
|
|
||||||
# Intercube within SIP (mesh, no wrap-around)
|
# ── Intra-cube (intra_N/S/E/W) ──
|
||||||
if col < mesh_w - 1:
|
for d, peer_pe in _intra_cube_neighbors(pe).items():
|
||||||
nbrs["E"] = sip * n_cubes + (row * mesh_w + col + 1)
|
nbrs[d] = _pe_idx(sip, cube, peer_pe)
|
||||||
if col > 0:
|
|
||||||
nbrs["W"] = sip * n_cubes + (row * mesh_w + col - 1)
|
|
||||||
if row < mesh_h - 1:
|
|
||||||
nbrs["S"] = sip * n_cubes + ((row + 1) * mesh_w + col)
|
|
||||||
if row > 0:
|
|
||||||
nbrs["N"] = sip * n_cubes + ((row - 1) * mesh_w + col)
|
|
||||||
|
|
||||||
# Inter-SIP on ALL cubes
|
# ── Intercube same-lane (N/S/E/W, 4×4 no wrap) ──
|
||||||
|
if col < mesh_w - 1:
|
||||||
|
nbrs["E"] = _pe_idx(sip, row * mesh_w + (col + 1), pe)
|
||||||
|
if col > 0:
|
||||||
|
nbrs["W"] = _pe_idx(sip, row * mesh_w + (col - 1), pe)
|
||||||
|
if row < mesh_h - 1:
|
||||||
|
nbrs["S"] = _pe_idx(sip, (row + 1) * mesh_w + col, pe)
|
||||||
|
if row > 0:
|
||||||
|
nbrs["N"] = _pe_idx(sip, (row - 1) * mesh_w + col, pe)
|
||||||
|
|
||||||
|
# ── Inter-SIP same-(cube, pe) (global_*) ──
|
||||||
if n_sips > 1:
|
if n_sips > 1:
|
||||||
sip_nbrs = sip_topo_fn(sip, n_sips)
|
sip_nbrs = sip_topo_fn(sip, n_sips)
|
||||||
for d, peer_sip in sip_nbrs.items():
|
for d, peer_sip in sip_nbrs.items():
|
||||||
nbrs[f"global_{d}"] = peer_sip * n_cubes + cube
|
nbrs[f"global_{d}"] = _pe_idx(peer_sip, cube, pe)
|
||||||
|
|
||||||
return nbrs
|
return nbrs
|
||||||
|
|
||||||
|
|||||||
@@ -33,23 +33,41 @@ def ring_1d_unidir(rank: int, world_size: int) -> NeighborMap:
|
|||||||
return {"E": (rank + 1) % world_size}
|
return {"E": (rank + 1) % world_size}
|
||||||
|
|
||||||
|
|
||||||
def mesh_2d(rank: int, world_size: int) -> NeighborMap:
|
def _resolve_2d_dims(
|
||||||
"""Square 2D mesh (N/S/E/W).
|
world_size: int, w: int | None, h: int | None, name: str,
|
||||||
|
) -> tuple[int, int]:
|
||||||
Layout: rank = row * side + col, with side = sqrt(world_size).
|
if w is not None and h is not None:
|
||||||
Wrap-around (torus) on all four edges.
|
if w * h != world_size:
|
||||||
"""
|
raise ValueError(
|
||||||
|
f"{name}: w*h ({w}*{h}) != world_size ({world_size})"
|
||||||
|
)
|
||||||
|
return w, h
|
||||||
side = int(round(world_size ** 0.5))
|
side = int(round(world_size ** 0.5))
|
||||||
if side * side != world_size:
|
if side * side != world_size:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"mesh_2d requires square world_size, got {world_size}"
|
f"{name} requires square world_size or explicit w,h, "
|
||||||
|
f"got {world_size}"
|
||||||
)
|
)
|
||||||
r, c = divmod(rank, side)
|
return side, side
|
||||||
|
|
||||||
|
|
||||||
|
def mesh_2d(
|
||||||
|
rank: int, world_size: int,
|
||||||
|
w: int | None = None, h: int | None = None,
|
||||||
|
) -> NeighborMap:
|
||||||
|
"""2D mesh (N/S/E/W) with wrap-around on all four edges.
|
||||||
|
|
||||||
|
Layout: rank = row * w + col. When w, h are given, supports
|
||||||
|
rectangular (e.g. 2x3) layouts. Otherwise falls back to square
|
||||||
|
side = sqrt(world_size).
|
||||||
|
"""
|
||||||
|
w, h = _resolve_2d_dims(world_size, w, h, "mesh_2d")
|
||||||
|
r, c = divmod(rank, w)
|
||||||
return {
|
return {
|
||||||
"N": ((r - 1) % side) * side + c,
|
"N": ((r - 1) % h) * w + c,
|
||||||
"S": ((r + 1) % side) * side + c,
|
"S": ((r + 1) % h) * w + c,
|
||||||
"W": r * side + (c - 1) % side,
|
"W": r * w + (c - 1) % w,
|
||||||
"E": r * side + (c + 1) % side,
|
"E": r * w + (c + 1) % w,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -73,36 +91,30 @@ def tree_binary(rank: int, world_size: int) -> NeighborMap:
|
|||||||
return n
|
return n
|
||||||
|
|
||||||
|
|
||||||
def torus_2d(rank: int, world_size: int) -> NeighborMap:
|
def torus_2d(
|
||||||
"""Square 2D torus (N/S/E/W) with wrap-around on all edges.
|
rank: int, world_size: int,
|
||||||
|
w: int | None = None, h: int | None = None,
|
||||||
Alias for mesh_2d (which already wraps). Explicit name for clarity
|
) -> NeighborMap:
|
||||||
when used as a SIP-level topology.
|
"""2D torus (N/S/E/W) with wrap-around on all edges. Alias for mesh_2d."""
|
||||||
"""
|
return mesh_2d(rank, world_size, w=w, h=h)
|
||||||
return mesh_2d(rank, world_size)
|
|
||||||
|
|
||||||
|
|
||||||
def mesh_2d_no_wrap(rank: int, world_size: int) -> NeighborMap:
|
def mesh_2d_no_wrap(
|
||||||
"""Square 2D mesh (N/S/E/W) WITHOUT wrap-around.
|
rank: int, world_size: int,
|
||||||
|
w: int | None = None, h: int | None = None,
|
||||||
Edge nodes have fewer neighbors (no wrapping). Used for SIP-level
|
) -> NeighborMap:
|
||||||
topologies where physical links don't wrap.
|
"""2D mesh (N/S/E/W) WITHOUT wrap-around. Supports rectangular dims."""
|
||||||
"""
|
w, h = _resolve_2d_dims(world_size, w, h, "mesh_2d_no_wrap")
|
||||||
side = int(round(world_size ** 0.5))
|
r, c = divmod(rank, w)
|
||||||
if side * side != world_size:
|
|
||||||
raise ValueError(
|
|
||||||
f"mesh_2d_no_wrap requires square world_size, got {world_size}"
|
|
||||||
)
|
|
||||||
r, c = divmod(rank, side)
|
|
||||||
n: NeighborMap = {}
|
n: NeighborMap = {}
|
||||||
if r > 0:
|
if r > 0:
|
||||||
n["N"] = (r - 1) * side + c
|
n["N"] = (r - 1) * w + c
|
||||||
if r < side - 1:
|
if r < h - 1:
|
||||||
n["S"] = (r + 1) * side + c
|
n["S"] = (r + 1) * w + c
|
||||||
if c > 0:
|
if c > 0:
|
||||||
n["W"] = r * side + (c - 1)
|
n["W"] = r * w + (c - 1)
|
||||||
if c < side - 1:
|
if c < w - 1:
|
||||||
n["E"] = r * side + (c + 1)
|
n["E"] = r * w + (c + 1)
|
||||||
return n
|
return n
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -86,26 +86,41 @@ class IoCpuComponent(ComponentBase):
|
|||||||
# For KernelLaunchMsg, compute the global barrier once here so
|
# For KernelLaunchMsg, compute the global barrier once here so
|
||||||
# every downstream PE_CPU uses the same target_start_ns.
|
# every downstream PE_CPU uses the same target_start_ns.
|
||||||
if isinstance(request, KernelLaunchMsg):
|
if isinstance(request, KernelLaunchMsg):
|
||||||
|
io_overhead = self.ctx.node_overhead_ns.get(self.node.id, 0.0)
|
||||||
global_max_latency = 0.0
|
global_max_latency = 0.0
|
||||||
pe_ids = self._resolve_pe_ids(
|
pe_ids = self._resolve_pe_ids(
|
||||||
getattr(request, "target_pe", "all")
|
getattr(request, "target_pe", "all")
|
||||||
)
|
)
|
||||||
for sip, cube in cube_targets:
|
for sip, cube in cube_targets:
|
||||||
|
try:
|
||||||
|
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
|
||||||
|
io_to_m_path = self.ctx.router.find_node_path(
|
||||||
|
self.node.id, m_cpu_id,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if len(io_to_m_path) < 2:
|
||||||
|
continue
|
||||||
|
leg1 = self.ctx.compute_path_latency_ns(
|
||||||
|
io_to_m_path, nbytes=0,
|
||||||
|
)
|
||||||
|
m_overhead = self.ctx.node_overhead_ns.get(m_cpu_id, 0.0)
|
||||||
for pe_id in pe_ids:
|
for pe_id in pe_ids:
|
||||||
pe_cpu_id = (
|
pe_cpu_id = (
|
||||||
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
|
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
path = self.ctx.router.find_node_path(
|
m_to_pe_path = self.ctx.router.find_node_path(
|
||||||
self.node.id, pe_cpu_id,
|
m_cpu_id, pe_cpu_id,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
if len(path) < 2:
|
if len(m_to_pe_path) < 2:
|
||||||
continue
|
continue
|
||||||
latency = self.ctx.compute_path_latency_ns(
|
leg2 = self.ctx.compute_path_latency_ns(
|
||||||
path, nbytes=0,
|
m_to_pe_path, nbytes=0,
|
||||||
)
|
)
|
||||||
|
latency = leg1 + leg2 - io_overhead - m_overhead
|
||||||
if latency > global_max_latency:
|
if latency > global_max_latency:
|
||||||
global_max_latency = latency
|
global_max_latency = latency
|
||||||
request = dataclasses.replace(
|
request = dataclasses.replace(
|
||||||
@@ -116,7 +131,12 @@ class IoCpuComponent(ComponentBase):
|
|||||||
# Setup aggregation
|
# Setup aggregation
|
||||||
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
||||||
|
|
||||||
# Fan out to each target cube's M_CPU
|
# Fan out to each target cube's M_CPU. Kernel-launch fanout
|
||||||
|
# carries control metadata only; nbytes is forced to 0 for
|
||||||
|
# KernelLaunchMsg so the launch sub-txns do not occupy data-fabric
|
||||||
|
# BW (would otherwise serialize 16 cubes worth of fanout on the
|
||||||
|
# shared first hop and break ADR-0009 D5's barrier prediction).
|
||||||
|
is_kernel_launch = isinstance(request, KernelLaunchMsg)
|
||||||
for sip, cube in cube_targets:
|
for sip, cube in cube_targets:
|
||||||
try:
|
try:
|
||||||
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
|
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
|
||||||
@@ -127,7 +147,8 @@ class IoCpuComponent(ComponentBase):
|
|||||||
continue
|
continue
|
||||||
sub_txn = Transaction(
|
sub_txn = Transaction(
|
||||||
request=request, path=path, step=0,
|
request=request, path=path, step=0,
|
||||||
nbytes=txn.nbytes, done=env.event(),
|
nbytes=0 if is_kernel_launch else txn.nbytes,
|
||||||
|
done=env.event(),
|
||||||
result_data=txn.result_data,
|
result_data=txn.result_data,
|
||||||
)
|
)
|
||||||
yield self.out_ports[path[1]].put(sub_txn.advance())
|
yield self.out_ports[path[1]].put(sub_txn.advance())
|
||||||
|
|||||||
@@ -338,9 +338,13 @@ class PeIpcqComponent(ComponentBase):
|
|||||||
nbytes=req.result_data.get("nbytes", 0),
|
nbytes=req.result_data.get("nbytes", 0),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fast path credit return — bottleneck BW based latency
|
# Credit return: recv blocks on credit-emit so the protocol cost
|
||||||
env.process(
|
# (full path latency to deliver the credit metadata back to the
|
||||||
self._delayed_credit_send(env, direction, qp["peer_credit_store"], qp["my_tail"])
|
# sender) is reflected in the recv's pe_exec_ns. Models the IPCQ
|
||||||
|
# control-plane completing the consume-acknowledgement before
|
||||||
|
# recv returns to the kernel.
|
||||||
|
yield from self._delayed_credit_send(
|
||||||
|
env, direction, qp["peer_credit_store"], qp["my_tail"],
|
||||||
)
|
)
|
||||||
|
|
||||||
if not req.done.triggered:
|
if not req.done.triggered:
|
||||||
@@ -455,7 +459,12 @@ class PeIpcqComponent(ComponentBase):
|
|||||||
yield peer_credit_store.put(meta)
|
yield peer_credit_store.put(meta)
|
||||||
|
|
||||||
def _credit_latency_ns(self, direction: str) -> float:
|
def _credit_latency_ns(self, direction: str) -> float:
|
||||||
"""Compute credit fast path latency = credit_size / bottleneck_bw.
|
"""Full path latency for the credit-return packet.
|
||||||
|
|
||||||
|
Pays per-node overhead + edge prop + drain along the same fabric
|
||||||
|
the data took. PathRouter.find_path() auto-appends ".pe_dma" to
|
||||||
|
the source only, so the destination MUST be spelled with the
|
||||||
|
explicit ".pe_dma" suffix.
|
||||||
|
|
||||||
Falls back to 0 when ctx/router is unavailable (unit-test mode).
|
Falls back to 0 when ctx/router is unavailable (unit-test mode).
|
||||||
"""
|
"""
|
||||||
@@ -463,10 +472,12 @@ class PeIpcqComponent(ComponentBase):
|
|||||||
return 0.0
|
return 0.0
|
||||||
qp = self._queue_pairs[direction]
|
qp = self._queue_pairs[direction]
|
||||||
peer = qp["peer"]
|
peer = qp["peer"]
|
||||||
peer_pe_prefix = f"sip{peer.sip}.cube{peer.cube}.pe{peer.pe}"
|
peer_pe_dma = f"sip{peer.sip}.cube{peer.cube}.pe{peer.pe}.pe_dma"
|
||||||
try:
|
try:
|
||||||
path = self.ctx.router.find_path(self._pe_prefix, peer_pe_prefix)
|
path = self.ctx.router.find_path(self._pe_prefix, peer_pe_dma)
|
||||||
return self.ctx.compute_drain_ns(path, self._credit_size_bytes)
|
return self.ctx.compute_path_latency_ns(
|
||||||
|
path, self._credit_size_bytes,
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
return 0.0
|
return 0.0
|
||||||
|
|
||||||
|
|||||||
@@ -79,26 +79,41 @@ class IoCpuComponent(ComponentBase):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if isinstance(request, KernelLaunchMsg):
|
if isinstance(request, KernelLaunchMsg):
|
||||||
|
io_overhead = self.ctx.node_overhead_ns.get(self.node.id, 0.0)
|
||||||
global_max_latency = 0.0
|
global_max_latency = 0.0
|
||||||
pe_ids = self._resolve_pe_ids(
|
pe_ids = self._resolve_pe_ids(
|
||||||
getattr(request, "target_pe", "all")
|
getattr(request, "target_pe", "all")
|
||||||
)
|
)
|
||||||
for sip, cube in cube_targets:
|
for sip, cube in cube_targets:
|
||||||
|
try:
|
||||||
|
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
|
||||||
|
io_to_m_path = self.ctx.router.find_node_path(
|
||||||
|
self.node.id, m_cpu_id,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if len(io_to_m_path) < 2:
|
||||||
|
continue
|
||||||
|
leg1 = self.ctx.compute_path_latency_ns(
|
||||||
|
io_to_m_path, nbytes=0,
|
||||||
|
)
|
||||||
|
m_overhead = self.ctx.node_overhead_ns.get(m_cpu_id, 0.0)
|
||||||
for pe_id in pe_ids:
|
for pe_id in pe_ids:
|
||||||
pe_cpu_id = (
|
pe_cpu_id = (
|
||||||
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
|
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
path = self.ctx.router.find_node_path(
|
m_to_pe_path = self.ctx.router.find_node_path(
|
||||||
self.node.id, pe_cpu_id,
|
m_cpu_id, pe_cpu_id,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
if len(path) < 2:
|
if len(m_to_pe_path) < 2:
|
||||||
continue
|
continue
|
||||||
latency = self.ctx.compute_path_latency_ns(
|
leg2 = self.ctx.compute_path_latency_ns(
|
||||||
path, nbytes=0,
|
m_to_pe_path, nbytes=0,
|
||||||
)
|
)
|
||||||
|
latency = leg1 + leg2 - io_overhead - m_overhead
|
||||||
if latency > global_max_latency:
|
if latency > global_max_latency:
|
||||||
global_max_latency = latency
|
global_max_latency = latency
|
||||||
request = dataclasses.replace(
|
request = dataclasses.replace(
|
||||||
@@ -109,7 +124,7 @@ class IoCpuComponent(ComponentBase):
|
|||||||
# Setup aggregation
|
# Setup aggregation
|
||||||
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
|
||||||
|
|
||||||
# Fan out to each target cube's M_CPU
|
is_kernel_launch = isinstance(request, KernelLaunchMsg)
|
||||||
for sip, cube in cube_targets:
|
for sip, cube in cube_targets:
|
||||||
try:
|
try:
|
||||||
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
|
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
|
||||||
@@ -120,7 +135,8 @@ class IoCpuComponent(ComponentBase):
|
|||||||
continue
|
continue
|
||||||
sub_txn = Transaction(
|
sub_txn = Transaction(
|
||||||
request=request, path=path, step=0,
|
request=request, path=path, step=0,
|
||||||
nbytes=txn.nbytes, done=env.event(),
|
nbytes=0 if is_kernel_launch else txn.nbytes,
|
||||||
|
done=env.event(),
|
||||||
result_data=txn.result_data,
|
result_data=txn.result_data,
|
||||||
)
|
)
|
||||||
yield self.out_ports[path[1]].put(sub_txn.advance())
|
yield self.out_ports[path[1]].put(sub_txn.advance())
|
||||||
|
|||||||
|
After Width: | Height: | Size: 39 KiB |
|
After Width: | Height: | Size: 71 KiB |
|
After Width: | Height: | Size: 38 KiB |
@@ -0,0 +1,34 @@
|
|||||||
|
algorithm,sip_topology,n_sips,n_elem,bytes_per_pe,bytes_per_sip,latency_ns
|
||||||
|
intercube_allreduce,ring_1d,6,8,16,256,3073.1299999999937
|
||||||
|
intercube_allreduce,ring_1d,6,32,64,1024,3079.8799999999947
|
||||||
|
intercube_allreduce,ring_1d,6,64,128,2048,3088.879999999992
|
||||||
|
intercube_allreduce,ring_1d,6,128,256,4096,3106.8799999999865
|
||||||
|
intercube_allreduce,ring_1d,6,512,1024,16384,3225.8799999999865
|
||||||
|
intercube_allreduce,ring_1d,6,1024,2048,32768,3391.8799999999865
|
||||||
|
intercube_allreduce,ring_1d,6,2048,4096,65536,3723.8799999999865
|
||||||
|
intercube_allreduce,ring_1d,6,4096,8192,131072,4387.879999999965
|
||||||
|
intercube_allreduce,ring_1d,6,8192,16384,262144,5715.879999999957
|
||||||
|
intercube_allreduce,ring_1d,6,16384,32768,524288,8371.879999999932
|
||||||
|
intercube_allreduce,ring_1d,6,32768,65536,1048576,13683.879999999903
|
||||||
|
intercube_allreduce,torus_2d,6,8,16,256,2190.4799999999923
|
||||||
|
intercube_allreduce,torus_2d,6,32,64,1024,2196.479999999993
|
||||||
|
intercube_allreduce,torus_2d,6,64,128,2048,2204.4799999999905
|
||||||
|
intercube_allreduce,torus_2d,6,128,256,4096,2220.479999999985
|
||||||
|
intercube_allreduce,torus_2d,6,512,1024,16384,2325.479999999985
|
||||||
|
intercube_allreduce,torus_2d,6,1024,2048,32768,2471.479999999985
|
||||||
|
intercube_allreduce,torus_2d,6,2048,4096,65536,2763.479999999985
|
||||||
|
intercube_allreduce,torus_2d,6,4096,8192,131072,3347.4799999999777
|
||||||
|
intercube_allreduce,torus_2d,6,8192,16384,262144,4515.4799999999705
|
||||||
|
intercube_allreduce,torus_2d,6,16384,32768,524288,6851.479999999952
|
||||||
|
intercube_allreduce,torus_2d,6,32768,65536,1048576,11523.479999999923
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,8,16,256,3508.4249999999993
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,32,64,1024,3515.55
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,64,128,2048,3525.0499999999975
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,128,256,4096,3544.049999999992
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,512,1024,16384,3667.049999999992
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,1024,2048,32768,3837.049999999992
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,2048,4096,65536,4177.049999999992
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,4096,8192,131072,4857.049999999959
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,8192,16384,262144,6217.049999999945
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,16384,32768,524288,8937.049999999937
|
||||||
|
intercube_allreduce,mesh_2d_no_wrap,6,32768,65536,1048576,14377.049999999872
|
||||||
|
|
After Width: | Height: | Size: 37 KiB |
|
After Width: | Height: | Size: 48 KiB |
|
After Width: | Height: | Size: 48 KiB |
|
After Width: | Height: | Size: 50 KiB |
|
After Width: | Height: | Size: 50 KiB |
|
After Width: | Height: | Size: 44 KiB |
|
After Width: | Height: | Size: 129 KiB |
@@ -0,0 +1,91 @@
|
|||||||
|
hop,label,size_bytes,path,total_ns
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),128,ipcq,31.1399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),128,raw,12.019999999996799
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),256,ipcq,32.6399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),256,raw,13.019999999996799
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),384,ipcq,34.1399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),384,raw,14.019999999996799
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),512,ipcq,35.6399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),512,raw,15.019999999996799
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),768,ipcq,38.6399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),768,raw,17.0199999999968
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),1024,ipcq,41.6399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),1024,raw,19.0199999999968
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),2048,ipcq,53.6399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),2048,raw,27.0199999999968
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),4096,ipcq,77.6399999999976
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),4096,raw,43.0199999999968
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),8192,ipcq,125.64000000000306
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),8192,raw,75.02000000000407
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),10240,ipcq,149.64000000000306
|
||||||
|
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),10240,raw,91.02000000000407
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),128,ipcq,31.1399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),128,raw,12.019999999996799
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),256,ipcq,32.6399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),256,raw,13.019999999996799
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),384,ipcq,34.1399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),384,raw,14.019999999996799
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),512,ipcq,35.6399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),512,raw,15.019999999996799
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),768,ipcq,38.6399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),768,raw,17.0199999999968
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),1024,ipcq,41.6399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),1024,raw,19.0199999999968
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),2048,ipcq,53.6399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),2048,raw,27.0199999999968
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),4096,ipcq,77.6399999999976
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),4096,raw,43.0199999999968
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),8192,ipcq,125.64000000000306
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),8192,raw,75.02000000000407
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),10240,ipcq,149.64000000000306
|
||||||
|
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),10240,raw,91.02000000000407
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),128,ipcq,67.15999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),128,raw,68.53999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),256,ipcq,68.65999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),256,raw,70.03999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),384,ipcq,70.15999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),384,raw,71.53999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),512,ipcq,71.65999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),512,raw,73.03999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),768,ipcq,74.65999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),768,raw,76.03999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),1024,ipcq,77.65999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),1024,raw,79.03999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),2048,ipcq,89.65999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),2048,raw,91.03999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),4096,ipcq,113.65999999999804
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),4096,raw,115.03999999999724
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),8192,ipcq,161.65999999999985
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),8192,raw,163.04000000000087
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),10240,ipcq,185.65999999999985
|
||||||
|
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),10240,raw,187.04000000000087
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),128,ipcq,87.15999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),128,raw,88.53999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),256,ipcq,88.65999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),256,raw,90.03999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),384,ipcq,90.15999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),384,raw,91.53999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),512,ipcq,91.65999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),512,raw,93.03999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),768,ipcq,94.65999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),768,raw,96.03999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),1024,ipcq,97.65999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),1024,raw,99.03999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),2048,ipcq,109.65999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),2048,raw,111.03999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),4096,ipcq,133.65999999999804
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),4096,raw,135.03999999999724
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,ipcq,181.65999999999985
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,raw,183.04000000000087
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,ipcq,205.65999999999985
|
||||||
|
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,raw,207.04000000000087
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",128,ipcq,6.015000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",256,ipcq,6.515000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",384,ipcq,7.015000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",512,ipcq,7.515000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",768,ipcq,8.515000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",1024,ipcq,9.515000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",2048,ipcq,13.515000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",4096,ipcq,21.515000000003056
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",8192,ipcq,37.51499999999214
|
||||||
|
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",10240,ipcq,45.51499999999214
|
||||||
|
@@ -22,13 +22,23 @@ from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
|
|||||||
from kernbench.policy.placement.dp import DPPolicy
|
from kernbench.policy.placement.dp import DPPolicy
|
||||||
|
|
||||||
|
|
||||||
def _sip_topo_dims(sip_topo: str, n_sips: int) -> tuple[int, int]:
|
def _sip_topo_dims(
|
||||||
|
sip_topo: str, n_sips: int,
|
||||||
|
spec_w: int | None = None, spec_h: int | None = None,
|
||||||
|
) -> tuple[int, int]:
|
||||||
if sip_topo == "ring_1d":
|
if sip_topo == "ring_1d":
|
||||||
return (0, 0)
|
return (0, 0)
|
||||||
|
if spec_w is not None and spec_h is not None:
|
||||||
|
if spec_w * spec_h != n_sips:
|
||||||
|
raise ValueError(
|
||||||
|
f"sip layout {spec_w}x{spec_h} != n_sips ({n_sips})"
|
||||||
|
)
|
||||||
|
return (spec_w, spec_h)
|
||||||
side = int(round(math.sqrt(n_sips)))
|
side = int(round(math.sqrt(n_sips)))
|
||||||
if side * side != n_sips:
|
if side * side != n_sips:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"SIP topology '{sip_topo}' requires square n_sips, got {n_sips}"
|
f"SIP topology '{sip_topo}' requires square n_sips or "
|
||||||
|
f"explicit w/h in spec, got {n_sips}"
|
||||||
)
|
)
|
||||||
return (side, side)
|
return (side, side)
|
||||||
|
|
||||||
@@ -54,10 +64,13 @@ def run_allreduce(
|
|||||||
topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND
|
topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND
|
||||||
|
|
||||||
n_elem = int(cfg.get("n_elem", 8))
|
n_elem = int(cfg.get("n_elem", 8))
|
||||||
n_sips = int(spec.get("system", {}).get("sips", {}).get("count", 1))
|
sips_cfg = spec.get("system", {}).get("sips", {})
|
||||||
sip_topo = str(
|
n_sips = int(sips_cfg.get("count", 1))
|
||||||
spec.get("system", {}).get("sips", {}).get("topology", "ring_1d")
|
sip_topo = str(sips_cfg.get("topology", "ring_1d"))
|
||||||
)
|
spec_sip_w = sips_cfg.get("w")
|
||||||
|
spec_sip_h = sips_cfg.get("h")
|
||||||
|
spec_sip_w = int(spec_sip_w) if spec_sip_w is not None else None
|
||||||
|
spec_sip_h = int(spec_sip_h) if spec_sip_h is not None else None
|
||||||
|
|
||||||
cm = spec["sip"]["cube_mesh"]
|
cm = spec["sip"]["cube_mesh"]
|
||||||
cube_w = int(cm["w"])
|
cube_w = int(cm["w"])
|
||||||
@@ -65,7 +78,9 @@ def run_allreduce(
|
|||||||
n_cubes = cube_w * cube_h
|
n_cubes = cube_w * cube_h
|
||||||
|
|
||||||
sip_topo_kind = topo_name_to_kind.get(sip_topo, 0)
|
sip_topo_kind = topo_name_to_kind.get(sip_topo, 0)
|
||||||
sip_topo_w, sip_topo_h = _sip_topo_dims(sip_topo, n_sips)
|
sip_topo_w, sip_topo_h = _sip_topo_dims(
|
||||||
|
sip_topo, n_sips, spec_w=spec_sip_w, spec_h=spec_sip_h,
|
||||||
|
)
|
||||||
|
|
||||||
algo_name = cfg.get("algorithm", "allreduce")
|
algo_name = cfg.get("algorithm", "allreduce")
|
||||||
print(f"\n{'=' * 60}")
|
print(f"\n{'=' * 60}")
|
||||||
@@ -173,20 +188,36 @@ from kernbench.topology.builder import resolve_topology
|
|||||||
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||||
|
|
||||||
CONFIGS = [
|
CONFIGS = [
|
||||||
pytest.param("intercube_allreduce", "ring_1d", 2, id="ring_2sip"),
|
pytest.param(
|
||||||
pytest.param("intercube_allreduce", "torus_2d", 4, id="torus_4sip"),
|
"intercube_allreduce", "ring_1d", 6, None, None,
|
||||||
pytest.param("intercube_allreduce", "mesh_2d_no_wrap", 4, id="mesh_4sip"),
|
id="ring_6sip",
|
||||||
|
),
|
||||||
|
pytest.param(
|
||||||
|
"intercube_allreduce", "torus_2d", 6, 2, 3,
|
||||||
|
id="torus_6sip_2x3",
|
||||||
|
),
|
||||||
|
pytest.param(
|
||||||
|
"intercube_allreduce", "mesh_2d_no_wrap", 6, 2, 3,
|
||||||
|
id="mesh_6sip_2x3",
|
||||||
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def _write_temp_configs(
|
def _write_temp_configs(
|
||||||
tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None,
|
tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None,
|
||||||
|
sip_w=None, sip_h=None,
|
||||||
):
|
):
|
||||||
"""Write temp topology.yaml and ccl.yaml with the given overrides."""
|
"""Write temp topology.yaml and ccl.yaml with the given overrides."""
|
||||||
with open(TOPOLOGY_PATH) as f:
|
with open(TOPOLOGY_PATH) as f:
|
||||||
topo_cfg = yaml.safe_load(f)
|
topo_cfg = yaml.safe_load(f)
|
||||||
topo_cfg["system"]["sips"]["count"] = n_sips
|
topo_cfg["system"]["sips"]["count"] = n_sips
|
||||||
topo_cfg["system"]["sips"]["topology"] = sip_topology
|
topo_cfg["system"]["sips"]["topology"] = sip_topology
|
||||||
|
if sip_w is not None and sip_h is not None:
|
||||||
|
topo_cfg["system"]["sips"]["w"] = int(sip_w)
|
||||||
|
topo_cfg["system"]["sips"]["h"] = int(sip_h)
|
||||||
|
else:
|
||||||
|
topo_cfg["system"]["sips"].pop("w", None)
|
||||||
|
topo_cfg["system"]["sips"].pop("h", None)
|
||||||
topo_path = tmp_path / "topology.yaml"
|
topo_path = tmp_path / "topology.yaml"
|
||||||
with open(topo_path, "w") as f:
|
with open(topo_path, "w") as f:
|
||||||
yaml.dump(topo_cfg, f, default_flow_style=False)
|
yaml.dump(topo_cfg, f, default_flow_style=False)
|
||||||
@@ -211,10 +242,15 @@ def _write_temp_configs(
|
|||||||
return str(topo_path), str(tmp_ccl)
|
return str(topo_path), str(tmp_ccl)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("algorithm,sip_topology,n_sips", CONFIGS)
|
@pytest.mark.parametrize(
|
||||||
def test_allreduce(tmp_path, algorithm, sip_topology, n_sips):
|
"algorithm,sip_topology,n_sips,sip_w,sip_h", CONFIGS,
|
||||||
|
)
|
||||||
|
def test_allreduce(
|
||||||
|
tmp_path, algorithm, sip_topology, n_sips, sip_w, sip_h,
|
||||||
|
):
|
||||||
topo_path, ccl_path = _write_temp_configs(
|
topo_path, ccl_path = _write_temp_configs(
|
||||||
tmp_path, sip_topology, n_sips, algorithm,
|
tmp_path, sip_topology, n_sips, algorithm,
|
||||||
|
sip_w=sip_w, sip_h=sip_h,
|
||||||
)
|
)
|
||||||
topo = resolve_topology(topo_path)
|
topo = resolve_topology(topo_path)
|
||||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||||
@@ -271,16 +307,17 @@ def test_allreduce_latency_sweep(tmp_path):
|
|||||||
records: list[dict] = []
|
records: list[dict] = []
|
||||||
|
|
||||||
# Apples-to-apples: same n_sips across all three topologies.
|
# Apples-to-apples: same n_sips across all three topologies.
|
||||||
for algorithm, sip_topology, n_sips in [
|
for algorithm, sip_topology, n_sips, sip_w, sip_h in [
|
||||||
("intercube_allreduce", "ring_1d", 4),
|
("intercube_allreduce", "ring_1d", 6, None, None),
|
||||||
("intercube_allreduce", "torus_2d", 4),
|
("intercube_allreduce", "torus_2d", 6, 2, 3),
|
||||||
("intercube_allreduce", "mesh_2d_no_wrap", 4),
|
("intercube_allreduce", "mesh_2d_no_wrap", 6, 2, 3),
|
||||||
]:
|
]:
|
||||||
for n_elem in _SWEEP_N_ELEM:
|
for n_elem in _SWEEP_N_ELEM:
|
||||||
sub = tmp_path / f"{sip_topology}_{n_elem}"
|
sub = tmp_path / f"{sip_topology}_{n_elem}"
|
||||||
sub.mkdir()
|
sub.mkdir()
|
||||||
topo_path, ccl_path = _write_temp_configs(
|
topo_path, ccl_path = _write_temp_configs(
|
||||||
sub, sip_topology, n_sips, algorithm,
|
sub, sip_topology, n_sips, algorithm,
|
||||||
|
sip_w=sip_w, sip_h=sip_h,
|
||||||
n_elem_override=n_elem,
|
n_elem_override=n_elem,
|
||||||
)
|
)
|
||||||
topo = resolve_topology(topo_path)
|
topo = resolve_topology(topo_path)
|
||||||
@@ -339,8 +376,7 @@ def test_allreduce_latency_sweep(tmp_path):
|
|||||||
w.writerow(r)
|
w.writerow(r)
|
||||||
|
|
||||||
topologies = sorted({r["sip_topology"] for r in records})
|
topologies = sorted({r["sip_topology"] for r in records})
|
||||||
# Per-topology plots: log-scale + linear-scale side-by-side.
|
# Per-topology plots, log-scale x-axis = bytes per PE.
|
||||||
# X-axis = bytes per PE (per-message payload size).
|
|
||||||
for topo_name in topologies:
|
for topo_name in topologies:
|
||||||
rs = sorted(
|
rs = sorted(
|
||||||
[r for r in records if r["sip_topology"] == topo_name],
|
[r for r in records if r["sip_topology"] == topo_name],
|
||||||
@@ -352,7 +388,6 @@ def test_allreduce_latency_sweep(tmp_path):
|
|||||||
f"Allreduce latency — {topo_name} "
|
f"Allreduce latency — {topo_name} "
|
||||||
f"(n_sips={rs[0]['n_sips']})"
|
f"(n_sips={rs[0]['n_sips']})"
|
||||||
)
|
)
|
||||||
# Log-scale
|
|
||||||
fig, ax = plt.subplots(figsize=(8, 5))
|
fig, ax = plt.subplots(figsize=(8, 5))
|
||||||
ax.plot(xs, ys, marker="o", color="tab:blue")
|
ax.plot(xs, ys, marker="o", color="tab:blue")
|
||||||
ax.set_xscale("log", base=2)
|
ax.set_xscale("log", base=2)
|
||||||
@@ -364,24 +399,9 @@ def test_allreduce_latency_sweep(tmp_path):
|
|||||||
fig.tight_layout()
|
fig.tight_layout()
|
||||||
fig.savefig(out_dir / f"{topo_name}.png", dpi=120)
|
fig.savefig(out_dir / f"{topo_name}.png", dpi=120)
|
||||||
plt.close(fig)
|
plt.close(fig)
|
||||||
# Linear-scale companion
|
|
||||||
fig, ax = plt.subplots(figsize=(8, 5))
|
|
||||||
ax.plot(xs, ys, marker="o", color="tab:blue")
|
|
||||||
ax.set_xlabel("Bytes per PE")
|
|
||||||
ax.set_ylabel("max pe_exec_ns (critical path)")
|
|
||||||
ax.set_title(title + " [linear scale]")
|
|
||||||
ax.grid(True, alpha=0.3)
|
|
||||||
ax.xaxis.set_major_formatter(_bytes_fmt)
|
|
||||||
fig.tight_layout()
|
|
||||||
fig.savefig(out_dir / f"{topo_name}_linear.png", dpi=120)
|
|
||||||
plt.close(fig)
|
|
||||||
|
|
||||||
# Combined overview — two variants: log-scale (overview.png) and
|
|
||||||
# linear-scale (overview_linear.png).
|
|
||||||
colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange",
|
colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange",
|
||||||
"mesh_2d_no_wrap": "tab:green"}
|
"mesh_2d_no_wrap": "tab:green"}
|
||||||
|
|
||||||
def _draw_overview(log_x: bool, filename: str, title_suffix: str) -> None:
|
|
||||||
fig, ax = plt.subplots(figsize=(9, 6))
|
fig, ax = plt.subplots(figsize=(9, 6))
|
||||||
for topo_name in topologies:
|
for topo_name in topologies:
|
||||||
rs = sorted(
|
rs = sorted(
|
||||||
@@ -395,27 +415,15 @@ def test_allreduce_latency_sweep(tmp_path):
|
|||||||
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
|
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
|
||||||
color=colors.get(topo_name),
|
color=colors.get(topo_name),
|
||||||
)
|
)
|
||||||
if log_x:
|
|
||||||
ax.set_xscale("log", base=2)
|
ax.set_xscale("log", base=2)
|
||||||
ax.set_xlabel("Bytes per PE (log scale)")
|
ax.set_xlabel("Bytes per PE (log scale)")
|
||||||
else:
|
|
||||||
ax.set_xlabel("Bytes per PE")
|
|
||||||
ax.set_ylabel("max pe_exec_ns (critical path)")
|
ax.set_ylabel("max pe_exec_ns (critical path)")
|
||||||
ax.set_title("Multi-device allreduce latency by topology" + title_suffix)
|
ax.set_title("Multi-device allreduce latency by topology")
|
||||||
ax.grid(True, alpha=0.3)
|
ax.grid(True, alpha=0.3)
|
||||||
ax.legend()
|
ax.legend()
|
||||||
ax.xaxis.set_major_formatter(_bytes_fmt)
|
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||||
fig.tight_layout()
|
fig.tight_layout()
|
||||||
fig.savefig(out_dir / filename, dpi=120)
|
fig.savefig(out_dir / "overview.png", dpi=120)
|
||||||
plt.close(fig)
|
plt.close(fig)
|
||||||
|
|
||||||
_draw_overview(log_x=True, filename="overview.png", title_suffix="")
|
print(f"\nWrote {out_dir / 'overview.png'}")
|
||||||
_draw_overview(
|
|
||||||
log_x=False, filename="overview_linear.png",
|
|
||||||
title_suffix=" [linear scale]",
|
|
||||||
)
|
|
||||||
|
|
||||||
print(
|
|
||||||
f"\nWrote {out_dir / 'overview.png'} + "
|
|
||||||
f"{out_dir / 'overview_linear.png'}"
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -0,0 +1,194 @@
|
|||||||
|
"""ADR-0009 D5 invariant: all PEs targeted by a single kernel launch MUST
|
||||||
|
begin executing the kernel body at the same simulated time, regardless of
|
||||||
|
their dispatch path length.
|
||||||
|
|
||||||
|
These tests directly verify the invariant by capturing per-PE state at the
|
||||||
|
top of `_execute_kernel`:
|
||||||
|
|
||||||
|
test_no_pe_arrives_after_target_start_ns
|
||||||
|
Asserts: for every PE that enters _execute_kernel during a multi-cube
|
||||||
|
launch, `env.now` at entry must be <= target_start_ns. Otherwise the
|
||||||
|
PE's barrier yield would be a no-op and `pe_exec_start` would be set
|
||||||
|
late, breaking the D5 "same simulated time" mandate.
|
||||||
|
|
||||||
|
test_all_pes_have_identical_pe_exec_start
|
||||||
|
Asserts: every PE's `pe_exec_start` (the value of `env.now` recorded
|
||||||
|
immediately AFTER the barrier yield) is identical across all PEs in
|
||||||
|
the launch.
|
||||||
|
|
||||||
|
Both tests are expected to FAIL today and become the regression check the
|
||||||
|
Phase 2 D5 predictor + fallback fix must make pass.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kernbench.policy.placement.dp import DPPolicy
|
||||||
|
from kernbench.runtime_api.context import RuntimeContext
|
||||||
|
from kernbench.runtime_api.types import DeviceSelector
|
||||||
|
from kernbench.sim_engine.engine import GraphEngine
|
||||||
|
from kernbench.topology.builder import resolve_topology
|
||||||
|
|
||||||
|
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||||
|
|
||||||
|
|
||||||
|
def _capture_per_pe_d5_state():
|
||||||
|
"""Monkey-patch PeCpuComponent._execute_kernel to record, per PE:
|
||||||
|
|
||||||
|
- entry_now: env.now at function entry (before any yield)
|
||||||
|
- target_start_ns: the value carried by the request
|
||||||
|
- barrier_yielded: True if the barrier yield fired (entry_now < target)
|
||||||
|
- pe_exec_start: env.now immediately after the barrier check
|
||||||
|
(i.e. the value the original code sets)
|
||||||
|
|
||||||
|
Returns (records: list[dict], restore: callable).
|
||||||
|
"""
|
||||||
|
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
|
||||||
|
|
||||||
|
records: list[dict] = []
|
||||||
|
original = pe_cpu_mod.PeCpuComponent._execute_kernel
|
||||||
|
|
||||||
|
def patched(self, env, txn):
|
||||||
|
request = txn.request
|
||||||
|
target_start = getattr(request, "target_start_ns", None)
|
||||||
|
entry_now = float(env.now)
|
||||||
|
rec = {
|
||||||
|
"node_id": self.node.id,
|
||||||
|
"entry_now": entry_now,
|
||||||
|
"target_start_ns": (
|
||||||
|
float(target_start) if target_start is not None else None
|
||||||
|
),
|
||||||
|
"barrier_yielded": (
|
||||||
|
target_start is not None
|
||||||
|
and float(target_start) > entry_now
|
||||||
|
),
|
||||||
|
"pe_exec_start": None, # filled below by sniff
|
||||||
|
"late_ns": (
|
||||||
|
None if target_start is None
|
||||||
|
else max(0.0, entry_now - float(target_start))
|
||||||
|
),
|
||||||
|
}
|
||||||
|
records.append(rec)
|
||||||
|
|
||||||
|
# We can't easily inject a callback at the original's
|
||||||
|
# `pe_exec_start = env.now` line without rewriting it. Approximate:
|
||||||
|
# if the original yields the barrier, env.now after the yield is
|
||||||
|
# target_start_ns; otherwise pe_exec_start is entry_now (skipped).
|
||||||
|
if rec["barrier_yielded"]:
|
||||||
|
rec["pe_exec_start"] = float(target_start)
|
||||||
|
else:
|
||||||
|
rec["pe_exec_start"] = entry_now
|
||||||
|
|
||||||
|
yield from original(self, env, txn)
|
||||||
|
|
||||||
|
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
|
||||||
|
|
||||||
|
def restore():
|
||||||
|
pe_cpu_mod.PeCpuComponent._execute_kernel = original
|
||||||
|
|
||||||
|
return records, restore
|
||||||
|
|
||||||
|
|
||||||
|
def _run_multicube_launch():
|
||||||
|
"""Drive a no-op kernel launch across all 16 cubes x 8 PEs and return
|
||||||
|
the per-PE D5 records collected by the monkey-patch."""
|
||||||
|
records, restore = _capture_per_pe_d5_state()
|
||||||
|
try:
|
||||||
|
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||||
|
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||||
|
spec = topo.topology_obj.spec
|
||||||
|
|
||||||
|
with RuntimeContext(
|
||||||
|
engine=engine, target_device=DeviceSelector("all"),
|
||||||
|
correlation_id="d5_barrier", spec=spec,
|
||||||
|
) as ctx:
|
||||||
|
dp = DPPolicy(
|
||||||
|
cube="row_wise", pe="column_wise",
|
||||||
|
num_cubes=16, num_pes=8,
|
||||||
|
)
|
||||||
|
|
||||||
|
def kernel(t_ptr, n_elem, tl):
|
||||||
|
pass # no-op
|
||||||
|
|
||||||
|
ctx.ahbm.set_device(0)
|
||||||
|
t = ctx.zeros(
|
||||||
|
(16, 8 * 64), dtype="f16", dp=dp, name="probe",
|
||||||
|
)
|
||||||
|
t.copy_(ctx.from_numpy(
|
||||||
|
np.zeros((16, 8 * 64), dtype=np.float16),
|
||||||
|
))
|
||||||
|
|
||||||
|
pending = ctx.launch(
|
||||||
|
"d5_probe", kernel, t, 64, _defer_wait=True,
|
||||||
|
)
|
||||||
|
for h, _sip, meta in pending:
|
||||||
|
ctx.wait(h, _meta=meta)
|
||||||
|
finally:
|
||||||
|
restore()
|
||||||
|
return records
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_pe_arrives_after_target_start_ns():
|
||||||
|
"""ADR-0009 D5: no PE may enter `_execute_kernel` after target_start_ns.
|
||||||
|
|
||||||
|
Today this fails because IO_CPU's predictor under-shoots actual
|
||||||
|
dispatch latency for far cubes (cube4, cube9-15). Phase 2 fix:
|
||||||
|
chain-aware predictor in IO_CPU + monotonic upward re-stamp in M_CPU.
|
||||||
|
"""
|
||||||
|
records = _run_multicube_launch()
|
||||||
|
assert records, "expected per-PE _execute_kernel records"
|
||||||
|
|
||||||
|
late = [
|
||||||
|
r for r in records
|
||||||
|
if r["target_start_ns"] is not None
|
||||||
|
and r["late_ns"] is not None
|
||||||
|
and r["late_ns"] > 1e-6
|
||||||
|
]
|
||||||
|
|
||||||
|
if late:
|
||||||
|
# Provide actionable diagnostic in the failure.
|
||||||
|
worst = sorted(late, key=lambda r: -r["late_ns"])[:5]
|
||||||
|
details = "\n".join(
|
||||||
|
f" {r['node_id']}: late by {r['late_ns']:.2f} ns "
|
||||||
|
f"(entry_now={r['entry_now']:.2f}, "
|
||||||
|
f"target_start_ns={r['target_start_ns']:.2f})"
|
||||||
|
for r in worst
|
||||||
|
)
|
||||||
|
pytest.fail(
|
||||||
|
f"ADR-0009 D5 violated: {len(late)}/{len(records)} PEs "
|
||||||
|
f"entered _execute_kernel AFTER target_start_ns "
|
||||||
|
f"(barrier yield silently skipped). "
|
||||||
|
f"Worst offenders:\n{details}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_all_pes_have_identical_pe_exec_start():
|
||||||
|
"""ADR-0009 D5: every PE's pe_exec_start must be identical.
|
||||||
|
|
||||||
|
With D5 honored, every PE either yields to target_start_ns (start =
|
||||||
|
target_start_ns) or, if late, would still be aligned by the M_CPU
|
||||||
|
upward re-stamp (Phase 2). Today: 75/128 PEs in this launch have
|
||||||
|
distinct pe_exec_start values because they skipped the barrier.
|
||||||
|
"""
|
||||||
|
records = _run_multicube_launch()
|
||||||
|
assert records, "expected per-PE _execute_kernel records"
|
||||||
|
|
||||||
|
starts = sorted({round(r["pe_exec_start"], 6) for r in records})
|
||||||
|
if len(starts) > 1:
|
||||||
|
spread = max(starts) - min(starts)
|
||||||
|
# Distribution of how many PEs at each distinct start time
|
||||||
|
from collections import Counter
|
||||||
|
bucket = Counter(round(r["pe_exec_start"], 6) for r in records)
|
||||||
|
details = "\n".join(
|
||||||
|
f" pe_exec_start={t}: {n} PEs"
|
||||||
|
for t, n in sorted(bucket.items())
|
||||||
|
)
|
||||||
|
pytest.fail(
|
||||||
|
f"ADR-0009 D5 violated: PEs have {len(starts)} distinct "
|
||||||
|
f"pe_exec_start values (spread = {spread:.2f} ns); "
|
||||||
|
f"D5 mandates a single common value. "
|
||||||
|
f"Distribution:\n{details}"
|
||||||
|
)
|
||||||
@@ -0,0 +1,741 @@
|
|||||||
|
"""Diagnostic for the inter-cube RAW > IPCQ asymmetry on h3/h4 plots.
|
||||||
|
|
||||||
|
Single-shot run at h3 (sip0.cube0.pe0 -> sip0.cube1.pe0), nbytes=4096.
|
||||||
|
|
||||||
|
Captures per-PE pe_exec_ns and the actual path / drain / per-node overhead
|
||||||
|
breakdown for the RAW sub-txn (PE_DMA -> remote HBM_CTRL) vs the IPCQ
|
||||||
|
outbound sub-txn (PE_DMA -> peer PE_DMA), so we can localize the gap to
|
||||||
|
one of:
|
||||||
|
(a) drain at HBM-BW (RAW) vs fabric-BW (IPCQ)
|
||||||
|
(b) path-length / per-node overhead asymmetry
|
||||||
|
(c) RAW SRC paying tl.load (local HBM read) on top of remote tl.store
|
||||||
|
while IPCQ DST only pays inbound traversal+drain.
|
||||||
|
|
||||||
|
Phase 1 / test-only. No production code is modified.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
|
||||||
|
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
|
||||||
|
from kernbench.policy.placement.dp import DPPolicy
|
||||||
|
from kernbench.runtime_api.context import RuntimeContext
|
||||||
|
from kernbench.runtime_api.types import DeviceSelector
|
||||||
|
from kernbench.sim_engine.engine import GraphEngine
|
||||||
|
from kernbench.topology.builder import resolve_topology
|
||||||
|
|
||||||
|
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Allow the test to be re-run for h4 (inter-cube vertical) at multiple sizes
|
||||||
|
# to investigate why IPCQ slope flattens past 8192 B (path may differ).
|
||||||
|
NBYTES = int(os.environ.get("DIAG_NBYTES", "4096"))
|
||||||
|
ELEM_BYTES = 2
|
||||||
|
N_ELEM = NBYTES // ELEM_BYTES
|
||||||
|
N_CUBES = 16
|
||||||
|
N_PES = 8
|
||||||
|
HOP = os.environ.get("DIAG_HOP", "h3")
|
||||||
|
if HOP == "h4":
|
||||||
|
SRC = (0, 0, 0)
|
||||||
|
DST = (0, 4, 0) # h4 inter-cube vertical
|
||||||
|
else:
|
||||||
|
SRC = (0, 0, 0)
|
||||||
|
DST = (0, 1, 0) # h3 inter-cube horizontal
|
||||||
|
|
||||||
|
|
||||||
|
# ── Per-PE pe_exec_ns capture via monkey-patch ───────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _install_barrier_capture():
|
||||||
|
"""Wrap PeCpuComponent._execute_kernel to log, for every PE that
|
||||||
|
enters: env.now at entry, target_start_ns the request carried,
|
||||||
|
whether the barrier yield fired (i.e. env.now < target_start_ns),
|
||||||
|
and env.now at pe_exec_start.
|
||||||
|
"""
|
||||||
|
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
|
||||||
|
|
||||||
|
log: list[dict] = []
|
||||||
|
original = pe_cpu_mod.PeCpuComponent._execute_kernel
|
||||||
|
|
||||||
|
def patched(self, env, txn):
|
||||||
|
request = txn.request
|
||||||
|
target_start = getattr(request, "target_start_ns", None)
|
||||||
|
entry_now = float(env.now)
|
||||||
|
log_entry = {
|
||||||
|
"node_id": self.node.id,
|
||||||
|
"entry_now": entry_now,
|
||||||
|
"target_start_ns": (
|
||||||
|
float(target_start) if target_start is not None else None
|
||||||
|
),
|
||||||
|
"barrier_skipped": (
|
||||||
|
target_start is None
|
||||||
|
or float(target_start) <= entry_now
|
||||||
|
),
|
||||||
|
"delta_late_ns": (
|
||||||
|
None if target_start is None
|
||||||
|
else max(0.0, entry_now - float(target_start))
|
||||||
|
),
|
||||||
|
}
|
||||||
|
log.append(log_entry)
|
||||||
|
yield from original(self, env, txn)
|
||||||
|
|
||||||
|
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
|
||||||
|
|
||||||
|
def restore():
|
||||||
|
pe_cpu_mod.PeCpuComponent._execute_kernel = original
|
||||||
|
|
||||||
|
return log, restore
|
||||||
|
|
||||||
|
|
||||||
|
def _install_per_pe_capture():
|
||||||
|
"""Wrap PeCpuComponent._execute_kernel so we record (node_id ->
|
||||||
|
pe_exec_ns) for every PE that executes a kernel during the run.
|
||||||
|
|
||||||
|
Returns (capture_dict, restore_callable).
|
||||||
|
"""
|
||||||
|
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
|
||||||
|
|
||||||
|
captured: dict[str, float] = {}
|
||||||
|
original = pe_cpu_mod.PeCpuComponent._execute_kernel
|
||||||
|
|
||||||
|
def patched(self, env, txn):
|
||||||
|
gen = original(self, env, txn)
|
||||||
|
try:
|
||||||
|
value = yield from gen
|
||||||
|
finally:
|
||||||
|
v = txn.result_data.get("pe_exec_ns")
|
||||||
|
if v is not None:
|
||||||
|
captured[self.node.id] = float(v)
|
||||||
|
return value
|
||||||
|
|
||||||
|
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
|
||||||
|
|
||||||
|
def restore():
|
||||||
|
pe_cpu_mod.PeCpuComponent._execute_kernel = original
|
||||||
|
|
||||||
|
return captured, restore
|
||||||
|
|
||||||
|
|
||||||
|
def _install_recv_capture(target_node_id: str):
|
||||||
|
"""Wrap PeIpcqComponent._handle_recv to log entry/exit times and the
|
||||||
|
peer_head_cache/my_tail values seen at the start.
|
||||||
|
|
||||||
|
This pins down whether recv ever blocked on a wait_event, or whether
|
||||||
|
it consumed without waiting (i.e. peer_head_cache > my_tail at entry).
|
||||||
|
"""
|
||||||
|
import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod
|
||||||
|
|
||||||
|
log: list[dict] = []
|
||||||
|
original = pe_ipcq_mod.PeIpcqComponent._handle_recv
|
||||||
|
|
||||||
|
def patched(self, env, req, cmd):
|
||||||
|
if self.node.id != target_node_id:
|
||||||
|
yield from original(self, env, req, cmd)
|
||||||
|
return
|
||||||
|
# Snapshot state before dispatch
|
||||||
|
d = cmd.direction
|
||||||
|
qp = self._queue_pairs.get(d, {})
|
||||||
|
log.append({
|
||||||
|
"phase": "enter",
|
||||||
|
"t": float(env.now),
|
||||||
|
"direction": d,
|
||||||
|
"peer_head_cache": qp.get("peer_head_cache"),
|
||||||
|
"my_tail": qp.get("my_tail"),
|
||||||
|
})
|
||||||
|
yield from original(self, env, req, cmd)
|
||||||
|
qp = self._queue_pairs.get(d, {})
|
||||||
|
log.append({
|
||||||
|
"phase": "exit",
|
||||||
|
"t": float(env.now),
|
||||||
|
"direction": d,
|
||||||
|
"peer_head_cache": qp.get("peer_head_cache"),
|
||||||
|
"my_tail": qp.get("my_tail"),
|
||||||
|
})
|
||||||
|
|
||||||
|
pe_ipcq_mod.PeIpcqComponent._handle_recv = patched
|
||||||
|
|
||||||
|
def restore():
|
||||||
|
pe_ipcq_mod.PeIpcqComponent._handle_recv = original
|
||||||
|
|
||||||
|
return log, restore
|
||||||
|
|
||||||
|
|
||||||
|
def _install_meta_arrival_capture(target_node_id: str):
|
||||||
|
"""Log every IpcqMetaArrival that lands on ``target_node_id`` PE_IPCQ.
|
||||||
|
|
||||||
|
Records (env_now, sender_seq, dst_addr, matched_direction,
|
||||||
|
peer_head_cache_before, my_tail_before).
|
||||||
|
"""
|
||||||
|
import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod
|
||||||
|
|
||||||
|
log: list[dict] = []
|
||||||
|
original = pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival
|
||||||
|
|
||||||
|
def patched(self, msg):
|
||||||
|
if self.node.id == target_node_id:
|
||||||
|
token = msg.token
|
||||||
|
now = float(self._env.now) if hasattr(self, "_env") else 0.0
|
||||||
|
# _env is not stored on the component; use ctx? Fall back to
|
||||||
|
# introspection via self._inbox._env (SimPy stores reference).
|
||||||
|
try:
|
||||||
|
now = float(self._inbox._env.now)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
entry = {
|
||||||
|
"t": now,
|
||||||
|
"sender_seq": getattr(token, "sender_seq", None),
|
||||||
|
"dst_addr": getattr(token, "dst_addr", None),
|
||||||
|
"src_sip": getattr(token, "src_sip", None),
|
||||||
|
"src_cube": getattr(token, "src_cube", None),
|
||||||
|
"src_pe": getattr(token, "src_pe", None),
|
||||||
|
"src_direction": getattr(token, "src_direction", None),
|
||||||
|
"nbytes": getattr(token, "nbytes", None),
|
||||||
|
"matched_direction": None,
|
||||||
|
"peer_head_cache_before": {},
|
||||||
|
"my_tail_before": {},
|
||||||
|
}
|
||||||
|
for d, qp in self._queue_pairs.items():
|
||||||
|
entry["peer_head_cache_before"][d] = qp["peer_head_cache"]
|
||||||
|
entry["my_tail_before"][d] = qp["my_tail"]
|
||||||
|
base = qp["my_rx_base_pa"]
|
||||||
|
size = qp["n_slots"] * qp["slot_size"]
|
||||||
|
if base <= entry["dst_addr"] < base + size:
|
||||||
|
entry["matched_direction"] = d
|
||||||
|
log.append(entry)
|
||||||
|
return original(self, msg)
|
||||||
|
|
||||||
|
pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = patched
|
||||||
|
|
||||||
|
def restore():
|
||||||
|
pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = original
|
||||||
|
|
||||||
|
return log, restore
|
||||||
|
|
||||||
|
|
||||||
|
def _snapshot_qp_state(engine, target_node_id: str) -> dict:
|
||||||
|
"""Snapshot every direction's qp state on the target PE_IPCQ now.
|
||||||
|
|
||||||
|
Captures peer_head_cache, my_tail, my_rx_base_pa, n_slots, slot_size
|
||||||
|
for each installed direction.
|
||||||
|
"""
|
||||||
|
comp = engine._components.get(target_node_id)
|
||||||
|
if comp is None:
|
||||||
|
return {}
|
||||||
|
return {
|
||||||
|
d: {
|
||||||
|
"peer_head_cache": qp["peer_head_cache"],
|
||||||
|
"my_tail": qp["my_tail"],
|
||||||
|
"my_rx_base_pa": qp["my_rx_base_pa"],
|
||||||
|
"n_slots": qp["n_slots"],
|
||||||
|
"slot_size": qp["slot_size"],
|
||||||
|
"rx_range": (
|
||||||
|
qp["my_rx_base_pa"],
|
||||||
|
qp["my_rx_base_pa"] + qp["n_slots"] * qp["slot_size"],
|
||||||
|
),
|
||||||
|
}
|
||||||
|
for d, qp in comp.queue_pairs.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Path / drain breakdown using engine ctx ──────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _path_breakdown(ctx, path: list[str], nbytes: int) -> dict:
|
||||||
|
edge_total_ns = 0.0
|
||||||
|
edge_details = []
|
||||||
|
min_bw = float("inf")
|
||||||
|
for i in range(len(path) - 1):
|
||||||
|
edge = ctx.edge_map.get((path[i], path[i + 1]))
|
||||||
|
if edge is None:
|
||||||
|
edge_details.append((path[i], path[i + 1], None, None, None))
|
||||||
|
continue
|
||||||
|
prop_ns = edge.distance_mm * ctx.ns_per_mm
|
||||||
|
edge_total_ns += prop_ns
|
||||||
|
bw = getattr(edge, "bw_gbs", None) or 0.0
|
||||||
|
if bw > 0 and bw < min_bw:
|
||||||
|
min_bw = bw
|
||||||
|
edge_details.append(
|
||||||
|
(path[i], path[i + 1], edge.distance_mm, prop_ns, bw),
|
||||||
|
)
|
||||||
|
|
||||||
|
overhead_total_ns = 0.0
|
||||||
|
overhead_details = []
|
||||||
|
for nid in path:
|
||||||
|
oh = float(ctx.node_overhead_ns.get(nid, 0.0))
|
||||||
|
overhead_total_ns += oh
|
||||||
|
overhead_details.append((nid, oh))
|
||||||
|
|
||||||
|
drain_ns = ctx.compute_drain_ns(path, nbytes)
|
||||||
|
bottleneck_bw = None if min_bw == float("inf") else min_bw
|
||||||
|
|
||||||
|
return {
|
||||||
|
"path": path,
|
||||||
|
"edges": edge_details,
|
||||||
|
"edge_total_ns": edge_total_ns,
|
||||||
|
"overheads": overhead_details,
|
||||||
|
"overhead_total_ns": overhead_total_ns,
|
||||||
|
"drain_ns": drain_ns,
|
||||||
|
"bottleneck_bw_gbs": bottleneck_bw,
|
||||||
|
"expected_total_ns": edge_total_ns + overhead_total_ns + drain_ns,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _print_breakdown(label: str, br: dict) -> None:
|
||||||
|
print(f"\n {label}")
|
||||||
|
print(f" path ({len(br['path'])} nodes):")
|
||||||
|
for nid in br["path"]:
|
||||||
|
print(f" - {nid}")
|
||||||
|
print(f" edges (prop. delay):")
|
||||||
|
for src, dst, dist_mm, prop_ns, bw in br["edges"]:
|
||||||
|
if dist_mm is None:
|
||||||
|
print(f" ! {src} -> {dst} EDGE NOT FOUND IN edge_map")
|
||||||
|
continue
|
||||||
|
print(
|
||||||
|
f" {src} -> {dst} "
|
||||||
|
f"dist={dist_mm:.3f}mm prop={prop_ns:.2f}ns "
|
||||||
|
f"bw={bw or 0:.2f}GB/s"
|
||||||
|
)
|
||||||
|
print(f" per-node overhead_ns:")
|
||||||
|
for nid, oh in br["overheads"]:
|
||||||
|
if oh > 0:
|
||||||
|
print(f" {nid:<60s} overhead_ns={oh:.2f}")
|
||||||
|
print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
|
||||||
|
print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
|
||||||
|
print(f" bottleneck_bw_gbs = {br['bottleneck_bw_gbs']}")
|
||||||
|
print(f" drain_ns (nbytes={NBYTES}) = {br['drain_ns']:.2f}")
|
||||||
|
print(f" expected_total_ns = {br['expected_total_ns']:.2f}")
|
||||||
|
|
||||||
|
|
||||||
|
# ── RAW path scenario ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _dump_src_op_records(engine, src_sip, src_cube, src_pe, label) -> None:
|
||||||
|
"""Print op_logger records for ops on the SRC PE.
|
||||||
|
|
||||||
|
The op log captures t_start/t_end for memory/math/gemm/copy ops on
|
||||||
|
every component, so we can see how long tl.load vs tl.store vs
|
||||||
|
tl.send actually took at the engine level.
|
||||||
|
"""
|
||||||
|
op_logger = getattr(engine, "_op_logger", None)
|
||||||
|
if op_logger is None:
|
||||||
|
print(f" ({label}) op_logger not available")
|
||||||
|
return
|
||||||
|
src_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}."
|
||||||
|
recs = [r for r in op_logger.records if r.component_id.startswith(src_prefix)]
|
||||||
|
print(f" ({label}) op_logger records on SRC PE ({src_prefix}*):")
|
||||||
|
for r in recs[:40]:
|
||||||
|
dur = r.t_end - r.t_start
|
||||||
|
comp_short = r.component_id.replace(src_prefix, "")
|
||||||
|
params_short = ""
|
||||||
|
if "nbytes" in r.params:
|
||||||
|
params_short = f" nbytes={r.params['nbytes']}"
|
||||||
|
if "src_addr" in r.params:
|
||||||
|
params_short += f" src_addr={r.params['src_addr']}"
|
||||||
|
if "dst_addr" in r.params:
|
||||||
|
params_short += f" dst_addr={r.params['dst_addr']}"
|
||||||
|
print(
|
||||||
|
f" t=[{r.t_start:7.2f}..{r.t_end:7.2f}] dur={dur:6.2f}ns "
|
||||||
|
f"{comp_short:<25s} {r.op_kind:<8s} {r.op_name:<12s}{params_short}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _run_raw():
|
||||||
|
captured, restore = _install_per_pe_capture()
|
||||||
|
try:
|
||||||
|
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||||
|
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||||
|
spec = topo.topology_obj.spec
|
||||||
|
|
||||||
|
src_sip, src_cube, src_pe = SRC
|
||||||
|
dst_sip, dst_cube, dst_pe = DST
|
||||||
|
assert src_sip == dst_sip
|
||||||
|
|
||||||
|
src_off = (src_cube * N_PES + src_pe) * N_ELEM * ELEM_BYTES
|
||||||
|
dst_off = (dst_cube * N_PES + dst_pe) * N_ELEM * ELEM_BYTES
|
||||||
|
|
||||||
|
with RuntimeContext(
|
||||||
|
engine=engine,
|
||||||
|
target_device=DeviceSelector("all"),
|
||||||
|
correlation_id="diag_raw",
|
||||||
|
spec=spec,
|
||||||
|
) as rt:
|
||||||
|
dp = DPPolicy(
|
||||||
|
cube="row_wise", pe="column_wise",
|
||||||
|
num_cubes=N_CUBES, num_pes=N_PES,
|
||||||
|
)
|
||||||
|
rt.ahbm.set_device(src_sip)
|
||||||
|
t = rt.zeros(
|
||||||
|
(N_CUBES, N_PES * N_ELEM), dtype="f16",
|
||||||
|
dp=dp, name="raw_tensor",
|
||||||
|
)
|
||||||
|
t.copy_(rt.from_numpy(
|
||||||
|
np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
|
||||||
|
))
|
||||||
|
|
||||||
|
def kernel(t_ptr, n_elem, tl):
|
||||||
|
pe_id = tl.program_id(axis=0)
|
||||||
|
cube_id = tl.program_id(axis=1)
|
||||||
|
if cube_id == src_cube and pe_id == src_pe:
|
||||||
|
data = tl.load(
|
||||||
|
t_ptr + src_off, shape=(n_elem,), dtype="f16",
|
||||||
|
)
|
||||||
|
tl.store(t_ptr + dst_off, data)
|
||||||
|
|
||||||
|
pending = rt.launch(
|
||||||
|
"diag_raw_kernel", kernel, t, N_ELEM, _defer_wait=True,
|
||||||
|
)
|
||||||
|
for h, _sip, meta in pending:
|
||||||
|
rt.wait(h, _meta=meta)
|
||||||
|
|
||||||
|
# Compute the RAW sub-txn path: src PE_DMA -> dst HBM_CTRL
|
||||||
|
from kernbench.policy.address.phyaddr import PhysAddr
|
||||||
|
ctx = next(iter(engine._components.values())).ctx
|
||||||
|
src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
|
||||||
|
# Resolve dst PA to HBM controller node
|
||||||
|
# The raw store kernel issues DmaWriteCmd on dst VA; in the engine
|
||||||
|
# this is translated via PE_MMU. For diagnostic we approximate
|
||||||
|
# the destination as the dst cube's HBM controller for slice
|
||||||
|
# belonging to dst_pe.
|
||||||
|
# Use the resolver on a constructed PA matching the same memory
|
||||||
|
# slice the kernel writes to.
|
||||||
|
# The tensor is "row_wise" sharded across cubes, so each cube
|
||||||
|
# owns row[cube_id, :], with each PE owning a column slice.
|
||||||
|
# The actual dst PA depends on the AHBM allocator; we read it
|
||||||
|
# via the tensor's shard map.
|
||||||
|
shard_map = getattr(t, "_shard_map", None) or getattr(t, "shard_map", None)
|
||||||
|
# Fallback: query the resolver directly by constructing a PA in
|
||||||
|
# the dst cube's HBM region. If shard_map is unavailable, still
|
||||||
|
# show the breakdown for src-PE-DMA -> first reachable HBM_CTRL
|
||||||
|
# in dst cube.
|
||||||
|
dst_hbm_id = f"sip{dst_sip}.cube{dst_cube}.hbm_ctrl"
|
||||||
|
if dst_hbm_id not in engine._components:
|
||||||
|
# try alternate naming
|
||||||
|
for nid in engine._components.keys():
|
||||||
|
if (
|
||||||
|
nid.startswith(f"sip{dst_sip}.cube{dst_cube}.")
|
||||||
|
and "hbm" in nid
|
||||||
|
):
|
||||||
|
dst_hbm_id = nid
|
||||||
|
break
|
||||||
|
|
||||||
|
# find_path() prepends ".pe_dma" to src_pe automatically
|
||||||
|
try:
|
||||||
|
raw_path = ctx.router.find_path(src_pe_prefix, dst_hbm_id)
|
||||||
|
except Exception as e:
|
||||||
|
raw_path = []
|
||||||
|
print(f" WARN: find_path raw failed: {e}")
|
||||||
|
if not raw_path:
|
||||||
|
# Try other HBM-related node names in dst cube
|
||||||
|
for nid in engine._components.keys():
|
||||||
|
if not nid.startswith(f"sip{dst_sip}.cube{dst_cube}."):
|
||||||
|
continue
|
||||||
|
if "hbm" not in nid:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
p = ctx.router.find_path(src_pe_prefix, nid)
|
||||||
|
except Exception:
|
||||||
|
p = []
|
||||||
|
if p:
|
||||||
|
raw_path = p
|
||||||
|
print(f" (fallback raw dst node: {nid})")
|
||||||
|
break
|
||||||
|
|
||||||
|
return captured, ctx, raw_path, engine
|
||||||
|
finally:
|
||||||
|
restore()
|
||||||
|
|
||||||
|
|
||||||
|
# ── IPCQ path scenario ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _run_ipcq():
|
||||||
|
captured, restore = _install_per_pe_capture()
|
||||||
|
dst_pe_ipcq_id = (
|
||||||
|
f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_ipcq"
|
||||||
|
)
|
||||||
|
arrival_log, restore_arrival = _install_meta_arrival_capture(
|
||||||
|
dst_pe_ipcq_id,
|
||||||
|
)
|
||||||
|
recv_log, restore_recv = _install_recv_capture(dst_pe_ipcq_id)
|
||||||
|
barrier_log, restore_barrier = _install_barrier_capture()
|
||||||
|
try:
|
||||||
|
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||||
|
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||||
|
spec = topo.topology_obj.spec
|
||||||
|
|
||||||
|
src_sip, src_cube, src_pe = SRC
|
||||||
|
dst_sip, dst_cube, dst_pe = DST
|
||||||
|
|
||||||
|
cfg = load_ccl_config()
|
||||||
|
merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
|
||||||
|
merged["slot_size"] = max(int(merged.get("slot_size", 4096)), NBYTES)
|
||||||
|
|
||||||
|
with RuntimeContext(
|
||||||
|
engine=engine,
|
||||||
|
target_device=DeviceSelector("all"),
|
||||||
|
correlation_id="diag_ipcq",
|
||||||
|
spec=spec,
|
||||||
|
) as rt:
|
||||||
|
configure_sfr_intercube_multisip(engine, spec, merged)
|
||||||
|
dp = DPPolicy(
|
||||||
|
cube="row_wise", pe="column_wise",
|
||||||
|
num_cubes=N_CUBES, num_pes=N_PES,
|
||||||
|
)
|
||||||
|
|
||||||
|
def kernel(t_ptr, n_elem, tl):
|
||||||
|
pe_id = tl.program_id(axis=0)
|
||||||
|
cube_id = tl.program_id(axis=1)
|
||||||
|
if cube_id == src_cube and pe_id == src_pe:
|
||||||
|
data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
|
||||||
|
tl.send(dir=("E" if HOP == "h3" else "S"), src=data)
|
||||||
|
elif cube_id == dst_cube and pe_id == dst_pe:
|
||||||
|
tl.recv(
|
||||||
|
dir=("W" if HOP == "h3" else "N"),
|
||||||
|
shape=(n_elem,), dtype="f16",
|
||||||
|
)
|
||||||
|
|
||||||
|
tensors = []
|
||||||
|
for s in sorted({src_sip, dst_sip}):
|
||||||
|
rt.ahbm.set_device(s)
|
||||||
|
t = rt.zeros(
|
||||||
|
(N_CUBES, N_PES * N_ELEM), dtype="f16",
|
||||||
|
dp=dp, name=f"sip{s}",
|
||||||
|
)
|
||||||
|
t.copy_(rt.from_numpy(
|
||||||
|
np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
|
||||||
|
))
|
||||||
|
tensors.append(t)
|
||||||
|
|
||||||
|
all_pending = []
|
||||||
|
for tt in tensors:
|
||||||
|
pending = rt.launch(
|
||||||
|
"diag_ipcq_kernel", kernel, tt, N_ELEM, _defer_wait=True,
|
||||||
|
)
|
||||||
|
all_pending.extend(pending)
|
||||||
|
for h, _sip, meta in all_pending:
|
||||||
|
rt.wait(h, _meta=meta)
|
||||||
|
|
||||||
|
ctx = next(iter(engine._components.values())).ctx
|
||||||
|
src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
|
||||||
|
dst_pe_dma = f"sip{dst_sip}.cube{dst_cube}.pe{dst_pe}.pe_dma"
|
||||||
|
try:
|
||||||
|
ipcq_path = ctx.router.find_path(src_pe_prefix, dst_pe_dma)
|
||||||
|
except Exception as e:
|
||||||
|
ipcq_path = []
|
||||||
|
print(f" WARN: find_path ipcq failed: {e}")
|
||||||
|
# Snapshot DST PE_IPCQ qp state at end-of-run so we can see what
|
||||||
|
# peer_head_cache/my_tail looked like (and at which directions).
|
||||||
|
qp_state = _snapshot_qp_state(engine, dst_pe_ipcq_id)
|
||||||
|
return (captured, ctx, ipcq_path, engine,
|
||||||
|
arrival_log, qp_state, recv_log, barrier_log)
|
||||||
|
finally:
|
||||||
|
restore_barrier()
|
||||||
|
restore_recv()
|
||||||
|
restore_arrival()
|
||||||
|
restore()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Test entry ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.diagnostic
|
||||||
|
def test_pe_to_pe_diagnostic_h3():
|
||||||
|
print("\n" + "=" * 78)
|
||||||
|
print(f" Diagnostic: h3 inter-cube horizontal, nbytes={NBYTES}")
|
||||||
|
print(f" src={SRC} dst={DST}")
|
||||||
|
print("=" * 78)
|
||||||
|
|
||||||
|
# ── RAW scenario
|
||||||
|
print("\n[RAW] tl.load + tl.store (sender pays both legs)")
|
||||||
|
raw_per_pe, raw_ctx, raw_path, raw_engine = _run_raw()
|
||||||
|
print(f" per-PE pe_exec_ns ({len(raw_per_pe)} entries):")
|
||||||
|
src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu"
|
||||||
|
dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu"
|
||||||
|
for nid in (src_id, dst_id):
|
||||||
|
if nid in raw_per_pe:
|
||||||
|
print(f" {nid:<60s} {raw_per_pe[nid]:.2f} ns <-- key PE")
|
||||||
|
nonzero = {k: v for k, v in raw_per_pe.items() if v > 0.5}
|
||||||
|
if nonzero:
|
||||||
|
print(f" other PEs with pe_exec_ns > 0.5 ns:")
|
||||||
|
for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]:
|
||||||
|
if nid not in (src_id, dst_id):
|
||||||
|
print(f" {nid:<60s} {v:.2f} ns")
|
||||||
|
print(f" max(pe_exec_ns) = "
|
||||||
|
f"{max(raw_per_pe.values()) if raw_per_pe else 0:.2f} ns")
|
||||||
|
|
||||||
|
if raw_path:
|
||||||
|
br = _path_breakdown(raw_ctx, raw_path, NBYTES)
|
||||||
|
_print_breakdown("RAW sub-txn path (src.pe_dma -> dst.hbm_ctrl)", br)
|
||||||
|
_dump_src_op_records(raw_engine, *SRC, "RAW")
|
||||||
|
|
||||||
|
# ── IPCQ scenario
|
||||||
|
print("\n[IPCQ] tl.send + tl.recv (recv pays inbound traversal+drain)")
|
||||||
|
(ipcq_per_pe, ipcq_ctx, ipcq_path, ipcq_engine,
|
||||||
|
arrival_log, qp_state, recv_log, barrier_log) = _run_ipcq()
|
||||||
|
print(f"\n [BARRIER LOG] {len(barrier_log)} _execute_kernel entries:")
|
||||||
|
src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu"
|
||||||
|
dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu"
|
||||||
|
n_skipped = 0
|
||||||
|
src_entry = None
|
||||||
|
dst_entry = None
|
||||||
|
for e in barrier_log:
|
||||||
|
if e["barrier_skipped"]:
|
||||||
|
n_skipped += 1
|
||||||
|
if e["node_id"] == src_id:
|
||||||
|
src_entry = e
|
||||||
|
if e["node_id"] == dst_id:
|
||||||
|
dst_entry = e
|
||||||
|
print(f" PEs entering _execute_kernel: {len(barrier_log)}")
|
||||||
|
print(f" PEs that SKIPPED barrier (env.now > target_start): {n_skipped}")
|
||||||
|
if src_entry:
|
||||||
|
print(
|
||||||
|
f" SRC pe ({src_id}): entry_now={src_entry['entry_now']:.2f} "
|
||||||
|
f"target_start={src_entry['target_start_ns']:.2f} "
|
||||||
|
f"skipped={src_entry['barrier_skipped']} "
|
||||||
|
f"late_ns={src_entry['delta_late_ns']:.2f}"
|
||||||
|
)
|
||||||
|
if dst_entry:
|
||||||
|
print(
|
||||||
|
f" DST pe ({dst_id}): entry_now={dst_entry['entry_now']:.2f} "
|
||||||
|
f"target_start={dst_entry['target_start_ns']:.2f} "
|
||||||
|
f"skipped={dst_entry['barrier_skipped']} "
|
||||||
|
f"late_ns={dst_entry['delta_late_ns']:.2f}"
|
||||||
|
)
|
||||||
|
# Top 5 latest arrivals
|
||||||
|
sorted_late = sorted(
|
||||||
|
[e for e in barrier_log if e["delta_late_ns"] is not None],
|
||||||
|
key=lambda e: -e["delta_late_ns"],
|
||||||
|
)[:5]
|
||||||
|
print(f" Top 5 latest PE arrivals (positive = barrier missed):")
|
||||||
|
for e in sorted_late:
|
||||||
|
if e["delta_late_ns"] > 0:
|
||||||
|
print(
|
||||||
|
f" {e['node_id']}: late by {e['delta_late_ns']:.2f} ns "
|
||||||
|
f"(entry={e['entry_now']:.2f}, target={e['target_start_ns']:.2f})"
|
||||||
|
)
|
||||||
|
print(f"\n [RECV LOG on dst pe_ipcq] {len(recv_log)} entries:")
|
||||||
|
for e in recv_log:
|
||||||
|
print(
|
||||||
|
f" {e['phase']:5s} t={e['t']:8.2f} ns "
|
||||||
|
f"dir={e['direction']} "
|
||||||
|
f"peer_head_cache={e['peer_head_cache']} "
|
||||||
|
f"my_tail={e['my_tail']}"
|
||||||
|
)
|
||||||
|
print(f"\n [META-ARRIVAL LOG on dst pe_ipcq] {len(arrival_log)} arrivals:")
|
||||||
|
for i, e in enumerate(arrival_log):
|
||||||
|
print(
|
||||||
|
f" #{i:2d} t={e['t']:8.2f} ns "
|
||||||
|
f"src=(sip{e['src_sip']},cube{e['src_cube']},pe{e['src_pe']}) "
|
||||||
|
f"dir={e['src_direction']} "
|
||||||
|
f"sender_seq={e['sender_seq']} "
|
||||||
|
f"matched_dir={e['matched_direction']} "
|
||||||
|
f"nbytes={e['nbytes']}"
|
||||||
|
)
|
||||||
|
for d, ph in e["peer_head_cache_before"].items():
|
||||||
|
mt = e["my_tail_before"][d]
|
||||||
|
if ph != 0 or mt != 0 or d == e["matched_direction"]:
|
||||||
|
print(
|
||||||
|
f" before: dir={d} peer_head_cache={ph} my_tail={mt}"
|
||||||
|
)
|
||||||
|
print(f"\n [QP STATE END-OF-RUN on dst pe_ipcq]:")
|
||||||
|
for d, st in qp_state.items():
|
||||||
|
print(
|
||||||
|
f" dir={d} peer_head_cache={st['peer_head_cache']} "
|
||||||
|
f"my_tail={st['my_tail']} rx_range=[{st['rx_range'][0]}..."
|
||||||
|
f"{st['rx_range'][1]}) n_slots={st['n_slots']} "
|
||||||
|
f"slot_size={st['slot_size']}"
|
||||||
|
)
|
||||||
|
print(f" per-PE pe_exec_ns ({len(ipcq_per_pe)} entries):")
|
||||||
|
for nid in (src_id, dst_id):
|
||||||
|
if nid in ipcq_per_pe:
|
||||||
|
print(f" {nid:<60s} {ipcq_per_pe[nid]:.2f} ns <-- key PE")
|
||||||
|
nonzero = {k: v for k, v in ipcq_per_pe.items() if v > 0.5}
|
||||||
|
if nonzero:
|
||||||
|
print(f" other PEs with pe_exec_ns > 0.5 ns:")
|
||||||
|
for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]:
|
||||||
|
if nid not in (src_id, dst_id):
|
||||||
|
print(f" {nid:<60s} {v:.2f} ns")
|
||||||
|
print(f" max(pe_exec_ns) = "
|
||||||
|
f"{max(ipcq_per_pe.values()) if ipcq_per_pe else 0:.2f} ns")
|
||||||
|
|
||||||
|
if ipcq_path:
|
||||||
|
br = _path_breakdown(ipcq_ctx, ipcq_path, NBYTES)
|
||||||
|
_print_breakdown("IPCQ sub-txn path (src.pe_dma -> peer.pe_dma)", br)
|
||||||
|
_dump_src_op_records(ipcq_engine, *SRC, "IPCQ")
|
||||||
|
_dump_src_op_records(ipcq_engine, *DST, "IPCQ DST")
|
||||||
|
|
||||||
|
# ── Credit-return path analysis (where the missing IPCQ "ack" lives)
|
||||||
|
print("\n" + "-" * 78)
|
||||||
|
print("Credit-return path (current modeling)")
|
||||||
|
print("-" * 78)
|
||||||
|
src_pe_prefix = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}"
|
||||||
|
dst_pe_prefix = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}"
|
||||||
|
# PE_IPCQ._credit_latency_ns calls
|
||||||
|
# ctx.router.find_path(self._pe_prefix, peer_pe_prefix)
|
||||||
|
# where the *destination* lacks the ".pe_dma" suffix. find_path()
|
||||||
|
# only auto-appends to the source, so this raises -> the except
|
||||||
|
# clause silently returns 0.0. Effectively credit latency = 0.
|
||||||
|
try:
|
||||||
|
ipcq_ctx.router.find_path(dst_pe_prefix, src_pe_prefix)
|
||||||
|
bug_caught = False
|
||||||
|
except Exception as e:
|
||||||
|
bug_caught = True
|
||||||
|
print(f" CONFIRMED BUG in _credit_latency_ns: dest lacks '.pe_dma' "
|
||||||
|
f"-> find_path raises -> caught exception -> returns 0.0")
|
||||||
|
print(f" Error: {e}")
|
||||||
|
# The intended credit path is recv -> sender (reverse data direction)
|
||||||
|
try:
|
||||||
|
credit_path = ipcq_ctx.router.find_path(
|
||||||
|
dst_pe_prefix, f"{src_pe_prefix}.pe_dma",
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
credit_path = []
|
||||||
|
print(f" WARN: corrected find_path credit failed: {e}")
|
||||||
|
if credit_path:
|
||||||
|
credit_size = 16 # PE_IPCQ default _credit_size_bytes
|
||||||
|
# Today's modeling: drain only, 16 bytes -> ~0.125 ns
|
||||||
|
cur = ipcq_ctx.compute_drain_ns(credit_path, credit_size)
|
||||||
|
# Proposed modeling: full path latency (edges + node overhead + drain)
|
||||||
|
proposed = ipcq_ctx.compute_path_latency_ns(credit_path, credit_size)
|
||||||
|
print(f" credit path nodes = {len(credit_path)} (recv -> sender)")
|
||||||
|
for nid in credit_path[:6]:
|
||||||
|
print(f" {nid}")
|
||||||
|
if len(credit_path) > 6:
|
||||||
|
print(f" ... {len(credit_path) - 6} more nodes")
|
||||||
|
br = _path_breakdown(ipcq_ctx, credit_path, credit_size)
|
||||||
|
print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
|
||||||
|
print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
|
||||||
|
print(f" drain_ns(16 bytes) = {br['drain_ns']:.2f}")
|
||||||
|
print(f" CURRENT _credit_latency_ns (drain only) = {cur:.3f} ns")
|
||||||
|
print(f" PROPOSED (compute_path_latency_ns) = {proposed:.2f} ns")
|
||||||
|
print(f" delta = {proposed - cur:+.2f} ns")
|
||||||
|
|
||||||
|
# ── Comparison summary
|
||||||
|
print("\n" + "-" * 78)
|
||||||
|
print("Summary")
|
||||||
|
print("-" * 78)
|
||||||
|
raw_max = max(raw_per_pe.values()) if raw_per_pe else 0.0
|
||||||
|
ipcq_max = max(ipcq_per_pe.values()) if ipcq_per_pe else 0.0
|
||||||
|
print(f" RAW max(pe_exec_ns) = {raw_max:.2f} ns")
|
||||||
|
print(f" IPCQ max(pe_exec_ns) (current) = {ipcq_max:.2f} ns")
|
||||||
|
print(f" delta (RAW - IPCQ current) = {raw_max - ipcq_max:+.2f} ns")
|
||||||
|
if credit_path:
|
||||||
|
ipcq_with_credit = ipcq_max + (proposed - cur)
|
||||||
|
print(
|
||||||
|
f" IPCQ projected w/ blocking credit + full path overhead "
|
||||||
|
f"= {ipcq_with_credit:.2f} ns"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" delta (RAW - IPCQ projected) = "
|
||||||
|
f"{raw_max - ipcq_with_credit:+.2f} ns "
|
||||||
|
f"(<= 0 means IPCQ >= RAW)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# No assertions — this is observational.
|
||||||
|
assert raw_per_pe, "no RAW pe_exec_ns recorded"
|
||||||
|
assert ipcq_per_pe, "no IPCQ pe_exec_ns recorded"
|
||||||
@@ -0,0 +1,106 @@
|
|||||||
|
"""Rectangular (non-square) SIP-level 2D topology support.
|
||||||
|
|
||||||
|
Phase 1 regression target: today the 2D builtin topology functions in
|
||||||
|
``kernbench.ccl.topologies`` (``mesh_2d``, ``torus_2d``,
|
||||||
|
``mesh_2d_no_wrap``) hardcode ``side = sqrt(world_size)`` and raise
|
||||||
|
``ValueError`` for any non-square ``world_size``. This blocks running
|
||||||
|
the allreduce sweep at n_sips=6 on torus/mesh layouts.
|
||||||
|
|
||||||
|
Phase 2 will extend these functions to accept optional ``w, h`` kwargs
|
||||||
|
so a 2×3 (or 3×2, etc.) layout works. Until then, every test below is
|
||||||
|
expected to FAIL.
|
||||||
|
|
||||||
|
Layout convention used here (matches non-rectangular case):
|
||||||
|
rank = row * w + col for 0 <= row < h, 0 <= col < w
|
||||||
|
|
||||||
|
For w=2, h=3, world_size=6 the layout is:
|
||||||
|
|
||||||
|
col=0 col=1
|
||||||
|
row=0: 0 1
|
||||||
|
row=1: 2 3
|
||||||
|
row=2: 4 5
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kernbench.ccl.topologies import (
|
||||||
|
mesh_2d,
|
||||||
|
mesh_2d_no_wrap,
|
||||||
|
torus_2d,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── mesh_2d_no_wrap (no wrap-around) ──────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_mesh_2d_no_wrap_2x3_top_left():
|
||||||
|
"""rank 0 (top-left, no N, no W): only S and E."""
|
||||||
|
nbrs = mesh_2d_no_wrap(rank=0, world_size=6, w=2, h=3)
|
||||||
|
assert nbrs == {"S": 2, "E": 1}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
def test_mesh_2d_no_wrap_2x3_top_right():
|
||||||
|
"""rank 1 (top-right, no N, no E): only S and W."""
|
||||||
|
nbrs = mesh_2d_no_wrap(rank=1, world_size=6, w=2, h=3)
|
||||||
|
assert nbrs == {"S": 3, "W": 0}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
def test_mesh_2d_no_wrap_2x3_middle_left():
|
||||||
|
"""rank 2 (middle-left, no W): N, S, E."""
|
||||||
|
nbrs = mesh_2d_no_wrap(rank=2, world_size=6, w=2, h=3)
|
||||||
|
assert nbrs == {"N": 0, "S": 4, "E": 3}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
def test_mesh_2d_no_wrap_2x3_bottom_right():
|
||||||
|
"""rank 5 (bottom-right, no S, no E): only N and W."""
|
||||||
|
nbrs = mesh_2d_no_wrap(rank=5, world_size=6, w=2, h=3)
|
||||||
|
assert nbrs == {"N": 3, "W": 4}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
# ── torus_2d (wrap-around on all four edges) ─────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_torus_2d_2x3_top_left():
|
||||||
|
"""rank 0: N wraps to row 2 col 0 (rank 4); W wraps to col 1 (rank 1)."""
|
||||||
|
nbrs = torus_2d(rank=0, world_size=6, w=2, h=3)
|
||||||
|
assert nbrs == {"N": 4, "S": 2, "W": 1, "E": 1}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
def test_torus_2d_2x3_bottom_right():
|
||||||
|
"""rank 5: S wraps to row 0 (rank 1); E wraps to col 0 (rank 4)."""
|
||||||
|
nbrs = torus_2d(rank=5, world_size=6, w=2, h=3)
|
||||||
|
assert nbrs == {"N": 3, "S": 1, "W": 4, "E": 4}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
# ── mesh_2d alias for torus_2d ───────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_mesh_2d_2x3_matches_torus_2d():
|
||||||
|
"""mesh_2d is currently a torus alias; behaviour must match torus_2d."""
|
||||||
|
for rank in range(6):
|
||||||
|
assert mesh_2d(rank=rank, world_size=6, w=2, h=3) == \
|
||||||
|
torus_2d(rank=rank, world_size=6, w=2, h=3)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Back-compat: square layouts still work without w/h kwargs ────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_square_back_compat_mesh_2d_no_wrap():
|
||||||
|
"""Calling without w, h should still work for square world_size."""
|
||||||
|
nbrs = mesh_2d_no_wrap(rank=0, world_size=4)
|
||||||
|
assert nbrs == {"S": 2, "E": 1}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
def test_square_back_compat_torus_2d():
|
||||||
|
nbrs = torus_2d(rank=0, world_size=4)
|
||||||
|
assert nbrs == {"N": 2, "S": 2, "W": 1, "E": 1}, nbrs
|
||||||
|
|
||||||
|
|
||||||
|
# ── Validation: w*h must match world_size ────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_rectangular_dims_must_match_world_size():
|
||||||
|
"""Phase 2 contract: explicit w, h must satisfy w*h == world_size."""
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
mesh_2d_no_wrap(rank=0, world_size=6, w=3, h=3) # 9 != 6
|
||||||