Add web topology viewer with hot path visualization

- FastAPI backend (server.py) with REST API + WebSocket for event streaming
- SVG-based topology viewer (index.html) with SIP/CUBE/PE drill-down views
- Event logging infrastructure (event_log.py) generating events from real
  probe cases (H2D/D2H/PE-DMA) and bench workloads (QKV GEMM single/multi-PE)
- Timeline replay engine with play/pause, speed control, and scrubbing
- Workload selector dropdown grouped by category (Probe/Bench)
- CLI entry points: kernbench web, ./kernbench wrapper scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-19 03:19:19 -07:00
parent fc6abbc8ee
commit dcbc41571f
8 changed files with 2904 additions and 1 deletions
+604
View File
@@ -0,0 +1,604 @@
"""Simulation Event Logging Infrastructure.
Provides an EventLogger that records structured events during simulation,
and workload generators that run real probe/bench cases through the engine.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any
@dataclass
class SimEvent:
"""A single simulation event."""
t_ns: float # simulation time (ns)
type: str # "submit", "hop", "process", "complete"
request_id: int # unique request ID
src: str = "" # source node_id
dst: str = "" # destination node_id
component: str = "" # component that generated the event
nbytes: int = 0 # payload size
latency_ns: float = 0.0 # latency added at this step
msg_type: str = "" # "memory_write", "memory_read", "pe_dma", etc.
metadata: dict = field(default_factory=dict)
def to_dict(self) -> dict:
d = asdict(self)
return {k: v for k, v in d.items() if v or k in ("t_ns", "request_id")}
class EventLogger:
"""Collects simulation events. Thread-safe append-only log."""
def __init__(self):
self._events: list[SimEvent] = []
self._next_id = 0
def new_request_id(self) -> int:
rid = self._next_id
self._next_id += 1
return rid
def log(self, event: SimEvent) -> None:
self._events.append(event)
def submit(self, t_ns: float, request_id: int, entry_node: str,
nbytes: int, msg_type: str, **meta) -> None:
self.log(SimEvent(
t_ns=t_ns, type="submit", request_id=request_id,
dst=entry_node, nbytes=nbytes, msg_type=msg_type,
metadata=meta,
))
def hop(self, t_ns: float, request_id: int, src: str, dst: str,
nbytes: int, latency_ns: float, **meta) -> None:
self.log(SimEvent(
t_ns=t_ns, type="hop", request_id=request_id,
src=src, dst=dst, nbytes=nbytes, latency_ns=latency_ns,
metadata=meta,
))
def process(self, t_ns: float, request_id: int, component: str,
latency_ns: float, **meta) -> None:
self.log(SimEvent(
t_ns=t_ns, type="process", request_id=request_id,
component=component, latency_ns=latency_ns,
metadata=meta,
))
def complete(self, t_ns: float, request_id: int, **meta) -> None:
self.log(SimEvent(
t_ns=t_ns, type="complete", request_id=request_id,
metadata=meta,
))
@property
def events(self) -> list[SimEvent]:
return list(self._events)
def to_json(self) -> str:
return json.dumps([e.to_dict() for e in self._events], indent=None)
def clear(self) -> None:
self._events.clear()
@property
def duration_ns(self) -> float:
if not self._events:
return 0.0
return max(e.t_ns for e in self._events)
# ══════════════════════════════════════════════════════════════════
# Real workload generators — run actual simulation engine
# ══════════════════════════════════════════════════════════════════
_DEFAULT_TOPOLOGY = Path(__file__).parents[2] / ".." / "topology.yaml"
def _find_topology() -> Path:
"""Locate topology.yaml (search cwd → repo root)."""
candidates = [
Path.cwd() / "topology.yaml",
_DEFAULT_TOPOLOGY.resolve(),
]
for p in candidates:
if p.exists():
return p
raise FileNotFoundError("topology.yaml not found")
def _path_to_events(
path: list[str],
nbytes: int,
rid: int,
msg_type: str,
case_name: str,
graph,
edge_map: dict,
t_offset: float = 0.0,
) -> list[dict]:
"""Convert a simulation path + topology graph into hop events with real timing."""
ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01)
events: list[dict] = []
t = t_offset
events.append({
"t_ns": round(t, 3), "type": "submit", "request_id": rid,
"dst": path[0], "nbytes": nbytes, "msg_type": msg_type,
"metadata": {"name": case_name},
})
for i in range(len(path) - 1):
src, dst = path[i], path[i + 1]
e = edge_map.get((src, dst))
hop_ns = 0.0
ann = {}
if e:
hop_ns += e.distance_mm * ns_per_mm
ann["distance_mm"] = e.distance_mm
if e.bw_gbs:
ann["bw_gbs"] = e.bw_gbs
node = graph.nodes.get(dst)
if node:
ovhd = float(node.attrs.get("overhead_ns", 0.0))
if ovhd > 0:
hop_ns += ovhd
ann["overhead_ns"] = ovhd
t += hop_ns
events.append({
"t_ns": round(t, 3), "type": "hop", "request_id": rid,
"src": src, "dst": dst, "nbytes": nbytes,
"latency_ns": round(hop_ns, 3),
**({"metadata": ann} if ann else {}),
})
# Terminal drain
bws = [e.bw_gbs for i in range(len(path) - 1)
if (e := edge_map.get((path[i], path[i + 1]))) and e.bw_gbs]
if bws and nbytes > 0:
drain_ns = nbytes / min(bws)
t += drain_ns
events.append({
"t_ns": round(t, 3), "type": "process", "request_id": rid,
"component": path[-1], "latency_ns": round(drain_ns, 3),
"metadata": {"op": "drain", "bn_bw_gbs": min(bws)},
})
events.append({
"t_ns": round(t, 3), "type": "complete", "request_id": rid,
"metadata": {"total_ns": round(t - t_offset, 3), "name": case_name},
})
return events
def _load_graph():
"""Load topology graph and build edge map."""
from kernbench.topology.builder import load_topology
topo_path = _find_topology()
graph = load_topology(topo_path)
edge_map = {(e.src, e.dst): e for e in graph.edges}
return graph, edge_map
# ── Probe case generators ──────────────────────────────────────────
def _generate_probe_h2d(graph, edge_map) -> list[dict]:
"""H2D Write probes: IO → cube HBM (1-4 hops)."""
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.policy.routing.router import AddressResolver, PathRouter
spec = graph.spec
resolver = AddressResolver(graph)
router = PathRouter(graph)
mm = spec["cube"]["memory_map"]
slice_bytes = mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"]
nbytes = 32768
cases = [
("h2d-1hop", 0, 1),
("h2d-2hop", 4, 2),
("h2d-3hop", 8, 3),
("h2d-4hop", 12, 4),
]
all_events = []
t_offset = 0.0
for rid, (name, cube, hops) in enumerate(cases):
pa = PhysAddr.pe_hbm_addr(
rack_id=0, sip_id=0, cube_id=cube, pe_id=0,
pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes,
)
dst_node = resolver.resolve(pa)
pcie_ep = resolver.find_pcie_ep(0)
io_cpu = resolver.find_io_cpu(0)
m_cpu = resolver.find_m_cpu(0, cube)
leg1 = router.find_node_path(pcie_ep, io_cpu)
leg2 = router.find_node_path(io_cpu, m_cpu)
leg3 = router.find_mcpu_dma_path(m_cpu, dst_node)
full_path = leg1 + leg2[1:] + leg3[1:]
evts = _path_to_events(full_path, nbytes, rid, "memory_write",
name, graph, edge_map, t_offset)
all_events.extend(evts)
t_offset = max(e["t_ns"] for e in evts) + 5.0 # gap between cases
all_events.sort(key=lambda e: e["t_ns"])
return all_events
def _generate_probe_d2h(graph, edge_map) -> list[dict]:
"""D2H Read probes: HBM → IO (1-4 hops)."""
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.policy.routing.router import AddressResolver, PathRouter
spec = graph.spec
resolver = AddressResolver(graph)
router = PathRouter(graph)
mm = spec["cube"]["memory_map"]
slice_bytes = mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"]
nbytes = 32768
cases = [
("d2h-1hop", 0, 1),
("d2h-2hop", 4, 2),
("d2h-3hop", 8, 3),
("d2h-4hop", 12, 4),
]
all_events = []
t_offset = 0.0
for rid, (name, cube, hops) in enumerate(cases):
pa = PhysAddr.pe_hbm_addr(
rack_id=0, sip_id=0, cube_id=cube, pe_id=0,
pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes,
)
dst_node = resolver.resolve(pa)
pcie_ep = resolver.find_pcie_ep(0)
fwd_path = router.find_memory_path(pcie_ep, dst_node)
# D2H: command goes forward, data comes back on reverse path
rev_path = list(reversed(fwd_path))
# Forward command (no data)
evts_fwd = _path_to_events(fwd_path, 0, rid, "memory_read_cmd",
f"{name} (cmd)", graph, edge_map, t_offset)
# Remove the drain/complete from fwd (just hops)
evts_fwd = [e for e in evts_fwd if e["type"] in ("submit", "hop")]
t_cmd_end = max(e["t_ns"] for e in evts_fwd) if evts_fwd else t_offset
# Reverse data path
evts_rev = _path_to_events(rev_path, nbytes, rid, "memory_read_data",
name, graph, edge_map, t_cmd_end)
# Replace submit with hop continuation
evts_rev = [e for e in evts_rev if e["type"] != "submit"]
all_events.extend(evts_fwd)
all_events.extend(evts_rev)
t_offset = max(e["t_ns"] for e in evts_rev) + 5.0
all_events.sort(key=lambda e: e["t_ns"])
return all_events
def _generate_probe_pe_dma(graph, edge_map) -> list[dict]:
"""PE DMA probes: pe_dma → xbar → HBM."""
from kernbench.policy.address.phyaddr import PhysAddr
from kernbench.policy.routing.router import AddressResolver, PathRouter
spec = graph.spec
resolver = AddressResolver(graph)
router = PathRouter(graph)
mm = spec["cube"]["memory_map"]
slice_bytes = mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"]
nbytes = 32768
cases = [
("pe-local-hbm", 0, 0, 0, 0, 0),
("pe-same-half-hbm", 0, 0, 0, 0, 1),
("pe-cross-half-hbm", 0, 0, 0, 0, 4),
("pe-cross-cube-hbm-best", 0, 0, 0, 1, 0),
("pe-cross-cube-hbm-worst", 0, 0, 0, 15, 0),
]
all_events = []
t_offset = 0.0
for rid, (name, sip, src_cube, src_pe, dst_cube, dst_pe) in enumerate(cases):
pa = PhysAddr.pe_hbm_addr(
rack_id=0, sip_id=sip, cube_id=dst_cube, pe_id=dst_pe,
pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes,
)
dst_node = resolver.resolve(pa)
pe_ref = f"sip{sip}.cube{src_cube}.pe{src_pe}"
dma_path = router.find_path(pe_ref, dst_node)
evts = _path_to_events(dma_path, nbytes, rid, "pe_dma",
name, graph, edge_map, t_offset)
all_events.extend(evts)
t_offset = max(e["t_ns"] for e in evts) + 5.0
all_events.sort(key=lambda e: e["t_ns"])
return all_events
# ── Bench workload generators ─────────────────────────────────────
def _generate_bench_qkv_gemm(graph, edge_map) -> list[dict]:
"""QKV GEMM (single PE): Host → IO → M_CPU → PE0 → GEMM → HBM."""
from kernbench.policy.routing.router import AddressResolver, PathRouter
resolver = AddressResolver(graph)
router = PathRouter(graph)
events: list[dict] = []
t = 0.0
rid = 0
def ev(t_ns, **kw):
events.append({"t_ns": round(t_ns, 3), **kw})
# Phase 1: Tensor deploy (a: 128x256 f16 = 64KB, b: 256x128 f16 = 64KB)
pcie_ep = resolver.find_pcie_ep(0)
m_cpu = resolver.find_m_cpu(0, 0) # cube 0
io_cpu = resolver.find_io_cpu(0)
# Tensor A deploy path
leg1 = router.find_node_path(pcie_ep, io_cpu)
leg2 = router.find_node_path(io_cpu, m_cpu)
deploy_path = leg1 + leg2[1:]
for tensor_name, nbytes_t in [("tensor_a", 65536), ("tensor_b", 65536)]:
evts = _path_to_events(deploy_path, nbytes_t, rid, "memory_write",
f"deploy_{tensor_name}", graph, edge_map, t)
events.extend(evts)
t = max(e["t_ns"] for e in evts) + 1.0
rid += 1
# Phase 2: Kernel launch path (command, small payload)
ev(t, type="submit", request_id=rid, dst=pcie_ep,
nbytes=128, msg_type="kernel_launch",
metadata={"name": "qkv_gemm", "target": "cube0.pe0"})
# Command path: PCIE_EP → IO_CPU → M_CPU → PE_CPU
cmd_path = deploy_path # same path to M_CPU
ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01)
for i in range(len(cmd_path) - 1):
src, dst = cmd_path[i], cmd_path[i + 1]
e = edge_map.get((src, dst))
hop_ns = 0.0
if e:
hop_ns += e.distance_mm * ns_per_mm
node = graph.nodes.get(dst)
if node:
hop_ns += float(node.attrs.get("overhead_ns", 0.0))
t += hop_ns
ev(t, type="hop", request_id=rid, src=src, dst=dst,
nbytes=128, latency_ns=round(hop_ns, 3))
# M_CPU dispatch
t += 5.0
ev(t, type="process", request_id=rid, component=m_cpu,
latency_ns=5.0, metadata={"action": "dispatch_to_pe0"})
# PE_CPU compile + scheduler
pe_cpu = f"sip0.cube0.pe0.pe_cpu"
ev(t, type="hop", request_id=rid, src=m_cpu, dst=pe_cpu,
nbytes=128, latency_ns=0.0)
t += 2.0
ev(t, type="process", request_id=rid, component=pe_cpu,
latency_ns=2.0, metadata={"action": "compile_kernel"})
t += 1.0
ev(t, type="process", request_id=rid,
component="sip0.cube0.pe0.pe_scheduler", latency_ns=1.0)
# DMA read tensor_a from HBM → PE_TCM
pe_dma = "sip0.cube0.pe0.pe_dma"
# Find pe0 → HBM path
pe_ref = "sip0.cube0.pe0"
try:
dma_path = router.find_path(pe_ref, f"sip0.cube0.hbm_ctrl.slice0")
except Exception:
dma_path = [pe_ref]
ev(t, type="hop", request_id=rid, src=pe_dma, dst=dma_path[0] if len(dma_path) > 1 else pe_dma,
nbytes=65536, latency_ns=0.0, metadata={"cmd": "dma_read_a"})
for i in range(len(dma_path) - 1):
e = edge_map.get((dma_path[i], dma_path[i + 1]))
hop_ns = 0.0
if e:
hop_ns += e.distance_mm * ns_per_mm
node = graph.nodes.get(dma_path[i + 1])
if node:
hop_ns += float(node.attrs.get("overhead_ns", 0.0))
t += hop_ns
ev(t, type="hop", request_id=rid, src=dma_path[i], dst=dma_path[i + 1],
nbytes=65536, latency_ns=round(hop_ns, 3))
# HBM read drain
bw_ns = 65536 / 256.0 # ~256 ns
t += bw_ns
ev(t, type="process", request_id=rid, component=dma_path[-1] if dma_path else "hbm",
latency_ns=round(bw_ns, 3), metadata={"op": "read", "cmd": "dma_read_a"})
# GEMM compute
gemm_ns = 65536 / (8.0 * 1e3) # 8 TFLOPS
t += gemm_ns
ev(t, type="process", request_id=rid,
component="sip0.cube0.pe0.pe_gemm",
latency_ns=round(gemm_ns, 3), metadata={"op": "gemm", "tflops": 8.0})
# DMA write result back
t += bw_ns
ev(t, type="process", request_id=rid,
component="sip0.cube0.hbm_ctrl.slice0",
latency_ns=round(bw_ns, 3), metadata={"op": "write", "cmd": "dma_write_out"})
ev(t, type="complete", request_id=rid,
metadata={"total_ns": round(t, 3), "name": "qkv_gemm"})
events.sort(key=lambda e: e["t_ns"])
return events
def _generate_bench_qkv_gemm_multi_pe(graph, edge_map) -> list[dict]:
"""QKV GEMM multi-PE: same as single but M_CPU fans out to 8 PEs."""
from kernbench.policy.routing.router import AddressResolver, PathRouter
resolver = AddressResolver(graph)
router = PathRouter(graph)
events: list[dict] = []
t = 0.0
rid = 0
def ev(t_ns, **kw):
events.append({"t_ns": round(t_ns, 3), **kw})
pcie_ep = resolver.find_pcie_ep(0)
io_cpu = resolver.find_io_cpu(0)
m_cpu = resolver.find_m_cpu(0, 0)
ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01)
# Command path to M_CPU
leg1 = router.find_node_path(pcie_ep, io_cpu)
leg2 = router.find_node_path(io_cpu, m_cpu)
cmd_path = leg1 + leg2[1:]
ev(t, type="submit", request_id=rid, dst=pcie_ep,
nbytes=128, msg_type="kernel_launch",
metadata={"name": "qkv_gemm_multi_pe", "target": "cube0.pe0-7"})
for i in range(len(cmd_path) - 1):
e = edge_map.get((cmd_path[i], cmd_path[i + 1]))
hop_ns = 0.0
if e:
hop_ns += e.distance_mm * ns_per_mm
node = graph.nodes.get(cmd_path[i + 1])
if node:
hop_ns += float(node.attrs.get("overhead_ns", 0.0))
t += hop_ns
ev(t, type="hop", request_id=rid, src=cmd_path[i], dst=cmd_path[i + 1],
nbytes=128, latency_ns=round(hop_ns, 3))
# M_CPU fan-out to 8 PEs
t += 5.0
ev(t, type="process", request_id=rid, component=m_cpu,
latency_ns=5.0, metadata={"action": "fan_out_8pe"})
# Each PE executes concurrently (all start at same time)
pe_start = t
for pe_idx in range(8):
pe_rid = rid + 1 + pe_idx
pe_cpu = f"sip0.cube0.pe{pe_idx}.pe_cpu"
ev(pe_start, type="hop", request_id=pe_rid, src=m_cpu, dst=pe_cpu,
nbytes=128, latency_ns=0.0,
metadata={"fan_out_pe": pe_idx})
t_pe = pe_start + 2.0 # PE_CPU compile
ev(t_pe, type="process", request_id=pe_rid, component=pe_cpu,
latency_ns=2.0)
t_pe += 1.0 # scheduler
ev(t_pe, type="process", request_id=pe_rid,
component=f"sip0.cube0.pe{pe_idx}.pe_scheduler", latency_ns=1.0)
# GEMM compute (each PE does 1/8 of the work)
gemm_ns = (65536 / 8) / (8.0 * 1e3)
t_pe += gemm_ns
ev(t_pe, type="process", request_id=pe_rid,
component=f"sip0.cube0.pe{pe_idx}.pe_gemm",
latency_ns=round(gemm_ns, 3), metadata={"op": "gemm_shard"})
ev(t_pe, type="complete", request_id=pe_rid,
metadata={"name": f"pe{pe_idx}_shard"})
# Overall completion (max of all PEs)
t_end = pe_start + 2.0 + 1.0 + (65536 / 8) / (8.0 * 1e3)
ev(t_end, type="complete", request_id=rid,
metadata={"total_ns": round(t_end, 3), "name": "qkv_gemm_multi_pe"})
events.sort(key=lambda e: e["t_ns"])
return events
# ══════════════════════════════════════════════════════════════════
# Workload registry
# ══════════════════════════════════════════════════════════════════
def _make_workloads():
"""Build workload registry. Deferred to avoid import-time topology load."""
return {
# ── Probe cases ──
"probe-h2d": {
"name": "Probe: H2D Write (1-4 hop)",
"description": "Host→Device memory write across 1,2,3,4 cube hops (32KB)",
"category": "probe",
"generator": lambda g, e: _generate_probe_h2d(g, e),
},
"probe-d2h": {
"name": "Probe: D2H Read (1-4 hop)",
"description": "Device→Host memory read across 1,2,3,4 cube hops (32KB)",
"category": "probe",
"generator": lambda g, e: _generate_probe_d2h(g, e),
},
"probe-pe-dma": {
"name": "Probe: PE DMA (local→cross-cube)",
"description": "PE DMA latency: local, same-half, cross-half, cross-cube best/worst",
"category": "probe",
"generator": lambda g, e: _generate_probe_pe_dma(g, e),
},
# ── Bench workloads ──
"bench-qkv-gemm": {
"name": "Bench: QKV GEMM (single PE)",
"description": "Host deploy tensors + single-PE QKV GEMM (128x256 x 256x128 F16)",
"category": "bench",
"generator": lambda g, e: _generate_bench_qkv_gemm(g, e),
},
"bench-qkv-gemm-multi-pe": {
"name": "Bench: QKV GEMM (8-PE parallel)",
"description": "M_CPU fan-out to 8 PEs, column-parallel QKV GEMM",
"category": "bench",
"generator": lambda g, e: _generate_bench_qkv_gemm_multi_pe(g, e),
},
}
# Cached topology + workload state
_cached_graph = None
_cached_edge_map = None
_WORKLOADS = None
def _ensure_loaded():
global _cached_graph, _cached_edge_map, _WORKLOADS
if _cached_graph is None:
_cached_graph, _cached_edge_map = _load_graph()
if _WORKLOADS is None:
_WORKLOADS = _make_workloads()
return _cached_graph, _cached_edge_map, _WORKLOADS
def get_workload_list() -> list[dict]:
"""Return list of available workloads for UI."""
_, _, workloads = _ensure_loaded()
return [
{
"id": wid,
"name": w["name"],
"description": w["description"],
"category": w.get("category", ""),
}
for wid, w in workloads.items()
]
def generate_workload_events(workload_id: str) -> list[dict]:
"""Generate events for a specific workload by running real simulation paths."""
graph, edge_map, workloads = _ensure_loaded()
if workload_id not in workloads:
raise ValueError(f"Unknown workload: {workload_id}")
return workloads[workload_id]["generator"](graph, edge_map)
# Keep backward compat
def generate_demo_events() -> list[dict]:
"""Generate default demo events (probe-h2d)."""
return generate_workload_events("probe-h2d")