3 Commits

Author SHA1 Message Date
mukesh e9cc40f74d Rectangular SIP topology + 6-device allreduce sweep
mesh_2d, torus_2d, and mesh_2d_no_wrap accept optional w,h kwargs;
sqrt fall-back preserved for square layouts (back-compat tests
confirm 4-SIP and 9-SIP square configs still work). sfr_config
reads system.sips.w/h from spec and threads dims through to the
topology fn.

test_allreduce_multidevice CONFIGS switched from 4 SIPs (square)
to 6 SIPs: ring_1d_6sip, torus_2d_6sip_2x3, mesh_2d_no_wrap_6sip_2x3.
_write_temp_configs writes system.sips.w/h when supplied;
_sip_topo_dims reads them back. Latency sweep loop also moved to
6-SIP layouts. Linear-scale plot variants dropped -- only log-scale
*.png + summary.csv emitted. Plots in tests/allreduce_latency_plots
regenerated.

New tests/test_sip_topology_rectangular.py asserts neighbor
correctness for 2x3 layouts and back-compat for square fallback.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:13:14 -07:00
mukesh c1a5cf3a2a ADR-0009 D5: chain-aware target_start_ns + zero-byte launch fanout
The single-walk predictor (find_node_path(io_cpu, pe_cpu) +
compute_path_latency_ns) under-shot actual dispatch latency for far
cubes -- the routing graph could pick a path bypassing M_CPU, and
non-zero-nbytes launch sub-txns serialized on shared first hops.
Far PEs arrived at _execute_kernel after target_start_ns, silently
skipped the barrier yield, and started pe_exec_start late. Their
reported pe_exec_ns under-counted by exactly the late_ns amount
(63 ns observed at h4 cube4.pe0 in the IPCQ test, up to 113 ns
worst case for cubes 9-11), producing the suspicious flat region
in the h4 IPCQ curve at 8192/10240 bytes.

Fix:
  - IO_CPU predictor uses the explicit two-leg chain
    (IO_CPU->M_CPU + M_CPU->PE_CPU - io.overhead - m.overhead), so
    every PE on every targeted cube has a barrier >= its real
    dispatch arrival.
  - Kernel-launch fanout sub-txns carry nbytes=0 (control-plane,
    not data-plane), removing the per-cube fanout serialization
    that pushed far M_CPUs past the predictor.
  - Legacy io_cpu mirror updated.

ADR-0009 D5 mechanism updated to specify the two-leg formula and
the nbytes=0 requirement. New tests/test_d5_barrier_invariant.py
asserts (a) no PE enters _execute_kernel after target_start_ns and
(b) every PE in a multi-cube launch has identical pe_exec_start --
both regressions silently pass on the existing
tests/test_kernel_launch_sync.py because that test only inspects
post-aggregation max(pe_exec_ns).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:12:58 -07:00
mukesh 90874abbfe ADR-0023 D9: blocking credit-emit with full-path latency
PE_IPCQ._handle_recv now yields-from _delayed_credit_send instead of
spawning it as a fork, so the receiver's pe_exec_ns includes the
credit-return cost. _credit_latency_ns switches from
compute_drain_ns(path, 16) to compute_path_latency_ns(path, 16) and
fixes a latent find_path bug where the destination lacked the
".pe_dma" suffix (silently returned 0 ns under the bare except).

Net effect on h3/h4 inter-cube pe-to-pe latency: IPCQ >= raw DMA at
every size, matching real-HW posted-write semantics. tl.send remains
fire-and-forget. ADR-0023 D9 amended; new diagnostic test
tests/test_pe_to_pe_diagnostic.py captures per-PE pe_exec_ns, paths,
drain, and meta-arrival timing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:12:38 -07:00
24 changed files with 1538 additions and 187 deletions
@@ -86,11 +86,36 @@ Mechanism.
- `KernelLaunchMsg` carries an optional `target_start_ns: float | None`. - `KernelLaunchMsg` carries an optional `target_start_ns: float | None`.
- **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it - **IO_CPU** is the canonical stamper. On fan-out to M_CPUs, it
computes `target_start_ns = env.now + max_latency` where `max_latency` computes `target_start_ns = env.now + max_latency` where
is the maximum `ComponentContext.compute_path_latency_ns(path)` across `max_latency` is the maximum, over every target (sip, cube, pe)
every target (sip, cube, pe) tuple — `path = find_node_path(io_cpu, tuple, of the **two-leg dispatch chain**:
pe_cpu_id)`. The stamped value is placed on the request carried by
every fanned-out sub-Transaction. ```
max_latency(sip, cube, pe) =
compute_path_latency_ns(find_node_path(io_cpu, m_cpu(sip, cube)))
+ compute_path_latency_ns(find_node_path(m_cpu(sip, cube), pe_cpu))
- io_cpu.overhead_ns
- m_cpu.overhead_ns
```
This models the actual dispatch as **two sequential Transactions**
(IO_CPU → M_CPU, then M_CPU → PE_CPU). Each leg's
`compute_path_latency_ns` adds its endpoints' `overhead_ns`;
`io_cpu.overhead_ns` is subtracted because IO_CPU has already
paid it before this method runs, and `m_cpu.overhead_ns` is
subtracted once because it appears as endpoint of leg1 *and*
start of leg2 but is paid only once at run time. A single
`find_node_path(io_cpu, pe_cpu)` walk is **not** equivalent —
it can pick a graph path that bypasses M_CPU and silently
under-shoots the prediction for far cubes, breaking the D5
invariant.
The fanned-out sub-Transactions carry **`nbytes = 0`** for
`KernelLaunchMsg` (control message only). Without this,
large kernel-launch payloads would occupy fabric BW on the
shared first hop and serialize the per-cube dispatch, pushing
far M_CPUs past `target_start_ns` and re-introducing the
late-arrival violation.
- **M_CPU** passes an already-stamped `target_start_ns` through - **M_CPU** passes an already-stamped `target_start_ns` through
unchanged. Only when the value is absent (e.g. a direct unchanged. Only when the value is absent (e.g. a direct
launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier launch-to-M_CPU unit test) does M_CPU compute a per-cube barrier
+25 -8
View File
@@ -372,24 +372,41 @@ When the receiver frees a slot, the sender must learn about it
travel through general vc_comm fabric — it uses a **separate fast travel through general vc_comm fabric — it uses a **separate fast
path**, an abstraction of the NVLink / UCIe credit-return wire. path**, an abstraction of the NVLink / UCIe credit-return wire.
**Latency** is computed from the **bottleneck BW on the path**, not a **Latency** is computed from the **full path latency** (per-node
magic constant: overhead + edge propagation + drain), not a magic constant:
``` ```
credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes) credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes)
path = router.find_path(self_pe, peer_pe) path = router.find_path(self_pe, peer_pe.pe_dma)
latency = compute_drain_ns(path, credit_size_bytes) latency = compute_path_latency_ns(path, credit_size_bytes)
= credit_size_bytes / bottleneck_bw_on_path = sum(edge.distance_mm * ns_per_mm)
+ sum(node_overhead_ns[n] for n in path)
+ credit_size_bytes / bottleneck_bw_on_path
``` ```
The router auto-appends `.pe_dma` to the source only, so the
destination MUST be spelled with the explicit `.pe_dma` suffix or
`find_path` raises and the credit silently teleports at zero cost
(latent bug fixed alongside this update).
`tl.recv` blocks on the credit-emit completion (recv yields-from
`_delayed_credit_send` rather than spawning it as a fork). This puts
the credit-return cost on the receiver's `pe_exec_ns`, modeling the
IPCQ control-plane completing the consume-acknowledgement before
recv returns to the kernel — the protocol equivalent of a non-posted
`tl.store` waiting for an HBM ack on the raw DMA path.
That gives us: That gives us:
- **Topology-proportional approximation**: an in-cube credit return is - **Topology-proportional approximation**: an in-cube credit return is
automatically faster than a cross-SIP credit return. automatically faster than a cross-SIP credit return.
- **No magic constants**: no arbitrary `ipcq_ctrl_latency_ns`. - **No magic constants**: every nanosecond comes from
`compute_path_latency_ns` on the same edge_map and `node_overhead_ns`
as data traffic.
- **No deadlock risk**: unlike piggyback, B can issue credit even when - **No deadlock risk**: unlike piggyback, B can issue credit even when
it has no data to send back. it has no data to send back. `peer_credit_store.put` is unbounded.
- **Reuses existing utility**: `ComponentContext.compute_drain_ns`. - **`IPCQ ≥ raw DMA`** for matched physical moves — the credit-emit
cost on recv balances the HBM ack-trip cost RAW pays on the sender.
#### Component coupling — SimPy Store channel #### Component coupling — SimPy Store channel
+26 -10
View File
@@ -365,23 +365,39 @@ data 경로의 piggyback 모델과 달리, credit return은 일반 vc_comm fabri
거치지 않고 **별도 fast path**로 처리한다. 이는 실제 HW의 NVLink/UCIe 거치지 않고 **별도 fast path**로 처리한다. 이는 실제 HW의 NVLink/UCIe
credit return fast path를 추상화한 것이다. credit return fast path를 추상화한 것이다.
**Latency 계산**: magic constant가 아니라 **라우팅 경로의 bottleneck BW** **Latency 계산**: magic constant가 아니라 **라우팅 경로의 full path
기준으로 산출한다. latency** (per-node overhead + edge propagation + drain) 기준으로
산출한다.
``` ```
credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes) credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes)
path = router.find_path(self_pe, peer_pe) path = router.find_path(self_pe, peer_pe.pe_dma)
latency = compute_drain_ns(path, credit_size_bytes) latency = compute_path_latency_ns(path, credit_size_bytes)
= credit_size_bytes / bottleneck_bw_on_path = sum(edge.distance_mm * ns_per_mm)
+ sum(node_overhead_ns[n] for n in path)
+ credit_size_bytes / bottleneck_bw_on_path
``` ```
router는 source에만 `.pe_dma`를 자동 부여하므로 destination에는 반드시
`.pe_dma` suffix를 명시해야 한다. 그렇지 않으면 `find_path`가 raise하고
credit이 0 cost로 silently teleport되는 latent bug가 발생한다 (이번
업데이트에서 수정됨).
`tl.recv`는 credit-emit 완료를 yield-from으로 기다린다 (이전에는
`env.process`로 fork). 이로써 credit-return cost가 receiver의
`pe_exec_ns`에 반영되어, IPCQ control-plane이 consume-acknowledgement를
완료한 뒤에야 recv가 kernel에 반환된다 — RAW DMA의 non-posted `tl.store`
HBM ack-trip을 기다리는 것의 protocol-level 등가물이다.
이로써: 이로써:
- **토폴로지 비례 approximation**: cube 내 credit return과 cross-SIP credit이 - **토폴로지 비례 approximation**: cube 내 credit return과 cross-SIP credit이
자동으로 다른 latency를 가짐 (정확한 값은 아니지만 magic constant보다 의미 있음) 자동으로 다른 latency를 가짐
- **Magic constant 없음**: 별도 `ipcq_ctrl_latency_ns` 같은 임의 값 불필요 - **Magic constant 없음**: 모든 ns 값이 데이터 트래픽과 동일한 edge_map
- **Deadlock 위험 없음**: piggyback과 달리 B가 A에게 보낼 데이터가 없어도 `node_overhead_ns`에서 산출되는 `compute_path_latency_ns`로부터 옴
credit이 자동 발행됨 - **Deadlock 위험 없음**: `peer_credit_store.put`은 unbounded, B가 A에게
- **기존 utility 재사용**: `ComponentContext.compute_drain_ns` 그대로 사용 보낼 데이터가 없어도 credit이 자동 발행됨
- **`IPCQ ≥ raw DMA`** 보장: matched physical move에 대해 credit-emit이
RAW의 ack-trip cost와 균형을 이룸
``` ```
PE B: tl.recv(W) → 데이터 가져감 → my_tail++ PE B: tl.recv(W) → 데이터 가져감 → my_tail++
+95 -36
View File
@@ -1,22 +1,24 @@
"""SFR configuration for intercube + inter-SIP IPCQ wiring. """SFR configuration for the full IPCQ hardware wiring.
Provides ``configure_sfr_intercube_multisip`` which programs PE_IPCQ Installs PE_IPCQ neighbor tables modeling the physical hardware.
neighbor tables for: Wiring is independent of DPPolicy / kernel choice — the kernel decides
at runtime which links to use.
1. Intercube within each SIP — pe0 of every cube connects to pe0 of Direction label namespaces (disjoint):
its N/S/E/W mesh neighbors (no wrap-around).
2. Inter-SIP on ALL cubes — pe0 of cube_c on sip_A connects to pe0 of
cube_c on each peer SIP, using ``global_E``/``global_W`` (ring) or
``global_N``/``global_S``/``global_E``/``global_W`` (mesh/torus)
direction labels. Wiring all cubes allows the kernel to
dynamically elect the root cube at runtime.
SIP-level topology is read from ``topology.yaml`` → - Intra-cube PE-to-PE: ``intra_N / intra_S / intra_E / intra_W``
``system.sips.topology`` (e.g. ``ring_1d``, ``mesh_2d``). Logical 2×4 PE grid within a cube (no wrap):
Intercube mesh dimensions come from ``sip.cube_mesh.w/h``.
Internally delegates to ``install_ipcq`` with a computed ``rank_to_pe`` Row 0: pe0 pe1 pe2 pe3
(pe0-only) and a closure-captured ``neighbors()`` function. Row 1: pe4 pe5 pe6 pe7
- Intercube same-lane: ``N / S / E / W``
``pe_i of cube_A ↔ pe_i of cube_B`` across the 4×4 cube mesh
(no wrap). Every PE i ∈ [0..7] wired independently.
- Inter-SIP same-(cube, pe): ``global_N / global_S / global_E / global_W``
``pe_i of cube_c on sip_A ↔ pe_i of cube_c on sip_B`` per
``topology.yaml → system.sips.topology``.
""" """
from __future__ import annotations from __future__ import annotations
@@ -27,12 +29,46 @@ from kernbench.ccl.install import install_ipcq
from kernbench.ccl.topologies import _BUILTIN as _TOPO_BUILTINS from kernbench.ccl.topologies import _BUILTIN as _TOPO_BUILTINS
# ── Intra-cube 2×4 PE grid ───────────────────────────────────────────
_PE_GRID_COLS = 4
_PE_GRID_ROWS = 2
_PES_PER_CUBE = _PE_GRID_COLS * _PE_GRID_ROWS # 8
def _intra_cube_neighbors(pe: int) -> dict[str, int]:
"""Logical 2×4 PE grid neighbors within a cube (no wrap).
Returns directions in the ``intra_*`` namespace.
"""
row, col = divmod(pe, _PE_GRID_COLS)
nbrs: dict[str, int] = {}
if col < _PE_GRID_COLS - 1:
nbrs["intra_E"] = row * _PE_GRID_COLS + (col + 1)
if col > 0:
nbrs["intra_W"] = row * _PE_GRID_COLS + (col - 1)
if row < _PE_GRID_ROWS - 1:
nbrs["intra_S"] = (row + 1) * _PE_GRID_COLS + col
if row > 0:
nbrs["intra_N"] = (row - 1) * _PE_GRID_COLS + col
return nbrs
# ── Public entry point ───────────────────────────────────────────────
def configure_sfr_intercube_multisip( def configure_sfr_intercube_multisip(
engine: Any, engine: Any,
spec: dict, spec: dict,
cfg: dict, cfg: dict,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Wire IPCQ for intercube (pe0, mesh) + inter-SIP (pe0, all cubes). """Wire the full IPCQ hardware model.
Every PE on every cube on every SIP gets neighbor table entries for:
- intra-cube (2×4 grid) in the ``intra_*`` namespace
- intercube same-lane (4×4 cube mesh, no wrap) in ``N/S/E/W``
- inter-SIP same-(cube, pe) in ``global_*``
Args: Args:
engine: GraphEngine with ``_components``. engine: GraphEngine with ``_components``.
@@ -46,48 +82,71 @@ def configure_sfr_intercube_multisip(
mesh_w = int(cm["w"]) mesh_w = int(cm["w"])
mesh_h = int(cm["h"]) mesh_h = int(cm["h"])
n_cubes = mesh_w * mesh_h n_cubes = mesh_w * mesh_h
n_sips = int(spec.get("system", {}).get("sips", {}).get("count", 1)) sips_cfg = spec.get("system", {}).get("sips", {})
sip_topology = str( n_sips = int(sips_cfg.get("count", 1))
spec.get("system", {}).get("sips", {}).get("topology", "ring_1d") sip_topology = str(sips_cfg.get("topology", "ring_1d"))
) sip_w = sips_cfg.get("w")
sip_h = sips_cfg.get("h")
sip_w = int(sip_w) if sip_w is not None else None
sip_h = int(sip_h) if sip_h is not None else None
if sip_topology not in _TOPO_BUILTINS: if sip_topology not in _TOPO_BUILTINS:
raise ValueError( raise ValueError(
f"Unknown sip topology '{sip_topology}'. " f"Unknown sip topology '{sip_topology}'. "
f"Available: {list(_TOPO_BUILTINS)}" f"Available: {list(_TOPO_BUILTINS)}"
) )
sip_topo_fn = _TOPO_BUILTINS[sip_topology] _sip_topo_fn_raw = _TOPO_BUILTINS[sip_topology]
world_size = n_sips * n_cubes def sip_topo_fn(rank: int, ws: int) -> dict:
if sip_w is not None and sip_h is not None:
try:
return _sip_topo_fn_raw(rank, ws, w=sip_w, h=sip_h)
except TypeError:
pass
return _sip_topo_fn_raw(rank, ws)
pes_per_cube = _PES_PER_CUBE
world_size = n_sips * n_cubes * pes_per_cube
pe_idx_to_pe: list[tuple[int, int, int]] = [ pe_idx_to_pe: list[tuple[int, int, int]] = [
(sip, cube, 0) (sip, cube, pe)
for sip in range(n_sips) for sip in range(n_sips)
for cube in range(n_cubes) for cube in range(n_cubes)
for pe in range(pes_per_cube)
] ]
def _pe_idx(sip: int, cube: int, pe: int) -> int:
return (sip * n_cubes + cube) * pes_per_cube + pe
def _neighbors(pe_idx: int, ws: int, _base: dict) -> dict[str, int]: def _neighbors(pe_idx: int, ws: int, _base: dict) -> dict[str, int]:
sip = pe_idx // n_cubes tmp = pe_idx
cube = pe_idx % n_cubes pe = tmp % pes_per_cube
tmp //= pes_per_cube
cube = tmp % n_cubes
sip = tmp // n_cubes
row = cube // mesh_w row = cube // mesh_w
col = cube % mesh_w col = cube % mesh_w
nbrs: dict[str, int] = {} nbrs: dict[str, int] = {}
# Intercube within SIP (mesh, no wrap-around) # ── Intra-cube (intra_N/S/E/W) ──
if col < mesh_w - 1: for d, peer_pe in _intra_cube_neighbors(pe).items():
nbrs["E"] = sip * n_cubes + (row * mesh_w + col + 1) nbrs[d] = _pe_idx(sip, cube, peer_pe)
if col > 0:
nbrs["W"] = sip * n_cubes + (row * mesh_w + col - 1)
if row < mesh_h - 1:
nbrs["S"] = sip * n_cubes + ((row + 1) * mesh_w + col)
if row > 0:
nbrs["N"] = sip * n_cubes + ((row - 1) * mesh_w + col)
# Inter-SIP on ALL cubes # ── Intercube same-lane (N/S/E/W, 4×4 no wrap) ──
if col < mesh_w - 1:
nbrs["E"] = _pe_idx(sip, row * mesh_w + (col + 1), pe)
if col > 0:
nbrs["W"] = _pe_idx(sip, row * mesh_w + (col - 1), pe)
if row < mesh_h - 1:
nbrs["S"] = _pe_idx(sip, (row + 1) * mesh_w + col, pe)
if row > 0:
nbrs["N"] = _pe_idx(sip, (row - 1) * mesh_w + col, pe)
# ── Inter-SIP same-(cube, pe) (global_*) ──
if n_sips > 1: if n_sips > 1:
sip_nbrs = sip_topo_fn(sip, n_sips) sip_nbrs = sip_topo_fn(sip, n_sips)
for d, peer_sip in sip_nbrs.items(): for d, peer_sip in sip_nbrs.items():
nbrs[f"global_{d}"] = peer_sip * n_cubes + cube nbrs[f"global_{d}"] = _pe_idx(peer_sip, cube, pe)
return nbrs return nbrs
+49 -37
View File
@@ -33,23 +33,41 @@ def ring_1d_unidir(rank: int, world_size: int) -> NeighborMap:
return {"E": (rank + 1) % world_size} return {"E": (rank + 1) % world_size}
def mesh_2d(rank: int, world_size: int) -> NeighborMap: def _resolve_2d_dims(
"""Square 2D mesh (N/S/E/W). world_size: int, w: int | None, h: int | None, name: str,
) -> tuple[int, int]:
Layout: rank = row * side + col, with side = sqrt(world_size). if w is not None and h is not None:
Wrap-around (torus) on all four edges. if w * h != world_size:
""" raise ValueError(
f"{name}: w*h ({w}*{h}) != world_size ({world_size})"
)
return w, h
side = int(round(world_size ** 0.5)) side = int(round(world_size ** 0.5))
if side * side != world_size: if side * side != world_size:
raise ValueError( raise ValueError(
f"mesh_2d requires square world_size, got {world_size}" f"{name} requires square world_size or explicit w,h, "
f"got {world_size}"
) )
r, c = divmod(rank, side) return side, side
def mesh_2d(
rank: int, world_size: int,
w: int | None = None, h: int | None = None,
) -> NeighborMap:
"""2D mesh (N/S/E/W) with wrap-around on all four edges.
Layout: rank = row * w + col. When w, h are given, supports
rectangular (e.g. 2x3) layouts. Otherwise falls back to square
side = sqrt(world_size).
"""
w, h = _resolve_2d_dims(world_size, w, h, "mesh_2d")
r, c = divmod(rank, w)
return { return {
"N": ((r - 1) % side) * side + c, "N": ((r - 1) % h) * w + c,
"S": ((r + 1) % side) * side + c, "S": ((r + 1) % h) * w + c,
"W": r * side + (c - 1) % side, "W": r * w + (c - 1) % w,
"E": r * side + (c + 1) % side, "E": r * w + (c + 1) % w,
} }
@@ -73,36 +91,30 @@ def tree_binary(rank: int, world_size: int) -> NeighborMap:
return n return n
def torus_2d(rank: int, world_size: int) -> NeighborMap: def torus_2d(
"""Square 2D torus (N/S/E/W) with wrap-around on all edges. rank: int, world_size: int,
w: int | None = None, h: int | None = None,
Alias for mesh_2d (which already wraps). Explicit name for clarity ) -> NeighborMap:
when used as a SIP-level topology. """2D torus (N/S/E/W) with wrap-around on all edges. Alias for mesh_2d."""
""" return mesh_2d(rank, world_size, w=w, h=h)
return mesh_2d(rank, world_size)
def mesh_2d_no_wrap(rank: int, world_size: int) -> NeighborMap: def mesh_2d_no_wrap(
"""Square 2D mesh (N/S/E/W) WITHOUT wrap-around. rank: int, world_size: int,
w: int | None = None, h: int | None = None,
Edge nodes have fewer neighbors (no wrapping). Used for SIP-level ) -> NeighborMap:
topologies where physical links don't wrap. """2D mesh (N/S/E/W) WITHOUT wrap-around. Supports rectangular dims."""
""" w, h = _resolve_2d_dims(world_size, w, h, "mesh_2d_no_wrap")
side = int(round(world_size ** 0.5)) r, c = divmod(rank, w)
if side * side != world_size:
raise ValueError(
f"mesh_2d_no_wrap requires square world_size, got {world_size}"
)
r, c = divmod(rank, side)
n: NeighborMap = {} n: NeighborMap = {}
if r > 0: if r > 0:
n["N"] = (r - 1) * side + c n["N"] = (r - 1) * w + c
if r < side - 1: if r < h - 1:
n["S"] = (r + 1) * side + c n["S"] = (r + 1) * w + c
if c > 0: if c > 0:
n["W"] = r * side + (c - 1) n["W"] = r * w + (c - 1)
if c < side - 1: if c < w - 1:
n["E"] = r * side + (c + 1) n["E"] = r * w + (c + 1)
return n return n
+28 -7
View File
@@ -86,26 +86,41 @@ class IoCpuComponent(ComponentBase):
# For KernelLaunchMsg, compute the global barrier once here so # For KernelLaunchMsg, compute the global barrier once here so
# every downstream PE_CPU uses the same target_start_ns. # every downstream PE_CPU uses the same target_start_ns.
if isinstance(request, KernelLaunchMsg): if isinstance(request, KernelLaunchMsg):
io_overhead = self.ctx.node_overhead_ns.get(self.node.id, 0.0)
global_max_latency = 0.0 global_max_latency = 0.0
pe_ids = self._resolve_pe_ids( pe_ids = self._resolve_pe_ids(
getattr(request, "target_pe", "all") getattr(request, "target_pe", "all")
) )
for sip, cube in cube_targets: for sip, cube in cube_targets:
try:
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
io_to_m_path = self.ctx.router.find_node_path(
self.node.id, m_cpu_id,
)
except Exception:
continue
if len(io_to_m_path) < 2:
continue
leg1 = self.ctx.compute_path_latency_ns(
io_to_m_path, nbytes=0,
)
m_overhead = self.ctx.node_overhead_ns.get(m_cpu_id, 0.0)
for pe_id in pe_ids: for pe_id in pe_ids:
pe_cpu_id = ( pe_cpu_id = (
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu" f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
) )
try: try:
path = self.ctx.router.find_node_path( m_to_pe_path = self.ctx.router.find_node_path(
self.node.id, pe_cpu_id, m_cpu_id, pe_cpu_id,
) )
except Exception: except Exception:
continue continue
if len(path) < 2: if len(m_to_pe_path) < 2:
continue continue
latency = self.ctx.compute_path_latency_ns( leg2 = self.ctx.compute_path_latency_ns(
path, nbytes=0, m_to_pe_path, nbytes=0,
) )
latency = leg1 + leg2 - io_overhead - m_overhead
if latency > global_max_latency: if latency > global_max_latency:
global_max_latency = latency global_max_latency = latency
request = dataclasses.replace( request = dataclasses.replace(
@@ -116,7 +131,12 @@ class IoCpuComponent(ComponentBase):
# Setup aggregation # Setup aggregation
self._pending[request.request_id] = (len(cube_targets), 0, txn.done) self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
# Fan out to each target cube's M_CPU # Fan out to each target cube's M_CPU. Kernel-launch fanout
# carries control metadata only; nbytes is forced to 0 for
# KernelLaunchMsg so the launch sub-txns do not occupy data-fabric
# BW (would otherwise serialize 16 cubes worth of fanout on the
# shared first hop and break ADR-0009 D5's barrier prediction).
is_kernel_launch = isinstance(request, KernelLaunchMsg)
for sip, cube in cube_targets: for sip, cube in cube_targets:
try: try:
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube) m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
@@ -127,7 +147,8 @@ class IoCpuComponent(ComponentBase):
continue continue
sub_txn = Transaction( sub_txn = Transaction(
request=request, path=path, step=0, request=request, path=path, step=0,
nbytes=txn.nbytes, done=env.event(), nbytes=0 if is_kernel_launch else txn.nbytes,
done=env.event(),
result_data=txn.result_data, result_data=txn.result_data,
) )
yield self.out_ports[path[1]].put(sub_txn.advance()) yield self.out_ports[path[1]].put(sub_txn.advance())
+18 -7
View File
@@ -338,9 +338,13 @@ class PeIpcqComponent(ComponentBase):
nbytes=req.result_data.get("nbytes", 0), nbytes=req.result_data.get("nbytes", 0),
) )
# Fast path credit return — bottleneck BW based latency # Credit return: recv blocks on credit-emit so the protocol cost
env.process( # (full path latency to deliver the credit metadata back to the
self._delayed_credit_send(env, direction, qp["peer_credit_store"], qp["my_tail"]) # sender) is reflected in the recv's pe_exec_ns. Models the IPCQ
# control-plane completing the consume-acknowledgement before
# recv returns to the kernel.
yield from self._delayed_credit_send(
env, direction, qp["peer_credit_store"], qp["my_tail"],
) )
if not req.done.triggered: if not req.done.triggered:
@@ -455,7 +459,12 @@ class PeIpcqComponent(ComponentBase):
yield peer_credit_store.put(meta) yield peer_credit_store.put(meta)
def _credit_latency_ns(self, direction: str) -> float: def _credit_latency_ns(self, direction: str) -> float:
"""Compute credit fast path latency = credit_size / bottleneck_bw. """Full path latency for the credit-return packet.
Pays per-node overhead + edge prop + drain along the same fabric
the data took. PathRouter.find_path() auto-appends ".pe_dma" to
the source only, so the destination MUST be spelled with the
explicit ".pe_dma" suffix.
Falls back to 0 when ctx/router is unavailable (unit-test mode). Falls back to 0 when ctx/router is unavailable (unit-test mode).
""" """
@@ -463,10 +472,12 @@ class PeIpcqComponent(ComponentBase):
return 0.0 return 0.0
qp = self._queue_pairs[direction] qp = self._queue_pairs[direction]
peer = qp["peer"] peer = qp["peer"]
peer_pe_prefix = f"sip{peer.sip}.cube{peer.cube}.pe{peer.pe}" peer_pe_dma = f"sip{peer.sip}.cube{peer.cube}.pe{peer.pe}.pe_dma"
try: try:
path = self.ctx.router.find_path(self._pe_prefix, peer_pe_prefix) path = self.ctx.router.find_path(self._pe_prefix, peer_pe_dma)
return self.ctx.compute_drain_ns(path, self._credit_size_bytes) return self.ctx.compute_path_latency_ns(
path, self._credit_size_bytes,
)
except Exception: except Exception:
return 0.0 return 0.0
@@ -79,26 +79,41 @@ class IoCpuComponent(ComponentBase):
return return
if isinstance(request, KernelLaunchMsg): if isinstance(request, KernelLaunchMsg):
io_overhead = self.ctx.node_overhead_ns.get(self.node.id, 0.0)
global_max_latency = 0.0 global_max_latency = 0.0
pe_ids = self._resolve_pe_ids( pe_ids = self._resolve_pe_ids(
getattr(request, "target_pe", "all") getattr(request, "target_pe", "all")
) )
for sip, cube in cube_targets: for sip, cube in cube_targets:
try:
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
io_to_m_path = self.ctx.router.find_node_path(
self.node.id, m_cpu_id,
)
except Exception:
continue
if len(io_to_m_path) < 2:
continue
leg1 = self.ctx.compute_path_latency_ns(
io_to_m_path, nbytes=0,
)
m_overhead = self.ctx.node_overhead_ns.get(m_cpu_id, 0.0)
for pe_id in pe_ids: for pe_id in pe_ids:
pe_cpu_id = ( pe_cpu_id = (
f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu" f"sip{sip}.cube{cube}.pe{pe_id}.pe_cpu"
) )
try: try:
path = self.ctx.router.find_node_path( m_to_pe_path = self.ctx.router.find_node_path(
self.node.id, pe_cpu_id, m_cpu_id, pe_cpu_id,
) )
except Exception: except Exception:
continue continue
if len(path) < 2: if len(m_to_pe_path) < 2:
continue continue
latency = self.ctx.compute_path_latency_ns( leg2 = self.ctx.compute_path_latency_ns(
path, nbytes=0, m_to_pe_path, nbytes=0,
) )
latency = leg1 + leg2 - io_overhead - m_overhead
if latency > global_max_latency: if latency > global_max_latency:
global_max_latency = latency global_max_latency = latency
request = dataclasses.replace( request = dataclasses.replace(
@@ -109,7 +124,7 @@ class IoCpuComponent(ComponentBase):
# Setup aggregation # Setup aggregation
self._pending[request.request_id] = (len(cube_targets), 0, txn.done) self._pending[request.request_id] = (len(cube_targets), 0, txn.done)
# Fan out to each target cube's M_CPU is_kernel_launch = isinstance(request, KernelLaunchMsg)
for sip, cube in cube_targets: for sip, cube in cube_targets:
try: try:
m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube) m_cpu_id = self.ctx.resolver.find_m_cpu(sip, cube)
@@ -120,7 +135,8 @@ class IoCpuComponent(ComponentBase):
continue continue
sub_txn = Transaction( sub_txn = Transaction(
request=request, path=path, step=0, request=request, path=path, step=0,
nbytes=txn.nbytes, done=env.event(), nbytes=0 if is_kernel_launch else txn.nbytes,
done=env.event(),
result_data=txn.result_data, result_data=txn.result_data,
) )
yield self.out_ports[path[1]].put(sub_txn.advance()) yield self.out_ports[path[1]].put(sub_txn.advance())
Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

+34
View File
@@ -0,0 +1,34 @@
algorithm,sip_topology,n_sips,n_elem,bytes_per_pe,bytes_per_sip,latency_ns
intercube_allreduce,ring_1d,6,8,16,256,3073.1299999999937
intercube_allreduce,ring_1d,6,32,64,1024,3079.8799999999947
intercube_allreduce,ring_1d,6,64,128,2048,3088.879999999992
intercube_allreduce,ring_1d,6,128,256,4096,3106.8799999999865
intercube_allreduce,ring_1d,6,512,1024,16384,3225.8799999999865
intercube_allreduce,ring_1d,6,1024,2048,32768,3391.8799999999865
intercube_allreduce,ring_1d,6,2048,4096,65536,3723.8799999999865
intercube_allreduce,ring_1d,6,4096,8192,131072,4387.879999999965
intercube_allreduce,ring_1d,6,8192,16384,262144,5715.879999999957
intercube_allreduce,ring_1d,6,16384,32768,524288,8371.879999999932
intercube_allreduce,ring_1d,6,32768,65536,1048576,13683.879999999903
intercube_allreduce,torus_2d,6,8,16,256,2190.4799999999923
intercube_allreduce,torus_2d,6,32,64,1024,2196.479999999993
intercube_allreduce,torus_2d,6,64,128,2048,2204.4799999999905
intercube_allreduce,torus_2d,6,128,256,4096,2220.479999999985
intercube_allreduce,torus_2d,6,512,1024,16384,2325.479999999985
intercube_allreduce,torus_2d,6,1024,2048,32768,2471.479999999985
intercube_allreduce,torus_2d,6,2048,4096,65536,2763.479999999985
intercube_allreduce,torus_2d,6,4096,8192,131072,3347.4799999999777
intercube_allreduce,torus_2d,6,8192,16384,262144,4515.4799999999705
intercube_allreduce,torus_2d,6,16384,32768,524288,6851.479999999952
intercube_allreduce,torus_2d,6,32768,65536,1048576,11523.479999999923
intercube_allreduce,mesh_2d_no_wrap,6,8,16,256,3508.4249999999993
intercube_allreduce,mesh_2d_no_wrap,6,32,64,1024,3515.55
intercube_allreduce,mesh_2d_no_wrap,6,64,128,2048,3525.0499999999975
intercube_allreduce,mesh_2d_no_wrap,6,128,256,4096,3544.049999999992
intercube_allreduce,mesh_2d_no_wrap,6,512,1024,16384,3667.049999999992
intercube_allreduce,mesh_2d_no_wrap,6,1024,2048,32768,3837.049999999992
intercube_allreduce,mesh_2d_no_wrap,6,2048,4096,65536,4177.049999999992
intercube_allreduce,mesh_2d_no_wrap,6,4096,8192,131072,4857.049999999959
intercube_allreduce,mesh_2d_no_wrap,6,8192,16384,262144,6217.049999999945
intercube_allreduce,mesh_2d_no_wrap,6,16384,32768,524288,8937.049999999937
intercube_allreduce,mesh_2d_no_wrap,6,32768,65536,1048576,14377.049999999872
1 algorithm sip_topology n_sips n_elem bytes_per_pe bytes_per_sip latency_ns
2 intercube_allreduce ring_1d 6 8 16 256 3073.1299999999937
3 intercube_allreduce ring_1d 6 32 64 1024 3079.8799999999947
4 intercube_allreduce ring_1d 6 64 128 2048 3088.879999999992
5 intercube_allreduce ring_1d 6 128 256 4096 3106.8799999999865
6 intercube_allreduce ring_1d 6 512 1024 16384 3225.8799999999865
7 intercube_allreduce ring_1d 6 1024 2048 32768 3391.8799999999865
8 intercube_allreduce ring_1d 6 2048 4096 65536 3723.8799999999865
9 intercube_allreduce ring_1d 6 4096 8192 131072 4387.879999999965
10 intercube_allreduce ring_1d 6 8192 16384 262144 5715.879999999957
11 intercube_allreduce ring_1d 6 16384 32768 524288 8371.879999999932
12 intercube_allreduce ring_1d 6 32768 65536 1048576 13683.879999999903
13 intercube_allreduce torus_2d 6 8 16 256 2190.4799999999923
14 intercube_allreduce torus_2d 6 32 64 1024 2196.479999999993
15 intercube_allreduce torus_2d 6 64 128 2048 2204.4799999999905
16 intercube_allreduce torus_2d 6 128 256 4096 2220.479999999985
17 intercube_allreduce torus_2d 6 512 1024 16384 2325.479999999985
18 intercube_allreduce torus_2d 6 1024 2048 32768 2471.479999999985
19 intercube_allreduce torus_2d 6 2048 4096 65536 2763.479999999985
20 intercube_allreduce torus_2d 6 4096 8192 131072 3347.4799999999777
21 intercube_allreduce torus_2d 6 8192 16384 262144 4515.4799999999705
22 intercube_allreduce torus_2d 6 16384 32768 524288 6851.479999999952
23 intercube_allreduce torus_2d 6 32768 65536 1048576 11523.479999999923
24 intercube_allreduce mesh_2d_no_wrap 6 8 16 256 3508.4249999999993
25 intercube_allreduce mesh_2d_no_wrap 6 32 64 1024 3515.55
26 intercube_allreduce mesh_2d_no_wrap 6 64 128 2048 3525.0499999999975
27 intercube_allreduce mesh_2d_no_wrap 6 128 256 4096 3544.049999999992
28 intercube_allreduce mesh_2d_no_wrap 6 512 1024 16384 3667.049999999992
29 intercube_allreduce mesh_2d_no_wrap 6 1024 2048 32768 3837.049999999992
30 intercube_allreduce mesh_2d_no_wrap 6 2048 4096 65536 4177.049999999992
31 intercube_allreduce mesh_2d_no_wrap 6 4096 8192 131072 4857.049999999959
32 intercube_allreduce mesh_2d_no_wrap 6 8192 16384 262144 6217.049999999945
33 intercube_allreduce mesh_2d_no_wrap 6 16384 32768 524288 8937.049999999937
34 intercube_allreduce mesh_2d_no_wrap 6 32768 65536 1048576 14377.049999999872
Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 129 KiB

+91
View File
@@ -0,0 +1,91 @@
hop,label,size_bytes,path,total_ns
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),128,ipcq,31.1399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),128,raw,12.019999999996799
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),256,ipcq,32.6399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),256,raw,13.019999999996799
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),384,ipcq,34.1399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),384,raw,14.019999999996799
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),512,ipcq,35.6399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),512,raw,15.019999999996799
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),768,ipcq,38.6399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),768,raw,17.0199999999968
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),1024,ipcq,41.6399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),1024,raw,19.0199999999968
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),2048,ipcq,53.6399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),2048,raw,27.0199999999968
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),4096,ipcq,77.6399999999976
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),4096,raw,43.0199999999968
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),8192,ipcq,125.64000000000306
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),8192,raw,75.02000000000407
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),10240,ipcq,149.64000000000306
h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),10240,raw,91.02000000000407
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),128,ipcq,31.1399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),128,raw,12.019999999996799
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),256,ipcq,32.6399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),256,raw,13.019999999996799
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),384,ipcq,34.1399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),384,raw,14.019999999996799
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),512,ipcq,35.6399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),512,raw,15.019999999996799
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),768,ipcq,38.6399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),768,raw,17.0199999999968
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),1024,ipcq,41.6399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),1024,raw,19.0199999999968
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),2048,ipcq,53.6399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),2048,raw,27.0199999999968
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),4096,ipcq,77.6399999999976
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),4096,raw,43.0199999999968
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),8192,ipcq,125.64000000000306
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),8192,raw,75.02000000000407
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),10240,ipcq,149.64000000000306
h2_intra_vertical,Intra-cube vertical (pe0 to pe4),10240,raw,91.02000000000407
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),128,ipcq,67.15999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),128,raw,68.53999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),256,ipcq,68.65999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),256,raw,70.03999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),384,ipcq,70.15999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),384,raw,71.53999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),512,ipcq,71.65999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),512,raw,73.03999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),768,ipcq,74.65999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),768,raw,76.03999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),1024,ipcq,77.65999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),1024,raw,79.03999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),2048,ipcq,89.65999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),2048,raw,91.03999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),4096,ipcq,113.65999999999804
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),4096,raw,115.03999999999724
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),8192,ipcq,161.65999999999985
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),8192,raw,163.04000000000087
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),10240,ipcq,185.65999999999985
h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),10240,raw,187.04000000000087
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),128,ipcq,87.15999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),128,raw,88.53999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),256,ipcq,88.65999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),256,raw,90.03999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),384,ipcq,90.15999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),384,raw,91.53999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),512,ipcq,91.65999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),512,raw,93.03999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),768,ipcq,94.65999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),768,raw,96.03999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),1024,ipcq,97.65999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),1024,raw,99.03999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),2048,ipcq,109.65999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),2048,raw,111.03999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),4096,ipcq,133.65999999999804
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),4096,raw,135.03999999999724
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,ipcq,181.65999999999985
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,raw,183.04000000000087
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,ipcq,205.65999999999985
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,raw,207.04000000000087
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",128,ipcq,6.015000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",256,ipcq,6.515000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",384,ipcq,7.015000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",512,ipcq,7.515000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",768,ipcq,8.515000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",1024,ipcq,9.515000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",2048,ipcq,13.515000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",4096,ipcq,21.515000000003056
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",8192,ipcq,37.51499999999214
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",10240,ipcq,45.51499999999214
1 hop label size_bytes path total_ns
2 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 128 ipcq 31.1399999999976
3 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 128 raw 12.019999999996799
4 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 256 ipcq 32.6399999999976
5 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 256 raw 13.019999999996799
6 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 384 ipcq 34.1399999999976
7 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 384 raw 14.019999999996799
8 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 512 ipcq 35.6399999999976
9 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 512 raw 15.019999999996799
10 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 768 ipcq 38.6399999999976
11 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 768 raw 17.0199999999968
12 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 1024 ipcq 41.6399999999976
13 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 1024 raw 19.0199999999968
14 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 2048 ipcq 53.6399999999976
15 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 2048 raw 27.0199999999968
16 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 4096 ipcq 77.6399999999976
17 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 4096 raw 43.0199999999968
18 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 8192 ipcq 125.64000000000306
19 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 8192 raw 75.02000000000407
20 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 10240 ipcq 149.64000000000306
21 h1_intra_horizontal Intra-cube horizontal (pe0 to pe1) 10240 raw 91.02000000000407
22 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 128 ipcq 31.1399999999976
23 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 128 raw 12.019999999996799
24 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 256 ipcq 32.6399999999976
25 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 256 raw 13.019999999996799
26 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 384 ipcq 34.1399999999976
27 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 384 raw 14.019999999996799
28 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 512 ipcq 35.6399999999976
29 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 512 raw 15.019999999996799
30 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 768 ipcq 38.6399999999976
31 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 768 raw 17.0199999999968
32 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 1024 ipcq 41.6399999999976
33 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 1024 raw 19.0199999999968
34 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 2048 ipcq 53.6399999999976
35 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 2048 raw 27.0199999999968
36 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 4096 ipcq 77.6399999999976
37 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 4096 raw 43.0199999999968
38 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 8192 ipcq 125.64000000000306
39 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 8192 raw 75.02000000000407
40 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 10240 ipcq 149.64000000000306
41 h2_intra_vertical Intra-cube vertical (pe0 to pe4) 10240 raw 91.02000000000407
42 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 128 ipcq 67.15999999999804
43 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 128 raw 68.53999999999724
44 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 256 ipcq 68.65999999999804
45 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 256 raw 70.03999999999724
46 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 384 ipcq 70.15999999999804
47 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 384 raw 71.53999999999724
48 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 512 ipcq 71.65999999999804
49 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 512 raw 73.03999999999724
50 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 768 ipcq 74.65999999999804
51 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 768 raw 76.03999999999724
52 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 1024 ipcq 77.65999999999804
53 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 1024 raw 79.03999999999724
54 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 2048 ipcq 89.65999999999804
55 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 2048 raw 91.03999999999724
56 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 4096 ipcq 113.65999999999804
57 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 4096 raw 115.03999999999724
58 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 8192 ipcq 161.65999999999985
59 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 8192 raw 163.04000000000087
60 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 10240 ipcq 185.65999999999985
61 h3_inter_cube_horizontal Inter-cube horizontal (cube0 to cube1) 10240 raw 187.04000000000087
62 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 128 ipcq 87.15999999999804
63 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 128 raw 88.53999999999724
64 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 256 ipcq 88.65999999999804
65 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 256 raw 90.03999999999724
66 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 384 ipcq 90.15999999999804
67 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 384 raw 91.53999999999724
68 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 512 ipcq 91.65999999999804
69 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 512 raw 93.03999999999724
70 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 768 ipcq 94.65999999999804
71 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 768 raw 96.03999999999724
72 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 1024 ipcq 97.65999999999804
73 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 1024 raw 99.03999999999724
74 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 2048 ipcq 109.65999999999804
75 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 2048 raw 111.03999999999724
76 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 4096 ipcq 133.65999999999804
77 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 4096 raw 135.03999999999724
78 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 8192 ipcq 181.65999999999985
79 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 8192 raw 183.04000000000087
80 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 10240 ipcq 205.65999999999985
81 h4_inter_cube_vertical Inter-cube vertical (cube0 to cube4) 10240 raw 207.04000000000087
82 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 128 ipcq 6.015000000003056
83 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 256 ipcq 6.515000000003056
84 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 384 ipcq 7.015000000003056
85 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 512 ipcq 7.515000000003056
86 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 768 ipcq 8.515000000003056
87 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 1024 ipcq 9.515000000003056
88 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 2048 ipcq 13.515000000003056
89 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 4096 ipcq 21.515000000003056
90 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 8192 ipcq 37.51499999999214
91 h5_inter_sip Inter-SIP (sip0 to sip1, same cube/pe) 10240 ipcq 45.51499999999214
+78 -70
View File
@@ -22,13 +22,23 @@ from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
from kernbench.policy.placement.dp import DPPolicy from kernbench.policy.placement.dp import DPPolicy
def _sip_topo_dims(sip_topo: str, n_sips: int) -> tuple[int, int]: def _sip_topo_dims(
sip_topo: str, n_sips: int,
spec_w: int | None = None, spec_h: int | None = None,
) -> tuple[int, int]:
if sip_topo == "ring_1d": if sip_topo == "ring_1d":
return (0, 0) return (0, 0)
if spec_w is not None and spec_h is not None:
if spec_w * spec_h != n_sips:
raise ValueError(
f"sip layout {spec_w}x{spec_h} != n_sips ({n_sips})"
)
return (spec_w, spec_h)
side = int(round(math.sqrt(n_sips))) side = int(round(math.sqrt(n_sips)))
if side * side != n_sips: if side * side != n_sips:
raise ValueError( raise ValueError(
f"SIP topology '{sip_topo}' requires square n_sips, got {n_sips}" f"SIP topology '{sip_topo}' requires square n_sips or "
f"explicit w/h in spec, got {n_sips}"
) )
return (side, side) return (side, side)
@@ -54,10 +64,13 @@ def run_allreduce(
topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND
n_elem = int(cfg.get("n_elem", 8)) n_elem = int(cfg.get("n_elem", 8))
n_sips = int(spec.get("system", {}).get("sips", {}).get("count", 1)) sips_cfg = spec.get("system", {}).get("sips", {})
sip_topo = str( n_sips = int(sips_cfg.get("count", 1))
spec.get("system", {}).get("sips", {}).get("topology", "ring_1d") sip_topo = str(sips_cfg.get("topology", "ring_1d"))
) spec_sip_w = sips_cfg.get("w")
spec_sip_h = sips_cfg.get("h")
spec_sip_w = int(spec_sip_w) if spec_sip_w is not None else None
spec_sip_h = int(spec_sip_h) if spec_sip_h is not None else None
cm = spec["sip"]["cube_mesh"] cm = spec["sip"]["cube_mesh"]
cube_w = int(cm["w"]) cube_w = int(cm["w"])
@@ -65,7 +78,9 @@ def run_allreduce(
n_cubes = cube_w * cube_h n_cubes = cube_w * cube_h
sip_topo_kind = topo_name_to_kind.get(sip_topo, 0) sip_topo_kind = topo_name_to_kind.get(sip_topo, 0)
sip_topo_w, sip_topo_h = _sip_topo_dims(sip_topo, n_sips) sip_topo_w, sip_topo_h = _sip_topo_dims(
sip_topo, n_sips, spec_w=spec_sip_w, spec_h=spec_sip_h,
)
algo_name = cfg.get("algorithm", "allreduce") algo_name = cfg.get("algorithm", "allreduce")
print(f"\n{'=' * 60}") print(f"\n{'=' * 60}")
@@ -173,20 +188,36 @@ from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
CONFIGS = [ CONFIGS = [
pytest.param("intercube_allreduce", "ring_1d", 2, id="ring_2sip"), pytest.param(
pytest.param("intercube_allreduce", "torus_2d", 4, id="torus_4sip"), "intercube_allreduce", "ring_1d", 6, None, None,
pytest.param("intercube_allreduce", "mesh_2d_no_wrap", 4, id="mesh_4sip"), id="ring_6sip",
),
pytest.param(
"intercube_allreduce", "torus_2d", 6, 2, 3,
id="torus_6sip_2x3",
),
pytest.param(
"intercube_allreduce", "mesh_2d_no_wrap", 6, 2, 3,
id="mesh_6sip_2x3",
),
] ]
def _write_temp_configs( def _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None, tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None,
sip_w=None, sip_h=None,
): ):
"""Write temp topology.yaml and ccl.yaml with the given overrides.""" """Write temp topology.yaml and ccl.yaml with the given overrides."""
with open(TOPOLOGY_PATH) as f: with open(TOPOLOGY_PATH) as f:
topo_cfg = yaml.safe_load(f) topo_cfg = yaml.safe_load(f)
topo_cfg["system"]["sips"]["count"] = n_sips topo_cfg["system"]["sips"]["count"] = n_sips
topo_cfg["system"]["sips"]["topology"] = sip_topology topo_cfg["system"]["sips"]["topology"] = sip_topology
if sip_w is not None and sip_h is not None:
topo_cfg["system"]["sips"]["w"] = int(sip_w)
topo_cfg["system"]["sips"]["h"] = int(sip_h)
else:
topo_cfg["system"]["sips"].pop("w", None)
topo_cfg["system"]["sips"].pop("h", None)
topo_path = tmp_path / "topology.yaml" topo_path = tmp_path / "topology.yaml"
with open(topo_path, "w") as f: with open(topo_path, "w") as f:
yaml.dump(topo_cfg, f, default_flow_style=False) yaml.dump(topo_cfg, f, default_flow_style=False)
@@ -211,10 +242,15 @@ def _write_temp_configs(
return str(topo_path), str(tmp_ccl) return str(topo_path), str(tmp_ccl)
@pytest.mark.parametrize("algorithm,sip_topology,n_sips", CONFIGS) @pytest.mark.parametrize(
def test_allreduce(tmp_path, algorithm, sip_topology, n_sips): "algorithm,sip_topology,n_sips,sip_w,sip_h", CONFIGS,
)
def test_allreduce(
tmp_path, algorithm, sip_topology, n_sips, sip_w, sip_h,
):
topo_path, ccl_path = _write_temp_configs( topo_path, ccl_path = _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm, tmp_path, sip_topology, n_sips, algorithm,
sip_w=sip_w, sip_h=sip_h,
) )
topo = resolve_topology(topo_path) topo = resolve_topology(topo_path)
engine = GraphEngine(topo.topology_obj, enable_data=True) engine = GraphEngine(topo.topology_obj, enable_data=True)
@@ -271,16 +307,17 @@ def test_allreduce_latency_sweep(tmp_path):
records: list[dict] = [] records: list[dict] = []
# Apples-to-apples: same n_sips across all three topologies. # Apples-to-apples: same n_sips across all three topologies.
for algorithm, sip_topology, n_sips in [ for algorithm, sip_topology, n_sips, sip_w, sip_h in [
("intercube_allreduce", "ring_1d", 4), ("intercube_allreduce", "ring_1d", 6, None, None),
("intercube_allreduce", "torus_2d", 4), ("intercube_allreduce", "torus_2d", 6, 2, 3),
("intercube_allreduce", "mesh_2d_no_wrap", 4), ("intercube_allreduce", "mesh_2d_no_wrap", 6, 2, 3),
]: ]:
for n_elem in _SWEEP_N_ELEM: for n_elem in _SWEEP_N_ELEM:
sub = tmp_path / f"{sip_topology}_{n_elem}" sub = tmp_path / f"{sip_topology}_{n_elem}"
sub.mkdir() sub.mkdir()
topo_path, ccl_path = _write_temp_configs( topo_path, ccl_path = _write_temp_configs(
sub, sip_topology, n_sips, algorithm, sub, sip_topology, n_sips, algorithm,
sip_w=sip_w, sip_h=sip_h,
n_elem_override=n_elem, n_elem_override=n_elem,
) )
topo = resolve_topology(topo_path) topo = resolve_topology(topo_path)
@@ -339,8 +376,7 @@ def test_allreduce_latency_sweep(tmp_path):
w.writerow(r) w.writerow(r)
topologies = sorted({r["sip_topology"] for r in records}) topologies = sorted({r["sip_topology"] for r in records})
# Per-topology plots: log-scale + linear-scale side-by-side. # Per-topology plots, log-scale x-axis = bytes per PE.
# X-axis = bytes per PE (per-message payload size).
for topo_name in topologies: for topo_name in topologies:
rs = sorted( rs = sorted(
[r for r in records if r["sip_topology"] == topo_name], [r for r in records if r["sip_topology"] == topo_name],
@@ -352,7 +388,6 @@ def test_allreduce_latency_sweep(tmp_path):
f"Allreduce latency — {topo_name} " f"Allreduce latency — {topo_name} "
f"(n_sips={rs[0]['n_sips']})" f"(n_sips={rs[0]['n_sips']})"
) )
# Log-scale
fig, ax = plt.subplots(figsize=(8, 5)) fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(xs, ys, marker="o", color="tab:blue") ax.plot(xs, ys, marker="o", color="tab:blue")
ax.set_xscale("log", base=2) ax.set_xscale("log", base=2)
@@ -364,58 +399,31 @@ def test_allreduce_latency_sweep(tmp_path):
fig.tight_layout() fig.tight_layout()
fig.savefig(out_dir / f"{topo_name}.png", dpi=120) fig.savefig(out_dir / f"{topo_name}.png", dpi=120)
plt.close(fig) plt.close(fig)
# Linear-scale companion
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(xs, ys, marker="o", color="tab:blue")
ax.set_xlabel("Bytes per PE")
ax.set_ylabel("max pe_exec_ns (critical path)")
ax.set_title(title + " [linear scale]")
ax.grid(True, alpha=0.3)
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(out_dir / f"{topo_name}_linear.png", dpi=120)
plt.close(fig)
# Combined overview — two variants: log-scale (overview.png) and
# linear-scale (overview_linear.png).
colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange", colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange",
"mesh_2d_no_wrap": "tab:green"} "mesh_2d_no_wrap": "tab:green"}
fig, ax = plt.subplots(figsize=(9, 6))
for topo_name in topologies:
rs = sorted(
[r for r in records if r["sip_topology"] == topo_name],
key=lambda r: r["bytes_per_pe"],
)
ax.plot(
[r["bytes_per_pe"] for r in rs],
[r["latency_ns"] for r in rs],
marker="o",
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
color=colors.get(topo_name),
)
ax.set_xscale("log", base=2)
ax.set_xlabel("Bytes per PE (log scale)")
ax.set_ylabel("max pe_exec_ns (critical path)")
ax.set_title("Multi-device allreduce latency by topology")
ax.grid(True, alpha=0.3)
ax.legend()
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(out_dir / "overview.png", dpi=120)
plt.close(fig)
def _draw_overview(log_x: bool, filename: str, title_suffix: str) -> None: print(f"\nWrote {out_dir / 'overview.png'}")
fig, ax = plt.subplots(figsize=(9, 6))
for topo_name in topologies:
rs = sorted(
[r for r in records if r["sip_topology"] == topo_name],
key=lambda r: r["bytes_per_pe"],
)
ax.plot(
[r["bytes_per_pe"] for r in rs],
[r["latency_ns"] for r in rs],
marker="o",
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
color=colors.get(topo_name),
)
if log_x:
ax.set_xscale("log", base=2)
ax.set_xlabel("Bytes per PE (log scale)")
else:
ax.set_xlabel("Bytes per PE")
ax.set_ylabel("max pe_exec_ns (critical path)")
ax.set_title("Multi-device allreduce latency by topology" + title_suffix)
ax.grid(True, alpha=0.3)
ax.legend()
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(out_dir / filename, dpi=120)
plt.close(fig)
_draw_overview(log_x=True, filename="overview.png", title_suffix="")
_draw_overview(
log_x=False, filename="overview_linear.png",
title_suffix=" [linear scale]",
)
print(
f"\nWrote {out_dir / 'overview.png'} + "
f"{out_dir / 'overview_linear.png'}"
)
+194
View File
@@ -0,0 +1,194 @@
"""ADR-0009 D5 invariant: all PEs targeted by a single kernel launch MUST
begin executing the kernel body at the same simulated time, regardless of
their dispatch path length.
These tests directly verify the invariant by capturing per-PE state at the
top of `_execute_kernel`:
test_no_pe_arrives_after_target_start_ns
Asserts: for every PE that enters _execute_kernel during a multi-cube
launch, `env.now` at entry must be <= target_start_ns. Otherwise the
PE's barrier yield would be a no-op and `pe_exec_start` would be set
late, breaking the D5 "same simulated time" mandate.
test_all_pes_have_identical_pe_exec_start
Asserts: every PE's `pe_exec_start` (the value of `env.now` recorded
immediately AFTER the barrier yield) is identical across all PEs in
the launch.
Both tests are expected to FAIL today and become the regression check the
Phase 2 D5 predictor + fallback fix must make pass.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pytest
from kernbench.policy.placement.dp import DPPolicy
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
def _capture_per_pe_d5_state():
"""Monkey-patch PeCpuComponent._execute_kernel to record, per PE:
- entry_now: env.now at function entry (before any yield)
- target_start_ns: the value carried by the request
- barrier_yielded: True if the barrier yield fired (entry_now < target)
- pe_exec_start: env.now immediately after the barrier check
(i.e. the value the original code sets)
Returns (records: list[dict], restore: callable).
"""
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
records: list[dict] = []
original = pe_cpu_mod.PeCpuComponent._execute_kernel
def patched(self, env, txn):
request = txn.request
target_start = getattr(request, "target_start_ns", None)
entry_now = float(env.now)
rec = {
"node_id": self.node.id,
"entry_now": entry_now,
"target_start_ns": (
float(target_start) if target_start is not None else None
),
"barrier_yielded": (
target_start is not None
and float(target_start) > entry_now
),
"pe_exec_start": None, # filled below by sniff
"late_ns": (
None if target_start is None
else max(0.0, entry_now - float(target_start))
),
}
records.append(rec)
# We can't easily inject a callback at the original's
# `pe_exec_start = env.now` line without rewriting it. Approximate:
# if the original yields the barrier, env.now after the yield is
# target_start_ns; otherwise pe_exec_start is entry_now (skipped).
if rec["barrier_yielded"]:
rec["pe_exec_start"] = float(target_start)
else:
rec["pe_exec_start"] = entry_now
yield from original(self, env, txn)
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
def restore():
pe_cpu_mod.PeCpuComponent._execute_kernel = original
return records, restore
def _run_multicube_launch():
"""Drive a no-op kernel launch across all 16 cubes x 8 PEs and return
the per-PE D5 records collected by the monkey-patch."""
records, restore = _capture_per_pe_d5_state()
try:
topo = resolve_topology(str(TOPOLOGY_PATH))
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
with RuntimeContext(
engine=engine, target_device=DeviceSelector("all"),
correlation_id="d5_barrier", spec=spec,
) as ctx:
dp = DPPolicy(
cube="row_wise", pe="column_wise",
num_cubes=16, num_pes=8,
)
def kernel(t_ptr, n_elem, tl):
pass # no-op
ctx.ahbm.set_device(0)
t = ctx.zeros(
(16, 8 * 64), dtype="f16", dp=dp, name="probe",
)
t.copy_(ctx.from_numpy(
np.zeros((16, 8 * 64), dtype=np.float16),
))
pending = ctx.launch(
"d5_probe", kernel, t, 64, _defer_wait=True,
)
for h, _sip, meta in pending:
ctx.wait(h, _meta=meta)
finally:
restore()
return records
def test_no_pe_arrives_after_target_start_ns():
"""ADR-0009 D5: no PE may enter `_execute_kernel` after target_start_ns.
Today this fails because IO_CPU's predictor under-shoots actual
dispatch latency for far cubes (cube4, cube9-15). Phase 2 fix:
chain-aware predictor in IO_CPU + monotonic upward re-stamp in M_CPU.
"""
records = _run_multicube_launch()
assert records, "expected per-PE _execute_kernel records"
late = [
r for r in records
if r["target_start_ns"] is not None
and r["late_ns"] is not None
and r["late_ns"] > 1e-6
]
if late:
# Provide actionable diagnostic in the failure.
worst = sorted(late, key=lambda r: -r["late_ns"])[:5]
details = "\n".join(
f" {r['node_id']}: late by {r['late_ns']:.2f} ns "
f"(entry_now={r['entry_now']:.2f}, "
f"target_start_ns={r['target_start_ns']:.2f})"
for r in worst
)
pytest.fail(
f"ADR-0009 D5 violated: {len(late)}/{len(records)} PEs "
f"entered _execute_kernel AFTER target_start_ns "
f"(barrier yield silently skipped). "
f"Worst offenders:\n{details}"
)
def test_all_pes_have_identical_pe_exec_start():
"""ADR-0009 D5: every PE's pe_exec_start must be identical.
With D5 honored, every PE either yields to target_start_ns (start =
target_start_ns) or, if late, would still be aligned by the M_CPU
upward re-stamp (Phase 2). Today: 75/128 PEs in this launch have
distinct pe_exec_start values because they skipped the barrier.
"""
records = _run_multicube_launch()
assert records, "expected per-PE _execute_kernel records"
starts = sorted({round(r["pe_exec_start"], 6) for r in records})
if len(starts) > 1:
spread = max(starts) - min(starts)
# Distribution of how many PEs at each distinct start time
from collections import Counter
bucket = Counter(round(r["pe_exec_start"], 6) for r in records)
details = "\n".join(
f" pe_exec_start={t}: {n} PEs"
for t, n in sorted(bucket.items())
)
pytest.fail(
f"ADR-0009 D5 violated: PEs have {len(starts)} distinct "
f"pe_exec_start values (spread = {spread:.2f} ns); "
f"D5 mandates a single common value. "
f"Distribution:\n{details}"
)
+741
View File
@@ -0,0 +1,741 @@
"""Diagnostic for the inter-cube RAW > IPCQ asymmetry on h3/h4 plots.
Single-shot run at h3 (sip0.cube0.pe0 -> sip0.cube1.pe0), nbytes=4096.
Captures per-PE pe_exec_ns and the actual path / drain / per-node overhead
breakdown for the RAW sub-txn (PE_DMA -> remote HBM_CTRL) vs the IPCQ
outbound sub-txn (PE_DMA -> peer PE_DMA), so we can localize the gap to
one of:
(a) drain at HBM-BW (RAW) vs fabric-BW (IPCQ)
(b) path-length / per-node overhead asymmetry
(c) RAW SRC paying tl.load (local HBM read) on top of remote tl.store
while IPCQ DST only pays inbound traversal+drain.
Phase 1 / test-only. No production code is modified.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pytest
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
from kernbench.policy.placement.dp import DPPolicy
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
import os
# Allow the test to be re-run for h4 (inter-cube vertical) at multiple sizes
# to investigate why IPCQ slope flattens past 8192 B (path may differ).
NBYTES = int(os.environ.get("DIAG_NBYTES", "4096"))
ELEM_BYTES = 2
N_ELEM = NBYTES // ELEM_BYTES
N_CUBES = 16
N_PES = 8
HOP = os.environ.get("DIAG_HOP", "h3")
if HOP == "h4":
SRC = (0, 0, 0)
DST = (0, 4, 0) # h4 inter-cube vertical
else:
SRC = (0, 0, 0)
DST = (0, 1, 0) # h3 inter-cube horizontal
# ── Per-PE pe_exec_ns capture via monkey-patch ───────────────────────
def _install_barrier_capture():
"""Wrap PeCpuComponent._execute_kernel to log, for every PE that
enters: env.now at entry, target_start_ns the request carried,
whether the barrier yield fired (i.e. env.now < target_start_ns),
and env.now at pe_exec_start.
"""
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
log: list[dict] = []
original = pe_cpu_mod.PeCpuComponent._execute_kernel
def patched(self, env, txn):
request = txn.request
target_start = getattr(request, "target_start_ns", None)
entry_now = float(env.now)
log_entry = {
"node_id": self.node.id,
"entry_now": entry_now,
"target_start_ns": (
float(target_start) if target_start is not None else None
),
"barrier_skipped": (
target_start is None
or float(target_start) <= entry_now
),
"delta_late_ns": (
None if target_start is None
else max(0.0, entry_now - float(target_start))
),
}
log.append(log_entry)
yield from original(self, env, txn)
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
def restore():
pe_cpu_mod.PeCpuComponent._execute_kernel = original
return log, restore
def _install_per_pe_capture():
"""Wrap PeCpuComponent._execute_kernel so we record (node_id ->
pe_exec_ns) for every PE that executes a kernel during the run.
Returns (capture_dict, restore_callable).
"""
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
captured: dict[str, float] = {}
original = pe_cpu_mod.PeCpuComponent._execute_kernel
def patched(self, env, txn):
gen = original(self, env, txn)
try:
value = yield from gen
finally:
v = txn.result_data.get("pe_exec_ns")
if v is not None:
captured[self.node.id] = float(v)
return value
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
def restore():
pe_cpu_mod.PeCpuComponent._execute_kernel = original
return captured, restore
def _install_recv_capture(target_node_id: str):
"""Wrap PeIpcqComponent._handle_recv to log entry/exit times and the
peer_head_cache/my_tail values seen at the start.
This pins down whether recv ever blocked on a wait_event, or whether
it consumed without waiting (i.e. peer_head_cache > my_tail at entry).
"""
import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod
log: list[dict] = []
original = pe_ipcq_mod.PeIpcqComponent._handle_recv
def patched(self, env, req, cmd):
if self.node.id != target_node_id:
yield from original(self, env, req, cmd)
return
# Snapshot state before dispatch
d = cmd.direction
qp = self._queue_pairs.get(d, {})
log.append({
"phase": "enter",
"t": float(env.now),
"direction": d,
"peer_head_cache": qp.get("peer_head_cache"),
"my_tail": qp.get("my_tail"),
})
yield from original(self, env, req, cmd)
qp = self._queue_pairs.get(d, {})
log.append({
"phase": "exit",
"t": float(env.now),
"direction": d,
"peer_head_cache": qp.get("peer_head_cache"),
"my_tail": qp.get("my_tail"),
})
pe_ipcq_mod.PeIpcqComponent._handle_recv = patched
def restore():
pe_ipcq_mod.PeIpcqComponent._handle_recv = original
return log, restore
def _install_meta_arrival_capture(target_node_id: str):
"""Log every IpcqMetaArrival that lands on ``target_node_id`` PE_IPCQ.
Records (env_now, sender_seq, dst_addr, matched_direction,
peer_head_cache_before, my_tail_before).
"""
import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod
log: list[dict] = []
original = pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival
def patched(self, msg):
if self.node.id == target_node_id:
token = msg.token
now = float(self._env.now) if hasattr(self, "_env") else 0.0
# _env is not stored on the component; use ctx? Fall back to
# introspection via self._inbox._env (SimPy stores reference).
try:
now = float(self._inbox._env.now)
except Exception:
pass
entry = {
"t": now,
"sender_seq": getattr(token, "sender_seq", None),
"dst_addr": getattr(token, "dst_addr", None),
"src_sip": getattr(token, "src_sip", None),
"src_cube": getattr(token, "src_cube", None),
"src_pe": getattr(token, "src_pe", None),
"src_direction": getattr(token, "src_direction", None),
"nbytes": getattr(token, "nbytes", None),
"matched_direction": None,
"peer_head_cache_before": {},
"my_tail_before": {},
}
for d, qp in self._queue_pairs.items():
entry["peer_head_cache_before"][d] = qp["peer_head_cache"]
entry["my_tail_before"][d] = qp["my_tail"]
base = qp["my_rx_base_pa"]
size = qp["n_slots"] * qp["slot_size"]
if base <= entry["dst_addr"] < base + size:
entry["matched_direction"] = d
log.append(entry)
return original(self, msg)
pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = patched
def restore():
pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = original
return log, restore
def _snapshot_qp_state(engine, target_node_id: str) -> dict:
"""Snapshot every direction's qp state on the target PE_IPCQ now.
Captures peer_head_cache, my_tail, my_rx_base_pa, n_slots, slot_size
for each installed direction.
"""
comp = engine._components.get(target_node_id)
if comp is None:
return {}
return {
d: {
"peer_head_cache": qp["peer_head_cache"],
"my_tail": qp["my_tail"],
"my_rx_base_pa": qp["my_rx_base_pa"],
"n_slots": qp["n_slots"],
"slot_size": qp["slot_size"],
"rx_range": (
qp["my_rx_base_pa"],
qp["my_rx_base_pa"] + qp["n_slots"] * qp["slot_size"],
),
}
for d, qp in comp.queue_pairs.items()
}
# ── Path / drain breakdown using engine ctx ──────────────────────────
def _path_breakdown(ctx, path: list[str], nbytes: int) -> dict:
edge_total_ns = 0.0
edge_details = []
min_bw = float("inf")
for i in range(len(path) - 1):
edge = ctx.edge_map.get((path[i], path[i + 1]))
if edge is None:
edge_details.append((path[i], path[i + 1], None, None, None))
continue
prop_ns = edge.distance_mm * ctx.ns_per_mm
edge_total_ns += prop_ns
bw = getattr(edge, "bw_gbs", None) or 0.0
if bw > 0 and bw < min_bw:
min_bw = bw
edge_details.append(
(path[i], path[i + 1], edge.distance_mm, prop_ns, bw),
)
overhead_total_ns = 0.0
overhead_details = []
for nid in path:
oh = float(ctx.node_overhead_ns.get(nid, 0.0))
overhead_total_ns += oh
overhead_details.append((nid, oh))
drain_ns = ctx.compute_drain_ns(path, nbytes)
bottleneck_bw = None if min_bw == float("inf") else min_bw
return {
"path": path,
"edges": edge_details,
"edge_total_ns": edge_total_ns,
"overheads": overhead_details,
"overhead_total_ns": overhead_total_ns,
"drain_ns": drain_ns,
"bottleneck_bw_gbs": bottleneck_bw,
"expected_total_ns": edge_total_ns + overhead_total_ns + drain_ns,
}
def _print_breakdown(label: str, br: dict) -> None:
print(f"\n {label}")
print(f" path ({len(br['path'])} nodes):")
for nid in br["path"]:
print(f" - {nid}")
print(f" edges (prop. delay):")
for src, dst, dist_mm, prop_ns, bw in br["edges"]:
if dist_mm is None:
print(f" ! {src} -> {dst} EDGE NOT FOUND IN edge_map")
continue
print(
f" {src} -> {dst} "
f"dist={dist_mm:.3f}mm prop={prop_ns:.2f}ns "
f"bw={bw or 0:.2f}GB/s"
)
print(f" per-node overhead_ns:")
for nid, oh in br["overheads"]:
if oh > 0:
print(f" {nid:<60s} overhead_ns={oh:.2f}")
print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
print(f" bottleneck_bw_gbs = {br['bottleneck_bw_gbs']}")
print(f" drain_ns (nbytes={NBYTES}) = {br['drain_ns']:.2f}")
print(f" expected_total_ns = {br['expected_total_ns']:.2f}")
# ── RAW path scenario ────────────────────────────────────────────────
def _dump_src_op_records(engine, src_sip, src_cube, src_pe, label) -> None:
"""Print op_logger records for ops on the SRC PE.
The op log captures t_start/t_end for memory/math/gemm/copy ops on
every component, so we can see how long tl.load vs tl.store vs
tl.send actually took at the engine level.
"""
op_logger = getattr(engine, "_op_logger", None)
if op_logger is None:
print(f" ({label}) op_logger not available")
return
src_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}."
recs = [r for r in op_logger.records if r.component_id.startswith(src_prefix)]
print(f" ({label}) op_logger records on SRC PE ({src_prefix}*):")
for r in recs[:40]:
dur = r.t_end - r.t_start
comp_short = r.component_id.replace(src_prefix, "")
params_short = ""
if "nbytes" in r.params:
params_short = f" nbytes={r.params['nbytes']}"
if "src_addr" in r.params:
params_short += f" src_addr={r.params['src_addr']}"
if "dst_addr" in r.params:
params_short += f" dst_addr={r.params['dst_addr']}"
print(
f" t=[{r.t_start:7.2f}..{r.t_end:7.2f}] dur={dur:6.2f}ns "
f"{comp_short:<25s} {r.op_kind:<8s} {r.op_name:<12s}{params_short}"
)
def _run_raw():
captured, restore = _install_per_pe_capture()
try:
topo = resolve_topology(str(TOPOLOGY_PATH))
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
src_sip, src_cube, src_pe = SRC
dst_sip, dst_cube, dst_pe = DST
assert src_sip == dst_sip
src_off = (src_cube * N_PES + src_pe) * N_ELEM * ELEM_BYTES
dst_off = (dst_cube * N_PES + dst_pe) * N_ELEM * ELEM_BYTES
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id="diag_raw",
spec=spec,
) as rt:
dp = DPPolicy(
cube="row_wise", pe="column_wise",
num_cubes=N_CUBES, num_pes=N_PES,
)
rt.ahbm.set_device(src_sip)
t = rt.zeros(
(N_CUBES, N_PES * N_ELEM), dtype="f16",
dp=dp, name="raw_tensor",
)
t.copy_(rt.from_numpy(
np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
))
def kernel(t_ptr, n_elem, tl):
pe_id = tl.program_id(axis=0)
cube_id = tl.program_id(axis=1)
if cube_id == src_cube and pe_id == src_pe:
data = tl.load(
t_ptr + src_off, shape=(n_elem,), dtype="f16",
)
tl.store(t_ptr + dst_off, data)
pending = rt.launch(
"diag_raw_kernel", kernel, t, N_ELEM, _defer_wait=True,
)
for h, _sip, meta in pending:
rt.wait(h, _meta=meta)
# Compute the RAW sub-txn path: src PE_DMA -> dst HBM_CTRL
from kernbench.policy.address.phyaddr import PhysAddr
ctx = next(iter(engine._components.values())).ctx
src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
# Resolve dst PA to HBM controller node
# The raw store kernel issues DmaWriteCmd on dst VA; in the engine
# this is translated via PE_MMU. For diagnostic we approximate
# the destination as the dst cube's HBM controller for slice
# belonging to dst_pe.
# Use the resolver on a constructed PA matching the same memory
# slice the kernel writes to.
# The tensor is "row_wise" sharded across cubes, so each cube
# owns row[cube_id, :], with each PE owning a column slice.
# The actual dst PA depends on the AHBM allocator; we read it
# via the tensor's shard map.
shard_map = getattr(t, "_shard_map", None) or getattr(t, "shard_map", None)
# Fallback: query the resolver directly by constructing a PA in
# the dst cube's HBM region. If shard_map is unavailable, still
# show the breakdown for src-PE-DMA -> first reachable HBM_CTRL
# in dst cube.
dst_hbm_id = f"sip{dst_sip}.cube{dst_cube}.hbm_ctrl"
if dst_hbm_id not in engine._components:
# try alternate naming
for nid in engine._components.keys():
if (
nid.startswith(f"sip{dst_sip}.cube{dst_cube}.")
and "hbm" in nid
):
dst_hbm_id = nid
break
# find_path() prepends ".pe_dma" to src_pe automatically
try:
raw_path = ctx.router.find_path(src_pe_prefix, dst_hbm_id)
except Exception as e:
raw_path = []
print(f" WARN: find_path raw failed: {e}")
if not raw_path:
# Try other HBM-related node names in dst cube
for nid in engine._components.keys():
if not nid.startswith(f"sip{dst_sip}.cube{dst_cube}."):
continue
if "hbm" not in nid:
continue
try:
p = ctx.router.find_path(src_pe_prefix, nid)
except Exception:
p = []
if p:
raw_path = p
print(f" (fallback raw dst node: {nid})")
break
return captured, ctx, raw_path, engine
finally:
restore()
# ── IPCQ path scenario ───────────────────────────────────────────────
def _run_ipcq():
captured, restore = _install_per_pe_capture()
dst_pe_ipcq_id = (
f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_ipcq"
)
arrival_log, restore_arrival = _install_meta_arrival_capture(
dst_pe_ipcq_id,
)
recv_log, restore_recv = _install_recv_capture(dst_pe_ipcq_id)
barrier_log, restore_barrier = _install_barrier_capture()
try:
topo = resolve_topology(str(TOPOLOGY_PATH))
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
src_sip, src_cube, src_pe = SRC
dst_sip, dst_cube, dst_pe = DST
cfg = load_ccl_config()
merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
merged["slot_size"] = max(int(merged.get("slot_size", 4096)), NBYTES)
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id="diag_ipcq",
spec=spec,
) as rt:
configure_sfr_intercube_multisip(engine, spec, merged)
dp = DPPolicy(
cube="row_wise", pe="column_wise",
num_cubes=N_CUBES, num_pes=N_PES,
)
def kernel(t_ptr, n_elem, tl):
pe_id = tl.program_id(axis=0)
cube_id = tl.program_id(axis=1)
if cube_id == src_cube and pe_id == src_pe:
data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
tl.send(dir=("E" if HOP == "h3" else "S"), src=data)
elif cube_id == dst_cube and pe_id == dst_pe:
tl.recv(
dir=("W" if HOP == "h3" else "N"),
shape=(n_elem,), dtype="f16",
)
tensors = []
for s in sorted({src_sip, dst_sip}):
rt.ahbm.set_device(s)
t = rt.zeros(
(N_CUBES, N_PES * N_ELEM), dtype="f16",
dp=dp, name=f"sip{s}",
)
t.copy_(rt.from_numpy(
np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
))
tensors.append(t)
all_pending = []
for tt in tensors:
pending = rt.launch(
"diag_ipcq_kernel", kernel, tt, N_ELEM, _defer_wait=True,
)
all_pending.extend(pending)
for h, _sip, meta in all_pending:
rt.wait(h, _meta=meta)
ctx = next(iter(engine._components.values())).ctx
src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
dst_pe_dma = f"sip{dst_sip}.cube{dst_cube}.pe{dst_pe}.pe_dma"
try:
ipcq_path = ctx.router.find_path(src_pe_prefix, dst_pe_dma)
except Exception as e:
ipcq_path = []
print(f" WARN: find_path ipcq failed: {e}")
# Snapshot DST PE_IPCQ qp state at end-of-run so we can see what
# peer_head_cache/my_tail looked like (and at which directions).
qp_state = _snapshot_qp_state(engine, dst_pe_ipcq_id)
return (captured, ctx, ipcq_path, engine,
arrival_log, qp_state, recv_log, barrier_log)
finally:
restore_barrier()
restore_recv()
restore_arrival()
restore()
# ── Test entry ───────────────────────────────────────────────────────
@pytest.mark.diagnostic
def test_pe_to_pe_diagnostic_h3():
print("\n" + "=" * 78)
print(f" Diagnostic: h3 inter-cube horizontal, nbytes={NBYTES}")
print(f" src={SRC} dst={DST}")
print("=" * 78)
# ── RAW scenario
print("\n[RAW] tl.load + tl.store (sender pays both legs)")
raw_per_pe, raw_ctx, raw_path, raw_engine = _run_raw()
print(f" per-PE pe_exec_ns ({len(raw_per_pe)} entries):")
src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu"
dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu"
for nid in (src_id, dst_id):
if nid in raw_per_pe:
print(f" {nid:<60s} {raw_per_pe[nid]:.2f} ns <-- key PE")
nonzero = {k: v for k, v in raw_per_pe.items() if v > 0.5}
if nonzero:
print(f" other PEs with pe_exec_ns > 0.5 ns:")
for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]:
if nid not in (src_id, dst_id):
print(f" {nid:<60s} {v:.2f} ns")
print(f" max(pe_exec_ns) = "
f"{max(raw_per_pe.values()) if raw_per_pe else 0:.2f} ns")
if raw_path:
br = _path_breakdown(raw_ctx, raw_path, NBYTES)
_print_breakdown("RAW sub-txn path (src.pe_dma -> dst.hbm_ctrl)", br)
_dump_src_op_records(raw_engine, *SRC, "RAW")
# ── IPCQ scenario
print("\n[IPCQ] tl.send + tl.recv (recv pays inbound traversal+drain)")
(ipcq_per_pe, ipcq_ctx, ipcq_path, ipcq_engine,
arrival_log, qp_state, recv_log, barrier_log) = _run_ipcq()
print(f"\n [BARRIER LOG] {len(barrier_log)} _execute_kernel entries:")
src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu"
dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu"
n_skipped = 0
src_entry = None
dst_entry = None
for e in barrier_log:
if e["barrier_skipped"]:
n_skipped += 1
if e["node_id"] == src_id:
src_entry = e
if e["node_id"] == dst_id:
dst_entry = e
print(f" PEs entering _execute_kernel: {len(barrier_log)}")
print(f" PEs that SKIPPED barrier (env.now > target_start): {n_skipped}")
if src_entry:
print(
f" SRC pe ({src_id}): entry_now={src_entry['entry_now']:.2f} "
f"target_start={src_entry['target_start_ns']:.2f} "
f"skipped={src_entry['barrier_skipped']} "
f"late_ns={src_entry['delta_late_ns']:.2f}"
)
if dst_entry:
print(
f" DST pe ({dst_id}): entry_now={dst_entry['entry_now']:.2f} "
f"target_start={dst_entry['target_start_ns']:.2f} "
f"skipped={dst_entry['barrier_skipped']} "
f"late_ns={dst_entry['delta_late_ns']:.2f}"
)
# Top 5 latest arrivals
sorted_late = sorted(
[e for e in barrier_log if e["delta_late_ns"] is not None],
key=lambda e: -e["delta_late_ns"],
)[:5]
print(f" Top 5 latest PE arrivals (positive = barrier missed):")
for e in sorted_late:
if e["delta_late_ns"] > 0:
print(
f" {e['node_id']}: late by {e['delta_late_ns']:.2f} ns "
f"(entry={e['entry_now']:.2f}, target={e['target_start_ns']:.2f})"
)
print(f"\n [RECV LOG on dst pe_ipcq] {len(recv_log)} entries:")
for e in recv_log:
print(
f" {e['phase']:5s} t={e['t']:8.2f} ns "
f"dir={e['direction']} "
f"peer_head_cache={e['peer_head_cache']} "
f"my_tail={e['my_tail']}"
)
print(f"\n [META-ARRIVAL LOG on dst pe_ipcq] {len(arrival_log)} arrivals:")
for i, e in enumerate(arrival_log):
print(
f" #{i:2d} t={e['t']:8.2f} ns "
f"src=(sip{e['src_sip']},cube{e['src_cube']},pe{e['src_pe']}) "
f"dir={e['src_direction']} "
f"sender_seq={e['sender_seq']} "
f"matched_dir={e['matched_direction']} "
f"nbytes={e['nbytes']}"
)
for d, ph in e["peer_head_cache_before"].items():
mt = e["my_tail_before"][d]
if ph != 0 or mt != 0 or d == e["matched_direction"]:
print(
f" before: dir={d} peer_head_cache={ph} my_tail={mt}"
)
print(f"\n [QP STATE END-OF-RUN on dst pe_ipcq]:")
for d, st in qp_state.items():
print(
f" dir={d} peer_head_cache={st['peer_head_cache']} "
f"my_tail={st['my_tail']} rx_range=[{st['rx_range'][0]}..."
f"{st['rx_range'][1]}) n_slots={st['n_slots']} "
f"slot_size={st['slot_size']}"
)
print(f" per-PE pe_exec_ns ({len(ipcq_per_pe)} entries):")
for nid in (src_id, dst_id):
if nid in ipcq_per_pe:
print(f" {nid:<60s} {ipcq_per_pe[nid]:.2f} ns <-- key PE")
nonzero = {k: v for k, v in ipcq_per_pe.items() if v > 0.5}
if nonzero:
print(f" other PEs with pe_exec_ns > 0.5 ns:")
for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]:
if nid not in (src_id, dst_id):
print(f" {nid:<60s} {v:.2f} ns")
print(f" max(pe_exec_ns) = "
f"{max(ipcq_per_pe.values()) if ipcq_per_pe else 0:.2f} ns")
if ipcq_path:
br = _path_breakdown(ipcq_ctx, ipcq_path, NBYTES)
_print_breakdown("IPCQ sub-txn path (src.pe_dma -> peer.pe_dma)", br)
_dump_src_op_records(ipcq_engine, *SRC, "IPCQ")
_dump_src_op_records(ipcq_engine, *DST, "IPCQ DST")
# ── Credit-return path analysis (where the missing IPCQ "ack" lives)
print("\n" + "-" * 78)
print("Credit-return path (current modeling)")
print("-" * 78)
src_pe_prefix = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}"
dst_pe_prefix = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}"
# PE_IPCQ._credit_latency_ns calls
# ctx.router.find_path(self._pe_prefix, peer_pe_prefix)
# where the *destination* lacks the ".pe_dma" suffix. find_path()
# only auto-appends to the source, so this raises -> the except
# clause silently returns 0.0. Effectively credit latency = 0.
try:
ipcq_ctx.router.find_path(dst_pe_prefix, src_pe_prefix)
bug_caught = False
except Exception as e:
bug_caught = True
print(f" CONFIRMED BUG in _credit_latency_ns: dest lacks '.pe_dma' "
f"-> find_path raises -> caught exception -> returns 0.0")
print(f" Error: {e}")
# The intended credit path is recv -> sender (reverse data direction)
try:
credit_path = ipcq_ctx.router.find_path(
dst_pe_prefix, f"{src_pe_prefix}.pe_dma",
)
except Exception as e:
credit_path = []
print(f" WARN: corrected find_path credit failed: {e}")
if credit_path:
credit_size = 16 # PE_IPCQ default _credit_size_bytes
# Today's modeling: drain only, 16 bytes -> ~0.125 ns
cur = ipcq_ctx.compute_drain_ns(credit_path, credit_size)
# Proposed modeling: full path latency (edges + node overhead + drain)
proposed = ipcq_ctx.compute_path_latency_ns(credit_path, credit_size)
print(f" credit path nodes = {len(credit_path)} (recv -> sender)")
for nid in credit_path[:6]:
print(f" {nid}")
if len(credit_path) > 6:
print(f" ... {len(credit_path) - 6} more nodes")
br = _path_breakdown(ipcq_ctx, credit_path, credit_size)
print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
print(f" drain_ns(16 bytes) = {br['drain_ns']:.2f}")
print(f" CURRENT _credit_latency_ns (drain only) = {cur:.3f} ns")
print(f" PROPOSED (compute_path_latency_ns) = {proposed:.2f} ns")
print(f" delta = {proposed - cur:+.2f} ns")
# ── Comparison summary
print("\n" + "-" * 78)
print("Summary")
print("-" * 78)
raw_max = max(raw_per_pe.values()) if raw_per_pe else 0.0
ipcq_max = max(ipcq_per_pe.values()) if ipcq_per_pe else 0.0
print(f" RAW max(pe_exec_ns) = {raw_max:.2f} ns")
print(f" IPCQ max(pe_exec_ns) (current) = {ipcq_max:.2f} ns")
print(f" delta (RAW - IPCQ current) = {raw_max - ipcq_max:+.2f} ns")
if credit_path:
ipcq_with_credit = ipcq_max + (proposed - cur)
print(
f" IPCQ projected w/ blocking credit + full path overhead "
f"= {ipcq_with_credit:.2f} ns"
)
print(
f" delta (RAW - IPCQ projected) = "
f"{raw_max - ipcq_with_credit:+.2f} ns "
f"(<= 0 means IPCQ >= RAW)"
)
# No assertions — this is observational.
assert raw_per_pe, "no RAW pe_exec_ns recorded"
assert ipcq_per_pe, "no IPCQ pe_exec_ns recorded"
+106
View File
@@ -0,0 +1,106 @@
"""Rectangular (non-square) SIP-level 2D topology support.
Phase 1 regression target: today the 2D builtin topology functions in
``kernbench.ccl.topologies`` (``mesh_2d``, ``torus_2d``,
``mesh_2d_no_wrap``) hardcode ``side = sqrt(world_size)`` and raise
``ValueError`` for any non-square ``world_size``. This blocks running
the allreduce sweep at n_sips=6 on torus/mesh layouts.
Phase 2 will extend these functions to accept optional ``w, h`` kwargs
so a 2×3 (or 3×2, etc.) layout works. Until then, every test below is
expected to FAIL.
Layout convention used here (matches non-rectangular case):
rank = row * w + col for 0 <= row < h, 0 <= col < w
For w=2, h=3, world_size=6 the layout is:
col=0 col=1
row=0: 0 1
row=1: 2 3
row=2: 4 5
"""
from __future__ import annotations
import pytest
from kernbench.ccl.topologies import (
mesh_2d,
mesh_2d_no_wrap,
torus_2d,
)
# ── mesh_2d_no_wrap (no wrap-around) ──────────────────────────────────
def test_mesh_2d_no_wrap_2x3_top_left():
"""rank 0 (top-left, no N, no W): only S and E."""
nbrs = mesh_2d_no_wrap(rank=0, world_size=6, w=2, h=3)
assert nbrs == {"S": 2, "E": 1}, nbrs
def test_mesh_2d_no_wrap_2x3_top_right():
"""rank 1 (top-right, no N, no E): only S and W."""
nbrs = mesh_2d_no_wrap(rank=1, world_size=6, w=2, h=3)
assert nbrs == {"S": 3, "W": 0}, nbrs
def test_mesh_2d_no_wrap_2x3_middle_left():
"""rank 2 (middle-left, no W): N, S, E."""
nbrs = mesh_2d_no_wrap(rank=2, world_size=6, w=2, h=3)
assert nbrs == {"N": 0, "S": 4, "E": 3}, nbrs
def test_mesh_2d_no_wrap_2x3_bottom_right():
"""rank 5 (bottom-right, no S, no E): only N and W."""
nbrs = mesh_2d_no_wrap(rank=5, world_size=6, w=2, h=3)
assert nbrs == {"N": 3, "W": 4}, nbrs
# ── torus_2d (wrap-around on all four edges) ─────────────────────────
def test_torus_2d_2x3_top_left():
"""rank 0: N wraps to row 2 col 0 (rank 4); W wraps to col 1 (rank 1)."""
nbrs = torus_2d(rank=0, world_size=6, w=2, h=3)
assert nbrs == {"N": 4, "S": 2, "W": 1, "E": 1}, nbrs
def test_torus_2d_2x3_bottom_right():
"""rank 5: S wraps to row 0 (rank 1); E wraps to col 0 (rank 4)."""
nbrs = torus_2d(rank=5, world_size=6, w=2, h=3)
assert nbrs == {"N": 3, "S": 1, "W": 4, "E": 4}, nbrs
# ── mesh_2d alias for torus_2d ───────────────────────────────────────
def test_mesh_2d_2x3_matches_torus_2d():
"""mesh_2d is currently a torus alias; behaviour must match torus_2d."""
for rank in range(6):
assert mesh_2d(rank=rank, world_size=6, w=2, h=3) == \
torus_2d(rank=rank, world_size=6, w=2, h=3)
# ── Back-compat: square layouts still work without w/h kwargs ────────
def test_square_back_compat_mesh_2d_no_wrap():
"""Calling without w, h should still work for square world_size."""
nbrs = mesh_2d_no_wrap(rank=0, world_size=4)
assert nbrs == {"S": 2, "E": 1}, nbrs
def test_square_back_compat_torus_2d():
nbrs = torus_2d(rank=0, world_size=4)
assert nbrs == {"N": 2, "S": 2, "W": 1, "E": 1}, nbrs
# ── Validation: w*h must match world_size ────────────────────────────
def test_rectangular_dims_must_match_world_size():
"""Phase 2 contract: explicit w, h must satisfy w*h == world_size."""
with pytest.raises(ValueError):
mesh_2d_no_wrap(rank=0, world_size=6, w=3, h=3) # 9 != 6