PE_DMA perf: dual-peak utilisation (single-path + aggregate)

Each scenario now shows TWO bars:

  util_single    = effective_bw / single-path peak × 100
                   (peak = min bw_gbs on first issuer's path)
  util_aggregate = effective_bw / aggregate-resource peak × 100
                   (peak = max-min fair share across concurrent paths)

Aggregate peak uses a max-min fair-share computation: each concurrent
path's sustainable share on an edge is bw_gbs / usage_count, the
per-path throughput is the min share along its edges, and the aggregate
peak is the sum across paths. This produces the correct answer for both
shared-bottleneck scenarios (N paths converge on one wire → aggregate =
wire BW) and multi-lane shared resources (UCIe's 4 connections used in
parallel → aggregate ≈ 4 × per-conn BW), without enumerating max-flow.

Single-issuer (no_congestion) → util_single == util_aggregate by
definition. Congestion exposes the divergence:
  ctrl_hot_{1,2,3}, all_pe_to_pe0 → both metrics agree (one shared
                    bottleneck: r0c0→hbm_ctrl.pe0 @ 256 GB/s)
  8×PE eastbound → util_single=106 % (single conn @ 128 GB/s) but
                    util_aggregate=85 % (UCIe-W.conn0 @ 7-way shared,
                    aggregate peak ≈ 160 GB/s under the current
                    cross-cube routing that funnels via cube1.r0c0).

Verification updated to assert:
  (2) util_aggregate ≤ 100 % (effective BW can't exceed the aggregate
      resource peak, by construction).
  (3) single-issuer util_single == util_aggregate.
  (7) ucie_eastbound: util_aggregate is meaningfully smaller than
      util_single (the multi-lane peak correction is observable).

CSV grows with peak_aggregate_bw_gbs and util_aggregate_pct columns;
breakdown columns retained.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-15 08:53:00 -07:00
parent 0bf220fed0
commit a143925a12
4 changed files with 173 additions and 74 deletions
+161 -62
View File
@@ -136,6 +136,48 @@ def _bottleneck_bw(path: list[str], edge_map: dict) -> float | None:
return min(bws) if bws else None
def _aggregate_peak_bw(paths: list[list[str]], edge_map: dict) -> float:
"""Max-min fair-share aggregate throughput across concurrent paths.
Each path is one unit of demand from source to destination. For each
edge, fair share per path = ``bw_gbs / usage_count``. A path's
sustainable throughput is the minimum fair share along its edges,
and the aggregate peak is the sum across paths. This produces the
correct answer for both shared-bottleneck scenarios (all paths
converge on one wire → aggregate = wire BW) and multi-lane shared
resources (UCIe's 4 connections used in parallel → aggregate = 4 ×
per-conn BW), without enumerating max-flow explicitly.
Examples:
* 3 paths sharing r0c0→hbm_ctrl.pe0 @ 256 GB/s
per-path = 256/3 ≈ 85.3, aggregate = 3 × 85.3 = 256 GB/s ✓
* 8 paths sharing 4 UCIe conns @ 128 GB/s (2 paths per conn)
per-path = 128/2 = 64, aggregate = 8 × 64 = 512 GB/s ✓
* 1 path through 256 GB/s bottleneck
per-path = 256, aggregate = 256 GB/s ✓ (= single-path peak)
"""
from collections import Counter
edge_usage: Counter = Counter()
for path in paths:
for i in range(len(path) - 1):
edge_usage[(path[i], path[i + 1])] += 1
aggregate = 0.0
for path in paths:
per_path = float("inf")
for i in range(len(path) - 1):
key = (path[i], path[i + 1])
e = edge_map.get(key)
if e and e.bw_gbs:
share = e.bw_gbs / edge_usage[key]
if share < per_path:
per_path = share
if per_path != float("inf"):
aggregate += per_path
return aggregate
def _path_breakdown(
path: list[str], nbytes: int, graph, edge_map, ns_per_mm: float,
) -> dict[str, float]:
@@ -257,9 +299,11 @@ def _run_no_congestion(nbytes: int):
br = _path_breakdown(path, nbytes, graph, edge_map, ns_per_mm)
formula_sum = sum(br.values())
br["contention"] = max(0.0, total_ns - formula_sum)
peak_bw = _bottleneck_bw(path, edge_map) or 0.0
peak_single = _bottleneck_bw(path, edge_map) or 0.0
peak_aggregate = _aggregate_peak_bw([path], edge_map)
eff_bw = nbytes / total_ns if total_ns > 0 else 0.0
util = (eff_bw / peak_bw * 100.0) if peak_bw > 0 else 0.0
util_single = (eff_bw / peak_single * 100.0) if peak_single > 0 else 0.0
util_aggregate = (eff_bw / peak_aggregate * 100.0) if peak_aggregate > 0 else 0.0
rows.append({
"graph": "no_congestion",
"scenario": scn.name,
@@ -268,9 +312,11 @@ def _run_no_congestion(nbytes: int):
"n_issuers": 1,
"path": " -> ".join(_short_path(path)),
"total_ns": total_ns,
"bottleneck_bw_gbs": peak_bw,
"peak_single_bw_gbs": peak_single,
"peak_aggregate_bw_gbs": peak_aggregate,
"effective_bw_gbs": eff_bw,
"util_pct": util,
"util_single_pct": util_single,
"util_aggregate_pct": util_aggregate,
**{c: br.get(c, 0.0) for c, _ in CATEGORIES},
})
return rows
@@ -333,7 +379,7 @@ def _run_congestion(nbytes: int):
for scn in _congestion_scenarios():
engine = GraphEngine(load_topology(TOPOLOGY_PATH))
handles = []
first_path = None
paths: list[list[str]] = []
for i, (ss, sc, sp, ds, dc, dp) in enumerate(scn.issues):
pa = _hbm_pa(sip=ds, cube=dc, pe_id=dp,
offset=0x1000 + i * 0x100, slice_bytes=slice_bytes)
@@ -343,10 +389,10 @@ def _run_congestion(nbytes: int):
dst_pa=pa, nbytes=nbytes,
)
handles.append(engine.submit(msg))
if first_path is None:
dst_node = engine._resolver.resolve(PhysAddr.decode(pa))
first_path = engine._router.find_path(
f"sip{ss}.cube{sc}.pe{sp}", dst_node)
dst_node = engine._resolver.resolve(PhysAddr.decode(pa))
paths.append(engine._router.find_path(
f"sip{ss}.cube{sc}.pe{sp}", dst_node))
first_path = paths[0] if paths else []
for h in handles:
engine.wait(h)
latencies = [engine.get_completion(h)[1]["total_ns"] for h in handles]
@@ -354,25 +400,29 @@ def _run_congestion(nbytes: int):
# Breakdown uses the first issuer's path as a representative;
# ``contention`` absorbs serialization across requests.
br = _path_breakdown(first_path or [], nbytes, graph, edge_map, ns_per_mm)
br = _path_breakdown(first_path, nbytes, graph, edge_map, ns_per_mm)
formula_sum = sum(br.values())
br["contention"] = max(0.0, makespan - formula_sum)
peak_bw = (_bottleneck_bw(first_path or [], edge_map) or 0.0)
peak_single = _bottleneck_bw(first_path, edge_map) or 0.0
peak_aggregate = _aggregate_peak_bw(paths, edge_map)
total_bytes = nbytes * len(scn.issues)
eff_bw = total_bytes / makespan if makespan > 0 else 0.0
util = (eff_bw / peak_bw * 100.0) if peak_bw > 0 else 0.0
util_single = (eff_bw / peak_single * 100.0) if peak_single > 0 else 0.0
util_aggregate = (eff_bw / peak_aggregate * 100.0) if peak_aggregate > 0 else 0.0
rows.append({
"graph": "congestion",
"scenario": scn.name,
"label": scn.label,
"nbytes": nbytes,
"n_issuers": len(scn.issues),
"first_path": " -> ".join(_short_path(first_path or [])),
"first_path": " -> ".join(_short_path(first_path)),
"makespan_ns": makespan,
"min_lat_ns": min(latencies) if latencies else 0.0,
"bottleneck_bw_gbs": peak_bw,
"peak_single_bw_gbs": peak_single,
"peak_aggregate_bw_gbs": peak_aggregate,
"effective_bw_gbs": eff_bw,
"util_pct": util,
"util_single_pct": util_single,
"util_aggregate_pct": util_aggregate,
**{c: br.get(c, 0.0) for c, _ in CATEGORIES},
})
return rows
@@ -386,39 +436,60 @@ def _short_path(path: Iterable[str]) -> list[str]:
def _plot_bw_utilization(rows, title, out_path):
"""Plot Effective BW utilization (%) per scenario.
"""Plot Effective BW utilization (%) per scenario with TWO bars:
Each bar is util_pct = effective_bw / peak_bottleneck_bw × 100.
Annotation shows effective and peak in GB/s. A horizontal dashed
line marks 100 % (single-path peak); bars exceeding it indicate
the scenario uses multiple parallel resources (e.g. UCIe's 4
connections) beyond the bottleneck of any single path.
util_single = effective_bw / single-path peak × 100
util_aggregate = effective_bw / aggregate-resource peak × 100
The aggregate peak sums the BW of *distinct* bottleneck edges across
all issuer paths — modelling multi-lane shared resources (e.g. UCIe's
4 connections) correctly. For scenarios where all paths share one
bottleneck wire the two peaks are equal and the bars match.
The dashed line at 100 % is the saturation reference for both
metrics. util_single can exceed 100 % when multi-lane resources are
used; util_aggregate is bounded by 100 % by construction (since the
aggregate peak is the upper bound on aggregate throughput).
"""
import numpy as np
n = len(rows)
labels = [r["label"] for r in rows]
util = [r.get("util_pct", 0.0) for r in rows]
util_s = [r.get("util_single_pct", 0.0) for r in rows]
util_a = [r.get("util_aggregate_pct", 0.0) for r in rows]
eff = [r.get("effective_bw_gbs", 0.0) for r in rows]
peak = [r.get("bottleneck_bw_gbs", 0.0) for r in rows]
peak_s = [r.get("peak_single_bw_gbs", 0.0) for r in rows]
peak_a = [r.get("peak_aggregate_bw_gbs", 0.0) for r in rows]
fig, ax = plt.subplots(figsize=(max(8, n * 1.4), 5.5))
# Colour bars by utilization band for quick scanning.
colours = ["#10b981" if u >= 70 else "#f59e0b" if u >= 40 else "#ef4444"
for u in util]
ax.bar(labels, util, color=colours, edgecolor="white", linewidth=0.5)
fig, ax = plt.subplots(figsize=(max(9, n * 1.6), 6.0))
x = np.arange(n)
w = 0.38
ax.bar(x - w / 2, util_s, w, color="#6366f1",
edgecolor="white", linewidth=0.5,
label="util vs single-path peak")
ax.bar(x + w / 2, util_a, w, color="#10b981",
edgecolor="white", linewidth=0.5,
label="util vs aggregate-resource peak")
ax.axhline(100.0, color="grey", linestyle="--", linewidth=0.8,
label="single-path peak")
label="saturation (100 %)")
# Annotate each bar with util%, effective, and peak.
y_max = max(util + [100.0]) * 1.2
for i, (u, e, p) in enumerate(zip(util, eff, peak)):
ax.text(i, u + y_max * 0.012,
f"{u:.1f}%\n{e:.0f} / {p:.0f} GB/s",
ha="center", va="bottom", fontsize=8)
y_max = max(util_s + util_a + [100.0]) * 1.30
for i in range(n):
ax.text(i - w / 2, util_s[i] + y_max * 0.012,
f"{util_s[i]:.0f}%\n/{peak_s[i]:.0f}",
ha="center", va="bottom", fontsize=7)
ax.text(i + w / 2, util_a[i] + y_max * 0.012,
f"{util_a[i]:.0f}%\n/{peak_a[i]:.0f}",
ha="center", va="bottom", fontsize=7)
# Effective BW annotation underneath each pair
ax.text(i, -y_max * 0.04, f"eff={eff[i]:.0f} GB/s",
ha="center", va="top", fontsize=7, color="#444444")
ax.set_xticks(x)
ax.set_xticklabels(labels, fontsize=8)
ax.set_ylabel("Effective BW utilization (%)")
ax.set_title(title)
ax.set_ylim(0, y_max)
ax.tick_params(axis="x", labelsize=8)
ax.set_ylim(-y_max * 0.10, y_max)
ax.legend(loc="upper right", fontsize=9, frameon=False)
fig.tight_layout()
fig.savefig(out_path, dpi=150)
@@ -432,7 +503,8 @@ def _write_csv(no_cong_rows, cong_rows, out_path):
fields = [
"graph", "scenario", "label", "nbytes", "n_issuers",
"total_ns", "makespan_ns", "min_lat_ns",
"bottleneck_bw_gbs", "effective_bw_gbs", "util_pct",
"peak_single_bw_gbs", "peak_aggregate_bw_gbs", "effective_bw_gbs",
"util_single_pct", "util_aggregate_pct",
"pe_setup", "noc_mesh", "ucie", "fabric", "streaming",
"hbm_ctrl", "contention",
"path", "first_path",
@@ -485,26 +557,37 @@ def _verify(rows_no_cong, rows_cong) -> list[str]:
)
prev_bw = min(prev_bw, by_name.get(n, {}).get("effective_bw_gbs", prev_bw))
# (2) Utilisation in (0, 250 %]; values > 100 only allowed on shared
# multi-lane resources (UCIe per_conn × 4 → 4-fold parallelism).
# (2) util_single in (0, 250 %]; util_aggregate in (0, 100 + ε %]
for r in rows_no_cong + rows_cong:
u = r.get("util_pct", 0.0)
if u <= 0:
issues.append(f"{r['scenario']}: non-positive util_pct={u}")
if u > 250:
us = r.get("util_single_pct", 0.0)
ua = r.get("util_aggregate_pct", 0.0)
if us <= 0 or ua <= 0:
issues.append(f"{r['scenario']}: non-positive util "
f"(single={us}, agg={ua})")
if us > 250:
issues.append(
f"{r['scenario']}: util_pct={u:.1f}% exceeds 250 % — "
f"likely a peak-BW or effective-BW miscompute"
f"{r['scenario']}: util_single={us:.1f}% > 250 % — "
f"likely a peak or effective BW miscompute"
)
if ua > 100.0 + 1.0: # 1 % numerical slack
issues.append(
f"{r['scenario']}: util_aggregate={ua:.1f}% > 100 % — "
f"effective BW must not exceed the aggregate resource peak"
)
# (3) Single-issuer utilisation cannot exceed 100 %.
# (3) Single-issuer utilisation (both metrics) cannot exceed 100 %.
for r in rows_no_cong:
u = r.get("util_pct", 0.0)
if u > 100.0 + 1e-3:
us = r.get("util_single_pct", 0.0)
ua = r.get("util_aggregate_pct", 0.0)
if us > 100.0 + 1e-3:
issues.append(
f"no_congestion {r['scenario']}: util_pct={u:.1f}% > 100% "
f"for single-issuer scenario (eff={r['effective_bw_gbs']:.1f}, "
f"peak={r['bottleneck_bw_gbs']:.1f})"
f"no_congestion {r['scenario']}: util_single={us:.1f}% > 100% "
f"for a single-issuer scenario"
)
if abs(us - ua) > 1e-3:
issues.append(
f"no_congestion {r['scenario']}: util_single ({us:.1f}) != "
f"util_aggregate ({ua:.1f}) — should match for single issuer"
)
# (4) Effective BW for a single request = nbytes / total_ns
@@ -518,7 +601,7 @@ def _verify(rows_no_cong, rows_cong) -> list[str]:
)
# (5) Congestion aggregate BW grows monotonically with issuer count on
# the hot-target series (same shared bottleneck, more bytes / same peak).
# the hot-target series.
seq = ["ctrl_hot_1", "ctrl_hot_2", "ctrl_hot_3"]
last = 0.0
for n in seq:
@@ -529,17 +612,27 @@ def _verify(rows_no_cong, rows_cong) -> list[str]:
)
last = max(last, cong_map.get(n, {}).get("effective_bw_gbs", last))
# (6) all_pe_to_pe0 must approach single-path peak (≥ 70 % util) —
# the shared r0c0 → hbm_ctrl.pe0 bottleneck is fully amortised when
# all 8 PEs target it.
# (6) all_pe_to_pe0 must approach the shared single-path peak.
if "all_pe_to_pe0" in cong_map:
u = cong_map["all_pe_to_pe0"]["util_pct"]
u = cong_map["all_pe_to_pe0"]["util_single_pct"]
if u < 70.0:
issues.append(
f"congestion all_pe_to_pe0: util_pct={u:.1f}% < 70 % — "
f"congestion all_pe_to_pe0: util_single={u:.1f}% < 70 % — "
f"8-PE hotspot should saturate the shared HBM CTRL path"
)
# (7) ucie_eastbound: util_aggregate should be meaningfully smaller
# than util_single (the multi-lane peak should pull the bar down).
if "ucie_eastbound" in cong_map:
e = cong_map["ucie_eastbound"]
if e["util_aggregate_pct"] >= e["util_single_pct"] - 5.0:
issues.append(
f"congestion ucie_eastbound: util_aggregate "
f"({e['util_aggregate_pct']:.1f}%) should be << "
f"util_single ({e['util_single_pct']:.1f}%) when UCIe's "
f"multi-lane peak applies"
)
return issues
@@ -558,15 +651,21 @@ def main(nbytes: int = DEFAULT_NBYTES) -> int:
print("\n-- No-congestion summary --")
for r in no_cong:
print(f" {r['scenario']:22s} total={r['total_ns']:7.1f} ns "
f"eff={r['effective_bw_gbs']:6.1f} peak={r['bottleneck_bw_gbs']:6.1f} "
f"GB/s util={r['util_pct']:5.1f}%")
f"eff={r['effective_bw_gbs']:6.1f} GB/s "
f"peak_s={r['peak_single_bw_gbs']:6.1f} "
f"peak_a={r['peak_aggregate_bw_gbs']:6.1f} "
f"util_s={r['util_single_pct']:5.1f}% "
f"util_a={r['util_aggregate_pct']:5.1f}%")
print("\n-- Congestion summary --")
for r in cong:
agg_bytes = r["nbytes"] * r["n_issuers"]
print(f" {r['scenario']:22s} makespan={r['makespan_ns']:7.1f} ns "
f"agg_bytes={agg_bytes:>7d} "
f"eff={r['effective_bw_gbs']:6.1f} peak={r['bottleneck_bw_gbs']:6.1f} "
f"GB/s util={r['util_pct']:5.1f}%")
f"eff={r['effective_bw_gbs']:6.1f} GB/s "
f"peak_s={r['peak_single_bw_gbs']:6.1f} "
f"peak_a={r['peak_aggregate_bw_gbs']:6.1f} "
f"util_s={r['util_single_pct']:5.1f}% "
f"util_a={r['util_aggregate_pct']:5.1f}%")
issues = _verify(no_cong, cong)
print("\n-- Self-verification --")