kernbench2/scripts/plot_pe_dma_perf.py
ywkang 0bf220fed0 Switch PE_DMA perf plots to Effective BW utilization
Replaces the latency-breakdown stacked bars with a single utilization
bar per scenario. Each bar shows ``effective_bw / peak_bottleneck_bw``
with both values annotated, and a horizontal "single-path peak" line at
100 %. The colour band (green ≥70 %, amber ≥40 %, red <40 %) makes the
no-congestion distance roll-off scannable at a glance.

Definitions:
  effective_bw = (total bytes transferred) / wall-clock time
    no_congestion: nbytes / total_ns
    congestion:    n_issuers × nbytes / makespan_ns  (aggregate)
  peak_bw      = min(edge.bw_gbs) on first issuer's path
  util_pct     = effective_bw / peak_bw × 100
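
As a rough illustration of the definitions above, the sketch below computes
effective bandwidth and utilization for one single-issuer case and one
aggregate case. The helper names and all numeric inputs are hypothetical and
are not taken from the simulator; the only fact relied on is that bytes per
nanosecond equals GB/s.

```python
def effective_bw_gbs(nbytes: int, wall_ns: float, n_issuers: int = 1) -> float:
    """Total bytes moved divided by wall-clock time (bytes/ns == GB/s)."""
    return n_issuers * nbytes / wall_ns if wall_ns > 0 else 0.0


def util_pct(eff_bw: float, peak_bw: float) -> float:
    """Effective bandwidth as a percentage of the path-bottleneck bandwidth."""
    return eff_bw / peak_bw * 100.0 if peak_bw > 0 else 0.0


# Hypothetical inputs: 16 KiB per DMA, 128 GB/s bottleneck edge.
nbytes = 16 * 1024
single = effective_bw_gbs(nbytes, wall_ns=256.0)                   # 64.0 GB/s
aggregate = effective_bw_gbs(nbytes, wall_ns=1024.0, n_issuers=8)  # 128.0 GB/s

print(util_pct(single, peak_bw=128.0))     # 50.0
print(util_pct(aggregate, peak_bw=128.0))  # 100.0
```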

The congestion graph shows that 8×PE eastbound exceeds 100 % of a
single-path peak (106.4 %): UCIe-N's 4 connections × 128 GB/s give
512 GB/s of aggregate eastbound capacity, so concurrent issuers on
disjoint connections can sum past any single connection's 128 GB/s.
The 8×PE→pe0_slice hotspot reaches 91.7 %, almost saturating the
shared r0c0→hbm_ctrl.pe0 bottleneck: the simulator's address-based PC
striping and per-flit arbitration model amortise the cost cleanly.
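
To make the 106.4 % figure concrete, the arithmetic can be sketched as
follows. The per-connection bandwidth, connection count, and reported
utilization come from the text above; the "utilization against aggregate
capacity" figure is a derived illustration, not a metric the script reports.

```python
PER_CONN_GBS = 128.0       # bandwidth of one UCIe connection (from the text)
N_CONNS = 4                # parallel eastbound connections (from the text)
REPORTED_UTIL_PCT = 106.4  # utilization vs. the single-path peak (from the text)

aggregate_peak_gbs = PER_CONN_GBS * N_CONNS               # 512.0 GB/s
effective_gbs = REPORTED_UTIL_PCT / 100.0 * PER_CONN_GBS  # about 136.2 GB/s
util_vs_aggregate = effective_gbs / aggregate_peak_gbs * 100.0

print(f"{effective_gbs:.1f} GB/s effective, "
      f"{util_vs_aggregate:.1f} % of aggregate capacity")
```

Measured against the 512 GB/s aggregate rather than a single 128 GB/s
connection, the same run sits at roughly a quarter of capacity, which is why a
value above 100 % of the single-path peak is not a contradiction.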

Self-verification updated to BW invariants:
  (1) effective BW shrinks as topological distance grows
  (2) util_pct ∈ (0, 250 %]
  (3) single-issuer util_pct ≤ 100 %
  (4) effective_bw = nbytes / total_ns for single requests
  (5) congestion aggregate BW grows monotonically with issuer count
      on the hot-target series
  (6) 8-PE all-hit-pe0 saturates ≥ 70 % of shared peak

All checks PASS at the current model.

The CSV retains all breakdown components (pe_setup, noc_mesh, ucie,
fabric, streaming, hbm_ctrl, contention) so a future replot can still
recover the latency-breakdown view without re-running the simulator.
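
A minimal sketch of how such a replot could rebuild stacked-bar segments from
the CSV, assuming the breakdown column names listed above. The function name
and the sample row are made up for illustration; real values would come from
``summary.csv``.

```python
import csv
import io

CATEGORIES = ["pe_setup", "noc_mesh", "ucie", "fabric",
              "streaming", "hbm_ctrl", "contention"]


def stacked_segments(csv_file):
    """Return {scenario: [(category, bottom_ns, height_ns), ...]}."""
    out = {}
    for row in csv.DictReader(csv_file):
        bottom = 0.0
        segments = []
        for cat in CATEGORIES:
            height = float(row.get(cat) or 0.0)  # empty or missing cell -> 0
            segments.append((cat, bottom, height))
            bottom += height
        out[row["scenario"]] = segments
    return out


# Made-up sample row standing in for summary.csv.
sample = io.StringIO(
    "scenario,pe_setup,noc_mesh,ucie,fabric,streaming,hbm_ctrl,contention\n"
    "local,10,5,0,0,126,8,1\n"
)
segments = stacked_segments(sample)
print(segments["local"][4])  # ('streaming', 15.0, 126.0)
```

Each ``(category, bottom, height)`` tuple maps directly onto a matplotlib
``ax.bar(x, height, bottom=bottom)`` call, one call per category.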

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 07:59:45 -07:00

607 lines
23 KiB
Python
"""Plot PE_DMA Effective BW utilization across topological distance.

Two graphs (saved to docs/diagrams/pe_dma_perf/):

  no_congestion.png — single PE issues one DMA, target varies in distance:
    1. SAME_CUBE_PE_LOCAL           — pe0 -> pe0's slice (own router, 1 hop)
    2. SAME_CUBE_PE_REMOTE_BEST     — pe0 -> pe1's slice (adjacent corner)
    3. SAME_CUBE_PE_REMOTE_WORST    — pe0 -> pe7's slice (opposite corner)
    4. REMOTE_CUBE_PE_REMOTE_BEST   — pe0 -> cube1 pe0's slice (1 UCIe hop)
    5. REMOTE_CUBE_PE_REMOTE_WORST  — pe0 -> cube15 pe7's slice (max UCIe + mesh)
    6. REMOTE_SIP_SAME_CUBE_SAME_PE — pe0 -> sip1.cube0.pe0's slice

  congestion.png — concurrent PEs hitting either the same HBM CTRL or
  the same UCIe direction:
    A. 1×PE remote single        — baseline (one remote PE reads cube0.pe0_slice)
    B. 2×PE remote concurrent    — two adjacent PEs share path to pe0_slice
    C. 3×PE remote concurrent    — three PEs contend on pe0's router/HBM
    D. 8×PE same-direction-UCIe  — every PE in cube0 reads cube1 same-PE slice
    E. 8×PE all-hit-PE0          — every PE reads cube0.pe0_slice (hottest HBM CTRL)

Effective BW = (total bytes transferred) / (wall-clock time)
  no_congestion: nbytes / total_ns
  congestion:    n_issuers × nbytes / makespan_ns  (aggregate throughput)

Peak BW = the path bottleneck (slowest single-edge bandwidth on the
first issuer's path). For shared-resource congestion scenarios the
aggregate effective BW can exceed this single-path peak when the
shared resource provides parallel lanes (e.g. UCIe has 4 connections
× 128 GB/s = 512 GB/s aggregate even though each connection is 128).

Utilization% = effective / peak × 100.

Outputs ``summary.csv`` (including breakdown components for any future
analysis) so the plot can be re-rendered without re-running the
simulator.
"""
from __future__ import annotations

import csv
import math
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable

import matplotlib.pyplot as plt

from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.runtime_api.kernel import PeDmaMsg
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import load_topology

REPO = Path(__file__).resolve().parent.parent
TOPOLOGY_PATH = REPO / "topology.yaml"
OUT_DIR = REPO / "docs" / "diagrams" / "pe_dma_perf"
DEFAULT_NBYTES = 16 * 1024  # 16 KB per DMA

# Category order (stacked bottom-to-top) and colours.
CATEGORIES = [
    ("pe_setup", "#3b82f6"),    # blue
    ("noc_mesh", "#10b981"),    # green
    ("ucie", "#f59e0b"),        # amber
    ("fabric", "#8b5cf6"),      # purple (switch + io chiplet for cross-SIP)
    ("streaming", "#6366f1"),   # indigo (bulk = (n_flits-1)/bottleneck)
    ("hbm_ctrl", "#ef4444"),    # red (final-chunk commit = chunk_time)
    ("contention", "#9ca3af"),  # grey (actual formula, surfaces serialization)
]


@dataclass
class Scenario:
    name: str
    label: str
    src_sip: int
    src_cube: int
    src_pe: int
    dst_sip: int
    dst_cube: int
    dst_pe: int


def _slice_bytes(spec) -> int:
    mm = spec["cube"]["memory_map"]
    return mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"]


def _hbm_pa(*, sip: int, cube: int, pe_id: int, offset: int, slice_bytes: int) -> int:
    return PhysAddr.pe_hbm_addr(
        sip_id=sip, die_id=cube, pe_id=pe_id,
        pe_local_hbm_offset=offset, slice_size_bytes=slice_bytes,
    ).encode()


def _categorise_node(node) -> str | None:
    nid = node.id
    if ".pe_dma" in nid:
        return "pe_setup"
    if node.kind == "noc_router":
        return "noc_mesh"
    if "ucie" in nid:
        return "ucie"
    if node.kind == "hbm_ctrl":
        return "hbm_ctrl"
    if node.kind in ("switch", "pcie_ep", "io_cpu", "io_noc"):
        return "fabric"
    return None


def _categorise_edge_kind(kind: str | None) -> str | None:
    if kind in ("pe_to_router", "router_to_pe", "pe_internal"):
        return "pe_setup"
    if kind in ("router_mesh",):
        return "noc_mesh"
    if kind in ("router_to_hbm", "hbm_to_router"):
        return "hbm_ctrl"
    # UCIe transit. Includes the cube↔io_chiplet UCIe crossings.
    if kind and "ucie" in kind:
        return "ucie"
    if kind in ("cube_to_io", "io_to_cube"):
        return "ucie"
    # Cross-SIP fabric: switch port + IO chiplet internal NoC + pcie link.
    if kind in (
        "io_to_switch", "switch_to_io", "io_internal",
        "conn_to_io_noc", "io_noc_to_conn",
        "pcie", "command", "fabric",
    ):
        return "fabric"
    return None


def _bottleneck_bw(path: list[str], edge_map: dict) -> float | None:
    """Min ``bw_gbs`` over edges with positive bandwidth on the path."""
    bws = [e.bw_gbs for i in range(len(path) - 1)
           if (e := edge_map.get((path[i], path[i + 1]))) and e.bw_gbs]
    return min(bws) if bws else None


def _path_breakdown(
    path: list[str], nbytes: int, graph, edge_map, ns_per_mm: float,
) -> dict[str, float]:
    """Wormhole-pipelined breakdown of a path's expected latency.

    Model:
        total ≈ first_flit_arrival_time
                + (n_flits - 1) × bottleneck_per_flit_time
                + last_chunk_commit_time

    Each summand is categorised:
      * Per-component overheads + first-flit wire transfers are attributed
        by component class (pe_setup / noc_mesh / ucie).
      * ``streaming`` is the bulk-transfer cost = (n_flits-1) × per_flit
        at the slowest wire bandwidth in the path.
      * ``hbm_ctrl`` is the HBM CTRL overhead + the final chunk's PC commit
        (= chunk_time). Earlier chunks overlap with arrival.
    """
    cats: dict[str, float] = defaultdict(float)

    # 1) Per-component overheads (first-flit).
    for nid in path:
        node = graph.nodes.get(nid)
        if node is None:
            continue
        cat = _categorise_node(node)
        if cat is None:
            continue
        cats[cat] += float(node.attrs.get("overhead_ns", 0.0))

    # 2) Per-edge first-flit transfer = prop_ns + flit_bytes / bw_gbs.
    bws: list[float] = []
    flit_bytes = 256  # see ADR-0033 (matches default HBM burst_bytes)
    for i in range(len(path) - 1):
        e = edge_map.get((path[i], path[i + 1]))
        if e is None:
            continue
        prop_ns = e.distance_mm * ns_per_mm
        first_flit_xfer = (flit_bytes / e.bw_gbs) if e.bw_gbs else 0.0
        cat = _categorise_edge_kind(e.kind)
        if cat:
            cats[cat] += prop_ns + first_flit_xfer
        if e.bw_gbs:
            bws.append(e.bw_gbs)

    # 3) Streaming: (n_flits - 1) × per-flit at bottleneck.
    if bws and nbytes > flit_bytes:
        n_flits = math.ceil(nbytes / flit_bytes)
        min_bw = min(bws)
        cats["streaming"] = (n_flits - 1) * (flit_bytes / min_bw)

    # 4) HBM CTRL: last-chunk commit time (earlier chunks overlap arrival).
    if path:
        hbm_node = graph.nodes.get(path[-1])
        if hbm_node and hbm_node.kind == "hbm_ctrl" and nbytes > 0:
            burst = int(hbm_node.attrs.get("burst_bytes", 256))
            pc_bw = float(hbm_node.attrs.get("pc_bw_gbs", 32.0))
            cats["hbm_ctrl"] += burst / pc_bw  # chunk_time of final chunk

    return dict(cats)


# ── No-congestion scenarios ───────────────────────────────────────────
def _no_congestion_scenarios() -> list[Scenario]:
    return [
        Scenario("local",
                 "SAME_CUBE\nPE_LOCAL",
                 0, 0, 0, 0, 0, 0),
        Scenario("same_cube_best",
                 "SAME_CUBE\nREMOTE_BEST\n(pe0→pe1)",
                 0, 0, 0, 0, 0, 1),
        Scenario("same_cube_worst",
                 "SAME_CUBE\nREMOTE_WORST\n(pe0→pe7)",
                 0, 0, 0, 0, 0, 7),
        Scenario("remote_cube_best",
                 "REMOTE_CUBE\nREMOTE_BEST\n(cube0→cube1)",
                 0, 0, 0, 0, 1, 0),
        Scenario("remote_cube_worst",
                 "REMOTE_CUBE\nREMOTE_WORST\n(cube0→cube15.pe7)",
                 0, 0, 0, 0, 15, 7),
        Scenario("remote_sip",
                 "REMOTE_SIP\nSAME_CUBE_SAME_PE\n(sip0→sip1)",
                 0, 0, 0, 1, 0, 0),
    ]


def _run_pe_dma(engine: GraphEngine, scn: Scenario, nbytes: int,
                slice_bytes: int) -> tuple[float, list[str]]:
    pa = _hbm_pa(sip=scn.dst_sip, cube=scn.dst_cube, pe_id=scn.dst_pe,
                 offset=0x1000, slice_bytes=slice_bytes)
    msg = PeDmaMsg(
        correlation_id="pedma-perf", request_id=scn.name,
        src_sip=scn.src_sip, src_cube=scn.src_cube, src_pe=scn.src_pe,
        dst_pa=pa, nbytes=nbytes,
    )
    h = engine.submit(msg)
    engine.wait(h)
    _, trace = engine.get_completion(h)
    # Resolve the path for breakdown analysis (engine doesn't keep it).
    dst_node = engine._resolver.resolve(PhysAddr.decode(pa))
    src = f"sip{scn.src_sip}.cube{scn.src_cube}.pe{scn.src_pe}"
    path = engine._router.find_path(src, dst_node)
    return float(trace["total_ns"]), path


def _run_no_congestion(nbytes: int):
    graph = load_topology(TOPOLOGY_PATH)
    edge_map = {(e.src, e.dst): e for e in graph.edges}
    ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01)
    slice_bytes = _slice_bytes(graph.spec)
    rows = []
    for scn in _no_congestion_scenarios():
        engine = GraphEngine(load_topology(TOPOLOGY_PATH))
        total_ns, path = _run_pe_dma(engine, scn, nbytes, slice_bytes)
        br = _path_breakdown(path, nbytes, graph, edge_map, ns_per_mm)
        formula_sum = sum(br.values())
        br["contention"] = max(0.0, total_ns - formula_sum)
        peak_bw = _bottleneck_bw(path, edge_map) or 0.0
        eff_bw = nbytes / total_ns if total_ns > 0 else 0.0
        util = (eff_bw / peak_bw * 100.0) if peak_bw > 0 else 0.0
        rows.append({
            "graph": "no_congestion",
            "scenario": scn.name,
            "label": scn.label,
            "nbytes": nbytes,
            "n_issuers": 1,
            "path": " -> ".join(_short_path(path)),
            "total_ns": total_ns,
            "bottleneck_bw_gbs": peak_bw,
            "effective_bw_gbs": eff_bw,
            "util_pct": util,
            **{c: br.get(c, 0.0) for c, _ in CATEGORIES},
        })
    return rows


# ── Congestion scenarios ──────────────────────────────────────────────
@dataclass
class CongestionScenario:
    name: str
    label: str
    issues: list[tuple[int, int, int, int, int, int]]
    """List of (src_sip, src_cube, src_pe, dst_sip, dst_cube, dst_pe)."""


def _congestion_scenarios() -> list[CongestionScenario]:
    same_cube_same_target_pe0 = lambda srcs: [
        (0, 0, p, 0, 0, 0) for p in srcs
    ]
    return [
        # A-C: 1, 2, 3 remote PEs concurrently access pe0's slice in same cube
        CongestionScenario(
            "ctrl_hot_1",
            "1×PE → pe0_slice",
            same_cube_same_target_pe0([1]),
        ),
        CongestionScenario(
            "ctrl_hot_2",
            "2×PE → pe0_slice",
            same_cube_same_target_pe0([1, 2]),
        ),
        CongestionScenario(
            "ctrl_hot_3",
            "3×PE → pe0_slice",
            same_cube_same_target_pe0([1, 2, 3]),
        ),
        # D: every PE in cube0 sends to corresponding PE in cube1 (same UCIe direction)
        CongestionScenario(
            "ucie_eastbound",
            "8×PE corresp.\ncube0→cube1",
            [(0, 0, p, 0, 1, p) for p in range(8)],
        ),
        # E: every PE in cube0 hits pe0's slice → worst HBM CTRL hotspot
        CongestionScenario(
            "all_pe_to_pe0",
            "8×PE → pe0_slice",
            same_cube_same_target_pe0(list(range(8))),
        ),
    ]


def _run_congestion(nbytes: int):
    graph = load_topology(TOPOLOGY_PATH)
    edge_map = {(e.src, e.dst): e for e in graph.edges}
    ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01)
    slice_bytes = _slice_bytes(graph.spec)
    rows = []
    for scn in _congestion_scenarios():
        engine = GraphEngine(load_topology(TOPOLOGY_PATH))
        handles = []
        first_path = None
        for i, (ss, sc, sp, ds, dc, dp) in enumerate(scn.issues):
            pa = _hbm_pa(sip=ds, cube=dc, pe_id=dp,
                         offset=0x1000 + i * 0x100, slice_bytes=slice_bytes)
            msg = PeDmaMsg(
                correlation_id="pedma-cong", request_id=f"{scn.name}-{i}",
                src_sip=ss, src_cube=sc, src_pe=sp,
                dst_pa=pa, nbytes=nbytes,
            )
            handles.append(engine.submit(msg))
            if first_path is None:
                dst_node = engine._resolver.resolve(PhysAddr.decode(pa))
                first_path = engine._router.find_path(
                    f"sip{ss}.cube{sc}.pe{sp}", dst_node)
        for h in handles:
            engine.wait(h)
        latencies = [engine.get_completion(h)[1]["total_ns"] for h in handles]
        makespan = max(latencies)
        # Breakdown uses the first issuer's path as a representative;
        # ``contention`` absorbs serialization across requests.
        br = _path_breakdown(first_path or [], nbytes, graph, edge_map, ns_per_mm)
        formula_sum = sum(br.values())
        br["contention"] = max(0.0, makespan - formula_sum)
        peak_bw = (_bottleneck_bw(first_path or [], edge_map) or 0.0)
        total_bytes = nbytes * len(scn.issues)
        eff_bw = total_bytes / makespan if makespan > 0 else 0.0
        util = (eff_bw / peak_bw * 100.0) if peak_bw > 0 else 0.0
        rows.append({
            "graph": "congestion",
            "scenario": scn.name,
            "label": scn.label,
            "nbytes": nbytes,
            "n_issuers": len(scn.issues),
            "first_path": " -> ".join(_short_path(first_path or [])),
            "makespan_ns": makespan,
            "min_lat_ns": min(latencies) if latencies else 0.0,
            "bottleneck_bw_gbs": peak_bw,
            "effective_bw_gbs": eff_bw,
            "util_pct": util,
            **{c: br.get(c, 0.0) for c, _ in CATEGORIES},
        })
    return rows


# ── Plotting ───────────────────────────────────────────────────────────
def _short_path(path: Iterable[str]) -> list[str]:
    return [".".join(p.split(".")[-2:]) for p in path]


def _plot_bw_utilization(rows, title, out_path):
    """Plot Effective BW utilization (%) per scenario.

    Each bar is util_pct = effective_bw / peak_bottleneck_bw × 100.
    Annotation shows effective and peak in GB/s. A horizontal dashed
    line marks 100 % (single-path peak); bars exceeding it indicate
    the scenario uses multiple parallel resources (e.g. UCIe's 4
    connections) beyond the bottleneck of any single path.
    """
    n = len(rows)
    labels = [r["label"] for r in rows]
    util = [r.get("util_pct", 0.0) for r in rows]
    eff = [r.get("effective_bw_gbs", 0.0) for r in rows]
    peak = [r.get("bottleneck_bw_gbs", 0.0) for r in rows]

    fig, ax = plt.subplots(figsize=(max(8, n * 1.4), 5.5))
    # Colour bars by utilization band for quick scanning.
    colours = ["#10b981" if u >= 70 else "#f59e0b" if u >= 40 else "#ef4444"
               for u in util]
    ax.bar(labels, util, color=colours, edgecolor="white", linewidth=0.5)
    ax.axhline(100.0, color="grey", linestyle="--", linewidth=0.8,
               label="single-path peak")

    # Annotate each bar with util%, effective, and peak.
    y_max = max(util + [100.0]) * 1.2
    for i, (u, e, p) in enumerate(zip(util, eff, peak)):
        ax.text(i, u + y_max * 0.012,
                f"{u:.1f}%\n{e:.0f} / {p:.0f} GB/s",
                ha="center", va="bottom", fontsize=8)

    ax.set_ylabel("Effective BW utilization (%)")
    ax.set_title(title)
    ax.set_ylim(0, y_max)
    ax.tick_params(axis="x", labelsize=8)
    ax.legend(loc="upper right", fontsize=9, frameon=False)
    fig.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


# ── CSV ────────────────────────────────────────────────────────────────
def _write_csv(no_cong_rows, cong_rows, out_path):
    fields = [
        "graph", "scenario", "label", "nbytes", "n_issuers",
        "total_ns", "makespan_ns", "min_lat_ns",
        "bottleneck_bw_gbs", "effective_bw_gbs", "util_pct",
        "pe_setup", "noc_mesh", "ucie", "fabric", "streaming",
        "hbm_ctrl", "contention",
        "path", "first_path",
    ]
    with open(out_path, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        for r in no_cong_rows + cong_rows:
            w.writerow(r)


# ── Self-verification ──────────────────────────────────────────────────
def _verify(rows_no_cong, rows_cong) -> list[str]:
    """Return a list of human-readable issues; empty means PASS.

    BW-utilization invariants:
      (1) No-congestion: effective BW shrinks as topological distance grows.
      (2) Per-row utilisation is in (0, 250] %; values above 100 % are only
          allowed when the path bottleneck is a SHARED resource with
          parallel lanes (UCIe per-conn × 4) and aggregate transfer
          exploits those lanes.
      (3) Single-issuer utilisation cannot exceed 100 %.
      (4) Effective BW for a single request equals nbytes / latency.
      (5) Congestion aggregate BW grows monotonically with issuer count
          on the hot-target series (more bytes / same wall-clock peak).
      (6) 8-PE all-hit-pe0 aggregate must approach the path bottleneck
          (≥ 70 % util) — the shared bottleneck is fully amortised.
    """
    issues = []
    by_name = {r["scenario"]: r for r in rows_no_cong}
    cong_map = {r["scenario"]: r for r in rows_cong}

    # (1) No-congestion effective BW shrinks as distance grows
    order = [
        "local",
        "same_cube_best",
        "same_cube_worst",
        "remote_cube_best",
        "remote_cube_worst",
    ]
    prev_bw = float("inf")
    for n in order:
        if n in by_name and by_name[n]["effective_bw_gbs"] >= prev_bw:
            issues.append(
                f"no_congestion: {n} effective BW "
                f"({by_name[n]['effective_bw_gbs']:.1f} GB/s) not strictly "
                f"smaller than previous ({prev_bw:.1f})"
            )
        prev_bw = min(prev_bw, by_name.get(n, {}).get("effective_bw_gbs", prev_bw))

    # (2) Utilisation in (0, 250 %]; values > 100 only allowed on shared
    # multi-lane resources (UCIe per_conn × 4 → 4-fold parallelism).
    for r in rows_no_cong + rows_cong:
        u = r.get("util_pct", 0.0)
        if u <= 0:
            issues.append(f"{r['scenario']}: non-positive util_pct={u}")
        if u > 250:
            issues.append(
                f"{r['scenario']}: util_pct={u:.1f}% exceeds 250 % — "
                f"likely a peak-BW or effective-BW miscompute"
            )

    # (3) Single-issuer utilisation cannot exceed 100 %.
    for r in rows_no_cong:
        u = r.get("util_pct", 0.0)
        if u > 100.0 + 1e-3:
            issues.append(
                f"no_congestion {r['scenario']}: util_pct={u:.1f}% > 100% "
                f"for single-issuer scenario (eff={r['effective_bw_gbs']:.1f}, "
                f"peak={r['bottleneck_bw_gbs']:.1f})"
            )

    # (4) Effective BW for a single request = nbytes / total_ns
    for r in rows_no_cong:
        expected = r["nbytes"] / r["total_ns"] if r["total_ns"] > 0 else 0
        got = r["effective_bw_gbs"]
        if abs(got - expected) > 1e-3:
            issues.append(
                f"no_congestion {r['scenario']}: eff_bw={got:.3f} != "
                f"nbytes/total_ns={expected:.3f}"
            )

    # (5) Congestion aggregate BW grows monotonically with issuer count on
    # the hot-target series (same shared bottleneck, more bytes / same peak).
    seq = ["ctrl_hot_1", "ctrl_hot_2", "ctrl_hot_3"]
    last = 0.0
    for n in seq:
        if n in cong_map and cong_map[n]["effective_bw_gbs"] < last - 1e-6:
            issues.append(
                f"congestion: {n} aggregate BW dropped below prior "
                f"({cong_map[n]['effective_bw_gbs']:.1f} < {last:.1f})"
            )
        last = max(last, cong_map.get(n, {}).get("effective_bw_gbs", last))

    # (6) all_pe_to_pe0 must approach single-path peak (≥ 70 % util) —
    # the shared r0c0 → hbm_ctrl.pe0 bottleneck is fully amortised when
    # all 8 PEs target it.
    if "all_pe_to_pe0" in cong_map:
        u = cong_map["all_pe_to_pe0"]["util_pct"]
        if u < 70.0:
            issues.append(
                f"congestion all_pe_to_pe0: util_pct={u:.1f}% < 70 % — "
                f"8-PE hotspot should saturate the shared HBM CTRL path"
            )
    return issues


# ── Entry point ────────────────────────────────────────────────────────
def main(nbytes: int = DEFAULT_NBYTES) -> int:
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"== PE_DMA perf @ {nbytes} B per request ==")
    print("Collecting NO-congestion scenarios...")
    no_cong = _run_no_congestion(nbytes)
    print("Collecting CONGESTION scenarios...")
    cong = _run_congestion(nbytes)

    print("\n-- No-congestion summary --")
    for r in no_cong:
        print(f" {r['scenario']:22s} total={r['total_ns']:7.1f} ns "
              f"eff={r['effective_bw_gbs']:6.1f} peak={r['bottleneck_bw_gbs']:6.1f} "
              f"GB/s util={r['util_pct']:5.1f}%")

    print("\n-- Congestion summary --")
    for r in cong:
        agg_bytes = r["nbytes"] * r["n_issuers"]
        print(f" {r['scenario']:22s} makespan={r['makespan_ns']:7.1f} ns "
              f"agg_bytes={agg_bytes:>7d} "
              f"eff={r['effective_bw_gbs']:6.1f} peak={r['bottleneck_bw_gbs']:6.1f} "
              f"GB/s util={r['util_pct']:5.1f}%")

    issues = _verify(no_cong, cong)
    print("\n-- Self-verification --")
    if not issues:
        print(" PASS")
    else:
        for i, msg in enumerate(issues, 1):
            print(f" [{i}] {msg}")

    _plot_bw_utilization(
        no_cong,
        f"PE_DMA Effective BW utilization (no congestion, nbytes={nbytes})",
        OUT_DIR / "no_congestion.png",
    )
    _plot_bw_utilization(
        cong,
        f"PE_DMA Effective BW utilization (congestion, "
        f"agg = n_issuers × nbytes / makespan, nbytes={nbytes})",
        OUT_DIR / "congestion.png",
    )
    _write_csv(no_cong, cong, OUT_DIR / "summary.csv")
    print(f"\nWrote:\n {OUT_DIR / 'no_congestion.png'}\n"
          f" {OUT_DIR / 'congestion.png'}\n"
          f" {OUT_DIR / 'summary.csv'}")
    return 0 if not issues else 1


if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser()
    p.add_argument("--n-bytes", type=int, default=DEFAULT_NBYTES,
                   help="bytes per DMA (default 16384)")
    args = p.parse_args()
    raise SystemExit(main(nbytes=args.n_bytes))