Files
kernbench2/tests/test_pe_to_pe_diagnostic.py
T
mukesh 90874abbfe ADR-0023 D9: blocking credit-emit with full-path latency
PE_IPCQ._handle_recv now yields-from _delayed_credit_send instead of
spawning it as a fork, so the receiver's pe_exec_ns includes the
credit-return cost. _credit_latency_ns switches from
compute_drain_ns(path, 16) to compute_path_latency_ns(path, 16) and
fixes a latent find_path bug where the destination lacked the
".pe_dma" suffix (silently returned 0 ns under the bare except).

Net effect on h3/h4 inter-cube pe-to-pe latency: IPCQ >= raw DMA at
every size, matching real-HW posted-write semantics. tl.send remains
fire-and-forget. ADR-0023 D9 amended; new diagnostic test
tests/test_pe_to_pe_diagnostic.py captures per-PE pe_exec_ns, paths,
drain, and meta-arrival timing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:12:38 -07:00

742 lines
28 KiB
Python

"""Diagnostic for the inter-cube RAW > IPCQ asymmetry on h3/h4 plots.
Single-shot run at h3 (sip0.cube0.pe0 -> sip0.cube1.pe0), nbytes=4096.
Captures per-PE pe_exec_ns and the actual path / drain / per-node overhead
breakdown for the RAW sub-txn (PE_DMA -> remote HBM_CTRL) vs the IPCQ
outbound sub-txn (PE_DMA -> peer PE_DMA), so we can localize the gap to
one of:
(a) drain at HBM-BW (RAW) vs fabric-BW (IPCQ)
(b) path-length / per-node overhead asymmetry
(c) RAW SRC paying tl.load (local HBM read) on top of remote tl.store
while IPCQ DST only pays inbound traversal+drain.
Phase 1 / test-only. No production code is modified.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pytest
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
from kernbench.policy.placement.dp import DPPolicy
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
# Topology description used by both scenarios; it sits one directory above
# the directory containing this test module.
TOPOLOGY_PATH = Path(__file__).parent.parent.joinpath("topology.yaml")
import os

# Both knobs come from the environment so the same test can be re-run for
# h4 (inter-cube vertical) and at several payload sizes, e.g. to investigate
# why the IPCQ slope flattens past 8192 B (the path may differ).
NBYTES = int(os.getenv("DIAG_NBYTES", "4096"))
ELEM_BYTES = 2
N_ELEM = NBYTES // ELEM_BYTES
N_CUBES = 16
N_PES = 8
HOP = os.getenv("DIAG_HOP", "h3")

# (sip, cube, pe) coordinates. The sender is the same for both hops; only
# the receiver changes: h4 = inter-cube vertical, anything else = h3
# inter-cube horizontal.
SRC = (0, 0, 0)
DST = (0, 4, 0) if HOP == "h4" else (0, 1, 0)
# ── Per-PE pe_exec_ns capture via monkey-patch ───────────────────────
def _install_barrier_capture():
    """Monkey-patch ``PeCpuComponent._execute_kernel`` to log barrier timing.

    For every PE that enters ``_execute_kernel`` one dict is appended to the
    log with these keys:

    * ``node_id`` -- ``self.node.id`` of the component,
    * ``entry_now`` -- ``env.now`` at entry,
    * ``target_start_ns`` -- the value carried by ``txn.request`` (``None``
      when the request has no such attribute),
    * ``barrier_skipped`` -- ``True`` when there is no target, or the target
      is not in the future (``target_start_ns <= entry_now``),
    * ``delta_late_ns`` -- how far past the target the PE entered, clamped
      at 0 (``None`` when there is no target).

    Returns:
        ``(log, restore)`` where ``restore()`` puts the original method
        back. The caller is responsible for calling it.
    """
    import kernbench.components.builtin.pe_cpu as pe_cpu_mod

    log: list[dict] = []
    original = pe_cpu_mod.PeCpuComponent._execute_kernel

    def patched(self, env, txn):
        target_start = getattr(txn.request, "target_start_ns", None)
        entry_now = float(env.now)
        target_ns = None if target_start is None else float(target_start)
        log.append({
            "node_id": self.node.id,
            "entry_now": entry_now,
            "target_start_ns": target_ns,
            "barrier_skipped": target_ns is None or target_ns <= entry_now,
            "delta_late_ns": (
                None if target_ns is None
                else max(0.0, entry_now - target_ns)
            ),
        })
        # Hand the wrapped generator's return value back to whoever drives
        # this one; a bare ``yield from`` statement would discard it and make
        # the wrapper observable to the code under test.
        return (yield from original(self, env, txn))

    pe_cpu_mod.PeCpuComponent._execute_kernel = patched

    def restore():
        pe_cpu_mod.PeCpuComponent._execute_kernel = original

    return log, restore
def _install_per_pe_capture():
    """Monkey-patch ``PeCpuComponent._execute_kernel`` to collect pe_exec_ns.

    After each wrapped kernel execution finishes (normally or by raising),
    ``txn.result_data["pe_exec_ns"]`` is read and, when present, stored under
    the component's ``node.id``.

    Returns:
        ``(captured, restore)`` -- the ``node_id -> pe_exec_ns`` dict that is
        filled in as the run progresses, and a callable that reinstalls the
        original method.
    """
    import kernbench.components.builtin.pe_cpu as pe_cpu_mod

    exec_ns_by_node: dict[str, float] = {}
    wrapped = pe_cpu_mod.PeCpuComponent._execute_kernel

    def _record(component, txn):
        exec_ns = txn.result_data.get("pe_exec_ns")
        if exec_ns is not None:
            exec_ns_by_node[component.node.id] = float(exec_ns)

    def patched(self, env, txn):
        inner = wrapped(self, env, txn)
        try:
            outcome = yield from inner
        finally:
            # Record even when the kernel raised, so a partial run still
            # shows whatever timing was written before the failure.
            _record(self, txn)
        return outcome

    def restore():
        pe_cpu_mod.PeCpuComponent._execute_kernel = wrapped

    pe_cpu_mod.PeCpuComponent._execute_kernel = patched
    return exec_ns_by_node, restore
def _install_recv_capture(target_node_id: str):
    """Monkey-patch ``PeIpcqComponent._handle_recv`` to log entry and exit.

    Only the component whose ``node.id`` equals ``target_node_id`` is
    logged; every other component runs the original handler untouched. For
    the target, one ``"enter"`` and one ``"exit"`` record are appended, each
    holding ``env.now`` plus the ``peer_head_cache`` / ``my_tail`` values of
    the queue pair for ``cmd.direction`` at that moment.

    Comparing the two records pins down whether recv blocked waiting for
    data or consumed without waiting (i.e. peer_head_cache > my_tail at
    entry).

    Returns:
        ``(log, restore)`` where ``restore()`` reinstalls the original
        handler.
    """
    import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod

    log: list[dict] = []
    original = pe_ipcq_mod.PeIpcqComponent._handle_recv

    def snapshot(component, env, phase, direction):
        # A direction with no installed queue pair logs None for both fields.
        qp = component._queue_pairs.get(direction, {})
        log.append({
            "phase": phase,
            "t": float(env.now),
            "direction": direction,
            "peer_head_cache": qp.get("peer_head_cache"),
            "my_tail": qp.get("my_tail"),
        })

    def patched(self, env, req, cmd):
        if self.node.id != target_node_id:
            # Pass the original result through unchanged so the wrapper is
            # invisible to components we are not observing.
            return (yield from original(self, env, req, cmd))
        direction = cmd.direction
        snapshot(self, env, "enter", direction)
        result = yield from original(self, env, req, cmd)
        snapshot(self, env, "exit", direction)
        return result

    pe_ipcq_mod.PeIpcqComponent._handle_recv = patched

    def restore():
        pe_ipcq_mod.PeIpcqComponent._handle_recv = original

    return log, restore
def _install_meta_arrival_capture(target_node_id: str):
    """Monkey-patch ``PeIpcqComponent._handle_meta_arrival`` to log arrivals.

    Every message handled by the component whose ``node.id`` equals
    ``target_node_id`` produces one log entry with the simulation time, the
    token fields (``sender_seq``, ``dst_addr``, ``src_sip``, ``src_cube``,
    ``src_pe``, ``src_direction``, ``nbytes``; ``None`` when the token lacks
    one), the ``peer_head_cache`` / ``my_tail`` of every queue pair *before*
    the original handler runs, and ``matched_direction`` -- the direction
    whose RX range ``[my_rx_base_pa, my_rx_base_pa + n_slots * slot_size)``
    contains ``dst_addr`` (``None`` when nothing matches).

    The hook is purely observational: the original handler is always called
    and its return value is passed through.

    Returns:
        ``(log, restore)`` where ``restore()`` reinstalls the original
        handler.
    """
    import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod

    log: list[dict] = []
    original = pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival
    token_fields = (
        "sender_seq", "dst_addr", "src_sip", "src_cube", "src_pe",
        "src_direction", "nbytes",
    )

    def current_time(component) -> float:
        # The handler gets no env argument. Prefer the env reachable through
        # the component's inbox, then a ``_env`` attribute on the component
        # itself, and fall back to 0.0 so logging never breaks the run.
        # NOTE(review): relies on the inbox object exposing ``_env`` --
        # confirm against the component implementation.
        for owner in (getattr(component, "_inbox", None), component):
            env = getattr(owner, "_env", None)
            try:
                return float(env.now)
            except (AttributeError, TypeError, ValueError):
                continue
        return 0.0

    def patched(self, msg):
        if self.node.id == target_node_id:
            token = msg.token
            entry = {"t": current_time(self)}
            for field in token_fields:
                entry[field] = getattr(token, field, None)
            entry["matched_direction"] = None
            entry["peer_head_cache_before"] = {}
            entry["my_tail_before"] = {}
            dst_addr = entry["dst_addr"]
            for d, qp in self._queue_pairs.items():
                entry["peer_head_cache_before"][d] = qp["peer_head_cache"]
                entry["my_tail_before"][d] = qp["my_tail"]
                base = qp["my_rx_base_pa"]
                size = qp["n_slots"] * qp["slot_size"]
                # A token without dst_addr cannot match any range; comparing
                # None against ints would raise inside the simulation.
                if dst_addr is not None and base <= dst_addr < base + size:
                    entry["matched_direction"] = d
            log.append(entry)
        return original(self, msg)

    pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = patched

    def restore():
        pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = original

    return log, restore
def _snapshot_qp_state(engine, target_node_id: str) -> dict:
"""Snapshot every direction's qp state on the target PE_IPCQ now.
Captures peer_head_cache, my_tail, my_rx_base_pa, n_slots, slot_size
for each installed direction.
"""
comp = engine._components.get(target_node_id)
if comp is None:
return {}
return {
d: {
"peer_head_cache": qp["peer_head_cache"],
"my_tail": qp["my_tail"],
"my_rx_base_pa": qp["my_rx_base_pa"],
"n_slots": qp["n_slots"],
"slot_size": qp["slot_size"],
"rx_range": (
qp["my_rx_base_pa"],
qp["my_rx_base_pa"] + qp["n_slots"] * qp["slot_size"],
),
}
for d, qp in comp.queue_pairs.items()
}
# ── Path / drain breakdown using engine ctx ──────────────────────────
def _path_breakdown(ctx, path: list[str], nbytes: int) -> dict:
edge_total_ns = 0.0
edge_details = []
min_bw = float("inf")
for i in range(len(path) - 1):
edge = ctx.edge_map.get((path[i], path[i + 1]))
if edge is None:
edge_details.append((path[i], path[i + 1], None, None, None))
continue
prop_ns = edge.distance_mm * ctx.ns_per_mm
edge_total_ns += prop_ns
bw = getattr(edge, "bw_gbs", None) or 0.0
if bw > 0 and bw < min_bw:
min_bw = bw
edge_details.append(
(path[i], path[i + 1], edge.distance_mm, prop_ns, bw),
)
overhead_total_ns = 0.0
overhead_details = []
for nid in path:
oh = float(ctx.node_overhead_ns.get(nid, 0.0))
overhead_total_ns += oh
overhead_details.append((nid, oh))
drain_ns = ctx.compute_drain_ns(path, nbytes)
bottleneck_bw = None if min_bw == float("inf") else min_bw
return {
"path": path,
"edges": edge_details,
"edge_total_ns": edge_total_ns,
"overheads": overhead_details,
"overhead_total_ns": overhead_total_ns,
"drain_ns": drain_ns,
"bottleneck_bw_gbs": bottleneck_bw,
"expected_total_ns": edge_total_ns + overhead_total_ns + drain_ns,
}
def _print_breakdown(label: str, br: dict) -> None:
print(f"\n {label}")
print(f" path ({len(br['path'])} nodes):")
for nid in br["path"]:
print(f" - {nid}")
print(f" edges (prop. delay):")
for src, dst, dist_mm, prop_ns, bw in br["edges"]:
if dist_mm is None:
print(f" ! {src} -> {dst} EDGE NOT FOUND IN edge_map")
continue
print(
f" {src} -> {dst} "
f"dist={dist_mm:.3f}mm prop={prop_ns:.2f}ns "
f"bw={bw or 0:.2f}GB/s"
)
print(f" per-node overhead_ns:")
for nid, oh in br["overheads"]:
if oh > 0:
print(f" {nid:<60s} overhead_ns={oh:.2f}")
print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
print(f" bottleneck_bw_gbs = {br['bottleneck_bw_gbs']}")
print(f" drain_ns (nbytes={NBYTES}) = {br['drain_ns']:.2f}")
print(f" expected_total_ns = {br['expected_total_ns']:.2f}")
# ── RAW path scenario ────────────────────────────────────────────────
def _dump_src_op_records(engine, src_sip, src_cube, src_pe, label) -> None:
"""Print op_logger records for ops on the SRC PE.
The op log captures t_start/t_end for memory/math/gemm/copy ops on
every component, so we can see how long tl.load vs tl.store vs
tl.send actually took at the engine level.
"""
op_logger = getattr(engine, "_op_logger", None)
if op_logger is None:
print(f" ({label}) op_logger not available")
return
src_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}."
recs = [r for r in op_logger.records if r.component_id.startswith(src_prefix)]
print(f" ({label}) op_logger records on SRC PE ({src_prefix}*):")
for r in recs[:40]:
dur = r.t_end - r.t_start
comp_short = r.component_id.replace(src_prefix, "")
params_short = ""
if "nbytes" in r.params:
params_short = f" nbytes={r.params['nbytes']}"
if "src_addr" in r.params:
params_short += f" src_addr={r.params['src_addr']}"
if "dst_addr" in r.params:
params_short += f" dst_addr={r.params['dst_addr']}"
print(
f" t=[{r.t_start:7.2f}..{r.t_end:7.2f}] dur={dur:6.2f}ns "
f"{comp_short:<25s} {r.op_kind:<8s} {r.op_name:<12s}{params_short}"
)
def _run_raw():
    """Run the RAW scenario: the SRC PE loads its slice and stores it remotely.

    A kernel is launched in which only the SRC PE does work (``tl.load`` of
    its own slice followed by ``tl.store`` into the DST PE's slice). After
    every launch handle has been waited on, the router is asked for a path
    from the SRC PE to an HBM node in the DST cube so the caller can break
    the latency down.

    Returns:
        ``(captured, ctx, raw_path, engine)`` -- the per-PE ``pe_exec_ns``
        capture, the ctx taken from the first engine component, the resolved
        path (``[]`` when none was found) and the engine.
    """
    captured, restore = _install_per_pe_capture()
    try:
        topo = resolve_topology(str(TOPOLOGY_PATH))
        engine = GraphEngine(topo.topology_obj, enable_data=True)
        spec = topo.topology_obj.spec
        src_sip, src_cube, src_pe = SRC
        dst_sip, dst_cube, dst_pe = DST
        assert src_sip == dst_sip
        # NOTE(review): offsets are scaled by ELEM_BYTES before being added
        # to t_ptr -- confirm the kernel pointer arithmetic is byte-based.
        src_off = (src_cube * N_PES + src_pe) * N_ELEM * ELEM_BYTES
        dst_off = (dst_cube * N_PES + dst_pe) * N_ELEM * ELEM_BYTES
        with RuntimeContext(
            engine=engine,
            target_device=DeviceSelector("all"),
            correlation_id="diag_raw",
            spec=spec,
        ) as rt:
            dp = DPPolicy(
                cube="row_wise", pe="column_wise",
                num_cubes=N_CUBES, num_pes=N_PES,
            )
            rt.ahbm.set_device(src_sip)
            t = rt.zeros(
                (N_CUBES, N_PES * N_ELEM), dtype="f16",
                dp=dp, name="raw_tensor",
            )
            t.copy_(rt.from_numpy(
                np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
            ))

            def kernel(t_ptr, n_elem, tl):
                pe_id = tl.program_id(axis=0)
                cube_id = tl.program_id(axis=1)
                # Only the SRC PE moves data; every other PE is a no-op.
                if cube_id == src_cube and pe_id == src_pe:
                    data = tl.load(
                        t_ptr + src_off, shape=(n_elem,), dtype="f16",
                    )
                    tl.store(t_ptr + dst_off, data)

            pending = rt.launch(
                "diag_raw_kernel", kernel, t, N_ELEM, _defer_wait=True,
            )
            for h, _sip, meta in pending:
                rt.wait(h, _meta=meta)

            # RAW sub-txn path: src PE_DMA -> an HBM node in the dst cube.
            # The real destination depends on where the allocator placed the
            # tensor; for the diagnostic the dst cube's HBM controller is
            # used as an approximation.
            ctx = next(iter(engine._components.values())).ctx
            src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
            dst_cube_prefix = f"sip{dst_sip}.cube{dst_cube}."
            dst_hbm_id = f"{dst_cube_prefix}hbm_ctrl"
            if dst_hbm_id not in engine._components:
                # Alternate naming: take the first HBM-looking node there.
                for nid in engine._components:
                    if nid.startswith(dst_cube_prefix) and "hbm" in nid:
                        dst_hbm_id = nid
                        break

            # Per the original author's note, find_path() adds the ".pe_dma"
            # suffix to the source prefix itself.
            try:
                raw_path = ctx.router.find_path(src_pe_prefix, dst_hbm_id)
            except Exception as e:
                raw_path = []
                print(f" WARN: find_path raw failed: {e}")

            if not raw_path:
                # Last resort: try every HBM-related node in the dst cube
                # and keep the first one the router can reach.
                for nid in engine._components:
                    if not nid.startswith(dst_cube_prefix):
                        continue
                    if "hbm" not in nid:
                        continue
                    try:
                        p = ctx.router.find_path(src_pe_prefix, nid)
                    except Exception:
                        p = []
                    if p:
                        raw_path = p
                        print(f" (fallback raw dst node: {nid})")
                        break
            return captured, ctx, raw_path, engine
    finally:
        restore()
# ── IPCQ path scenario ───────────────────────────────────────────────
def _run_ipcq():
    """Run the IPCQ scenario: SRC PE ``tl.send``s, DST PE ``tl.recv``s.

    Installs the per-PE, meta-arrival, recv and barrier capture hooks, runs
    a kernel in which the SRC PE loads and sends its data and the DST PE
    receives it, then resolves the SRC PE -> DST ``pe_dma`` path and
    snapshots the DST PE_IPCQ queue-pair state.

    Returns:
        ``(captured, ctx, ipcq_path, engine, arrival_log, qp_state,
        recv_log, barrier_log)``.
    """
    from contextlib import ExitStack

    dst_pe_ipcq_id = (
        f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_ipcq"
    )
    # Use the same test as the SRC/DST selection (h4 = vertical, anything
    # else = horizontal) so the send/recv directions always match DST.
    vertical = HOP == "h4"
    send_dir = "S" if vertical else "E"
    recv_dir = "N" if vertical else "W"

    # Each restore is registered as soon as its patch is installed, so a
    # failure in a later install cannot leak an earlier monkey-patch into
    # other tests. Callbacks run in reverse installation order.
    with ExitStack() as patches:
        captured, restore = _install_per_pe_capture()
        patches.callback(restore)
        arrival_log, restore_arrival = _install_meta_arrival_capture(
            dst_pe_ipcq_id,
        )
        patches.callback(restore_arrival)
        recv_log, restore_recv = _install_recv_capture(dst_pe_ipcq_id)
        patches.callback(restore_recv)
        barrier_log, restore_barrier = _install_barrier_capture()
        patches.callback(restore_barrier)

        topo = resolve_topology(str(TOPOLOGY_PATH))
        engine = GraphEngine(topo.topology_obj, enable_data=True)
        spec = topo.topology_obj.spec
        src_sip, src_cube, src_pe = SRC
        dst_sip, dst_cube, dst_pe = DST
        cfg = load_ccl_config()
        merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
        # A slot must be able to hold the whole payload.
        merged["slot_size"] = max(int(merged.get("slot_size", 4096)), NBYTES)
        with RuntimeContext(
            engine=engine,
            target_device=DeviceSelector("all"),
            correlation_id="diag_ipcq",
            spec=spec,
        ) as rt:
            configure_sfr_intercube_multisip(engine, spec, merged)
            dp = DPPolicy(
                cube="row_wise", pe="column_wise",
                num_cubes=N_CUBES, num_pes=N_PES,
            )

            def kernel(t_ptr, n_elem, tl):
                pe_id = tl.program_id(axis=0)
                cube_id = tl.program_id(axis=1)
                if cube_id == src_cube and pe_id == src_pe:
                    data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
                    tl.send(dir=send_dir, src=data)
                elif cube_id == dst_cube and pe_id == dst_pe:
                    tl.recv(
                        dir=recv_dir,
                        shape=(n_elem,), dtype="f16",
                    )

            # One tensor per distinct SIP taking part in the transfer.
            tensors = []
            for s in sorted({src_sip, dst_sip}):
                rt.ahbm.set_device(s)
                t = rt.zeros(
                    (N_CUBES, N_PES * N_ELEM), dtype="f16",
                    dp=dp, name=f"sip{s}",
                )
                t.copy_(rt.from_numpy(
                    np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
                ))
                tensors.append(t)

            # Launch everything first, then wait, so no launch is
            # serialized behind another launch's wait.
            all_pending = []
            for tt in tensors:
                pending = rt.launch(
                    "diag_ipcq_kernel", kernel, tt, N_ELEM, _defer_wait=True,
                )
                all_pending.extend(pending)
            for h, _sip, meta in all_pending:
                rt.wait(h, _meta=meta)

            ctx = next(iter(engine._components.values())).ctx
            src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
            dst_pe_dma = f"sip{dst_sip}.cube{dst_cube}.pe{dst_pe}.pe_dma"
            try:
                ipcq_path = ctx.router.find_path(src_pe_prefix, dst_pe_dma)
            except Exception as e:
                ipcq_path = []
                print(f" WARN: find_path ipcq failed: {e}")
            # Snapshot DST PE_IPCQ qp state at end-of-run so we can see what
            # peer_head_cache/my_tail looked like (and at which directions).
            qp_state = _snapshot_qp_state(engine, dst_pe_ipcq_id)
            return (captured, ctx, ipcq_path, engine,
                    arrival_log, qp_state, recv_log, barrier_log)
# ── Test entry ───────────────────────────────────────────────────────
def _fmt_ns(value) -> str:
    """Format a nanosecond value with two decimals, or ``n/a`` for ``None``."""
    return "n/a" if value is None else f"{value:.2f}"


def _print_per_pe_exec(per_pe: dict, src_id: str, dst_id: str) -> None:
    """Print the captured pe_exec_ns values, highlighting the SRC/DST PEs.

    Lists the two key PEs first (when captured), then up to six of the
    largest entries above 0.5 ns excluding the key PEs, then the maximum.
    """
    key_ids = (src_id, dst_id)
    print(f" per-PE pe_exec_ns ({len(per_pe)} entries):")
    for nid in key_ids:
        if nid in per_pe:
            print(f" {nid:<60s} {per_pe[nid]:.2f} ns <-- key PE")
    nonzero = {k: v for k, v in per_pe.items() if v > 0.5}
    if nonzero:
        print(" other PEs with pe_exec_ns > 0.5 ns:")
        for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]:
            if nid not in key_ids:
                print(f" {nid:<60s} {v:.2f} ns")
    print(f" max(pe_exec_ns) = "
          f"{max(per_pe.values()) if per_pe else 0:.2f} ns")


def _print_barrier_entry(role: str, node_id: str, entry: dict) -> None:
    """Print one barrier-log entry; missing targets are shown as ``n/a``."""
    print(
        f" {role} pe ({node_id}): entry_now={entry['entry_now']:.2f} "
        f"target_start={_fmt_ns(entry['target_start_ns'])} "
        f"skipped={entry['barrier_skipped']} "
        f"late_ns={_fmt_ns(entry['delta_late_ns'])}"
    )


@pytest.mark.diagnostic
def test_pe_to_pe_diagnostic_h3():
    """Run the RAW and IPCQ scenarios and print a side-by-side breakdown.

    The hop is chosen by the module-level ``HOP`` (h3 by default, h4 via the
    ``DIAG_HOP`` environment variable). The test is mostly observational:
    it prints per-PE ``pe_exec_ns``, the path breakdowns, the barrier /
    recv / meta-arrival logs, the end-of-run queue-pair state and a
    credit-return analysis, and only asserts that both scenarios recorded
    at least one ``pe_exec_ns``.
    """
    hop_label = (
        "h4 inter-cube vertical" if HOP == "h4"
        else "h3 inter-cube horizontal"
    )
    src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu"
    dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu"

    print("\n" + "=" * 78)
    print(f" Diagnostic: {hop_label}, nbytes={NBYTES}")
    print(f" src={SRC} dst={DST}")
    print("=" * 78)

    # ── RAW scenario
    print("\n[RAW] tl.load + tl.store (sender pays both legs)")
    raw_per_pe, raw_ctx, raw_path, raw_engine = _run_raw()
    _print_per_pe_exec(raw_per_pe, src_id, dst_id)
    if raw_path:
        br = _path_breakdown(raw_ctx, raw_path, NBYTES)
        _print_breakdown("RAW sub-txn path (src.pe_dma -> dst.hbm_ctrl)", br)
    _dump_src_op_records(raw_engine, *SRC, "RAW")

    # ── IPCQ scenario
    print("\n[IPCQ] tl.send + tl.recv (recv pays inbound traversal+drain)")
    (ipcq_per_pe, ipcq_ctx, ipcq_path, ipcq_engine,
     arrival_log, qp_state, recv_log, barrier_log) = _run_ipcq()

    print(f"\n [BARRIER LOG] {len(barrier_log)} _execute_kernel entries:")
    n_skipped = 0
    src_entry = None
    dst_entry = None
    for e in barrier_log:
        if e["barrier_skipped"]:
            n_skipped += 1
        # The last entry per node wins if a PE was logged more than once.
        if e["node_id"] == src_id:
            src_entry = e
        if e["node_id"] == dst_id:
            dst_entry = e
    print(f" PEs entering _execute_kernel: {len(barrier_log)}")
    print(f" PEs that SKIPPED barrier (env.now > target_start): {n_skipped}")
    if src_entry:
        _print_barrier_entry("SRC", src_id, src_entry)
    if dst_entry:
        _print_barrier_entry("DST", dst_id, dst_entry)

    # Top 5 latest arrivals; entries without a target carry no lateness.
    sorted_late = sorted(
        [e for e in barrier_log if e["delta_late_ns"] is not None],
        key=lambda e: -e["delta_late_ns"],
    )[:5]
    print(" Top 5 latest PE arrivals (positive = barrier missed):")
    for e in sorted_late:
        if e["delta_late_ns"] > 0:
            print(
                f" {e['node_id']}: late by {e['delta_late_ns']:.2f} ns "
                f"(entry={e['entry_now']:.2f}, target={e['target_start_ns']:.2f})"
            )

    print(f"\n [RECV LOG on dst pe_ipcq] {len(recv_log)} entries:")
    for e in recv_log:
        print(
            f" {e['phase']:5s} t={e['t']:8.2f} ns "
            f"dir={e['direction']} "
            f"peer_head_cache={e['peer_head_cache']} "
            f"my_tail={e['my_tail']}"
        )

    print(f"\n [META-ARRIVAL LOG on dst pe_ipcq] {len(arrival_log)} arrivals:")
    for i, e in enumerate(arrival_log):
        print(
            f" #{i:2d} t={e['t']:8.2f} ns "
            f"src=(sip{e['src_sip']},cube{e['src_cube']},pe{e['src_pe']}) "
            f"dir={e['src_direction']} "
            f"sender_seq={e['sender_seq']} "
            f"matched_dir={e['matched_direction']} "
            f"nbytes={e['nbytes']}"
        )
        # Only show directions that have seen traffic or that this arrival
        # matched.
        for d, ph in e["peer_head_cache_before"].items():
            mt = e["my_tail_before"][d]
            if ph != 0 or mt != 0 or d == e["matched_direction"]:
                print(
                    f" before: dir={d} peer_head_cache={ph} my_tail={mt}"
                )

    print("\n [QP STATE END-OF-RUN on dst pe_ipcq]:")
    for d, st in qp_state.items():
        print(
            f" dir={d} peer_head_cache={st['peer_head_cache']} "
            f"my_tail={st['my_tail']} rx_range=[{st['rx_range'][0]}..."
            f"{st['rx_range'][1]}) n_slots={st['n_slots']} "
            f"slot_size={st['slot_size']}"
        )

    _print_per_pe_exec(ipcq_per_pe, src_id, dst_id)
    if ipcq_path:
        br = _path_breakdown(ipcq_ctx, ipcq_path, NBYTES)
        _print_breakdown("IPCQ sub-txn path (src.pe_dma -> peer.pe_dma)", br)
    _dump_src_op_records(ipcq_engine, *SRC, "IPCQ")
    _dump_src_op_records(ipcq_engine, *DST, "IPCQ DST")

    # ── Credit-return path analysis (where the missing IPCQ "ack" lives)
    print("\n" + "-" * 78)
    print("Credit-return path (current modeling)")
    print("-" * 78)
    src_pe_prefix = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}"
    dst_pe_prefix = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}"
    # Probe the router with a destination that lacks the ".pe_dma" suffix.
    # find_path() only auto-appends it to the source, so this is expected to
    # raise; a caller that swallows the error would see 0 ns of latency.
    # NOTE(review): the messages below describe PE_IPCQ._credit_latency_ns
    # as it was when this probe was written -- confirm they still apply.
    try:
        ipcq_ctx.router.find_path(dst_pe_prefix, src_pe_prefix)
    except Exception as e:
        print(f" CONFIRMED BUG in _credit_latency_ns: dest lacks '.pe_dma' "
              f"-> find_path raises -> caught exception -> returns 0.0")
        print(f" Error: {e}")

    # The intended credit path is recv -> sender (reverse data direction)
    try:
        credit_path = ipcq_ctx.router.find_path(
            dst_pe_prefix, f"{src_pe_prefix}.pe_dma",
        )
    except Exception as e:
        credit_path = []
        print(f" WARN: corrected find_path credit failed: {e}")

    credit_delta_ns = None
    if credit_path:
        credit_size = 16  # PE_IPCQ default _credit_size_bytes
        # Drain-only cost of the small credit message.
        cur = ipcq_ctx.compute_drain_ns(credit_path, credit_size)
        # Full path latency (edges + node overhead + drain).
        proposed = ipcq_ctx.compute_path_latency_ns(credit_path, credit_size)
        credit_delta_ns = proposed - cur
        print(f" credit path nodes = {len(credit_path)} (recv -> sender)")
        for nid in credit_path[:6]:
            print(f" {nid}")
        if len(credit_path) > 6:
            print(f" ... {len(credit_path) - 6} more nodes")
        br = _path_breakdown(ipcq_ctx, credit_path, credit_size)
        print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
        print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
        print(f" drain_ns(16 bytes) = {br['drain_ns']:.2f}")
        print(f" CURRENT _credit_latency_ns (drain only) = {cur:.3f} ns")
        print(f" PROPOSED (compute_path_latency_ns) = {proposed:.2f} ns")
        print(f" delta = {credit_delta_ns:+.2f} ns")

    # ── Comparison summary
    print("\n" + "-" * 78)
    print("Summary")
    print("-" * 78)
    raw_max = max(raw_per_pe.values()) if raw_per_pe else 0.0
    ipcq_max = max(ipcq_per_pe.values()) if ipcq_per_pe else 0.0
    print(f" RAW max(pe_exec_ns) = {raw_max:.2f} ns")
    print(f" IPCQ max(pe_exec_ns) (current) = {ipcq_max:.2f} ns")
    print(f" delta (RAW - IPCQ current) = {raw_max - ipcq_max:+.2f} ns")
    if credit_delta_ns is not None:
        ipcq_with_credit = ipcq_max + credit_delta_ns
        print(
            f" IPCQ projected w/ blocking credit + full path overhead "
            f"= {ipcq_with_credit:.2f} ns"
        )
        print(
            f" delta (RAW - IPCQ projected) = "
            f"{raw_max - ipcq_with_credit:+.2f} ns "
            f"(<= 0 means IPCQ >= RAW)"
        )

    # Sanity checks only: the numbers above are observational, but a run
    # that captured nothing at all means the hooks or the launch are broken.
    assert raw_per_pe, "no RAW pe_exec_ns recorded"
    assert ipcq_per_pe, "no IPCQ pe_exec_ns recorded"