kernbench2/scripts/plot_pe_dma_perf.py
ywkang a76487ca48 PE_DMA perf: SIP-wide scenarios + dual outputs + clearer naming
User asked to surface system-wide congestion (more accurate than
single-cube), bring back the latency-breakdown plot under a separate
filename, and rename the obscure ``streaming`` category.

Scenarios:
  Renamed all_pe_to_pe0 → all_pe_cube0_to_pe0 (clarify cube scope).
  Added two SIP-wide scenarios:
    sip_local_all     — every PE in sip0 (128 total) accesses its own
                        local slice. All paths disjoint (each PE owns
                        its own hbm_ctrl.peX), so the model should
                        scale linearly with cube count.
    sip_hotspot_pe0   — every PE in sip0 (128 total) targets
                        sip0.cube0.pe0_slice. Worst-case hotspot:
                        UCIe inbound + r0c0→hbm_ctrl.pe0 saturated.
  Each bar now carries an ``N=...`` annotation showing the issuer
  count, and the chart titles say the scope explicitly.

Effective BW + util at 16 KB:
  sip_local_all       N=128  eff= 27.2 TB/s  util_a= 83 %
  sip_hotspot_pe0     N=128  eff= 134 GB/s   util_a= 93 %
                                              (UCIe-into-cube0 saturated)
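
The figures above follow from two ratios defined in the script's docstring: effective BW is total bytes over makespan, and utilization is effective BW over a peak. A minimal sketch, assuming bandwidth in GB/s and time in ns (so bytes/ns equals GB/s); the function names and the sample inputs are illustrative, not taken from the run:

```python
def effective_bw_gbs(n_issuers: int, nbytes: int, makespan_ns: float) -> float:
    """Total bytes moved divided by wall-clock time; bytes/ns equals GB/s."""
    return n_issuers * nbytes / makespan_ns if makespan_ns > 0 else 0.0


def util_pct(eff_bw_gbs: float, peak_bw_gbs: float) -> float:
    """Effective BW as a percentage of the chosen peak (single or aggregate)."""
    return eff_bw_gbs / peak_bw_gbs * 100.0 if peak_bw_gbs > 0 else 0.0


# Illustrative inputs only: 8 issuers, 16 KiB each, 1024 ns makespan, 256 GB/s peak.
eff = effective_bw_gbs(8, 16 * 1024, 1024.0)
print(eff, util_pct(eff, 256.0))  # 128.0 50.0
```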

Plots:
  no_congestion.png + congestion.png        — Effective BW utilization
                                              (two bars: single vs aggregate peak)
  breakdown_no_congestion.png +
  breakdown_congestion.png                  — stacked latency breakdown
                                              (renamed from previous)
  summary.csv with columns for both views.
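
The "aggregate peak" bar relies on the fair-share estimate described in the script: each edge's bandwidth is split evenly among the paths that use it, a path sustains the minimum share along its hops, and the aggregate is the sum over paths. A simplified sketch of that idea, assuming edges are given as a plain dict from (src, dst) to GB/s; the node names are hypothetical:

```python
from collections import Counter


def aggregate_peak_bw(paths, edge_bw):
    """Sum over paths of the smallest per-edge fair share along each path."""
    usage = Counter()
    for path in paths:
        for hop in zip(path, path[1:]):
            usage[hop] += 1
    total = 0.0
    for path in paths:
        shares = [edge_bw[hop] / usage[hop]
                  for hop in zip(path, path[1:]) if edge_bw.get(hop)]
        if shares:
            total += min(shares)
    return total


# Two issuers converge on one 256 GB/s edge (hypothetical node names).
edge_bw = {("pe1", "r0"): 512.0, ("pe2", "r0"): 512.0, ("r0", "hbm"): 256.0}
shared = [["pe1", "r0", "hbm"], ["pe2", "r0", "hbm"]]
print(aggregate_peak_bw(shared, edge_bw))  # 256.0
```

With a single path the result equals the single-path bottleneck, which is why the two bars coincide in the no-congestion plot.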

The visual y-cap on BW utilization is 150 %. Bars exceeding it (e.g.
sip_local_all's util_single = 10,639 %) are drawn at the cap with an
upward arrow and the real value annotated. The verification rule for
``util_single`` is loosened to ``≤ n_issuers × 100 % + 5 %`` so
massively-parallel disjoint scenarios pass.
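
The cap and the loosened bound can be sketched as follows. This is a simplified illustration of the rule stated above, not the script's exact code (the script adds a small epsilon when deciding whether to draw the arrow); the helper names are hypothetical:

```python
Y_CAP_PCT = 150.0  # visual ceiling for the BW-utilization bars


def util_single_ok(util_single_pct: float, n_issuers: int,
                   slack_pct: float = 5.0) -> bool:
    """Accept util_single in (0, n_issuers * 100 % + slack]."""
    return 0.0 < util_single_pct <= n_issuers * 100.0 + slack_pct


def capped_bar(util_pct: float, cap: float = Y_CAP_PCT):
    """Return (drawn height, whether an overflow arrow is needed)."""
    return min(util_pct, cap), util_pct > cap


# sip_local_all: 128 disjoint issuers, util_single = 10,639 %
print(util_single_ok(10639.0, 128))  # True
print(capped_bar(10639.0))           # (150.0, True)
```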

Category renamed: ``streaming`` → ``wire_transfer``. It is the
bulk-transfer time = (n_flits − 1) × flit_bytes / bottleneck_bw — the
cost of streaming the rest of the payload through the slowest wire
after the first flit has arrived.
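
As a worked instance of the formula, assuming the script's 256-byte flit and an illustrative 256 GB/s bottleneck (GB/s is numerically bytes per ns); the function name is hypothetical:

```python
import math


def wire_transfer_ns(nbytes: int, flit_bytes: int, bottleneck_bw_gbs: float) -> float:
    """Bulk-transfer time in ns: (n_flits - 1) * flit_bytes / bottleneck_bw."""
    if nbytes <= flit_bytes:
        return 0.0  # a single flit has no trailing payload to stream
    n_flits = math.ceil(nbytes / flit_bytes)
    return (n_flits - 1) * flit_bytes / bottleneck_bw_gbs


# 16 KiB payload -> 64 flits; the 63 trailing flits take 1 ns each at 256 GB/s.
print(wire_transfer_ns(16 * 1024, 256, 256.0))  # 63.0
```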

All checks PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 09:43:09 -07:00

807 lines
32 KiB
Python
"""Plot PE_DMA Effective BW utilization across topological distance.

Two BW-utilization graphs (saved to docs/diagrams/pe_dma_perf/):

  no_congestion.png — single PE issues one DMA, target varies in distance:
    1. SAME_CUBE_PE_LOCAL           — pe0 -> pe0's slice (own router, 1 hop)
    2. SAME_CUBE_PE_REMOTE_BEST     — pe0 -> pe1's slice (adjacent corner)
    3. SAME_CUBE_PE_REMOTE_WORST    — pe0 -> pe7's slice (opposite corner)
    4. REMOTE_CUBE_PE_REMOTE_BEST   — pe0 -> cube1 pe0's slice (1 UCIe hop)
    5. REMOTE_CUBE_PE_REMOTE_WORST  — pe0 -> cube15 pe7's slice (max UCIe + mesh)
    6. REMOTE_SIP_SAME_CUBE_SAME_PE — pe0 -> sip1.cube0.pe0's slice

  congestion.png — concurrent PEs hitting either the same HBM CTRL or
  the same UCIe direction:
    A. 1×PE remote single       — baseline (one remote PE reads cube0.pe0_slice)
    B. 2×PE remote concurrent   — two adjacent PEs share path to pe0_slice
    C. 3×PE remote concurrent   — three PEs contend on pe0's router/HBM
    D. 8×PE same-direction-UCIe — every PE in cube0 reads cube1 same-PE slice
    E. 8×PE all-hit-PE0         — every PE reads cube0.pe0_slice (hottest HBM CTRL)
    F. 128×PE own-slice (sip_local_all): every PE in sip0 accesses its own slice
    G. 128×PE hotspot (sip_hotspot_pe0): every PE in sip0 targets sip0.cube0.pe0_slice

Outputs (under ``docs/diagrams/pe_dma_perf/``):

  no_congestion.png           — BW utilization, single-issuer scenarios
  congestion.png              — BW utilization, multi-issuer scenarios
  breakdown_no_congestion.png — latency stacked breakdown, single-issuer
  breakdown_congestion.png    — latency stacked breakdown, multi-issuer
  summary.csv                 — all rows + columns for either re-plot

BW utilization plot (per scenario, two bars):

  util_single    = effective_bw / single-path peak × 100
                   (peak = slowest edge bw on the first issuer's path)
  util_aggregate = effective_bw / aggregate-resource peak × 100
                   (peak from max-min fair share over concurrent paths)

Effective BW = (total bytes transferred) / wall-clock time

  no_congestion: nbytes / total_ns
  congestion:    n_issuers × nbytes / makespan_ns

util_aggregate is bounded by 100 % by construction. util_single can
exceed 100 % when concurrent paths use multiple parallel lanes of a
shared resource (e.g. UCIe's 4 connections) or are fully disjoint
(e.g. sip_local_all), because the single-path peak under-counts the
aggregate capacity. The bar is visually capped at 150 % and drawn with
an upward arrow if it would exceed the cap.

Latency breakdown plot (per scenario, stacked bar) — see
``_path_breakdown`` for the per-category accounting and the
wormhole-pipelined formula used.
"""
from __future__ import annotations
import csv
import math
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import matplotlib.pyplot as plt
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.runtime_api.kernel import PeDmaMsg
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology
REPO = Path(__file__).resolve().parent.parent
TOPOLOGY_PATH = REPO / "topology.yaml"
OUT_DIR = REPO / "docs" / "diagrams" / "pe_dma_perf"
DEFAULT_NBYTES = 16 * 1024 # 16 KB per DMA
# Category order (stacked bottom-to-top) and colours.
CATEGORIES = [
    ("pe_setup", "#3b82f6"),       # blue
    ("noc_mesh", "#10b981"),       # green
    ("ucie", "#f59e0b"),           # amber
    ("fabric", "#8b5cf6"),         # purple (switch + io chiplet for cross-SIP)
    ("wire_transfer", "#6366f1"),  # indigo (bulk = (n_flits-1)/bottleneck)
    ("hbm_ctrl", "#ef4444"),       # red (final-chunk commit = chunk_time)
    ("contention", "#9ca3af"),     # grey (actual - formula, surfaces serialization)
]


@dataclass
class Scenario:
    name: str
    label: str
    src_sip: int
    src_cube: int
    src_pe: int
    dst_sip: int
    dst_cube: int
    dst_pe: int


def _slice_bytes(spec) -> int:
    mm = spec["cube"]["memory_map"]
    return mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"]


def _hbm_pa(*, sip: int, cube: int, pe_id: int, offset: int, slice_bytes: int) -> int:
    return PhysAddr.pe_hbm_addr(
        sip_id=sip, die_id=cube, pe_id=pe_id,
        pe_local_hbm_offset=offset, slice_size_bytes=slice_bytes,
    ).encode()


def _categorise_node(node) -> str | None:
    nid = node.id
    if ".pe_dma" in nid:
        return "pe_setup"
    if node.kind == "noc_router":
        return "noc_mesh"
    if "ucie" in nid:
        return "ucie"
    if node.kind == "hbm_ctrl":
        return "hbm_ctrl"
    if node.kind in ("switch", "pcie_ep", "io_cpu", "io_noc"):
        return "fabric"
    return None


def _categorise_edge_kind(kind: str | None) -> str | None:
    if kind in ("pe_to_router", "router_to_pe", "pe_internal"):
        return "pe_setup"
    if kind in ("router_mesh",):
        return "noc_mesh"
    if kind in ("router_to_hbm", "hbm_to_router"):
        return "hbm_ctrl"
    # UCIe transit. Includes the cube↔io_chiplet UCIe crossings.
    if kind and "ucie" in kind:
        return "ucie"
    if kind in ("cube_to_io", "io_to_cube"):
        return "ucie"
    # Cross-SIP fabric: switch port + IO chiplet internal NoC + pcie link.
    if kind in (
        "io_to_switch", "switch_to_io", "io_internal",
        "conn_to_io_noc", "io_noc_to_conn",
        "pcie", "command", "fabric",
    ):
        return "fabric"
    return None


def _bottleneck_bw(path: list[str], edge_map: dict) -> float | None:
    """Min ``bw_gbs`` over edges with positive bandwidth on the path."""
    bws = [e.bw_gbs for i in range(len(path) - 1)
           if (e := edge_map.get((path[i], path[i + 1]))) and e.bw_gbs]
    return min(bws) if bws else None


def _aggregate_peak_bw(paths: list[list[str]], edge_map: dict) -> float:
    """Max-min fair-share aggregate throughput across concurrent paths.

    Each path is one unit of demand from source to destination. For each
    edge, fair share per path = ``bw_gbs / usage_count``. A path's
    sustainable throughput is the minimum fair share along its edges,
    and the aggregate peak is the sum across paths. This produces the
    correct answer for both shared-bottleneck scenarios (all paths
    converge on one wire → aggregate = wire BW) and multi-lane shared
    resources (UCIe's 4 connections used in parallel → aggregate = 4 ×
    per-conn BW), without enumerating max-flow explicitly.

    Examples:
      * 3 paths sharing r0c0→hbm_ctrl.pe0 @ 256 GB/s
        per-path = 256/3 ≈ 85.3, aggregate = 3 × 85.3 = 256 GB/s ✓
      * 8 paths sharing 4 UCIe conns @ 128 GB/s (2 paths per conn)
        per-path = 128/2 = 64, aggregate = 8 × 64 = 512 GB/s ✓
      * 1 path through 256 GB/s bottleneck
        per-path = 256, aggregate = 256 GB/s ✓ (= single-path peak)
    """
    from collections import Counter

    # Count how many concurrent paths traverse each directed edge.
    edge_usage: Counter = Counter()
    for path in paths:
        for i in range(len(path) - 1):
            edge_usage[(path[i], path[i + 1])] += 1

    aggregate = 0.0
    for path in paths:
        per_path = float("inf")
        for i in range(len(path) - 1):
            key = (path[i], path[i + 1])
            e = edge_map.get(key)
            if e and e.bw_gbs:
                share = e.bw_gbs / edge_usage[key]
                if share < per_path:
                    per_path = share
        if per_path != float("inf"):
            aggregate += per_path
    return aggregate


def _path_breakdown(
    path: list[str], nbytes: int, graph, edge_map, ns_per_mm: float,
) -> dict[str, float]:
    """Wormhole-pipelined breakdown of a path's expected latency.

    Model:
        total ≈ first_flit_arrival_time
                + (n_flits - 1) × bottleneck_per_flit_time
                + last_chunk_commit_time

    Each summand is categorised:
      * Per-component overheads + first-flit wire transfers are attributed
        by component class (pe_setup / noc_mesh / ucie / fabric).
      * ``wire_transfer`` is the bulk-transfer cost
        = (n_flits - 1) × flit_bytes / bottleneck_bw, i.e. the time the
        rest of the payload spends streaming through the slowest link
        after the first flit has arrived. Renamed from ``streaming``
        for clarity.
      * ``hbm_ctrl`` is the HBM CTRL overhead + the final chunk's PC commit
        (= chunk_time). Earlier chunks overlap with arrival.
    """
    cats: dict[str, float] = defaultdict(float)

    # 1) Per-component overheads (first-flit).
    for nid in path:
        node = graph.nodes.get(nid)
        if node is None:
            continue
        cat = _categorise_node(node)
        if cat is None:
            continue
        cats[cat] += float(node.attrs.get("overhead_ns", 0.0))

    # 2) Per-edge first-flit transfer = prop_ns + flit_bytes / bw_gbs.
    bws: list[float] = []
    flit_bytes = 256  # see ADR-0033 (matches default HBM burst_bytes)
    for i in range(len(path) - 1):
        e = edge_map.get((path[i], path[i + 1]))
        if e is None:
            continue
        prop_ns = e.distance_mm * ns_per_mm
        first_flit_xfer = (flit_bytes / e.bw_gbs) if e.bw_gbs else 0.0
        cat = _categorise_edge_kind(e.kind)
        if cat:
            cats[cat] += prop_ns + first_flit_xfer
        if e.bw_gbs:
            bws.append(e.bw_gbs)

    # 3) wire_transfer: (n_flits - 1) × per-flit time at the bottleneck.
    if bws and nbytes > flit_bytes:
        n_flits = math.ceil(nbytes / flit_bytes)
        min_bw = min(bws)
        cats["wire_transfer"] = (n_flits - 1) * (flit_bytes / min_bw)

    # 4) HBM CTRL: last-chunk commit time (earlier chunks overlap arrival).
    if path:
        hbm_node = graph.nodes.get(path[-1])
        if hbm_node and hbm_node.kind == "hbm_ctrl" and nbytes > 0:
            burst = int(hbm_node.attrs.get("burst_bytes", 256))
            pc_bw = float(hbm_node.attrs.get("pc_bw_gbs", 32.0))
            cats["hbm_ctrl"] += burst / pc_bw  # chunk_time of final chunk
    return dict(cats)


# ── No-congestion scenarios ───────────────────────────────────────────
def _no_congestion_scenarios() -> list[Scenario]:
    return [
        Scenario("local",
                 "SAME_CUBE\nPE_LOCAL",
                 0, 0, 0, 0, 0, 0),
        Scenario("same_cube_best",
                 "SAME_CUBE\nREMOTE_BEST\n(pe0→pe1)",
                 0, 0, 0, 0, 0, 1),
        Scenario("same_cube_worst",
                 "SAME_CUBE\nREMOTE_WORST\n(pe0→pe7)",
                 0, 0, 0, 0, 0, 7),
        Scenario("remote_cube_best",
                 "REMOTE_CUBE\nREMOTE_BEST\n(cube0→cube1)",
                 0, 0, 0, 0, 1, 0),
        Scenario("remote_cube_worst",
                 "REMOTE_CUBE\nREMOTE_WORST\n(cube0→cube15.pe7)",
                 0, 0, 0, 0, 15, 7),
        Scenario("remote_sip",
                 "REMOTE_SIP\nSAME_CUBE_SAME_PE\n(sip0→sip1)",
                 0, 0, 0, 1, 0, 0),
    ]


def _run_pe_dma(engine: GraphEngine, scn: Scenario, nbytes: int,
                slice_bytes: int) -> tuple[float, list[str]]:
    pa = _hbm_pa(sip=scn.dst_sip, cube=scn.dst_cube, pe_id=scn.dst_pe,
                 offset=0x1000, slice_bytes=slice_bytes)
    msg = PeDmaMsg(
        correlation_id="pedma-perf", request_id=scn.name,
        src_sip=scn.src_sip, src_cube=scn.src_cube, src_pe=scn.src_pe,
        dst_pa=pa, nbytes=nbytes,
    )
    h = engine.submit(msg)
    engine.wait(h)
    _, trace = engine.get_completion(h)
    # Resolve the path for breakdown analysis (engine doesn't keep it).
    dst_node = engine._resolver.resolve(PhysAddr.decode(pa))
    src = f"sip{scn.src_sip}.cube{scn.src_cube}.pe{scn.src_pe}"
    path = engine._router.find_path(src, dst_node)
    return float(trace["total_ns"]), path


def _run_no_congestion(nbytes: int):
    graph = load_topology(TOPOLOGY_PATH)
    edge_map = {(e.src, e.dst): e for e in graph.edges}
    ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01)
    slice_bytes = _slice_bytes(graph.spec)
    rows = []
    for scn in _no_congestion_scenarios():
        # Fresh engine per scenario so runs do not share state.
        engine = GraphEngine(load_topology(TOPOLOGY_PATH))
        total_ns, path = _run_pe_dma(engine, scn, nbytes, slice_bytes)
        br = _path_breakdown(path, nbytes, graph, edge_map, ns_per_mm)
        formula_sum = sum(br.values())
        br["contention"] = max(0.0, total_ns - formula_sum)
        peak_single = _bottleneck_bw(path, edge_map) or 0.0
        peak_aggregate = _aggregate_peak_bw([path], edge_map)
        eff_bw = nbytes / total_ns if total_ns > 0 else 0.0
        util_single = (eff_bw / peak_single * 100.0) if peak_single > 0 else 0.0
        util_aggregate = (eff_bw / peak_aggregate * 100.0) if peak_aggregate > 0 else 0.0
        rows.append({
            "graph": "no_congestion",
            "scenario": scn.name,
            "label": scn.label,
            "nbytes": nbytes,
            "n_issuers": 1,
            "path": " -> ".join(_short_path(path)),
            "total_ns": total_ns,
            "peak_single_bw_gbs": peak_single,
            "peak_aggregate_bw_gbs": peak_aggregate,
            "effective_bw_gbs": eff_bw,
            "util_single_pct": util_single,
            "util_aggregate_pct": util_aggregate,
            **{c: br.get(c, 0.0) for c, _ in CATEGORIES},
        })
    return rows


# ── Congestion scenarios ──────────────────────────────────────────────
@dataclass
class CongestionScenario:
    name: str
    label: str
    issues: list[tuple[int, int, int, int, int, int]]
    """List of (src_sip, src_cube, src_pe, dst_sip, dst_cube, dst_pe)."""


def _congestion_scenarios() -> list[CongestionScenario]:
    same_cube_same_target_pe0 = lambda srcs: [
        (0, 0, p, 0, 0, 0) for p in srcs
    ]
    # Build (sip, cube, pe) source triples for every PE in sip0
    # (16 cubes × 8 PEs = 128 PEs total); destinations are added below.
    sip0_all_pes = [(0, c, p) for c in range(16) for p in range(8)]
    return [
        # A-C: 1, 2, 3 cube-local PEs target pe0's slice (incremental cube0)
        CongestionScenario(
            "ctrl_hot_1",
            "cube0\n1×PE → pe0_slice",
            same_cube_same_target_pe0([1]),
        ),
        CongestionScenario(
            "ctrl_hot_2",
            "cube0\n2×PE → pe0_slice",
            same_cube_same_target_pe0([1, 2]),
        ),
        CongestionScenario(
            "ctrl_hot_3",
            "cube0\n3×PE → pe0_slice",
            same_cube_same_target_pe0([1, 2, 3]),
        ),
        # D: every PE in cube0 sends to corresponding PE in cube1
        # (same UCIe direction, single-cube source)
        CongestionScenario(
            "ucie_eastbound",
            "cube0\n8×PE corresp.\n→ cube1",
            [(0, 0, p, 0, 1, p) for p in range(8)],
        ),
        # E: every PE in cube0 hits pe0's slice (cube-local HBM hotspot)
        CongestionScenario(
            "all_pe_cube0_to_pe0",
            "cube0\n8×PE → pe0_slice",
            same_cube_same_target_pe0(list(range(8))),
        ),
        # F: every PE in sip0 (128 PEs) accesses its own local slice.
        # All paths disjoint (each PE has its own hbm_ctrl.peX) — tests
        # whether the aggregate cube HBM BW scales linearly with cube
        # count (16 × 8 × 32 = 4096 GB/s peak).
        CongestionScenario(
            "sip_local_all",
            "sip0\n128×PE → own slice",
            [(s, c, p, s, c, p) for (s, c, p) in sip0_all_pes],
        ),
        # G: every PE in sip0 targets sip0.cube0.pe0_slice (system-wide
        # hotspot). Tests UCIe inbound saturation into cube0 +
        # convergence on r0c0 → hbm_ctrl.pe0.
        CongestionScenario(
            "sip_hotspot_pe0",
            "sip0\n128×PE → cube0.pe0_slice",
            [(s, c, p, 0, 0, 0) for (s, c, p) in sip0_all_pes],
        ),
    ]


def _run_congestion(nbytes: int):
    graph = load_topology(TOPOLOGY_PATH)
    edge_map = {(e.src, e.dst): e for e in graph.edges}
    ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01)
    slice_bytes = _slice_bytes(graph.spec)
    rows = []
    for scn in _congestion_scenarios():
        engine = GraphEngine(load_topology(TOPOLOGY_PATH))
        handles = []
        paths: list[list[str]] = []
        # Submit every request before waiting so they run concurrently.
        for i, (ss, sc, sp, ds, dc, dp) in enumerate(scn.issues):
            pa = _hbm_pa(sip=ds, cube=dc, pe_id=dp,
                         offset=0x1000 + i * 0x100, slice_bytes=slice_bytes)
            msg = PeDmaMsg(
                correlation_id="pedma-cong", request_id=f"{scn.name}-{i}",
                src_sip=ss, src_cube=sc, src_pe=sp,
                dst_pa=pa, nbytes=nbytes,
            )
            handles.append(engine.submit(msg))
            dst_node = engine._resolver.resolve(PhysAddr.decode(pa))
            paths.append(engine._router.find_path(
                f"sip{ss}.cube{sc}.pe{sp}", dst_node))
        first_path = paths[0] if paths else []
        for h in handles:
            engine.wait(h)
        latencies = [engine.get_completion(h)[1]["total_ns"] for h in handles]
        makespan = max(latencies)
        # Breakdown uses the first issuer's path as a representative;
        # ``contention`` absorbs serialization across requests.
        br = _path_breakdown(first_path, nbytes, graph, edge_map, ns_per_mm)
        formula_sum = sum(br.values())
        br["contention"] = max(0.0, makespan - formula_sum)
        peak_single = _bottleneck_bw(first_path, edge_map) or 0.0
        peak_aggregate = _aggregate_peak_bw(paths, edge_map)
        total_bytes = nbytes * len(scn.issues)
        eff_bw = total_bytes / makespan if makespan > 0 else 0.0
        util_single = (eff_bw / peak_single * 100.0) if peak_single > 0 else 0.0
        util_aggregate = (eff_bw / peak_aggregate * 100.0) if peak_aggregate > 0 else 0.0
        rows.append({
            "graph": "congestion",
            "scenario": scn.name,
            "label": scn.label,
            "nbytes": nbytes,
            "n_issuers": len(scn.issues),
            "first_path": " -> ".join(_short_path(first_path)),
            "makespan_ns": makespan,
            "min_lat_ns": min(latencies) if latencies else 0.0,
            "peak_single_bw_gbs": peak_single,
            "peak_aggregate_bw_gbs": peak_aggregate,
            "effective_bw_gbs": eff_bw,
            "util_single_pct": util_single,
            "util_aggregate_pct": util_aggregate,
            **{c: br.get(c, 0.0) for c, _ in CATEGORIES},
        })
    return rows


# ── Plotting ───────────────────────────────────────────────────────────
def _short_path(path: Iterable[str]) -> list[str]:
    return [".".join(p.split(".")[-2:]) for p in path]


def _plot_bw_utilization(rows, title, out_path):
    """Plot Effective BW utilization (%) per scenario with TWO bars:

      util_single    = effective_bw / single-path peak × 100
      util_aggregate = effective_bw / aggregate-resource peak × 100

    The aggregate peak sums fair-share BW across all concurrent paths
    (max-min fair share), which accounts for shared resources.

    Y-axis is capped at ``Y_CAP_PCT`` so the chart stays readable when
    a disjoint-path scenario (e.g. all 128 SIP PEs accessing their own
    slice) drives util_single far above 100 % (up to n_issuers × 100 %).
    Any bar that exceeds the cap is drawn at the cap with an upward
    arrow and the real value annotated.
    """
    import numpy as np

    Y_CAP_PCT = 150.0  # visual ceiling
    n = len(rows)
    labels = [r["label"] for r in rows]
    util_s = [r.get("util_single_pct", 0.0) for r in rows]
    util_a = [r.get("util_aggregate_pct", 0.0) for r in rows]
    eff = [r.get("effective_bw_gbs", 0.0) for r in rows]
    peak_s = [r.get("peak_single_bw_gbs", 0.0) for r in rows]
    peak_a = [r.get("peak_aggregate_bw_gbs", 0.0) for r in rows]
    n_iss = [r.get("n_issuers", 1) for r in rows]

    fig, ax = plt.subplots(figsize=(max(9, n * 1.6), 6.0))
    x = np.arange(n)
    w = 0.38
    util_s_capped = [min(u, Y_CAP_PCT) for u in util_s]
    util_a_capped = [min(u, Y_CAP_PCT) for u in util_a]
    ax.bar(x - w / 2, util_s_capped, w, color="#6366f1",
           edgecolor="white", linewidth=0.5,
           label="util vs single-path peak")
    ax.bar(x + w / 2, util_a_capped, w, color="#10b981",
           edgecolor="white", linewidth=0.5,
           label="util vs aggregate-resource peak")
    ax.axhline(100.0, color="grey", linestyle="--", linewidth=0.8,
               label="saturation (100 %)")
    y_max = Y_CAP_PCT * 1.20
    for i in range(n):
        # util_single bar annotation: show ↑ if exceeded the cap
        marker_s = "↑ " if util_s[i] > Y_CAP_PCT + 1e-3 else ""
        ax.text(i - w / 2, util_s_capped[i] + y_max * 0.012,
                f"{marker_s}{util_s[i]:.0f}%\n/{peak_s[i]:.0f}",
                ha="center", va="bottom", fontsize=7)
        marker_a = "↑ " if util_a[i] > Y_CAP_PCT + 1e-3 else ""
        ax.text(i + w / 2, util_a_capped[i] + y_max * 0.012,
                f"{marker_a}{util_a[i]:.0f}%\n/{peak_a[i]:.0f}",
                ha="center", va="bottom", fontsize=7)
        # Effective BW + n_issuers annotation underneath each pair.
        ax.text(i, -y_max * 0.04,
                f"N={n_iss[i]}\neff={eff[i]:.0f} GB/s",
                ha="center", va="top", fontsize=7, color="#444444")
    ax.set_xticks(x)
    ax.set_xticklabels(labels, fontsize=7)
    ax.set_ylabel("Effective BW utilization (%)")
    ax.set_title(title, fontsize=11)
    ax.set_ylim(-y_max * 0.10, y_max)
    ax.legend(loc="upper right", fontsize=9, frameon=False)
    fig.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


# ── Latency breakdown plot ─────────────────────────────────────────────
def _plot_breakdown(rows, value_key, title, out_path):
    """Stacked-bar latency breakdown per scenario (one stack per row).

    Each category from ``CATEGORIES`` (except ``contention``) contributes
    a coloured segment proportional to its computed time in ns;
    ``contention`` is the residual ``actual - formula_sum`` and absorbs
    serialisation across concurrent issuers plus any model-fidelity gap.
    The total bar height = actual ``total_ns`` (no_congestion) or
    ``makespan_ns`` (congestion).
    """
    n = len(rows)
    labels = [r["label"] for r in rows]
    fig, ax = plt.subplots(figsize=(max(9, n * 1.6), 6.0))
    bottoms = [0.0] * n
    for cat, colour in CATEGORIES:
        heights = [r.get(cat, 0.0) for r in rows]
        ax.bar(labels, heights, bottom=bottoms, color=colour, label=cat,
               edgecolor="white", linewidth=0.5)
        bottoms = [b + h for b, h in zip(bottoms, heights)]
    for i, r in enumerate(rows):
        ax.text(i, bottoms[i] * 1.01,
                f"{r[value_key]:.0f} ns", ha="center", va="bottom",
                fontsize=8)
        # n_issuers annotation under the label
        ax.text(i, -max(bottoms) * 0.04, f"N={r.get('n_issuers', 1)}",
                ha="center", va="top", fontsize=7, color="#444444")
    ax.set_ylabel("Latency (ns)")
    ax.set_title(title, fontsize=11)
    ax.legend(loc="upper left", fontsize=9, frameon=False)
    ax.set_ylim(-max(bottoms) * 0.10, max(bottoms) * 1.18)
    ax.tick_params(axis="x", labelsize=7)
    fig.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


# ── CSV output ─────────────────────────────────────────────────────────
def _write_csv(no_cong_rows, cong_rows, out_path):
    fields = [
        "graph", "scenario", "label", "nbytes", "n_issuers",
        "total_ns", "makespan_ns", "min_lat_ns",
        "peak_single_bw_gbs", "peak_aggregate_bw_gbs", "effective_bw_gbs",
        "util_single_pct", "util_aggregate_pct",
        "pe_setup", "noc_mesh", "ucie", "fabric", "wire_transfer",
        "hbm_ctrl", "contention",
        "path", "first_path",
    ]
    with open(out_path, "w", newline="") as f:
        # Rows from the two graphs carry different keys; missing ones are
        # written as empty cells and extras are ignored.
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        for r in no_cong_rows + cong_rows:
            w.writerow(r)


# ── Self-verification ──────────────────────────────────────────────────
def _verify(rows_no_cong, rows_cong) -> list[str]:
    """Return a list of human-readable issues; empty means PASS.

    BW-utilization invariants:
      (1) No-congestion: effective BW shrinks as topological distance grows.
      (2) util_single is positive and at most n_issuers × 100 % (+5 % slack);
          values above 100 % arise when concurrent paths use parallel lanes
          of a shared resource (UCIe per-conn × 4) or are fully disjoint.
          util_aggregate is positive and at most 100 % (+1 % slack).
      (3) Single-issuer utilisation cannot exceed 100 %, and util_single
          must equal util_aggregate.
      (4) Effective BW for a single request equals nbytes / latency.
      (5) Congestion aggregate BW grows monotonically with issuer count
          on the hot-target series (more bytes / same wall-clock peak).
      (6) all_pe_cube0_to_pe0 (8 PEs hitting pe0) must approach the shared
          single-path peak (util_single ≥ 70 %).
      (7) ucie_eastbound: util_aggregate must sit at least 5 points below
          util_single, since the multi-lane peak is larger.
    """
    issues = []
    by_name = {r["scenario"]: r for r in rows_no_cong}
    cong_map = {r["scenario"]: r for r in rows_cong}

    # (1) No-congestion effective BW shrinks as distance grows
    order = [
        "local",
        "same_cube_best",
        "same_cube_worst",
        "remote_cube_best",
        "remote_cube_worst",
    ]
    prev_bw = float("inf")
    for n in order:
        if n in by_name and by_name[n]["effective_bw_gbs"] >= prev_bw:
            issues.append(
                f"no_congestion: {n} effective BW "
                f"({by_name[n]['effective_bw_gbs']:.1f} GB/s) not strictly "
                f"smaller than previous ({prev_bw:.1f})"
            )
        prev_bw = min(prev_bw, by_name.get(n, {}).get("effective_bw_gbs", prev_bw))

    # (2) util_single positive and bounded by n_issuers × 100 % (the
    # max possible when all paths are disjoint and each saturates the
    # single-path peak). util_aggregate bounded by 100 % by definition.
    for r in rows_no_cong + rows_cong:
        us = r.get("util_single_pct", 0.0)
        ua = r.get("util_aggregate_pct", 0.0)
        n = r.get("n_issuers", 1)
        if us <= 0 or ua <= 0:
            issues.append(f"{r['scenario']}: non-positive util "
                          f"(single={us}, agg={ua})")
        # 5 % slack for measurement/pipeline noise.
        if us > n * 100.0 + 5.0:
            issues.append(
                f"{r['scenario']}: util_single={us:.1f}% > n_issuers×100% "
                f"({n * 100:.0f}%) — likely a peak or effective BW miscompute"
            )
        if ua > 100.0 + 1.0:
            issues.append(
                f"{r['scenario']}: util_aggregate={ua:.1f}% > 100 % — "
                f"effective BW must not exceed the aggregate resource peak"
            )

    # (3) Single-issuer utilisation (both metrics) cannot exceed 100 %.
    for r in rows_no_cong:
        us = r.get("util_single_pct", 0.0)
        ua = r.get("util_aggregate_pct", 0.0)
        if us > 100.0 + 1e-3:
            issues.append(
                f"no_congestion {r['scenario']}: util_single={us:.1f}% > 100% "
                f"for a single-issuer scenario"
            )
        if abs(us - ua) > 1e-3:
            issues.append(
                f"no_congestion {r['scenario']}: util_single ({us:.1f}) != "
                f"util_aggregate ({ua:.1f}) — should match for single issuer"
            )

    # (4) Effective BW for a single request = nbytes / total_ns
    for r in rows_no_cong:
        expected = r["nbytes"] / r["total_ns"] if r["total_ns"] > 0 else 0
        got = r["effective_bw_gbs"]
        if abs(got - expected) > 1e-3:
            issues.append(
                f"no_congestion {r['scenario']}: eff_bw={got:.3f} != "
                f"nbytes/total_ns={expected:.3f}"
            )

    # (5) Congestion aggregate BW grows monotonically with issuer count on
    # the hot-target series.
    seq = ["ctrl_hot_1", "ctrl_hot_2", "ctrl_hot_3"]
    last = 0.0
    for n in seq:
        if n in cong_map and cong_map[n]["effective_bw_gbs"] < last - 1e-6:
            issues.append(
                f"congestion: {n} aggregate BW dropped below prior "
                f"({cong_map[n]['effective_bw_gbs']:.1f} < {last:.1f})"
            )
        last = max(last, cong_map.get(n, {}).get("effective_bw_gbs", last))

    # (6) all_pe_cube0_to_pe0 must approach the shared single-path peak.
    if "all_pe_cube0_to_pe0" in cong_map:
        u = cong_map["all_pe_cube0_to_pe0"]["util_single_pct"]
        if u < 70.0:
            issues.append(
                f"congestion all_pe_cube0_to_pe0: util_single={u:.1f}% < 70 % — "
                f"8-PE hotspot should saturate the shared HBM CTRL path"
            )

    # (7) ucie_eastbound: util_aggregate should be meaningfully smaller
    # than util_single (the multi-lane peak should pull the bar down).
    if "ucie_eastbound" in cong_map:
        e = cong_map["ucie_eastbound"]
        if e["util_aggregate_pct"] >= e["util_single_pct"] - 5.0:
            issues.append(
                f"congestion ucie_eastbound: util_aggregate "
                f"({e['util_aggregate_pct']:.1f}%) should be << "
                f"util_single ({e['util_single_pct']:.1f}%) when UCIe's "
                f"multi-lane peak applies"
            )
    return issues


# ── Entry point ────────────────────────────────────────────────────────
def main(nbytes: int = DEFAULT_NBYTES) -> int:
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"== PE_DMA perf @ {nbytes} B per request ==")
    print("Collecting NO-congestion scenarios...")
    no_cong = _run_no_congestion(nbytes)
    print("Collecting CONGESTION scenarios...")
    cong = _run_congestion(nbytes)

    print("\n-- No-congestion summary --")
    for r in no_cong:
        print(f" {r['scenario']:22s} total={r['total_ns']:7.1f} ns "
              f"eff={r['effective_bw_gbs']:6.1f} GB/s "
              f"peak_s={r['peak_single_bw_gbs']:6.1f} "
              f"peak_a={r['peak_aggregate_bw_gbs']:6.1f} "
              f"util_s={r['util_single_pct']:5.1f}% "
              f"util_a={r['util_aggregate_pct']:5.1f}%")

    print("\n-- Congestion summary --")
    for r in cong:
        agg_bytes = r["nbytes"] * r["n_issuers"]
        print(f" {r['scenario']:22s} makespan={r['makespan_ns']:7.1f} ns "
              f"agg_bytes={agg_bytes:>7d} "
              f"eff={r['effective_bw_gbs']:6.1f} GB/s "
              f"peak_s={r['peak_single_bw_gbs']:6.1f} "
              f"peak_a={r['peak_aggregate_bw_gbs']:6.1f} "
              f"util_s={r['util_single_pct']:5.1f}% "
              f"util_a={r['util_aggregate_pct']:5.1f}%")

    issues = _verify(no_cong, cong)
    print("\n-- Self-verification --")
    if not issues:
        print(" PASS")
    else:
        for i, msg in enumerate(issues, 1):
            print(f" [{i}] {msg}")

    _plot_bw_utilization(
        no_cong,
        f"PE_DMA Effective BW utilization — no congestion\n"
        f"1 PE issuer per scenario, nbytes={nbytes}",
        OUT_DIR / "no_congestion.png",
    )
    _plot_bw_utilization(
        cong,
        f"PE_DMA Effective BW utilization — congestion\n"
        f"N concurrent PE issuers (N shown under each label); "
        f"agg = N × nbytes / makespan, nbytes={nbytes}",
        OUT_DIR / "congestion.png",
    )
    _plot_breakdown(
        no_cong, "total_ns",
        f"PE_DMA latency breakdown — no congestion\n"
        f"1 PE issuer per scenario, nbytes={nbytes}",
        OUT_DIR / "breakdown_no_congestion.png",
    )
    _plot_breakdown(
        cong, "makespan_ns",
        f"PE_DMA latency breakdown — congestion (makespan)\n"
        f"N concurrent PE issuers (N shown under each label), "
        f"nbytes={nbytes}",
        OUT_DIR / "breakdown_congestion.png",
    )
    _write_csv(no_cong, cong, OUT_DIR / "summary.csv")
    print(f"\nWrote:\n {OUT_DIR / 'no_congestion.png'}\n"
          f" {OUT_DIR / 'congestion.png'}\n"
          f" {OUT_DIR / 'breakdown_no_congestion.png'}\n"
          f" {OUT_DIR / 'breakdown_congestion.png'}\n"
          f" {OUT_DIR / 'summary.csv'}")
    # Non-zero exit code when any self-verification check failed.
    return 0 if not issues else 1


if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser()
    p.add_argument("--n-bytes", type=int, default=DEFAULT_NBYTES,
                   help="bytes per DMA (default 16384)")
    args = p.parse_args()
    raise SystemExit(main(nbytes=args.n_bytes))