"""Tests for IPCQ deadlock detection (ADR-0023 D14 F3).""" from __future__ import annotations from dataclasses import dataclass, field from typing import Any import pytest import simpy from kernbench.ccl import diagnostics from kernbench.common.ipcq_types import ( IpcqEndpoint, IpcqInitEntry, IpcqRecvCmd, IpcqRequest, ) from kernbench.components.builtin.pe_ipcq import PeIpcqComponent from kernbench.runtime_api.kernel import IpcqInitMsg from kernbench.topology.types import Node @dataclass class _FakeTxn: request: Any done: simpy.Event result_data: dict[str, Any] = field(default_factory=dict) def _make_isolated_pe_ipcq(env): node = Node( id="sip0.cube0.pe0.pe_ipcq", kind="pe_ipcq", impl="builtin.pe_ipcq", attrs={}, pos_mm=None, ) comp = PeIpcqComponent(node, ctx=None) comp.in_ports["host"] = simpy.Store(env) comp.out_ports["sip0.cube0.pe0.pe_dma"] = simpy.Store(env) comp.start(env) peer_credit = simpy.Store(env) ep = IpcqEndpoint( sip=0, cube=0, pe=1, buffer_kind="tcm", rx_base_pa=0x10_000, rx_base_va=0, n_slots=4, slot_size=4096, ) init_msg = IpcqInitMsg( correlation_id="t", request_id="t", target_sips=(0,), target_cubes=(0,), target_pe=0, entries=(IpcqInitEntry( direction="W", peer=ep, my_rx_base_pa=0x40_000, my_rx_base_va=0, n_slots=4, slot_size=4096, peer_credit_store=peer_credit, ),), backpressure_mode="sleep", buffer_kind="tcm", credit_size_bytes=16, ) done = env.event() comp.in_ports["host"].put(_FakeTxn(request=init_msg, done=done)) env.run(until=done) return comp def test_pointer_dump_includes_blocked_state(): """A blocked recv should still be visible in the pointer dump.""" env = simpy.Environment() comp = _make_isolated_pe_ipcq(env) # Issue a recv that will block (no data has arrived) recv_cmd = IpcqRecvCmd(direction="W", shape=(8,), dtype="f16", handle_id="r1") req = IpcqRequest(command=recv_cmd, done=env.event()) comp.in_ports["host"].put(req) env.run(until=10) assert not req.done.triggered # Pointer dump should show my_tail=0 and peer_head_cache=0 # We need to use the engine API but for an isolated component, just call directly class FakeEngine: _components = {"sip0.cube0.pe0.pe_ipcq": comp} dump = diagnostics.pointer_dump(FakeEngine()) assert "my_tail=0" in dump assert "peer_head_cache=0" in dump def test_deadlock_detection_recv_without_send(): """A recv with no matching sender → SimPy schedule empties → engine raises ``IpcqDeadlock`` with a pointer dump. """ from kernbench.ccl.diagnostics import IpcqDeadlock from kernbench.policy.placement.dp import DPPolicy from kernbench.runtime_api.bench_runner import run_bench from kernbench.runtime_api.types import resolve_device from kernbench.sim_engine.engine import GraphEngine from kernbench.topology.builder import resolve_topology def deadlock_kernel(t_ptr, n_elem, tl): # Every PE just receives, no sends → no one delivers → deadlock tl.recv(dir="W", shape=(n_elem,), dtype="f16") topo = resolve_topology("topology.yaml") def run(torch): torch.install_ipcq( algorithm="ring_allreduce_tcm", world_size_override=8, ) a = torch.zeros( (1, 8 * 8), dtype="f16", dp=DPPolicy( cube="replicate", pe="column_wise", num_cubes=1, ), name="dl_in", ) torch.launch("dl", deadlock_kernel, a, 8) with pytest.raises(IpcqDeadlock): run_bench( topology=topo, bench_fn=run, device=resolve_device("all"), engine_factory=lambda t, d: GraphEngine( getattr(t, "topology_obj", t), enable_data=True ), )