ADR-0023 D9: blocking credit-emit with full-path latency
PE_IPCQ._handle_recv now yields-from _delayed_credit_send instead of spawning it as a fork, so the receiver's pe_exec_ns includes the credit-return cost. _credit_latency_ns switches from compute_drain_ns(path, 16) to compute_path_latency_ns(path, 16) and fixes a latent find_path bug where the destination lacked the ".pe_dma" suffix (silently returned 0 ns under the bare except). Net effect on h3/h4 inter-cube pe-to-pe latency: IPCQ >= raw DMA at every size, matching real-HW posted-write semantics. tl.send remains fire-and-forget. ADR-0023 D9 amended; new diagnostic test tests/test_pe_to_pe_diagnostic.py captures per-PE pe_exec_ns, paths, drain, and meta-arrival timing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,741 @@
|
||||
"""Diagnostic for the inter-cube RAW > IPCQ asymmetry on h3/h4 plots.
|
||||
|
||||
Single-shot run at h3 (sip0.cube0.pe0 -> sip0.cube1.pe0), nbytes=4096.
|
||||
|
||||
Captures per-PE pe_exec_ns and the actual path / drain / per-node overhead
|
||||
breakdown for the RAW sub-txn (PE_DMA -> remote HBM_CTRL) vs the IPCQ
|
||||
outbound sub-txn (PE_DMA -> peer PE_DMA), so we can localize the gap to
|
||||
one of:
|
||||
(a) drain at HBM-BW (RAW) vs fabric-BW (IPCQ)
|
||||
(b) path-length / per-node overhead asymmetry
|
||||
(c) RAW SRC paying tl.load (local HBM read) on top of remote tl.store
|
||||
while IPCQ DST only pays inbound traversal+drain.
|
||||
|
||||
Phase 1 / test-only. No production code is modified.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
|
||||
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
|
||||
from kernbench.policy.placement.dp import DPPolicy
|
||||
from kernbench.runtime_api.context import RuntimeContext
|
||||
from kernbench.runtime_api.types import DeviceSelector
|
||||
from kernbench.sim_engine.engine import GraphEngine
|
||||
from kernbench.topology.builder import resolve_topology
|
||||
|
||||
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
|
||||
|
||||
import os
|
||||
|
||||
# Allow the test to be re-run for h4 (inter-cube vertical) at multiple sizes
|
||||
# to investigate why IPCQ slope flattens past 8192 B (path may differ).
|
||||
NBYTES = int(os.environ.get("DIAG_NBYTES", "4096"))
|
||||
ELEM_BYTES = 2
|
||||
N_ELEM = NBYTES // ELEM_BYTES
|
||||
N_CUBES = 16
|
||||
N_PES = 8
|
||||
HOP = os.environ.get("DIAG_HOP", "h3")
|
||||
if HOP == "h4":
|
||||
SRC = (0, 0, 0)
|
||||
DST = (0, 4, 0) # h4 inter-cube vertical
|
||||
else:
|
||||
SRC = (0, 0, 0)
|
||||
DST = (0, 1, 0) # h3 inter-cube horizontal
|
||||
|
||||
|
||||
# ── Per-PE pe_exec_ns capture via monkey-patch ───────────────────────
|
||||
|
||||
|
||||
def _install_barrier_capture():
|
||||
"""Wrap PeCpuComponent._execute_kernel to log, for every PE that
|
||||
enters: env.now at entry, target_start_ns the request carried,
|
||||
whether the barrier yield fired (i.e. env.now < target_start_ns),
|
||||
and env.now at pe_exec_start.
|
||||
"""
|
||||
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
|
||||
|
||||
log: list[dict] = []
|
||||
original = pe_cpu_mod.PeCpuComponent._execute_kernel
|
||||
|
||||
def patched(self, env, txn):
|
||||
request = txn.request
|
||||
target_start = getattr(request, "target_start_ns", None)
|
||||
entry_now = float(env.now)
|
||||
log_entry = {
|
||||
"node_id": self.node.id,
|
||||
"entry_now": entry_now,
|
||||
"target_start_ns": (
|
||||
float(target_start) if target_start is not None else None
|
||||
),
|
||||
"barrier_skipped": (
|
||||
target_start is None
|
||||
or float(target_start) <= entry_now
|
||||
),
|
||||
"delta_late_ns": (
|
||||
None if target_start is None
|
||||
else max(0.0, entry_now - float(target_start))
|
||||
),
|
||||
}
|
||||
log.append(log_entry)
|
||||
yield from original(self, env, txn)
|
||||
|
||||
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
|
||||
|
||||
def restore():
|
||||
pe_cpu_mod.PeCpuComponent._execute_kernel = original
|
||||
|
||||
return log, restore
|
||||
|
||||
|
||||
def _install_per_pe_capture():
|
||||
"""Wrap PeCpuComponent._execute_kernel so we record (node_id ->
|
||||
pe_exec_ns) for every PE that executes a kernel during the run.
|
||||
|
||||
Returns (capture_dict, restore_callable).
|
||||
"""
|
||||
import kernbench.components.builtin.pe_cpu as pe_cpu_mod
|
||||
|
||||
captured: dict[str, float] = {}
|
||||
original = pe_cpu_mod.PeCpuComponent._execute_kernel
|
||||
|
||||
def patched(self, env, txn):
|
||||
gen = original(self, env, txn)
|
||||
try:
|
||||
value = yield from gen
|
||||
finally:
|
||||
v = txn.result_data.get("pe_exec_ns")
|
||||
if v is not None:
|
||||
captured[self.node.id] = float(v)
|
||||
return value
|
||||
|
||||
pe_cpu_mod.PeCpuComponent._execute_kernel = patched
|
||||
|
||||
def restore():
|
||||
pe_cpu_mod.PeCpuComponent._execute_kernel = original
|
||||
|
||||
return captured, restore
|
||||
|
||||
|
||||
def _install_recv_capture(target_node_id: str):
|
||||
"""Wrap PeIpcqComponent._handle_recv to log entry/exit times and the
|
||||
peer_head_cache/my_tail values seen at the start.
|
||||
|
||||
This pins down whether recv ever blocked on a wait_event, or whether
|
||||
it consumed without waiting (i.e. peer_head_cache > my_tail at entry).
|
||||
"""
|
||||
import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod
|
||||
|
||||
log: list[dict] = []
|
||||
original = pe_ipcq_mod.PeIpcqComponent._handle_recv
|
||||
|
||||
def patched(self, env, req, cmd):
|
||||
if self.node.id != target_node_id:
|
||||
yield from original(self, env, req, cmd)
|
||||
return
|
||||
# Snapshot state before dispatch
|
||||
d = cmd.direction
|
||||
qp = self._queue_pairs.get(d, {})
|
||||
log.append({
|
||||
"phase": "enter",
|
||||
"t": float(env.now),
|
||||
"direction": d,
|
||||
"peer_head_cache": qp.get("peer_head_cache"),
|
||||
"my_tail": qp.get("my_tail"),
|
||||
})
|
||||
yield from original(self, env, req, cmd)
|
||||
qp = self._queue_pairs.get(d, {})
|
||||
log.append({
|
||||
"phase": "exit",
|
||||
"t": float(env.now),
|
||||
"direction": d,
|
||||
"peer_head_cache": qp.get("peer_head_cache"),
|
||||
"my_tail": qp.get("my_tail"),
|
||||
})
|
||||
|
||||
pe_ipcq_mod.PeIpcqComponent._handle_recv = patched
|
||||
|
||||
def restore():
|
||||
pe_ipcq_mod.PeIpcqComponent._handle_recv = original
|
||||
|
||||
return log, restore
|
||||
|
||||
|
||||
def _install_meta_arrival_capture(target_node_id: str):
|
||||
"""Log every IpcqMetaArrival that lands on ``target_node_id`` PE_IPCQ.
|
||||
|
||||
Records (env_now, sender_seq, dst_addr, matched_direction,
|
||||
peer_head_cache_before, my_tail_before).
|
||||
"""
|
||||
import kernbench.components.builtin.pe_ipcq as pe_ipcq_mod
|
||||
|
||||
log: list[dict] = []
|
||||
original = pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival
|
||||
|
||||
def patched(self, msg):
|
||||
if self.node.id == target_node_id:
|
||||
token = msg.token
|
||||
now = float(self._env.now) if hasattr(self, "_env") else 0.0
|
||||
# _env is not stored on the component; use ctx? Fall back to
|
||||
# introspection via self._inbox._env (SimPy stores reference).
|
||||
try:
|
||||
now = float(self._inbox._env.now)
|
||||
except Exception:
|
||||
pass
|
||||
entry = {
|
||||
"t": now,
|
||||
"sender_seq": getattr(token, "sender_seq", None),
|
||||
"dst_addr": getattr(token, "dst_addr", None),
|
||||
"src_sip": getattr(token, "src_sip", None),
|
||||
"src_cube": getattr(token, "src_cube", None),
|
||||
"src_pe": getattr(token, "src_pe", None),
|
||||
"src_direction": getattr(token, "src_direction", None),
|
||||
"nbytes": getattr(token, "nbytes", None),
|
||||
"matched_direction": None,
|
||||
"peer_head_cache_before": {},
|
||||
"my_tail_before": {},
|
||||
}
|
||||
for d, qp in self._queue_pairs.items():
|
||||
entry["peer_head_cache_before"][d] = qp["peer_head_cache"]
|
||||
entry["my_tail_before"][d] = qp["my_tail"]
|
||||
base = qp["my_rx_base_pa"]
|
||||
size = qp["n_slots"] * qp["slot_size"]
|
||||
if base <= entry["dst_addr"] < base + size:
|
||||
entry["matched_direction"] = d
|
||||
log.append(entry)
|
||||
return original(self, msg)
|
||||
|
||||
pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = patched
|
||||
|
||||
def restore():
|
||||
pe_ipcq_mod.PeIpcqComponent._handle_meta_arrival = original
|
||||
|
||||
return log, restore
|
||||
|
||||
|
||||
def _snapshot_qp_state(engine, target_node_id: str) -> dict:
|
||||
"""Snapshot every direction's qp state on the target PE_IPCQ now.
|
||||
|
||||
Captures peer_head_cache, my_tail, my_rx_base_pa, n_slots, slot_size
|
||||
for each installed direction.
|
||||
"""
|
||||
comp = engine._components.get(target_node_id)
|
||||
if comp is None:
|
||||
return {}
|
||||
return {
|
||||
d: {
|
||||
"peer_head_cache": qp["peer_head_cache"],
|
||||
"my_tail": qp["my_tail"],
|
||||
"my_rx_base_pa": qp["my_rx_base_pa"],
|
||||
"n_slots": qp["n_slots"],
|
||||
"slot_size": qp["slot_size"],
|
||||
"rx_range": (
|
||||
qp["my_rx_base_pa"],
|
||||
qp["my_rx_base_pa"] + qp["n_slots"] * qp["slot_size"],
|
||||
),
|
||||
}
|
||||
for d, qp in comp.queue_pairs.items()
|
||||
}
|
||||
|
||||
|
||||
# ── Path / drain breakdown using engine ctx ──────────────────────────
|
||||
|
||||
|
||||
def _path_breakdown(ctx, path: list[str], nbytes: int) -> dict:
|
||||
edge_total_ns = 0.0
|
||||
edge_details = []
|
||||
min_bw = float("inf")
|
||||
for i in range(len(path) - 1):
|
||||
edge = ctx.edge_map.get((path[i], path[i + 1]))
|
||||
if edge is None:
|
||||
edge_details.append((path[i], path[i + 1], None, None, None))
|
||||
continue
|
||||
prop_ns = edge.distance_mm * ctx.ns_per_mm
|
||||
edge_total_ns += prop_ns
|
||||
bw = getattr(edge, "bw_gbs", None) or 0.0
|
||||
if bw > 0 and bw < min_bw:
|
||||
min_bw = bw
|
||||
edge_details.append(
|
||||
(path[i], path[i + 1], edge.distance_mm, prop_ns, bw),
|
||||
)
|
||||
|
||||
overhead_total_ns = 0.0
|
||||
overhead_details = []
|
||||
for nid in path:
|
||||
oh = float(ctx.node_overhead_ns.get(nid, 0.0))
|
||||
overhead_total_ns += oh
|
||||
overhead_details.append((nid, oh))
|
||||
|
||||
drain_ns = ctx.compute_drain_ns(path, nbytes)
|
||||
bottleneck_bw = None if min_bw == float("inf") else min_bw
|
||||
|
||||
return {
|
||||
"path": path,
|
||||
"edges": edge_details,
|
||||
"edge_total_ns": edge_total_ns,
|
||||
"overheads": overhead_details,
|
||||
"overhead_total_ns": overhead_total_ns,
|
||||
"drain_ns": drain_ns,
|
||||
"bottleneck_bw_gbs": bottleneck_bw,
|
||||
"expected_total_ns": edge_total_ns + overhead_total_ns + drain_ns,
|
||||
}
|
||||
|
||||
|
||||
def _print_breakdown(label: str, br: dict) -> None:
|
||||
print(f"\n {label}")
|
||||
print(f" path ({len(br['path'])} nodes):")
|
||||
for nid in br["path"]:
|
||||
print(f" - {nid}")
|
||||
print(f" edges (prop. delay):")
|
||||
for src, dst, dist_mm, prop_ns, bw in br["edges"]:
|
||||
if dist_mm is None:
|
||||
print(f" ! {src} -> {dst} EDGE NOT FOUND IN edge_map")
|
||||
continue
|
||||
print(
|
||||
f" {src} -> {dst} "
|
||||
f"dist={dist_mm:.3f}mm prop={prop_ns:.2f}ns "
|
||||
f"bw={bw or 0:.2f}GB/s"
|
||||
)
|
||||
print(f" per-node overhead_ns:")
|
||||
for nid, oh in br["overheads"]:
|
||||
if oh > 0:
|
||||
print(f" {nid:<60s} overhead_ns={oh:.2f}")
|
||||
print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
|
||||
print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
|
||||
print(f" bottleneck_bw_gbs = {br['bottleneck_bw_gbs']}")
|
||||
print(f" drain_ns (nbytes={NBYTES}) = {br['drain_ns']:.2f}")
|
||||
print(f" expected_total_ns = {br['expected_total_ns']:.2f}")
|
||||
|
||||
|
||||
# ── RAW path scenario ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _dump_src_op_records(engine, src_sip, src_cube, src_pe, label) -> None:
|
||||
"""Print op_logger records for ops on the SRC PE.
|
||||
|
||||
The op log captures t_start/t_end for memory/math/gemm/copy ops on
|
||||
every component, so we can see how long tl.load vs tl.store vs
|
||||
tl.send actually took at the engine level.
|
||||
"""
|
||||
op_logger = getattr(engine, "_op_logger", None)
|
||||
if op_logger is None:
|
||||
print(f" ({label}) op_logger not available")
|
||||
return
|
||||
src_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}."
|
||||
recs = [r for r in op_logger.records if r.component_id.startswith(src_prefix)]
|
||||
print(f" ({label}) op_logger records on SRC PE ({src_prefix}*):")
|
||||
for r in recs[:40]:
|
||||
dur = r.t_end - r.t_start
|
||||
comp_short = r.component_id.replace(src_prefix, "")
|
||||
params_short = ""
|
||||
if "nbytes" in r.params:
|
||||
params_short = f" nbytes={r.params['nbytes']}"
|
||||
if "src_addr" in r.params:
|
||||
params_short += f" src_addr={r.params['src_addr']}"
|
||||
if "dst_addr" in r.params:
|
||||
params_short += f" dst_addr={r.params['dst_addr']}"
|
||||
print(
|
||||
f" t=[{r.t_start:7.2f}..{r.t_end:7.2f}] dur={dur:6.2f}ns "
|
||||
f"{comp_short:<25s} {r.op_kind:<8s} {r.op_name:<12s}{params_short}"
|
||||
)
|
||||
|
||||
|
||||
def _run_raw():
|
||||
captured, restore = _install_per_pe_capture()
|
||||
try:
|
||||
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||
spec = topo.topology_obj.spec
|
||||
|
||||
src_sip, src_cube, src_pe = SRC
|
||||
dst_sip, dst_cube, dst_pe = DST
|
||||
assert src_sip == dst_sip
|
||||
|
||||
src_off = (src_cube * N_PES + src_pe) * N_ELEM * ELEM_BYTES
|
||||
dst_off = (dst_cube * N_PES + dst_pe) * N_ELEM * ELEM_BYTES
|
||||
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id="diag_raw",
|
||||
spec=spec,
|
||||
) as rt:
|
||||
dp = DPPolicy(
|
||||
cube="row_wise", pe="column_wise",
|
||||
num_cubes=N_CUBES, num_pes=N_PES,
|
||||
)
|
||||
rt.ahbm.set_device(src_sip)
|
||||
t = rt.zeros(
|
||||
(N_CUBES, N_PES * N_ELEM), dtype="f16",
|
||||
dp=dp, name="raw_tensor",
|
||||
)
|
||||
t.copy_(rt.from_numpy(
|
||||
np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
|
||||
))
|
||||
|
||||
def kernel(t_ptr, n_elem, tl):
|
||||
pe_id = tl.program_id(axis=0)
|
||||
cube_id = tl.program_id(axis=1)
|
||||
if cube_id == src_cube and pe_id == src_pe:
|
||||
data = tl.load(
|
||||
t_ptr + src_off, shape=(n_elem,), dtype="f16",
|
||||
)
|
||||
tl.store(t_ptr + dst_off, data)
|
||||
|
||||
pending = rt.launch(
|
||||
"diag_raw_kernel", kernel, t, N_ELEM, _defer_wait=True,
|
||||
)
|
||||
for h, _sip, meta in pending:
|
||||
rt.wait(h, _meta=meta)
|
||||
|
||||
# Compute the RAW sub-txn path: src PE_DMA -> dst HBM_CTRL
|
||||
from kernbench.policy.address.phyaddr import PhysAddr
|
||||
ctx = next(iter(engine._components.values())).ctx
|
||||
src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
|
||||
# Resolve dst PA to HBM controller node
|
||||
# The raw store kernel issues DmaWriteCmd on dst VA; in the engine
|
||||
# this is translated via PE_MMU. For diagnostic we approximate
|
||||
# the destination as the dst cube's HBM controller for slice
|
||||
# belonging to dst_pe.
|
||||
# Use the resolver on a constructed PA matching the same memory
|
||||
# slice the kernel writes to.
|
||||
# The tensor is "row_wise" sharded across cubes, so each cube
|
||||
# owns row[cube_id, :], with each PE owning a column slice.
|
||||
# The actual dst PA depends on the AHBM allocator; we read it
|
||||
# via the tensor's shard map.
|
||||
shard_map = getattr(t, "_shard_map", None) or getattr(t, "shard_map", None)
|
||||
# Fallback: query the resolver directly by constructing a PA in
|
||||
# the dst cube's HBM region. If shard_map is unavailable, still
|
||||
# show the breakdown for src-PE-DMA -> first reachable HBM_CTRL
|
||||
# in dst cube.
|
||||
dst_hbm_id = f"sip{dst_sip}.cube{dst_cube}.hbm_ctrl"
|
||||
if dst_hbm_id not in engine._components:
|
||||
# try alternate naming
|
||||
for nid in engine._components.keys():
|
||||
if (
|
||||
nid.startswith(f"sip{dst_sip}.cube{dst_cube}.")
|
||||
and "hbm" in nid
|
||||
):
|
||||
dst_hbm_id = nid
|
||||
break
|
||||
|
||||
# find_path() prepends ".pe_dma" to src_pe automatically
|
||||
try:
|
||||
raw_path = ctx.router.find_path(src_pe_prefix, dst_hbm_id)
|
||||
except Exception as e:
|
||||
raw_path = []
|
||||
print(f" WARN: find_path raw failed: {e}")
|
||||
if not raw_path:
|
||||
# Try other HBM-related node names in dst cube
|
||||
for nid in engine._components.keys():
|
||||
if not nid.startswith(f"sip{dst_sip}.cube{dst_cube}."):
|
||||
continue
|
||||
if "hbm" not in nid:
|
||||
continue
|
||||
try:
|
||||
p = ctx.router.find_path(src_pe_prefix, nid)
|
||||
except Exception:
|
||||
p = []
|
||||
if p:
|
||||
raw_path = p
|
||||
print(f" (fallback raw dst node: {nid})")
|
||||
break
|
||||
|
||||
return captured, ctx, raw_path, engine
|
||||
finally:
|
||||
restore()
|
||||
|
||||
|
||||
# ── IPCQ path scenario ───────────────────────────────────────────────
|
||||
|
||||
|
||||
def _run_ipcq():
|
||||
captured, restore = _install_per_pe_capture()
|
||||
dst_pe_ipcq_id = (
|
||||
f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_ipcq"
|
||||
)
|
||||
arrival_log, restore_arrival = _install_meta_arrival_capture(
|
||||
dst_pe_ipcq_id,
|
||||
)
|
||||
recv_log, restore_recv = _install_recv_capture(dst_pe_ipcq_id)
|
||||
barrier_log, restore_barrier = _install_barrier_capture()
|
||||
try:
|
||||
topo = resolve_topology(str(TOPOLOGY_PATH))
|
||||
engine = GraphEngine(topo.topology_obj, enable_data=True)
|
||||
spec = topo.topology_obj.spec
|
||||
|
||||
src_sip, src_cube, src_pe = SRC
|
||||
dst_sip, dst_cube, dst_pe = DST
|
||||
|
||||
cfg = load_ccl_config()
|
||||
merged = resolve_algorithm_config(cfg, name="intercube_allreduce")
|
||||
merged["slot_size"] = max(int(merged.get("slot_size", 4096)), NBYTES)
|
||||
|
||||
with RuntimeContext(
|
||||
engine=engine,
|
||||
target_device=DeviceSelector("all"),
|
||||
correlation_id="diag_ipcq",
|
||||
spec=spec,
|
||||
) as rt:
|
||||
configure_sfr_intercube_multisip(engine, spec, merged)
|
||||
dp = DPPolicy(
|
||||
cube="row_wise", pe="column_wise",
|
||||
num_cubes=N_CUBES, num_pes=N_PES,
|
||||
)
|
||||
|
||||
def kernel(t_ptr, n_elem, tl):
|
||||
pe_id = tl.program_id(axis=0)
|
||||
cube_id = tl.program_id(axis=1)
|
||||
if cube_id == src_cube and pe_id == src_pe:
|
||||
data = tl.load(t_ptr, shape=(n_elem,), dtype="f16")
|
||||
tl.send(dir=("E" if HOP == "h3" else "S"), src=data)
|
||||
elif cube_id == dst_cube and pe_id == dst_pe:
|
||||
tl.recv(
|
||||
dir=("W" if HOP == "h3" else "N"),
|
||||
shape=(n_elem,), dtype="f16",
|
||||
)
|
||||
|
||||
tensors = []
|
||||
for s in sorted({src_sip, dst_sip}):
|
||||
rt.ahbm.set_device(s)
|
||||
t = rt.zeros(
|
||||
(N_CUBES, N_PES * N_ELEM), dtype="f16",
|
||||
dp=dp, name=f"sip{s}",
|
||||
)
|
||||
t.copy_(rt.from_numpy(
|
||||
np.full((N_CUBES, N_PES * N_ELEM), 1.0, dtype=np.float16),
|
||||
))
|
||||
tensors.append(t)
|
||||
|
||||
all_pending = []
|
||||
for tt in tensors:
|
||||
pending = rt.launch(
|
||||
"diag_ipcq_kernel", kernel, tt, N_ELEM, _defer_wait=True,
|
||||
)
|
||||
all_pending.extend(pending)
|
||||
for h, _sip, meta in all_pending:
|
||||
rt.wait(h, _meta=meta)
|
||||
|
||||
ctx = next(iter(engine._components.values())).ctx
|
||||
src_pe_prefix = f"sip{src_sip}.cube{src_cube}.pe{src_pe}"
|
||||
dst_pe_dma = f"sip{dst_sip}.cube{dst_cube}.pe{dst_pe}.pe_dma"
|
||||
try:
|
||||
ipcq_path = ctx.router.find_path(src_pe_prefix, dst_pe_dma)
|
||||
except Exception as e:
|
||||
ipcq_path = []
|
||||
print(f" WARN: find_path ipcq failed: {e}")
|
||||
# Snapshot DST PE_IPCQ qp state at end-of-run so we can see what
|
||||
# peer_head_cache/my_tail looked like (and at which directions).
|
||||
qp_state = _snapshot_qp_state(engine, dst_pe_ipcq_id)
|
||||
return (captured, ctx, ipcq_path, engine,
|
||||
arrival_log, qp_state, recv_log, barrier_log)
|
||||
finally:
|
||||
restore_barrier()
|
||||
restore_recv()
|
||||
restore_arrival()
|
||||
restore()
|
||||
|
||||
|
||||
# ── Test entry ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.diagnostic
|
||||
def test_pe_to_pe_diagnostic_h3():
|
||||
print("\n" + "=" * 78)
|
||||
print(f" Diagnostic: h3 inter-cube horizontal, nbytes={NBYTES}")
|
||||
print(f" src={SRC} dst={DST}")
|
||||
print("=" * 78)
|
||||
|
||||
# ── RAW scenario
|
||||
print("\n[RAW] tl.load + tl.store (sender pays both legs)")
|
||||
raw_per_pe, raw_ctx, raw_path, raw_engine = _run_raw()
|
||||
print(f" per-PE pe_exec_ns ({len(raw_per_pe)} entries):")
|
||||
src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu"
|
||||
dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu"
|
||||
for nid in (src_id, dst_id):
|
||||
if nid in raw_per_pe:
|
||||
print(f" {nid:<60s} {raw_per_pe[nid]:.2f} ns <-- key PE")
|
||||
nonzero = {k: v for k, v in raw_per_pe.items() if v > 0.5}
|
||||
if nonzero:
|
||||
print(f" other PEs with pe_exec_ns > 0.5 ns:")
|
||||
for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]:
|
||||
if nid not in (src_id, dst_id):
|
||||
print(f" {nid:<60s} {v:.2f} ns")
|
||||
print(f" max(pe_exec_ns) = "
|
||||
f"{max(raw_per_pe.values()) if raw_per_pe else 0:.2f} ns")
|
||||
|
||||
if raw_path:
|
||||
br = _path_breakdown(raw_ctx, raw_path, NBYTES)
|
||||
_print_breakdown("RAW sub-txn path (src.pe_dma -> dst.hbm_ctrl)", br)
|
||||
_dump_src_op_records(raw_engine, *SRC, "RAW")
|
||||
|
||||
# ── IPCQ scenario
|
||||
print("\n[IPCQ] tl.send + tl.recv (recv pays inbound traversal+drain)")
|
||||
(ipcq_per_pe, ipcq_ctx, ipcq_path, ipcq_engine,
|
||||
arrival_log, qp_state, recv_log, barrier_log) = _run_ipcq()
|
||||
print(f"\n [BARRIER LOG] {len(barrier_log)} _execute_kernel entries:")
|
||||
src_id = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}.pe_cpu"
|
||||
dst_id = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}.pe_cpu"
|
||||
n_skipped = 0
|
||||
src_entry = None
|
||||
dst_entry = None
|
||||
for e in barrier_log:
|
||||
if e["barrier_skipped"]:
|
||||
n_skipped += 1
|
||||
if e["node_id"] == src_id:
|
||||
src_entry = e
|
||||
if e["node_id"] == dst_id:
|
||||
dst_entry = e
|
||||
print(f" PEs entering _execute_kernel: {len(barrier_log)}")
|
||||
print(f" PEs that SKIPPED barrier (env.now > target_start): {n_skipped}")
|
||||
if src_entry:
|
||||
print(
|
||||
f" SRC pe ({src_id}): entry_now={src_entry['entry_now']:.2f} "
|
||||
f"target_start={src_entry['target_start_ns']:.2f} "
|
||||
f"skipped={src_entry['barrier_skipped']} "
|
||||
f"late_ns={src_entry['delta_late_ns']:.2f}"
|
||||
)
|
||||
if dst_entry:
|
||||
print(
|
||||
f" DST pe ({dst_id}): entry_now={dst_entry['entry_now']:.2f} "
|
||||
f"target_start={dst_entry['target_start_ns']:.2f} "
|
||||
f"skipped={dst_entry['barrier_skipped']} "
|
||||
f"late_ns={dst_entry['delta_late_ns']:.2f}"
|
||||
)
|
||||
# Top 5 latest arrivals
|
||||
sorted_late = sorted(
|
||||
[e for e in barrier_log if e["delta_late_ns"] is not None],
|
||||
key=lambda e: -e["delta_late_ns"],
|
||||
)[:5]
|
||||
print(f" Top 5 latest PE arrivals (positive = barrier missed):")
|
||||
for e in sorted_late:
|
||||
if e["delta_late_ns"] > 0:
|
||||
print(
|
||||
f" {e['node_id']}: late by {e['delta_late_ns']:.2f} ns "
|
||||
f"(entry={e['entry_now']:.2f}, target={e['target_start_ns']:.2f})"
|
||||
)
|
||||
print(f"\n [RECV LOG on dst pe_ipcq] {len(recv_log)} entries:")
|
||||
for e in recv_log:
|
||||
print(
|
||||
f" {e['phase']:5s} t={e['t']:8.2f} ns "
|
||||
f"dir={e['direction']} "
|
||||
f"peer_head_cache={e['peer_head_cache']} "
|
||||
f"my_tail={e['my_tail']}"
|
||||
)
|
||||
print(f"\n [META-ARRIVAL LOG on dst pe_ipcq] {len(arrival_log)} arrivals:")
|
||||
for i, e in enumerate(arrival_log):
|
||||
print(
|
||||
f" #{i:2d} t={e['t']:8.2f} ns "
|
||||
f"src=(sip{e['src_sip']},cube{e['src_cube']},pe{e['src_pe']}) "
|
||||
f"dir={e['src_direction']} "
|
||||
f"sender_seq={e['sender_seq']} "
|
||||
f"matched_dir={e['matched_direction']} "
|
||||
f"nbytes={e['nbytes']}"
|
||||
)
|
||||
for d, ph in e["peer_head_cache_before"].items():
|
||||
mt = e["my_tail_before"][d]
|
||||
if ph != 0 or mt != 0 or d == e["matched_direction"]:
|
||||
print(
|
||||
f" before: dir={d} peer_head_cache={ph} my_tail={mt}"
|
||||
)
|
||||
print(f"\n [QP STATE END-OF-RUN on dst pe_ipcq]:")
|
||||
for d, st in qp_state.items():
|
||||
print(
|
||||
f" dir={d} peer_head_cache={st['peer_head_cache']} "
|
||||
f"my_tail={st['my_tail']} rx_range=[{st['rx_range'][0]}..."
|
||||
f"{st['rx_range'][1]}) n_slots={st['n_slots']} "
|
||||
f"slot_size={st['slot_size']}"
|
||||
)
|
||||
print(f" per-PE pe_exec_ns ({len(ipcq_per_pe)} entries):")
|
||||
for nid in (src_id, dst_id):
|
||||
if nid in ipcq_per_pe:
|
||||
print(f" {nid:<60s} {ipcq_per_pe[nid]:.2f} ns <-- key PE")
|
||||
nonzero = {k: v for k, v in ipcq_per_pe.items() if v > 0.5}
|
||||
if nonzero:
|
||||
print(f" other PEs with pe_exec_ns > 0.5 ns:")
|
||||
for nid, v in sorted(nonzero.items(), key=lambda kv: -kv[1])[:6]:
|
||||
if nid not in (src_id, dst_id):
|
||||
print(f" {nid:<60s} {v:.2f} ns")
|
||||
print(f" max(pe_exec_ns) = "
|
||||
f"{max(ipcq_per_pe.values()) if ipcq_per_pe else 0:.2f} ns")
|
||||
|
||||
if ipcq_path:
|
||||
br = _path_breakdown(ipcq_ctx, ipcq_path, NBYTES)
|
||||
_print_breakdown("IPCQ sub-txn path (src.pe_dma -> peer.pe_dma)", br)
|
||||
_dump_src_op_records(ipcq_engine, *SRC, "IPCQ")
|
||||
_dump_src_op_records(ipcq_engine, *DST, "IPCQ DST")
|
||||
|
||||
# ── Credit-return path analysis (where the missing IPCQ "ack" lives)
|
||||
print("\n" + "-" * 78)
|
||||
print("Credit-return path (current modeling)")
|
||||
print("-" * 78)
|
||||
src_pe_prefix = f"sip{SRC[0]}.cube{SRC[1]}.pe{SRC[2]}"
|
||||
dst_pe_prefix = f"sip{DST[0]}.cube{DST[1]}.pe{DST[2]}"
|
||||
# PE_IPCQ._credit_latency_ns calls
|
||||
# ctx.router.find_path(self._pe_prefix, peer_pe_prefix)
|
||||
# where the *destination* lacks the ".pe_dma" suffix. find_path()
|
||||
# only auto-appends to the source, so this raises -> the except
|
||||
# clause silently returns 0.0. Effectively credit latency = 0.
|
||||
try:
|
||||
ipcq_ctx.router.find_path(dst_pe_prefix, src_pe_prefix)
|
||||
bug_caught = False
|
||||
except Exception as e:
|
||||
bug_caught = True
|
||||
print(f" CONFIRMED BUG in _credit_latency_ns: dest lacks '.pe_dma' "
|
||||
f"-> find_path raises -> caught exception -> returns 0.0")
|
||||
print(f" Error: {e}")
|
||||
# The intended credit path is recv -> sender (reverse data direction)
|
||||
try:
|
||||
credit_path = ipcq_ctx.router.find_path(
|
||||
dst_pe_prefix, f"{src_pe_prefix}.pe_dma",
|
||||
)
|
||||
except Exception as e:
|
||||
credit_path = []
|
||||
print(f" WARN: corrected find_path credit failed: {e}")
|
||||
if credit_path:
|
||||
credit_size = 16 # PE_IPCQ default _credit_size_bytes
|
||||
# Today's modeling: drain only, 16 bytes -> ~0.125 ns
|
||||
cur = ipcq_ctx.compute_drain_ns(credit_path, credit_size)
|
||||
# Proposed modeling: full path latency (edges + node overhead + drain)
|
||||
proposed = ipcq_ctx.compute_path_latency_ns(credit_path, credit_size)
|
||||
print(f" credit path nodes = {len(credit_path)} (recv -> sender)")
|
||||
for nid in credit_path[:6]:
|
||||
print(f" {nid}")
|
||||
if len(credit_path) > 6:
|
||||
print(f" ... {len(credit_path) - 6} more nodes")
|
||||
br = _path_breakdown(ipcq_ctx, credit_path, credit_size)
|
||||
print(f" edge_total_ns = {br['edge_total_ns']:.2f}")
|
||||
print(f" overhead_total_ns = {br['overhead_total_ns']:.2f}")
|
||||
print(f" drain_ns(16 bytes) = {br['drain_ns']:.2f}")
|
||||
print(f" CURRENT _credit_latency_ns (drain only) = {cur:.3f} ns")
|
||||
print(f" PROPOSED (compute_path_latency_ns) = {proposed:.2f} ns")
|
||||
print(f" delta = {proposed - cur:+.2f} ns")
|
||||
|
||||
# ── Comparison summary
|
||||
print("\n" + "-" * 78)
|
||||
print("Summary")
|
||||
print("-" * 78)
|
||||
raw_max = max(raw_per_pe.values()) if raw_per_pe else 0.0
|
||||
ipcq_max = max(ipcq_per_pe.values()) if ipcq_per_pe else 0.0
|
||||
print(f" RAW max(pe_exec_ns) = {raw_max:.2f} ns")
|
||||
print(f" IPCQ max(pe_exec_ns) (current) = {ipcq_max:.2f} ns")
|
||||
print(f" delta (RAW - IPCQ current) = {raw_max - ipcq_max:+.2f} ns")
|
||||
if credit_path:
|
||||
ipcq_with_credit = ipcq_max + (proposed - cur)
|
||||
print(
|
||||
f" IPCQ projected w/ blocking credit + full path overhead "
|
||||
f"= {ipcq_with_credit:.2f} ns"
|
||||
)
|
||||
print(
|
||||
f" delta (RAW - IPCQ projected) = "
|
||||
f"{raw_max - ipcq_with_credit:+.2f} ns "
|
||||
f"(<= 0 means IPCQ >= RAW)"
|
||||
)
|
||||
|
||||
# No assertions — this is observational.
|
||||
assert raw_per_pe, "no RAW pe_exec_ns recorded"
|
||||
assert ipcq_per_pe, "no IPCQ pe_exec_ns recorded"
|
||||
Reference in New Issue
Block a user