"""Tests for the flit-streaming latency model (ADR-0033 v2 / Max F).

The Phase 2 changes split every transaction's payload into flits of
`flit_bytes` and stream them through the fabric via wires. Routers do RR
arbitration between active flows at output ports. The HBM CTRL receives
flits individually and dispatches each to a PC. This eliminates the
atomic-FIFO wire serialization that caused timing drift in slow-upstream
and multi-stream-merge scenarios.

Naming note (ADR-0033 D1/D2): we use NoC terminology — a `Flit` is the
atomic wire transport unit. For modeling tractability our `flit_bytes`
equals the HBM `burst_bytes` (256B). Real HW has flits (~32B) that are
smaller than bursts (~256B); we conflate the two. See ADR-0033 D2 for the
fidelity caveat.

Chunking happens AT THE WIRE: source components emit whole Transactions,
the wire decomposes them into Flits on first transport, and downstream
wires pass Flits through. Source-component code is unchanged.

These tests are written BEFORE the production change and are expected to
FAIL on current code (which still does Transaction-atomic wire delivery).
Phase 2 must make them PASS without weakening assertions.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.runtime_api.kernel import (
    MemoryReadMsg,
    MemoryWriteMsg,
    PeDmaMsg,
)
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology

# topology.yaml lives one directory above the directory containing this file.
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"

# Constants from topology.yaml defaults. These are hard-coded copies; keep
# them in sync with the YAML if its defaults change.
FLIT_BYTES = 256  # = HBM burst_bytes in our simplified model
NUM_PCS = 8
PC_BW_GBS = 32.0
# bytes / (GB/s) yields ns because 1 GB/s == 1 byte/ns: 256 / 32.0 = 8.0.
COMMIT_TIME_NS = FLIT_BYTES / PC_BW_GBS  # 8 ns (HBM PC commit for one flit)

# Reasonable per-test path-overhead budget (router overheads, prop, UCIe etc.)
OVERHEAD_BUDGET_NS = 80.0


def _engine() -> GraphEngine:
    """Return a fresh GraphEngine built from the shared test topology."""
    topology = load_topology(TOPOLOGY_PATH)
    return GraphEngine(topology)


def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0, offset: int = 0x1000) -> int:
    """Return the encoded physical address of `offset` inside a PE's local HBM.

    The slice size handed to PhysAddr is 48 GiB divided into 8 equal parts
    (presumably one slice per PE — confirm against topology.yaml).
    """
    per_slice = (48 << 30) // 8
    location = PhysAddr.pe_hbm_addr(
        sip_id=sip,
        die_id=cube,
        pe_id=pe_id,
        pe_local_hbm_offset=offset,
        slice_size_bytes=per_slice,
    )
    return location.encode()


def _write_msg(req_id: str, *, cube: int, pe: int, nbytes: int) -> MemoryWriteMsg:
    """Build a MemoryWrite (pattern="zero") of `nbytes` into sip0/`cube`/`pe` HBM."""
    target_pa = _hbm_pa(sip=0, cube=cube, pe_id=pe)
    return MemoryWriteMsg(
        correlation_id="flit-stream",
        request_id=req_id,
        dst_sip=0,
        dst_cube=cube,
        dst_pe=pe,
        dst_pa=target_pa,
        nbytes=nbytes,
        pattern="zero",
        target_pe=pe,
    )


def _read_msg(req_id: str, *, cube: int, pe: int, nbytes: int) -> MemoryReadMsg:
    """Build a MemoryRead of `nbytes` from the local HBM of sip0/`cube`/`pe`."""
    source_pa = _hbm_pa(sip=0, cube=cube, pe_id=pe)
    return MemoryReadMsg(
        correlation_id="flit-stream",
        request_id=req_id,
        src_sip=0,
        src_cube=cube,
        src_pe=pe,
        src_pa=source_pa,
        nbytes=nbytes,
    )


def _pe_dma_write(req_id: str, *, src_cube: int, src_pe: int,
                  dst_cube: int, dst_pe: int, nbytes: int) -> PeDmaMsg:
    """Build a PE DMA write of `nbytes` issued by sip0/`src_cube`/`src_pe`
    and targeting the local HBM of sip0/`dst_cube`/`dst_pe`."""
    dest_pa = _hbm_pa(sip=0, cube=dst_cube, pe_id=dst_pe)
    return PeDmaMsg(
        correlation_id="flit-stream",
        request_id=req_id,
        src_sip=0,
        src_cube=src_cube,
        src_pe=src_pe,
        dst_pa=dest_pa,
        nbytes=nbytes,
        is_write=True,
    )


def _path_drain_for_request(eng: GraphEngine, request) -> float:
    """Dynamically compute the path drain_ns the engine would assign to this request.

    Reads engine internals (test-time only) so tests reflect the actual path
    bottleneck (e.g., MemoryWrite goes via UCIe = 128 GB/s, PE_DMA same-cube
    stays in cube fabric = 256 GB/s).

    MemoryWriteMsg / MemoryReadMsg are routed with `find_memory_path` from the
    SIP's PCIe endpoint to the node the PA resolves to; PeDmaMsg is routed
    with `find_path` from the source PE's component prefix.

    Raises:
        ValueError: for any other request type.
    """
    if isinstance(request, MemoryWriteMsg):
        sip, pa_val = request.dst_sip, request.dst_pa
    elif isinstance(request, MemoryReadMsg):
        sip, pa_val = request.src_sip, request.src_pa
    elif isinstance(request, PeDmaMsg):
        pe_prefix = f"sip{request.src_sip}.cube{request.src_cube}.pe{request.src_pe}"
        dst_node = eng._resolver.resolve(PhysAddr.decode(request.dst_pa))
        dma_path = eng._router.find_path(pe_prefix, dst_node)
        return eng._path_drain_ns(dma_path, request.nbytes)
    else:
        raise ValueError(f"unsupported request type: {type(request).__name__}")

    # Shared tail for host-side memory requests (write and read).
    pcie_ep_id = eng._resolver.find_pcie_ep(sip)
    hbm_node = eng._resolver.resolve(PhysAddr.decode(pa_val))
    mem_path = eng._router.find_memory_path(pcie_ep_id, hbm_node)
    return eng._path_drain_ns(mem_path, request.nbytes)


def _single_write_ns(nbytes: int, cube: int = 0, pe: int = 0) -> tuple[float, float]:
    """Run one MemoryWrite on a fresh engine; return (total_ns, path_drain_ns)."""
    engine = _engine()
    request = _write_msg(f"s-{cube}-{pe}-{nbytes}", cube=cube, pe=pe, nbytes=nbytes)
    drain_ns = _path_drain_for_request(engine, request)
    handle = engine.submit(request)
    engine.wait(handle)
    completion = engine.get_completion(handle)
    return completion[1]["total_ns"], drain_ns


# ── 1. Flit dataclass + Transaction.into_flits ─────────────────────


def test_flit_dataclass_exists():
    """Phase 2 must add a Flit dataclass in sim_engine.transaction.

    Required fields:
      - txn: reference to parent Transaction
      - flit_index: 0..n_flits-1
      - flit_nbytes: bytes carried by this flit (usually flit_bytes; last may be smaller)
      - is_last: True for the final flit
    """
    import dataclasses

    from kernbench.sim_engine.transaction import Flit

    declared = {fld.name for fld in dataclasses.fields(Flit)}
    missing = [name for name in ("txn", "flit_index", "flit_nbytes", "is_last")
               if name not in declared]
    # Report the first absent field, in declaration order.
    assert not missing, f"Flit dataclass missing required field: {missing[0]}"


def test_transaction_into_flits_count():
    """Transaction.into_flits(flit_bytes) yields ceil(nbytes/flit_bytes) flits
    with correct flit_nbytes (last may be partial) and indices."""
    from kernbench.sim_engine.transaction import Transaction

    txn = Transaction(
        request=None, path=["a", "b"], step=0, nbytes=1024, done=None, drain_ns=0.0,
    )
    chunks = [*txn.into_flits(FLIT_BYTES)]
    assert len(chunks) == 4, f"1024 / 256 = 4 flits, got {len(chunks)}"
    for position, chunk in enumerate(chunks):
        assert chunk.flit_index == position
        assert chunk.flit_nbytes == FLIT_BYTES
        assert chunk.is_last == (position == 3)
        assert chunk.txn is txn


def test_transaction_into_flits_partial_last():
    """nbytes not divisible by flit_bytes must yield a final partial flit."""
    from kernbench.sim_engine.transaction import Transaction

    txn = Transaction(
        request=None, path=["a", "b"], step=0, nbytes=FLIT_BYTES * 3 + 64, done=None,
    )
    chunks = [*txn.into_flits(FLIT_BYTES)]
    assert len(chunks) == 4
    tail = chunks[-1]
    assert tail.flit_nbytes == 64
    assert tail.is_last is True
    assert chunks[0].flit_nbytes == FLIT_BYTES


def test_transaction_into_flits_single_flit():
    """A small transaction (<= flit_bytes) produces exactly one flit with is_last=True."""
    from kernbench.sim_engine.transaction import Transaction

    txn = Transaction(request=None, path=["a", "b"], step=0, nbytes=128, done=None)
    chunks = [*txn.into_flits(FLIT_BYTES)]
    assert len(chunks) == 1
    (only,) = chunks[:1]
    assert only.flit_nbytes == 128
    assert only.is_last is True
# ── 2. Single transfer accuracy (flit-streaming should fix the
#       slow-upstream cut-through over-credit) ──


def test_slow_upstream_single_2kb_total_matches_drain_plus_commit():
    """A 2KB write through the MemoryWrite path
    (host → PCIe → IO → UCIe → cube router → HBM_CTRL).

    The path bottleneck is UCIe (128 GB/s in this topology). Expected total
    ≈ drain (= 2048/128 = 16 ns) + commit_time (= 8 ns) + path overheads.
    The current model under-counts because cut-through subtraction
    over-credits the slow drain; flit-streaming (chunk-loop drain) charges
    both terms.
    """
    total, drain = _single_write_ns(2048, cube=0, pe=0)
    # Floor: bottleneck drain + one flit commit. Ceiling: floor + overhead budget.
    floor_ns = drain + COMMIT_TIME_NS
    ceiling_ns = floor_ns + OVERHEAD_BUDGET_NS

    assert total >= floor_ns - 1.0, (
        f"2KB write total {total:.2f}ns below minimum {floor_ns:.2f}ns "
        f"(drain={drain:.2f} + commit_time={COMMIT_TIME_NS:.2f}); "
        f"flit-streaming must charge both"
    )
    assert total <= ceiling_ns, (
        f"2KB write total {total:.2f}ns above maximum {ceiling_ns:.2f}ns "
        f"(drain={drain:.2f} + commit + {OVERHEAD_BUDGET_NS:.0f}ns overhead budget)"
    )


def test_64kb_total_drain_plus_commit():
    """A 64KB MemoryWrite at the path bottleneck rate.

    total ≈ drain + commit_time + path overheads, where drain is computed
    dynamically from the engine's path bottleneck (UCIe-limited for
    host-initiated MemoryWrite).
    """
    total, drain = _single_write_ns(65536)
    floor_ns = drain + COMMIT_TIME_NS
    ceiling_ns = floor_ns + OVERHEAD_BUDGET_NS

    assert total >= floor_ns - 1.0, (
        f"64KB total {total:.2f}ns below {floor_ns:.2f} "
        f"(drain={drain:.2f}+commit_time={COMMIT_TIME_NS:.2f})"
    )
    assert total <= ceiling_ns, (
        f"64KB total {total:.2f}ns above {ceiling_ns:.2f} "
        f"(drain={drain:.2f}+commit+{OVERHEAD_BUDGET_NS:.0f}ns budget)"
    )
# ── 3. Multi-hop cut-through pipelining ────────────────────────────


def test_multihop_flits_pipeline_drain_not_summed():
    """Drain is the bottleneck-link transfer time, charged ONCE across the
    full path (not per hop). With flit-streaming + cut-through, this is the
    expected behavior.

    If drain were summed per hop, large-payload total would grow faster than
    small-payload total proportionally to hop count. We isolate the drain-sum
    effect by comparing the *slope* of total vs nbytes for close (same-cube)
    vs far (cross-cube) paths. The slope is dominated by drain (the per-byte
    rate at the bottleneck). If drain doesn't sum across hops, each slope
    matches its own path's 1/bottleneck_bw. If drain were summed, the far
    slope would be much steeper.
    """
    nbytes_small, nbytes_large = 256, 4096
    # Only the large-payload drains are needed to derive the expected slopes.
    t_close_small, _ = _single_write_ns(nbytes_small, cube=0, pe=0)
    t_close_large, drain_close_large = _single_write_ns(nbytes_large, cube=0, pe=0)
    t_far_small, _ = _single_write_ns(nbytes_small, cube=15, pe=0)
    t_far_large, drain_far_large = _single_write_ns(nbytes_large, cube=15, pe=0)

    delta_bytes = nbytes_large - nbytes_small
    slope_close = (t_close_large - t_close_small) / delta_bytes
    slope_far = (t_far_large - t_far_small) / delta_bytes

    # Drain is linear in nbytes, so the expected slope is simply the
    # per-byte drain at each path's bottleneck (ns/byte).
    expected_close_slope = drain_close_large / nbytes_large
    expected_far_slope = drain_far_large / nbytes_large

    # If drain summed across hops, the far slope would be ~hop_count× larger
    # than expected. Allow 1.5× for propagation effects but reject
    # drain-per-hop.
    assert slope_close <= expected_close_slope * 1.5, (
        f"Close-cube slope {slope_close:.4f} ns/byte vs expected "
        f"{expected_close_slope:.4f}; drain may sum across hops"
    )
    assert slope_far <= expected_far_slope * 1.5, (
        f"Far-cube slope {slope_far:.4f} ns/byte vs expected "
        f"{expected_far_slope:.4f}; drain may sum across hops"
    )


# ── 4. Two-stream merge at HBM router (non-overcommit) ────────────


def test_two_concurrent_2kb_writes_merge_makespan():
    """Two concurrent 2KB writes merge at the HBM-attached router.

    With flit-streaming + RR arbitration, both streams share the output BW.
    Makespan ≈ aggregate-data / path-bottleneck + commit_time + overheads.
    Drain is computed dynamically from the engine path.
    """
    nbytes = 2048
    eng = _engine()
    msg_a = _write_msg("conc-a", cube=0, pe=0, nbytes=nbytes)
    msg_b = _write_msg("conc-b", cube=0, pe=1, nbytes=nbytes)
    drain_per_txn = _path_drain_for_request(eng, msg_a)

    h_a = eng.submit(msg_a)
    h_b = eng.submit(msg_b)
    eng.wait(h_a)
    eng.wait(h_b)
    ta = eng.get_completion(h_a)[1]["total_ns"]
    tb = eng.get_completion(h_b)[1]["total_ns"]
    makespan = max(ta, tb)

    # Aggregate drain (2 streams worth) + commit_time + overheads.
    expected_min = 2 * drain_per_txn + COMMIT_TIME_NS
    expected_max = expected_min + OVERHEAD_BUDGET_NS
    assert makespan >= expected_min - 1.0, (
        f"2-stream merge makespan {makespan:.2f}ns below floor "
        f"{expected_min:.2f} (2*drain={2*drain_per_txn:.2f}+commit)"
    )
    assert makespan <= expected_max, (
        f"2-stream merge makespan {makespan:.2f}ns above ceiling "
        f"{expected_max:.2f}"
    )

    # Finish times must lie within one transaction's drain + commit_time
    # (+5 ns slack) of each other.
    # NOTE(review): this bound also admits FIFO service (B finishing about one
    # drain after A), so it guards against gross starvation rather than
    # proving RR interleaving. Tighten it once the model's RR granularity is
    # pinned down.
    diff = abs(ta - tb)
    assert diff <= drain_per_txn + COMMIT_TIME_NS + 5.0, (
        f"Stream A ({ta:.2f}) vs B ({tb:.2f}) finish times differ by "
        f"{diff:.2f}ns; expected fairness within ≤ "
        f"{drain_per_txn + COMMIT_TIME_NS + 5:.2f}ns"
    )
# ── 5. Heavy-overcommit makespan (where flit-streaming shines) ────


def test_eight_concurrent_writes_overcommit_makespan():
    """8 concurrent 1KB writes share the path bottleneck.

    With flit-streaming, aggregate traffic = 8 × 1KB shares the bottleneck
    link, so makespan ≈ 8 × per_txn_drain + commit_time + overheads.

    Both sides are checked, as in the 2-stream merge test. The ceiling bounds
    excess delay. The floor rejects a model in which the 8 streams do not
    actually share the bottleneck link (makespan ≈ 1 × drain).
    """
    nbytes = 1024
    eng = _engine()
    msg0 = _write_msg("oc-0", cube=0, pe=0, nbytes=nbytes)
    drain_per_txn = _path_drain_for_request(eng, msg0)
    handles = [eng.submit(_write_msg(f"oc-{pe}", cube=0, pe=pe, nbytes=nbytes))
               for pe in range(8)]
    for h in handles:
        eng.wait(h)
    times = [eng.get_completion(h)[1]["total_ns"] for h in handles]
    makespan = max(times)

    expected_min = 8 * drain_per_txn + COMMIT_TIME_NS
    expected_max = expected_min + OVERHEAD_BUDGET_NS
    assert makespan >= expected_min - 1.0, (
        f"8-stream overcommit makespan {makespan:.2f}ns below floor "
        f"{expected_min:.2f}ns (8*drain={8*drain_per_txn:.2f}+commit); "
        f"concurrent streams must share the bottleneck link"
    )
    assert makespan <= expected_max, (
        f"8-stream overcommit makespan {makespan:.2f}ns above ceiling "
        f"{expected_max:.2f}ns (8*drain={8*drain_per_txn:.2f}+commit+budget). "
    )


# ── 6. PE → PE DMA flit-streaming (inter-cube, slow link case) ────


def _single_pe_dma_write_ns(req_id: str, *, src_cube: int, src_pe: int,
                            dst_cube: int, dst_pe: int,
                            nbytes: int) -> tuple[float, float]:
    """Run one PE DMA write on a fresh engine; return (total_ns, path_drain_ns)."""
    eng = _engine()
    msg = _pe_dma_write(req_id, src_cube=src_cube, src_pe=src_pe,
                        dst_cube=dst_cube, dst_pe=dst_pe, nbytes=nbytes)
    drain = _path_drain_for_request(eng, msg)
    h = eng.submit(msg)
    eng.wait(h)
    return eng.get_completion(h)[1]["total_ns"], drain


def test_inter_cube_pe_dma_drain_doesnt_sum_across_hops():
    """PE→PE DMA across cubes traverses many hops + inter-cube UCIe.

    Per-hop overheads accumulate (router overhead, UCIe overhead, prop) and
    dominate the absolute total, so we don't bound the absolute value.
    Instead we verify drain is charged ONCE: compare 256B (tiny drain) vs
    4KB (16× drain) on the same cross-cube path. The delta should grow
    approximately as the drain difference, not as drain × hops.
    """
    t_small, drain_small = _single_pe_dma_write_ns(
        "xs", src_cube=0, src_pe=0, dst_cube=15, dst_pe=0, nbytes=256)
    t_large, drain_large = _single_pe_dma_write_ns(
        "xl", src_cube=0, src_pe=0, dst_cube=15, dst_pe=0, nbytes=4096)

    delta = t_large - t_small
    drain_delta = drain_large - drain_small
    # If drain were charged per hop, delta would grow as drain_delta * hops.
    # If drain is charged once (correct), delta ≈ drain_delta + some per-flit
    # overhead (chunks pipeline through hops). Cap at 3× drain_delta (+30 ns)
    # to allow chunk-loop / flit transit overhead but reject hop summing.
    assert delta <= drain_delta * 3 + 30.0, (
        f"Inter-cube delta {delta:.2f}ns for {drain_delta:.2f}ns drain growth "
        f"exceeds 3×drain_delta+30; drain may be summing across hops"
    )


# ── 7. Read response path: HBM → PE responses also flit-streamed ──


def test_concurrent_reads_response_path_shares_bw():
    """Multiple concurrent reads share the path's bottleneck link on the
    response (HBM → router → ... → host) path. With flit-streaming,
    aggregate response traffic ≈ N × drain_per_txn.
    """
    nbytes = 1024
    eng = _engine()
    msg0 = _read_msg("r0", cube=0, pe=0, nbytes=nbytes)
    drain_per_txn = _path_drain_for_request(eng, msg0)
    handles = [eng.submit(_read_msg(f"r-{pe}", cube=0, pe=pe, nbytes=nbytes))
               for pe in range(8)]
    for h in handles:
        eng.wait(h)
    times = [eng.get_completion(h)[1]["total_ns"] for h in handles]
    makespan = max(times)

    # 8 concurrent reads aggregate ≈ 8 × drain on the shared bottleneck,
    # plus forward command + commit + path overheads (response is dominant).
    expected_min = 8 * drain_per_txn + COMMIT_TIME_NS
    expected_max = expected_min + OVERHEAD_BUDGET_NS * 2  # 2× for fwd+resp paths
    # NOTE(review): only the ceiling is checked here. A floor of
    # `expected_min` (as in the write tests) would also catch a model that
    # ignores response-path sharing. Confirm the read commit/response timing
    # before adding it.
    assert makespan <= expected_max, (
        f"8 concurrent reads makespan {makespan:.2f}ns above ceiling "
        f"{expected_max:.2f} (8*drain={8*drain_per_txn:.2f}+commit+budget); "
        f"response path BW sharing may not be modeled correctly"
    )


# ── 8. Op_log: per-Transaction record (not per-flit) ───────────────


def test_op_log_records_per_transaction_not_per_flit():
    """Op_log records data_op events per Transaction, not per flit.

    A single 2KB PE DMA write (2048 / FLIT_BYTES = 8 flits) must leave at
    most ONE dma_write record per component, NOT 8 (one per flit).
    """
    pytest.importorskip("kernbench.sim_engine.op_log")
    from collections import Counter

    nbytes = 2048
    eng = _engine()
    # Submit a single PE DMA (data_op=True by default for DMA)
    msg = _pe_dma_write("op-log", src_cube=0, src_pe=0, dst_cube=0, dst_pe=0,
                        nbytes=nbytes)
    h = eng.submit(msg)
    eng.wait(h)

    if getattr(eng, "op_log", None) is None:
        pytest.skip("Engine does not expose op_log (not enabled in default topology)")

    # The engine is fresh and ran only this DMA, so every dma_write record
    # should belong to this transaction.
    records = [r for r in eng.op_log if getattr(r, "op_name", None) == "dma_write"]
    assert records, "No dma_write records found in op_log"

    # Each component_id may appear at most once, not once per flit.
    per_component = Counter(r.component_id for r in records)
    for comp_id, count in per_component.items():
        assert count <= 1, (
            f"Component {comp_id} has {count} dma_write records for one "
            f"transaction; flits must aggregate to a single record per "
            f"(txn, component)"
        )