diff --git a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2DMesh_6SiP_2x3.png b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2DMesh_6SiP_2x3.png index d1aca35..45d0b2c 100644 Binary files a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2DMesh_6SiP_2x3.png and b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2DMesh_6SiP_2x3.png differ diff --git a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3.png b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3.png index 627c8af..eac4f1f 100644 Binary files a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3.png and b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3.png differ diff --git a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.csv b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.csv index adc0350..5117b1a 100644 --- a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.csv +++ b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.csv @@ -1,13 +1,13 @@ buffer_kind,sip_topology,n_sips,n_elem,bytes_per_pe,latency_ns -hbm,torus_2d,6,128,256,2120.0399999999754 -hbm,torus_2d,6,1024,2048,2716.74499999995 -hbm,torus_2d,6,8192,16384,7315.185000000081 -hbm,torus_2d,6,32768,65536,23081.265000008738 -sram,torus_2d,6,128,256,2060.0399999999754 -sram,torus_2d,6,1024,2048,2908.74499999995 -sram,torus_2d,6,8192,16384,9523.185000000081 -sram,torus_2d,6,32768,65536,32201.265000008752 -tcm,torus_2d,6,128,256,1964.0399999999754 -tcm,torus_2d,6,1024,2048,2476.74499999995 -tcm,torus_2d,6,8192,16384,6403.185000000081 -tcm,torus_2d,6,32768,65536,19865.265000008738 +hbm,torus_2d,6,128,256,2120.040000000012 +hbm,torus_2d,6,1024,2048,2717.2783333333473 +hbm,torus_2d,6,8192,16384,7315.184999999989 +hbm,torus_2d,6,32768,65536,23081.26500000037 +sram,torus_2d,6,128,256,2060.040000000012 +sram,torus_2d,6,1024,2048,2909.2783333333473 +sram,torus_2d,6,8192,16384,9523.184999999869 +sram,torus_2d,6,32768,65536,32201.265000000385 +tcm,torus_2d,6,128,256,1964.040000000012 +tcm,torus_2d,6,1024,2048,2477.2783333333473 +tcm,torus_2d,6,8192,16384,6403.185000000109 +tcm,torus_2d,6,32768,65536,19865.265000000378 diff --git a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.png b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.png index 4b3f932..19d4cc2 100644 Binary files a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.png and b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.png differ diff --git a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_Ring1D_6SiP_1x6.png b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_Ring1D_6SiP_1x6.png index 11f07df..1bb1acc 100644 Binary files a/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_Ring1D_6SiP_1x6.png and b/docs/diagrams/allreduce_latency_plots/AllReduce_LRAB_Ring1D_6SiP_1x6.png differ diff --git a/docs/diagrams/allreduce_latency_plots/comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png b/docs/diagrams/allreduce_latency_plots/comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png index 849d97c..229dbd3 100644 Binary files a/docs/diagrams/allreduce_latency_plots/comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png and b/docs/diagrams/allreduce_latency_plots/comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png differ diff --git a/docs/diagrams/allreduce_latency_plots/summary.csv b/docs/diagrams/allreduce_latency_plots/summary.csv index 7ea2508..983d3be 100644 --- a/docs/diagrams/allreduce_latency_plots/summary.csv +++ b/docs/diagrams/allreduce_latency_plots/summary.csv @@ -1,37 +1,37 @@ algorithm,sip_topology,n_sips,n_elem,bytes_per_pe,bytes_per_sip,latency_ns -intercube_allreduce,mesh_2d_no_wrap,6,8,16,256,2666.5524999999725 -intercube_allreduce,mesh_2d_no_wrap,6,32,64,1024,2747.7399999999725 -intercube_allreduce,mesh_2d_no_wrap,6,64,128,2048,2855.98999999998 -intercube_allreduce,mesh_2d_no_wrap,6,128,256,4096,3072.4899999999725 -intercube_allreduce,mesh_2d_no_wrap,6,512,1024,16384,3336.579999999951 -intercube_allreduce,mesh_2d_no_wrap,6,1024,2048,32768,3707.49999999992 -intercube_allreduce,mesh_2d_no_wrap,6,2048,4096,65536,4449.339999999875 -intercube_allreduce,mesh_2d_no_wrap,6,4096,8192,131072,5933.020000000055 -intercube_allreduce,mesh_2d_no_wrap,6,8192,16384,262144,8900.380000000157 -intercube_allreduce,mesh_2d_no_wrap,6,16384,32768,524288,14835.099999997583 -intercube_allreduce,mesh_2d_no_wrap,6,32768,65536,1048576,26704.540000017492 -intercube_allreduce,mesh_2d_no_wrap,6,49152,98304,1572864,38573.980000026335 -intercube_allreduce,ring_1d,6,8,16,256,2365.2558333333036 -intercube_allreduce,ring_1d,6,32,64,1024,2436.9433333333036 -intercube_allreduce,ring_1d,6,64,128,2048,2532.526666666643 -intercube_allreduce,ring_1d,6,128,256,4096,2723.6933333333036 -intercube_allreduce,ring_1d,6,512,1024,16384,3042.0349999999544 -intercube_allreduce,ring_1d,6,1024,2048,32768,3390.201666666597 -intercube_allreduce,ring_1d,6,2048,4096,65536,4079.7349999998714 -intercube_allreduce,ring_1d,6,4096,8192,131072,5458.801666666721 -intercube_allreduce,ring_1d,6,8192,16384,262144,8216.93500000014 -intercube_allreduce,ring_1d,6,16384,32768,524288,13733.201666664638 -intercube_allreduce,ring_1d,6,32768,65536,1048576,24765.735000014545 -intercube_allreduce,ring_1d,6,49152,98304,1572864,35798.268333355256 -intercube_allreduce,torus_2d,6,8,16,256,1700.6024999999754 -intercube_allreduce,torus_2d,6,32,64,1024,1753.2899999999754 -intercube_allreduce,torus_2d,6,64,128,2048,1823.539999999979 -intercube_allreduce,torus_2d,6,128,256,4096,1964.0399999999754 -intercube_allreduce,torus_2d,6,512,1024,16384,2196.2849999999653 -intercube_allreduce,torus_2d,6,1024,2048,32768,2476.74499999995 -intercube_allreduce,torus_2d,6,2048,4096,65536,3037.664999999919 -intercube_allreduce,torus_2d,6,4096,8192,131072,4159.50500000003 -intercube_allreduce,torus_2d,6,8192,16384,262144,6403.185000000081 -intercube_allreduce,torus_2d,6,16384,32768,524288,10890.544999998769 -intercube_allreduce,torus_2d,6,32768,65536,1048576,19865.265000008738 -intercube_allreduce,torus_2d,6,49152,98304,1572864,28839.985000013185 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,8,16,256,2666.552500000015 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,32,64,1024,2747.7400000000152 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,64,128,2048,2855.990000000018 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,128,256,4096,3072.490000000019 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,512,1024,16384,3337.1133333333582 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,1024,2048,32768,3708.0333333333692 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,2048,4096,65536,4449.873333333393 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,4096,8192,131072,5933.020000000124 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,8192,16384,262144,8900.379999999863 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,16384,32768,524288,14835.099999999224 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,32768,65536,1048576,26704.540000000765 +lrab_hierarchical_allreduce,mesh_2d_no_wrap,6,49152,98304,1572864,38573.97999999701 +lrab_hierarchical_allreduce,ring_1d,6,8,16,256,2365.255833333347 +lrab_hierarchical_allreduce,ring_1d,6,32,64,1024,2436.9433333333473 +lrab_hierarchical_allreduce,ring_1d,6,64,128,2048,2532.526666666683 +lrab_hierarchical_allreduce,ring_1d,6,128,256,4096,2723.693333333349 +lrab_hierarchical_allreduce,ring_1d,6,512,1024,16384,3048.635000000021 +lrab_hierarchical_allreduce,ring_1d,6,1024,2048,32768,3393.4016666666957 +lrab_hierarchical_allreduce,ring_1d,6,2048,4096,65536,4082.401666666714 +lrab_hierarchical_allreduce,ring_1d,6,4096,8192,131072,5458.80166666677 +lrab_hierarchical_allreduce,ring_1d,6,8192,16384,262144,8216.934999999943 +lrab_hierarchical_allreduce,ring_1d,6,16384,32768,524288,13733.201666665835 +lrab_hierarchical_allreduce,ring_1d,6,32768,65536,1048576,24765.73500000064 +lrab_hierarchical_allreduce,ring_1d,6,49152,98304,1572864,35798.268333331536 +lrab_hierarchical_allreduce,torus_2d,6,8,16,256,1700.6025000000095 +lrab_hierarchical_allreduce,torus_2d,6,32,64,1024,1753.2900000000102 +lrab_hierarchical_allreduce,torus_2d,6,64,128,2048,1823.540000000012 +lrab_hierarchical_allreduce,torus_2d,6,128,256,4096,1964.040000000012 +lrab_hierarchical_allreduce,torus_2d,6,512,1024,16384,2196.8183333333463 +lrab_hierarchical_allreduce,torus_2d,6,1024,2048,32768,2477.2783333333473 +lrab_hierarchical_allreduce,torus_2d,6,2048,4096,65536,3038.1983333333583 +lrab_hierarchical_allreduce,torus_2d,6,4096,8192,131072,4159.5050000000665 +lrab_hierarchical_allreduce,torus_2d,6,8192,16384,262144,6403.185000000109 +lrab_hierarchical_allreduce,torus_2d,6,16384,32768,524288,10890.5449999995 +lrab_hierarchical_allreduce,torus_2d,6,32768,65536,1048576,19865.265000000378 +lrab_hierarchical_allreduce,torus_2d,6,49152,98304,1572864,28839.98500000059 diff --git a/scripts/emit_overview_with_external_ref.py b/scripts/emit_overview_with_external_ref.py deleted file mode 100644 index 8877bbc..0000000 --- a/scripts/emit_overview_with_external_ref.py +++ /dev/null @@ -1,176 +0,0 @@ -"""One-shot: render the broken-y-axis allreduce comparison with the FSIM -single-device reference. Reads docs/diagrams/allreduce_latency_plots/summary.csv -and writes comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png -alongside it. - -This is a derived-artifact generator (per CLAUDE.md): plotting only, no production -or test logic touched. -""" -from __future__ import annotations - -import csv -from pathlib import Path - -import matplotlib.pyplot as plt -import matplotlib.ticker as mticker - -ROOT = Path(__file__).resolve().parent.parent -PLOT_DIR = ROOT / "docs" / "diagrams" / "allreduce_latency_plots" -CSV_PATH = PLOT_DIR / "summary.csv" - -EXT_LABEL = "FSIM (single device): 366 µs" -EXT_LATENCY_NS = 366_000.0 - -COLORS = { - "ring_1d": "tab:blue", - "torus_2d": "tab:orange", - "mesh_2d_no_wrap": "tab:green", -} - -# Display labels (data keys above stay as the summary.csv sip_topology -# values; these are only the human-readable legend strings). All non-FSIM -# runs use 6 devices; the grid differs per topology. -DISPLAY = { - "ring_1d": "Ring 1x6 (6 devices)", - "torus_2d": "2D Torus 2x3 (6 devices)", - "mesh_2d_no_wrap": "2D Mesh 2x3 (6 devices)", -} - -# Hand-derived theoretical model for torus_2d (6 SIPs). Mirrors -# _aggregate_sweep_plots in tests/test_allreduce_multidevice.py. -NOC_PACKET_BYTES = 128 -PES_PER_CUBE = 8 -T_STARTUP_NS = 1346.0 -TAU_NS = (8741.0 - 1346.0) / (6144 - 1) - - -def _theoretical_torus_2d_ns(bytes_per_pe: int) -> float: - bytes_per_cube = int(bytes_per_pe) * PES_PER_CUBE - n_packets = max(1, -(-bytes_per_cube // NOC_PACKET_BYTES)) - return T_STARTUP_NS + (n_packets - 1) * TAU_NS - - -def _plot_theoretical(ax, records): - torus_rs = sorted( - [r for r in records if r["sip_topology"] == "torus_2d"], - key=lambda r: r["bytes_per_pe"], - ) - if not torus_rs: - return - ax.plot( - [r["bytes_per_pe"] for r in torus_rs], - [_theoretical_torus_2d_ns(r["bytes_per_pe"]) for r in torus_rs], - color="tab:red", linestyle="--", linewidth=1.6, marker="x", - label="Theoretical 2D Torus 2x3", - ) - - -def _bytes_fmt(x, _pos): - if x >= 1024 * 1024: - return f"{x / (1024 * 1024):.0f}M" - if x >= 1024: - return f"{x / 1024:.0f}K" - return f"{int(x)}" - - -def _load_records(): - rows = [] - with open(CSV_PATH, newline="") as f: - r = csv.DictReader(f) - for row in r: - rows.append({ - "sip_topology": row["sip_topology"], - "bytes_per_pe": int(row["bytes_per_pe"]), - "latency_ns": float(row["latency_ns"]), - }) - return rows - - -def _ext_x(records): - """Anchor the external reference at the largest payload (96 KB / PE).""" - return max(r["bytes_per_pe"] for r in records) - - -def _plot_curves(ax, records, topologies): - for topo in topologies: - rs = sorted([r for r in records if r["sip_topology"] == topo], - key=lambda r: r["bytes_per_pe"]) - if not rs: - continue - ax.plot( - [r["bytes_per_pe"] for r in rs], - [r["latency_ns"] for r in rs], - marker="o", - label=DISPLAY.get(topo, topo), - color=COLORS.get(topo), - ) - - -def emit_broken(records): - topologies = sorted({r["sip_topology"] for r in records}) - max_local = max(r["latency_ns"] for r in records) - - fig, (ax_top, ax_bot) = plt.subplots( - 2, 1, sharex=True, - gridspec_kw={"height_ratios": [1, 4], "hspace": 0.05}, - figsize=(9, 6.5), - ) - - # Bottom panel: today's three curves + theoretical, linear y. - _plot_curves(ax_bot, records, topologies) - _plot_theoretical(ax_bot, records) - ax_bot.set_ylim(0, max_local * 1.10) - - # Top panel: only the external reference marker, linear y around 366 µs. - ax_top.scatter( - [_ext_x(records)], [EXT_LATENCY_NS], - marker="*", s=240, color="tab:red", zorder=5, - label=EXT_LABEL, - ) - ax_top.set_ylim(EXT_LATENCY_NS * 0.93, EXT_LATENCY_NS * 1.05) - - # Hide the spine between the two panels and draw diagonal "break" ticks. - ax_top.spines["bottom"].set_visible(False) - ax_bot.spines["top"].set_visible(False) - ax_top.tick_params(labeltop=False, bottom=False) - ax_bot.xaxis.tick_bottom() - - d = 0.012 # diagonal-tick size, in axis-fraction - kw = dict(transform=ax_top.transAxes, color="k", clip_on=False, lw=1) - ax_top.plot((-d, +d), (-d, +d), **kw) - ax_top.plot((1 - d, 1 + d), (-d, +d), **kw) - kw.update(transform=ax_bot.transAxes) - ax_bot.plot((-d, +d), (1 - d * 4, 1 + d * 4), **kw) - ax_bot.plot((1 - d, 1 + d), (1 - d * 4, 1 + d * 4), **kw) - - ax_bot.set_xscale("log", base=2) - ax_bot.set_xlabel("Bytes per PE (log scale)") - ax_bot.set_ylabel("Time (ns)") - ax_top.set_ylabel("Time (ns)") - ax_bot.grid(True, alpha=0.3) - ax_top.grid(True, alpha=0.3) - ax_bot.xaxis.set_major_formatter(mticker.FuncFormatter(_bytes_fmt)) - - # One legend covering both axes. - handles_bot, labels_bot = ax_bot.get_legend_handles_labels() - handles_top, labels_top = ax_top.get_legend_handles_labels() - ax_bot.legend(handles_bot + handles_top, labels_bot + labels_top, - loc="upper left") - - fig.suptitle("Multidevice allreduce (ring, Mesh, 2DTorus) vs FSIM latency") - fig.tight_layout() - out = PLOT_DIR / "comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png" - fig.savefig(out, dpi=120) - plt.close(fig) - print(f"wrote {out}") - - -def main(): - records = _load_records() - if not records: - raise SystemExit(f"no rows in {CSV_PATH}") - emit_broken(records) - - -if __name__ == "__main__": - main() diff --git a/src/kernbench/runtime_api/distributed.py b/src/kernbench/runtime_api/distributed.py index 3d5f4b6..150efd2 100644 --- a/src/kernbench/runtime_api/distributed.py +++ b/src/kernbench/runtime_api/distributed.py @@ -59,10 +59,23 @@ class AhbmCCLBackend: self._sip_topo_kind = topo_map.get(self._sip_topo, 0) else: self._sip_topo_kind = 0 + sips = spec.get("system", {}).get("sips", {}) if self._sip_topo == "ring_1d": self._sip_topo_w, self._sip_topo_h = 0, 0 + elif sips.get("w") is not None and sips.get("h") is not None: + w, h = int(sips["w"]), int(sips["h"]) + if w * h != self._n_sips: + raise ValueError( + f"sip layout {w}x{h} != sips.count ({self._n_sips})" + ) + self._sip_topo_w, self._sip_topo_h = w, h else: side = int(round(math.sqrt(self._n_sips))) + if side * side != self._n_sips: + raise ValueError( + f"SIP topology '{self._sip_topo}' requires square " + f"sips.count or explicit sips.w/h, got {self._n_sips}" + ) self._sip_topo_w, self._sip_topo_h = side, side # IPCQ install: wire all pe0s across all cubes and SIPs diff --git a/tests/conftest.py b/tests/conftest.py index 3d9725b..f608d55 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -46,8 +46,8 @@ def pytest_sessionfinish(session, exitstatus): except Exception as e: print(f"[conftest] aggregator {attr}() in {name} failed: {e}") - _exec("test_allreduce_multidevice.py", "_aggregate_sweep_plots") - _exec("test_allreduce_buffer_kind_sweep.py", "aggregate_buffer_kind_plot") + _exec("sccl/_allreduce_helpers.py", "_aggregate_sweep_plots") + _exec("sccl/_allreduce_helpers.py", "aggregate_buffer_kind_plot") @pytest.fixture(scope="session") diff --git a/tests/test_allreduce_multidevice.py b/tests/sccl/_allreduce_helpers.py similarity index 64% rename from tests/test_allreduce_multidevice.py rename to tests/sccl/_allreduce_helpers.py index a270b3f..ea336eb 100644 --- a/tests/test_allreduce_multidevice.py +++ b/tests/sccl/_allreduce_helpers.py @@ -1,25 +1,193 @@ -"""Config-driven multi-device allreduce test application. +"""Shared plumbing for the sccl allreduce tests. -Reads ``ccl.yaml`` + ``topology.yaml``, dynamically loads the kernel -module from ``ccl.yaml → module``, and picks the inter-SIP exchange -pattern from ``topology.yaml → system.sips.topology``. - -Run directly:: - - python -m pytest tests/allreduce_app.py -v -s +Not a test module (no ``test_`` prefix → pytest does not collect it). +Holds the distributed driver, the direct-launch parity reference, the +config writers, the sweep/buffer-kind constants, the plot aggregators +(called from ``conftest.pytest_sessionfinish``), and the topology-diagram +emitter. The per-test files under ``tests/sccl/`` import from here, as do +the external buffer-kind / root-center tests under ``tests/``. """ from __future__ import annotations import importlib import math +import textwrap from pathlib import Path from typing import Any import numpy as np +import pytest +import yaml from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip from kernbench.policy.placement.dp import DPPolicy +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent.parent / "topology.yaml" + +DEFAULT_N_ELEM = 8 + + +# ── config writers ──────────────────────────────────────────────────── + + +def _write_ccl_yaml(tmp_path) -> str: + body = textwrap.dedent("""\ + defaults: + algorithm: lrab_hierarchical_allreduce + buffer_kind: tcm + backpressure: sleep + n_slots: 4 + slot_size: 4096 + vc_chunk_size: 256 + ipcq_credit_size_bytes: 16 + + algorithms: + lrab_hierarchical_allreduce: + module: kernbench.ccl.algorithms.lrab_hierarchical_allreduce + topology: none + buffer_kind: tcm + n_elem: 8 + root_cube: 15 + """) + (tmp_path / "ccl.yaml").write_text(body) + return str(tmp_path) + + +def _write_temp_configs( + tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None, + sip_w=None, sip_h=None, +): + """Write temp topology.yaml and ccl.yaml with the given overrides.""" + with open(TOPOLOGY_PATH) as f: + topo_cfg = yaml.safe_load(f) + topo_cfg["system"]["sips"]["count"] = n_sips + topo_cfg["system"]["sips"]["topology"] = sip_topology + if sip_w is not None and sip_h is not None: + topo_cfg["system"]["sips"]["w"] = int(sip_w) + topo_cfg["system"]["sips"]["h"] = int(sip_h) + else: + topo_cfg["system"]["sips"].pop("w", None) + topo_cfg["system"]["sips"].pop("h", None) + topo_path = tmp_path / "topology.yaml" + with open(topo_path, "w") as f: + yaml.dump(topo_cfg, f, default_flow_style=False) + + ccl_path = Path(__file__).parent.parent.parent / "ccl.yaml" + with open(ccl_path) as f: + ccl_cfg = yaml.safe_load(f) + ccl_cfg["defaults"]["algorithm"] = algorithm + if n_elem_override is not None: + ccl_cfg.setdefault("algorithms", {}).setdefault( + algorithm, {}, + )["n_elem"] = int(n_elem_override) + # Ensure IPCQ slot is big enough for the per-message payload. + per_msg_bytes = int(n_elem_override) * 2 # f16 + default_slot = int(ccl_cfg["defaults"].get("slot_size", 4096)) + if per_msg_bytes > default_slot: + ccl_cfg["defaults"]["slot_size"] = per_msg_bytes + tmp_ccl = tmp_path / "ccl.yaml" + with open(tmp_ccl, "w") as f: + yaml.dump(ccl_cfg, f, default_flow_style=False) + + return str(topo_path), str(tmp_ccl) + + +# ── distributed driver (init_process_group → mp.spawn → all_reduce) ──── + + +def _worker(rank: int, n_cubes: int, n_elem: int, n_sips: int, torch) -> None: + """Per-SIP worker: allocate, fill, all_reduce, verify.""" + torch.ahbm.set_device(rank) + + dp = DPPolicy( + cube="row_wise", pe="replicate", + num_pes=1, num_cubes=n_cubes, + ) + tensor = torch.zeros( + (n_cubes, n_elem), dtype="f16", dp=dp, + name=f"sip{rank}", + ) + tensor.copy_(torch.from_numpy( + np.full((n_cubes, n_elem), float(rank + 1), dtype=np.float16) + )) + + torch.distributed.all_reduce(tensor, op="sum") + + arr = tensor.numpy() + expected = float(n_cubes * sum(range(1, n_sips + 1))) + for cube_id in range(n_cubes): + assert np.allclose(arr[cube_id], expected, rtol=1e-1, atol=1e-1), ( + f"SIP{rank} cube {cube_id}: " + f"got {arr[cube_id][:4]}, expected {expected}" + ) + + if rank == 0: + print(f"\n lrab_hierarchical_allreduce (ws={n_sips}): " + f"{n_sips * n_cubes} OK") + + +def _crit_ns(engine) -> float: + """Critical-path latency = max per-result pe_exec_ns over engine results.""" + vals = [ + float(tr.get("pe_exec_ns", 0.0) or 0.0) + for _, (_, tr) in engine._results.items() + if isinstance(tr, dict) + ] + return max(vals) if vals else 0.0 + + +def _run_distributed(tmp_path, monkeypatch, topo_path, correlation_id, n_elem): + """Build engine + run the collective via the full distributed path. + + Returns ``(engine, n_cubes)``. ``monkeypatch.chdir`` points the backend's + ``load_ccl_config()`` (cwd lookup) at the temp ``ccl.yaml``. + """ + monkeypatch.chdir(tmp_path) + topo = resolve_topology(topo_path) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + n_sips = int(spec["system"]["sips"]["count"]) + cm = spec["sip"]["cube_mesh"] + n_cubes = int(cm["w"]) * int(cm["h"]) + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id=correlation_id, + spec=spec, + ) as ctx: + ctx.distributed.init_process_group(backend="ahbm") + assert ctx.distributed.get_world_size() == n_sips + ctx.multiprocessing.spawn( + _worker, args=(n_cubes, n_elem, n_sips, ctx), nprocs=n_sips, + ) + return engine, n_cubes + + +# ── correctness config matrix (used by test_allreduce) ───────────────── + +CONFIGS = [ + pytest.param( + "lrab_hierarchical_allreduce", "ring_1d", 6, None, None, + id="ring_6sip", + ), + pytest.param( + "lrab_hierarchical_allreduce", "torus_2d", 6, 2, 3, + id="torus_6sip_2x3", + ), + pytest.param( + "lrab_hierarchical_allreduce", "mesh_2d_no_wrap", 6, 2, 3, + id="mesh_6sip_2x3", + ), +] + + +# ── direct-launch helper (parity reference only) ─────────────────────── def _sip_topo_dims( @@ -51,14 +219,14 @@ def run_allreduce( algorithm: str | None = None, ccl_yaml: str | None = None, ) -> dict: - """Config-driven allreduce: read yaml, load kernel, run. + """Config-driven allreduce via direct ctx.launch (no distributed wrapper). - Everything is resolved from config — no hardcoded kernel imports. + Retained as the parity reference for the distributed path and reused by + the external buffer-kind / root-center micro-tests. """ cfg_all = load_ccl_config(ccl_yaml) cfg = resolve_algorithm_config(cfg_all, algorithm) - # Dynamic import from ccl.yaml → module algo_module = importlib.import_module(cfg["module"]) kernel_fn = algo_module.kernel topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND @@ -83,15 +251,6 @@ def run_allreduce( ) algo_name = cfg.get("algorithm", "allreduce") - print(f"\n{'=' * 60}") - print(f"algorithm: {algo_name}") - print(f"module: {cfg['module']}") - print(f"sip_topology: {sip_topo}") - print(f"kernel: {kernel_fn.__name__}") - print(f"n_sips: {n_sips}") - print(f"n_cubes: {n_cubes}") - print(f"n_elem: {n_elem}") - print(f"{'=' * 60}") configure_sfr_intercube_multisip(engine, spec, cfg) @@ -112,11 +271,6 @@ def run_allreduce( )) tensors.append(t) - for sip in range(n_sips): - arr = tensors[sip].numpy() - print(f"[SIP {sip}] input cube0[:4] = {arr[0][:4].tolist()} " - f"cube{n_cubes - 1}[:4] = {arr[-1][:4].tolist()}") - t_start = engine._env.now all_pending = [] @@ -129,31 +283,14 @@ def run_allreduce( ) all_pending.extend(pending) - for h, sip_id, meta in all_pending: + for h, _sip_id, meta in all_pending: ctx.wait(h, _meta=meta) t_end = engine._env.now latency_ns = t_end - t_start - print(f"\n[{algo_name} ws={n_sips}] sim latency = " - f"{latency_ns:.1f} ns ({latency_ns / 1000:.3f} us)") - - for key, (_, trace) in engine._results.items(): - if not isinstance(trace, dict): - continue - total = trace.get("total_ns", 0.0) - pe_exec = trace.get("pe_exec_ns", 0.0) or 0.0 - network = total - pe_exec - print(f" [{key}] total={total:.1f} ns " - f"pe_exec={pe_exec:.1f} ns network={network:.1f} ns") expected = float(n_cubes * sum(range(1, n_sips + 1))) - print() - for sip in range(n_sips): - arr = tensors[sip].numpy() - print(f"[SIP {sip}] output cube0[:4] = {arr[0][:4].tolist()}") - print(f"[SIP {sip}] output cube{n_cubes - 1}[:4] = {arr[-1][:4].tolist()}") - ok_cubes = 0 for sip in range(n_sips): arr = tensors[sip].numpy() @@ -166,8 +303,6 @@ def run_allreduce( ) ok_cubes += 1 - print(f"\n {algo_name} (ws={n_sips}): {ok_cubes} OK") - return { "expected": expected, "latency_ns": latency_ns, @@ -175,101 +310,7 @@ def run_allreduce( } -# ── pytest entry point ─────────────────────────────────────────────── - -import pytest -import yaml - -from kernbench.runtime_api.context import RuntimeContext -from kernbench.runtime_api.types import DeviceSelector -from kernbench.sim_engine.engine import GraphEngine -from kernbench.topology.builder import resolve_topology - -TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" - -CONFIGS = [ - pytest.param( - "lrab_hierarchical_allreduce", "ring_1d", 6, None, None, - id="ring_6sip", - ), - pytest.param( - "lrab_hierarchical_allreduce", "torus_2d", 6, 2, 3, - id="torus_6sip_2x3", - ), - pytest.param( - "lrab_hierarchical_allreduce", "mesh_2d_no_wrap", 6, 2, 3, - id="mesh_6sip_2x3", - ), -] - - -def _write_temp_configs( - tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None, - sip_w=None, sip_h=None, -): - """Write temp topology.yaml and ccl.yaml with the given overrides.""" - with open(TOPOLOGY_PATH) as f: - topo_cfg = yaml.safe_load(f) - topo_cfg["system"]["sips"]["count"] = n_sips - topo_cfg["system"]["sips"]["topology"] = sip_topology - if sip_w is not None and sip_h is not None: - topo_cfg["system"]["sips"]["w"] = int(sip_w) - topo_cfg["system"]["sips"]["h"] = int(sip_h) - else: - topo_cfg["system"]["sips"].pop("w", None) - topo_cfg["system"]["sips"].pop("h", None) - topo_path = tmp_path / "topology.yaml" - with open(topo_path, "w") as f: - yaml.dump(topo_cfg, f, default_flow_style=False) - - ccl_path = Path(__file__).parent.parent / "ccl.yaml" - with open(ccl_path) as f: - ccl_cfg = yaml.safe_load(f) - ccl_cfg["defaults"]["algorithm"] = algorithm - if n_elem_override is not None: - ccl_cfg.setdefault("algorithms", {}).setdefault( - algorithm, {}, - )["n_elem"] = int(n_elem_override) - # Ensure IPCQ slot is big enough for the per-message payload. - per_msg_bytes = int(n_elem_override) * 2 # f16 - default_slot = int(ccl_cfg["defaults"].get("slot_size", 4096)) - if per_msg_bytes > default_slot: - ccl_cfg["defaults"]["slot_size"] = per_msg_bytes - tmp_ccl = tmp_path / "ccl.yaml" - with open(tmp_ccl, "w") as f: - yaml.dump(ccl_cfg, f, default_flow_style=False) - - return str(topo_path), str(tmp_ccl) - - -@pytest.mark.parametrize( - "algorithm,sip_topology,n_sips,sip_w,sip_h", CONFIGS, -) -def test_allreduce( - tmp_path, algorithm, sip_topology, n_sips, sip_w, sip_h, -): - topo_path, ccl_path = _write_temp_configs( - tmp_path, sip_topology, n_sips, algorithm, - sip_w=sip_w, sip_h=sip_h, - ) - topo = resolve_topology(topo_path) - engine = GraphEngine(topo.topology_obj, enable_data=True) - spec = topo.topology_obj.spec - - with RuntimeContext( - engine=engine, - target_device=DeviceSelector("all"), - correlation_id=f"test_{algorithm}_{sip_topology}", - spec=spec, - ) as ctx: - result = run_allreduce( - ctx, engine, spec, - algorithm=algorithm, ccl_yaml=ccl_path, - ) - assert result["ok_cubes"] > 0 - - -# ── Latency sweep (parametrized + xdist-friendly) ───────────────────── +# ── Latency sweep constants + aggregator ────────────────────────────── # avoid 16 (== n_cubes, dim_map collision). Goes up to 96 KB per PE: # bytes_per_pe = n_elem * 2 (f16). 49152 elem * 2 = 96 KB / PE. @@ -289,7 +330,7 @@ _SWEEP_TOPOLOGIES = [ # parametrized invocation writes one JSON file here; the aggregator # (run from conftest.pytest_sessionfinish) reads them and emits the # combined CSV + PNG plots. -_SWEEP_OUT_DIR = (Path(__file__).parent.parent / "docs" / "diagrams" +_SWEEP_OUT_DIR = (Path(__file__).parent.parent.parent / "docs" / "diagrams" / "allreduce_latency_plots") _SWEEP_ROWS_DIR = _SWEEP_OUT_DIR / "_rows" @@ -305,69 +346,6 @@ def _sweep_params(): return out -@pytest.mark.parametrize( - "algorithm,sip_topology,n_sips,sip_w,sip_h,n_elem", _sweep_params(), -) -def test_allreduce_latency_one( - tmp_path, algorithm, sip_topology, n_sips, sip_w, sip_h, n_elem, -): - """One config of the latency sweep. xdist parallelizes across params. - - Writes a single JSON row to ``_SWEEP_ROWS_DIR``. The conftest - sessionfinish hook aggregates rows into CSV + plots after all - parametrized cases finish. - """ - import json - - topo_path, ccl_path = _write_temp_configs( - tmp_path, sip_topology, n_sips, algorithm, - sip_w=sip_w, sip_h=sip_h, - n_elem_override=n_elem, - ) - topo = resolve_topology(topo_path) - engine = GraphEngine(topo.topology_obj, enable_data=True) - spec = topo.topology_obj.spec - - with RuntimeContext( - engine=engine, - target_device=DeviceSelector("all"), - correlation_id=f"sweep_{algorithm}_{sip_topology}_{n_elem}", - spec=spec, - ) as ctx: - result = run_allreduce( - ctx, engine, spec, - algorithm=algorithm, ccl_yaml=ccl_path, - ) - assert result["ok_cubes"] > 0 - - pe_exec_vals = [ - float(tr.get("pe_exec_ns", 0.0) or 0.0) - for _, (_, tr) in engine._results.items() - if isinstance(tr, dict) - ] - crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0 - - cm = spec["sip"]["cube_mesh"] - n_cubes = int(cm["w"]) * int(cm["h"]) - bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16 - bytes_per_pe = n_elem * _ELEM_BYTES_F16 - - record = { - "algorithm": algorithm, - "sip_topology": sip_topology, - "n_sips": n_sips, - "n_elem": n_elem, - "bytes_per_pe": bytes_per_pe, - "bytes_per_sip": bytes_per_sip, - "latency_ns": crit_ns, - } - - _SWEEP_ROWS_DIR.mkdir(parents=True, exist_ok=True) - row_path = _SWEEP_ROWS_DIR / f"{sip_topology}_{n_elem}.json" - with open(row_path, "w", encoding="utf-8") as f: - json.dump(record, f) - - def _aggregate_sweep_plots() -> bool: """Read all per-config rows and emit CSV + PNG plots. @@ -469,7 +447,7 @@ def _aggregate_sweep_plots() -> bool: plt.close(fig) # Combined overview.png is no longer emitted — the broken-y-axis - # comparison (scripts/emit_overview_with_external_ref.py → + # comparison (emit_comparison_fsim_plot() below → # comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png) # supersedes it. Per-topology plots above and summary.csv are still # produced. @@ -491,6 +469,118 @@ def _aggregate_sweep_plots() -> bool: return True +# ── Buffer-kind sweep constants + aggregator ────────────────────────── +# +# Parametrized over (buffer_kind, n_elem) on torus_2d 6 SIPs (3×2). Pre +# slot-latency modeling the three lines overlap exactly (slot access is +# latency-free today); they spread out once tcm/sram/hbm carry distinct +# access costs. + +_BUFFER_KINDS = ["tcm", "sram", "hbm"] +_BK_N_ELEM_GRID = [128, 1024, 8192, 32768] # 256 B → 64 KB per slot +_BK_ROWS_DIR = _SWEEP_OUT_DIR / "_buffer_kind_rows" +# Descriptive output stem (shared by the .png and .csv). +_BK_OUT_STEM = "AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM" + + +def _bk_params(): + out = [] + for bk in _BUFFER_KINDS: + for n_elem in _BK_N_ELEM_GRID: + out.append(pytest.param(bk, n_elem, id=f"{bk}-n_elem{n_elem}")) + return out + + +def aggregate_buffer_kind_plot() -> bool: + """Read per-config rows and emit the descriptive .png + .csv (_BK_OUT_STEM). + + Called from conftest.pytest_sessionfinish (controller-only). + Returns True if rows were aggregated. + """ + import csv + import json + + if not _BK_ROWS_DIR.exists(): + return False + row_files = sorted(_BK_ROWS_DIR.glob("*.json")) + if not row_files: + return False + + records = [] + for p in row_files: + with open(p, encoding="utf-8") as f: + records.append(json.load(f)) + + import matplotlib.pyplot as plt + from matplotlib.ticker import FuncFormatter + + def _fmt_bytes(x, _pos): + if x <= 0: + return "0" + if x >= 1024 * 1024: + return f"{x / (1024 * 1024):.0f} MB" + if x >= 1024: + return f"{x / 1024:.0f} KB" + return f"{x:.0f} B" + + _bytes_fmt = FuncFormatter(_fmt_bytes) + + _SWEEP_OUT_DIR.mkdir(parents=True, exist_ok=True) + with open(_SWEEP_OUT_DIR / f"{_BK_OUT_STEM}.csv", "w", + newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=[ + "buffer_kind", "sip_topology", "n_sips", "n_elem", + "bytes_per_pe", "latency_ns", + ]) + w.writeheader() + for r in sorted(records, key=lambda r: ( + r["buffer_kind"], r["bytes_per_pe"], + )): + w.writerow(r) + + colors = {"tcm": "tab:blue", "sram": "tab:orange", "hbm": "tab:red"} + fig, ax = plt.subplots(figsize=(10, 6)) + for bk in ["tcm", "sram", "hbm"]: + rs = sorted( + [r for r in records if r["buffer_kind"] == bk], + key=lambda r: r["bytes_per_pe"], + ) + if not rs: + continue + ax.plot( + [r["bytes_per_pe"] for r in rs], + [r["latency_ns"] for r in rs], + marker="o", lw=2.0, + color=colors[bk], label=f"buffer_kind = {bk}", + ) + ax.set_xscale("log", base=2) + ax.set_xlabel("Bytes per PE (log scale)") + ax.set_ylabel("Time (ns)") + ax.set_title( + "AllReduce_LRAB_2Dtorus_6SiP(2x3) — IPCQ memory (SRAM, TCM, HBM)" + ) + ax.grid(True, alpha=0.3) + ax.legend() + ax.xaxis.set_major_formatter(_bytes_fmt) + fig.tight_layout() + fig.savefig(_SWEEP_OUT_DIR / f"{_BK_OUT_STEM}.png", dpi=130) + plt.close(fig) + + for p in row_files: + try: + p.unlink() + except OSError: + pass + try: + _BK_ROWS_DIR.rmdir() + except OSError: + pass + + print(f"\nWrote {_SWEEP_OUT_DIR / f'{_BK_OUT_STEM}.png'} " + f"from {len(records)} rows") + return True + + # ── Topology diagram (device-level + cube-level reduction) ──────────── # Convention: "rows × cols" everywhere, row-major rank assignment @@ -781,7 +871,143 @@ def emit_topology_diagram() -> str: return str(out_path) -def test_emit_topology_diagram(): - """Emit topology.png alongside the sweep plots. Pure plotting; no sim.""" - out = emit_topology_diagram() - assert Path(out).exists() +# ── Comparison vs FSIM (broken-y-axis) ──────────────────────────────── +# +# Post-processes summary.csv: today's three model curves + a hand-derived +# theoretical torus_2d line in the bottom panel, and a single external FSIM +# single-device reference marker in the top panel (hardcoded 366 µs; no +# external data file). Reads summary.csv written by _aggregate_sweep_plots. + +_FSIM_EXT_LABEL = "FSIM (single device): 366 µs" +_FSIM_EXT_LATENCY_NS = 366_000.0 +_CMP_COLORS = { + "ring_1d": "tab:blue", + "torus_2d": "tab:orange", + "mesh_2d_no_wrap": "tab:green", +} +_CMP_DISPLAY = { + "ring_1d": "Ring 1x6 (6 devices)", + "torus_2d": "2D Torus 2x3 (6 devices)", + "mesh_2d_no_wrap": "2D Mesh 2x3 (6 devices)", +} +# Hand-derived theoretical model for torus_2d (6 SIPs): per-PE NOC-packet +# count fit to the simulated startup + per-packet tau. +_CMP_NOC_PACKET_BYTES = 128 +_CMP_PES_PER_CUBE = 8 +_CMP_T_STARTUP_NS = 1346.0 +_CMP_TAU_NS = (8741.0 - 1346.0) / (6144 - 1) + + +def emit_comparison_fsim_plot() -> str | None: + """Render comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png. + + Reads ``summary.csv`` (written by ``_aggregate_sweep_plots``). Returns the + output path, or ``None`` if summary.csv is absent / empty. + """ + import csv + + csv_path = _SWEEP_OUT_DIR / "summary.csv" + if not csv_path.exists(): + return None + records = [] + with open(csv_path, newline="", encoding="utf-8") as f: + for row in csv.DictReader(f): + records.append({ + "sip_topology": row["sip_topology"], + "bytes_per_pe": int(row["bytes_per_pe"]), + "latency_ns": float(row["latency_ns"]), + }) + if not records: + return None + + import matplotlib.pyplot as plt + import matplotlib.ticker as mticker + + def _theoretical_torus_2d_ns(bytes_per_pe: int) -> float: + bytes_per_cube = int(bytes_per_pe) * _CMP_PES_PER_CUBE + n_packets = max(1, -(-bytes_per_cube // _CMP_NOC_PACKET_BYTES)) + return _CMP_T_STARTUP_NS + (n_packets - 1) * _CMP_TAU_NS + + def _bytes_fmt(x, _pos): + if x >= 1024 * 1024: + return f"{x / (1024 * 1024):.0f}M" + if x >= 1024: + return f"{x / 1024:.0f}K" + return f"{int(x)}" + + topologies = sorted({r["sip_topology"] for r in records}) + max_local = max(r["latency_ns"] for r in records) + ext_x = max(r["bytes_per_pe"] for r in records) + + fig, (ax_top, ax_bot) = plt.subplots( + 2, 1, sharex=True, + gridspec_kw={"height_ratios": [1, 4], "hspace": 0.05}, + figsize=(9, 6.5), + ) + + # Bottom panel: model curves + theoretical torus, linear y. + for topo in topologies: + rs = sorted([r for r in records if r["sip_topology"] == topo], + key=lambda r: r["bytes_per_pe"]) + if not rs: + continue + ax_bot.plot( + [r["bytes_per_pe"] for r in rs], + [r["latency_ns"] for r in rs], + marker="o", label=_CMP_DISPLAY.get(topo, topo), + color=_CMP_COLORS.get(topo), + ) + torus_rs = sorted( + [r for r in records if r["sip_topology"] == "torus_2d"], + key=lambda r: r["bytes_per_pe"], + ) + if torus_rs: + ax_bot.plot( + [r["bytes_per_pe"] for r in torus_rs], + [_theoretical_torus_2d_ns(r["bytes_per_pe"]) for r in torus_rs], + color="tab:red", linestyle="--", linewidth=1.6, marker="x", + label="Theoretical 2D Torus 2x3", + ) + ax_bot.set_ylim(0, max_local * 1.10) + + # Top panel: external FSIM single-device reference marker. + ax_top.scatter( + [ext_x], [_FSIM_EXT_LATENCY_NS], + marker="*", s=240, color="tab:red", zorder=5, + label=_FSIM_EXT_LABEL, + ) + ax_top.set_ylim(_FSIM_EXT_LATENCY_NS * 0.93, _FSIM_EXT_LATENCY_NS * 1.05) + + # Hide spine between panels; draw diagonal break ticks. + ax_top.spines["bottom"].set_visible(False) + ax_bot.spines["top"].set_visible(False) + ax_top.tick_params(labeltop=False, bottom=False) + ax_bot.xaxis.tick_bottom() + d = 0.012 + kw = dict(transform=ax_top.transAxes, color="k", clip_on=False, lw=1) + ax_top.plot((-d, +d), (-d, +d), **kw) + ax_top.plot((1 - d, 1 + d), (-d, +d), **kw) + kw.update(transform=ax_bot.transAxes) + ax_bot.plot((-d, +d), (1 - d * 4, 1 + d * 4), **kw) + ax_bot.plot((1 - d, 1 + d), (1 - d * 4, 1 + d * 4), **kw) + + ax_bot.set_xscale("log", base=2) + ax_bot.set_xlabel("Bytes per PE (log scale)") + ax_bot.set_ylabel("Time (ns)") + ax_top.set_ylabel("Time (ns)") + ax_bot.grid(True, alpha=0.3) + ax_top.grid(True, alpha=0.3) + ax_bot.xaxis.set_major_formatter(mticker.FuncFormatter(_bytes_fmt)) + + handles_bot, labels_bot = ax_bot.get_legend_handles_labels() + handles_top, labels_top = ax_top.get_legend_handles_labels() + ax_bot.legend(handles_bot + handles_top, labels_bot + labels_top, + loc="upper left") + + fig.suptitle("Multidevice allreduce (ring, Mesh, 2DTorus) vs FSIM latency") + fig.tight_layout() + out = (_SWEEP_OUT_DIR + / "comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png") + fig.savefig(out, dpi=120) + plt.close(fig) + return str(out) diff --git a/tests/sccl/test_allreduce_ring_torus_mesh.py b/tests/sccl/test_allreduce_ring_torus_mesh.py new file mode 100644 index 0000000..61bdf3b --- /dev/null +++ b/tests/sccl/test_allreduce_ring_torus_mesh.py @@ -0,0 +1,35 @@ +"""Correctness of intercube allreduce across SIP topologies (distributed path). + +Routes through init_process_group → mp.spawn → dist.all_reduce for ring_1d, +torus_2d (2×3), and mesh_2d_no_wrap (2×3). Per-rank correctness is asserted +inside the worker; spawn raises on failure. +""" +from __future__ import annotations + +import pytest + +from tests.sccl._allreduce_helpers import ( + CONFIGS, + DEFAULT_N_ELEM, + _crit_ns, + _run_distributed, + _write_temp_configs, +) + + +@pytest.mark.parametrize( + "algorithm,sip_topology,n_sips,sip_w,sip_h", CONFIGS, +) +def test_allreduce( + tmp_path, monkeypatch, algorithm, sip_topology, n_sips, sip_w, sip_h, +): + topo_path, _ = _write_temp_configs( + tmp_path, sip_topology, n_sips, algorithm, + sip_w=sip_w, sip_h=sip_h, + ) + engine, _n_cubes = _run_distributed( + tmp_path, monkeypatch, topo_path, + f"test_{algorithm}_{sip_topology}", DEFAULT_N_ELEM, + ) + # A positive critical path confirms the kernel actually ran. + assert _crit_ns(engine) > 0.0 diff --git a/tests/sccl/test_distributed_default_topology.py b/tests/sccl/test_distributed_default_topology.py new file mode 100644 index 0000000..cd18255 --- /dev/null +++ b/tests/sccl/test_distributed_default_topology.py @@ -0,0 +1,47 @@ +"""Full distributed path against topology.yaml as-is (no overrides). + +The same flow a real DDP training script would use: +init_process_group(backend="ahbm") → mp.spawn → dist.all_reduce. +""" +from __future__ import annotations + +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +from tests.sccl._allreduce_helpers import ( + DEFAULT_N_ELEM, + TOPOLOGY_PATH, + _worker, + _write_ccl_yaml, +) + + +def test_distributed_lrab_hierarchical_allreduce(tmp_path, monkeypatch): + monkeypatch.chdir(_write_ccl_yaml(tmp_path)) + + topo = resolve_topology(str(TOPOLOGY_PATH)) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + n_sips = int(spec["system"]["sips"]["count"]) + cm = spec["sip"]["cube_mesh"] + n_cubes = int(cm["w"]) * int(cm["h"]) + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id="dist_intercube_ar", + spec=spec, + ) as ctx: + ctx.distributed.init_process_group(backend="ahbm") + assert ctx.distributed.get_world_size() == n_sips + + t_start = engine._env.now + ctx.multiprocessing.spawn( + _worker, args=(n_cubes, DEFAULT_N_ELEM, n_sips, ctx), + nprocs=n_sips, + ) + t_end = engine._env.now + print(f"\n[distributed] sim latency = " + f"{t_end - t_start:.1f} ns ({(t_end - t_start) / 1000:.3f} us)") diff --git a/tests/test_intercube_root_center.py b/tests/sccl/test_intercube_root_center.py similarity index 99% rename from tests/test_intercube_root_center.py rename to tests/sccl/test_intercube_root_center.py index 7267b37..18d9172 100644 --- a/tests/test_intercube_root_center.py +++ b/tests/sccl/test_intercube_root_center.py @@ -40,7 +40,7 @@ from kernbench.runtime_api.types import DeviceSelector from kernbench.sim_engine.engine import GraphEngine from kernbench.topology.builder import resolve_topology -from tests.test_allreduce_multidevice import ( +from tests.sccl._allreduce_helpers import ( _write_temp_configs, run_allreduce, ) diff --git a/tests/sccl/test_plot_buffer_kind_sweep.py b/tests/sccl/test_plot_buffer_kind_sweep.py new file mode 100644 index 0000000..e4f3ae3 --- /dev/null +++ b/tests/sccl/test_plot_buffer_kind_sweep.py @@ -0,0 +1,66 @@ +"""Buffer-kind sweep (TCM / SRAM / HBM) on torus_2d 6 SIPs (3×2), distributed. + +Each parametrized case writes one JSON row; the conftest sessionfinish hook +calls ``aggregate_buffer_kind_plot`` to emit the comparison PNG + csv. Pre +slot-latency modeling the three lines overlap exactly (slot access is +latency-free today). +""" +from __future__ import annotations + +import json + +import pytest +import yaml + +from tests.sccl._allreduce_helpers import ( + _BK_ROWS_DIR, + _ELEM_BYTES_F16, + _bk_params, + _crit_ns, + _run_distributed, + _write_temp_configs, +) + + +@pytest.mark.parametrize("buffer_kind,n_elem", _bk_params()) +def test_buffer_kind_allreduce_one(tmp_path, monkeypatch, buffer_kind, n_elem): + sub = tmp_path / f"{buffer_kind}_{n_elem}" + sub.mkdir() + topo_path, ccl_path = _write_temp_configs( + sub, + sip_topology="torus_2d", + n_sips=6, + algorithm="lrab_hierarchical_allreduce", + sip_w=3, sip_h=2, + n_elem_override=n_elem, + ) + # Override buffer_kind in the temp ccl.yaml (read by the ahbm backend + # at init_process_group time via load_ccl_config()). + with open(ccl_path) as f: + ccl_cfg = yaml.safe_load(f) + ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind + ccl_cfg.setdefault("algorithms", {}).setdefault( + "lrab_hierarchical_allreduce", {}, + )["buffer_kind"] = buffer_kind + with open(ccl_path, "w") as f: + yaml.dump(ccl_cfg, f, default_flow_style=False) + + engine, _ = _run_distributed( + sub, monkeypatch, topo_path, + f"bk_sweep_{buffer_kind}_{n_elem}", n_elem, + ) + crit_ns = _crit_ns(engine) + + bytes_per_pe = n_elem * _ELEM_BYTES_F16 + record = { + "buffer_kind": buffer_kind, + "sip_topology": "torus_2d", + "n_sips": 6, + "n_elem": n_elem, + "bytes_per_pe": bytes_per_pe, + "latency_ns": crit_ns, + } + _BK_ROWS_DIR.mkdir(parents=True, exist_ok=True) + row_path = _BK_ROWS_DIR / f"{buffer_kind}_{n_elem}.json" + with open(row_path, "w", encoding="utf-8") as f: + json.dump(record, f) diff --git a/tests/sccl/test_plot_comparison_fsim.py b/tests/sccl/test_plot_comparison_fsim.py new file mode 100644 index 0000000..4c21f66 --- /dev/null +++ b/tests/sccl/test_plot_comparison_fsim.py @@ -0,0 +1,23 @@ +"""Emit the broken-y-axis allreduce-vs-FSIM comparison plot. + +Post-processes summary.csv (written by the latency sweep) into +comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png. Pure +plotting; reads the on-disk summary.csv (skips if the sweep has never run). +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from tests.sccl._allreduce_helpers import ( + _SWEEP_OUT_DIR, + emit_comparison_fsim_plot, +) + + +def test_emit_comparison_fsim_plot(): + if not (_SWEEP_OUT_DIR / "summary.csv").exists(): + pytest.skip("summary.csv absent; run the latency sweep first") + out = emit_comparison_fsim_plot() + assert out is not None and Path(out).exists() diff --git a/tests/sccl/test_plot_latency_sweep.py b/tests/sccl/test_plot_latency_sweep.py new file mode 100644 index 0000000..77207fb --- /dev/null +++ b/tests/sccl/test_plot_latency_sweep.py @@ -0,0 +1,58 @@ +"""Allreduce latency sweep (distributed path), xdist-friendly. + +Each parametrized case writes one JSON row to the shared staging dir; the +conftest sessionfinish hook calls ``_aggregate_sweep_plots`` to emit the +per-topology PNGs + summary.csv after all cases finish. +""" +from __future__ import annotations + +import json + +import pytest + +from tests.sccl._allreduce_helpers import ( + _ELEM_BYTES_F16, + _SWEEP_ROWS_DIR, + _crit_ns, + _run_distributed, + _sweep_params, + _write_temp_configs, +) + + +@pytest.mark.parametrize( + "algorithm,sip_topology,n_sips,sip_w,sip_h,n_elem", _sweep_params(), +) +def test_allreduce_latency_one( + tmp_path, monkeypatch, algorithm, sip_topology, n_sips, sip_w, sip_h, + n_elem, +): + topo_path, _ = _write_temp_configs( + tmp_path, sip_topology, n_sips, algorithm, + sip_w=sip_w, sip_h=sip_h, + n_elem_override=n_elem, + ) + engine, n_cubes = _run_distributed( + tmp_path, monkeypatch, topo_path, + f"sweep_{algorithm}_{sip_topology}_{n_elem}", n_elem, + ) + + crit_ns = _crit_ns(engine) + + bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16 + bytes_per_pe = n_elem * _ELEM_BYTES_F16 + + record = { + "algorithm": algorithm, + "sip_topology": sip_topology, + "n_sips": n_sips, + "n_elem": n_elem, + "bytes_per_pe": bytes_per_pe, + "bytes_per_sip": bytes_per_sip, + "latency_ns": crit_ns, + } + + _SWEEP_ROWS_DIR.mkdir(parents=True, exist_ok=True) + row_path = _SWEEP_ROWS_DIR / f"{sip_topology}_{n_elem}.json" + with open(row_path, "w", encoding="utf-8") as f: + json.dump(record, f) diff --git a/tests/sccl/test_plot_topology_diagram.py b/tests/sccl/test_plot_topology_diagram.py new file mode 100644 index 0000000..960c6ef --- /dev/null +++ b/tests/sccl/test_plot_topology_diagram.py @@ -0,0 +1,11 @@ +"""Emit topology.png (device-level + cube-level reduction). Pure plotting; no sim.""" +from __future__ import annotations + +from pathlib import Path + +from tests.sccl._allreduce_helpers import emit_topology_diagram + + +def test_emit_topology_diagram(): + out = emit_topology_diagram() + assert Path(out).exists() diff --git a/tests/test_allreduce_buffer_kind_sweep.py b/tests/test_allreduce_buffer_kind_sweep.py deleted file mode 100644 index 48d53aa..0000000 --- a/tests/test_allreduce_buffer_kind_sweep.py +++ /dev/null @@ -1,199 +0,0 @@ -"""Phase 1 buffer-kind allreduce sweep — torus_2d 6 SIPs. - -Parametrized over (buffer_kind, n_elem). Each case runs the standard -config-driven allreduce app and writes a JSON row to a shared staging -dir; the conftest sessionfinish hook (added in Phase 1) aggregates -rows into ``docs/diagrams/allreduce_latency_plots/ -AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.png``. - -Pre-Phase-2: the three buffer-kind lines overlap exactly because slot -access is latency-free today. Post-Phase-2 they spread out (tcm -fastest, hbm slowest). -""" -from __future__ import annotations - -import json -from pathlib import Path - -import pytest -import yaml - -from kernbench.runtime_api.context import RuntimeContext -from kernbench.runtime_api.types import DeviceSelector -from kernbench.sim_engine.engine import GraphEngine -from kernbench.topology.builder import resolve_topology - -# Reuse the allreduce app helpers. -from tests.test_allreduce_multidevice import ( - _write_temp_configs, - run_allreduce, -) - - -_BUFFER_KINDS = ["tcm", "sram", "hbm"] -_N_ELEM_GRID = [128, 1024, 8192, 32768] # 256 B → 64 KB per slot -_ELEM_BYTES_F16 = 2 - -_OUT_DIR = (Path(__file__).parent.parent / "docs" / "diagrams" - / "allreduce_latency_plots") -_ROWS_DIR = _OUT_DIR / "_buffer_kind_rows" -# Descriptive output stem (shared by the .png and .csv). -_OUT_STEM = "AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM" - - -def _bk_params(): - out = [] - for bk in _BUFFER_KINDS: - for n_elem in _N_ELEM_GRID: - out.append(pytest.param(bk, n_elem, id=f"{bk}-n_elem{n_elem}")) - return out - - -@pytest.mark.parametrize("buffer_kind,n_elem", _bk_params()) -def test_buffer_kind_allreduce_one(tmp_path, buffer_kind, n_elem): - """One config of the buffer-kind sweep. xdist parallelizes.""" - sub = tmp_path / f"{buffer_kind}_{n_elem}" - sub.mkdir() - topo_path, ccl_path = _write_temp_configs( - sub, - sip_topology="torus_2d", - n_sips=6, - algorithm="lrab_hierarchical_allreduce", - sip_w=3, sip_h=2, - n_elem_override=n_elem, - ) - # Override buffer_kind in the temp ccl.yaml. - with open(ccl_path) as f: - ccl_cfg = yaml.safe_load(f) - ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind - ccl_cfg.setdefault("algorithms", {}).setdefault( - "lrab_hierarchical_allreduce", {}, - )["buffer_kind"] = buffer_kind - with open(ccl_path, "w") as f: - yaml.dump(ccl_cfg, f, default_flow_style=False) - - topo = resolve_topology(topo_path) - engine = GraphEngine(topo.topology_obj, enable_data=True) - spec = topo.topology_obj.spec - - with RuntimeContext( - engine=engine, - target_device=DeviceSelector("all"), - correlation_id=f"bk_sweep_{buffer_kind}_{n_elem}", - spec=spec, - ) as ctx: - result = run_allreduce( - ctx, engine, spec, - algorithm="lrab_hierarchical_allreduce", ccl_yaml=ccl_path, - ) - assert result["ok_cubes"] > 0 - - pe_exec_vals = [ - float(tr.get("pe_exec_ns", 0.0) or 0.0) - for _, (_, tr) in engine._results.items() - if isinstance(tr, dict) - ] - crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0 - - bytes_per_pe = n_elem * _ELEM_BYTES_F16 - record = { - "buffer_kind": buffer_kind, - "sip_topology": "torus_2d", - "n_sips": 6, - "n_elem": n_elem, - "bytes_per_pe": bytes_per_pe, - "latency_ns": crit_ns, - } - _ROWS_DIR.mkdir(parents=True, exist_ok=True) - row_path = _ROWS_DIR / f"{buffer_kind}_{n_elem}.json" - with open(row_path, "w", encoding="utf-8") as f: - json.dump(record, f) - - -def aggregate_buffer_kind_plot() -> bool: - """Read per-config rows and emit the descriptive .png + .csv (_OUT_STEM). - - Called from conftest.pytest_sessionfinish (controller-only). - Returns True if rows were aggregated. - """ - import csv - - if not _ROWS_DIR.exists(): - return False - row_files = sorted(_ROWS_DIR.glob("*.json")) - if not row_files: - return False - - records = [] - for p in row_files: - with open(p, encoding="utf-8") as f: - records.append(json.load(f)) - - import matplotlib.pyplot as plt - from matplotlib.ticker import FuncFormatter - - def _fmt_bytes(x, _pos): - if x <= 0: - return "0" - if x >= 1024 * 1024: - return f"{x / (1024 * 1024):.0f} MB" - if x >= 1024: - return f"{x / 1024:.0f} KB" - return f"{x:.0f} B" - - _bytes_fmt = FuncFormatter(_fmt_bytes) - - _OUT_DIR.mkdir(parents=True, exist_ok=True) - with open(_OUT_DIR / f"{_OUT_STEM}.csv", "w", - newline="", encoding="utf-8") as f: - w = csv.DictWriter(f, fieldnames=[ - "buffer_kind", "sip_topology", "n_sips", "n_elem", - "bytes_per_pe", "latency_ns", - ]) - w.writeheader() - for r in sorted(records, key=lambda r: ( - r["buffer_kind"], r["bytes_per_pe"], - )): - w.writerow(r) - - colors = {"tcm": "tab:blue", "sram": "tab:orange", "hbm": "tab:red"} - fig, ax = plt.subplots(figsize=(10, 6)) - for bk in ["tcm", "sram", "hbm"]: - rs = sorted( - [r for r in records if r["buffer_kind"] == bk], - key=lambda r: r["bytes_per_pe"], - ) - if not rs: - continue - ax.plot( - [r["bytes_per_pe"] for r in rs], - [r["latency_ns"] for r in rs], - marker="o", lw=2.0, - color=colors[bk], label=f"buffer_kind = {bk}", - ) - ax.set_xscale("log", base=2) - ax.set_xlabel("Bytes per PE (log scale)") - ax.set_ylabel("Time (ns)") - ax.set_title( - "AllReduce_LRAB_2Dtorus_6SiP(2x3) — IPCQ memory (SRAM, TCM, HBM)" - ) - ax.grid(True, alpha=0.3) - ax.legend() - ax.xaxis.set_major_formatter(_bytes_fmt) - fig.tight_layout() - fig.savefig(_OUT_DIR / f"{_OUT_STEM}.png", dpi=130) - plt.close(fig) - - for p in row_files: - try: - p.unlink() - except OSError: - pass - try: - _ROWS_DIR.rmdir() - except OSError: - pass - - print(f"\nWrote {_OUT_DIR / f'{_OUT_STEM}.png'} " - f"from {len(records)} rows") - return True diff --git a/tests/test_distributed_lrab_hierarchical_allreduce.py b/tests/test_distributed_lrab_hierarchical_allreduce.py deleted file mode 100644 index 1f9d78a..0000000 --- a/tests/test_distributed_lrab_hierarchical_allreduce.py +++ /dev/null @@ -1,119 +0,0 @@ -"""End-to-end distributed test for intercube allreduce. - -Exercises the full process-group path: - dist.init_process_group(backend="ahbm") - → mp.spawn(nprocs=n_sips) - → each worker: set_device → allocate → fill → dist.all_reduce → verify - -This is the same flow a real DDP training script would use. -""" -from __future__ import annotations - -import os -import textwrap -from pathlib import Path - -import numpy as np -import pytest - -TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" - -N_CUBES = 16 -N_ELEM = 8 - - -def _write_ccl_yaml(tmp_path) -> str: - body = textwrap.dedent("""\ - defaults: - algorithm: lrab_hierarchical_allreduce - buffer_kind: tcm - backpressure: sleep - n_slots: 4 - slot_size: 4096 - vc_chunk_size: 256 - ipcq_credit_size_bytes: 16 - - algorithms: - lrab_hierarchical_allreduce: - module: kernbench.ccl.algorithms.lrab_hierarchical_allreduce - topology: none - buffer_kind: tcm - n_elem: 8 - root_cube: 15 - """) - (tmp_path / "ccl.yaml").write_text(body) - return str(tmp_path) - - -def _worker(rank: int, n_sips: int, torch) -> None: - """Per-SIP worker: allocate, fill, all_reduce, verify.""" - from kernbench.policy.placement.dp import DPPolicy - - torch.ahbm.set_device(rank) - - dp = DPPolicy( - cube="row_wise", pe="replicate", - num_pes=1, num_cubes=N_CUBES, - ) - tensor = torch.zeros( - (N_CUBES, N_ELEM), dtype="f16", dp=dp, - name=f"sip{rank}", - ) - - init_arr = np.full((N_CUBES, N_ELEM), float(rank + 1), dtype=np.float16) - tensor.copy_(torch.from_numpy(init_arr)) - - print(f"[SIP {rank}] input cube0[:4] = {tensor.numpy()[0][:4].tolist()}") - - torch.distributed.all_reduce(tensor, op="sum") - - arr = tensor.numpy() - expected = float(N_CUBES * sum(range(1, n_sips + 1))) - - print(f"[SIP {rank}] output cube0[:4] = {arr[0][:4].tolist()}") - print(f"[SIP {rank}] output cube15[:4] = {arr[15][:4].tolist()}") - - for cube_id in range(N_CUBES): - assert np.allclose(arr[cube_id], expected, rtol=1e-1, atol=1e-1), ( - f"SIP{rank} cube {cube_id}: " - f"got {arr[cube_id][:4]}, expected {expected}" - ) - - if rank == 0: - print(f"\n lrab_hierarchical_allreduce (ws={n_sips}): " - f"{n_sips * N_CUBES} OK") - - -def test_distributed_lrab_hierarchical_allreduce(tmp_path, monkeypatch): - """Full distributed path: init_process_group → mp.spawn → all_reduce.""" - from kernbench.runtime_api.context import RuntimeContext - from kernbench.runtime_api.types import DeviceSelector - from kernbench.sim_engine.engine import GraphEngine - from kernbench.topology.builder import resolve_topology - - monkeypatch.chdir(_write_ccl_yaml(tmp_path)) - - topo = resolve_topology(str(TOPOLOGY_PATH)) - engine = GraphEngine(topo.topology_obj, enable_data=True) - spec = topo.topology_obj.spec - n_sips = int(spec["system"]["sips"]["count"]) - - with RuntimeContext( - engine=engine, - target_device=DeviceSelector("all"), - correlation_id="dist_intercube_ar", - spec=spec, - ) as ctx: - ctx.distributed.init_process_group(backend="ahbm") - - assert ctx.distributed.get_world_size() == n_sips - - t_start = engine._env.now - - ctx.multiprocessing.spawn( - _worker, args=(n_sips, ctx), nprocs=n_sips, - ) - - t_end = engine._env.now - print(f"\n[distributed] sim latency = " - f"{t_end - t_start:.1f} ns ({(t_end - t_start) / 1000:.3f} us)") diff --git a/tests/test_ipcq_buffer_kind_latency.py b/tests/test_ipcq_buffer_kind_latency.py index e356e64..ec7926e 100644 --- a/tests/test_ipcq_buffer_kind_latency.py +++ b/tests/test_ipcq_buffer_kind_latency.py @@ -20,7 +20,7 @@ Reference (Phase 2 will edit these): - ccl.yaml — algorithm.buffer_kind The tests reuse the existing config-driven allreduce app -(``run_allreduce`` in tests/test_allreduce_multidevice.py) with a 2-SIP +(``run_allreduce`` in tests/sccl/_allreduce_helpers.py) with a 2-SIP ring topology and a SMALL n_elem so they finish fast (~3-5 s each). """ from __future__ import annotations @@ -37,7 +37,7 @@ from kernbench.topology.builder import resolve_topology # Reuse the test app's helpers so this micro-test file does not # duplicate the run-allreduce + write-temp-configs plumbing. -from tests.test_allreduce_multidevice import ( +from tests.sccl._allreduce_helpers import ( _write_temp_configs, run_allreduce, ) diff --git a/tests/test_ipcq_buffer_kind_locations.py b/tests/test_ipcq_buffer_kind_locations.py index 0bb278f..501fb6c 100644 --- a/tests/test_ipcq_buffer_kind_locations.py +++ b/tests/test_ipcq_buffer_kind_locations.py @@ -47,7 +47,7 @@ from kernbench.runtime_api.types import DeviceSelector from kernbench.sim_engine.engine import GraphEngine from kernbench.topology.builder import resolve_topology -from tests.test_allreduce_multidevice import ( +from tests.sccl._allreduce_helpers import ( _write_temp_configs, run_allreduce, ) @@ -59,8 +59,9 @@ def _run_allreduce_with_buffer_kind( """Run one torus_2d 6-SIP allreduce with the given buffer_kind and return critical-path pe_exec_ns (max across all PEs). - Mirrors the sweep harness in test_allreduce_buffer_kind_sweep.py - so the assertions below compare apples-to-apples against that PNG. + Mirrors the buffer-kind sweep harness in + tests/sccl/test_plot_buffer_kind_sweep.py so the assertions + below compare apples-to-apples against that PNG. """ sub = tmp_path / f"{buffer_kind}_{n_elem}" sub.mkdir()