kernbench2/tests/conftest.py
mukesh b610cb0d9a sccl: drive allreduce tests via torch.distributed; reorganize into tests/sccl/
Convert the multidevice allreduce correctness + latency/buffer-kind sweeps
to run through the real PyTorch-distributed path
(init_process_group(backend="ahbm") -> mp.spawn -> dist.all_reduce) instead
of direct ctx.launch, and reorganize the CCL/allreduce tests into a
tests/sccl/ package split one test per file.

Production change (required for the distributed path on non-square SIP grids):
- AhbmCCLBackend now reads explicit system.sips.w/h from the spec, with a
  square-only sqrt fallback that raises on ambiguity, instead of silently
  guessing round(sqrt(count)). This fixes the 2x3 / 3x2 torus + mesh cases,
  which previously resolved to a wrong 2x2 grid. Mirrors the test helper's
  _sip_topo_dims precedence (explicit w/h > square fallback > raise).
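
The precedence described above (explicit w/h > square fallback > raise) can be sketched as follows. This is a minimal illustration, not the AhbmCCLBackend code: the function name `resolve_sip_dims` and the flat dict with `w`, `h` and `count` keys are assumptions made for the example, and the real spec layout may differ.

```python
import math


def resolve_sip_dims(sips: dict) -> tuple[int, int]:
    """Hypothetical helper: resolve (w, h) for a SIP grid.

    Assumed input: a dict with optional "w"/"h" and a "count" key.
    """
    w, h = sips.get("w"), sips.get("h")
    # 1. Explicit dimensions always win.
    if w is not None and h is not None:
        return int(w), int(h)

    count = sips.get("count")
    if count is None:
        raise ValueError("need explicit w/h or a SIP count")

    # 2. Square-only fallback: accept count only if it is a perfect square.
    side = math.isqrt(count)
    if side * side == count:
        return side, side

    # 3. Anything else is ambiguous (6 could be 2x3 or 3x2), so raise
    #    instead of guessing round(sqrt(count)).
    raise ValueError(f"cannot infer grid for count={count}; set w and h")
```

With this shape, `{"count": 6}` raises rather than resolving to a 2x2 grid, and `{"w": 2, "h": 3, "count": 6}` resolves to `(2, 3)`.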

Test reorganization (tests/sccl/):
- _allreduce_helpers.py: shared plumbing (distributed driver, config writers,
  direct-launch run_allreduce parity reference, sweep/buffer-kind constants,
  plot aggregators, topology-diagram + FSIM-comparison emitters).
- test_allreduce_ring_torus_mesh.py: correctness across ring/torus/mesh.
- test_distributed_default_topology.py: full distributed path on topology.yaml.
- test_plot_latency_sweep.py / test_plot_buffer_kind_sweep.py: sweep rows.
- test_plot_topology_diagram.py / test_plot_comparison_fsim.py: plot emitters.
- test_intercube_root_center.py: moved in (ADR-0032 center-root latency guard).

Also:
- Move the FSIM comparison plot generator out of scripts/ into the sccl suite.
- Delete superseded test files (test_allreduce_multidevice,
  test_distributed_lrab_hierarchical_allreduce, test_allreduce_buffer_kind_sweep)
  and repoint conftest aggregators + the ipcq buffer-kind importers.
- Regenerate the allreduce_latency_plots derived artifacts from the full sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 22:24:43 -07:00

75 lines
2.3 KiB
Python

"""Shared pytest fixtures for the kernbench test suite.

Session-scoped topology caching: ``resolve_topology("topology.yaml")`` is
pure (no side effects), so we cache the result across all tests in a
worker process. Each test still builds its own ``GraphEngine`` (which is
stateful/SimPy-event-consuming and MUST NOT be shared).
"""
from __future__ import annotations

import os

import pytest

from kernbench.topology.builder import resolve_topology


def pytest_sessionfinish(session, exitstatus):
    """Aggregate parametrized sweep rows into combined CSV + PNG plots.

    Runs on the controller node only (xdist worker processes set
    ``PYTEST_XDIST_WORKER``; we skip those). Idempotent: does nothing
    if no sweep rows are present (e.g., when the sweep was filtered out).
    """
    if os.environ.get("PYTEST_XDIST_WORKER"):
        return

    import importlib.util
    import sys
    from pathlib import Path

    def _exec(name: str, attr: str) -> None:
        mod_path = Path(__file__).parent / name
        if not mod_path.exists():
            return
        s = importlib.util.spec_from_file_location(
            f"_{name.removesuffix('.py')}_for_aggregate", mod_path,
        )
        if s is None or s.loader is None:
            return
        mod = importlib.util.module_from_spec(s)
        sys.modules[s.name] = mod
        try:
            s.loader.exec_module(mod)
            fn = getattr(mod, attr, None)
            if fn is not None:
                fn()
        except Exception as e:
            print(f"[conftest] aggregator {attr}() in {name} failed: {e}")

    _exec("sccl/_allreduce_helpers.py", "_aggregate_sweep_plots")
    _exec("sccl/_allreduce_helpers.py", "aggregate_buffer_kind_plot")


@pytest.fixture(scope="session")
def topology():
    """Session-scoped parsed topology (immutable graph + spec).

    Usage in tests::

        def test_foo(topology):
            engine = GraphEngine(topology.topology_obj, enable_data=True)
    """
    return resolve_topology("topology.yaml")


@pytest.fixture(scope="session")
def topology_obj(topology):
    """The TopologyGraph inside the handle (convenience shortcut)."""
    return topology.topology_obj


@pytest.fixture(scope="session")
def spec(topology):
    """Topology spec dict (convenience shortcut)."""
    return topology.topology_obj.spec
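
The load-by-path pattern used by `_exec` in `pytest_sessionfinish` can be tried in isolation with the standalone sketch below. It is an illustration only: `call_from_file` and its boolean return value are hypothetical and not part of conftest.py, and it derives the module name from the file stem rather than from the relative path.

```python
import importlib.util
import sys
from pathlib import Path


def call_from_file(mod_path: Path, attr: str) -> bool:
    """Hypothetical helper: import a module from a file and call mod.attr().

    Returns True only if the callable was found and ran without raising.
    """
    if not mod_path.exists():
        return False
    spec = importlib.util.spec_from_file_location(
        f"_{mod_path.stem}_for_aggregate", mod_path
    )
    if spec is None or spec.loader is None:
        return False
    mod = importlib.util.module_from_spec(spec)
    # Register before executing so the module can be looked up by name
    # while its body runs.
    sys.modules[spec.name] = mod
    try:
        spec.loader.exec_module(mod)
        fn = getattr(mod, attr, None)
        if fn is None:
            return False
        fn()
        return True
    except Exception as e:
        # Best-effort: report the failure instead of propagating it.
        print(f"aggregator {attr}() in {mod_path.name} failed: {e}")
        return False
```

Swallowing the exception keeps an aggregation failure from masking the test session's own exit status, at the cost of the failure being visible only in the printed output.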