"""Tests for PE_IPCQ component (ADR-0023 D1, D2, D9, D14).

These tests use a mock setup: PeIpcqComponent is instantiated directly, its
in_ports/out_ports are wired to plain SimPy Stores, and IpcqInitMsg is
delivered via a simple dummy transaction wrapper. PE_DMA is mocked as a Store
whose ``items`` the tests inspect directly.

The last section covers ADR-0025 address-based matching of metadata arrivals
and credits when the E and W neighbors are the same peer.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

import pytest
import simpy

from kernbench.common.ipcq_types import (
    IpcqCreditMetadata,
    IpcqDmaToken,
    IpcqEndpoint,
    IpcqInitEntry,
    IpcqInvalidDirection,
    IpcqMetaArrival,
    IpcqRecvCmd,
    IpcqRequest,
    IpcqSendCmd,
)
from kernbench.components.builtin.pe_ipcq import PeIpcqComponent
from kernbench.runtime_api.kernel import IpcqInitMsg
from kernbench.topology.types import Node


# ── Fakes / fixtures ─────────────────────────────────────────────────

# PE under test and the name of its mocked PE_DMA output port.
_PE_PREFIX = "sip0.cube0.pe0"
_PE_DMA_PORT = f"{_PE_PREFIX}.pe_dma"

# Ring-buffer geometry used for every queue pair installed by these tests.
_N_SLOTS = 4
_SLOT_SIZE = 4096


@dataclass
class _FakeTxn:
    """Minimal transaction wrapper used to deliver an IpcqInitMsg."""

    request: Any
    done: simpy.Event
    result_data: dict[str, Any] = field(default_factory=dict)


def _make_pe_ipcq(env: simpy.Environment, pe_prefix: str = _PE_PREFIX) -> PeIpcqComponent:
    """Create and start a PeIpcqComponent with mocked ports.

    The returned component has:

    - ``in_ports["host"]`` for posting IpcqInitMsg / IpcqRequest / IpcqMetaArrival
    - ``out_ports[f"{pe_prefix}.pe_dma"]`` collecting outgoing IpcqDmaToken
      (nothing consumes this Store; tests read its ``items``)
    """
    node = Node(
        id=f"{pe_prefix}.pe_ipcq",
        kind="pe_ipcq",
        impl="builtin.pe_ipcq",
        attrs={},
        pos_mm=None,
    )
    comp = PeIpcqComponent(node, ctx=None)
    comp.in_ports["host"] = simpy.Store(env)
    comp.out_ports[f"{pe_prefix}.pe_dma"] = simpy.Store(env)
    comp.start(env)
    return comp


def _install_neighbors(
    env: simpy.Environment,
    comp: PeIpcqComponent,
    *,
    e_peer_pe: int,
    w_peer_pe: int,
) -> tuple[simpy.Store, simpy.Store]:
    """Install E and W queue pairs on ``comp`` and run until init completes.

    Address layout shared by every caller (only the peer PE ids vary):

    - E: peer.rx_base_pa = 0x10_000, my_rx_base_pa = 0x30_000
    - W: peer.rx_base_pa = 0x20_000, my_rx_base_pa = 0x40_000

    Both directions use TCM buffers with ``_N_SLOTS`` slots of ``_SLOT_SIZE``.

    Returns (peer_e_credit_store, peer_w_credit_store) — the Stores handed to
    the component as each entry's ``peer_credit_store``.
    """
    peer_e_credit = simpy.Store(env)
    peer_w_credit = simpy.Store(env)

    def entry(
        direction: str,
        peer_pe: int,
        peer_rx_base_pa: int,
        my_rx_base_pa: int,
        credit_store: simpy.Store,
    ) -> IpcqInitEntry:
        peer = IpcqEndpoint(
            sip=0,
            cube=0,
            pe=peer_pe,
            buffer_kind="tcm",
            rx_base_pa=peer_rx_base_pa,
            rx_base_va=0,
            n_slots=_N_SLOTS,
            slot_size=_SLOT_SIZE,
        )
        return IpcqInitEntry(
            direction=direction,
            peer=peer,
            my_rx_base_pa=my_rx_base_pa,
            my_rx_base_va=0,
            n_slots=_N_SLOTS,
            slot_size=_SLOT_SIZE,
            peer_credit_store=credit_store,
        )

    init_msg = IpcqInitMsg(
        correlation_id="t",
        request_id="t",
        target_sips=(0,),
        target_cubes=(0,),
        target_pe=0,
        entries=(
            entry("E", e_peer_pe, 0x10_000, 0x30_000, peer_e_credit),
            entry("W", w_peer_pe, 0x20_000, 0x40_000, peer_w_credit),
        ),
        backpressure_mode="sleep",
        buffer_kind="tcm",
        credit_size_bytes=16,
    )
    done = env.event()
    comp.in_ports["host"].put(_FakeTxn(request=init_msg, done=done))
    env.run(until=done)
    return peer_e_credit, peer_w_credit


def _install_two_neighbors(env: simpy.Environment, comp: PeIpcqComponent) -> tuple[simpy.Store, simpy.Store]:
    """Install E (peer pe=1) and W (peer pe=2) neighbor entries.

    Returns (peer_e_credit_store, peer_w_credit_store) — i.e. the stores that
    the component will put credits into when it receives data.
    """
    return _install_neighbors(env, comp, e_peer_pe=1, w_peer_pe=2)


def _post_send(
    env: simpy.Environment,
    comp: PeIpcqComponent,
    *,
    direction: str,
    handle_id: str,
    src_addr: int = 0x500,
    nbytes: int = 64,
    shape: tuple[int, ...] = (8,),
) -> IpcqRequest:
    """Post an f16 TCM IpcqSendCmd to ``comp`` and return the request.

    The simulation is NOT advanced; callers decide how far to run.
    """
    cmd = IpcqSendCmd(
        direction=direction,
        src_addr=src_addr,
        src_space="tcm",
        nbytes=nbytes,
        shape=shape,
        dtype="f16",
        handle_id=handle_id,
    )
    req = IpcqRequest(command=cmd, done=env.event())
    comp.in_ports["host"].put(req)
    return req


def _meta_arrival(
    comp: PeIpcqComponent,
    *,
    qp_direction: str,
    dst_addr: int,
    src_pe: int,
    src_direction: str,
    handle_id: str = "x",
) -> IpcqMetaArrival:
    """Build a fake metadata arrival for a 64-byte f16 payload, sender_seq 0.

    ``dst_endpoint`` is taken from ``comp``'s ``qp_direction`` queue pair, so
    this must be called after neighbors are installed.
    """
    token = IpcqDmaToken(
        src_addr=0,
        src_space="tcm",
        dst_addr=dst_addr,
        dst_endpoint=comp._queue_pairs[qp_direction]["peer"],
        nbytes=64,
        handle_id=handle_id,
        shape=(8,),
        dtype="f16",
        sender_seq=0,
        src_sip=0,
        src_cube=0,
        src_pe=src_pe,
        src_direction=src_direction,
    )
    return IpcqMetaArrival(token=token)


# ── send: forward token to PE_DMA ────────────────────────────────────


def test_send_forwards_token_to_pe_dma():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)
    pe_dma = comp.out_ports[_PE_DMA_PORT]

    req = _post_send(env, comp, direction="E", handle_id="s1", nbytes=128, shape=(8, 8))
    env.run(until=req.done)

    # Token should be in PE_DMA's mock store
    assert len(pe_dma.items) == 1
    token = pe_dma.items[0]
    assert isinstance(token, IpcqDmaToken)
    assert token.dst_addr == 0x10_000  # peer.rx_base_pa + 0
    assert token.nbytes == 128
    assert token.sender_seq == 0
    assert token.src_direction == "E"


def test_send_advances_my_head_and_slot_addresses():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)
    pe_dma = comp.out_ports[_PE_DMA_PORT]

    for i in range(3):
        req = _post_send(env, comp, direction="E", handle_id=f"s{i}", src_addr=0x500 + i)
        env.run(until=req.done)

    tokens = pe_dma.items
    assert [t.sender_seq for t in tokens] == [0, 1, 2]
    # slot addresses: peer.rx_base_pa (0x10_000) + i * slot_size (4096)
    assert [t.dst_addr for t in tokens] == [0x10_000, 0x11_000, 0x12_000]


def test_send_invalid_direction_raises():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)

    req = _post_send(env, comp, direction="N", handle_id="s_bad", src_addr=0x100)
    with pytest.raises(IpcqInvalidDirection):
        env.run(until=req.done)


# ── recv: wait for data and return slot address ─────────────────────


def test_recv_waits_until_metadata_arrives():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)

    recv_cmd = IpcqRecvCmd(direction="W", shape=(8,), dtype="f16", handle_id="r1")
    recv_req = IpcqRequest(command=recv_cmd, done=env.event())
    comp.in_ports["host"].put(recv_req)

    # Run a bit — recv should not complete yet (no data)
    env.run(until=10)
    assert not recv_req.done.triggered

    # Simulate metadata arrival from peer (W direction = sender pe=2)
    comp.in_ports["host"].put(
        _meta_arrival(comp, qp_direction="W", dst_addr=0x40_000, src_pe=2, src_direction="E")
    )
    env.run(until=recv_req.done)
    assert recv_req.result_data["src_addr"] == 0x40_000  # my_rx_base_pa for W
    assert recv_req.result_data["direction"] == "W"


def test_recv_returns_immediately_if_data_already_present():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)

    # Pre-arrive metadata
    comp.in_ports["host"].put(
        _meta_arrival(comp, qp_direction="W", dst_addr=0x40_000, src_pe=2, src_direction="E")
    )
    env.run(until=5)

    recv_cmd = IpcqRecvCmd(direction="W", shape=(8,), dtype="f16", handle_id="r1")
    recv_req = IpcqRequest(command=recv_cmd, done=env.event())
    comp.in_ports["host"].put(recv_req)
    env.run(until=recv_req.done)
    assert recv_req.result_data["src_addr"] == 0x40_000


def test_recv_round_robin_picks_arrived_direction():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)

    # Pre-arrive metadata only on W direction
    comp.in_ports["host"].put(
        _meta_arrival(comp, qp_direction="W", dst_addr=0x40_000, src_pe=2, src_direction="E")
    )
    env.run(until=5)

    # recv() with no direction → round-robin
    recv_cmd = IpcqRecvCmd(direction=None, shape=(8,), dtype="f16", handle_id="r_rr")
    recv_req = IpcqRequest(command=recv_cmd, done=env.event())
    comp.in_ports["host"].put(recv_req)
    env.run(until=recv_req.done)
    assert recv_req.result_data["direction"] == "W"


# ── backpressure: send blocks when full ──────────────────────────────


def test_send_blocks_when_peer_slot_full():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)
    pe_dma = comp.out_ports[_PE_DMA_PORT]

    # n_slots = 4, so 4 sends should succeed; 5th blocks
    for i in range(4):
        req = _post_send(env, comp, direction="E", handle_id=f"s{i}")
        env.run(until=req.done)

    # 5th send: should not complete, and nothing new reaches PE_DMA
    req5 = _post_send(env, comp, direction="E", handle_id="s5")
    env.run(until=20)
    assert not req5.done.triggered
    assert len(pe_dma.items) == 4

    # Send a credit return: peer (E direction, pe=1) consumed slot 0.
    # dst_rx_base_pa is the peer-side rx buffer — which equals my qp_E's
    # peer.rx_base_pa (0x10_000 from _install_two_neighbors).
    credit = IpcqCreditMetadata(
        consumer_seq=1,  # peer consumed up to my_tail=1
        dst_rx_base_pa=0x10_000,  # E's peer.rx_base_pa (ADR-0025 D3)
        src_sip=0,
        src_cube=0,
        src_pe=1,
        src_direction="W",  # peer's view
    )
    comp.credit_inbox.put(credit)
    env.run(until=req5.done)
    # The freed slot lets the 5th token through to PE_DMA.
    assert len(pe_dma.items) == 5


# ── Init test ────────────────────────────────────────────────────────


def test_init_installs_neighbors():
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_two_neighbors(env, comp)
    assert "E" in comp._queue_pairs
    assert "W" in comp._queue_pairs
    assert comp._queue_pairs["E"]["peer"].pe == 1
    assert comp._queue_pairs["W"]["peer"].pe == 2
    assert comp._queue_pairs["E"]["my_head"] == 0
    assert comp._queue_pairs["E"]["peer_tail_cache"] == 0


# ── ADR-0025: address-based matching in meta arrival / credit ────────


def _install_same_peer_neighbors(
    env: simpy.Environment,
    comp: PeIpcqComponent,
) -> tuple[simpy.Store, simpy.Store]:
    """Install E and W neighbors BOTH pointing to the same peer (pe=1).

    This mirrors the 2-rank bidirectional ring topology (ADR-0025 motivation):
    rank 0's E and W neighbors are the same peer rank, but target different rx
    slots on that peer (E→peer's W-rx, W→peer's E-rx).

    - E's peer.rx_base_pa = 0x10_000 (peer's W-rx buffer)
    - W's peer.rx_base_pa = 0x20_000 (peer's E-rx buffer)
    - my_rx_base_pa: E=0x30_000, W=0x40_000 (local rx for each dir)

    Returns (peer_e_credit_store, peer_w_credit_store).
    """
    return _install_neighbors(env, comp, e_peer_pe=1, w_peer_pe=1)


def test_meta_arrival_matches_by_dst_addr_same_peer():
    """ADR-0025 D2: when E and W point to the same peer (2-rank ring),
    dst_addr range must determine which qp's peer_head_cache updates.

    Under the old sender-key matching, the first matching direction (E)
    would win for any arrival, regardless of which rx slot was written.
    Under D2 address-based matching, dst_addr within W's rx range
    (my_rx_base_pa_W .. +n_slots*slot_size) must update W, and dst_addr
    within E's rx range must update E.
    """
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_same_peer_neighbors(env, comp)

    # Arrival into W's rx buffer (my_rx_base_pa=0x40_000)
    comp.in_ports["host"].put(
        _meta_arrival(
            comp, qp_direction="W", dst_addr=0x40_000, src_pe=1, src_direction="E", handle_id="w1"
        )
    )
    env.run(until=5)

    # W's peer_head_cache should increment; E's stays 0.
    assert comp._queue_pairs["W"]["peer_head_cache"] == 1, (
        "W qp should have been updated because dst_addr is in W's rx range"
    )
    assert comp._queue_pairs["E"]["peer_head_cache"] == 0, (
        "E qp should NOT be updated; sender-key matching (pre-ADR-0025) "
        "would wrongly pick the first direction with a matching peer"
    )

    # Second arrival into E's rx buffer (my_rx_base_pa=0x30_000)
    comp.in_ports["host"].put(
        _meta_arrival(
            comp, qp_direction="E", dst_addr=0x30_000, src_pe=1, src_direction="W", handle_id="e1"
        )
    )
    env.run(until=10)

    assert comp._queue_pairs["E"]["peer_head_cache"] == 1
    assert comp._queue_pairs["W"]["peer_head_cache"] == 1


def test_credit_matches_by_dst_rx_base_pa_same_peer():
    """ADR-0025 D3: credit must carry dst_rx_base_pa (the receiver-side
    rx buffer base) so the original sender can match it against
    qp.peer.rx_base_pa and find the correct direction.

    Under old sender-key matching, first-match-wins would always pick E
    when E and W share the same peer.
    """
    env = simpy.Environment()
    comp = _make_pe_ipcq(env)
    _install_same_peer_neighbors(env, comp)

    # Credit corresponding to a send through W direction:
    # - My W sent to peer's rx at 0x20_000 (qp_w["peer"].rx_base_pa)
    # - Peer consumed it; sends credit back with dst_rx_base_pa=0x20_000
    # - Receiver (me, the original sender) should update W's peer_tail_cache
    credit_for_w = IpcqCreditMetadata(
        consumer_seq=1,
        dst_rx_base_pa=0x20_000,  # matches W's peer.rx_base_pa
        src_sip=0,
        src_cube=0,
        src_pe=1,
        src_direction="E",
    )
    comp.credit_inbox.put(credit_for_w)
    env.run(until=5)

    assert comp._queue_pairs["W"]["peer_tail_cache"] == 1, (
        "W's peer_tail_cache should update — credit.dst_rx_base_pa matches "
        "W qp's peer.rx_base_pa"
    )
    assert comp._queue_pairs["E"]["peer_tail_cache"] == 0, (
        "E's peer_tail_cache should NOT update"
    )

    # Second credit: for E direction
    credit_for_e = IpcqCreditMetadata(
        consumer_seq=2,
        dst_rx_base_pa=0x10_000,  # matches E's peer.rx_base_pa
        src_sip=0,
        src_cube=0,
        src_pe=1,
        src_direction="W",
    )
    comp.credit_inbox.put(credit_for_e)
    env.run(until=10)
    assert comp._queue_pairs["E"]["peer_tail_cache"] == 2
    assert comp._queue_pairs["W"]["peer_tail_cache"] == 1