diff --git a/docs/adr/ADR-0023-ipcq-pe-collective.en.md b/docs/adr/ADR-0023-ipcq-pe-collective.en.md index 8c79f07..81ad662 100644 --- a/docs/adr/ADR-0023-ipcq-pe-collective.en.md +++ b/docs/adr/ADR-0023-ipcq-pe-collective.en.md @@ -372,24 +372,41 @@ When the receiver frees a slot, the sender must learn about it travel through general vc_comm fabric — it uses a **separate fast path**, an abstraction of the NVLink / UCIe credit-return wire. -**Latency** is computed from the **bottleneck BW on the path**, not a -magic constant: +**Latency** is computed from the **full path latency** (per-node +overhead + edge propagation + drain), not a magic constant: ``` credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes) -path = router.find_path(self_pe, peer_pe) -latency = compute_drain_ns(path, credit_size_bytes) - = credit_size_bytes / bottleneck_bw_on_path +path = router.find_path(self_pe, peer_pe.pe_dma) +latency = compute_path_latency_ns(path, credit_size_bytes) + = sum(edge.distance_mm * ns_per_mm) + + sum(node_overhead_ns[n] for n in path) + + credit_size_bytes / bottleneck_bw_on_path ``` +The router auto-appends `.pe_dma` to the source only, so the +destination MUST be spelled with the explicit `.pe_dma` suffix or +`find_path` raises and the credit silently teleports at zero cost +(latent bug fixed alongside this update). + +`tl.recv` blocks on the credit-emit completion (recv yields-from +`_delayed_credit_send` rather than spawning it as a fork). This puts +the credit-return cost on the receiver's `pe_exec_ns`, modeling the +IPCQ control-plane completing the consume-acknowledgement before +recv returns to the kernel — the protocol equivalent of a non-posted +`tl.store` waiting for an HBM ack on the raw DMA path. + That gives us: - **Topology-proportional approximation**: an in-cube credit return is automatically faster than a cross-SIP credit return. -- **No magic constants**: no arbitrary `ipcq_ctrl_latency_ns`. +- **No magic constants**: every nanosecond comes from + `compute_path_latency_ns` on the same edge_map and `node_overhead_ns` + as data traffic. - **No deadlock risk**: unlike piggyback, B can issue credit even when - it has no data to send back. -- **Reuses existing utility**: `ComponentContext.compute_drain_ns`. + it has no data to send back. `peer_credit_store.put` is unbounded. +- **`IPCQ ≥ raw DMA`** for matched physical moves — the credit-emit + cost on recv balances the HBM ack-trip cost RAW pays on the sender. #### Component coupling — SimPy Store channel diff --git a/docs/adr/ADR-0023-ipcq-pe-collective.md b/docs/adr/ADR-0023-ipcq-pe-collective.md index f2afa6b..9fc9ced 100644 --- a/docs/adr/ADR-0023-ipcq-pe-collective.md +++ b/docs/adr/ADR-0023-ipcq-pe-collective.md @@ -365,23 +365,39 @@ data 경로의 piggyback 모델과 달리, credit return은 일반 vc_comm fabri 거치지 않고 **별도 fast path**로 처리한다. 이는 실제 HW의 NVLink/UCIe credit return fast path를 추상화한 것이다. -**Latency 계산**: magic constant가 아니라 **라우팅 경로의 bottleneck BW** -기준으로 산출한다. +**Latency 계산**: magic constant가 아니라 **라우팅 경로의 full path +latency** (per-node overhead + edge propagation + drain) 기준으로 +산출한다. ``` credit_size_bytes = 16 (ccl.yaml: ipcq_credit_size_bytes) -path = router.find_path(self_pe, peer_pe) -latency = compute_drain_ns(path, credit_size_bytes) - = credit_size_bytes / bottleneck_bw_on_path +path = router.find_path(self_pe, peer_pe.pe_dma) +latency = compute_path_latency_ns(path, credit_size_bytes) + = sum(edge.distance_mm * ns_per_mm) + + sum(node_overhead_ns[n] for n in path) + + credit_size_bytes / bottleneck_bw_on_path ``` +router는 source에만 `.pe_dma`를 자동 부여하므로 destination에는 반드시 +`.pe_dma` suffix를 명시해야 한다. 그렇지 않으면 `find_path`가 raise하고 +credit이 0 cost로 silently teleport되는 latent bug가 발생한다 (이번 +업데이트에서 수정됨). + +`tl.recv`는 credit-emit 완료를 yield-from으로 기다린다 (이전에는 +`env.process`로 fork). 이로써 credit-return cost가 receiver의 +`pe_exec_ns`에 반영되어, IPCQ control-plane이 consume-acknowledgement를 +완료한 뒤에야 recv가 kernel에 반환된다 — RAW DMA의 non-posted `tl.store`가 +HBM ack-trip을 기다리는 것의 protocol-level 등가물이다. + 이로써: - **토폴로지 비례 approximation**: cube 내 credit return과 cross-SIP credit이 - 자동으로 다른 latency를 가짐 (정확한 값은 아니지만 magic constant보다 의미 있음) -- **Magic constant 없음**: 별도 `ipcq_ctrl_latency_ns` 같은 임의 값 불필요 -- **Deadlock 위험 없음**: piggyback과 달리 B가 A에게 보낼 데이터가 없어도 - credit이 자동 발행됨 -- **기존 utility 재사용**: `ComponentContext.compute_drain_ns` 그대로 사용 + 자동으로 다른 latency를 가짐 +- **Magic constant 없음**: 모든 ns 값이 데이터 트래픽과 동일한 edge_map + 및 `node_overhead_ns`에서 산출되는 `compute_path_latency_ns`로부터 옴 +- **Deadlock 위험 없음**: `peer_credit_store.put`은 unbounded, B가 A에게 + 보낼 데이터가 없어도 credit이 자동 발행됨 +- **`IPCQ ≥ raw DMA`** 보장: matched physical move에 대해 credit-emit이 + RAW의 ack-trip cost와 균형을 이룸 ``` PE B: tl.recv(W) → 데이터 가져감 → my_tail++ diff --git a/src/kernbench/components/builtin/pe_ipcq.py b/src/kernbench/components/builtin/pe_ipcq.py index 5e31876..8ef75dd 100644 --- a/src/kernbench/components/builtin/pe_ipcq.py +++ b/src/kernbench/components/builtin/pe_ipcq.py @@ -338,9 +338,13 @@ class PeIpcqComponent(ComponentBase): nbytes=req.result_data.get("nbytes", 0), ) - # Fast path credit return — bottleneck BW based latency - env.process( - self._delayed_credit_send(env, direction, qp["peer_credit_store"], qp["my_tail"]) + # Credit return: recv blocks on credit-emit so the protocol cost + # (full path latency to deliver the credit metadata back to the + # sender) is reflected in the recv's pe_exec_ns. Models the IPCQ + # control-plane completing the consume-acknowledgement before + # recv returns to the kernel. + yield from self._delayed_credit_send( + env, direction, qp["peer_credit_store"], qp["my_tail"], ) if not req.done.triggered: @@ -455,7 +459,12 @@ class PeIpcqComponent(ComponentBase): yield peer_credit_store.put(meta) def _credit_latency_ns(self, direction: str) -> float: - """Compute credit fast path latency = credit_size / bottleneck_bw. + """Full path latency for the credit-return packet. + + Pays per-node overhead + edge prop + drain along the same fabric + the data took. PathRouter.find_path() auto-appends ".pe_dma" to + the source only, so the destination MUST be spelled with the + explicit ".pe_dma" suffix. Falls back to 0 when ctx/router is unavailable (unit-test mode). """ @@ -463,10 +472,12 @@ class PeIpcqComponent(ComponentBase): return 0.0 qp = self._queue_pairs[direction] peer = qp["peer"] - peer_pe_prefix = f"sip{peer.sip}.cube{peer.cube}.pe{peer.pe}" + peer_pe_dma = f"sip{peer.sip}.cube{peer.cube}.pe{peer.pe}.pe_dma" try: - path = self.ctx.router.find_path(self._pe_prefix, peer_pe_prefix) - return self.ctx.compute_drain_ns(path, self._credit_size_bytes) + path = self.ctx.router.find_path(self._pe_prefix, peer_pe_dma) + return self.ctx.compute_path_latency_ns( + path, self._credit_size_bytes, + ) except Exception: return 0.0 diff --git a/tests/pe2pe_latency_plots/h1_intra_horizontal.png b/tests/pe2pe_latency_plots/h1_intra_horizontal.png new file mode 100644 index 0000000..23a4db0 Binary files /dev/null and b/tests/pe2pe_latency_plots/h1_intra_horizontal.png differ diff --git a/tests/pe2pe_latency_plots/h2_intra_vertical.png b/tests/pe2pe_latency_plots/h2_intra_vertical.png new file mode 100644 index 0000000..a7af541 Binary files /dev/null and b/tests/pe2pe_latency_plots/h2_intra_vertical.png differ diff --git a/tests/pe2pe_latency_plots/h3_inter_cube_horizontal.png b/tests/pe2pe_latency_plots/h3_inter_cube_horizontal.png new file mode 100644 index 0000000..94b9eef Binary files /dev/null and b/tests/pe2pe_latency_plots/h3_inter_cube_horizontal.png differ diff --git a/tests/pe2pe_latency_plots/h4_inter_cube_vertical.png b/tests/pe2pe_latency_plots/h4_inter_cube_vertical.png new file mode 100644 index 0000000..3f685da Binary files /dev/null and b/tests/pe2pe_latency_plots/h4_inter_cube_vertical.png differ diff --git a/tests/pe2pe_latency_plots/h5_inter_sip.png b/tests/pe2pe_latency_plots/h5_inter_sip.png new file mode 100644 index 0000000..ca61e51 Binary files /dev/null and b/tests/pe2pe_latency_plots/h5_inter_sip.png differ diff --git a/tests/pe2pe_latency_plots/overview.png b/tests/pe2pe_latency_plots/overview.png new file mode 100644 index 0000000..1b29591 Binary files /dev/null and b/tests/pe2pe_latency_plots/overview.png differ diff --git a/tests/pe2pe_latency_plots/summary.csv b/tests/pe2pe_latency_plots/summary.csv new file mode 100644 index 0000000..2fa9201 --- /dev/null +++ b/tests/pe2pe_latency_plots/summary.csv @@ -0,0 +1,91 @@ +hop,label,size_bytes,path,total_ns +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),128,ipcq,31.1399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),128,raw,12.019999999996799 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),256,ipcq,32.6399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),256,raw,13.019999999996799 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),384,ipcq,34.1399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),384,raw,14.019999999996799 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),512,ipcq,35.6399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),512,raw,15.019999999996799 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),768,ipcq,38.6399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),768,raw,17.0199999999968 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),1024,ipcq,41.6399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),1024,raw,19.0199999999968 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),2048,ipcq,53.6399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),2048,raw,27.0199999999968 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),4096,ipcq,77.6399999999976 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),4096,raw,43.0199999999968 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),8192,ipcq,125.64000000000306 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),8192,raw,75.02000000000407 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),10240,ipcq,149.64000000000306 +h1_intra_horizontal,Intra-cube horizontal (pe0 to pe1),10240,raw,91.02000000000407 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),128,ipcq,31.1399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),128,raw,12.019999999996799 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),256,ipcq,32.6399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),256,raw,13.019999999996799 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),384,ipcq,34.1399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),384,raw,14.019999999996799 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),512,ipcq,35.6399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),512,raw,15.019999999996799 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),768,ipcq,38.6399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),768,raw,17.0199999999968 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),1024,ipcq,41.6399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),1024,raw,19.0199999999968 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),2048,ipcq,53.6399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),2048,raw,27.0199999999968 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),4096,ipcq,77.6399999999976 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),4096,raw,43.0199999999968 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),8192,ipcq,125.64000000000306 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),8192,raw,75.02000000000407 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),10240,ipcq,149.64000000000306 +h2_intra_vertical,Intra-cube vertical (pe0 to pe4),10240,raw,91.02000000000407 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),128,ipcq,67.15999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),128,raw,68.53999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),256,ipcq,68.65999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),256,raw,70.03999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),384,ipcq,70.15999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),384,raw,71.53999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),512,ipcq,71.65999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),512,raw,73.03999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),768,ipcq,74.65999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),768,raw,76.03999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),1024,ipcq,77.65999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),1024,raw,79.03999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),2048,ipcq,89.65999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),2048,raw,91.03999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),4096,ipcq,113.65999999999804 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),4096,raw,115.03999999999724 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),8192,ipcq,161.65999999999985 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),8192,raw,163.04000000000087 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),10240,ipcq,185.65999999999985 +h3_inter_cube_horizontal,Inter-cube horizontal (cube0 to cube1),10240,raw,187.04000000000087 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),128,ipcq,87.15999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),128,raw,88.53999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),256,ipcq,88.65999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),256,raw,90.03999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),384,ipcq,90.15999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),384,raw,91.53999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),512,ipcq,91.65999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),512,raw,93.03999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),768,ipcq,94.65999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),768,raw,96.03999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),1024,ipcq,97.65999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),1024,raw,99.03999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),2048,ipcq,109.65999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),2048,raw,111.03999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),4096,ipcq,133.65999999999804 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),4096,raw,135.03999999999724 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,ipcq,181.65999999999985 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),8192,raw,183.04000000000087 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,ipcq,205.65999999999985 +h4_inter_cube_vertical,Inter-cube vertical (cube0 to cube4),10240,raw,207.04000000000087 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",128,ipcq,6.015000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",256,ipcq,6.515000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",384,ipcq,7.015000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",512,ipcq,7.515000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",768,ipcq,8.515000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",1024,ipcq,9.515000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",2048,ipcq,13.515000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",4096,ipcq,21.515000000003056 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",8192,ipcq,37.51499999999214 +h5_inter_sip,"Inter-SIP (sip0 to sip1, same cube/pe)",10240,ipcq,45.51499999999214 diff --git a/tests/test_pe_to_pe_diagnostic.py b/tests/test_pe_to_pe_diagnostic.py new file mode 100644 index 0000000..4d7bc2f --- /dev/null +++ b/tests/test_pe_to_pe_diagnostic.py @@ -0,0 +1,741 @@ +"""Diagnostic for the inter-cube RAW > IPCQ asymmetry on h3/h4 plots. + +Single-shot run at h3 (sip0.cube0.pe0 -> sip0.cube1.pe0), nbytes=4096. + +Captures per-PE pe_exec_ns and the actual path / drain / per-node overhead +breakdown for the RAW sub-txn (PE_DMA -> remote HBM_CTRL) vs the IPCQ +outbound sub-txn (PE_DMA -> peer PE_DMA), so we can localize the gap to +one of: + (a) drain at HBM-BW (RAW) vs fabric-BW (IPCQ) + (b) path-length / per-node overhead asymmetry + (c) RAW SRC paying tl.load (local HBM read) on top of remote tl.store + while IPCQ DST only pays inbound traversal+drain. + +Phase 1 / test-only. No production code is modified. +""" +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pytest + +from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config +from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip +from kernbench.policy.placement.dp import DPPolicy +from kernbench.runtime_api.context import RuntimeContext +from kernbench.runtime_api.types import DeviceSelector +from kernbench.sim_engine.engine import GraphEngine +from kernbench.topology.builder import resolve_topology + +TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml" + +import os + +# Allow the test to be re-run for h4 (inter-cube vertical) at multiple sizes +# to investigate why IPCQ slope flattens past 8192 B (path may differ). +NBYTES = int(os.environ.get("DIAG_NBYTES", "4096")) +ELEM_BYTES = 2 +N_ELEM = NBYTES // ELEM_BYTES +N_CUBES = 16 +N_PES = 8 +HOP = os.environ.get("DIAG_HOP", "h3") +if HOP == "h4": + SRC = (0, 0, 0) + DST = (0, 4, 0) # h4 inter-cube vertical +else: + SRC = (0, 0, 0) + DST = (0, 1, 0) # h3 inter-cube horizontal + + +# ── Per-PE pe_exec_ns capture via monkey-patch ─────────────────────── + + +def _install_barrier_capture(): + """Wrap PeCpuComponent._execute_kernel to log, for every PE that + enters: env.now at entry, target_start_ns the request carried, + whether the barrier yield fired (i.e. env.now < target_start_ns), + and env.now at pe_exec_start. + """ + import kernbench.components.builtin.pe_cpu as pe_cpu_mod + + log: list[dict] = [] + original = pe_cpu_mod.PeCpuComponent._execute_kernel + + def patched(self, env, txn): + request = txn.request + target_start = getattr(request, "target_start_ns", None) + entry_now = float(env.now) + log_entry = { + "node_id": self.node.id, + "entry_now": entry_now, + "target_start_ns": ( + float(target_start) if target_start is not None else None + ), + "barrier_skipped": ( + target_start is None + or float(target_start) <= entry_now + ), + "delta_late_ns": ( + None if target_start is None + else max(0.0, entry_now - float(target_start)) + ), + } + log.append(log_entry) + yield from original(self, env, txn) + + pe_cpu_mod.PeCpuComponent._execute_kernel = patched + + def restore(): + pe_cpu_mod.PeCpuComponent._execute_kernel = original + + return log, restore + + +def _install_per_pe_capture(): + """Wrap PeCpuComponent._execute_kernel so we record (node_id -> + pe_exec_ns) for every PE that executes a kernel during the run. + + Returns (capture_dict, restore_callable). + """ + import kernbench.components.builtin.pe_cpu as pe_cpu_mod + + captured: dict[str, float] = {} + original = pe_cpu_mod.PeCpuComponent._execute_kernel + + def patched(self, env, txn): + gen = original(self, env, txn) + try: + value = yield from gen + finally: + v = txn.result_data.get("pe_exec_ns") + if v is not None: + captured[self.node.id] = float(v) + return value + + pe_cpu_mod.PeCpuComponent._execute_kernel = patched + + def restore(): + pe_cpu_mod.PeCpuComponent._execute_kernel = original + + return captured, restore + + +def _install_recv_capture(target_node_id: str): + """Wrap PeIpcqComponent._handle_recv to log entry/exit times and the + peer_head_cache/my_tail values seen at the start. + + This pins down whether recv ever blocked on a wait_event, or whether + it consumed without waiting (i.e. peer_head_cache > my_tail at entry). + """ + import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod + + log: list[dict] = [] + original = pe_ipcq_mod.PeIpcqComponent._handle_recv + + def patched(self, env, req, cmd): + if self.node.id != target_node_id: + yield from original(self, env, req, cmd) + return + # Snapshot state before dispatch + d = cmd.direction + qp = self._queue_pairs.get(d, {}) + log.append({ + "phase": "enter", + "t": float(env.now), + "direction": d, + "peer_head_cache": qp.get("peer_head_cache"), + "my_tail": qp.get("my_tail"), + }) + yield from original(self, env, req, cmd) + qp = self._queue_pairs.get(d, {}) + log.append({ + "phase": "exit", + "t": float(env.now), + "direction": d, + "peer_head_cache": qp.get("peer_head_cache"), + "my_tail": qp.get("my_tail"), + }) + + pe_ipcq_mod.PeIpcqComponent._handle_recv = patched + + def restore(): + pe_ipcq_mod.PeIpcqComponent._handle_recv = original + + return log, restore + + +def _install_meta_arrival_capture(target_node_id: str): + """Log every IpcqMetaArrival that lands on ``target_node_id`` PE_IPCQ. + + Records (env_now, sender_seq, dst_addr, matched_direction, + peer_head_cache_before, my_tail_before). + """ + import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod + + log: list[dict] = [] + original = pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival + + def patched(self, msg): + if self.node.id == target_node_id: + token = msg.token + now = float(self._env.now) if hasattr(self, "_env") else 0.0 + # _env is not stored on the component; use ctx? Fall back to + # introspection via self._inbox._env (SimPy stores reference). + try: + now = float(self._inbox._env.now) + except Exception: + pass + entry = { + "t": now, + "sender_seq": getattr(token, "sender_seq", None), + "dst_addr": getattr(token, "dst_addr", None), + "src_sip": getattr(token, "src_sip", None), + "src_cube": getattr(token, "src_cube", None), + "src_pe": getattr(token, "src_pe", None), + "src_direction": getattr(token, "src_direction", None), + "nbytes": getattr(token, "nbytes", None), + "matched_direction": None, + "peer_head_cache_before": {}, + "my_tail_before": {}, + } + for d, qp in self._queue_pairs.items(): + entry["peer_head_cache_before"][d] = qp["peer_head_cache"] + entry["my_tail_before"][d] = qp["my_tail"] + base = qp["my_rx_base_pa"] + size = qp["n_slots"] * qp["slot_size"] + if base <= entry["dst_addr"] < base + size: + entry["matched_direction"] = d + log.append(entry) + return original(self, msg) + + pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = patched + + def restore(): + pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = original + + return log, restore + + +def _snapshot_qp_state(engine, target_node_id: str) -> dict: + """Snapshot every direction's qp state on the target PE_IPCQ now. + + Captures peer_head_cache, my_tail, my_rx_base_pa, n_slots, slot_size + for each installed direction. + """ + comp = engine._components.get(target_node_id) + if comp is None: + return {} + return { + d: { + "peer_head_cache": qp["peer_head_cache"], + "my_tail": qp["my_tail"], + "my_rx_base_pa": qp["my_rx_base_pa"], + "n_slots": qp["n_slots"], + "slot_size": qp["slot_size"], + "rx_range": ( + qp["my_rx_base_pa"], + qp["my_rx_base_pa"] + qp["n_slots"] * qp["slot_size"], + ), + } + for d, qp in comp.queue_pairs.items() + } + + +# ── Path / drain breakdown using engine ctx ────────────────────────── + + +def _path_breakdown(ctx, path: list[str], nbytes: int) -> dict: + edge_total_ns = 0.0 + edge_details = [] + min_bw = float("inf") + for i in range(len(path) - 1): + edge = ctx.edge_map.get((path[i], path[i + 1])) + if edge is None: + edge_details.append((path[i], path[i + 1], None, None, None)) + continue + prop_ns = edge.distance_mm * ctx.ns_per_mm + edge_total_ns += prop_ns + bw = getattr(edge, "bw_gbs", None) or 0.0 + if bw > 0 and bw < min_bw: + min_bw = bw + edge_details.append( + (path[i], path[i + 1], edge.distance_mm, prop_ns, bw), + ) + + overhead_total_ns = 0.0 + overhead_details = [] + for nid in path: + oh = float(ctx.node_overhead_ns.get(nid, 0.0)) + overhead_total_ns += oh + overhead_details.append((nid, oh)) + + drain_ns = ctx.compute_drain_ns(path, nbytes) + bottleneck_bw = None if min_bw == float("inf") else min_bw + + return { + "path": path, + "edges": edge_details, + "edge_total_ns": edge_total_ns, + "overheads": overhead_details, + "overhead_total_ns": overhead_total_ns, + "drain_ns": drain_ns, + "bottleneck_bw_gbs": bottleneck_bw, + "expected_total_ns": edge_total_ns + overhead_total_ns + drain_ns, + } + + +def _print_breakdown(label: str, br: dict) -> None: + print(f"\n {label}") + print(f" path ({len(br['path'])} nodes):") + for nid in br["path"]: + print(f" - {nid}") + print(f" edges (prop. delay):") + for src, dst, dist_mm, prop_ns, bw in br["edges"]: + if dist_mm is None: + print(f" ! {src} -> {dst} EDGE NOT FOUND IN edge_map") + continue + print( + f" {src} -> {dst} " + f"dist={dist_mm:.3f}mm prop={prop_ns:.2f}ns " + f"bw={bw or 0:.2f}GB/s" + ) + print(f" per-node overhead_ns:") + for nid, oh in br["overheads"]: + if oh > 0: + print(f" {nid:<60s} overhead_ns={oh:.2f}") + print(f" edge_total_ns = {br['edge_total_ns']:.2f}") + print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}") + print(f" bottleneck_bw_gbs = {br['bottleneck_bw_gbs']}") + print(f" drain_ns (nbytes={NBYTES}) = {br['drain_ns']:.2f}") + print(f" expected_total_ns = {br['expected_total_ns']:.2f}") + + +# ── RAW path scenario ──────────────────────────────────────────────── + + +def _dump_src_op_records(engine, src_sip, src_cube, src_pe, label) -> None: + """Print op_logger records for ops on the SRC PE. + + The op log captures t_start/t_end for memory/math/gemm/copy ops on + every component, so we can see how long tl.load vs tl.store vs + tl.send actually took at the engine level. + """ + op_logger = getattr(engine, "_op_logger", None) + if op_logger is None: + print(f" ({label}) op_logger not available") + return + src_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}." + recs = [r for r in op_logger.records if r.component_id.startswith(src_prefix)] + print(f" ({label}) op_logger records on SRC PE ({src_prefix}*):") + for r in recs[:40]: + dur = r.t_end - r.t_start + comp_short = r.component_id.replace(src_prefix, "") + params_short = "" + if "nbytes" in r.params: + params_short = f" nbytes={r.params['nbytes']}" + if "src_addr" in r.params: + params_short += f" src_addr={r.params['src_addr']}" + if "dst_addr" in r.params: + params_short += f" dst_addr={r.params['dst_addr']}" + print( + f" t=[{r.t_start:7.2f}..{r.t_end:7.2f}] dur={dur:6.2f}ns " + f"{comp_short:<25s} {r.op_kind:<8s} {r.op_name:<12s}{params_short}" + ) + + +def _run_raw(): + captured, restore = _install_per_pe_capture() + try: + topo = resolve_topology(str(TOPOLOGY_PATH)) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + + src_sip, src_cube, src_pe = SRC + dst_sip, dst_cube, dst_pe = DST + assert src_sip == dst_sip + + src_off = (src_cube * N_PES + src_pe) * N_ELEM * ELEM_BYTES + dst_off = (dst_cube * N_PES + dst_pe) * N_ELEM * ELEM_BYTES + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id="diag_raw", + spec=spec, + ) as rt: + dp = DPPolicy( + cube="row_wise", pe="column_wise", + num_cubes=N_CUBES, num_pes=N_PES, + ) + rt.ahbm.set_device(src_sip) + t = rt.zeros( + (N_CUBES, N_PES * N_ELEM), dtype="f16", + dp=dp, name="raw_tensor", + ) + t.copy_(rt.from_numpy( + np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16), + )) + + def kernel(t_ptr, n_elem, tl): + pe_id = tl.program_id(axis=0) + cube_id = tl.program_id(axis=1) + if cube_id == src_cube and pe_id == src_pe: + data = tl.load( + t_ptr + src_off, shape=(n_elem,), dtype="f16", + ) + tl.store(t_ptr + dst_off, data) + + pending = rt.launch( + "diag_raw_kernel", kernel, t, N_ELEM, _defer_wait=True, + ) + for h, _sip, meta in pending: + rt.wait(h, _meta=meta) + + # Compute the RAW sub-txn path: src PE_DMA -> dst HBM_CTRL + from kernbench.policy.address.phyaddr import PhysAddr + ctx = next(iter(engine._components.values())).ctx + src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}" + # Resolve dst PA to HBM controller node + # The raw store kernel issues DmaWriteCmd on dst VA; in the engine + # this is translated via PE_MMU. For diagnostic we approximate + # the destination as the dst cube's HBM controller for slice + # belonging to dst_pe. + # Use the resolver on a constructed PA matching the same memory + # slice the kernel writes to. + # The tensor is "row_wise" sharded across cubes, so each cube + # owns row[cube_id, :], with each PE owning a column slice. + # The actual dst PA depends on the AHBM allocator; we read it + # via the tensor's shard map. + shard_map = getattr(t, "_shard_map", None) or getattr(t, "shard_map", None) + # Fallback: query the resolver directly by constructing a PA in + # the dst cube's HBM region. If shard_map is unavailable, still + # show the breakdown for src-PE-DMA -> first reachable HBM_CTRL + # in dst cube. + dst_hbm_id = f"sip{dst_sip}.cube{dst_cube}.hbm_ctrl" + if dst_hbm_id not in engine._components: + # try alternate naming + for nid in engine._components.keys(): + if ( + nid.startswith(f"sip{dst_sip}.cube{dst_cube}.") + and "hbm" in nid + ): + dst_hbm_id = nid + break + + # find_path() prepends ".pe_dma" to src_pe automatically + try: + raw_path = ctx.router.find_path(src_pe_prefix, dst_hbm_id) + except Exception as e: + raw_path = [] + print(f" WARN: find_path raw failed: {e}") + if not raw_path: + # Try other HBM-related node names in dst cube + for nid in engine._components.keys(): + if not nid.startswith(f"sip{dst_sip}.cube{dst_cube}."): + continue + if "hbm" not in nid: + continue + try: + p = ctx.router.find_path(src_pe_prefix, nid) + except Exception: + p = [] + if p: + raw_path = p + print(f" (fallback raw dst node: {nid})") + break + + return captured, ctx, raw_path, engine + finally: + restore() + + +# ── IPCQ path scenario ─────────────────────────────────────────────── + + +def _run_ipcq(): + captured, restore = _install_per_pe_capture() + dst_pe_ipcq_id = ( + f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_ipcq" + ) + arrival_log, restore_arrival = _install_meta_arrival_capture( + dst_pe_ipcq_id, + ) + recv_log, restore_recv = _install_recv_capture(dst_pe_ipcq_id) + barrier_log, restore_barrier = _install_barrier_capture() + try: + topo = resolve_topology(str(TOPOLOGY_PATH)) + engine = GraphEngine(topo.topology_obj, enable_data=True) + spec = topo.topology_obj.spec + + src_sip, src_cube, src_pe = SRC + dst_sip, dst_cube, dst_pe = DST + + cfg = load_ccl_config() + merged = resolve_algorithm_config(cfg, name="intercube_allreduce") + merged["slot_size"] = max(int(merged.get("slot_size", 4096)), NBYTES) + + with RuntimeContext( + engine=engine, + target_device=DeviceSelector("all"), + correlation_id="diag_ipcq", + spec=spec, + ) as rt: + configure_sfr_intercube_multisip(engine, spec, merged) + dp = DPPolicy( + cube="row_wise", pe="column_wise", + num_cubes=N_CUBES, num_pes=N_PES, + ) + + def kernel(t_ptr, n_elem, tl): + pe_id = tl.program_id(axis=0) + cube_id = tl.program_id(axis=1) + if cube_id == src_cube and pe_id == src_pe: + data = tl.load(t_ptr, shape=(n_elem,), dtype="f16") + tl.send(dir=("E" if HOP == "h3" else "S"), src=data) + elif cube_id == dst_cube and pe_id == dst_pe: + tl.recv( + dir=("W" if HOP == "h3" else "N"), + shape=(n_elem,), dtype="f16", + ) + + tensors = [] + for s in sorted({src_sip, dst_sip}): + rt.ahbm.set_device(s) + t = rt.zeros( + (N_CUBES, N_PES * N_ELEM), dtype="f16", + dp=dp, name=f"sip{s}", + ) + t.copy_(rt.from_numpy( + np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16), + )) + tensors.append(t) + + all_pending = [] + for tt in tensors: + pending = rt.launch( + "diag_ipcq_kernel", kernel, tt, N_ELEM, _defer_wait=True, + ) + all_pending.extend(pending) + for h, _sip, meta in all_pending: + rt.wait(h, _meta=meta) + + ctx = next(iter(engine._components.values())).ctx + src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}" + dst_pe_dma = f"sip{dst_sip}.cube{dst_cube}.pe{dst_pe}.pe_dma" + try: + ipcq_path = ctx.router.find_path(src_pe_prefix, dst_pe_dma) + except Exception as e: + ipcq_path = [] + print(f" WARN: find_path ipcq failed: {e}") + # Snapshot DST PE_IPCQ qp state at end-of-run so we can see what + # peer_head_cache/my_tail looked like (and at which directions). + qp_state = _snapshot_qp_state(engine, dst_pe_ipcq_id) + return (captured, ctx, ipcq_path, engine, + arrival_log, qp_state, recv_log, barrier_log) + finally: + restore_barrier() + restore_recv() + restore_arrival() + restore() + + +# ── Test entry ─────────────────────────────────────────────────────── + + +@pytest.mark.diagnostic +def test_pe_to_pe_diagnostic_h3(): + print("\n" + "=" * 78) + print(f" Diagnostic: h3 inter-cube horizontal, nbytes={NBYTES}") + print(f" src={SRC} dst={DST}") + print("=" * 78) + + # ── RAW scenario + print("\n[RAW] tl.load + tl.store (sender pays both legs)") + raw_per_pe, raw_ctx, raw_path, raw_engine = _run_raw() + print(f" per-PE pe_exec_ns ({len(raw_per_pe)} entries):") + src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu" + dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu" + for nid in (src_id, dst_id): + if nid in raw_per_pe: + print(f" {nid:<60s} {raw_per_pe[nid]:.2f} ns <-- key PE") + nonzero = {k: v for k, v in raw_per_pe.items() if v > 0.5} + if nonzero: + print(f" other PEs with pe_exec_ns > 0.5 ns:") + for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]: + if nid not in (src_id, dst_id): + print(f" {nid:<60s} {v:.2f} ns") + print(f" max(pe_exec_ns) = " + f"{max(raw_per_pe.values()) if raw_per_pe else 0:.2f} ns") + + if raw_path: + br = _path_breakdown(raw_ctx, raw_path, NBYTES) + _print_breakdown("RAW sub-txn path (src.pe_dma -> dst.hbm_ctrl)", br) + _dump_src_op_records(raw_engine, *SRC, "RAW") + + # ── IPCQ scenario + print("\n[IPCQ] tl.send + tl.recv (recv pays inbound traversal+drain)") + (ipcq_per_pe, ipcq_ctx, ipcq_path, ipcq_engine, + arrival_log, qp_state, recv_log, barrier_log) = _run_ipcq() + print(f"\n [BARRIER LOG] {len(barrier_log)} _execute_kernel entries:") + src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu" + dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu" + n_skipped = 0 + src_entry = None + dst_entry = None + for e in barrier_log: + if e["barrier_skipped"]: + n_skipped += 1 + if e["node_id"] == src_id: + src_entry = e + if e["node_id"] == dst_id: + dst_entry = e + print(f" PEs entering _execute_kernel: {len(barrier_log)}") + print(f" PEs that SKIPPED barrier (env.now > target_start): {n_skipped}") + if src_entry: + print( + f" SRC pe ({src_id}): entry_now={src_entry['entry_now']:.2f} " + f"target_start={src_entry['target_start_ns']:.2f} " + f"skipped={src_entry['barrier_skipped']} " + f"late_ns={src_entry['delta_late_ns']:.2f}" + ) + if dst_entry: + print( + f" DST pe ({dst_id}): entry_now={dst_entry['entry_now']:.2f} " + f"target_start={dst_entry['target_start_ns']:.2f} " + f"skipped={dst_entry['barrier_skipped']} " + f"late_ns={dst_entry['delta_late_ns']:.2f}" + ) + # Top 5 latest arrivals + sorted_late = sorted( + [e for e in barrier_log if e["delta_late_ns"] is not None], + key=lambda e: -e["delta_late_ns"], + )[:5] + print(f" Top 5 latest PE arrivals (positive = barrier missed):") + for e in sorted_late: + if e["delta_late_ns"] > 0: + print( + f" {e['node_id']}: late by {e['delta_late_ns']:.2f} ns " + f"(entry={e['entry_now']:.2f}, target={e['target_start_ns']:.2f})" + ) + print(f"\n [RECV LOG on dst pe_ipcq] {len(recv_log)} entries:") + for e in recv_log: + print( + f" {e['phase']:5s} t={e['t']:8.2f} ns " + f"dir={e['direction']} " + f"peer_head_cache={e['peer_head_cache']} " + f"my_tail={e['my_tail']}" + ) + print(f"\n [META-ARRIVAL LOG on dst pe_ipcq] {len(arrival_log)} arrivals:") + for i, e in enumerate(arrival_log): + print( + f" #{i:2d} t={e['t']:8.2f} ns " + f"src=(sip{e['src_sip']},cube{e['src_cube']},pe{e['src_pe']}) " + f"dir={e['src_direction']} " + f"sender_seq={e['sender_seq']} " + f"matched_dir={e['matched_direction']} " + f"nbytes={e['nbytes']}" + ) + for d, ph in e["peer_head_cache_before"].items(): + mt = e["my_tail_before"][d] + if ph != 0 or mt != 0 or d == e["matched_direction"]: + print( + f" before: dir={d} peer_head_cache={ph} my_tail={mt}" + ) + print(f"\n [QP STATE END-OF-RUN on dst pe_ipcq]:") + for d, st in qp_state.items(): + print( + f" dir={d} peer_head_cache={st['peer_head_cache']} " + f"my_tail={st['my_tail']} rx_range=[{st['rx_range'][0]}..." + f"{st['rx_range'][1]}) n_slots={st['n_slots']} " + f"slot_size={st['slot_size']}" + ) + print(f" per-PE pe_exec_ns ({len(ipcq_per_pe)} entries):") + for nid in (src_id, dst_id): + if nid in ipcq_per_pe: + print(f" {nid:<60s} {ipcq_per_pe[nid]:.2f} ns <-- key PE") + nonzero = {k: v for k, v in ipcq_per_pe.items() if v > 0.5} + if nonzero: + print(f" other PEs with pe_exec_ns > 0.5 ns:") + for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]: + if nid not in (src_id, dst_id): + print(f" {nid:<60s} {v:.2f} ns") + print(f" max(pe_exec_ns) = " + f"{max(ipcq_per_pe.values()) if ipcq_per_pe else 0:.2f} ns") + + if ipcq_path: + br = _path_breakdown(ipcq_ctx, ipcq_path, NBYTES) + _print_breakdown("IPCQ sub-txn path (src.pe_dma -> peer.pe_dma)", br) + _dump_src_op_records(ipcq_engine, *SRC, "IPCQ") + _dump_src_op_records(ipcq_engine, *DST, "IPCQ DST") + + # ── Credit-return path analysis (where the missing IPCQ "ack" lives) + print("\n" + "-" * 78) + print("Credit-return path (current modeling)") + print("-" * 78) + src_pe_prefix = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}" + dst_pe_prefix = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}" + # PE_IPCQ._credit_latency_ns calls + # ctx.router.find_path(self._pe_prefix, peer_pe_prefix) + # where the *destination* lacks the ".pe_dma" suffix. find_path() + # only auto-appends to the source, so this raises -> the except + # clause silently returns 0.0. Effectively credit latency = 0. + try: + ipcq_ctx.router.find_path(dst_pe_prefix, src_pe_prefix) + bug_caught = False + except Exception as e: + bug_caught = True + print(f" CONFIRMED BUG in _credit_latency_ns: dest lacks '.pe_dma' " + f"-> find_path raises -> caught exception -> returns 0.0") + print(f" Error: {e}") + # The intended credit path is recv -> sender (reverse data direction) + try: + credit_path = ipcq_ctx.router.find_path( + dst_pe_prefix, f"{src_pe_prefix}.pe_dma", + ) + except Exception as e: + credit_path = [] + print(f" WARN: corrected find_path credit failed: {e}") + if credit_path: + credit_size = 16 # PE_IPCQ default _credit_size_bytes + # Today's modeling: drain only, 16 bytes -> ~0.125 ns + cur = ipcq_ctx.compute_drain_ns(credit_path, credit_size) + # Proposed modeling: full path latency (edges + node overhead + drain) + proposed = ipcq_ctx.compute_path_latency_ns(credit_path, credit_size) + print(f" credit path nodes = {len(credit_path)} (recv -> sender)") + for nid in credit_path[:6]: + print(f" {nid}") + if len(credit_path) > 6: + print(f" ... {len(credit_path) - 6} more nodes") + br = _path_breakdown(ipcq_ctx, credit_path, credit_size) + print(f" edge_total_ns = {br['edge_total_ns']:.2f}") + print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}") + print(f" drain_ns(16 bytes) = {br['drain_ns']:.2f}") + print(f" CURRENT _credit_latency_ns (drain only) = {cur:.3f} ns") + print(f" PROPOSED (compute_path_latency_ns) = {proposed:.2f} ns") + print(f" delta = {proposed - cur:+.2f} ns") + + # ── Comparison summary + print("\n" + "-" * 78) + print("Summary") + print("-" * 78) + raw_max = max(raw_per_pe.values()) if raw_per_pe else 0.0 + ipcq_max = max(ipcq_per_pe.values()) if ipcq_per_pe else 0.0 + print(f" RAW max(pe_exec_ns) = {raw_max:.2f} ns") + print(f" IPCQ max(pe_exec_ns) (current) = {ipcq_max:.2f} ns") + print(f" delta (RAW - IPCQ current) = {raw_max - ipcq_max:+.2f} ns") + if credit_path: + ipcq_with_credit = ipcq_max + (proposed - cur) + print( + f" IPCQ projected w/ blocking credit + full path overhead " + f"= {ipcq_with_credit:.2f} ns" + ) + print( + f" delta (RAW - IPCQ projected) = " + f"{raw_max - ipcq_with_credit:+.2f} ns " + f"(<= 0 means IPCQ >= RAW)" + ) + + # No assertions — this is observational. + assert raw_per_pe, "no RAW pe_exec_ns recorded" + assert ipcq_per_pe, "no IPCQ pe_exec_ns recorded"