"""Allreduce latency sweep (distributed path), xdist-friendly. Each parametrized case writes one JSON row to the shared staging dir; the conftest sessionfinish hook calls ``_aggregate_sweep_plots`` to emit the per-topology PNGs + summary.csv after all cases finish. """ from __future__ import annotations import json import pytest from tests.sccl._allreduce_helpers import ( _ELEM_BYTES_F16, _SWEEP_ROWS_DIR, _crit_ns, _run_distributed, _sweep_params, _write_temp_configs, ) @pytest.mark.parametrize( "algorithm,sip_topology,n_sips,sip_w,sip_h,n_elem", _sweep_params(), ) def test_allreduce_latency_one( tmp_path, monkeypatch, algorithm, sip_topology, n_sips, sip_w, sip_h, n_elem, ): topo_path, _ = _write_temp_configs( tmp_path, sip_topology, n_sips, algorithm, sip_w=sip_w, sip_h=sip_h, n_elem_override=n_elem, ) engine, n_cubes = _run_distributed( tmp_path, monkeypatch, topo_path, f"sweep_{algorithm}_{sip_topology}_{n_elem}", n_elem, ) crit_ns = _crit_ns(engine) bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16 bytes_per_pe = n_elem * _ELEM_BYTES_F16 record = { "algorithm": algorithm, "sip_topology": sip_topology, "n_sips": n_sips, "n_elem": n_elem, "bytes_per_pe": bytes_per_pe, "bytes_per_sip": bytes_per_sip, "latency_ns": crit_ns, } _SWEEP_ROWS_DIR.mkdir(parents=True, exist_ok=True) row_path = _SWEEP_ROWS_DIR / f"{sip_topology}_{n_elem}.json" with open(row_path, "w", encoding="utf-8") as f: json.dump(record, f)