Intercube allreduce: center root + bidirectional reduce
Move the algorithmic root cube from the corner (cube_w-1, cube_h-1) to the geometric center (cube_w//2, cube_h//2) and have each phase converge bidirectionally so the intra-SIP critical path drops from ~12 hops to ~8 hops on a 4×4 mesh (left half W→E + right half E→W in row reduce; top half N→S + bottom half S→N in col reduce; mirrored on broadcast). Result on torus_2d 6 SIPs at 96 KB / PE on TCM: before (corner root) : 22.0 µs after (center root) : 17.2 µs (−22%) Same shape on ring_1d (−7%) and mesh_2d_no_wrap (−12%); also holds across SRAM and HBM (~−20% each). Phase 1 test (test_intercube_root_center.py) asserts the torus_2d 96 KB latency drops below 20.5 µs and that all 96 cubes still validate (correctness preserved). Plot updates: - overview.png: replace constant 10.6 µs theoretical line with user-supplied hand-derived curve (per-cube packet count = bytes_per_pe × 8 PEs ÷ 128 B; 1346 ns startup + 1.20 ns/pkt). - All summary.csv numbers and per-topology PNGs regenerated. - pe2pe_latency_plots and ipcq diagram emitter PNGs refreshed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -289,7 +289,8 @@ _SWEEP_TOPOLOGIES = [
|
||||
# parametrized invocation writes one JSON file here; the aggregator
|
||||
# (run from conftest.pytest_sessionfinish) reads them and emits the
|
||||
# combined CSV + PNG plots.
|
||||
_SWEEP_OUT_DIR = Path(__file__).parent / "allreduce_latency_plots"
|
||||
_SWEEP_OUT_DIR = (Path(__file__).parent.parent / "docs" / "diagrams"
|
||||
/ "allreduce_latency_plots")
|
||||
_SWEEP_ROWS_DIR = _SWEEP_OUT_DIR / "_rows"
|
||||
|
||||
|
||||
@@ -447,7 +448,7 @@ def _aggregate_sweep_plots() -> bool:
|
||||
ax.plot(xs, ys, marker="o", color="tab:blue")
|
||||
ax.set_xscale("log", base=2)
|
||||
ax.set_xlabel("Bytes per PE (log scale)")
|
||||
ax.set_ylabel("max pe_exec_ns (critical path)")
|
||||
ax.set_ylabel("Time (ns)")
|
||||
ax.set_title(title)
|
||||
ax.grid(True, alpha=0.3)
|
||||
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||
@@ -457,7 +458,28 @@ def _aggregate_sweep_plots() -> bool:
|
||||
|
||||
colors = {"ring_1d": "tab:blue", "torus_2d": "tab:orange",
|
||||
"mesh_2d_no_wrap": "tab:green"}
|
||||
THEORETICAL_TORUS_2D_6SIP_NS = 10600.0
|
||||
|
||||
# ── Hand-derived theoretical model for torus_2d (6 SIPs) ──
|
||||
# Critical-path analysis (per packet, packet = 128 B at NoC):
|
||||
# local intra-SIP reduce + broadcast = 8 hops × 57 ns = 456 ns
|
||||
# global X-direction reduce = 5 UCIe + 1 UAL = 445 ns
|
||||
# global Y-direction reduce = 5 UCIe + 1 UAL = 445 ns
|
||||
# per-packet startup latency = 456 + 445 + 445 = 1346 ns
|
||||
# Packet count is PER CUBE (8 PEs/cube cooperate on the cube tile).
|
||||
# At 6144 packets/cube the pipelined total is 8741 ns, so the
|
||||
# bottleneck-stage interval τ = (8741 − 1346) / (6144 − 1) ≈ 1.204 ns.
|
||||
# T_theoretical(N) = 1346 + (N − 1) × τ
|
||||
# where N = ceil((bytes_per_pe × 8) / 128) = ceil(bytes_per_pe / 16)
|
||||
NOC_PACKET_BYTES = 128
|
||||
PES_PER_CUBE = 8
|
||||
T_STARTUP_NS = 1346.0
|
||||
TAU_NS = (8741.0 - 1346.0) / (6144 - 1) # ≈ 1.2038 ns/packet
|
||||
|
||||
def _theoretical_torus_2d_ns(bytes_per_pe: int) -> float:
|
||||
bytes_per_cube = int(bytes_per_pe) * PES_PER_CUBE
|
||||
n_packets = max(1, -(-bytes_per_cube // NOC_PACKET_BYTES)) # ceil
|
||||
return T_STARTUP_NS + (n_packets - 1) * TAU_NS
|
||||
|
||||
fig, ax = plt.subplots(figsize=(9, 6))
|
||||
for topo_name in topologies:
|
||||
rs = sorted(
|
||||
@@ -473,64 +495,28 @@ def _aggregate_sweep_plots() -> bool:
|
||||
label=f"{topo_name} (n_sips={rs[0]['n_sips']})",
|
||||
color=colors.get(topo_name),
|
||||
)
|
||||
ax.axhline(
|
||||
y=THEORETICAL_TORUS_2D_6SIP_NS,
|
||||
color="tab:red", linestyle="--", linewidth=1.5,
|
||||
label=f"theoretical torus_2d (6 SIPs) = "
|
||||
f"{THEORETICAL_TORUS_2D_6SIP_NS:.0f} ns",
|
||||
|
||||
# Theoretical torus_2d curve across all payload sizes.
|
||||
torus_rs = sorted(
|
||||
[r for r in records if r["sip_topology"] == "torus_2d"],
|
||||
key=lambda r: r["bytes_per_pe"],
|
||||
)
|
||||
BYTES_96KB = 96 * 1024
|
||||
ax.axvline(
|
||||
x=BYTES_96KB, ymin=0, ymax=1,
|
||||
color="tab:red", linestyle=":", linewidth=1.2,
|
||||
)
|
||||
ax.plot(
|
||||
[BYTES_96KB], [THEORETICAL_TORUS_2D_6SIP_NS],
|
||||
marker="x", color="tab:red", markersize=10, markeredgewidth=2,
|
||||
)
|
||||
# Find simulated torus_2d latency at 96 KB (if present) for direct
|
||||
# comparison with the theoretical value.
|
||||
sim_torus_at_96kb = next(
|
||||
(r["latency_ns"] for r in records
|
||||
if r["sip_topology"] == "torus_2d" and r["bytes_per_pe"] == BYTES_96KB),
|
||||
None,
|
||||
)
|
||||
if sim_torus_at_96kb is not None:
|
||||
if torus_rs:
|
||||
xs_th = [r["bytes_per_pe"] for r in torus_rs]
|
||||
ys_th = [_theoretical_torus_2d_ns(r["bytes_per_pe"]) for r in torus_rs]
|
||||
ax.plot(
|
||||
[BYTES_96KB], [sim_torus_at_96kb],
|
||||
marker="o", color="tab:orange",
|
||||
markersize=10, markeredgecolor="black", markeredgewidth=1.2,
|
||||
)
|
||||
ax.annotate(
|
||||
f"96 KB\n"
|
||||
f"theoretical = {THEORETICAL_TORUS_2D_6SIP_NS:.0f} ns\n"
|
||||
f"simulated = {sim_torus_at_96kb:.0f} ns",
|
||||
xy=(BYTES_96KB, sim_torus_at_96kb),
|
||||
xytext=(10, -20), textcoords="offset points",
|
||||
color="tab:red", fontsize=9,
|
||||
)
|
||||
else:
|
||||
ax.annotate(
|
||||
f"96 KB\n→ theoretical {THEORETICAL_TORUS_2D_6SIP_NS:.0f} ns",
|
||||
xy=(BYTES_96KB, THEORETICAL_TORUS_2D_6SIP_NS),
|
||||
xytext=(8, -20), textcoords="offset points",
|
||||
color="tab:red", fontsize=9,
|
||||
xs_th, ys_th,
|
||||
color="tab:red", linestyle="--", linewidth=1.6, marker="x",
|
||||
label="theoretical torus_2d (6 SIPs)",
|
||||
)
|
||||
|
||||
ax.set_xscale("log", base=2)
|
||||
ax.set_xlabel("Bytes per PE (log scale)")
|
||||
ax.set_ylabel("max pe_exec_ns (critical path)")
|
||||
ax.set_ylabel("Time (ns)")
|
||||
ax.set_title("Multi-device allreduce latency by topology")
|
||||
ax.grid(True, alpha=0.3)
|
||||
|
||||
# Drop 128 KB tick (overlaps visually with the explicit 96 KB marker)
|
||||
# and add 96 KB.
|
||||
BYTES_128KB = 128 * 1024
|
||||
existing_ticks = [t for t in ax.get_xticks() if int(t) != BYTES_128KB]
|
||||
if BYTES_96KB not in existing_ticks:
|
||||
existing_ticks.append(BYTES_96KB)
|
||||
ax.set_xticks(sorted(existing_ticks))
|
||||
ax.set_xlim(left=min(r["bytes_per_pe"] for r in records) / 2,
|
||||
right=BYTES_96KB * 1.5)
|
||||
right=max(r["bytes_per_pe"] for r in records) * 1.5)
|
||||
ax.legend()
|
||||
ax.xaxis.set_major_formatter(_bytes_fmt)
|
||||
fig.tight_layout()
|
||||
@@ -811,7 +797,7 @@ def _draw_cube_reduction(ax):
|
||||
|
||||
|
||||
def emit_topology_diagram() -> str:
|
||||
"""Emit a 2×2-panel topology diagram into allreduce_latency_plots/.
|
||||
"""Emit a 2×2-panel topology diagram into docs/diagrams/allreduce_latency_plots/.
|
||||
|
||||
Top row: ring_1d | torus_2d (2×3)
|
||||
Bot row: mesh_2d_no_wrap (2×3) | cube-level reduction in SIP 0
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
"""Phase 1 test for moving the intercube_allreduce root cube from the
|
||||
bottom-right corner (3,3) to the geometric center (2,2).
|
||||
|
||||
Today's algorithm (intercube_allreduce.py) hardcodes
|
||||
``root_cube = (cube_h-1) * cube_w + (cube_w-1)`` (= cube 15 in 4×4).
|
||||
The intra-SIP critical path for one allreduce is therefore::
|
||||
|
||||
Phase 1 (row reduce W→E to col 3) : 3 hops
|
||||
Phase 2 (col reduce N→S to row 3 on col 3): 3 hops
|
||||
Phase 3 (inter-SIP at root) : (separate)
|
||||
Phase 4 (col broadcast S→N) : 3 hops
|
||||
Phase 5 (row broadcast E→W) : 3 hops
|
||||
Total intra-SIP critical path : 12 hops
|
||||
|
||||
Moving the root to (2,2) and using BIDIRECTIONAL convergence (cols 0..2
|
||||
go W→E, col 3 goes E→W in parallel; rows 0..2 go N→S, row 3 goes S→N
|
||||
in parallel) cuts each phase's critical path from 3 hops to 2::
|
||||
|
||||
Phase 1 critical path : max(2, 1) = 2 hops
|
||||
Phase 2 critical path : max(2, 1) = 2 hops
|
||||
Phase 4 critical path : 2 hops
|
||||
Phase 5 critical path : 2 hops
|
||||
Total intra-SIP critical path : 8 hops
|
||||
|
||||
Per-hop cost at 96 KB on TCM ≈ 600 ns (slot IO write+read 384 ns +
|
||||
fabric drain ~217 ns). 4 fewer hops ⇒ ~2.4 µs reduction.
|
||||
|
||||
EXPECTED Phase 1 outcome:
|
||||
- Today (root = corner) : ~22.0 µs ← test FAILS (> 20500 ns)
|
||||
- After Phase 2 (root = center) : ~19.6 µs ← test PASSES (< 20500 ns)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from kernbench.runtime_api.context import RuntimeContext
|
||||
from kernbench.runtime_api.types import DeviceSelector
|
||||
from kernbench.sim_engine.engine import GraphEngine
|
||||
from kernbench.topology.builder import resolve_topology
|
||||
|
||||
from tests.test_allreduce_multidevice import (
|
||||
_write_temp_configs,
|
||||
run_allreduce,
|
||||
)
|
||||
|
||||
|
||||
def _run_torus_96kb(tmp_path: Path) -> float:
|
||||
"""Run torus_2d 6-SIP allreduce at 96 KB / slot, return critical-path
|
||||
pe_exec_ns. Fixed at TCM (the project default)."""
|
||||
sub = tmp_path / "torus_root_center"
|
||||
sub.mkdir()
|
||||
topo_path, ccl_path = _write_temp_configs(
|
||||
sub,
|
||||
sip_topology="torus_2d",
|
||||
n_sips=6,
|
||||
algorithm="intercube_allreduce",
|
||||
sip_w=3, sip_h=2,
|
||||
n_elem_override=49152, # 49152 × 2 = 96 KB / slot
|
||||
)
|
||||
topo = resolve_topology(topo_path)
|
||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||
spec = topo.topology_obj.spec
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id="root_center_phase1",
|
||||
spec=spec,
|
||||
) as ctx:
|
||||
result = run_allreduce(
|
||||
ctx, engine, spec,
|
||||
algorithm="intercube_allreduce", ccl_yaml=ccl_path,
|
||||
)
|
||||
assert result["ok_cubes"] > 0
|
||||
pe_exec_vals = [
|
||||
float(tr.get("pe_exec_ns", 0.0) or 0.0)
|
||||
for _, (_, tr) in engine._results.items()
|
||||
if isinstance(tr, dict)
|
||||
]
|
||||
return max(pe_exec_vals) if pe_exec_vals else 0.0
|
||||
|
||||
|
||||
def test_intra_sip_critical_path_at_96k_below_threshold(tmp_path):
|
||||
"""Post-Phase-2 (root=center, bidirectional reduce) the torus_2d
|
||||
96 KB allreduce on TCM should drop below 20.5 µs.
|
||||
|
||||
Today's value: ~22.0 µs (12-hop critical path with corner root).
|
||||
Expected post-Phase-2: ~19.6 µs (8-hop critical path with
|
||||
center root) — model estimate, ~11% reduction end-to-end.
|
||||
"""
|
||||
lat_ns = _run_torus_96kb(tmp_path)
|
||||
THRESHOLD_NS = 20_500.0
|
||||
assert lat_ns < THRESHOLD_NS, (
|
||||
f"torus_2d 6-SIP 96 KB allreduce should land below "
|
||||
f"{THRESHOLD_NS:.0f} ns post-Phase-2 (root=center, "
|
||||
f"bidirectional reduce). got {lat_ns:.1f} ns "
|
||||
f"({lat_ns / 1000:.2f} µs)"
|
||||
)
|
||||
|
||||
|
||||
def test_correctness_preserved(tmp_path):
|
||||
"""Smoke check: at small n_elem the new algorithm must still produce
|
||||
the correct sum across all 96 cubes. ``run_allreduce`` validates
|
||||
every cube against the expected reduce result (``ok_cubes`` must be
|
||||
96 = 6 SIPs × 16 cubes).
|
||||
|
||||
This guards against the obvious Phase 2 risk: bidirectional reduce
|
||||
sums each contribution exactly once. If implemented wrong (double-
|
||||
counting or skipping the right edge column / bottom row), the
|
||||
asserts inside run_allreduce fail.
|
||||
"""
|
||||
sub = tmp_path / "correctness"
|
||||
sub.mkdir()
|
||||
topo_path, ccl_path = _write_temp_configs(
|
||||
sub,
|
||||
sip_topology="torus_2d",
|
||||
n_sips=6,
|
||||
algorithm="intercube_allreduce",
|
||||
sip_w=3, sip_h=2,
|
||||
n_elem_override=128, # tiny payload to keep this fast
|
||||
)
|
||||
topo = resolve_topology(topo_path)
|
||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||
spec = topo.topology_obj.spec
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id="root_center_correctness",
|
||||
spec=spec,
|
||||
) as ctx:
|
||||
result = run_allreduce(
|
||||
ctx, engine, spec,
|
||||
algorithm="intercube_allreduce", ccl_yaml=ccl_path,
|
||||
)
|
||||
n_cubes = 6 * 16 # 6 SIPs × 16 cubes/SIP
|
||||
assert result["ok_cubes"] == n_cubes, (
|
||||
f"all 96 cubes must validate; got {result['ok_cubes']} OK"
|
||||
)
|
||||
Reference in New Issue
Block a user