"""Buffer-kind sweep (TCM / SRAM / HBM) on torus_2d 6 SIPs (3×2), distributed. Each parametrized case writes one JSON row; the conftest sessionfinish hook calls ``aggregate_buffer_kind_plot`` to emit the comparison PNG + csv. Pre slot-latency modeling the three lines overlap exactly (slot access is latency-free today). """ from __future__ import annotations import json import pytest import yaml from tests.sccl._allreduce_helpers import ( _BK_ROWS_DIR, _ELEM_BYTES_F16, _bk_params, _crit_ns, _run_distributed, _write_temp_configs, ) @pytest.mark.parametrize("buffer_kind,n_elem", _bk_params()) def test_buffer_kind_allreduce_one(tmp_path, monkeypatch, buffer_kind, n_elem): sub = tmp_path / f"{buffer_kind}_{n_elem}" sub.mkdir() topo_path, ccl_path = _write_temp_configs( sub, sip_topology="torus_2d", n_sips=6, algorithm="lrab_hierarchical_allreduce", sip_w=3, sip_h=2, n_elem_override=n_elem, ) # Override buffer_kind in the temp ccl.yaml (read by the ahbm backend # at init_process_group time via load_ccl_config()). with open(ccl_path) as f: ccl_cfg = yaml.safe_load(f) ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind ccl_cfg.setdefault("algorithms", {}).setdefault( "lrab_hierarchical_allreduce", {}, )["buffer_kind"] = buffer_kind with open(ccl_path, "w") as f: yaml.dump(ccl_cfg, f, default_flow_style=False) engine, _ = _run_distributed( sub, monkeypatch, topo_path, f"bk_sweep_{buffer_kind}_{n_elem}", n_elem, ) crit_ns = _crit_ns(engine) bytes_per_pe = n_elem * _ELEM_BYTES_F16 record = { "buffer_kind": buffer_kind, "sip_topology": "torus_2d", "n_sips": 6, "n_elem": n_elem, "bytes_per_pe": bytes_per_pe, "latency_ns": crit_ns, } _BK_ROWS_DIR.mkdir(parents=True, exist_ok=True) row_path = _BK_ROWS_DIR / f"{buffer_kind}_{n_elem}.json" with open(row_path, "w", encoding="utf-8") as f: json.dump(record, f)