Compare commits
3 Commits
| SHA1 |
|---|
| 46291bf91b |
| 04c912f53e |
| 1c33afec55 |
@@ -2,7 +2,14 @@
|
||||
|
||||
## Status
|
||||
|
||||
Proposed (Revision 8 — Hierarchical content split out to ADR-0029)
|
||||
Accepted. The rank = SIP process-group model stands. The allreduce algorithm
|
||||
path (mapper / validator / per-PE install machinery originally targeted at
|
||||
ADR-0029) has been replaced by ADR-0032: `AhbmCCLBackend` now calls
|
||||
`configure_sfr_intercube_multisip` at `init_process_group` time and the
|
||||
intercube kernel receives `(sip_rank, sip_topo_kind, sip_topo_w,
|
||||
sip_topo_h)` appended after the module's `kernel_args()`. The
|
||||
`leader_only` / `all_pes` mapper concepts in this document are no longer
|
||||
used by the default allreduce path.
|
||||
|
||||
## Context
|
||||
|
||||
|
||||
@@ -89,7 +89,14 @@ direction_idx × bytes_per_direction). Therefore:
|
||||
`src/kernbench/ccl/install.py`:
|
||||
|
||||
```python
|
||||
_OPPOSITE_DIR = {"E": "W", "W": "E", "N": "S", "S": "N"}
|
||||
# Extended in ADR-0032 with global_* pairs for inter-SIP directions,
|
||||
# which were introduced by configure_sfr_intercube_multisip to keep
|
||||
# intercube (N/S/E/W) and inter-SIP (global_N/S/E/W) namespaces disjoint.
|
||||
_OPPOSITE_DIR = {
|
||||
"E": "W", "W": "E", "N": "S", "S": "N",
|
||||
"global_E": "global_W", "global_W": "global_E",
|
||||
"global_N": "global_S", "global_S": "global_N",
|
||||
}
|
||||
|
||||
def reverse_direction(my_rank: int, peer_rank: int, my_dir: str) -> str | None:
|
||||
"""Find peer's direction that reciprocates my_dir→peer_rank.
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
|
||||
## Status
|
||||
|
||||
Proposed
|
||||
Superseded by ADR-0032 (Intercube all-reduce). The 3-level kernel and
|
||||
`hierarchical_allreduce.py` module have been removed. The cube-mesh
|
||||
intercube + inter-SIP path is now the single all-reduce algorithm.
|
||||
|
||||
## Context
|
||||
|
||||
|
||||
@@ -0,0 +1,256 @@
|
||||
# ADR-0032: Intercube All-Reduce — pe0 cube-mesh reduce + multi-SIP exchange
|
||||
|
||||
## Status
|
||||
|
||||
Accepted (supersedes ADR-0029).
|
||||
|
||||
## Context
|
||||
|
||||
### Goal
|
||||
|
||||
Define a single all-reduce algorithm that exploits the topology hierarchy:
|
||||
cube mesh within each SIP (intercube) + inter-SIP exchange. One kernel,
|
||||
one SFR configuration path, driven by `topology.yaml` and `ccl.yaml`.
|
||||
|
||||
### Why replace ADR-0029 (hierarchical 3-level)
|
||||
|
||||
ADR-0029 proposed a 3-level (intra-cube → inter-cube → inter-SIP) algorithm
|
||||
where every PE in the system participates. In practice this adds the
|
||||
intra-cube PE-to-PE stage complexity (bidirectional reduce + chain broadcast)
|
||||
without matching the common workload pattern where the tensor is sharded
|
||||
**per cube** (not per PE within a cube).
|
||||
|
||||
Moreover, the hierarchical design required:
|
||||
- per-PE neighbor graph installation (`_build_pe_installs` multi-level)
|
||||
- multi-level topology schema (`hierarchical_3level`)
|
||||
- `all_pes` mapper + `multi_pe_sip_local` validator infrastructure
|
||||
|
||||
The intercube algorithm below removes all of that: **pe0-only same-lane
|
||||
intercube reduce on the 4×4 cube mesh**, then inter-SIP exchange on the
|
||||
root cube, then broadcast back. Simpler kernel, simpler wiring, same
|
||||
bandwidth characteristics for the common per-cube DP workload.
|
||||
|
||||
### Current state
|
||||
|
||||
- `src/kernbench/ccl/algorithms/intercube_allreduce.py` — kernel
|
||||
- `src/kernbench/ccl/sfr_config.py` — `configure_sfr_intercube_multisip`
|
||||
- `src/kernbench/runtime_api/distributed.py` — `AhbmCCLBackend` wires this
|
||||
automatically at `init_process_group` time.
|
||||
- Old `ring_allreduce`, `mesh_allreduce`, `tree_allreduce`,
|
||||
`hierarchical_allreduce` modules and their tests are **removed**.
|
||||
|
||||
---
|
||||
|
||||
## Decision
|
||||
|
||||
### D1. Algorithm structure — 5 phases
|
||||
|
||||
For each SIP (launched concurrently by `mp.spawn`):
|
||||
|
||||
```
|
||||
Phase 1 — Row reduce W → E (cube mesh, pe0 only):
|
||||
col=0 sends E → col=1 accumulates, sends E → ... → col=3 holds row sum.
|
||||
|
||||
Phase 2 — Col reduce N → S on rightmost column (pe0, col = mesh_w-1):
|
||||
row=0 sends S → row=1 accumulates, sends S → ... → root cube (15)
|
||||
holds the full SIP sum.
|
||||
|
||||
Phase 3 — Inter-SIP exchange on root cube (pe0 of root cube only):
|
||||
Ring / torus-2d row+col ring / mesh-2d chain reduce+broadcast —
|
||||
selected by sip_topo_kind (from topology.yaml sips.topology).
|
||||
|
||||
Phase 4 — Col broadcast S → N on rightmost column.
|
||||
|
||||
Phase 5 — Row broadcast E → W across the cube mesh.
|
||||
```
|
||||
|
||||
After all phases every cube's pe0 holds the global sum.
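
The five phases above can be sketched as a scalar simulation. This is a minimal illustration, not the kernel itself: `simulate_intercube_allreduce` is a hypothetical name, cube ids are assumed to be row-major with row 0 at the north edge, and phase 3 is modelled as a plain sum over the root cubes rather than an actual ring, torus, or mesh exchange.

```python
def simulate_intercube_allreduce(sips, mesh_w=4, mesh_h=4):
    """Run the five phases on one scalar per cube.

    sips: list of per-SIP lists, each holding mesh_w * mesh_h values
    indexed by row-major cube id (an assumption of this sketch).
    Returns a new nested list with the post-all-reduce values.
    """
    root = (mesh_h - 1) * mesh_w + (mesh_w - 1)  # SE corner, 15 on a 4x4 mesh
    east = mesh_w - 1                            # index of the rightmost column
    state = [list(cubes) for cubes in sips]

    for cubes in state:
        # Phase 1: row reduce W -> E; the rightmost column ends with each row sum.
        for row in range(mesh_h):
            for col in range(1, mesh_w):
                cubes[row * mesh_w + col] += cubes[row * mesh_w + col - 1]
        # Phase 2: column reduce N -> S on the rightmost column.
        for row in range(1, mesh_h):
            cubes[row * mesh_w + east] += cubes[(row - 1) * mesh_w + east]

    # Phase 3: inter-SIP exchange on the root cubes, modelled as a plain sum.
    total = sum(cubes[root] for cubes in state)
    for cubes in state:
        cubes[root] = total

    for cubes in state:
        # Phase 4: column broadcast S -> N on the rightmost column.
        for row in range(mesh_h - 2, -1, -1):
            cubes[row * mesh_w + east] = cubes[(row + 1) * mesh_w + east]
        # Phase 5: row broadcast E -> W across the mesh.
        for row in range(mesh_h):
            for col in range(mesh_w - 2, -1, -1):
                cubes[row * mesh_w + col] = cubes[row * mesh_w + col + 1]
    return state
```

With two SIPs whose cubes hold `0..15` and `16..31`, every entry of the result equals `sum(range(32))`.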
|
||||
|
||||
The kernel is a single function parameterised by `sip_topo_kind ∈ {0, 1, 2}`
|
||||
(ring_1d, torus_2d, mesh_2d_no_wrap). Phases 1-2 and 4-5 are identical
|
||||
across topologies; only phase 3 branches. Helper functions
|
||||
`_inter_sip_ring`, `_inter_sip_torus_2d`, `_inter_sip_mesh_2d` encode the
|
||||
three exchange patterns.
|
||||
|
||||
### D2. Tensor layout (rank = SIP, per-worker)
|
||||
|
||||
Per ADR-0024 rank = SIP at the process-group level. Each worker allocates
|
||||
its own cube-mesh-spanning tensor:
|
||||
|
||||
```python
|
||||
dp = DPPolicy(cube="row_wise", pe="replicate", num_cubes=16, num_pes=1)
|
||||
tensor = torch.zeros((n_cubes, n_elem), dtype="f16", dp=dp)
|
||||
```
|
||||
|
||||
Shard layout: 16 shards per SIP, one per cube on pe0. The kernel addresses
|
||||
each cube's shard as `pe_addr = t_ptr + cube_id * n_elem * 2` (2 bytes per f16 element).
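
The addressing rule can be checked with a few lines of arithmetic. This is a sketch only: `shard_addr` and `ELEM_BYTES_F16` are illustrative names, and the base pointer value is made up for the example.

```python
ELEM_BYTES_F16 = 2  # an f16 element occupies 2 bytes


def shard_addr(t_ptr, cube_id, n_elem, elem_bytes=ELEM_BYTES_F16):
    """Byte address of the shard owned by pe0 of `cube_id`."""
    return t_ptr + cube_id * n_elem * elem_bytes


# With n_elem = 8 and 16 cubes, consecutive shards sit 16 bytes apart and
# the whole per-SIP tensor spans 16 * 8 * 2 = 256 bytes.
base = 0x1000  # hypothetical base pointer
offsets = [shard_addr(base, cube, 8) - base for cube in range(16)]
```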
|
||||
|
||||
### D3. SFR / IPCQ wiring — `configure_sfr_intercube_multisip`
|
||||
|
||||
Replaces the rank-to-2-PE install from ADR-0024. Wires PE_IPCQ neighbor
|
||||
tables for **every cube's pe0 across every SIP** — regardless of which
|
||||
cube is the root or which SIP topology is selected. This lets the kernel
|
||||
elect the root cube at runtime and supports topology switches without
|
||||
re-wiring.
|
||||
|
||||
| Level | Direction labels | Scope |
|
||||
|---|---|---|
|
||||
| Intercube within SIP | N / S / E / W | pe0 of every cube → pe0 of mesh neighbors (no wrap) |
|
||||
| Inter-SIP (all cubes) | global_E / global_W / global_N / global_S | pe0 of cube c on sip A → pe0 of cube c on peer SIP per `sips.topology` |
|
||||
|
||||
Inter-SIP directions use the `global_*` prefix to keep the namespace
|
||||
disjoint from intercube directions. ADR-0025's `_OPPOSITE_DIR` is extended
|
||||
with `global_E ↔ global_W` and `global_N ↔ global_S` so the reverse-
|
||||
direction resolver handles 2-SIP bidirectional rings correctly.
|
||||
|
||||
Internally the function calls `install_ipcq` with:
|
||||
- `world_size = n_sips × n_cubes`
|
||||
- `rank_to_pe = [(sip, cube, 0) for sip in range(n_sips) for cube in range(n_cubes)]`
|
||||
- A closure-captured `neighbors()` function that builds the map above.
|
||||
|
||||
This `world_size` is internal to IPCQ wiring and does not leak to the
|
||||
process-group rank.
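
The wiring inputs described in D3 can be sketched as follows. The helper names are hypothetical and the real `neighbors()` closure and `install_ipcq` signature are not reproduced here; the sketch assumes row-major cube ids with row 0 at the north edge, and shows only the `ring_1d` case for the inter-SIP level.

```python
def build_rank_to_pe(n_sips, n_cubes):
    """One internal IPCQ rank per (sip, cube, pe0), in SIP-major order."""
    return [(sip, cube, 0) for sip in range(n_sips) for cube in range(n_cubes)]


def intercube_neighbors(cube, mesh_w, mesh_h):
    """N/S/E/W neighbor cube ids within one SIP, without wrap-around."""
    row, col = divmod(cube, mesh_w)
    out = {}
    if row > 0:
        out["N"] = cube - mesh_w
    if row < mesh_h - 1:
        out["S"] = cube + mesh_w
    if col > 0:
        out["W"] = cube - 1
    if col < mesh_w - 1:
        out["E"] = cube + 1
    return out


def ring_neighbors(sip, n_sips):
    """Peer SIP ids for a 1-D ring, using the disjoint global_* labels."""
    return {"global_E": (sip + 1) % n_sips, "global_W": (sip - 1) % n_sips}
```

In a 2-SIP ring both `global_E` and `global_W` resolve to the same peer, which is the case the extended `_OPPOSITE_DIR` is meant to handle.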
|
||||
|
||||
### D4. SIP topology — from `topology.yaml`
|
||||
|
||||
```yaml
|
||||
system:
|
||||
sips:
|
||||
count: 2
|
||||
topology: ring_1d # or torus_2d, mesh_2d_no_wrap
|
||||
```
|
||||
|
||||
- `ring_1d`: n_sips-1 rounds of `send global_E / recv global_W`.
|
||||
- `torus_2d`: sqrt(n_sips)×sqrt(n_sips) wrapping mesh. Row ring on
|
||||
`global_E/W` then col ring on `global_S/N`.
|
||||
- `mesh_2d_no_wrap`: square mesh without wrap-around. Chain reduce +
|
||||
broadcast per dimension.
|
||||
|
||||
2D variants require `n_sips` to be a perfect square.
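
A rough sketch of the `ring_1d` exchange and of the perfect-square constraint stated above. It assumes each SIP forwards the value it just received and accumulates it locally; the kernel's actual forwarding scheme is not shown in this document, explicit width/height overrides are not modelled, and both function names are hypothetical.

```python
import math


def ring_allreduce(values):
    """n_sips - 1 rounds of 'send global_E / recv global_W' on scalars."""
    n = len(values)
    acc = list(values)        # running sum held by each SIP
    in_flight = list(values)  # value each SIP sends east this round
    for _ in range(n - 1):
        # SIP i receives what its west neighbor (i - 1) sent.
        received = [in_flight[(i - 1) % n] for i in range(n)]
        acc = [a + r for a, r in zip(acc, received)]
        in_flight = received  # forward the received value next round
    return acc


def square_side(n_sips):
    """Side length k for the 2D variants; rejects non-square SIP counts."""
    k = math.isqrt(n_sips)
    if k * k != n_sips:
        raise ValueError(f"n_sips={n_sips} is not a perfect square")
    return k
```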
|
||||
|
||||
### D5. Process-group integration — `AhbmCCLBackend`
|
||||
|
||||
At `init_process_group` time the backend:
|
||||
|
||||
1. Loads `ccl.yaml` + `topology.yaml`.
|
||||
2. Derives `sip_topo_kind, sip_topo_w, sip_topo_h` from
|
||||
`system.sips.topology` using the algorithm module's `TOPO_NAME_TO_KIND`.
|
||||
3. Calls `configure_sfr_intercube_multisip(engine, spec, cfg)` — one-time
|
||||
SFR wiring, mirrors NCCL communicator creation.
|
||||
|
||||
At each `dist.all_reduce(tensor)` call:
|
||||
|
||||
1. Resolves `kernel_fn` from `cfg["module"]`.
|
||||
2. Builds args: `(n_elem, cube_w, cube_h, n_sips)` from
|
||||
`kernel_args(world_size, n_elem)`.
|
||||
3. Appends `(sip_rank, sip_topo_kind, sip_topo_w, sip_topo_h)` where
|
||||
`sip_rank` is the current greenlet's bound rank.
|
||||
4. Launches with `_defer_wait=True`; the main scheduler drains pending
|
||||
handles after all workers submit (per ADR-0024 D7 / ADR-0027 D0.4).
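
The argument assembly in steps 2 and 3 can be illustrated as below. This is a sketch under assumptions: the stand-in `kernel_args` takes `n_sips` to equal `world_size` (rank = SIP) and uses the 4×4 cube mesh from `topology.yaml` as defaults, and `build_launch_args` is a hypothetical helper, not the backend's code.

```python
def kernel_args(world_size, n_elem, cube_w=4, cube_h=4):
    """Stand-in for the module hook: the first four scalar kernel args."""
    return (n_elem, cube_w, cube_h, world_size)


def build_launch_args(world_size, n_elem, sip_rank,
                      sip_topo_kind, sip_topo_w, sip_topo_h):
    """Per-tensor args followed by the backend-appended SIP args."""
    per_tensor = kernel_args(world_size, n_elem)
    per_sip = (sip_rank, sip_topo_kind, sip_topo_w, sip_topo_h)
    return per_tensor + per_sip


# Example: rank 1 of a 2-SIP ring (topology kind 0), n_elem = 8.
args = build_launch_args(2, 8, sip_rank=1, sip_topo_kind=0,
                         sip_topo_w=2, sip_topo_h=1)
```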
|
||||
|
||||
### D6. Config schema
|
||||
|
||||
`ccl.yaml`:
|
||||
|
||||
```yaml
|
||||
defaults:
|
||||
algorithm: intercube_allreduce
|
||||
buffer_kind: tcm
|
||||
...
|
||||
|
||||
algorithms:
|
||||
intercube_allreduce:
|
||||
module: kernbench.ccl.algorithms.intercube_allreduce
|
||||
topology: none
|
||||
buffer_kind: tcm
|
||||
n_elem: 8
|
||||
root_cube: 15
|
||||
```
|
||||
|
||||
`topology.yaml`:
|
||||
|
||||
```yaml
|
||||
system:
|
||||
sips:
|
||||
count: 2
|
||||
topology: ring_1d
|
||||
sip:
|
||||
cube_mesh: { w: 4, h: 4 }
|
||||
```
|
||||
|
||||
### D7. Algorithm module contract
|
||||
|
||||
Modules loaded via `cfg["module"]` must export:
|
||||
|
||||
| Name | Purpose |
|
||||
|---|---|
|
||||
| `kernel` | callable, signature `(t_ptr, n_elem, cube_w, cube_h, n_sips, sip_rank, sip_topo_kind, sip_topo_w, sip_topo_h, tl)` |
|
||||
| `kernel_args(world_size, n_elem) -> tuple` | returns the first 4 scalar args (per-tensor) |
|
||||
| `TOPO_NAME_TO_KIND: dict[str, int]` | maps `system.sips.topology` name to kernel branch code |
|
||||
| `SIP_TOPO_RING`, `SIP_TOPO_TORUS`, `SIP_TOPO_MESH` | integer constants (0, 1, 2) |
|
||||
|
||||
---
|
||||
|
||||
## Dependencies
|
||||
|
||||
- **ADR-0023**: IPCQ protocol (neighbor table, send/recv, credit return).
|
||||
- **ADR-0024**: rank = SIP launcher, `mp.spawn`, greenlet-local rank.
|
||||
- **ADR-0025**: Address-based IPCQ direction matching; extended
|
||||
`_OPPOSITE_DIR` with `global_*` pairs.
|
||||
- **ADR-0027**: Worker-wait / collective-pending drain in main scheduler.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- **Per-PE allreduce** (intra-cube PE-to-PE reduce). Out of scope — the
|
||||
workload for this algorithm is per-cube DP.
|
||||
- **Asymmetric SIP topologies** (non-square mesh/torus). `torus_2d` and
|
||||
`mesh_2d_no_wrap` require `n_sips = k²`.
|
||||
- **Pipelined chunks**: single-tile per cube, no pipelining yet.
|
||||
- **Root cube runtime election**: the kernel currently uses
|
||||
`root_cube = (mesh_h - 1) * mesh_w + (mesh_w - 1)` hardcoded to the SE
|
||||
corner. SFR wiring covers all cubes, so runtime election is a pure kernel
|
||||
change when needed.
|
||||
|
||||
---
|
||||
|
||||
## Consequences
|
||||
|
||||
### Positive
|
||||
|
||||
- **Single kernel, single install path** for all-reduce — replaces four
|
||||
removed modules (`ring`, `mesh`, `tree`, `hierarchical`).
|
||||
- **Topology-agnostic kernel**: ring / torus / mesh selected via one
|
||||
integer param, no kernel duplication.
|
||||
- **Automatic via `dist.all_reduce`**: no bench-level or user-level
|
||||
algorithm selection needed; config-driven end-to-end.
|
||||
- **Full SFR wiring**: every cube on every SIP has inter-SIP links
|
||||
available — supports future dynamic root-cube election.
|
||||
|
||||
### Negative
|
||||
|
||||
- **Not suitable for per-PE sharded tensors**: TP-layer-style tensors that
|
||||
shard within one cube across 8 PEs are not addressable by this kernel.
|
||||
Such workloads would need a separate intra-cube all-reduce path (not
|
||||
yet implemented).
|
||||
- **`configure_sfr_intercube_multisip` always wires all pe0s**: even if a
|
||||
given run only needs a subset (e.g. 1 SIP, ring only). Install cost is
|
||||
small but not zero.
|
||||
|
||||
---
|
||||
|
||||
## Affected files
|
||||
|
||||
| File | Change |
|
||||
|---|---|
|
||||
| `src/kernbench/ccl/algorithms/intercube_allreduce.py` (new) | Kernel + `_inter_sip_*` helpers + `TOPO_NAME_TO_KIND` |
|
||||
| `src/kernbench/ccl/sfr_config.py` (new) | `configure_sfr_intercube_multisip` |
|
||||
| `src/kernbench/ccl/topologies.py` | Added `torus_2d`, `mesh_2d_no_wrap` |
|
||||
| `src/kernbench/ccl/install.py` | Extended `_OPPOSITE_DIR` with `global_*` pairs |
|
||||
| `src/kernbench/runtime_api/distributed.py` | `AhbmCCLBackend` uses `configure_sfr_intercube_multisip` + appends sip_rank/topo args |
|
||||
| `ccl.yaml` | Single `intercube_allreduce` entry |
|
||||
| `topology.yaml` | Added `system.sips.topology` |
|
||||
| `benches/ccl_allreduce.py` | Row-wise cube-mesh tensor layout |
|
||||
| `tests/test_allreduce_multidevice.py` (new) | Config-driven ring/torus/mesh |
|
||||
| `tests/test_distributed_intercube_allreduce.py` (new) | Full `dist.all_reduce` path |
|
||||
| `tests/test_intercube_sfr_config.py` (new) | SFR wiring verification |
|
||||
| Removed | `ring_allreduce.py`, `mesh_allreduce.py`, `tree_allreduce.py`, `hierarchical_allreduce.py`, `hello_send.py`, `testing.py` and their tests |
|
||||
@@ -221,6 +221,8 @@ def install_ipcq(
|
||||
|
||||
_OPPOSITE_DIR = {
|
||||
"E": "W", "W": "E", "N": "S", "S": "N",
|
||||
"intra_E": "intra_W", "intra_W": "intra_E",
|
||||
"intra_N": "intra_S", "intra_S": "intra_N",
|
||||
"global_E": "global_W", "global_W": "global_E",
|
||||
"global_N": "global_S", "global_S": "global_N",
|
||||
}
|
||||
|
||||
|
[Binary image changed: 39 KiB before, 43 KiB after]
[Binary image changed: 71 KiB before, 87 KiB after]
[Binary image changed: 38 KiB before, 41 KiB after]
@@ -1,26 +1,4 @@
|
||||
algorithm,sip_topology,n_sips,n_elem,bytes_per_pe,bytes_per_sip,latency_ns
|
||||
intercube_allreduce,ring_1d,6,8,16,256,3073.1299999999937
|
||||
intercube_allreduce,ring_1d,6,32,64,1024,3079.8799999999947
|
||||
intercube_allreduce,ring_1d,6,64,128,2048,3088.879999999992
|
||||
intercube_allreduce,ring_1d,6,128,256,4096,3106.8799999999865
|
||||
intercube_allreduce,ring_1d,6,512,1024,16384,3225.8799999999865
|
||||
intercube_allreduce,ring_1d,6,1024,2048,32768,3391.8799999999865
|
||||
intercube_allreduce,ring_1d,6,2048,4096,65536,3723.8799999999865
|
||||
intercube_allreduce,ring_1d,6,4096,8192,131072,4387.879999999965
|
||||
intercube_allreduce,ring_1d,6,8192,16384,262144,5715.879999999957
|
||||
intercube_allreduce,ring_1d,6,16384,32768,524288,8371.879999999932
|
||||
intercube_allreduce,ring_1d,6,32768,65536,1048576,13683.879999999903
|
||||
intercube_allreduce,torus_2d,6,8,16,256,2190.4799999999923
|
||||
intercube_allreduce,torus_2d,6,32,64,1024,2196.479999999993
|
||||
intercube_allreduce,torus_2d,6,64,128,2048,2204.4799999999905
|
||||
intercube_allreduce,torus_2d,6,128,256,4096,2220.479999999985
|
||||
intercube_allreduce,torus_2d,6,512,1024,16384,2325.479999999985
|
||||
intercube_allreduce,torus_2d,6,1024,2048,32768,2471.479999999985
|
||||
intercube_allreduce,torus_2d,6,2048,4096,65536,2763.479999999985
|
||||
intercube_allreduce,torus_2d,6,4096,8192,131072,3347.4799999999777
|
||||
intercube_allreduce,torus_2d,6,8192,16384,262144,4515.4799999999705
|
||||
intercube_allreduce,torus_2d,6,16384,32768,524288,6851.479999999952
|
||||
intercube_allreduce,torus_2d,6,32768,65536,1048576,11523.479999999923
|
||||
intercube_allreduce,mesh_2d_no_wrap,6,8,16,256,3508.4249999999993
|
||||
intercube_allreduce,mesh_2d_no_wrap,6,32,64,1024,3515.55
|
||||
intercube_allreduce,mesh_2d_no_wrap,6,64,128,2048,3525.0499999999975
|
||||
@@ -32,3 +10,28 @@ intercube_allreduce,mesh_2d_no_wrap,6,4096,8192,131072,4857.049999999959
|
||||
intercube_allreduce,mesh_2d_no_wrap,6,8192,16384,262144,6217.049999999945
|
||||
intercube_allreduce,mesh_2d_no_wrap,6,16384,32768,524288,8937.049999999937
|
||||
intercube_allreduce,mesh_2d_no_wrap,6,32768,65536,1048576,14377.049999999872
|
||||
intercube_allreduce,mesh_2d_no_wrap,6,49152,98304,1572864,19817.049999999872
|
||||
intercube_allreduce,ring_1d,6,8,16,256,3073.1299999999937
|
||||
intercube_allreduce,ring_1d,6,32,64,1024,3079.8799999999947
|
||||
intercube_allreduce,ring_1d,6,64,128,2048,3088.879999999992
|
||||
intercube_allreduce,ring_1d,6,128,256,4096,3106.8799999999865
|
||||
intercube_allreduce,ring_1d,6,512,1024,16384,3225.8799999999865
|
||||
intercube_allreduce,ring_1d,6,1024,2048,32768,3391.8799999999865
|
||||
intercube_allreduce,ring_1d,6,2048,4096,65536,3723.8799999999865
|
||||
intercube_allreduce,ring_1d,6,4096,8192,131072,4387.879999999965
|
||||
intercube_allreduce,ring_1d,6,8192,16384,262144,5715.879999999957
|
||||
intercube_allreduce,ring_1d,6,16384,32768,524288,8371.879999999932
|
||||
intercube_allreduce,ring_1d,6,32768,65536,1048576,13683.879999999903
|
||||
intercube_allreduce,ring_1d,6,49152,98304,1572864,18995.879999999917
|
||||
intercube_allreduce,torus_2d,6,8,16,256,2190.4799999999923
|
||||
intercube_allreduce,torus_2d,6,32,64,1024,2196.479999999993
|
||||
intercube_allreduce,torus_2d,6,64,128,2048,2204.4799999999905
|
||||
intercube_allreduce,torus_2d,6,128,256,4096,2220.479999999985
|
||||
intercube_allreduce,torus_2d,6,512,1024,16384,2325.479999999985
|
||||
intercube_allreduce,torus_2d,6,1024,2048,32768,2471.479999999985
|
||||
intercube_allreduce,torus_2d,6,2048,4096,65536,2763.479999999985
|
||||
intercube_allreduce,torus_2d,6,4096,8192,131072,3347.4799999999777
|
||||
intercube_allreduce,torus_2d,6,8192,16384,262144,4515.4799999999705
|
||||
intercube_allreduce,torus_2d,6,16384,32768,524288,6851.479999999952
|
||||
intercube_allreduce,torus_2d,6,32768,65536,1048576,11523.479999999923
|
||||
intercube_allreduce,torus_2d,6,49152,98304,1572864,16195.479999999952
|
||||
|
||||
|
|
[Binary image added: 194 KiB]
[Binary image changed: 37 KiB before, 41 KiB after]
@@ -7,11 +7,45 @@ stateful/SimPy-event-consuming and MUST NOT be shared).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from kernbench.topology.builder import resolve_topology
|
||||
|
||||
|
||||
def pytest_sessionfinish(session, exitstatus):
|
||||
"""Aggregate parametrized sweep rows into combined CSV + PNG plots.
|
||||
|
||||
Runs on the controller node only (xdist worker processes set
|
||||
``PYTEST_XDIST_WORKER``; we skip those). Idempotent — does nothing
|
||||
if no sweep rows are present (e.g., when the sweep was filtered out).
|
||||
"""
|
||||
if os.environ.get("PYTEST_XDIST_WORKER"):
|
||||
return
|
||||
import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
mod_path = Path(__file__).parent / "test_allreduce_multidevice.py"
|
||||
if not mod_path.exists():
|
||||
return
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"_test_allreduce_multidevice_for_aggregate", mod_path,
|
||||
)
|
||||
if spec is None or spec.loader is None:
|
||||
return
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
sys.modules[spec.name] = mod
|
||||
try:
|
||||
spec.loader.exec_module(mod)
|
||||
agg = getattr(mod, "_aggregate_sweep_plots", None)
|
||||
if agg is not None:
|
||||
agg()
|
||||
except Exception as e:
|
||||
print(f"[conftest] sweep aggregation failed: {e}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def topology():
|
||||
"""Session-scoped parsed topology (immutable graph + spec).
|
||||
|
||||
|
[Binary image removed: 44 KiB]
[Binary image changed: 129 KiB before, 101 KiB after]
@@ -79,13 +79,3 @@ h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,ipcq,181.659999
|
||||
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,raw,183.04000000000087
|
||||
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,ipcq,205.65999999999985
|
||||
h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,raw,207.04000000000087
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",128,ipcq,6.015000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",256,ipcq,6.515000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",384,ipcq,7.015000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",512,ipcq,7.515000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",768,ipcq,8.515000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",1024,ipcq,9.515000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",2048,ipcq,13.515000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",4096,ipcq,21.515000000003056
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",8192,ipcq,37.51499999999214
|
||||
h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",10240,ipcq,45.51499999999214
|
||||
|
||||
|
@@ -269,29 +269,143 @@ def test_allreduce(
|
||||
assert result["ok_cubes"] > 0
|
||||
|
||||
|
||||
# ── Latency sweep ─────────────────────────────────────────────────────
|
||||
# ── Latency sweep (parametrized + xdist-friendly) ─────────────────────
|
||||
|
||||
# avoid 16 (== n_cubes, dim_map collision). Goes up to 1 MB per SIP:
|
||||
# bytes_per_sip = n_cubes * n_elem * 2 = 32 * n_elem.
|
||||
# avoid 16 (== n_cubes, dim_map collision). Goes up to 96 KB per PE:
|
||||
# bytes_per_pe = n_elem * 2 (f16). 49152 elem * 2 = 96 KB / PE.
|
||||
_SWEEP_N_ELEM = [
|
||||
8, 32, 64, 128, 512, 1024, 2048,
|
||||
4096, 8192, 16384, 32768,
|
||||
4096, 8192, 16384, 32768, 49152,
|
||||
]
|
||||
_ELEM_BYTES_F16 = 2
|
||||
|
||||
_SWEEP_TOPOLOGIES = [
|
||||
("intercube_allreduce", "ring_1d", 6, None, None),
|
||||
("intercube_allreduce", "torus_2d", 6, 2, 3),
|
||||
("intercube_allreduce", "mesh_2d_no_wrap", 6, 2, 3),
|
||||
]
|
||||
|
||||
def test_allreduce_latency_sweep(tmp_path):
|
||||
"""Sweep n_elem across each SIP topology; record max(pe_exec_ns)
|
||||
as the critical-path kernel latency. Emits CSV + PNG plots to
|
||||
tests/allreduce_latency_plots/.
|
||||
# Shared on-disk staging dir for parametrized sweep rows. Each
|
||||
# parametrized invocation writes one JSON file here; the aggregator
|
||||
# (run from conftest.pytest_sessionfinish) reads them and emits the
|
||||
# combined CSV + PNG plots.
|
||||
_SWEEP_OUT_DIR = Path(__file__).parent / "allreduce_latency_plots"
|
||||
_SWEEP_ROWS_DIR = _SWEEP_OUT_DIR / "_rows"
|
||||
|
||||
|
||||
def _sweep_params():
|
||||
out = []
|
||||
for algorithm, sip_topology, n_sips, sip_w, sip_h in _SWEEP_TOPOLOGIES:
|
||||
for n_elem in _SWEEP_N_ELEM:
|
||||
out.append(pytest.param(
|
||||
algorithm, sip_topology, n_sips, sip_w, sip_h, n_elem,
|
||||
id=f"{sip_topology}-n_elem{n_elem}",
|
||||
))
|
||||
return out
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"algorithm,sip_topology,n_sips,sip_w,sip_h,n_elem", _sweep_params(),
|
||||
)
|
||||
def test_allreduce_latency_one(
|
||||
tmp_path, algorithm, sip_topology, n_sips, sip_w, sip_h, n_elem,
|
||||
):
|
||||
"""One config of the latency sweep. xdist parallelizes across params.
|
||||
|
||||
Writes a single JSON row to ``_SWEEP_ROWS_DIR``. The conftest
|
||||
sessionfinish hook aggregates rows into CSV + plots after all
|
||||
parametrized cases finish.
|
||||
"""
|
||||
import json
|
||||
|
||||
topo_path, ccl_path = _write_temp_configs(
|
||||
tmp_path, sip_topology, n_sips, algorithm,
|
||||
sip_w=sip_w, sip_h=sip_h,
|
||||
n_elem_override=n_elem,
|
||||
)
|
||||
topo = resolve_topology(topo_path)
|
||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||
spec = topo.topology_obj.spec
|
||||
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id=f"sweep_{algorithm}_{sip_topology}_{n_elem}",
|
||||
spec=spec,
|
||||
) as ctx:
|
||||
result = run_allreduce(
|
||||
ctx, engine, spec,
|
||||
algorithm=algorithm, ccl_yaml=ccl_path,
|
||||
)
|
||||
assert result["ok_cubes"] > 0
|
||||
|
||||
pe_exec_vals = [
|
||||
float(tr.get("pe_exec_ns", 0.0) or 0.0)
|
||||
for _, (_, tr) in engine._results.items()
|
||||
if isinstance(tr, dict)
|
||||
]
|
||||
crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||
|
||||
cm = spec["sip"]["cube_mesh"]
|
||||
n_cubes = int(cm["w"]) * int(cm["h"])
|
||||
bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16
|
||||
bytes_per_pe = n_elem * _ELEM_BYTES_F16
|
||||
|
||||
record = {
|
||||
"algorithm": algorithm,
|
||||
"sip_topology": sip_topology,
|
||||
"n_sips": n_sips,
|
||||
"n_elem": n_elem,
|
||||
"bytes_per_pe": bytes_per_pe,
|
||||
"bytes_per_sip": bytes_per_sip,
|
||||
"latency_ns": crit_ns,
|
||||
}
|
||||
|
||||
_SWEEP_ROWS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
row_path = _SWEEP_ROWS_DIR / f"{sip_topology}_{n_elem}.json"
|
||||
with open(row_path, "w", encoding="utf-8") as f:
|
||||
json.dump(record, f)
|
||||
|
||||
|
||||
def _aggregate_sweep_plots() -> bool:
|
||||
"""Read all per-config rows and emit CSV + PNG plots.
|
||||
|
||||
Called by ``conftest.pytest_sessionfinish`` (controller node only).
|
||||
Returns True if any rows were aggregated, False otherwise.
|
||||
"""
|
||||
import csv
|
||||
import json
|
||||
|
||||
row_files = sorted(_SWEEP_ROWS_DIR.glob("*.json")) \
|
||||
if _SWEEP_ROWS_DIR.exists() else []
|
||||
records: list[dict] = []
|
||||
if row_files:
|
||||
for p in row_files:
|
||||
with open(p, encoding="utf-8") as f:
|
||||
records.append(json.load(f))
|
||||
else:
|
||||
# Fallback: replot from existing summary.csv (skip sweep re-run).
|
||||
summary_path = _SWEEP_OUT_DIR / "summary.csv"
|
||||
if not summary_path.exists():
|
||||
return False
|
||||
with open(summary_path, encoding="utf-8") as f:
|
||||
for row in csv.DictReader(f):
|
||||
records.append({
|
||||
"algorithm": row["algorithm"],
|
||||
"sip_topology": row["sip_topology"],
|
||||
"n_sips": int(row["n_sips"]),
|
||||
"n_elem": int(row["n_elem"]),
|
||||
"bytes_per_pe": int(row["bytes_per_pe"]),
|
||||
"bytes_per_sip": int(row["bytes_per_sip"]),
|
||||
"latency_ns": float(row["latency_ns"]),
|
||||
})
|
||||
if not records:
|
||||
return False
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.ticker import FuncFormatter
|
||||
|
||||
def _fmt_bytes(x, _pos):
|
||||
"""Format tick as B / KB / MB."""
|
||||
if x <= 0:
|
||||
return "0"
|
||||
if x >= 1024 * 1024:
|
||||
@@ -302,86 +416,27 @@ def test_allreduce_latency_sweep(tmp_path):
|
||||
|
||||
_bytes_fmt = FuncFormatter(_fmt_bytes)
|
||||
|
||||
out_dir = Path(__file__).parent / "allreduce_latency_plots"
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
records: list[dict] = []
|
||||
|
||||
# Apples-to-apples: same n_sips across all three topologies.
|
||||
for algorithm, sip_topology, n_sips, sip_w, sip_h in [
|
||||
("intercube_allreduce", "ring_1d", 6, None, None),
|
||||
("intercube_allreduce", "torus_2d", 6, 2, 3),
|
||||
("intercube_allreduce", "mesh_2d_no_wrap", 6, 2, 3),
|
||||
]:
|
||||
for n_elem in _SWEEP_N_ELEM:
|
||||
sub = tmp_path / f"{sip_topology}_{n_elem}"
|
||||
sub.mkdir()
|
||||
topo_path, ccl_path = _write_temp_configs(
|
||||
sub, sip_topology, n_sips, algorithm,
|
||||
sip_w=sip_w, sip_h=sip_h,
|
||||
n_elem_override=n_elem,
|
||||
)
|
||||
topo = resolve_topology(topo_path)
|
||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||
spec = topo.topology_obj.spec
|
||||
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id=f"sweep_{algorithm}_{sip_topology}_{n_elem}",
|
||||
spec=spec,
|
||||
) as ctx:
|
||||
result = run_allreduce(
|
||||
ctx, engine, spec,
|
||||
algorithm=algorithm, ccl_yaml=ccl_path,
|
||||
)
|
||||
assert result["ok_cubes"] > 0
|
||||
|
||||
pe_exec_vals = [
|
||||
float(tr.get("pe_exec_ns", 0.0) or 0.0)
|
||||
for _, (_, tr) in engine._results.items()
|
||||
if isinstance(tr, dict)
|
||||
]
|
||||
crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||
|
||||
cm = spec["sip"]["cube_mesh"]
|
||||
n_cubes = int(cm["w"]) * int(cm["h"])
|
||||
bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16
|
||||
# pe="replicate" + num_pes=1 → one active PE per cube owns
|
||||
# the whole cube row. Per-PE bytes == per-cube-tile bytes ==
|
||||
# per-message bytes over the IPCQ fabric.
|
||||
bytes_per_pe = n_elem * _ELEM_BYTES_F16
|
||||
|
||||
records.append({
|
||||
"algorithm": algorithm,
|
||||
"sip_topology": sip_topology,
|
||||
"n_sips": n_sips,
|
||||
"n_elem": n_elem,
|
||||
"bytes_per_pe": bytes_per_pe,
|
||||
"bytes_per_sip": bytes_per_sip,
|
||||
"latency_ns": crit_ns,
|
||||
})
|
||||
print(
|
||||
f"[{sip_topology:<16} n_sips={n_sips} n_elem={n_elem:>5} "
|
||||
f"bytes/pe={bytes_per_pe:>7} bytes/sip={bytes_per_sip:>9}] "
|
||||
f"pe_exec_max = {crit_ns:8.1f} ns"
|
||||
)
|
||||
|
||||
with open(out_dir / "summary.csv", "w", newline="", encoding="utf-8") as f:
|
||||
_SWEEP_OUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
with open(_SWEEP_OUT_DIR / "summary.csv", "w",
|
||||
newline="", encoding="utf-8") as f:
|
||||
w = csv.DictWriter(f, fieldnames=[
|
||||
"algorithm", "sip_topology", "n_sips", "n_elem",
|
||||
"bytes_per_pe", "bytes_per_sip", "latency_ns",
|
||||
])
|
||||
w.writeheader()
|
||||
for r in records:
|
||||
for r in sorted(records, key=lambda r: (
|
||||
r["sip_topology"], r["bytes_per_pe"],
|
||||
)):
|
||||
w.writerow(r)
|
||||
|
||||
topologies = sorted({r["sip_topology"] for r in records})
|
||||
# Per-topology plots, log-scale x-axis = bytes per PE.
|
||||
for topo_name in topologies:
|
||||
rs = sorted(
|
||||
[r for r in records if r["sip_topology"] == topo_name],
|
||||
key=lambda r: r["bytes_per_pe"],
|
||||
)
|
||||
if not rs:
|
||||
continue
|
||||
xs = [r["bytes_per_pe"] for r in rs]
|
||||
ys = [r["latency_ns"] for r in rs]
|
||||
title = (
|
||||
@@ -397,17 +452,20 @@ def test_allreduce_latency_sweep(tmp_path):
|
||||
ax.grid(True, alpha=0.3)
|
||||
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||
fig.tight_layout()
|
||||
fig.savefig(out_dir / f"{topo_name}.png", dpi=120)
|
||||
fig.savefig(_SWEEP_OUT_DIR / f"{topo_name}.png", dpi=120)
|
||||
plt.close(fig)
|
||||
|
||||
colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange",
|
||||
"mesh_2d_no_wrap": "tab:green"}
|
||||
THEORETICAL_TORUS_2D_6SIP_NS = 10600.0
|
||||
fig, ax = plt.subplots(figsize=(9, 6))
|
||||
for topo_name in topologies:
|
||||
rs = sorted(
|
||||
[r for r in records if r["sip_topology"] == topo_name],
|
||||
key=lambda r: r["bytes_per_pe"],
|
||||
)
|
||||
if not rs:
|
||||
continue
|
||||
ax.plot(
|
||||
[r["bytes_per_pe"] for r in rs],
|
||||
[r["latency_ns"] for r in rs],
|
||||
@@ -415,15 +473,378 @@ def test_allreduce_latency_sweep(tmp_path):
|
||||
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
|
||||
color=colors.get(topo_name),
|
||||
)
|
||||
ax.axhline(
|
||||
y=THEORETICAL_TORUS_2D_6SIP_NS,
|
||||
color="tab:red", linestyle="--", linewidth=1.5,
|
||||
label=f"theoretical torus_2d (6 SIPs) = "
|
||||
f"{THEORETICAL_TORUS_2D_6SIP_NS:.0f} ns",
|
||||
)
|
||||
BYTES_96KB = 96 * 1024
|
||||
ax.axvline(
|
||||
x=BYTES_96KB, ymin=0, ymax=1,
|
||||
color="tab:red", linestyle=":", linewidth=1.2,
|
||||
)
|
||||
ax.plot(
|
||||
[BYTES_96KB], [THEORETICAL_TORUS_2D_6SIP_NS],
|
||||
marker="x", color="tab:red", markersize=10, markeredgewidth=2,
|
||||
)
|
||||
# Find simulated torus_2d latency at 96 KB (if present) for direct
|
||||
# comparison with the theoretical value.
|
||||
sim_torus_at_96kb = next(
|
||||
(r["latency_ns"] for r in records
|
||||
if r["sip_topology"] == "torus_2d" and r["bytes_per_pe"] == BYTES_96KB),
|
||||
None,
|
||||
)
|
||||
if sim_torus_at_96kb is not None:
|
||||
ax.plot(
|
||||
[BYTES_96KB], [sim_torus_at_96kb],
|
||||
marker="o", color="tab:orange",
|
||||
markersize=10, markeredgecolor="black", markeredgewidth=1.2,
|
||||
)
|
||||
ax.annotate(
|
||||
f"96 KB\n"
|
||||
f"theoretical = {THEORETICAL_TORUS_2D_6SIP_NS:.0f} ns\n"
|
||||
f"simulated = {sim_torus_at_96kb:.0f} ns",
|
||||
xy=(BYTES_96KB, sim_torus_at_96kb),
|
||||
xytext=(10, -20), textcoords="offset points",
|
||||
color="tab:red", fontsize=9,
|
||||
)
|
||||
else:
|
||||
ax.annotate(
|
||||
f"96 KB\n→ theoretical {THEORETICAL_TORUS_2D_6SIP_NS:.0f} ns",
|
||||
xy=(BYTES_96KB, THEORETICAL_TORUS_2D_6SIP_NS),
|
||||
xytext=(8, -20), textcoords="offset points",
|
||||
color="tab:red", fontsize=9,
|
||||
)
|
||||
ax.set_xscale("log", base=2)
|
||||
ax.set_xlabel("Bytes per PE (log scale)")
|
||||
ax.set_ylabel("max pe_exec_ns (critical path)")
|
||||
ax.set_title("Multi-device allreduce latency by topology")
|
||||
ax.grid(True, alpha=0.3)
|
||||
|
||||
# Drop 128 KB tick (overlaps visually with the explicit 96 KB marker)
|
||||
# and add 96 KB.
|
||||
BYTES_128KB = 128 * 1024
|
||||
existing_ticks = [t for t in ax.get_xticks() if int(t) != BYTES_128KB]
|
||||
if BYTES_96KB not in existing_ticks:
|
||||
existing_ticks.append(BYTES_96KB)
|
||||
ax.set_xticks(sorted(existing_ticks))
|
||||
ax.set_xlim(left=min(r["bytes_per_pe"] for r in records) / 2,
|
||||
right=BYTES_96KB * 1.5)
|
||||
ax.legend()
|
||||
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||
fig.tight_layout()
|
||||
fig.savefig(out_dir / "overview.png", dpi=120)
|
||||
fig.savefig(_SWEEP_OUT_DIR / "overview.png", dpi=120)
|
||||
plt.close(fig)
|
||||
|
||||
print(f"\nWrote {out_dir / 'overview.png'}")
|
||||
# Cleanup row staging dir so a partial future run doesn't pick up
|
||||
# stale rows.
|
||||
for p in row_files:
|
||||
try:
|
||||
p.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
_SWEEP_ROWS_DIR.rmdir()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
print(f"\nWrote {_SWEEP_OUT_DIR / 'overview.png'} "
|
||||
f"from {len(records)} rows")
|
||||
return True
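The tick adjustment and the 96 KB lookup in the overview plot can be sketched without matplotlib. This is a minimal illustration, not the test's code: `adjust_ticks` and `latency_at` are hypothetical helper names, and the sample rows are made-up values.

```python
def adjust_ticks(ticks, drop, add):
    """Return sorted ticks without `drop` and with `add` present."""
    kept = [t for t in ticks if int(t) != drop]
    if add not in kept:
        kept.append(add)
    return sorted(kept)


def latency_at(records, topology, nbytes):
    """First matching latency_ns, or None when the sweep has no such row."""
    return next(
        (r["latency_ns"] for r in records
         if r["sip_topology"] == topology and r["bytes_per_pe"] == nbytes),
        None,
    )


# Made-up rows for illustration only; these are not measured results.
rows = [
    {"sip_topology": "torus_2d", "bytes_per_pe": 96 * 1024, "latency_ns": 1.0},
    {"sip_topology": "ring_1d", "bytes_per_pe": 96 * 1024, "latency_ns": 2.0},
]
ticks = adjust_ticks([32768.0, 65536.0, 131072.0], drop=128 * 1024, add=96 * 1024)
```

Returning `None` from `next(...)` is what lets the plot fall back to the theoretical-only annotation when the sweep did not include a 96 KB row.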
|
||||
|
||||
|
||||
# ── Topology diagram (device-level + cube-level reduction) ────────────
|
||||
|
||||
# Convention: "rows × cols" everywhere, row-major rank assignment
|
||||
# (rank = row * n_cols + col). For the 2×3 inter-SIP grid, this means
|
||||
# 2 rows × 3 columns: SIP 0 1 2 / SIP 3 4 5.
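A minimal sketch of the row-major convention stated above, assuming nothing beyond `rank = row * n_cols + col`. The helper names `rank_of` and `coords_of` are hypothetical and do not appear in the test module.

```python
def rank_of(row: int, col: int, n_cols: int) -> int:
    """Row-major rank for a grid with n_cols columns."""
    return row * n_cols + col


def coords_of(rank: int, n_cols: int) -> tuple:
    """Inverse mapping: rank -> (row, col)."""
    return divmod(rank, n_cols)


# 2 rows x 3 cols, as used for the inter-SIP grid in the sweep.
layout = [[rank_of(r, c, 3) for c in range(3)] for r in range(2)]
```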
|
||||
|
||||
_PALETTE_BG = "#fafbfd"
|
||||
_PALETTE_FRAME = "#3a3f4a"
|
||||
_PALETTE_BLUE = "#2c6fb6"
|
||||
_PALETTE_GREEN = "#2e8a4e"
|
||||
_PALETTE_TEXT = "#1f2530"
|
||||
_PALETTE_BOX_FILL = "#eaf2fb"
|
||||
_PALETTE_BOX_EDGE = "#2c4a78"
|
||||
_PALETTE_ROOT_FILL = "#ffd9b8"
|
||||
_PALETTE_ROOT_EDGE = "#bd5a14"
|
||||
|
||||
|
||||
def _arrow(ax, xy_from, xy_to, color="black", lw=1.4, alpha=1.0,
|
||||
style="-|>", curve=0.0):
|
||||
from matplotlib.patches import FancyArrowPatch
|
||||
arrow = FancyArrowPatch(
|
||||
xy_from, xy_to,
|
||||
arrowstyle=style, mutation_scale=12,
|
||||
color=color, lw=lw, alpha=alpha,
|
||||
connectionstyle=f"arc3,rad={curve}",
|
||||
)
|
||||
ax.add_patch(arrow)
|
||||
|
||||
|
||||
def _draw_sip_box(ax, cx, cy, w, h, label, *, fill=_PALETTE_BOX_FILL,
|
||||
edge=_PALETTE_BOX_EDGE, text_color=_PALETTE_TEXT,
|
||||
font=10):
|
||||
from matplotlib.patches import FancyBboxPatch
|
||||
box = FancyBboxPatch(
|
||||
(cx - w / 2, cy - h / 2), w, h,
|
||||
boxstyle="round,pad=0.02,rounding_size=0.10",
|
||||
linewidth=1.4, edgecolor=edge, facecolor=fill,
|
||||
)
|
||||
ax.add_patch(box)
|
||||
ax.text(cx, cy, label, ha="center", va="center",
|
||||
color=text_color, fontsize=font, fontweight="bold")
|
||||
|
||||
|
||||
def _frame_panel(ax, title, lim_x=10.0, lim_y=6.0):
|
||||
"""Set up a square-ish panel with a visible outer border."""
|
||||
from matplotlib.patches import FancyBboxPatch
|
||||
ax.set_xlim(0, lim_x)
|
||||
ax.set_ylim(0, lim_y)
|
||||
ax.set_aspect("equal")
|
||||
ax.axis("off")
|
||||
ax.set_facecolor(_PALETTE_BG)
|
||||
border = FancyBboxPatch(
|
||||
(0.05, 0.05), lim_x - 0.10, lim_y - 0.10,
|
||||
boxstyle="round,pad=0.01,rounding_size=0.12",
|
||||
linewidth=1.4, edgecolor=_PALETTE_FRAME, facecolor=_PALETTE_BG,
|
||||
zorder=0,
|
||||
)
|
||||
ax.add_patch(border)
|
||||
ax.set_title(title, fontsize=12, fontweight="bold",
|
||||
color=_PALETTE_TEXT, pad=8)
|
||||
|
||||
|
||||
def _draw_ring_topology(ax):
|
||||
_frame_panel(ax, "ring_1d (6 SIPs)", lim_x=10.0, lim_y=6.0)
|
||||
|
||||
xs = [1.2, 2.7, 4.2, 5.7, 7.2, 8.7]
|
||||
y = 3.1
|
||||
box_w, box_h = 1.05, 0.9
|
||||
for i, x in enumerate(xs):
|
||||
_draw_sip_box(ax, x, y, box_w, box_h, f"SIP {i}")
|
||||
# Forward ring (global_E) — adjacent neighbours, anchored to box edges.
|
||||
for i in range(5):
|
||||
_arrow(ax, (xs[i] + box_w / 2, y),
|
||||
(xs[i + 1] - box_w / 2, y),
|
||||
color=_PALETTE_BLUE, lw=1.6)
|
||||
# Wrap (SIP 5 → SIP 0). Anchor at right-CENTER of SIP 5 and
|
||||
# left-CENTER of SIP 0; arc OUTSIDE (above) the row so it does not
|
||||
# overlap any of the SIP boxes in between.
|
||||
_arrow(
|
||||
ax,
|
||||
(xs[5] + box_w / 2, y),
|
||||
(xs[0] - box_w / 2, y),
|
||||
color=_PALETTE_BLUE, lw=1.6, curve=-0.40,
|
||||
)
|
||||
ax.text(5.0, y + 2.0, "global_E (ring)", ha="center",
|
||||
color=_PALETTE_BLUE, fontsize=10, style="italic")
|
||||
ax.text(5.0, y - 1.5,
|
||||
"(global_W = reverse direction, used by the algorithm)",
|
||||
ha="center", color="gray", fontsize=8, style="italic")
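The ring panel draws `global_E` as "next SIP, wrapping from 5 to 0". A small sketch of that neighbour rule, assuming ranks are numbered left to right as in the diagram; `ring_neighbors` is a hypothetical name used only for illustration.

```python
def ring_neighbors(rank: int, n_sips: int) -> dict:
    """Peers of `rank` on a 1-D ring with wraparound."""
    return {
        "global_E": (rank + 1) % n_sips,  # forward direction in the diagram
        "global_W": (rank - 1) % n_sips,  # reverse direction
    }


last = ring_neighbors(5, 6)
first = ring_neighbors(0, 6)
```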
|
||||
|
||||
|
||||
def _draw_grid_topology(ax, kind, *, n_rows=2, n_cols=3):
|
||||
"""kind ∈ {'torus', 'mesh'}. Lays out as n_rows × n_cols (row-major).
|
||||
|
||||
For the sweep we use 2 rows × 3 cols → SIP layout::
|
||||
|
||||
row 0: SIP 0 SIP 1 SIP 2
|
||||
row 1: SIP 3 SIP 4 SIP 5
|
||||
"""
|
||||
title = f"torus_2d ({n_rows}×{n_cols}, 6 SIPs)" if kind == "torus" \
|
||||
else f"mesh_2d_no_wrap ({n_rows}×{n_cols}, 6 SIPs)"
|
||||
_frame_panel(ax, title, lim_x=10.0, lim_y=6.0)
|
||||
|
||||
col_xs = [2.0, 5.0, 8.0] # 3 cols
|
||||
row_ys = [4.3, 1.8] # 2 rows
|
||||
box_w, box_h = 1.3, 0.95
|
||||
pos: dict[tuple[int, int], tuple[float, float]] = {}
|
||||
for r in range(n_rows):
|
||||
for c in range(n_cols):
|
||||
rank = r * n_cols + c
|
||||
x, y = col_xs[c], row_ys[r]
|
||||
pos[(r, c)] = (x, y)
|
||||
_draw_sip_box(ax, x, y, box_w, box_h, f"SIP {rank}")
|
||||
|
||||
# Row edges (E↔W) — between adjacent columns within each row.
|
||||
for r in range(n_rows):
|
||||
for c in range(n_cols - 1):
|
||||
x0, y0 = pos[(r, c)]
|
||||
x1, y1 = pos[(r, c + 1)]
|
||||
_arrow(ax, (x0 + box_w / 2, y0 + 0.10),
|
||||
(x1 - box_w / 2, y1 + 0.10),
|
||||
color=_PALETTE_BLUE, lw=1.5)
|
||||
_arrow(ax, (x1 - box_w / 2, y1 - 0.10),
|
||||
(x0 + box_w / 2, y0 - 0.10),
|
||||
color=_PALETTE_BLUE, lw=1.5)
|
||||
# Col edges (N↔S) — between adjacent rows within each column.
|
||||
for c in range(n_cols):
|
||||
for r in range(n_rows - 1):
|
||||
x0, y0 = pos[(r, c)]
|
||||
x1, y1 = pos[(r + 1, c)]
|
||||
_arrow(ax, (x0 - 0.12, y0 - box_h / 2),
|
||||
(x1 - 0.12, y1 + box_h / 2),
|
||||
color=_PALETTE_GREEN, lw=1.5)
|
||||
_arrow(ax, (x1 + 0.12, y1 + box_h / 2),
|
||||
(x0 + 0.12, y0 - box_h / 2),
|
||||
color=_PALETTE_GREEN, lw=1.5)
|
||||
# Wrap arrows for torus only — anchor to the centre of the OUTER
|
||||
# edge of the end SIPs and arc OUTSIDE the row/column so they do
|
||||
# not overlap the SIPs in between.
|
||||
if kind == "torus":
|
||||
# Row wrap: last col → first col. Top row arcs UP, bottom row
|
||||
# arcs DOWN, so each wrap sits clearly outside its own row.
|
||||
for r in range(n_rows):
|
||||
x0, y0 = pos[(r, 0)]
|
||||
x1, y1 = pos[(r, n_cols - 1)]
|
||||
curve = -0.45 if r == 0 else 0.45
|
||||
_arrow(
|
||||
ax,
|
||||
(x1 + box_w / 2, y1),
|
||||
(x0 - box_w / 2, y0),
|
||||
color=_PALETTE_BLUE, lw=1.5,
|
||||
curve=curve, alpha=0.9,
|
||||
)
|
||||
# Col wrap: last row → first row. Leftmost col arcs LEFT,
|
||||
# rightmost col arcs RIGHT. Middle col(s) get a small inline
|
||||
# marker + legend note (drawing them through the panel would
|
||||
# collide with the row arrows).
|
||||
for c in range(n_cols):
|
||||
x0, y0 = pos[(0, c)]
|
||||
x1, y1 = pos[(n_rows - 1, c)]
|
||||
if c == 0:
|
||||
curve = 0.55
|
||||
elif c == n_cols - 1:
|
||||
curve = -0.55
|
||||
else:
|
||||
continue # skip middle col — see legend note
|
||||
_arrow(
|
||||
ax,
|
||||
(x1, y1 - box_h / 2),
|
||||
(x0, y0 + box_h / 2),
|
||||
color=_PALETTE_GREEN, lw=1.5,
|
||||
curve=curve, alpha=0.9,
|
||||
)
|
||||
|
||||
ax.text(0.7, 5.6, "global_E/W (row)", color=_PALETTE_BLUE,
|
||||
fontsize=9, style="italic", fontweight="bold")
|
||||
ax.text(0.7, 5.25, "global_N/S (col)", color=_PALETTE_GREEN,
|
||||
fontsize=9, style="italic", fontweight="bold")
|
||||
ax.text(0.7, 4.92,
|
||||
"wrap = torus" if kind == "torus" else "no wrap = mesh",
|
||||
color="gray", fontsize=8, style="italic")
|
||||
if kind == "torus" and n_cols > 2:
|
||||
ax.text(0.7, 0.3,
|
||||
"(middle-col wrap omitted for clarity — every row "
|
||||
"and every column wraps)",
|
||||
color="gray", fontsize=7.5, style="italic")
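The only difference between the two grid panels is whether edges wrap. The sketch below computes neighbours for both cases under two assumptions that the diagram suggests but the source does not state as code: row 0 is the north row, and ranks are row-major. `grid_neighbors` is a hypothetical helper.

```python
# (d_row, d_col) per direction; row 0 is drawn at the top, so N is row - 1.
_STEPS = {
    "global_N": (-1, 0), "global_S": (1, 0),
    "global_E": (0, 1), "global_W": (0, -1),
}


def grid_neighbors(rank, n_rows, n_cols, *, wrap):
    """Neighbour ranks on an n_rows x n_cols grid (torus if wrap else mesh)."""
    row, col = divmod(rank, n_cols)
    out = {}
    for name, (dr, dc) in _STEPS.items():
        r, c = row + dr, col + dc
        if wrap:
            r, c = r % n_rows, c % n_cols
        elif not (0 <= r < n_rows and 0 <= c < n_cols):
            continue  # mesh: no link past the edge
        out[name] = r * n_cols + c
    return out


mesh_corner = grid_neighbors(0, 2, 3, wrap=False)
torus_corner = grid_neighbors(0, 2, 3, wrap=True)
```

With only two rows, the torus `global_N` and `global_S` peers coincide. Whether the real wiring collapses them into one link is not shown in this diff.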
|
||||
|
||||
|
||||
def _draw_cube_reduction(ax):
|
||||
"""4×4 cube grid inside SIP 0 — compact layout with phase legend."""
|
||||
from matplotlib.patches import Rectangle
|
||||
_frame_panel(ax, "Cube-level reduction inside SIP 0 (4×4 cubes)",
|
||||
lim_x=10.0, lim_y=6.0)
|
||||
|
||||
cube_w = 0.65
|
||||
cube_gap = 0.18
|
||||
# Place the 4×4 grid in the left half of the panel (fixed origin).
|
||||
grid_total = 4 * cube_w + 3 * cube_gap
|
||||
grid_x0 = 0.7
|
||||
grid_y0 = 0.7
|
||||
centers: dict[tuple[int, int], tuple[float, float]] = {}
|
||||
for r in range(4):
|
||||
for c in range(4):
|
||||
cx = grid_x0 + c * (cube_w + cube_gap) + cube_w / 2
|
||||
cy = grid_y0 + (3 - r) * (cube_w + cube_gap) + cube_w / 2
|
||||
centers[(r, c)] = (cx, cy)
|
||||
cube_id = r * 4 + c
|
||||
is_root = (r == 3 and c == 3)
|
||||
face = _PALETTE_ROOT_FILL if is_root else _PALETTE_BOX_FILL
|
||||
edge = _PALETTE_ROOT_EDGE if is_root else _PALETTE_BOX_EDGE
|
||||
rect = Rectangle(
|
||||
(cx - cube_w / 2, cy - cube_w / 2), cube_w, cube_w,
|
||||
linewidth=1.2, edgecolor=edge, facecolor=face,
|
||||
)
|
||||
ax.add_patch(rect)
|
||||
label = f"c{cube_id}"
|
||||
ax.text(cx, cy, label, ha="center", va="center",
|
||||
fontsize=7.5, fontweight="bold",
|
||||
color=_PALETTE_ROOT_EDGE if is_root
|
||||
else _PALETTE_TEXT)
|
||||
|
||||
# Phase 1: row reduce W→E.
|
||||
for r in range(4):
|
||||
for c in range(3):
|
||||
x0, y0 = centers[(r, c)]
|
||||
x1, y1 = centers[(r, c + 1)]
|
||||
_arrow(ax, (x0 + cube_w / 2, y0), (x1 - cube_w / 2, y1),
|
||||
color=_PALETTE_BLUE, lw=1.5)
|
||||
# Phase 2: col reduce N→S along rightmost column.
|
||||
for r in range(3):
|
||||
x0, y0 = centers[(r, 3)]
|
||||
x1, y1 = centers[(r + 1, 3)]
|
||||
_arrow(ax, (x0, y0 - cube_w / 2), (x1, y1 + cube_w / 2),
|
||||
color=_PALETTE_GREEN, lw=1.7)
|
||||
|
||||
# Phase legend on the right side.
|
||||
legend_x = grid_x0 + grid_total + 0.55
|
||||
ax.text(legend_x, 5.0, "Phase 1: row reduce (W → E)",
|
||||
color=_PALETTE_BLUE, fontsize=10, fontweight="bold")
|
||||
ax.text(legend_x, 4.55, "Phase 2: col reduce (N → S, rightmost col)",
|
||||
color=_PALETTE_GREEN, fontsize=10, fontweight="bold")
|
||||
ax.text(legend_x, 4.10, "Phase 3: inter-SIP exchange at root cube",
|
||||
color=_PALETTE_ROOT_EDGE, fontsize=10, fontweight="bold")
|
||||
ax.text(legend_x, 3.65, "Phase 4: col broadcast (S → N)",
|
||||
color=_PALETTE_GREEN, fontsize=10, style="italic")
|
||||
ax.text(legend_x, 3.20, "Phase 5: row broadcast (E → W)",
|
||||
color=_PALETTE_BLUE, fontsize=10, style="italic")
|
||||
ax.text(legend_x, 2.55,
|
||||
"(broadcast phases reverse phases 2 & 1)",
|
||||
color="gray", fontsize=8.5, style="italic")
|
||||
ax.text(legend_x, 1.7,
|
||||
"Root cube (c15, bottom-right) is the only\n"
|
||||
"cube that performs the inter-SIP exchange.",
|
||||
color=_PALETTE_ROOT_EDGE, fontsize=9, style="italic")
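Phases 1 and 2 in the legend can be checked with a plain-Python simulation. This is a sketch of the reduction order drawn in the panel only; it does not model IPCQ transfers or timing, and `reduce_to_root` is a hypothetical name.

```python
def reduce_to_root(values):
    """values[r][c] is the contribution of cube r * n + c on an n x n mesh."""
    n = len(values)
    grid = [row[:] for row in values]
    # Phase 1: each row accumulates west to east into the rightmost column.
    for r in range(n):
        for c in range(n - 1):
            grid[r][c + 1] += grid[r][c]
    # Phase 2: the rightmost column accumulates north to south into the root.
    for r in range(n - 1):
        grid[r + 1][n - 1] += grid[r][n - 1]
    return grid[n - 1][n - 1]


# Use each cube's id as its value, so the root should hold 0 + 1 + ... + 15.
cube_ids = [[r * 4 + c for c in range(4)] for r in range(4)]
total = reduce_to_root(cube_ids)
```

The broadcast phases (4 and 5) walk the same edges in reverse, so every cube ends with the value held at c15.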
|
||||
|
||||
|
||||
def emit_topology_diagram() -> str:
|
||||
"""Emit a 2×2-panel topology diagram into allreduce_latency_plots/.
|
||||
|
||||
Top row: ring_1d | torus_2d (2×3)
|
||||
Bot row: mesh_2d_no_wrap (2×3) | cube-level reduction in SIP 0
|
||||
"""
|
||||
import matplotlib.gridspec as gridspec
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
_SWEEP_OUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
fig = plt.figure(figsize=(16, 10), facecolor="white")
|
||||
gs = gridspec.GridSpec(2, 2, figure=fig, hspace=0.30, wspace=0.10)
|
||||
ax_ring = fig.add_subplot(gs[0, 0])
|
||||
ax_torus = fig.add_subplot(gs[0, 1])
|
||||
ax_mesh = fig.add_subplot(gs[1, 0])
|
||||
ax_cube = fig.add_subplot(gs[1, 1])
|
||||
|
||||
_draw_ring_topology(ax_ring)
|
||||
_draw_grid_topology(ax_torus, "torus", n_rows=2, n_cols=3)
|
||||
_draw_grid_topology(ax_mesh, "mesh", n_rows=2, n_cols=3)
|
||||
_draw_cube_reduction(ax_cube)
|
||||
|
||||
fig.suptitle(
|
||||
"Allreduce topology: device-level (ring, torus, mesh) "
|
||||
"and cube-level reduction in SIP 0",
|
||||
fontsize=14, fontweight="bold", color=_PALETTE_TEXT, y=0.98,
|
||||
)
|
||||
out_path = _SWEEP_OUT_DIR / "topology.png"
|
||||
fig.savefig(out_path, dpi=130, bbox_inches="tight",
|
||||
facecolor=fig.get_facecolor())
|
||||
plt.close(fig)
|
||||
return str(out_path)
|
||||
|
||||
|
||||
def test_emit_topology_diagram():
|
||||
"""Emit topology.png alongside the sweep plots. Pure plotting; no sim."""
|
||||
out = emit_topology_diagram()
|
||||
assert Path(out).exists()
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
"""Test that tl.recv() (no direction) works under the mock runtime
|
||||
and the SimPy PE_IPCQ component (ADR-0023 D4 weak fairness)."""
|
||||
from __future__ import annotations
|
||||
|
||||
import numpy as np
|
||||
|
||||
from kernbench.ccl.testing import run_kernel_in_mock
|
||||
|
||||
|
||||
def kernel_round_robin(t_ptr, n_elem, tl):
|
||||
"""Each PE runs N-1 steps: send the current tile E, then receive one tile via round-robin.
|
||||
Uses TensorHandle math (PE_MATH) so Phase 2 produces correct HBM
|
||||
contents under SimPy + op_log replay."""
|
||||
rank = tl.program_id(axis=0)
|
||||
world_size = tl.num_programs(axis=0)
|
||||
nbytes = n_elem * 2
|
||||
|
||||
pe_addr = t_ptr + rank * nbytes
|
||||
acc = tl.load(pe_addr, shape=(n_elem,), dtype="f16")
|
||||
current = acc
|
||||
|
||||
for _step in range(world_size - 1):
|
||||
tl.send(dir="E", src=current)
|
||||
# No direction → round-robin
|
||||
recv = tl.recv(shape=(n_elem,), dtype="f16")
|
||||
acc = acc + recv
|
||||
current = recv # forward W's tile to E next round
|
||||
|
||||
tl.store(pe_addr, acc)
|
||||
|
||||
|
||||
def test_round_robin_recv_mock_runtime():
|
||||
n_elem = 8
|
||||
inputs = [
|
||||
np.full((n_elem,), float(r + 1), dtype=np.float16)
|
||||
for r in range(4)
|
||||
]
|
||||
expected = sum(inputs) # [10,...]
|
||||
|
||||
outputs = run_kernel_in_mock(
|
||||
kernel_fn=kernel_round_robin,
|
||||
world_size=4,
|
||||
topology="ring_1d",
|
||||
inputs=inputs,
|
||||
kernel_args=(n_elem,),
|
||||
)
|
||||
for r in range(4):
|
||||
assert np.allclose(outputs[r], expected)
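The removed kernel's send/receive/forward loop can be simulated in plain Python to see why every rank ends with the full sum. The sketch assumes that on `ring_1d` the direction-less `tl.recv` always delivers the west neighbour's tile, since every rank sends only east. `ring_allreduce_forwarding` is a hypothetical name.

```python
def ring_allreduce_forwarding(values):
    """Simulate N-1 rounds of send-east / receive / accumulate / forward."""
    n = len(values)
    acc = list(values)
    current = list(values)
    for _ in range(n - 1):
        # Every rank sends `current` east, so rank r receives what
        # rank r - 1 held at the start of this round.
        received = [current[(r - 1) % n] for r in range(n)]
        acc = [a + x for a, x in zip(acc, received)]
        current = received  # forward the received tile next round
    return acc


result = ring_allreduce_forwarding([1.0, 2.0, 3.0, 4.0])
```

Each original tile visits every other rank exactly once, which matches the test's expectation of 10 on all four ranks.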
|
||||
@@ -1,8 +1,9 @@
|
||||
"""Tests for configure_sfr_intercube_multisip neighbor table wiring.
|
||||
|
||||
Verifies that IPCQ neighbor tables are correctly installed for
|
||||
intercube (pe0, 4×4 mesh N/S/E/W) + inter-SIP (pe0, all cubes,
|
||||
global_E/global_W) communication.
|
||||
Verifies full IPCQ hardware wiring (independent of DPPolicy):
|
||||
- intra-cube (2×4 PE grid) → intra_N/S/E/W
|
||||
- intercube same-lane → N/S/E/W
|
||||
- inter-SIP same-(cube, pe) → global_N/S/E/W
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -16,6 +17,7 @@ from kernbench.topology.builder import resolve_topology
|
||||
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||
|
||||
N_CUBES = 16
|
||||
PES_PER_CUBE = 8
|
||||
|
||||
|
||||
def _engine_and_spec():
|
||||
@@ -36,78 +38,102 @@ class TestConfigureSfrNeighborTables:
|
||||
plan = configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
n_sips = int(spec["system"]["sips"]["count"])
|
||||
assert plan["world_size"] == n_sips * N_CUBES
|
||||
assert len(plan["rank_to_pe"]) == n_sips * N_CUBES
|
||||
for pe_idx, (sip, cube, pe) in enumerate(plan["rank_to_pe"]):
|
||||
assert pe == 0, f"pe_idx {pe_idx}: pe must be 0, got {pe}"
|
||||
expected = n_sips * N_CUBES * PES_PER_CUBE
|
||||
assert plan["world_size"] == expected
|
||||
assert len(plan["rank_to_pe"]) == expected
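The updated expectation counts every PE rather than one PE per cube. A sketch of that arithmetic follows; the SIP count of 2 and the SIP-major ordering of `rank_to_pe` are assumptions made for illustration, since the diff only asserts the length.

```python
N_SIPS = 2          # assumption; the test reads this from the topology spec
N_CUBES = 16
PES_PER_CUBE = 8

# Hypothetical ordering: SIP-major, then cube, then PE.
rank_to_pe = [
    (sip, cube, pe)
    for sip in range(N_SIPS)
    for cube in range(N_CUBES)
    for pe in range(PES_PER_CUBE)
]
world_size = len(rank_to_pe)
```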
|
||||
|
||||
def test_corner_cube0_has_E_and_S_only(self):
|
||||
"""Cube 0 (row=0, col=0) is NW corner: only E and S neighbors."""
|
||||
# ── Intra-cube (intra_N/S/E/W) ────────────────────────────────
|
||||
|
||||
def test_pe0_intra_cube_has_intra_E_and_intra_S(self):
|
||||
"""pe0 is NW of the 2×4 PE grid: intra_E=pe1, intra_S=pe4."""
|
||||
engine, spec = _engine_and_spec()
|
||||
cfg = _merged_cfg()
|
||||
configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
ipcq = engine._components["sip0.cube0.pe0.pe_ipcq"]
|
||||
qp = ipcq.queue_pairs
|
||||
assert "E" in qp, "cube 0 must have E neighbor"
|
||||
assert "S" in qp, "cube 0 must have S neighbor"
|
||||
assert "W" not in qp, "cube 0 (col=0) must NOT have W neighbor"
|
||||
assert "N" not in qp, "cube 0 (row=0) must NOT have N neighbor"
|
||||
qp = engine._components["sip0.cube0.pe0.pe_ipcq"].queue_pairs
|
||||
assert "intra_E" in qp
|
||||
assert qp["intra_E"]["peer"].pe == 1
|
||||
assert "intra_S" in qp
|
||||
assert qp["intra_S"]["peer"].pe == 4
|
||||
assert "intra_W" not in qp
|
||||
assert "intra_N" not in qp
|
||||
|
||||
def test_pe5_intra_cube_has_all_four(self):
|
||||
"""pe5 (row=1, col=1 in 2×4 grid) has three intra directions (N/E/W).
|
||||
|
||||
Intra neighbors: intra_N=pe1, intra_E=pe6, intra_W=pe4,
|
||||
intra_S not present (row=1 is bottom row).
|
||||
"""
|
||||
engine, spec = _engine_and_spec()
|
||||
cfg = _merged_cfg()
|
||||
configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
qp = engine._components["sip0.cube0.pe5.pe_ipcq"].queue_pairs
|
||||
assert qp["intra_N"]["peer"].pe == 1
|
||||
assert qp["intra_E"]["peer"].pe == 6
|
||||
assert qp["intra_W"]["peer"].pe == 4
|
||||
assert "intra_S" not in qp # bottom row
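The intra-cube expectations follow from laying out the 8 PEs as 2 rows × 4 columns in row-major order. A minimal sketch of that rule is below; `intra_cube_neighbors` is a hypothetical helper and is not the function under test.

```python
def intra_cube_neighbors(pe, n_rows=2, n_cols=4):
    """Neighbour PE ids inside one cube; no wraparound at the edges."""
    row, col = divmod(pe, n_cols)
    out = {}
    if row > 0:
        out["intra_N"] = pe - n_cols
    if row < n_rows - 1:
        out["intra_S"] = pe + n_cols
    if col > 0:
        out["intra_W"] = pe - 1
    if col < n_cols - 1:
        out["intra_E"] = pe + 1
    return out


pe0 = intra_cube_neighbors(0)
pe5 = intra_cube_neighbors(5)
```

pe5 sits in the bottom row, so it gets three neighbours, not four, which is what the assertions above check.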
|
||||
|
||||
# ── Intercube same-lane (N/S/E/W) ─────────────────────────────
|
||||
|
||||
def test_corner_cube0_pe0_has_intercube_E_and_S(self):
|
||||
"""Cube 0 (NW mesh corner): intercube E→cube1, S→cube4."""
|
||||
engine, spec = _engine_and_spec()
|
||||
cfg = _merged_cfg()
|
||||
configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
qp = engine._components["sip0.cube0.pe0.pe_ipcq"].queue_pairs
|
||||
assert qp["E"]["peer"].cube == 1
|
||||
assert qp["E"]["peer"].pe == 0 # same-lane
|
||||
assert qp["S"]["peer"].cube == 4
|
||||
assert qp["S"]["peer"].pe == 0
|
||||
assert "W" not in qp, "cube 0 has no west neighbor"
|
||||
assert "N" not in qp, "cube 0 has no north neighbor"
|
||||
|
||||
def test_interior_cube5_has_all_four(self):
|
||||
"""Cube 5 (row=1, col=1) is interior: N/S/E/W all present."""
|
||||
def test_interior_cube5_pe3_has_all_four_intercube_same_lane(self):
|
||||
"""Cube 5 interior, pe3: intercube N/S/E/W all present, same-lane."""
|
||||
engine, spec = _engine_and_spec()
|
||||
cfg = _merged_cfg()
|
||||
configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
ipcq = engine._components["sip0.cube5.pe0.pe_ipcq"]
|
||||
qp = ipcq.queue_pairs
|
||||
assert qp["N"]["peer"].cube == 1
|
||||
assert qp["S"]["peer"].cube == 9
|
||||
assert qp["E"]["peer"].cube == 6
|
||||
assert qp["W"]["peer"].cube == 4
|
||||
qp = engine._components["sip0.cube5.pe3.pe_ipcq"].queue_pairs
|
||||
for d, expected_cube in [("N", 1), ("S", 9), ("E", 6), ("W", 4)]:
|
||||
assert qp[d]["peer"].cube == expected_cube
|
||||
assert qp[d]["peer"].pe == 3 # same-lane
|
||||
|
||||
def test_root_cube15_has_inter_sip(self):
|
||||
"""Cube 15 (root, SE corner) has N, W + global_E/global_W."""
|
||||
def test_all_pes_have_intercube_wiring(self):
|
||||
"""Every PE on interior cube 5 has intercube same-lane N/S/E/W wiring."""
|
||||
engine, spec = _engine_and_spec()
|
||||
cfg = _merged_cfg()
|
||||
configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
ipcq0 = engine._components["sip0.cube15.pe0.pe_ipcq"]
|
||||
qp0 = ipcq0.queue_pairs
|
||||
assert "N" in qp0
|
||||
assert "W" in qp0
|
||||
assert "E" not in qp0, "cube 15 (col=3) must NOT have E"
|
||||
assert "S" not in qp0, "cube 15 (row=3) must NOT have S"
|
||||
assert "global_E" in qp0, "root cube must have global_E"
|
||||
assert "global_W" in qp0, "root cube must have global_W"
|
||||
assert qp0["global_E"]["peer"].sip == 1
|
||||
assert qp0["global_E"]["peer"].cube == 15
|
||||
|
||||
ipcq1 = engine._components["sip1.cube15.pe0.pe_ipcq"]
|
||||
qp1 = ipcq1.queue_pairs
|
||||
assert qp1["global_E"]["peer"].sip == 0
|
||||
assert qp1["global_E"]["peer"].cube == 15
|
||||
|
||||
def test_all_cubes_have_inter_sip(self):
|
||||
"""ALL cubes (not just root) are wired for inter-SIP."""
|
||||
engine, spec = _engine_and_spec()
|
||||
cfg = _merged_cfg()
|
||||
configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
root_cube = int(cfg.get("root_cube", N_CUBES - 1))
|
||||
for cube_id in range(N_CUBES):
|
||||
ipcq = engine._components[f"sip0.cube{cube_id}.pe0.pe_ipcq"]
|
||||
qp = ipcq.queue_pairs
|
||||
assert "global_E" in qp, (
|
||||
f"sip0.cube{cube_id}.pe0 missing global_E"
|
||||
)
|
||||
assert "global_W" in qp, (
|
||||
f"sip0.cube{cube_id}.pe0 missing global_W"
|
||||
)
|
||||
if cube_id == root_cube:
|
||||
assert qp["global_E"]["peer"].sip != 0, (
|
||||
f"root cube {root_cube} global_E must point to another SIP"
|
||||
# Interior cube 5: every PE should have N/S/E/W same-lane.
|
||||
for pe in range(PES_PER_CUBE):
|
||||
qp = engine._components[f"sip0.cube5.pe{pe}.pe_ipcq"].queue_pairs
|
||||
for d in ("N", "S", "E", "W"):
|
||||
assert d in qp, f"sip0.cube5.pe{pe} missing intercube {d}"
|
||||
assert qp[d]["peer"].pe == pe, (
|
||||
f"sip0.cube5.pe{pe} {d} not same-lane"
|
||||
)
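"Same-lane" means the peer keeps the PE index and only the cube changes. A sketch of that rule on the 4×4 cube mesh, assuming row-major cube ids with row 0 as north; `intercube_peer` is a hypothetical helper.

```python
_CUBE_STEPS = {"N": (-1, 0), "S": (1, 0), "E": (0, 1), "W": (0, -1)}


def intercube_peer(cube, pe, direction, n=4):
    """Return (peer_cube, peer_pe), or None at a mesh edge."""
    row, col = divmod(cube, n)
    dr, dc = _CUBE_STEPS[direction]
    r, c = row + dr, col + dc
    if not (0 <= r < n and 0 <= c < n):
        return None
    return (r * n + c, pe)  # same lane: PE index is unchanged


peers = {d: intercube_peer(5, 3, d) for d in "NSEW"}
```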
|
||||
|
||||
# ── Inter-SIP (global_*) ──────────────────────────────────────
|
||||
|
||||
def test_every_pe_on_every_cube_has_inter_sip(self):
|
||||
"""All PEs on all cubes are wired for inter-SIP via global_* (checked on SIP 0)."""
|
||||
engine, spec = _engine_and_spec()
|
||||
cfg = _merged_cfg()
|
||||
configure_sfr_intercube_multisip(engine, spec, cfg)
|
||||
|
||||
for cube_id in range(N_CUBES):
|
||||
for pe in range(PES_PER_CUBE):
|
||||
qp = engine._components[
|
||||
f"sip0.cube{cube_id}.pe{pe}.pe_ipcq"
|
||||
].queue_pairs
|
||||
assert "global_E" in qp, (
|
||||
f"sip0.cube{cube_id}.pe{pe} missing global_E"
|
||||
)
|
||||
assert "global_W" in qp
|
||||
# Peer must be same (cube, pe) on another SIP.
|
||||
assert qp["global_E"]["peer"].sip == 1
|
||||
assert qp["global_E"]["peer"].cube == cube_id
|
||||
assert qp["global_E"]["peer"].pe == pe
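The inter-SIP assertions say the peer is the same `(cube, pe)` on another SIP. A sketch of that mapping for a 1-D ring of SIPs is below; the two-SIP ring is an assumption inferred from the `peer.sip == 1` check, and `inter_sip_peer` is a hypothetical name.

```python
def inter_sip_peer(sip, cube, pe, direction, n_sips):
    """Peer (sip, cube, pe) for global_E / global_W on a ring of SIPs."""
    step = {"global_E": 1, "global_W": -1}[direction]
    return ((sip + step) % n_sips, cube, pe)


east_of_sip0 = inter_sip_peer(0, 7, 3, "global_E", n_sips=2)
east_of_sip1 = inter_sip_peer(1, 15, 0, "global_E", n_sips=2)
```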
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
"""PE-to-PE latency sweep across hop types and data sizes.
|
||||
|
||||
Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for five
|
||||
Compares IPCQ send/recv vs raw-DMA (tl.load + tl.store) latency for four
|
||||
hop types:
|
||||
|
||||
H1 Intra-cube horizontal pe0 → pe1
|
||||
H2 Intra-cube vertical pe0 → pe4
|
||||
H3 Inter-cube horizontal sip0.cube0.pe0 → sip0.cube1.pe0
|
||||
H4 Inter-cube vertical sip0.cube0.pe0 → sip0.cube4.pe0
|
||||
H5 Inter-SIP sip0.cube0.pe0 → sip1.cube0.pe0 (IPCQ only; raw needs cross-SIP MMU)
|
||||
|
||||
Sizes: 128..10240 bytes. Emits PNGs with both lines plus a CSV.
|
||||
"""
|
||||
@@ -48,7 +45,7 @@ class Hop:
|
||||
dst: tuple[int, int, int]
|
||||
send_dir: str
|
||||
recv_dir: str
|
||||
supports_raw: bool # False for cross-SIP (DPPolicy intra-device only)
|
||||
supports_raw: bool
|
||||
|
||||
|
||||
HOPS = [
|
||||
@@ -60,8 +57,6 @@ HOPS = [
|
||||
(0, 0, 0), (0, 1, 0), "E", "W", True),
|
||||
Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)",
|
||||
(0, 0, 0), (0, 4, 0), "S", "N", True),
|
||||
Hop("h5_inter_sip", "Inter-SIP (sip0 to sip1, same cube/pe)",
|
||||
(0, 0, 0), (1, 0, 0), "global_E", "global_W", False),
|
||||
]
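Each hop pairs a `send_dir` with the `recv_dir` the peer listens on, and the two should be opposites. A sketch of that consistency check follows, using the opposite-direction table shown for `src/kernbench/ccl/install.py`. The `key` and `src` field names are assumptions, because the diff shows only part of the `Hop` dataclass.

```python
from dataclasses import dataclass

_OPPOSITE_DIR = {
    "E": "W", "W": "E", "N": "S", "S": "N",
    "global_E": "global_W", "global_W": "global_E",
    "global_N": "global_S", "global_S": "global_N",
}


@dataclass(frozen=True)
class Hop:
    key: str            # assumed field name
    label: str
    src: tuple          # assumed field name: (sip, cube, pe)
    dst: tuple
    send_dir: str
    recv_dir: str
    supports_raw: bool


def is_reciprocal(hop):
    """True when the receiver's direction mirrors the sender's."""
    return _OPPOSITE_DIR.get(hop.send_dir) == hop.recv_dir


h4 = Hop("h4_inter_cube_vertical", "Inter-cube vertical (cube0 to cube4)",
         (0, 0, 0), (0, 4, 0), "S", "N", True)
```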
|
||||
|
||||
|
||||
@@ -251,12 +246,6 @@ def _plot_per_hop(records, hop: Hop, path: Path) -> None:
|
||||
[r["total_ns"] for r in raw],
|
||||
marker="s", label="Raw DMA (load+store)", color="tab:orange",
|
||||
)
|
||||
else:
|
||||
ax.text(
|
||||
0.98, 0.02, "(Raw DMA unavailable for cross-SIP)",
|
||||
transform=ax.transAxes, ha="right", va="bottom",
|
||||
fontsize=9, color="gray",
|
||||
)
|
||||
ax.set_xlabel("Data size (bytes)")
|
||||
ax.set_ylabel("Latency (ns)")
|
||||
ax.set_title(hop.label)
|
||||
@@ -270,7 +259,7 @@ def _plot_per_hop(records, hop: Hop, path: Path) -> None:
|
||||
def _plot_overview(records, path: Path) -> None:
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
fig, axes = plt.subplots(2, 3, figsize=(16, 9))
|
||||
fig, axes = plt.subplots(2, 2, figsize=(13, 9))
|
||||
axes = axes.flatten()
|
||||
for i, hop in enumerate(HOPS):
|
||||
ax = axes[i]
|
||||
|
||||