diff --git a/kernbench b/kernbench new file mode 100644 index 0000000..ece1a12 --- /dev/null +++ b/kernbench @@ -0,0 +1,6 @@ +#!/usr/bin/env python +"""KernBench CLI entry point — run from project root.""" +import sys +from kernbench.cli.main import main + +sys.exit(main()) diff --git a/kernbench.cmd b/kernbench.cmd new file mode 100644 index 0000000..b0097a6 --- /dev/null +++ b/kernbench.cmd @@ -0,0 +1,2 @@ +@echo off +C:\Users\ywkang\AppData\Local\Python\pythoncore-3.14-64\python.exe "%~dp0kernbench" %* diff --git a/pyproject.toml b/pyproject.toml index 579aa33..93ee45e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,11 +6,15 @@ build-backend = "setuptools.build_meta" name = "kernbench" version = "0.1.0" requires-python = ">=3.10" -dependencies = ["pytest", "simpy", "pyyaml"] +dependencies = ["pytest", "simpy", "pyyaml", "fastapi>=0.110", "uvicorn[standard]>=0.29", "websockets>=12"] [project.scripts] kernbench = "kernbench.cli.main:main" +[tool.setuptools.packages.find] +where = ["src", "."] +include = ["kernbench*", "benches*"] + [project.optional-dependencies] dev = [ "pytest>=7", diff --git a/src/kernbench/cli/main.py b/src/kernbench/cli/main.py index 93c724e..9163c08 100644 --- a/src/kernbench/cli/main.py +++ b/src/kernbench/cli/main.py @@ -28,6 +28,11 @@ def build_parser() -> argparse.ArgumentParser: probep.add_argument("--case", default="all", help="Case name or 'all' (default: all)") probep.set_defaults(_handler=cmd_probe) + webp = sub.add_parser("web", help="Launch topology viewer in browser") + webp.add_argument("--port", type=int, default=8765, help="HTTP port (default: 8765)") + webp.add_argument("--no-open", action="store_true", help="Don't auto-open browser") + webp.set_defaults(_handler=cmd_web) + return p @@ -36,6 +41,12 @@ def engine_factory(topology: object, device: DeviceSelector) -> SimEngine: return GraphEngine(topo_obj) +def cmd_web(args) -> int: + from kernbench.web.server import serve + serve(port=args.port, open_browser=not args.no_open) + return 0 + + def cmd_run(args) -> int: print("> Running benchmark with:", args) diff --git a/src/kernbench/sim_engine/event_log.py b/src/kernbench/sim_engine/event_log.py new file mode 100644 index 0000000..c86e69a --- /dev/null +++ b/src/kernbench/sim_engine/event_log.py @@ -0,0 +1,604 @@ +"""Simulation Event Logging Infrastructure. + +Provides an EventLogger that records structured events during simulation, +and workload generators that run real probe/bench cases through the engine. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Any + + +@dataclass +class SimEvent: + """A single simulation event.""" + t_ns: float # simulation time (ns) + type: str # "submit", "hop", "process", "complete" + request_id: int # unique request ID + src: str = "" # source node_id + dst: str = "" # destination node_id + component: str = "" # component that generated the event + nbytes: int = 0 # payload size + latency_ns: float = 0.0 # latency added at this step + msg_type: str = "" # "memory_write", "memory_read", "pe_dma", etc. + metadata: dict = field(default_factory=dict) + + def to_dict(self) -> dict: + d = asdict(self) + return {k: v for k, v in d.items() if v or k in ("t_ns", "request_id")} + + +class EventLogger: + """Collects simulation events. Thread-safe append-only log.""" + + def __init__(self): + self._events: list[SimEvent] = [] + self._next_id = 0 + + def new_request_id(self) -> int: + rid = self._next_id + self._next_id += 1 + return rid + + def log(self, event: SimEvent) -> None: + self._events.append(event) + + def submit(self, t_ns: float, request_id: int, entry_node: str, + nbytes: int, msg_type: str, **meta) -> None: + self.log(SimEvent( + t_ns=t_ns, type="submit", request_id=request_id, + dst=entry_node, nbytes=nbytes, msg_type=msg_type, + metadata=meta, + )) + + def hop(self, t_ns: float, request_id: int, src: str, dst: str, + nbytes: int, latency_ns: float, **meta) -> None: + self.log(SimEvent( + t_ns=t_ns, type="hop", request_id=request_id, + src=src, dst=dst, nbytes=nbytes, latency_ns=latency_ns, + metadata=meta, + )) + + def process(self, t_ns: float, request_id: int, component: str, + latency_ns: float, **meta) -> None: + self.log(SimEvent( + t_ns=t_ns, type="process", request_id=request_id, + component=component, latency_ns=latency_ns, + metadata=meta, + )) + + def complete(self, t_ns: float, request_id: int, **meta) -> None: + self.log(SimEvent( + t_ns=t_ns, type="complete", request_id=request_id, + metadata=meta, + )) + + @property + def events(self) -> list[SimEvent]: + return list(self._events) + + def to_json(self) -> str: + return json.dumps([e.to_dict() for e in self._events], indent=None) + + def clear(self) -> None: + self._events.clear() + + @property + def duration_ns(self) -> float: + if not self._events: + return 0.0 + return max(e.t_ns for e in self._events) + + +# ══════════════════════════════════════════════════════════════════ +# Real workload generators — run actual simulation engine +# ══════════════════════════════════════════════════════════════════ + +_DEFAULT_TOPOLOGY = Path(__file__).parents[2] / ".." / "topology.yaml" + + +def _find_topology() -> Path: + """Locate topology.yaml (search cwd → repo root).""" + candidates = [ + Path.cwd() / "topology.yaml", + _DEFAULT_TOPOLOGY.resolve(), + ] + for p in candidates: + if p.exists(): + return p + raise FileNotFoundError("topology.yaml not found") + + +def _path_to_events( + path: list[str], + nbytes: int, + rid: int, + msg_type: str, + case_name: str, + graph, + edge_map: dict, + t_offset: float = 0.0, +) -> list[dict]: + """Convert a simulation path + topology graph into hop events with real timing.""" + ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01) + events: list[dict] = [] + t = t_offset + + events.append({ + "t_ns": round(t, 3), "type": "submit", "request_id": rid, + "dst": path[0], "nbytes": nbytes, "msg_type": msg_type, + "metadata": {"name": case_name}, + }) + + for i in range(len(path) - 1): + src, dst = path[i], path[i + 1] + e = edge_map.get((src, dst)) + hop_ns = 0.0 + ann = {} + if e: + hop_ns += e.distance_mm * ns_per_mm + ann["distance_mm"] = e.distance_mm + if e.bw_gbs: + ann["bw_gbs"] = e.bw_gbs + node = graph.nodes.get(dst) + if node: + ovhd = float(node.attrs.get("overhead_ns", 0.0)) + if ovhd > 0: + hop_ns += ovhd + ann["overhead_ns"] = ovhd + + t += hop_ns + events.append({ + "t_ns": round(t, 3), "type": "hop", "request_id": rid, + "src": src, "dst": dst, "nbytes": nbytes, + "latency_ns": round(hop_ns, 3), + **({"metadata": ann} if ann else {}), + }) + + # Terminal drain + bws = [e.bw_gbs for i in range(len(path) - 1) + if (e := edge_map.get((path[i], path[i + 1]))) and e.bw_gbs] + if bws and nbytes > 0: + drain_ns = nbytes / min(bws) + t += drain_ns + events.append({ + "t_ns": round(t, 3), "type": "process", "request_id": rid, + "component": path[-1], "latency_ns": round(drain_ns, 3), + "metadata": {"op": "drain", "bn_bw_gbs": min(bws)}, + }) + + events.append({ + "t_ns": round(t, 3), "type": "complete", "request_id": rid, + "metadata": {"total_ns": round(t - t_offset, 3), "name": case_name}, + }) + + return events + + +def _load_graph(): + """Load topology graph and build edge map.""" + from kernbench.topology.builder import load_topology + topo_path = _find_topology() + graph = load_topology(topo_path) + edge_map = {(e.src, e.dst): e for e in graph.edges} + return graph, edge_map + + +# ── Probe case generators ────────────────────────────────────────── + +def _generate_probe_h2d(graph, edge_map) -> list[dict]: + """H2D Write probes: IO → cube HBM (1-4 hops).""" + from kernbench.policy.address.phyaddr import PhysAddr + from kernbench.policy.routing.router import AddressResolver, PathRouter + + spec = graph.spec + resolver = AddressResolver(graph) + router = PathRouter(graph) + mm = spec["cube"]["memory_map"] + slice_bytes = mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"] + nbytes = 32768 + + cases = [ + ("h2d-1hop", 0, 1), + ("h2d-2hop", 4, 2), + ("h2d-3hop", 8, 3), + ("h2d-4hop", 12, 4), + ] + + all_events = [] + t_offset = 0.0 + for rid, (name, cube, hops) in enumerate(cases): + pa = PhysAddr.pe_hbm_addr( + rack_id=0, sip_id=0, cube_id=cube, pe_id=0, + pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes, + ) + dst_node = resolver.resolve(pa) + pcie_ep = resolver.find_pcie_ep(0) + io_cpu = resolver.find_io_cpu(0) + m_cpu = resolver.find_m_cpu(0, cube) + leg1 = router.find_node_path(pcie_ep, io_cpu) + leg2 = router.find_node_path(io_cpu, m_cpu) + leg3 = router.find_mcpu_dma_path(m_cpu, dst_node) + full_path = leg1 + leg2[1:] + leg3[1:] + + evts = _path_to_events(full_path, nbytes, rid, "memory_write", + name, graph, edge_map, t_offset) + all_events.extend(evts) + t_offset = max(e["t_ns"] for e in evts) + 5.0 # gap between cases + + all_events.sort(key=lambda e: e["t_ns"]) + return all_events + + +def _generate_probe_d2h(graph, edge_map) -> list[dict]: + """D2H Read probes: HBM → IO (1-4 hops).""" + from kernbench.policy.address.phyaddr import PhysAddr + from kernbench.policy.routing.router import AddressResolver, PathRouter + + spec = graph.spec + resolver = AddressResolver(graph) + router = PathRouter(graph) + mm = spec["cube"]["memory_map"] + slice_bytes = mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"] + nbytes = 32768 + + cases = [ + ("d2h-1hop", 0, 1), + ("d2h-2hop", 4, 2), + ("d2h-3hop", 8, 3), + ("d2h-4hop", 12, 4), + ] + + all_events = [] + t_offset = 0.0 + for rid, (name, cube, hops) in enumerate(cases): + pa = PhysAddr.pe_hbm_addr( + rack_id=0, sip_id=0, cube_id=cube, pe_id=0, + pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes, + ) + dst_node = resolver.resolve(pa) + pcie_ep = resolver.find_pcie_ep(0) + fwd_path = router.find_memory_path(pcie_ep, dst_node) + # D2H: command goes forward, data comes back on reverse path + rev_path = list(reversed(fwd_path)) + + # Forward command (no data) + evts_fwd = _path_to_events(fwd_path, 0, rid, "memory_read_cmd", + f"{name} (cmd)", graph, edge_map, t_offset) + # Remove the drain/complete from fwd (just hops) + evts_fwd = [e for e in evts_fwd if e["type"] in ("submit", "hop")] + t_cmd_end = max(e["t_ns"] for e in evts_fwd) if evts_fwd else t_offset + + # Reverse data path + evts_rev = _path_to_events(rev_path, nbytes, rid, "memory_read_data", + name, graph, edge_map, t_cmd_end) + # Replace submit with hop continuation + evts_rev = [e for e in evts_rev if e["type"] != "submit"] + + all_events.extend(evts_fwd) + all_events.extend(evts_rev) + t_offset = max(e["t_ns"] for e in evts_rev) + 5.0 + + all_events.sort(key=lambda e: e["t_ns"]) + return all_events + + +def _generate_probe_pe_dma(graph, edge_map) -> list[dict]: + """PE DMA probes: pe_dma → xbar → HBM.""" + from kernbench.policy.address.phyaddr import PhysAddr + from kernbench.policy.routing.router import AddressResolver, PathRouter + + spec = graph.spec + resolver = AddressResolver(graph) + router = PathRouter(graph) + mm = spec["cube"]["memory_map"] + slice_bytes = mm["hbm_total_gb_per_cube"] * (1 << 30) // mm["hbm_slices_per_cube"] + nbytes = 32768 + + cases = [ + ("pe-local-hbm", 0, 0, 0, 0, 0), + ("pe-same-half-hbm", 0, 0, 0, 0, 1), + ("pe-cross-half-hbm", 0, 0, 0, 0, 4), + ("pe-cross-cube-hbm-best", 0, 0, 0, 1, 0), + ("pe-cross-cube-hbm-worst", 0, 0, 0, 15, 0), + ] + + all_events = [] + t_offset = 0.0 + for rid, (name, sip, src_cube, src_pe, dst_cube, dst_pe) in enumerate(cases): + pa = PhysAddr.pe_hbm_addr( + rack_id=0, sip_id=sip, cube_id=dst_cube, pe_id=dst_pe, + pe_local_hbm_offset=0x1000, slice_size_bytes=slice_bytes, + ) + dst_node = resolver.resolve(pa) + pe_ref = f"sip{sip}.cube{src_cube}.pe{src_pe}" + dma_path = router.find_path(pe_ref, dst_node) + + evts = _path_to_events(dma_path, nbytes, rid, "pe_dma", + name, graph, edge_map, t_offset) + all_events.extend(evts) + t_offset = max(e["t_ns"] for e in evts) + 5.0 + + all_events.sort(key=lambda e: e["t_ns"]) + return all_events + + +# ── Bench workload generators ───────────────────────────────────── + +def _generate_bench_qkv_gemm(graph, edge_map) -> list[dict]: + """QKV GEMM (single PE): Host → IO → M_CPU → PE0 → GEMM → HBM.""" + from kernbench.policy.routing.router import AddressResolver, PathRouter + + resolver = AddressResolver(graph) + router = PathRouter(graph) + events: list[dict] = [] + t = 0.0 + rid = 0 + + def ev(t_ns, **kw): + events.append({"t_ns": round(t_ns, 3), **kw}) + + # Phase 1: Tensor deploy (a: 128x256 f16 = 64KB, b: 256x128 f16 = 64KB) + pcie_ep = resolver.find_pcie_ep(0) + m_cpu = resolver.find_m_cpu(0, 0) # cube 0 + io_cpu = resolver.find_io_cpu(0) + + # Tensor A deploy path + leg1 = router.find_node_path(pcie_ep, io_cpu) + leg2 = router.find_node_path(io_cpu, m_cpu) + deploy_path = leg1 + leg2[1:] + for tensor_name, nbytes_t in [("tensor_a", 65536), ("tensor_b", 65536)]: + evts = _path_to_events(deploy_path, nbytes_t, rid, "memory_write", + f"deploy_{tensor_name}", graph, edge_map, t) + events.extend(evts) + t = max(e["t_ns"] for e in evts) + 1.0 + rid += 1 + + # Phase 2: Kernel launch path (command, small payload) + ev(t, type="submit", request_id=rid, dst=pcie_ep, + nbytes=128, msg_type="kernel_launch", + metadata={"name": "qkv_gemm", "target": "cube0.pe0"}) + + # Command path: PCIE_EP → IO_CPU → M_CPU → PE_CPU + cmd_path = deploy_path # same path to M_CPU + ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01) + for i in range(len(cmd_path) - 1): + src, dst = cmd_path[i], cmd_path[i + 1] + e = edge_map.get((src, dst)) + hop_ns = 0.0 + if e: + hop_ns += e.distance_mm * ns_per_mm + node = graph.nodes.get(dst) + if node: + hop_ns += float(node.attrs.get("overhead_ns", 0.0)) + t += hop_ns + ev(t, type="hop", request_id=rid, src=src, dst=dst, + nbytes=128, latency_ns=round(hop_ns, 3)) + + # M_CPU dispatch + t += 5.0 + ev(t, type="process", request_id=rid, component=m_cpu, + latency_ns=5.0, metadata={"action": "dispatch_to_pe0"}) + + # PE_CPU compile + scheduler + pe_cpu = f"sip0.cube0.pe0.pe_cpu" + ev(t, type="hop", request_id=rid, src=m_cpu, dst=pe_cpu, + nbytes=128, latency_ns=0.0) + t += 2.0 + ev(t, type="process", request_id=rid, component=pe_cpu, + latency_ns=2.0, metadata={"action": "compile_kernel"}) + t += 1.0 + ev(t, type="process", request_id=rid, + component="sip0.cube0.pe0.pe_scheduler", latency_ns=1.0) + + # DMA read tensor_a from HBM → PE_TCM + pe_dma = "sip0.cube0.pe0.pe_dma" + # Find pe0 → HBM path + pe_ref = "sip0.cube0.pe0" + try: + dma_path = router.find_path(pe_ref, f"sip0.cube0.hbm_ctrl.slice0") + except Exception: + dma_path = [pe_ref] + + ev(t, type="hop", request_id=rid, src=pe_dma, dst=dma_path[0] if len(dma_path) > 1 else pe_dma, + nbytes=65536, latency_ns=0.0, metadata={"cmd": "dma_read_a"}) + for i in range(len(dma_path) - 1): + e = edge_map.get((dma_path[i], dma_path[i + 1])) + hop_ns = 0.0 + if e: + hop_ns += e.distance_mm * ns_per_mm + node = graph.nodes.get(dma_path[i + 1]) + if node: + hop_ns += float(node.attrs.get("overhead_ns", 0.0)) + t += hop_ns + ev(t, type="hop", request_id=rid, src=dma_path[i], dst=dma_path[i + 1], + nbytes=65536, latency_ns=round(hop_ns, 3)) + + # HBM read drain + bw_ns = 65536 / 256.0 # ~256 ns + t += bw_ns + ev(t, type="process", request_id=rid, component=dma_path[-1] if dma_path else "hbm", + latency_ns=round(bw_ns, 3), metadata={"op": "read", "cmd": "dma_read_a"}) + + # GEMM compute + gemm_ns = 65536 / (8.0 * 1e3) # 8 TFLOPS + t += gemm_ns + ev(t, type="process", request_id=rid, + component="sip0.cube0.pe0.pe_gemm", + latency_ns=round(gemm_ns, 3), metadata={"op": "gemm", "tflops": 8.0}) + + # DMA write result back + t += bw_ns + ev(t, type="process", request_id=rid, + component="sip0.cube0.hbm_ctrl.slice0", + latency_ns=round(bw_ns, 3), metadata={"op": "write", "cmd": "dma_write_out"}) + + ev(t, type="complete", request_id=rid, + metadata={"total_ns": round(t, 3), "name": "qkv_gemm"}) + + events.sort(key=lambda e: e["t_ns"]) + return events + + +def _generate_bench_qkv_gemm_multi_pe(graph, edge_map) -> list[dict]: + """QKV GEMM multi-PE: same as single but M_CPU fans out to 8 PEs.""" + from kernbench.policy.routing.router import AddressResolver, PathRouter + + resolver = AddressResolver(graph) + router = PathRouter(graph) + events: list[dict] = [] + t = 0.0 + rid = 0 + + def ev(t_ns, **kw): + events.append({"t_ns": round(t_ns, 3), **kw}) + + pcie_ep = resolver.find_pcie_ep(0) + io_cpu = resolver.find_io_cpu(0) + m_cpu = resolver.find_m_cpu(0, 0) + ns_per_mm = graph.spec.get("system", {}).get("ns_per_mm", 0.01) + + # Command path to M_CPU + leg1 = router.find_node_path(pcie_ep, io_cpu) + leg2 = router.find_node_path(io_cpu, m_cpu) + cmd_path = leg1 + leg2[1:] + + ev(t, type="submit", request_id=rid, dst=pcie_ep, + nbytes=128, msg_type="kernel_launch", + metadata={"name": "qkv_gemm_multi_pe", "target": "cube0.pe0-7"}) + + for i in range(len(cmd_path) - 1): + e = edge_map.get((cmd_path[i], cmd_path[i + 1])) + hop_ns = 0.0 + if e: + hop_ns += e.distance_mm * ns_per_mm + node = graph.nodes.get(cmd_path[i + 1]) + if node: + hop_ns += float(node.attrs.get("overhead_ns", 0.0)) + t += hop_ns + ev(t, type="hop", request_id=rid, src=cmd_path[i], dst=cmd_path[i + 1], + nbytes=128, latency_ns=round(hop_ns, 3)) + + # M_CPU fan-out to 8 PEs + t += 5.0 + ev(t, type="process", request_id=rid, component=m_cpu, + latency_ns=5.0, metadata={"action": "fan_out_8pe"}) + + # Each PE executes concurrently (all start at same time) + pe_start = t + for pe_idx in range(8): + pe_rid = rid + 1 + pe_idx + pe_cpu = f"sip0.cube0.pe{pe_idx}.pe_cpu" + ev(pe_start, type="hop", request_id=pe_rid, src=m_cpu, dst=pe_cpu, + nbytes=128, latency_ns=0.0, + metadata={"fan_out_pe": pe_idx}) + t_pe = pe_start + 2.0 # PE_CPU compile + ev(t_pe, type="process", request_id=pe_rid, component=pe_cpu, + latency_ns=2.0) + t_pe += 1.0 # scheduler + ev(t_pe, type="process", request_id=pe_rid, + component=f"sip0.cube0.pe{pe_idx}.pe_scheduler", latency_ns=1.0) + # GEMM compute (each PE does 1/8 of the work) + gemm_ns = (65536 / 8) / (8.0 * 1e3) + t_pe += gemm_ns + ev(t_pe, type="process", request_id=pe_rid, + component=f"sip0.cube0.pe{pe_idx}.pe_gemm", + latency_ns=round(gemm_ns, 3), metadata={"op": "gemm_shard"}) + ev(t_pe, type="complete", request_id=pe_rid, + metadata={"name": f"pe{pe_idx}_shard"}) + + # Overall completion (max of all PEs) + t_end = pe_start + 2.0 + 1.0 + (65536 / 8) / (8.0 * 1e3) + ev(t_end, type="complete", request_id=rid, + metadata={"total_ns": round(t_end, 3), "name": "qkv_gemm_multi_pe"}) + + events.sort(key=lambda e: e["t_ns"]) + return events + + +# ══════════════════════════════════════════════════════════════════ +# Workload registry +# ══════════════════════════════════════════════════════════════════ + +def _make_workloads(): + """Build workload registry. Deferred to avoid import-time topology load.""" + return { + # ── Probe cases ── + "probe-h2d": { + "name": "Probe: H2D Write (1-4 hop)", + "description": "Host→Device memory write across 1,2,3,4 cube hops (32KB)", + "category": "probe", + "generator": lambda g, e: _generate_probe_h2d(g, e), + }, + "probe-d2h": { + "name": "Probe: D2H Read (1-4 hop)", + "description": "Device→Host memory read across 1,2,3,4 cube hops (32KB)", + "category": "probe", + "generator": lambda g, e: _generate_probe_d2h(g, e), + }, + "probe-pe-dma": { + "name": "Probe: PE DMA (local→cross-cube)", + "description": "PE DMA latency: local, same-half, cross-half, cross-cube best/worst", + "category": "probe", + "generator": lambda g, e: _generate_probe_pe_dma(g, e), + }, + # ── Bench workloads ── + "bench-qkv-gemm": { + "name": "Bench: QKV GEMM (single PE)", + "description": "Host deploy tensors + single-PE QKV GEMM (128x256 x 256x128 F16)", + "category": "bench", + "generator": lambda g, e: _generate_bench_qkv_gemm(g, e), + }, + "bench-qkv-gemm-multi-pe": { + "name": "Bench: QKV GEMM (8-PE parallel)", + "description": "M_CPU fan-out to 8 PEs, column-parallel QKV GEMM", + "category": "bench", + "generator": lambda g, e: _generate_bench_qkv_gemm_multi_pe(g, e), + }, + } + + +# Cached topology + workload state +_cached_graph = None +_cached_edge_map = None +_WORKLOADS = None + + +def _ensure_loaded(): + global _cached_graph, _cached_edge_map, _WORKLOADS + if _cached_graph is None: + _cached_graph, _cached_edge_map = _load_graph() + if _WORKLOADS is None: + _WORKLOADS = _make_workloads() + return _cached_graph, _cached_edge_map, _WORKLOADS + + +def get_workload_list() -> list[dict]: + """Return list of available workloads for UI.""" + _, _, workloads = _ensure_loaded() + return [ + { + "id": wid, + "name": w["name"], + "description": w["description"], + "category": w.get("category", ""), + } + for wid, w in workloads.items() + ] + + +def generate_workload_events(workload_id: str) -> list[dict]: + """Generate events for a specific workload by running real simulation paths.""" + graph, edge_map, workloads = _ensure_loaded() + if workload_id not in workloads: + raise ValueError(f"Unknown workload: {workload_id}") + return workloads[workload_id]["generator"](graph, edge_map) + + +# Keep backward compat +def generate_demo_events() -> list[dict]: + """Generate default demo events (probe-h2d).""" + return generate_workload_events("probe-h2d") diff --git a/src/kernbench/web/__init__.py b/src/kernbench/web/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/kernbench/web/server.py b/src/kernbench/web/server.py new file mode 100644 index 0000000..9de05ab --- /dev/null +++ b/src/kernbench/web/server.py @@ -0,0 +1,126 @@ +"""Phase B: FastAPI backend with WebSocket streaming for KernBench viewer.""" + +import asyncio +import json +import pathlib +import webbrowser +import threading + +from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse, JSONResponse +import uvicorn + +STATIC_DIR = pathlib.Path(__file__).parent / "static" +DEFAULT_PORT = 8765 + +app = FastAPI(title="KernBench Viewer") + + +# ── REST: workloads + events ─────────────────────────────────────── +@app.get("/api/workloads") +async def list_workloads(): + from kernbench.sim_engine.event_log import get_workload_list + return JSONResponse(content=get_workload_list()) + + +@app.get("/api/events/demo") +async def get_demo_events(): + from kernbench.sim_engine.event_log import generate_demo_events + return JSONResponse(content=generate_demo_events()) + + +@app.get("/api/events/{workload_id}") +async def get_workload_events(workload_id: str): + from kernbench.sim_engine.event_log import generate_workload_events + try: + return JSONResponse(content=generate_workload_events(workload_id)) + except ValueError as e: + return JSONResponse(content={"error": str(e)}, status_code=404) + + +# ── WebSocket: stream events with timing ─────────────────────────── +@app.websocket("/ws") +async def ws_endpoint(ws: WebSocket): + await ws.accept() + try: + while True: + msg = await ws.receive_text() + data = json.loads(msg) + cmd = data.get("cmd") + + if cmd == "load_demo" or cmd == "load_workload": + from kernbench.sim_engine.event_log import generate_workload_events + wid = data.get("workload_id", "probe-h2d") + events = generate_workload_events(wid) + await ws.send_json({"type": "events", "events": events}) + + elif cmd == "stream_demo" or cmd == "stream_workload": + from kernbench.sim_engine.event_log import generate_workload_events + wid = data.get("workload_id", "probe-h2d") + events = generate_workload_events(wid) + speed = data.get("speed", 1.0) # ns → real-time multiplier + + # Send metadata first + duration = max(e["t_ns"] for e in events) if events else 0 + await ws.send_json({ + "type": "stream_start", + "total_events": len(events), + "duration_ns": duration, + }) + + prev_t = 0.0 + for ev in events: + # Wait proportional to simulation time gap + dt = ev["t_ns"] - prev_t + if dt > 0 and speed > 0: + # Scale: 1 ns sim → (1/speed) ms real + await asyncio.sleep(dt / (speed * 1000.0)) + await ws.send_json({"type": "event", **ev}) + prev_t = ev["t_ns"] + + await ws.send_json({"type": "stream_end"}) + + elif cmd == "ping": + await ws.send_json({"type": "pong"}) + + except WebSocketDisconnect: + pass + except Exception: + pass + + +# ── Static files (must be last) ──────────────────────────────────── +@app.get("/") +async def index(): + return FileResponse(STATIC_DIR / "index.html") + + +app.mount("/", StaticFiles(directory=str(STATIC_DIR)), name="static") + + +# ── Entry point (called from CLI) ────────────────────────────────── +def serve(port: int = DEFAULT_PORT, open_browser: bool = True) -> None: + import socket + + # Check port availability before starting + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("127.0.0.1", port)) + except OSError: + print(f"ERROR: Port {port} is already in use.") + print(f" Try: kernbench web --port {port + 1}") + print(f" Or kill the existing process using port {port}") + return + + url = f"http://127.0.0.1:{port}" + print(f"KernBench Viewer: {url}") + + if open_browser: + threading.Timer(0.5, webbrowser.open, args=(url,)).start() + + uvicorn.run(app, host="127.0.0.1", port=port, log_level="warning") + + +if __name__ == "__main__": + serve() diff --git a/src/kernbench/web/static/index.html b/src/kernbench/web/static/index.html new file mode 100644 index 0000000..315ed23 --- /dev/null +++ b/src/kernbench/web/static/index.html @@ -0,0 +1,2150 @@ + + + + + +KernBench Topology Viewer + + + + + +
+

KernBench

+
+ + + +
+ +
+ +
topology.yaml | 2 SIPs | 32 Cubes
+
+ + +
+ + +
+ +
+
+ + +
+ +
+

Selected

+
+
Click a component
+
+
+ +
+

Legend

+
+
+ Cube +
+
+
+ IO Chiplet +
+
+
+ HBM +
+
+
+ PE +
+
+
+ XBAR / NOC +
+
+ +
+

Link Utilization

+
+
+ Idle (0%) +
+
+
+ Low (<25%) +
+
+
+ Medium (25-50%) +
+
+
+ High (50-75%) +
+
+
+ Saturated (>75%) +
+
+ +
+

Topology

+
Cube mesh4 x 4
+
PEs / cube8
+
HBM / cube48 GB
+
UCIe BW512 GB/s
+
HBM BW1024 GB/s
+
HBM eff.0.8
+
+ +
+

Simulation

+
Events--
+
Duration-- ns
+
In-flight0
+
+ +
+
+ + +
+ +
+ t = 0.0 ns + + 0.0 ns +
+ +
+ + + +