"""Shared pytest fixtures for the kernbench test suite. Session-scoped topology caching: ``resolve_topology("topology.yaml")`` is pure (no side effects), so we cache the result across all tests in a worker process. Each test still builds its own ``GraphEngine`` (which is stateful/SimPy-event-consuming and MUST NOT be shared). """ from __future__ import annotations import os import pytest from kernbench.topology.builder import resolve_topology def pytest_sessionfinish(session, exitstatus): """Aggregate parametrized sweep rows into combined CSV + PNG plots. Runs on the controller node only (xdist worker processes set ``PYTEST_XDIST_WORKER``; we skip those). Idempotent — does nothing if no sweep rows are present (e.g., when the sweep was filtered out). """ if os.environ.get("PYTEST_XDIST_WORKER"): return import importlib.util import sys from pathlib import Path mod_path = Path(__file__).parent / "test_allreduce_multidevice.py" if not mod_path.exists(): return spec = importlib.util.spec_from_file_location( "_test_allreduce_multidevice_for_aggregate", mod_path, ) if spec is None or spec.loader is None: return mod = importlib.util.module_from_spec(spec) sys.modules[spec.name] = mod try: spec.loader.exec_module(mod) agg = getattr(mod, "_aggregate_sweep_plots", None) if agg is not None: agg() except Exception as e: print(f"[conftest] sweep aggregation failed: {e}") @pytest.fixture(scope="session") def topology(): """Session-scoped parsed topology (immutable graph + spec). Usage in tests:: def test_foo(topology): engine = GraphEngine(topology.topology_obj, enable_data=True) """ return resolve_topology("topology.yaml") @pytest.fixture(scope="session") def topology_obj(topology): """The TopologyGraph inside the handle (convenience shortcut).""" return topology.topology_obj @pytest.fixture(scope="session") def spec(topology): """Topology spec dict (convenience shortcut).""" return topology.topology_obj.spec