"""Tests for H2D writes and PE DMA probe latency invariants.

H2D tests use MemoryWriteMsg (pcie_ep → io_cpu → m_cpu → hbm_ctrl → response).
PE DMA tests use PeDmaMsg (direct pe_dma → xbar → hbm_ctrl injection).
"""
from pathlib import Path

from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.policy.routing.router import AddressResolver, PathRouter
from kernbench.runtime_api.kernel import MemoryReadMsg, MemoryWriteMsg, PeDmaMsg
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology

# topology.yaml sits two directory levels above this test file.
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"


def _engine():
    """Return a new GraphEngine built from the topology loaded from TOPOLOGY_PATH."""
    topology = load_topology(TOPOLOGY_PATH)
    return GraphEngine(topology)


def _hbm_pa(sip: int = 0, cube: int = 0, pe_id: int = 0) -> int:
    """Return the encoded physical address at offset 0x1000 of one PE's HBM slice.

    The address is always built for rack 0; ``sip``, ``cube`` and ``pe_id``
    select the SIP, cube and PE passed to ``PhysAddr.pe_hbm_addr``.
    """
    # 48 GiB // 8 == 6 GiB per slice.
    # NOTE(review): presumably per-cube HBM split evenly over 8 slices — confirm.
    slice_size = (48 << 30) // 8
    return PhysAddr.pe_hbm_addr(
        rack_id=0,
        sip_id=sip,
        cube_id=cube,
        pe_id=pe_id,
        pe_local_hbm_offset=0x1000,
        slice_size_bytes=slice_size,
    ).encode()


def _h2d_latency(dst_cube: int, dst_pe: int = 0) -> float:
    """Run one 4096-byte MemoryWriteMsg on a fresh engine and return its ``total_ns``.

    The write targets SIP 0, cube ``dst_cube``, PE ``dst_pe``; the value
    returned is the ``"total_ns"`` entry of the completion trace.
    """
    sim = _engine()
    write = MemoryWriteMsg(
        correlation_id="probe",
        request_id=f"h2d-c{dst_cube}-p{dst_pe}",
        dst_sip=0,
        dst_cube=dst_cube,
        dst_pe=dst_pe,
        dst_pa=_hbm_pa(sip=0, cube=dst_cube, pe_id=dst_pe),
        nbytes=4096,
        pattern="zero",
        target_pe=dst_pe,
    )
    handle = sim.submit(write)
    sim.wait(handle)
    _completion, stats = sim.get_completion(handle)
    return stats["total_ns"]


# ── 1. Single-PE write completes ──────────────────────────────────

def test_single_pe_write_completes():
    """MemoryWriteMsg(target_pe=0) must complete with ok=True, latency > 0."""
    sim = _engine()
    write = MemoryWriteMsg(
        correlation_id="probe",
        request_id="pe-local",
        dst_sip=0,
        dst_cube=0,
        dst_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
        pattern="zero",
        target_pe=0,
    )
    handle = sim.submit(write)
    sim.wait(handle)
    completion, stats = sim.get_completion(handle)
    assert completion.ok is True
    assert stats["total_ns"] > 0
# ── 2. Cross-cube write positive latency ─────────────────────────

def test_cross_cube_write_positive():
    """Cross-cube MemoryWriteMsg(target_pe=0) must complete with latency > 0."""
    lat = _h2d_latency(dst_cube=1, dst_pe=0)
    assert lat > 0


# ── 3. H2D latency monotonicity ──────────────────────────────────

def test_h2d_latency_monotonic():
    """H2D latency must strictly increase over cubes 0, 4, 8, 12 (1hop < 2hop < 3hop < 4hop)."""
    latencies: list[tuple[int, float]] = [
        (cube, _h2d_latency(dst_cube=cube, dst_pe=0)) for cube in (0, 4, 8, 12)
    ]
    # Compare each measurement with its successor.
    for (near_cube, near_lat), (far_cube, far_lat) in zip(latencies, latencies[1:]):
        assert near_lat < far_lat, (
            f"cube{near_cube}({near_lat:.2f}) "
            f"must < cube{far_cube}({far_lat:.2f})"
        )


# ── 4. Single-PE write deterministic ─────────────────────────────

def test_single_pe_write_deterministic():
    """Same MemoryWriteMsg on two engines must produce identical latency."""
    msg = MemoryWriteMsg(
        correlation_id="probe",
        request_id="det",
        dst_sip=0,
        dst_cube=0,
        dst_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
        pattern="zero",
        target_pe=0,
    )

    e1 = _engine()
    h1 = e1.submit(msg)
    e1.wait(h1)
    _, t1 = e1.get_completion(h1)

    e2 = _engine()
    h2 = e2.submit(msg)
    e2.wait(h2)
    _, t2 = e2.get_completion(h2)

    # Exact equality is intentional: this test is about determinism.
    assert t1["total_ns"] == t2["total_ns"]


# ── 5. Cut-through (wormhole) wire model invariants ──────────────

def test_h2d_local_cube_cut_through():
    """H2D to local cube with cut-through should be < 65ns for 4096B.

    Full command path: pcie_ep → io_cpu → ucie → noc → m_cpu
    DMA: m_cpu → noc → xbar → hbm_ctrl (drain once at terminal)
    Plus response path back.

    With store-and-forward each hop would serialize; cut-through keeps it low.
    """
    lat = _h2d_latency(dst_cube=0, dst_pe=0)
    assert lat < 65.0, f"Local H2D {lat:.2f}ns; cut-through expects < 65ns"


def test_h2d_remote_cube_cut_through():
    """H2D to 1-hop remote cube: cut-through drain dominates, not per-hop serialization.

    With store-and-forward, each hop would serialize 4096B, total >> 100ns.
    With cut-through, drain happens once at bottleneck.
    """
    lat = _h2d_latency(dst_cube=4, dst_pe=0)
    assert lat < 80.0, f"Remote H2D {lat:.2f}ns; cut-through expects < 80ns"


# ── 6. PE DMA: direct injection tests ─────────────────────────

def _graph():
    """Return the topology graph loaded from TOPOLOGY_PATH."""
    return load_topology(TOPOLOGY_PATH)


def _hbm_effective_bw() -> float:
    """Compute HBM effective BW from topology spec: xbar_to_hbm_bw_gbs * efficiency.

    ``efficiency`` is read from the ``hbm_ctrl`` component's ``attrs`` and
    defaults to 1.0 when ``attrs`` or the key is absent.
    """
    cube_spec = _graph().spec["cube"]
    raw_bw = cube_spec["links"]["xbar_to_hbm_bw_gbs"]
    eff = cube_spec["components"]["hbm_ctrl"].get("attrs", {}).get("efficiency", 1.0)
    return raw_bw * eff


def _pe_dma_latency(src_cube: int, src_pe: int, dst_pe: int) -> float:
    """Run one 4096-byte PeDmaMsg within ``src_cube`` and return its ``total_ns``.

    The DMA is issued by PE ``src_pe`` and targets the HBM address of PE
    ``dst_pe`` in the same cube (SIP 0).
    """
    engine = _engine()
    msg = PeDmaMsg(
        correlation_id="probe",
        request_id=f"dma-c{src_cube}-p{src_pe}-s{dst_pe}",
        src_sip=0,
        src_cube=src_cube,
        src_pe=src_pe,
        dst_pa=_hbm_pa(sip=0, cube=src_cube, pe_id=dst_pe),
        nbytes=4096,
    )
    handle = engine.submit(msg)
    engine.wait(handle)
    _, trace = engine.get_completion(handle)
    return trace["total_ns"]


def _pe_dma_bottleneck(src_cube: int, src_pe: int, dst_pe: int) -> float | None:
    """Return the smallest edge bandwidth on the routed PE → HBM path.

    The destination node is resolved from the HBM address of PE ``dst_pe`` in
    ``src_cube``; the path starts at ``sip0.cube{src_cube}.pe{src_pe}``.
    Hops with no matching edge, or whose ``bw_gbs`` is falsy, are skipped.
    Returns ``None`` when no hop contributes a bandwidth.
    """
    graph = _graph()
    edge_map = {(e.src, e.dst): e for e in graph.edges}
    resolver = AddressResolver(graph)
    router = PathRouter(graph)

    pa_obj = PhysAddr.decode(_hbm_pa(sip=0, cube=src_cube, pe_id=dst_pe))
    dst_node = resolver.resolve(pa_obj)
    pe_ref = f"sip0.cube{src_cube}.pe{src_pe}"
    hops = list(router.find_path(pe_ref, dst_node))

    bws: list[float] = []
    for hop in zip(hops, hops[1:]):
        edge = edge_map.get(hop)
        if edge and edge.bw_gbs:
            bws.append(edge.bw_gbs)
    return min(bws) if bws else None


def _assert_bottleneck_is_hbm(bn: float | None, label: str) -> None:
    """Assert that bottleneck ``bn`` matches the HBM effective bandwidth.

    ``label`` prefixes the failure message (e.g. ``"Local"``). A ``None``
    bottleneck fails explicitly instead of reaching the numeric comparison.
    """
    import math  # function-scope import keeps this block self-contained

    expected = _hbm_effective_bw()
    message = f"{label} PE DMA bottleneck {bn}, expected {expected}"
    assert bn is not None, message
    # Both values are derived floats; compare with a tight relative tolerance
    # rather than bit-exact equality.
    assert math.isclose(bn, expected, rel_tol=1e-9), message


def test_pe_dma_local_completes():
    """PeDmaMsg to local slice0 must complete with ok=True, latency > 0."""
    engine = _engine()
    msg = PeDmaMsg(
        correlation_id="probe",
        request_id="dma-local",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
    )
    handle = engine.submit(msg)
    engine.wait(handle)
    comp, trace = engine.get_completion(handle)
    assert comp.ok is True
    assert trace["total_ns"] > 0


def test_pe_dma_local_bottleneck_hbm():
    """PE DMA pe0→slice0 (local): bottleneck = HBM effective BW."""
    bn = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=0)
    _assert_bottleneck_is_hbm(bn, "Local")


def test_pe_dma_same_half_bottleneck_hbm():
    """PE DMA pe0→slice1 (same half via xbar_top): bottleneck = HBM effective BW."""
    bn = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=1)
    _assert_bottleneck_is_hbm(bn, "Same-half")


def test_pe_dma_deterministic():
    """Same PeDmaMsg on two engines must produce identical latency."""
    msg = PeDmaMsg(
        correlation_id="probe",
        request_id="det",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=0, pe_id=0),
        nbytes=4096,
    )

    e1 = _engine()
    h1 = e1.submit(msg)
    e1.wait(h1)
    _, t1 = e1.get_completion(h1)

    e2 = _engine()
    h2 = e2.submit(msg)
    e2.wait(h2)
    _, t2 = e2.get_completion(h2)

    # Exact equality is intentional: this test is about determinism.
    assert t1["total_ns"] == t2["total_ns"]


# ── 7. PE DMA cross-cube best vs worst ──────────────────────────

def _pe_dma_cross_cube_latency(dst_cube: int) -> float:
    """Run one 4096-byte PeDmaMsg from cube0/pe0 to PE 0's HBM in ``dst_cube``.

    Returns the ``"total_ns"`` entry of the completion trace.
    """
    engine = _engine()
    msg = PeDmaMsg(
        correlation_id="probe",
        request_id=f"dma-cross-c{dst_cube}",
        src_sip=0,
        src_cube=0,
        src_pe=0,
        dst_pa=_hbm_pa(sip=0, cube=dst_cube, pe_id=0),
        nbytes=4096,
    )
    handle = engine.submit(msg)
    engine.wait(handle)
    _, trace = engine.get_completion(handle)
    return trace["total_ns"]


def test_pe_cross_cube_best_worst():
    """Cross-cube best (adjacent cube1) must have lower latency than worst (far cube15)."""
    best = _pe_dma_cross_cube_latency(dst_cube=1)
    worst = _pe_dma_cross_cube_latency(dst_cube=15)
    assert best < worst, (
        f"Best (cube1) {best:.2f}ns must < worst (cube15) {worst:.2f}ns"
    )
# ── 8. Probe timestamp trace ──────────────────────────────────

def test_probe_timestamp_trace():
    """_hop_timestamps must yield one entry per path node, with non-decreasing times.

    The path is routed from ``sip0.cube0.pe0`` to the node that the HBM
    address of cube0/pe0 resolves to. Element ``[1]`` of each entry is the
    value compared between neighbours.
    NOTE(review): presumably a cumulative timestamp in ns — confirm against
    ``kernbench.cli.probe``.
    """
    from kernbench.cli.probe import _hop_timestamps, _build_edge_map

    topo = _graph()
    edges_by_pair = _build_edge_map(topo)
    resolver = AddressResolver(topo)
    router = PathRouter(topo)

    target = resolver.resolve(PhysAddr.decode(_hbm_pa(sip=0, cube=0, pe_id=0)))
    route = router.find_path("sip0.cube0.pe0", target)

    stamps = _hop_timestamps(route, 4096, edges_by_pair, topo)
    assert len(stamps) == len(route)

    for nxt in range(1, len(stamps)):
        prev = nxt - 1
        assert stamps[prev][1] <= stamps[nxt][1], (
            f"Timestamps not monotonic at hop {prev}: "
            f"{stamps[prev][1]:.4f} > {stamps[nxt][1]:.4f}"
        )


# ── 9. D2H Read latency monotonicity ────────────────────────────

def _d2h_latency(src_cube: int) -> float:
    """Run one 4096-byte MemoryReadMsg from cube ``src_cube`` and return its ``total_ns``.

    The read targets the HBM address of PE 0 in ``src_cube`` on SIP 0 and
    runs on a freshly built engine.
    """
    sim = _engine()
    read = MemoryReadMsg(
        correlation_id="probe",
        request_id=f"d2h-c{src_cube}",
        src_sip=0,
        src_cube=src_cube,
        src_pe=0,
        src_pa=_hbm_pa(sip=0, cube=src_cube, pe_id=0),
        nbytes=4096,
    )
    handle = sim.submit(read)
    sim.wait(handle)
    _completion, stats = sim.get_completion(handle)
    return stats["total_ns"]


def test_d2h_latency_monotonic():
    """D2H read: 1hop < 2hop < 3hop < 4hop."""
    measured = []
    for cube in (0, 4, 8, 12):
        measured.append((cube, _d2h_latency(cube)))

    # Each measurement must be strictly below the one that follows it.
    for (near_cube, near_lat), (far_cube, far_lat) in zip(measured, measured[1:]):
        assert near_lat < far_lat, (
            f"cube{near_cube}({near_lat:.2f}) "
            f"must < cube{far_cube}({far_lat:.2f})"
        )


def test_d2h_latency_gte_h2d():
    """D2H read latency must be at least 80% of H2D write latency for the same cube.

    Checked for cubes 0, 4 and 8 (reverse data path).
    """
    for cube in (0, 4, 8):
        h2d = _h2d_latency(dst_cube=cube, dst_pe=0)
        d2h = _d2h_latency(src_cube=cube)
        floor = h2d * 0.8
        assert d2h >= floor, (
            f"cube{cube}: D2H ({d2h:.2f}ns) should be >= 80% of H2D ({h2d:.2f}ns)"
        )
# ── 10. HBM efficiency applied ──────────────────────────────────

def test_hbm_efficiency_applied():
    """HBM edge BW should reflect efficiency factor from topology spec.

    Looks up the ``sip0.cube0.xbar_top`` → ``sip0.cube0.hbm_ctrl.slice0`` edge
    and checks that its ``bw_gbs`` matches ``_hbm_effective_bw()``.
    """
    import math  # function-scope import keeps this block self-contained

    graph = _graph()
    edge_map = {(e.src, e.dst): e for e in graph.edges}
    edge = edge_map.get(("sip0.cube0.xbar_top", "sip0.cube0.hbm_ctrl.slice0"))
    assert edge is not None, "xbar_top -> hbm_ctrl.slice0 edge missing"

    expected = _hbm_effective_bw()
    message = f"HBM edge BW {edge.bw_gbs}, expected {expected}"
    # A missing bandwidth should fail as an assertion, not as a TypeError below.
    assert edge.bw_gbs is not None, message
    # Both values are derived floats; compare with a tight relative tolerance
    # rather than bit-exact equality.
    assert math.isclose(edge.bw_gbs, expected, rel_tol=1e-9), message


# ── 11. Sweep saturation ──────────────────────────────────────

def test_probe_sweep_saturation():
    """Utilization at 1MB must exceed utilization at 4KB for pe-local-hbm.

    Compares the last and first entries returned by ``_sweep_util``.
    NOTE(review): presumably the sweep runs from 4KB up to 1MB — confirm
    against ``kernbench.cli.probe``.
    """
    from kernbench.cli.probe import _sweep_util

    # pe-local-hbm: ovhd=2ns (xbar), wire~0.03ns, bn=204.8 GB/s
    u = _sweep_util(2.0, 0.03, 204.8)
    assert u[-1] > u[0], (
        f"1MB util ({u[-1]:.1f}%) must exceed 4KB util ({u[0]:.1f}%)"
    )
    assert u[-1] > 99.0, f"1MB util ({u[-1]:.1f}%) should be >99%"