diff --git a/docs/adr/ADR-0004-memory-semantics-local-hbm.md b/docs/adr/ADR-0004-memory-semantics-local-hbm.md index 5cda16c..cb7d3ff 100644 --- a/docs/adr/ADR-0004-memory-semantics-local-hbm.md +++ b/docs/adr/ADR-0004-memory-semantics-local-hbm.md @@ -33,12 +33,17 @@ Each PE has a notion of “local HBM” that must guarantee full HBM bandwidth, - This guarantee is modeled by: - a dedicated logical path and/or service model that enforces HBM BW at the PE-local-HBM interaction point, - while still incurring non-zero latency along explicitly modeled components. +- HBM CTRL internal modeling (PC striping, cut-through, scheduling fidelity) + is consolidated in ADR-0033 (Latency Model: Assumptions and Known + Simplifications). The aggregate BW guarantee here remains the contract; + ADR-0033 documents how the per-PC model realizes it and which scheduler + effects are intentionally simplified. ### D3. Remote PE HBM semantics (intra-cube) -- A PE that accesses another PE's local HBM traverses the router mesh: - - PE_DMA → local router → (mesh hops) → target PE's router → HBM_CTRL -- Router mesh bandwidth and hop count may limit remote HBM access relative to local access. +- A PE that accesses another PE's local HBM traverses the NOC: + - PE_DMA → NOC → (fabric hops) → target PE's NOC port → HBM_CTRL +- NOC bandwidth and hop count may limit remote HBM access relative to local access. ### D4. Non-local HBM semantics (inter-cube / inter-SIP) diff --git a/docs/adr/ADR-0033-latency-model-assumptions.md b/docs/adr/ADR-0033-latency-model-assumptions.md new file mode 100644 index 0000000..bb0b90f --- /dev/null +++ b/docs/adr/ADR-0033-latency-model-assumptions.md @@ -0,0 +1,99 @@ +# ADR-0033 — Latency Model: Assumptions and Known Simplifications + +## Status + +Accepted + +## Context + +The simulator is an analytical, event-driven performance model — not a +cycle-accurate or RTL-level simulator. Many real-HW effects are approximated +or omitted by design. To keep the model auditable and reviewable as a whole, +this ADR consolidates the assumptions in one place. Individual component ADRs +(ADR-0015, ADR-0019, ADR-0004) define the *mechanisms*; this document defines +the *limits of fidelity*. + +## Decisions + +### D1. Modeled precisely + +- **Per-directed-edge BW occupancy** (FIFO serialization via `available_at`) — + ADR-0015 D2. +- **Per-component switching/overhead latency** (`overhead_ns` attr). +- **HBM per-pseudo-channel parallelism** via stateless `pc_avail[N]` array + with global round-robin chunking. Burst granularity tunable + (`burst_bytes`, default 256B). Read and write share each PC's + `available_at` (real HW command bus is per-PC shared). +- **HBM direction switching penalty mechanism**: per-PC last-direction + tracking + configurable `switch_penalty_ns`. Default 0 — see D2. +- **Wire cut-through at HBM CTRL**: PC chunk scheduling starts at virtual + head-arrival time `env.now - txn.drain_ns`, allowing PC commit to overlap + with wire transfer that has already elapsed. The cut-through is local to + HBM CTRL (no Transaction-level head event, no wire-level change); ADR-0015 + wire semantics are preserved. + +### D2. Approximated (with known directional error) + +| Effect | Real HW | Our model | Error direction | +|--------|---------|-----------|----------------| +| Router output port arbitration | Round-robin / weighted | Wire edge FIFO | HoL blocking exaggerated; fairness not modeled | +| Multi-flow BW sharing | Per-flow fair share | FIFO atomic occupancy | Per-txn latency dist. differs; makespan correct | +| HBM scheduler / write buffer | FR-FCFS + watermark drain | FIFO, no reordering | Switching penalty over-charged when alternations are dense — but default `switch_penalty_ns = 0` assumes ideal scheduler amortizes it (Tier 0) | +| Flit/cycle granularity | Discrete flits @ cycle rate | Continuous nbytes | Sub-flit small-message noise | +| Wire cut-through scope | Wormhole at every hop | Cut-through absorbed at HBM CTRL only | Intermediate hops still store-and-forward semantics; acceptable because component overheads at intermediate nodes are size-independent | + +### D3. Ignored (out of scope) + +- Bank-level row buffer conflict penalty (assume no conflicts — best case; + round-robin chunk assignment is address-blind so we cannot detect same-bank + reuse). +- HBM tRP / tRCD / tFAW / tRC timing constraints (absorbed into the steady-state + `burst_time = burst_bytes / pc_bw_gbs`). +- Refresh, ECC, thermal throttling, power gating. +- Clock domain crossings, PLL lock time. +- Flit-level discrete interleaving on links. +- Upstream backpressure due to downstream buffer occupancy (input ports use + unbounded `simpy.Store`). + +### D4. Workload sensitivity + +Workloads where the above simplifications meaningfully affect results: + +- **Random scatter/gather**: bank conflict ignored → model optimistic. +- **Heavy mixed R/W intensive** (e.g., GEMM bias accumulation): HBM scheduler + absent. With default `switch_penalty_ns = 0` we assume ideal amortization; + setting it non-zero models pessimistic per-alternation cost. +- **High concurrency (>10 active flows on one link)**: HoL blocking and VC + limits not modeled → model optimistic. +- **Very small (sub-flit) transactions**: flit quantization noise. + +### D5. Verification policy + +For workloads in D4, cross-check against real HW or a cycle-accurate +simulator before drawing absolute-magnitude conclusions. The model remains +accurate for **relative comparisons** within the modeled regime. + +### D6. Future work + +- [ ] Bank-level conflict modeling (opt-in via `track_banks: true`). +- [ ] HBM scheduler with write buffer + watermark drain (Tier 2 from the + design discussion). +- [ ] Fluid wire model for multi-flow router contention. +- [ ] Wire-level cut-through at intermediate routers (currently destination + HBM CTRL only). +- [ ] Backpressure modeling for finite component buffers. + +## Consequences + +- Single review point for all model fidelity questions. Each future PR + touching latency must update the relevant section here. +- Workload-specific magnitude error envelopes are explicit. +- Builder-side derivation of `pc_bw_gbs = hbm_to_router_bw_gbs / num_pcs` + enforces the ADR-0019 D9 invariant in code rather than relying on yaml + manual consistency. + +## Cross-references + +- ADR-0015 — component / port / wire model. +- ADR-0019 — NoC and local HBM topology. +- ADR-0004 — memory semantics, local HBM. diff --git a/src/kernbench/components/builtin/hbm_ctrl.py b/src/kernbench/components/builtin/hbm_ctrl.py index a75ec25..cc9e261 100644 --- a/src/kernbench/components/builtin/hbm_ctrl.py +++ b/src/kernbench/components/builtin/hbm_ctrl.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections.abc import Generator +from math import ceil from typing import TYPE_CHECKING, Any import simpy @@ -14,68 +15,106 @@ if TYPE_CHECKING: class HbmCtrlComponent(ComponentBase): - """HBM controller: terminal component that models HBM access latency. + """HBM controller with per-pseudo-channel (PC) striping (ADR-0019 D1, ADR-0033). - Dual-channel model: separate read and write resources (each capacity=1) - allowing concurrent read/write like PE_DMA. Multiple reads or multiple - writes still serialize within their respective channel. + Stateless per-PC ``available_at`` array; each incoming transaction is + split into ``ceil(nbytes / burst_bytes)`` chunks distributed round-robin + across ``num_pcs`` PCs starting from a global ``next_pc`` pointer. Read + and write share the same PC array (real HW command bus is shared per PC). - On completion, creates a ResponseMsg and sends it back on the reverse path - so that response latency is modeled through the fabric. + Chunk-loop drain (ADR-0033 D1, Phase 2b): chunks are scheduled over + time at intervals of ``drain_ns / n_chunks`` to model the bottleneck + link's data arrival rate. Each chunk's PC commit starts at its arrival + time. The last PC commit finishes at ``arrival + drain + commit_time`` + — naturally producing the correct single-transfer total (drain + + commit) without the cut-through over-credit of the prior + ``env.now - drain_ns`` subtraction. + + Direction switching penalty: when a PC's last direction differs from the + current request, ``switch_penalty_ns`` is charged. Default 0 (Tier 0 + assumption — ideal scheduler amortizes switching cost; ADR-0033 D2). """ def __init__(self, node: Node, ctx: ComponentContext | None = None) -> None: super().__init__(node, ctx) - self._read: simpy.Resource | None = None - self._write: simpy.Resource | None = None + self._num_pcs: int = 0 + self._pc_bw_gbs: float = 0.0 + self._burst_bytes: int = 256 + self._switch_penalty_ns: float = 0.0 + self._pc_avail: list[float] = [] + self._pc_last_dir: list[str | None] = [] + self._next_pc: int = 0 def start(self, env: simpy.Environment) -> None: - capacity = int(self.node.attrs.get("capacity", 1)) - self._read = simpy.Resource(env, capacity=capacity) - self._write = simpy.Resource(env, capacity=capacity) + attrs = self.node.attrs + self._num_pcs = int(attrs.get("num_pcs", 8)) + self._pc_bw_gbs = float(attrs.get("pc_bw_gbs", 32.0)) + self._burst_bytes = int(attrs.get("burst_bytes", 256)) + self._switch_penalty_ns = float(attrs.get("switch_penalty_ns", 0.0)) + self._pc_avail = [0.0] * self._num_pcs + self._pc_last_dir = [None] * self._num_pcs + self._next_pc = 0 super().start(env) def run(self, env: simpy.Environment, nbytes: int) -> Generator: overhead_ns = float(self.node.attrs.get("overhead_ns", 0.0)) yield env.timeout(overhead_ns) - def _select_channel(self, txn: Any) -> simpy.Resource: - """Select channel based on request type: write requests → write, else → read.""" + def _is_write(self, txn: Any) -> bool: from kernbench.runtime_api.kernel import MemoryWriteMsg, PeDmaMsg - assert self._read is not None and self._write is not None req = txn.request if isinstance(req, MemoryWriteMsg): - return self._write + return True if isinstance(req, PeDmaMsg) and req.is_write: - return self._write - return self._read + return True + return False def _worker(self, env: simpy.Environment) -> Generator: - """Dispatch each incoming txn to a concurrent process for channel-level parallelism.""" while True: txn: Any = yield self._inbox.get() env.process(self._handle_txn(env, txn)) def _handle_txn(self, env: simpy.Environment, txn: Any) -> Generator: - """Acquire channel, run, apply drain, send response.""" - channel = self._select_channel(txn) - with channel.request() as req: - yield req - yield from self.run(env, txn.nbytes) - drain = getattr(txn, "drain_ns", 0.0) - if drain > 0: - yield env.timeout(drain) + is_write = self._is_write(txn) + new_dir = "W" if is_write else "R" + chunk_time = ( + self._burst_bytes / self._pc_bw_gbs if self._pc_bw_gbs > 0 else 0.0 + ) + # MemoryReadMsg forwards command with nbytes=0; the actual data work + # is sized by request.nbytes (data returns via reverse-path response). + work_bytes = txn.nbytes if txn.nbytes > 0 else int(getattr(txn.request, "nbytes", 0) or 0) + n_chunks = max(1, ceil(work_bytes / self._burst_bytes)) if work_bytes > 0 else 0 + + drain = float(getattr(txn, "drain_ns", 0.0)) + chunk_interval = (drain / n_chunks) if (n_chunks > 0 and drain > 0) else 0.0 + + yield from self.run(env, txn.nbytes) + + last_finish = env.now + for i in range(n_chunks): + if chunk_interval > 0: + yield env.timeout(chunk_interval) + pc = (self._next_pc + i) % self._num_pcs + switch_cost = 0.0 + if self._pc_last_dir[pc] is not None and self._pc_last_dir[pc] != new_dir: + switch_cost = self._switch_penalty_ns + start = max(env.now, self._pc_avail[pc]) + switch_cost + finish = start + chunk_time + self._pc_avail[pc] = finish + self._pc_last_dir[pc] = new_dir + if finish > last_finish: + last_finish = finish + if n_chunks > 0: + self._next_pc = (self._next_pc + n_chunks) % self._num_pcs + + wait = last_finish - env.now + if wait > 0: + yield env.timeout(wait) + yield from self._send_response(env, txn) def _send_response(self, env: simpy.Environment, txn: Any) -> Generator: - """Route completion based on path type. - - - PeDmaMsg: succeed done directly (probe). - - Bypass path (no m_cpu): MemoryWrite succeeds done; MemoryRead sends - data back on reverse path with original done event. - - M_CPU DMA path: send ResponseMsg for m_cpu/io_cpu aggregation. - """ from kernbench.runtime_api.kernel import MemoryReadMsg, PeDmaMsg if isinstance(txn.request, PeDmaMsg): @@ -90,11 +129,9 @@ class HbmCtrlComponent(ComponentBase): txn.done.succeed() return - # Bypass path: no m_cpu in the transaction path is_bypass = not any("m_cpu" in n for n in txn.path) if is_bypass: if isinstance(txn.request, MemoryReadMsg): - # D2H: send data back on reverse path to pcie_ep reverse_path = list(reversed(txn.path)) if len(reverse_path) >= 2: resp_txn = Transaction( @@ -103,18 +140,16 @@ class HbmCtrlComponent(ComponentBase): ) yield self.out_ports[reverse_path[1]].put(resp_txn.advance()) return - # MemoryWrite bypass or short path: done txn.done.succeed() return - # M_CPU DMA path: send ResponseMsg for aggregation reverse_path = list(reversed(txn.path)) if len(reverse_path) >= 2 and self.ctx: from kernbench.runtime_api.kernel import ResponseMsg parts = self.node.id.split(".") cube_id = int(parts[1].replace("cube", "")) - pe_id = 0 # single hbm_ctrl, PE info from request + pe_id = 0 resp_msg = ResponseMsg( correlation_id=txn.request.correlation_id, request_id=txn.request.request_id, diff --git a/src/kernbench/sim_engine/transaction.py b/src/kernbench/sim_engine/transaction.py index 8341aa5..1a91548 100644 --- a/src/kernbench/sim_engine/transaction.py +++ b/src/kernbench/sim_engine/transaction.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Iterator from dataclasses import dataclass, field from typing import Any @@ -47,3 +48,46 @@ class Transaction: is_response=self.is_response, result_data=self.result_data, ) + + def into_flits(self, flit_bytes: int) -> Iterator[Flit]: + """Decompose this Transaction's payload into Flits (ADR-0033 D1). + + Yields one Flit per ``flit_bytes`` of payload. The final flit may + carry fewer bytes when ``nbytes`` is not a multiple of ``flit_bytes``; + that flit has ``is_last=True``. Transactions with ``nbytes <= 0`` + yield no flits. + + All yielded Flits share a reference to this Transaction. + """ + if self.nbytes <= 0 or flit_bytes <= 0: + return + n_full = self.nbytes // flit_bytes + remainder = self.nbytes % flit_bytes + n_total = n_full + (1 if remainder else 0) + for i in range(n_total): + size = flit_bytes if i < n_full else remainder + yield Flit( + txn=self, + flit_index=i, + flit_nbytes=size, + is_last=(i == n_total - 1), + ) + + +@dataclass +class Flit: + """Atomic wire transport unit (ADR-0033 D1). + + Carries a slice of a parent Transaction's payload. The wire + (``engine._wire``) decomposes Transactions into Flits on first + transport; downstream wires pass Flits through with their own + ``bw_gbs`` delay. + + Phase 2 constraint: ``flit_bytes`` MUST be a multiple of HBM + ``burst_bytes`` (default they are equal). See ADR-0033 D1. + """ + + txn: Transaction # parent transaction reference + flit_index: int # 0..n_flits-1 + flit_nbytes: int # bytes carried (usually flit_bytes; last may be smaller) + is_last: bool # True for the terminating flit diff --git a/src/kernbench/topology/builder.py b/src/kernbench/topology/builder.py index 2337a1b..a55d87d 100644 --- a/src/kernbench/topology/builder.py +++ b/src/kernbench/topology/builder.py @@ -404,13 +404,18 @@ def _instantiate_cube( label=name.upper().replace("_", " "), ) - # ── HBM controller (single node, ADR-0019 D1) ── + # ── HBM controller (single node, ADR-0019 D1, ADR-0033) ── hbm_spec = cube["components"]["hbm_ctrl"] hbm_lx, hbm_ly = local_pos["hbm_ctrl"] hbm_id = f"{cp}.hbm_ctrl" + hbm_attrs = dict(hbm_spec["attrs"]) + _hbm_total_bw = float(cube["links"].get("hbm_to_router_bw_gbs", 256.0)) + _num_pcs = int(hbm_attrs.get("num_pcs", 8)) + hbm_attrs["num_pcs"] = _num_pcs + hbm_attrs["pc_bw_gbs"] = _hbm_total_bw / _num_pcs nodes[hbm_id] = Node( id=hbm_id, kind=hbm_spec["kind"], impl=hbm_spec["impl"], - attrs=hbm_spec["attrs"], pos_mm=(ox + hbm_lx, oy + hbm_ly), + attrs=hbm_attrs, pos_mm=(ox + hbm_lx, oy + hbm_ly), label="HBM CTRL", ) diff --git a/tests/test_flit_streaming.py b/tests/test_flit_streaming.py new file mode 100644 index 0000000..15540ee --- /dev/null +++ b/tests/test_flit_streaming.py @@ -0,0 +1,465 @@ +"""Tests for flit-streaming latency model (ADR-0033 v2 / Max F). + +The Phase 2 changes split every transaction's payload into flits of +`flit_bytes` and stream them through the fabric via wires. Routers do RR +arbitration between active flows at output ports. The HBM CTRL receives +flits individually and dispatches each to a PC. This eliminates the +atomic-FIFO wire serialization that caused timing drift in slow-upstream +and multi-stream-merge scenarios. + +Naming note (ADR-0033 D1/D2): we use NoC terminology — a `Flit` is the +atomic wire transport unit. For modeling tractability our `flit_bytes` +equals the HBM `burst_bytes` (256B). Real HW has flit (~32B) smaller +than burst (~256B); we conflate the two. See ADR-0033 D2 for the +fidelity caveat. + +Chunking happens AT THE WIRE: source components emit whole Transactions, +the wire decomposes them into Flits on first transport, downstream wires +pass Flits through. Source code is unchanged. + +These tests are written BEFORE the production change and are expected to +FAIL on current code (which still does Transaction-atomic wire delivery). +Phase 2 must make them PASS without weakening assertions. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from kernbench.policy.address.phyaddr import PhysAddr +from kernbench.runtime_api.kernel import ( + MemoryReadMsg, + MemoryWriteMsg, + PeDmaMsg, +) +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import load_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" + +# Constants from topology.yaml defaults +FLIT_BYTES = 256 # = HBM burst_bytes in our simplified model +NUM_PCS = 8 +PC_BW_GBS = 32.0 +COMMIT_TIME_NS = FLIT_BYTES / PC_BW_GBS # 8 ns (HBM PC commit for one flit) +# Reasonable per-test path-overhead budget (router overheads, prop, UCIe etc.) +OVERHEAD_BUDGET_NS = 80.0 + + +def _engine() -> GraphEngine: + return GraphEngine(load_topology(TOPOLOGY_PATH)) + + +def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0, offset: int = 0x1000) -> int: + slice_bytes = 48 * (1 << 30) // 8 + return PhysAddr.pe_hbm_addr( + sip_id=sip, die_id=cube, pe_id=pe_id, + pe_local_hbm_offset=offset, slice_size_bytes=slice_bytes, + ).encode() + + +def _write_msg(req_id: str, *, cube: int, pe: int, nbytes: int) -> MemoryWriteMsg: + return MemoryWriteMsg( + correlation_id="flit-stream", request_id=req_id, + dst_sip=0, dst_cube=cube, dst_pe=pe, + dst_pa=_hbm_pa(sip=0, cube=cube, pe_id=pe), nbytes=nbytes, + pattern="zero", target_pe=pe, + ) + + +def _read_msg(req_id: str, *, cube: int, pe: int, nbytes: int) -> MemoryReadMsg: + return MemoryReadMsg( + correlation_id="flit-stream", request_id=req_id, + src_sip=0, src_cube=cube, src_pe=pe, + src_pa=_hbm_pa(sip=0, cube=cube, pe_id=pe), nbytes=nbytes, + ) + + +def _pe_dma_write(req_id: str, *, src_cube: int, src_pe: int, + dst_cube: int, dst_pe: int, nbytes: int) -> PeDmaMsg: + return PeDmaMsg( + correlation_id="flit-stream", request_id=req_id, + src_sip=0, src_cube=src_cube, src_pe=src_pe, + dst_pa=_hbm_pa(sip=0, cube=dst_cube, pe_id=dst_pe), + nbytes=nbytes, is_write=True, + ) + + +def _path_drain_for_request(eng: GraphEngine, request) -> float: + """Dynamically compute the path drain_ns the engine would assign to this + request. Reads engine internals (test-time only) so tests reflect the + actual path bottleneck (e.g., MemoryWrite goes via UCIe = 128 GB/s, + PE_DMA same-cube stays in cube fabric = 256 GB/s).""" + if isinstance(request, MemoryWriteMsg): + sip, pa_val = request.dst_sip, request.dst_pa + pcie_ep_id = eng._resolver.find_pcie_ep(sip) + pa = PhysAddr.decode(pa_val) + hbm_node = eng._resolver.resolve(pa) + path = eng._router.find_memory_path(pcie_ep_id, hbm_node) + elif isinstance(request, MemoryReadMsg): + sip, pa_val = request.src_sip, request.src_pa + pcie_ep_id = eng._resolver.find_pcie_ep(sip) + pa = PhysAddr.decode(pa_val) + hbm_node = eng._resolver.resolve(pa) + path = eng._router.find_memory_path(pcie_ep_id, hbm_node) + elif isinstance(request, PeDmaMsg): + pe_prefix = f"sip{request.src_sip}.cube{request.src_cube}.pe{request.src_pe}" + pa = PhysAddr.decode(request.dst_pa) + dst_node = eng._resolver.resolve(pa) + path = eng._router.find_path(pe_prefix, dst_node) + else: + raise ValueError(f"unsupported request type: {type(request).__name__}") + return eng._path_drain_ns(path, request.nbytes) + + +def _single_write_ns(nbytes: int, cube: int = 0, pe: int = 0) -> tuple[float, float]: + """Return (total_ns, path_drain_ns) for a single MemoryWrite.""" + eng = _engine() + msg = _write_msg(f"s-{cube}-{pe}-{nbytes}", cube=cube, pe=pe, nbytes=nbytes) + drain = _path_drain_for_request(eng, msg) + h = eng.submit(msg) + eng.wait(h) + return eng.get_completion(h)[1]["total_ns"], drain + + +# ── 1. Flit dataclass + Transaction.into_flits ───────────────────── + + +def test_flit_dataclass_exists(): + """Phase 2 must add a Flit dataclass in sim_engine.transaction. + + Required fields: + - txn: reference to parent Transaction + - flit_index: 0..n_flits-1 + - flit_nbytes: bytes carried by this flit (usually flit_bytes; last may be smaller) + - is_last: True for the final flit + """ + import dataclasses + + from kernbench.sim_engine.transaction import Flit + + fields = {f.name for f in dataclasses.fields(Flit)} + for required in ("txn", "flit_index", "flit_nbytes", "is_last"): + assert required in fields, f"Flit dataclass missing required field: {required}" + + +def test_transaction_into_flits_count(): + """Transaction.into_flits(flit_bytes) must yield ceil(nbytes/flit_bytes) flits + with correct flit_nbytes (last may be partial) and indices. + """ + from kernbench.sim_engine.transaction import Transaction + + txn = Transaction( + request=None, path=["a", "b"], step=0, + nbytes=1024, done=None, drain_ns=0.0, + ) + flits = list(txn.into_flits(FLIT_BYTES)) + assert len(flits) == 4, f"1024 / 256 = 4 flits, got {len(flits)}" + for i, f in enumerate(flits): + assert f.flit_index == i + assert f.flit_nbytes == FLIT_BYTES + assert f.is_last == (i == 3) + assert f.txn is txn + + +def test_transaction_into_flits_partial_last(): + """A transaction with nbytes not divisible by flit_bytes must yield + a final partial flit.""" + from kernbench.sim_engine.transaction import Transaction + + txn = Transaction( + request=None, path=["a", "b"], step=0, + nbytes=FLIT_BYTES * 3 + 64, done=None, + ) + flits = list(txn.into_flits(FLIT_BYTES)) + assert len(flits) == 4 + assert flits[-1].flit_nbytes == 64 + assert flits[-1].is_last is True + assert flits[0].flit_nbytes == FLIT_BYTES + + +def test_transaction_into_flits_single_flit(): + """A small transaction (<= flit_bytes) produces exactly one flit + with is_last=True.""" + from kernbench.sim_engine.transaction import Transaction + + txn = Transaction(request=None, path=["a", "b"], step=0, nbytes=128, done=None) + flits = list(txn.into_flits(FLIT_BYTES)) + assert len(flits) == 1 + assert flits[0].flit_nbytes == 128 + assert flits[0].is_last is True + + +# ── 2. Single transfer accuracy (flit-streaming should fix the +# slow-upstream cut-through over-credit) ── + + +def test_slow_upstream_single_2kb_total_matches_drain_plus_commit(): + """A 2KB write through MemoryWrite path (host → PCIe → IO → UCIe → + cube router → HBM_CTRL). The path bottleneck is UCIe (128 GB/s in this + topology). Expected total ≈ drain (= 2048/128 = 16 ns) + commit_time + (= 8 ns) + path overheads. + + Current model under-counts because cut-through subtraction over-credits + the slow drain. Flit-streaming (chunk-loop drain) charges both terms. + """ + nbytes = 2048 + total, drain = _single_write_ns(nbytes, cube=0, pe=0) + + min_expected = drain + COMMIT_TIME_NS + max_expected = min_expected + OVERHEAD_BUDGET_NS + + assert total >= min_expected - 1.0, ( + f"2KB write total {total:.2f}ns below minimum {min_expected:.2f}ns " + f"(drain={drain:.2f} + commit_time={COMMIT_TIME_NS:.2f}); " + f"flit-streaming must charge both" + ) + assert total <= max_expected, ( + f"2KB write total {total:.2f}ns above maximum {max_expected:.2f}ns " + f"(drain={drain:.2f} + commit + {OVERHEAD_BUDGET_NS:.0f}ns overhead budget)" + ) + + +def test_64kb_total_drain_plus_commit(): + """A 64KB MemoryWrite at the path bottleneck rate: total ≈ drain + commit_time + + path overheads. Drain is computed dynamically from the engine's path + bottleneck (UCIe-limited for host-initiated MemoryWrite). + """ + nbytes = 65536 + total, drain = _single_write_ns(nbytes) + min_expected = drain + COMMIT_TIME_NS + max_expected = min_expected + OVERHEAD_BUDGET_NS + + assert total >= min_expected - 1.0, ( + f"64KB total {total:.2f}ns below {min_expected:.2f} " + f"(drain={drain:.2f}+commit_time={COMMIT_TIME_NS:.2f})" + ) + assert total <= max_expected, ( + f"64KB total {total:.2f}ns above {max_expected:.2f} " + f"(drain={drain:.2f}+commit+{OVERHEAD_BUDGET_NS:.0f}ns budget)" + ) + + +# ── 3. Multi-hop cut-through pipelining ──────────────────────────── + + +def test_multihop_flits_pipeline_drain_not_summed(): + """Drain is the bottleneck-link transfer time, charged ONCE across the + full path (not per hop). With flit-streaming + cut-through, this is the + expected behavior. If drain were summed per hop, large-payload total + would grow faster than small-payload total proportionally to hop count. + + We isolate the drain-sum effect by comparing the *slope* of total vs + nbytes for close (same-cube) vs far (cross-cube) paths. The slope is + dominated by drain (the per-byte rate at bottleneck). If drain doesn't + sum across hops, slopes should be similar (both = 1/bottleneck_bw, + where bottleneck differs by path). If drain were summed, far slope + would be much steeper. + """ + nbytes_small, nbytes_large = 256, 4096 + t_close_small, drain_close_small = _single_write_ns(nbytes_small, cube=0, pe=0) + t_close_large, drain_close_large = _single_write_ns(nbytes_large, cube=0, pe=0) + t_far_small, drain_far_small = _single_write_ns(nbytes_small, cube=15, pe=0) + t_far_large, drain_far_large = _single_write_ns(nbytes_large, cube=15, pe=0) + + slope_close = (t_close_large - t_close_small) / (nbytes_large - nbytes_small) + slope_far = (t_far_large - t_far_small) / (nbytes_large - nbytes_small) + + # Each slope should match its bottleneck rate (1 / bw). + ideal_close = 1.0 / (drain_close_large / nbytes_large * 1e9) # ns/byte + # Simpler: drain is linear in nbytes, so slope_path == drain_per_byte_at_bottleneck + expected_close_slope = drain_close_large / nbytes_large + expected_far_slope = drain_far_large / nbytes_large + + # If drain summed across hops, far slope would be ~hop_count× larger + # than expected. Assert slope is within 1.5× expected (allowing + # propagation effects but rejecting drain-per-hop). + assert slope_close <= expected_close_slope * 1.5, ( + f"Close-cube slope {slope_close:.4f} ns/byte vs expected " + f"{expected_close_slope:.4f}; drain may sum across hops" + ) + assert slope_far <= expected_far_slope * 1.5, ( + f"Far-cube slope {slope_far:.4f} ns/byte vs expected " + f"{expected_far_slope:.4f}; drain may sum across hops" + ) + + +# ── 4. Two-stream merge at HBM router (non-overcommit) ──────────── + + +def test_two_concurrent_2kb_writes_merge_makespan(): + """Two concurrent 2KB writes merge at the HBM-attached router. With + flit-streaming + RR arbitration, both streams share the output BW. + Makespan ≈ aggregate-data / path-bottleneck + commit_time + overheads. + + Drain is computed dynamically from the engine path. + """ + nbytes = 2048 + eng = _engine() + msg_a = _write_msg("conc-a", cube=0, pe=0, nbytes=nbytes) + msg_b = _write_msg("conc-b", cube=0, pe=1, nbytes=nbytes) + drain_per_txn = _path_drain_for_request(eng, msg_a) + h_a = eng.submit(msg_a) + h_b = eng.submit(msg_b) + eng.wait(h_a); eng.wait(h_b) + ta = eng.get_completion(h_a)[1]["total_ns"] + tb = eng.get_completion(h_b)[1]["total_ns"] + makespan = max(ta, tb) + + # Aggregate drain (2 streams worth) + commit_time + overheads + expected_min = 2 * drain_per_txn + COMMIT_TIME_NS + expected_max = expected_min + OVERHEAD_BUDGET_NS + + assert makespan >= expected_min - 1.0, ( + f"2-stream merge makespan {makespan:.2f}ns below floor " + f"{expected_min:.2f} (2*drain={2*drain_per_txn:.2f}+commit)" + ) + assert makespan <= expected_max, ( + f"2-stream merge makespan {makespan:.2f}ns above ceiling " + f"{expected_max:.2f}" + ) + + # Both should finish within ~commit_time + small overhead of each other + # (fair share via RR arbitration) + diff = abs(ta - tb) + assert diff <= drain_per_txn + COMMIT_TIME_NS + 5.0, ( + f"Stream A ({ta:.2f}) vs B ({tb:.2f}) finish times differ by " + f"{diff:.2f}ns; expected fairness within ≤ " + f"{drain_per_txn + COMMIT_TIME_NS + 5:.2f}ns" + ) + + +# ── 5. Heavy-overcommit makespan (where flit-streaming shines) ──── + + +def test_eight_concurrent_writes_overcommit_makespan(): + """8 concurrent 1KB writes share path bottleneck. With flit-streaming, + aggregate traffic = 8 × 1KB shares the bottleneck link, so makespan ≈ + 8 × per_txn_drain + commit_time + overheads. + """ + nbytes = 1024 + eng = _engine() + msg0 = _write_msg("oc-0", cube=0, pe=0, nbytes=nbytes) + drain_per_txn = _path_drain_for_request(eng, msg0) + handles = [eng.submit(_write_msg(f"oc-{pe}", cube=0, pe=pe, nbytes=nbytes)) + for pe in range(8)] + for h in handles: + eng.wait(h) + times = [eng.get_completion(h)[1]["total_ns"] for h in handles] + makespan = max(times) + + expected_min = 8 * drain_per_txn + COMMIT_TIME_NS + expected_max = expected_min + OVERHEAD_BUDGET_NS + assert makespan <= expected_max, ( + f"8-stream overcommit makespan {makespan:.2f}ns above ceiling " + f"{expected_max:.2f}ns (8*drain={8*drain_per_txn:.2f}+commit+budget). " + ) + + +# ── 6. PE → PE DMA flit-streaming (inter-cube, slow link case) ──── + + +def test_inter_cube_pe_dma_drain_doesnt_sum_across_hops(): + """PE→PE DMA across cubes traverses many hops + inter-cube UCIe. + + Per-hop overheads accumulate (router overhead, UCIe overhead, prop) and + dominate the absolute total, so we don't bound the absolute value. + Instead we verify drain is charged ONCE: compare 256B (tiny drain) vs + 4KB (16× drain) at the same cross-cube path. The delta should grow + approximately as drain difference, not as drain × hops. + """ + eng_small = _engine() + msg_small = _pe_dma_write("xs", src_cube=0, src_pe=0, dst_cube=15, dst_pe=0, nbytes=256) + drain_small = _path_drain_for_request(eng_small, msg_small) + h = eng_small.submit(msg_small) + eng_small.wait(h) + t_small = eng_small.get_completion(h)[1]["total_ns"] + + eng_large = _engine() + msg_large = _pe_dma_write("xl", src_cube=0, src_pe=0, dst_cube=15, dst_pe=0, nbytes=4096) + drain_large = _path_drain_for_request(eng_large, msg_large) + h = eng_large.submit(msg_large) + eng_large.wait(h) + t_large = eng_large.get_completion(h)[1]["total_ns"] + + delta = t_large - t_small + drain_delta = drain_large - drain_small + + # If drain were charged per hop, delta would grow as drain_delta * hops. + # If drain is charged once (correct), delta ≈ drain_delta + some + # per-flit overhead (chunks pipeline through hops). Cap at 3× drain_delta + # to allow for chunk-loop / flit transit overhead but reject hop summing. + assert delta <= drain_delta * 3 + 30.0, ( + f"Inter-cube delta {delta:.2f}ns for {drain_delta:.2f}ns drain growth " + f"exceeds 3×drain_delta+30; drain may be summing across hops" + ) + + +# ── 7. Read response path: HBM → PE responses also flit-streamed ── + + +def test_concurrent_reads_response_path_shares_bw(): + """Multiple concurrent reads share the path's bottleneck link on the + response (HBM → router → ... → host) path. With flit-streaming, + aggregate response traffic ≈ N × drain_per_txn. + """ + nbytes = 1024 + eng = _engine() + msg0 = _read_msg("r0", cube=0, pe=0, nbytes=nbytes) + drain_per_txn = _path_drain_for_request(eng, msg0) + handles = [eng.submit(_read_msg(f"r-{pe}", cube=0, pe=pe, nbytes=nbytes)) + for pe in range(8)] + for h in handles: + eng.wait(h) + times = [eng.get_completion(h)[1]["total_ns"] for h in handles] + makespan = max(times) + + # 8 concurrent reads aggregate ≈ 8 × drain on shared bottleneck + # Plus forward command + commit + path overheads (response is dominant) + expected_min = 8 * drain_per_txn + COMMIT_TIME_NS + expected_max = expected_min + OVERHEAD_BUDGET_NS * 2 # 2× for fwd+resp paths + + assert makespan <= expected_max, ( + f"8 concurrent reads makespan {makespan:.2f}ns above ceiling " + f"{expected_max:.2f} (8*drain={8*drain_per_txn:.2f}+commit+budget); " + f"response path BW sharing may not be modeled correctly" + ) + + +# ── 8. Op_log: per-Transaction record (not per-flit) ─────────────── + + +def test_op_log_records_per_transaction_not_per_flit(): + """Op_log records data_op events per Transaction, not per flit. + A single 2KB write (8 flits) must produce ONE start/end pair per + component, NOT 8. + """ + pytest.importorskip("kernbench.sim_engine.op_log") + + nbytes = 2048 + eng = _engine() + # Submit a single PE DMA (data_op=True by default for DMA) + msg = _pe_dma_write("op-log", src_cube=0, src_pe=0, dst_cube=0, dst_pe=0, nbytes=nbytes) + h = eng.submit(msg) + eng.wait(h) + + if not hasattr(eng, "op_log") or eng.op_log is None: + pytest.skip("Engine does not expose op_log (not enabled in default topology)") + + # Look for dma_write records on this txn + records = [r for r in eng.op_log + if getattr(r, "op_name", None) == "dma_write"] + assert records, "No dma_write records found in op_log" + + # Each (component_id) should have at most ONE record for this txn — not + # 8 (one per flit). Aggregate by component_id and verify count. + by_comp = {} + for r in records: + by_comp.setdefault(r.component_id, []).append(r) + for comp_id, recs in by_comp.items(): + assert len(recs) <= 1, ( + f"Component {comp_id} has {len(recs)} dma_write records for one " + f"transaction; flits must aggregate to a single record per " + f"(txn, component)" + ) diff --git a/tests/test_hbm_pc_striping.py b/tests/test_hbm_pc_striping.py new file mode 100644 index 0000000..535c45f --- /dev/null +++ b/tests/test_hbm_pc_striping.py @@ -0,0 +1,330 @@ +"""Tests for HBM CTRL per-pseudo-channel (PC) striping model (ADR-0033). + +Replaces the prior dual-channel `simpy.Resource(capacity=1)` model with a +stateless per-PC `available_at[N]` array, global round-robin chunking, and +read/write sharing per PC. Burst granularity is `burst_bytes` (default 256B). + +These tests are written BEFORE the production change and are expected to +FAIL on current code (which serializes via Resource cap=1). Phase 2 must +make them PASS without weakening assertions. + +Verification matrix references ADR-0033 D1 (modeled) and D2 (approximated). +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from kernbench.policy.address.phyaddr import PhysAddr +from kernbench.runtime_api.kernel import MemoryReadMsg, MemoryWriteMsg +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import load_topology, resolve_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" + + +def _engine() -> GraphEngine: + return GraphEngine(load_topology(TOPOLOGY_PATH)) + + +def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0, offset: int = 0x1000) -> int: + slice_bytes = 48 * (1 << 30) // 8 + return PhysAddr.pe_hbm_addr( + sip_id=sip, die_id=cube, pe_id=pe_id, + pe_local_hbm_offset=offset, slice_size_bytes=slice_bytes, + ).encode() + + +def _write_msg(req_id: str, *, cube: int, pe: int, nbytes: int) -> MemoryWriteMsg: + return MemoryWriteMsg( + correlation_id="pc-striping", request_id=req_id, + dst_sip=0, dst_cube=cube, dst_pe=pe, + dst_pa=_hbm_pa(sip=0, cube=cube, pe_id=pe), nbytes=nbytes, + pattern="zero", target_pe=pe, + ) + + +def _single_write_ns(nbytes: int, cube: int = 0, pe: int = 0) -> float: + eng = _engine() + msg = _write_msg(f"single-{cube}-{pe}-{nbytes}", cube=cube, pe=pe, nbytes=nbytes) + h = eng.submit(msg) + eng.wait(h) + _, t = eng.get_completion(h) + return t["total_ns"] + + +def _path_drain_for_write(eng: GraphEngine, msg: MemoryWriteMsg) -> float: + """Compute engine path drain dynamically (test-time access to engine internals).""" + pcie_ep_id = eng._resolver.find_pcie_ep(msg.dst_sip) + pa = PhysAddr.decode(msg.dst_pa) + hbm_node = eng._resolver.resolve(pa) + path = eng._router.find_memory_path(pcie_ep_id, hbm_node) + return eng._path_drain_ns(path, msg.nbytes) + + +# ── 1. Builder derives pc_bw_gbs ────────────────────────────────── + + +def test_builder_derives_pc_bw_gbs(): + """Topology builder must inject `pc_bw_gbs = hbm_to_router_bw_gbs / num_pcs` + as an attr on every hbm_ctrl node. Enforces ADR-0019 D9 invariant + (channels_per_PE × per-PC BW = aggregated link BW) at build time. + """ + handle = resolve_topology(str(TOPOLOGY_PATH)) + topo = handle.topology_obj + spec = topo.spec + + expected_total_bw = float(spec["cube"]["links"]["hbm_to_router_bw_gbs"]) + expected_num_pcs = int(spec["cube"]["memory_map"]["hbm_channels_per_pe"]) + expected_pc_bw = expected_total_bw / expected_num_pcs + + hbm_nodes = [n for n in topo.nodes.values() if "hbm_ctrl" in n.id] + assert hbm_nodes, "no hbm_ctrl nodes found in topology" + + for node in hbm_nodes: + assert "num_pcs" in node.attrs, f"{node.id} missing num_pcs" + assert int(node.attrs["num_pcs"]) == expected_num_pcs, ( + f"{node.id} num_pcs={node.attrs['num_pcs']} != {expected_num_pcs}" + ) + assert "pc_bw_gbs" in node.attrs, f"{node.id} missing builder-derived pc_bw_gbs" + assert abs(float(node.attrs["pc_bw_gbs"]) - expected_pc_bw) < 1e-6, ( + f"{node.id} pc_bw_gbs={node.attrs['pc_bw_gbs']} != {expected_pc_bw}" + ) + + +# ── 2. PC parallelism: concurrent writes do NOT serialize at HBM CTRL ── + + +def test_two_concurrent_writes_parallel_across_pcs(): + """Two concurrent writes to the same cube (different PEs) must use + different PCs (via global round-robin) and finish in less than 2x + the single-write latency. + + Current model (Resource cap=1) serializes them → max ≈ 2x single. + PC striping must give max < 1.7x single (allowing for shared wire BW + occupancy, which remains). + """ + nbytes = 1024 + single_ns = _single_write_ns(nbytes) + + eng = _engine() + msg_a = _write_msg("conc-a", cube=0, pe=0, nbytes=nbytes) + msg_b = _write_msg("conc-b", cube=0, pe=1, nbytes=nbytes) + ha = eng.submit(msg_a) + hb = eng.submit(msg_b) + eng.wait(ha) + eng.wait(hb) + _, ta = eng.get_completion(ha) + _, tb = eng.get_completion(hb) + max_ns = max(ta["total_ns"], tb["total_ns"]) + + assert max_ns < single_ns * 1.7, ( + f"PC striping: 2 concurrent 1KB writes should not serialize at HBM CTRL. " + f"single={single_ns:.2f}ns, concurrent max={max_ns:.2f}ns, " + f"ratio={max_ns/single_ns:.2f} (expected < 1.7)" + ) + + +def test_eight_concurrent_writes_makespan(): + """8 concurrent 1KB writes (one per PE in cube0) must achieve makespan + significantly less than 8x single-write latency. + + With 8 PCs and global round-robin, each write maps to a distinct set of + PCs; the makespan is dominated by wire BW (shared 256 GB/s pipe), not + by HBM-side serialization. + Current cap=1 model: makespan ≈ 8x single. Target: < 4x single. + """ + nbytes = 1024 + single_ns = _single_write_ns(nbytes) + + eng = _engine() + handles = [] + for pe in range(8): + msg = _write_msg(f"8way-{pe}", cube=0, pe=pe, nbytes=nbytes) + handles.append(eng.submit(msg)) + for h in handles: + eng.wait(h) + times = [eng.get_completion(h)[1]["total_ns"] for h in handles] + makespan = max(times) + + assert makespan < single_ns * 4.0, ( + f"8 concurrent 1KB writes: makespan={makespan:.2f}ns, " + f"single={single_ns:.2f}ns, ratio={makespan/single_ns:.2f} " + f"(expected < 4.0 with PC striping; current cap=1 gives ~8x)" + ) + + +# ── 3. Large transfer not 2x pessimistic ────────────────────────── + + +def test_large_transfer_not_double_counted(): + """64KB write must not be ~2x the wire transfer time. + + With cut-through (head_arrived event) + PC striping, the HBM PC commit + time overlaps with wire arrival. For 64KB at 256 GB/s aggregate: + - Wire transfer: ~256ns + - PC commit (parallel across 8 PCs, 32 chunks each): ~256ns + - Overlapped real-HW total: ~256ns (one of them dominates) + - Current sequential model: ~512ns (~2x) + + Assert: total < 1.5x of (wire transfer time alone). + """ + nbytes = 65536 # 64KB + # Path bottleneck (dynamic) — for MemoryWrite this is UCIe 128 GB/s. + eng = _engine() + msg = _write_msg("64kb-probe", cube=0, pe=0, nbytes=nbytes) + drain = _path_drain_for_write(eng, msg) + + total = _single_write_ns(nbytes) + assert total < drain * 1.5, ( + f"64KB write should not be ~2x path bottleneck transfer time. " + f"drain={drain:.2f}ns, total={total:.2f}ns, " + f"ratio={total/drain:.2f} (expected < 1.5)" + ) + + +# ── 4. Read/write share per-PC available_at ────────────────────── + + +def test_read_write_share_pc_array(): + """Read and write requests targeting overlapping PC regions must + serialize on the shared `pc_avail` array (NOT proceed in parallel like + the prior dual-channel model). + + Strategy: a read and a write to the same PE/cube should land on the + same set of PCs (since global round-robin advances by chunk count, and + chunk count of 256B == 1 chunk consumes 1 PC). With single-chunk read+write + submitted concurrently, the second to acquire its chunk's PC must wait. + + We assert: makespan of (concurrent read + write) > single_write_ns. + If they ran in parallel on disjoint resources (old dual-channel), + makespan ≈ single. With shared PC, makespan > single. + """ + nbytes = 256 # 1 chunk + pa = _hbm_pa(sip=0, cube=0, pe_id=0) + single_w = _single_write_ns(nbytes) + + eng = _engine() + w_msg = _write_msg("rw-write", cube=0, pe=0, nbytes=nbytes) + r_msg = MemoryReadMsg( + correlation_id="pc-striping", request_id="rw-read", + src_sip=0, src_cube=0, src_pe=0, + src_pa=pa, nbytes=nbytes, + ) + hw = eng.submit(w_msg) + hr = eng.submit(r_msg) + eng.wait(hw) + eng.wait(hr) + _, tw = eng.get_completion(hw) + _, tr = eng.get_completion(hr) + makespan = max(tw["total_ns"], tr["total_ns"]) + + # When R and W share the same first PC, the second one to acquire pays + # the burst time of the first. Assert makespan strictly > single, + # demonstrating sharing (vs the prior dual-channel parallelism). + assert makespan > single_w * 1.05, ( + f"Read+Write should share per-PC slot when targeting the same starting " + f"PC. single_write={single_w:.2f}ns, R+W makespan={makespan:.2f}ns " + f"(expected > 1.05x single, demonstrating PC sharing)" + ) + + +# ── 5. Switch penalty: default 0, mechanism wired up ───────────── + + +def _makespan(eng: GraphEngine, handles: list) -> float: + for h in handles: + eng.wait(h) + return max(eng.get_completion(h)[1]["total_ns"] for h in handles) + + +def _engine_with_switch_penalty(switch_penalty_ns: float) -> GraphEngine: + """Build a GraphEngine, overriding switch_penalty_ns on every hbm_ctrl + node. None means leave the attr absent (i.e., test the default).""" + graph = load_topology(TOPOLOGY_PATH) + if switch_penalty_ns is not None: + for node in graph.nodes.values(): + if "hbm_ctrl" in node.id: + node.attrs["switch_penalty_ns"] = switch_penalty_ns + return GraphEngine(graph) + + +def _rw_write_time(eng: GraphEngine, nbytes: int) -> float: + """Submit one read followed by one write of the same size; return the + write's completion time. With `nbytes >= num_pcs * burst_bytes`, the + read populates PCs 0..N-1 with last_dir='R' and the write then wraps + back to PC 0, so every chunk of the write sees an R→W direction + switch. The write's completion time is the direct observable for the + switch-penalty mechanism (the read's time is dominated by the + response-path latency and would mask the effect).""" + r = MemoryReadMsg( + correlation_id="pc-striping", request_id="rw-1", + src_sip=0, src_cube=0, src_pe=0, + src_pa=_hbm_pa(sip=0, cube=0, pe_id=0), nbytes=nbytes, + ) + w = _write_msg("rw-2", cube=0, pe=0, nbytes=nbytes) + hr = eng.submit(r) + hw = eng.submit(w) + eng.wait(hr); eng.wait(hw) + return eng.get_completion(hw)[1]["total_ns"] + + +def test_switch_penalty_default_zero(): + """Default (no `switch_penalty_ns` attr) must behave identically to + explicit `switch_penalty_ns=0`. + + This documents Tier 0 (ADR-0033 D2): we assume an ideal HBM scheduler + amortizes switching cost; the mechanism exists but is dormant. + """ + nbytes = 2048 + rw_default = _rw_write_time(_engine_with_switch_penalty(None), nbytes) + rw_zero = _rw_write_time(_engine_with_switch_penalty(0.0), nbytes) + diff = abs(rw_default - rw_zero) + assert diff < 0.01, ( + f"Default (no attr) must match explicit switch_penalty_ns=0. " + f"default={rw_default:.2f}ns, explicit_zero={rw_zero:.2f}ns, " + f"diff={diff:.4f}ns" + ) + + +def test_switch_penalty_mechanism_when_enabled(): + """When `switch_penalty_ns` is set non-zero via attr, R→W on the same + PC must show that extra delay. + + Phase 2 must wire up the mechanism so that overriding the attr at + runtime (or via a modified topology) produces the expected delay. + Default config keeps it 0; this test creates an engine with an + explicit override. + """ + # Use nbytes that span all 8 PCs so the write back-wraps to PCs that + # were just touched by the read, forcing an R→W switch on each PC. + # 8 PCs × 256B burst = 2048B fills every PC exactly once. + nbytes = 2048 + switch_penalty = 20.0 # large enough to be visible + + # R+W with explicit switch_penalty=0: baseline (W observed time) + rw_zero = _rw_write_time(_engine_with_switch_penalty(0.0), nbytes) + + # R+W with explicit switch_penalty=20: mechanism engaged + rw_pen = _rw_write_time(_engine_with_switch_penalty(switch_penalty), nbytes) + + delta = rw_pen - rw_zero + # The switch penalty applies once on the second txn's first chunk. + # Conservative: assert at least half the switch_penalty shows up. + assert delta >= switch_penalty * 0.4, ( + f"switch_penalty_ns={switch_penalty} should add measurable delay " + f"when R→W on same PC. R+W@0={rw_zero:.2f}ns, " + f"R+W@{switch_penalty}={rw_pen:.2f}ns, delta={delta:.2f}ns " + f"(expected >= {switch_penalty*0.4:.2f}ns)" + ) + + +# ── 6. Backwards compat sanity ─────────────────────────────────── + + +def test_existing_single_txn_latency_positive(): + """Sanity: single write still produces positive latency (no regression + of basic engine behavior). Companion to test_bw_occupancy.py.""" + t = _single_write_ns(4096) + assert t > 0 diff --git a/tests/test_pe_pipeline.py b/tests/test_pe_pipeline.py index 910f5c5..2f404e0 100644 --- a/tests/test_pe_pipeline.py +++ b/tests/test_pe_pipeline.py @@ -380,12 +380,18 @@ def test_pe_dma_record_start_after_channel_acquire(): ) durations = [r.t_end - r.t_start for r in dma_records] - # All three should have the same actual transfer time within ±1 ns. + # All three should have similar transfer time. Under the PC striping + # model (ADR-0033 D1), per-PC `available_at` state introduces small + # timing differences between consecutive same-direction reads to the + # same PC set (the second read's start = max(eff_start, pc_avail[pc])). + # Tolerance widened from ±1ns to ±3ns to absorb this variance without + # weakening the invariant that queue wait is excluded from the recorded + # interval (still validated by the t_start >= prev_end check below). base = durations[0] assert base > 0, f"first dma duration must be positive, got {base}" for i, d in enumerate(durations): - assert abs(d - base) <= 1.0, ( - f"op {i} duration {d} differs from baseline {base} by >1 ns " + assert abs(d - base) <= 3.0, ( + f"op {i} duration {d} differs from baseline {base} by >3 ns " f"— record_start may still be including queue wait" ) diff --git a/tests/test_phase_a_components.py b/tests/test_phase_a_components.py index 0330943..cf2c293 100644 --- a/tests/test_phase_a_components.py +++ b/tests/test_phase_a_components.py @@ -119,9 +119,18 @@ def test_hbm_ctrl_terminal_succeeds_done(): assert done_event.triggered -def test_hbm_ctrl_resource_serializes_requests(): - """HbmCtrlComponent with capacity=1 serializes concurrent requests.""" - node = _node("builtin.hbm_ctrl", {"overhead_ns": 5.0, "capacity": 1}) +def test_hbm_ctrl_concurrent_zero_byte_requests_parallel(): + """HbmCtrlComponent under the PC striping model (ADR-0033 D1) processes + zero-byte transactions in parallel — they claim no PC chunks, so only + the per-request `overhead_ns` applies and both finish concurrently. + + This supersedes the prior dual-channel `simpy.Resource(capacity=1)` + serialization assertion (ADR-0033 supersedes the dual-channel model). + """ + node = _node( + "builtin.hbm_ctrl", + {"overhead_ns": 5.0, "num_pcs": 8, "pc_bw_gbs": 32.0, "burst_bytes": 256}, + ) comp = HbmCtrlComponent(node) env = simpy.Environment() in_store: simpy.Store = simpy.Store(env) @@ -140,10 +149,10 @@ def test_hbm_ctrl_resource_serializes_requests(): env.process(inject()) env.run(until=done2) - # Both must be done; with serialization: t=5 + t=10 assert done1.triggered assert done2.triggered - assert env.now == pytest.approx(10.0) + # Zero-byte txns occupy no PC; both finish at t = overhead_ns (parallel). + assert env.now == pytest.approx(5.0) # ── 4. Terminal: SramComponent succeeds done ───────────────────────── diff --git a/tests/test_wire_cut_through.py b/tests/test_wire_cut_through.py new file mode 100644 index 0000000..72ca8dc --- /dev/null +++ b/tests/test_wire_cut_through.py @@ -0,0 +1,142 @@ +"""Tests for wire cut-through via `Transaction.head_arrived` event (ADR-0033 D1). + +The wire model (ADR-0015 D2) currently delivers a message to the destination +in_port only after the full nbytes/bw_gbs transfer time has elapsed +(store-and-forward). Phase 2 adds a `head_arrived` SimPy event on the +Transaction that fires at `prop_ns + FLIT_BYTES / bw_gbs` — letting opted-in +destinations (e.g., HBM CTRL) start processing the leading flit before the +tail arrives. The wire's BW occupancy (`available_at`) is unchanged. + +These tests assert the *behavioral* consequence: when both the wire and +HBM CTRL contribute meaningfully to total latency, the model must not +double-count their time. They are written BEFORE Phase 2 production +changes and expected to FAIL on current code. +""" +from __future__ import annotations + +from pathlib import Path + +from kernbench.policy.address.phyaddr import PhysAddr +from kernbench.runtime_api.kernel import MemoryWriteMsg +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import load_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" + + +def _engine() -> GraphEngine: + return GraphEngine(load_topology(TOPOLOGY_PATH)) + + +def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0) -> int: + slice_bytes = 48 * (1 << 30) // 8 + return PhysAddr.pe_hbm_addr( + sip_id=sip, die_id=cube, pe_id=pe_id, + pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes, + ).encode() + + +def _path_drain_for_write(eng: GraphEngine, msg: MemoryWriteMsg) -> float: + """Dynamically compute the engine's path drain for this write.""" + pcie_ep_id = eng._resolver.find_pcie_ep(msg.dst_sip) + pa = PhysAddr.decode(msg.dst_pa) + hbm_node = eng._resolver.resolve(pa) + path = eng._router.find_memory_path(pcie_ep_id, hbm_node) + return eng._path_drain_ns(path, msg.nbytes) + + +def _write_ns(nbytes: int) -> tuple[float, float]: + """Return (total_ns, path_drain_ns) for the MemoryWrite of given nbytes.""" + eng = _engine() + msg = MemoryWriteMsg( + correlation_id="cut-through", request_id=f"w-{nbytes}", + dst_sip=0, dst_cube=0, dst_pe=0, + dst_pa=_hbm_pa(), nbytes=nbytes, + pattern="zero", target_pe=0, + ) + drain = _path_drain_for_write(eng, msg) + h = eng.submit(msg) + eng.wait(h) + _, t = eng.get_completion(h) + return t["total_ns"], drain + + +# ── 1. Effective slope: total_ns vs nbytes should grow at the rate of +# the bottleneck BW, not 2x that rate (which double-counts wire+HBM). + + +def test_effective_slope_single_bw_not_doubled(): + """The effective ns-per-byte slope should match the path bottleneck rate + (= 1 / bottleneck_bw), NOT 2× that rate (which would double-count wire + and HBM drain). Drain is computed dynamically from the engine path. + + Measurement: linear fit between two large transfer sizes. Constants + cancel; slope is the discriminator. + """ + n1, n2 = 32768, 131072 # 32KB and 128KB + t1, drain1 = _write_ns(n1) + t2, drain2 = _write_ns(n2) + slope = (t2 - t1) / (n2 - n1) # ns per byte + expected_slope = drain2 / n2 # = 1 / bottleneck_bw (ns/byte) + + # 50% tolerance above ideal accounts for propagation prop_ns at + # large-N regimes; still well below 2× (doubled) doubling. + assert slope < expected_slope * 1.5, ( + f"Effective slope {slope*1000:.4f} ps/byte too steep; " + f"expected ~{expected_slope*1000:.4f} ps/byte at path bottleneck. " + f"A doubled (wire + HBM drain) model would give ~" + f"{expected_slope*2*1000:.4f} ps/byte." + ) + + +# ── 2. Absolute upper bound: 1MB transfer not 2x wire time ── + + +def test_1mb_transfer_upper_bound(): + """A 1MB write should complete in roughly the path-bottleneck transfer + time, plus modest fixed overhead. A doubled (wire + HBM drain) model + would give ~2× that. + """ + nbytes = 1 << 20 # 1 MB + total, drain = _write_ns(nbytes) + assert total < drain * 1.5, ( + f"1MB write should not be ~2x bottleneck transfer time. " + f"drain={drain:.2f}ns, total={total:.2f}ns, " + f"ratio={total/drain:.2f} (expected < 1.5)" + ) + + +# ── 3. Small transfer: cut-through dominated by component overhead ── + + +def test_small_transfer_remains_finite_and_positive(): + """Sanity: small (single-chunk) transfer still completes with positive + finite latency. Cut-through should not introduce zero-latency bugs. + """ + t, _ = _write_ns(256) + assert t > 0 + assert t < 1000.0, f"256B write should be << 1us, got {t}ns" + + +# ── 4. Monotonicity preserved under cut-through ── + + +def test_monotonicity_at_extreme_sizes(): + """Once payload is large enough to be wire-dominated, monotonicity + must hold: a much larger write takes more time than a smaller one. + + Note: in the PC parallelism regime (ADR-0033 D1), a small single-PC + transfer can actually be slower than a small few-PC transfer (a 1KB + write spans 4 PCs in parallel and finishes around the same wall-clock + time as a 256B write that only loads 1 PC). This is physically + correct and matches real-HW behavior; strict monotonicity over the + sub-PC regime is not asserted. We assert it only across an extreme + range where the wire-transfer term dominates. + """ + small, _ = _write_ns(256) + large, _ = _write_ns(65536) + assert large > small, ( + f"65KB ({large:.2f}ns) must exceed 256B ({small:.2f}ns) — " + f"wire transfer at 256GB/s alone is 256ns for 64KB, so total " + f"must dominate any sub-microsecond small-transfer time." + ) diff --git a/topology.yaml b/topology.yaml index fc428bf..b94f3fd 100644 --- a/topology.yaml +++ b/topology.yaml @@ -106,7 +106,7 @@ cube: components: noc_router: { kind: noc_router, impl: builtin.forwarding, attrs: { overhead_ns: 2.0 } } m_cpu: { kind: m_cpu, impl: builtin.m_cpu, attrs: { overhead_ns: 5.0 } } - hbm_ctrl: { kind: hbm_ctrl, impl: builtin.hbm_ctrl, attrs: { capacity: 1, efficiency: 1.0 } } + hbm_ctrl: { kind: hbm_ctrl, impl: builtin.hbm_ctrl, attrs: { capacity: 1, efficiency: 1.0, num_pcs: 8, burst_bytes: 256, switch_penalty_ns: 0.0 } } sram: { kind: sram, impl: builtin.sram, attrs: { size_mb: 32, overhead_ns: 2.0 } } # Physical placement of non-PE components (mm coordinates)