"""Tests for PE_IPCQ component (ADR-0023 D1, D2, D9, D14). These tests use a mock setup: PeIpcqComponent is instantiated directly, its in_ports/out_ports are wired to plain SimPy Stores, and IpcqInitMsg is delivered via a simple dummy transaction wrapper. PE_DMA is mocked as a Store that we drain manually. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any import pytest import simpy from kernbench.common.ipcq_types import ( IpcqCreditMetadata, IpcqDmaToken, IpcqEndpoint, IpcqInitEntry, IpcqInvalidDirection, IpcqMetaArrival, IpcqRecvCmd, IpcqRequest, IpcqSendCmd, ) from kernbench.components.builtin.pe_ipcq import PeIpcqComponent from kernbench.runtime_api.kernel import IpcqInitMsg from kernbench.topology.types import Node # ── Fakes / fixtures ───────────────────────────────────────────────── @dataclass class _FakeTxn: request: Any done: simpy.Event result_data: dict[str, Any] = field(default_factory=dict) def _make_pe_ipcq(env: simpy.Environment, pe_prefix: str = "sip0.cube0.pe0") -> PeIpcqComponent: """Create a PeIpcqComponent with mocked ports. Returns the component with: - in_ports["host"] for posting IpcqInitMsg / IpcqRequest - out_ports["__pe_dma__"] for outgoing IpcqDmaToken (drain manually) - The component is started. """ node = Node( id=f"{pe_prefix}.pe_ipcq", kind="pe_ipcq", impl="builtin.pe_ipcq", attrs={}, pos_mm=None, ) comp = PeIpcqComponent(node, ctx=None) comp.in_ports["host"] = simpy.Store(env) comp.out_ports[f"{pe_prefix}.pe_dma"] = simpy.Store(env) comp.start(env) return comp def _install_two_neighbors(env: simpy.Environment, comp: PeIpcqComponent) -> tuple[simpy.Store, simpy.Store]: """Install E and W neighbor entries with peer_credit_stores. Returns (peer_e_credit_store, peer_w_credit_store) — i.e. the stores that the component will put credits into when it receives data. """ peer_e_credit = simpy.Store(env) peer_w_credit = simpy.Store(env) ep_e = IpcqEndpoint( sip=0, cube=0, pe=1, buffer_kind="tcm", rx_base_pa=0x10_000, rx_base_va=0, n_slots=4, slot_size=4096, ) ep_w = IpcqEndpoint( sip=0, cube=0, pe=2, buffer_kind="tcm", rx_base_pa=0x20_000, rx_base_va=0, n_slots=4, slot_size=4096, ) init_msg = IpcqInitMsg( correlation_id="t", request_id="t", target_sips=(0,), target_cubes=(0,), target_pe=0, entries=( IpcqInitEntry( direction="E", peer=ep_e, my_rx_base_pa=0x30_000, my_rx_base_va=0, n_slots=4, slot_size=4096, peer_credit_store=peer_e_credit, ), IpcqInitEntry( direction="W", peer=ep_w, my_rx_base_pa=0x40_000, my_rx_base_va=0, n_slots=4, slot_size=4096, peer_credit_store=peer_w_credit, ), ), backpressure_mode="sleep", buffer_kind="tcm", credit_size_bytes=16, ) done = env.event() comp.in_ports["host"].put(_FakeTxn(request=init_msg, done=done)) env.run(until=done) return peer_e_credit, peer_w_credit # ── send: forward token to PE_DMA ──────────────────────────────────── def test_send_forwards_token_to_pe_dma(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) pe_dma = comp.out_ports["sip0.cube0.pe0.pe_dma"] cmd = IpcqSendCmd( direction="E", src_addr=0x500, src_space="tcm", nbytes=128, shape=(8, 8), dtype="f16", handle_id="s1", ) done = env.event() comp.in_ports["host"].put(IpcqRequest(command=cmd, done=done)) env.run(until=done) # Token should be in PE_DMA's mock store assert len(pe_dma.items) == 1 token = pe_dma.items[0] assert isinstance(token, IpcqDmaToken) assert token.dst_addr == 0x10_000 # peer.rx_base_pa + 0 assert token.nbytes == 128 assert token.sender_seq == 0 assert token.src_direction == "E" def test_send_advances_my_head_and_slot_addresses(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) pe_dma = comp.out_ports["sip0.cube0.pe0.pe_dma"] for i in range(3): cmd = IpcqSendCmd( direction="E", src_addr=0x500 + i, src_space="tcm", nbytes=64, shape=(8,), dtype="f16", handle_id=f"s{i}", ) done = env.event() comp.in_ports["host"].put(IpcqRequest(command=cmd, done=done)) env.run(until=done) tokens = pe_dma.items assert [t.sender_seq for t in tokens] == [0, 1, 2] # slot addresses: peer.rx_base_pa (0x10_000) + i * slot_size (4096) assert [t.dst_addr for t in tokens] == [0x10_000, 0x11_000, 0x12_000] def test_send_invalid_direction_raises(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) cmd = IpcqSendCmd( direction="N", src_addr=0x100, src_space="tcm", nbytes=64, shape=(8,), dtype="f16", handle_id="s_bad", ) done = env.event() comp.in_ports["host"].put(IpcqRequest(command=cmd, done=done)) with pytest.raises(IpcqInvalidDirection): env.run(until=done) # ── recv: wait for data and return slot address ───────────────────── def test_recv_waits_until_metadata_arrives(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) recv_cmd = IpcqRecvCmd( direction="W", shape=(8,), dtype="f16", handle_id="r1", ) recv_req = IpcqRequest(command=recv_cmd, done=env.event()) comp.in_ports["host"].put(recv_req) # Run a bit — recv should not complete yet (no data) env.run(until=10) assert not recv_req.done.triggered # Simulate metadata arrival from peer (W direction = sender pe=2) fake_token = IpcqDmaToken( src_addr=0, src_space="tcm", dst_addr=0x40_000, dst_endpoint=comp._queue_pairs["W"]["peer"], nbytes=64, handle_id="x", shape=(8,), dtype="f16", sender_seq=0, src_sip=0, src_cube=0, src_pe=2, src_direction="E", ) comp.in_ports["host"].put(IpcqMetaArrival(token=fake_token)) env.run(until=recv_req.done) assert recv_req.result_data["src_addr"] == 0x40_000 # my_rx_base_pa for W assert recv_req.result_data["direction"] == "W" def test_recv_returns_immediately_if_data_already_present(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) # Pre-arrive metadata fake_token = IpcqDmaToken( src_addr=0, src_space="tcm", dst_addr=0x40_000, dst_endpoint=comp._queue_pairs["W"]["peer"], nbytes=64, handle_id="x", shape=(8,), dtype="f16", sender_seq=0, src_sip=0, src_cube=0, src_pe=2, src_direction="E", ) comp.in_ports["host"].put(IpcqMetaArrival(token=fake_token)) env.run(until=5) recv_cmd = IpcqRecvCmd( direction="W", shape=(8,), dtype="f16", handle_id="r1", ) recv_req = IpcqRequest(command=recv_cmd, done=env.event()) comp.in_ports["host"].put(recv_req) env.run(until=recv_req.done) assert recv_req.result_data["src_addr"] == 0x40_000 def test_recv_round_robin_picks_arrived_direction(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) # Pre-arrive metadata only on W direction fake_token = IpcqDmaToken( src_addr=0, src_space="tcm", dst_addr=0x40_000, dst_endpoint=comp._queue_pairs["W"]["peer"], nbytes=64, handle_id="x", shape=(8,), dtype="f16", sender_seq=0, src_sip=0, src_cube=0, src_pe=2, src_direction="E", ) comp.in_ports["host"].put(IpcqMetaArrival(token=fake_token)) env.run(until=5) # recv() with no direction → round-robin recv_cmd = IpcqRecvCmd( direction=None, shape=(8,), dtype="f16", handle_id="r_rr", ) recv_req = IpcqRequest(command=recv_cmd, done=env.event()) comp.in_ports["host"].put(recv_req) env.run(until=recv_req.done) assert recv_req.result_data["direction"] == "W" # ── backpressure: send blocks when full ────────────────────────────── def test_send_blocks_when_peer_slot_full(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) # n_slots = 4, so 4 sends should succeed; 5th blocks for i in range(4): cmd = IpcqSendCmd( direction="E", src_addr=0x500, src_space="tcm", nbytes=64, shape=(8,), dtype="f16", handle_id=f"s{i}", ) done = env.event() comp.in_ports["host"].put(IpcqRequest(command=cmd, done=done)) env.run(until=done) # 5th send: should not complete cmd5 = IpcqSendCmd( direction="E", src_addr=0x500, src_space="tcm", nbytes=64, shape=(8,), dtype="f16", handle_id="s5", ) req5 = IpcqRequest(command=cmd5, done=env.event()) comp.in_ports["host"].put(req5) env.run(until=20) assert not req5.done.triggered # Send a credit return: peer (E direction, pe=1) consumed slot 0 credit = IpcqCreditMetadata( consumer_seq=1, # peer consumed up to my_tail=1 src_sip=0, src_cube=0, src_pe=1, src_direction="W", # peer's view ) comp.credit_inbox.put(credit) env.run(until=req5.done) assert req5.done.triggered # ── Init test ──────────────────────────────────────────────────────── def test_init_installs_neighbors(): env = simpy.Environment() comp = _make_pe_ipcq(env) _install_two_neighbors(env, comp) assert "E" in comp._queue_pairs assert "W" in comp._queue_pairs assert comp._queue_pairs["E"]["peer"].pe == 1 assert comp._queue_pairs["W"]["peer"].pe == 2 assert comp._queue_pairs["E"]["my_head"] == 0 assert comp._queue_pairs["E"]["peer_tail_cache"] == 0