# kernbench/topology/builder.py """ Topology compiler: parses topology.yaml and produces a fully-instantiated TopologyGraph with nodes, edges, and representative view projections. """ from __future__ import annotations import math from pathlib import Path from typing import Any import yaml from .mesh_gen import ensure_mesh_file from .types import Edge, Node, TopologyGraph, TopologyHandle, ViewGraph # PE component offsets from PE center (small, intra-PE distances ~0.5mm) _PE_COMP_OFFSETS = { "pe_cpu": (-0.3, 0.0), "pe_scheduler": (-0.15, 0.0), "pe_dma": (0.0, -0.15), "pe_fetch_store": (0.15, 0.0), "pe_gemm": (0.0, 0.0), "pe_math": (0.0, 0.15), "pe_mmu": (0.15, -0.15), "pe_tcm": (0.3, 0.0), "pe_ipcq": (-0.15, 0.15), } # ── Public API ─────────────────────────────────────────────────────── def resolve_topology(path_str: str) -> TopologyHandle: """Validate path and build compiled topology graph.""" p = Path(path_str).expanduser().resolve() if not p.exists(): raise FileNotFoundError(f"Topology file not found: {p}") if not p.is_file(): raise ValueError(f"Topology path is not a file: {p}") graph = load_topology(p) return TopologyHandle(path=p, topology_obj=graph) def load_topology(path: Path) -> TopologyGraph: """Load topology spec from file and compile into a topology graph.""" spec = _read_spec(path) _validate_spec(spec) # Generate cube_mesh.yaml alongside the topology file mesh_path = path.parent / "cube_mesh.yaml" mesh_data = ensure_mesh_file(spec["cube"], mesh_path) spec["_mesh"] = mesh_data return _compile_graph(spec) def _read_spec(path: Path) -> dict[str, Any]: """Read YAML topology spec file and return a dict.""" try: with path.open("r", encoding="utf-8") as f: data = yaml.safe_load(f) except yaml.YAMLError as e: msg = f"Failed to parse YAML topology: {path}" mark = getattr(e, "problem_mark", None) if mark is not None: msg += f" (line {mark.line + 1}, column {mark.column + 1})" raise ValueError(msg) from e if data is None: raise ValueError(f"Topology YAML is empty: {path}") if not isinstance(data, dict): raise ValueError( f"Topology YAML root must be a mapping/dict: {path} (got {type(data).__name__})" ) return data def _validate_spec(spec: dict) -> None: # TODO: schema validation return # ── Graph Compiler ─────────────────────────────────────────────────── def _compile_graph(spec: dict) -> TopologyGraph: """Build fully-instantiated flat graph + representative view projections.""" nodes: dict[str, Node] = {} edges: list[Edge] = [] system = spec["system"] sip_spec = spec["sip"] cube_spec = spec["cube"] mesh_w = sip_spec["cube_mesh"]["w"] mesh_h = sip_spec["cube_mesh"]["h"] cube_w = cube_spec["geometry"]["cube_mm"]["w"] cube_h = cube_spec["geometry"]["cube_mm"]["h"] seam = sip_spec["links"]["inter_cube_mesh"]["distance_mm_across_seam"] stride_x = cube_w + seam stride_y = cube_h + seam # System-level _instantiate_system(nodes, system) # Per-SIP for sip_id in range(system["sips"]["count"]): sp = f"sip{sip_id}" # IO chiplets _instantiate_io_chiplets( nodes, edges, sp, sip_spec, cube_w, cube_h, mesh_w, mesh_h, seam, ) # Cubes + PEs for row in range(mesh_h): for col in range(mesh_w): cid = row * mesh_w + col cp = f"{sp}.cube{cid}" origin = (col * stride_x, row * stride_y) _instantiate_cube(nodes, edges, cp, cube_spec, origin, spec["_mesh"]) # Inter-cube UCIe mesh _add_inter_cube_edges(edges, sp, mesh_w, mesh_h, sip_spec) # IO → cube UCIe _add_io_to_cube_edges(edges, sp, sip_spec, mesh_w) # Switch → IO pcie_ep _add_system_to_io_edges(edges, sp, sip_spec, system) # Build views return TopologyGraph( spec=spec, nodes=nodes, edges=edges, system_view=_build_system_view(spec), sip_view=_build_sip_view(spec), cube_view=_build_cube_view(spec), pe_view=_build_pe_view(spec), ) # ── Layout helpers ─────────────────────────────────────────────────── def _cube_local_positions(cube_w: float, cube_h: float) -> dict[str, tuple[float, float]]: """Cube-internal component positions relative to cube origin (0,0) at top-left.""" cx, cy = cube_w / 2, cube_h / 2 # UCIe node half-sizes (default 2.0×1.2mm) — inset so edges touch boundary uh = 0.6 # half height uw = 1.0 # half width return { "ucie-N": (cx, uh), "ucie-S": (cx, cube_h - uh), "ucie-W": (uw, cy), "ucie-E": (cube_w - uw, cy), "m_cpu": (cube_w - 2.5, cy - 1.5), "hbm_ctrl": (cx - 2.0, cy), "sram": (2.5, cy - 1.5), } def _corner_pe_positions(cube_w: float, cube_h: float) -> dict[str, list[tuple[float, float]]]: """PE center positions per corner, relative to cube origin.""" return { "NW": [(1.5, 1.5), (4.5, 1.5)], "NE": [(cube_w - 4.5, 1.5), (cube_w - 1.5, 1.5)], "SW": [(1.5, cube_h - 1.5), (4.5, cube_h - 1.5)], "SE": [(cube_w - 4.5, cube_h - 1.5), (cube_w - 1.5, cube_h - 1.5)], } # ── Instantiation: system ─────────────────────────────────────────── def _instantiate_system(nodes: dict[str, Node], system: dict) -> None: """Add system-level nodes (fabric switch).""" sw = system["components"]["switch"] sw_id = "fabric.switch0" nodes[sw_id] = Node( id=sw_id, kind=sw["kind"], impl=sw["impl"], attrs=sw.get("attrs", {}), pos_mm=None, label="Switch", ) # ── Instantiation: IO chiplets ────────────────────────────────────── def _instantiate_io_chiplets( nodes: dict[str, Node], edges: list[Edge], sp: str, sip_spec: dict, cube_w: float, cube_h: float, mesh_w: int, mesh_h: int, seam: float, ) -> None: """Add IO chiplet nodes: pcie_ep, io_cpu, io_noc, io_ucie PHYs, conn nodes.""" io_spec = sip_spec["iochiplet"] comp = io_spec["components"] links = io_spec["links"] ucie_cfg = io_spec.get("ucie", {}) mesh_total_w = mesh_w * cube_w + (mesh_w - 1) * seam mesh_total_h = mesh_h * cube_h + (mesh_h - 1) * seam for inst in io_spec["instances"]: iid = inst["id"] prefix = f"{sp}.{iid}" side = inst["place"]["side"] cx = mesh_total_w / 2 if side == "N": pcie_y, cpu_y, noc_y = -5.0, -3.0, -4.0 else: pcie_y, cpu_y, noc_y = mesh_total_h + 5.0, mesh_total_h + 3.0, mesh_total_h + 4.0 # pcie_ep ep = comp["pcie_ep"] ep_id = f"{prefix}.pcie_ep" nodes[ep_id] = Node( id=ep_id, kind=ep["kind"], impl=ep["impl"], attrs=ep["attrs"], pos_mm=(cx, pcie_y), label="PCIe EP", ) # io_cpu cpu = comp["io_cpu"] cpu_id = f"{prefix}.io_cpu" nodes[cpu_id] = Node( id=cpu_id, kind=cpu["kind"], impl=cpu["impl"], attrs=cpu["attrs"], pos_mm=(cx, cpu_y), label="IO CPU", ) # io_noc (central switch inside IOChiplet) noc = comp["io_noc"] noc_id = f"{prefix}.noc" nodes[noc_id] = Node( id=noc_id, kind=noc["kind"], impl=noc["impl"], attrs=noc["attrs"], pos_mm=(cx, noc_y), label="IO NOC", ) # pcie_ep ↔ io_noc (bidirectional) edges.append(Edge( src=ep_id, dst=noc_id, distance_mm=links["pcie_ep_to_noc_mm"], bw_gbs=links["pcie_ep_to_noc_bw_gbs"], kind="io_internal", )) edges.append(Edge( src=noc_id, dst=ep_id, distance_mm=links["pcie_ep_to_noc_mm"], bw_gbs=links["pcie_ep_to_noc_bw_gbs"], kind="io_internal", )) # io_cpu ↔ io_noc (bidirectional) edges.append(Edge( src=cpu_id, dst=noc_id, distance_mm=links["io_cpu_to_noc_mm"], bw_gbs=links["io_cpu_to_noc_bw_gbs"], kind="io_internal", )) edges.append(Edge( src=noc_id, dst=cpu_id, distance_mm=links["io_cpu_to_noc_mm"], bw_gbs=links["io_cpu_to_noc_bw_gbs"], kind="io_internal", )) # io_ucie PHY nodes + conn nodes per PHY io_ucie_ns = float(ucie_cfg.get("overhead_ns", 1.0)) io_n_conn = int(ucie_cfg.get("n_connections", 4)) io_conn_bw = float(ucie_cfg.get("per_connection_bw_gbs", 128.0)) io_noc_to_ucie_mm = float(ucie_cfg.get("noc_to_ucie_mm", 0.5)) for phy in inst["ucie"]["phys"]: phy_id = f"{prefix}.ucie-{phy}" nodes[phy_id] = Node( id=phy_id, kind="io_ucie", impl="builtin.ucie", attrs={"overhead_ns": io_ucie_ns}, pos_mm=(cx, noc_y), label=f"IO UCIe-{phy}", ) for ci in range(io_n_conn): conn_id = f"{phy_id}.conn{ci}" nodes[conn_id] = Node( id=conn_id, kind="io_ucie_conn", impl="builtin.ucie", attrs={"overhead_ns": 0.0}, pos_mm=(cx, noc_y), label=f"IO UCIe-{phy} C{ci}", ) # io_noc ↔ conn (per-connection BW) edges.append(Edge( src=noc_id, dst=conn_id, distance_mm=io_noc_to_ucie_mm, bw_gbs=io_conn_bw, kind="io_noc_to_conn", )) edges.append(Edge( src=conn_id, dst=noc_id, distance_mm=io_noc_to_ucie_mm, bw_gbs=io_conn_bw, kind="conn_to_io_noc", )) # conn ↔ io_ucie (internal, no BW limit) edges.append(Edge( src=conn_id, dst=phy_id, distance_mm=0.0, kind="io_ucie_internal", )) edges.append(Edge( src=phy_id, dst=conn_id, distance_mm=0.0, kind="io_ucie_internal", )) # ── PE-to-router distance ───────────────────────────────────────── def _compute_pe_noc_distances( mesh_data: dict, corner_pos: dict[str, list[tuple[float, float]]], corners: list[str], pe_per_corner: int, ) -> dict[int, float]: """Compute per-PE Euclidean distance from physical position to assigned router.""" distances: dict[int, float] = {} routers = mesh_data["routers"] pe_idx = 0 for corner in corners: for ci in range(pe_per_corner): pe_cx, pe_cy = corner_pos[corner][ci] target = f"pe{pe_idx}.dma" for _rkey, rval in routers.items(): if rval is not None and target in rval.get("attach", []): rx, ry = rval["pos_mm"] dist = math.sqrt((pe_cx - rx) ** 2 + (pe_cy - ry) ** 2) distances[pe_idx] = round(dist, 2) break else: distances[pe_idx] = 0.0 pe_idx += 1 return distances # ── Instantiation: cube + PEs ─────────────────────────────────────── def _instantiate_cube( nodes: dict[str, Node], edges: list[Edge], cp: str, cube: dict, origin: tuple[float, float], mesh_data: dict, ) -> None: """Add all cube-internal nodes and edges, including PE instances. Topology: explicit router mesh from cube_mesh.yaml (ADR-0019). Each router is a separate SimPy node. Components attach to routers based on cube_mesh.yaml attachment lists. """ cube_w = cube["geometry"]["cube_mm"]["w"] cube_h = cube["geometry"]["cube_mm"]["h"] ox, oy = origin local_pos = _cube_local_positions(cube_w, cube_h) clinks = cube["links"] mm = cube["memory_map"] # ── Mode branch (ADR-0019) ── mode = mm.get("hbm_mapping_mode", "n_to_one") if mode == "one_to_one": raise NotImplementedError("1:1 mode: ADR-0019 D3") # ── UCIe ports + connection nodes ── ucie_cfg = cube["ucie"] ucie_ns = ucie_cfg["overhead_ns"] ucie_n_conn = ucie_cfg.get("n_connections", 1) for port in ucie_cfg["ports"]: pid = f"{cp}.ucie-{port}" lx, ly = local_pos[f"ucie-{port}"] nodes[pid] = Node( id=pid, kind="ucie_port", impl="builtin.ucie", attrs={"overhead_ns": ucie_ns}, pos_mm=(ox + lx, oy + ly), label=f"UCIe-{port}", ) for ci in range(ucie_n_conn): conn_id = f"{cp}.ucie-{port}.conn{ci}" nodes[conn_id] = Node( id=conn_id, kind="ucie_conn", impl="builtin.ucie", attrs={"overhead_ns": 0.0}, pos_mm=(ox + lx, oy + ly), label=f"UCIe-{port} C{ci}", ) # ── Named components: m_cpu, sram (noc is now explicit routers) ── for name in ("m_cpu", "sram"): c = cube["components"][name] nid = f"{cp}.{name}" lx, ly = local_pos[name] nodes[nid] = Node( id=nid, kind=c["kind"], impl=c["impl"], attrs=c["attrs"], pos_mm=(ox + lx, oy + ly), label=name.upper().replace("_", " "), ) # ── Per-PE HBM controller (ADR-0019 D1/D4) ── # Each PE owns one slice of the cube's HBM. The slice has its own # set of pseudo-channels and is reachable ONLY through that PE's # attaching router (see cube_mesh.yaml ``peX.hbm`` attach lists). # Restored after the ADR-0019 over-consolidation in commit 5917b34. hbm_spec = cube["components"]["hbm_ctrl"] hbm_lx, hbm_ly = local_pos["hbm_ctrl"] _hbm_total_bw = float(cube["links"].get("hbm_to_router_bw_gbs", 256.0)) _num_pcs = int(hbm_spec["attrs"].get("num_pcs", 8)) pes_per_cube = int(cube["memory_map"].get("hbm_slices_per_cube", 8)) for pe_idx in range(pes_per_cube): pe_hbm_id = f"{cp}.hbm_ctrl.pe{pe_idx}" pe_hbm_attrs = dict(hbm_spec["attrs"]) pe_hbm_attrs["num_pcs"] = _num_pcs pe_hbm_attrs["pc_bw_gbs"] = _hbm_total_bw / _num_pcs nodes[pe_hbm_id] = Node( id=pe_hbm_id, kind=hbm_spec["kind"], impl=hbm_spec["impl"], attrs=pe_hbm_attrs, pos_mm=(ox + hbm_lx, oy + hbm_ly), label=f"HBM CTRL pe{pe_idx}", ) # ── Router mesh from cube_mesh.yaml (ADR-0019 D3) ── routers = mesh_data["routers"] router_spec = cube["components"]["noc_router"] router_bw = clinks.get("router_link_bw_gbs", 256.0) pe_to_router_bw = clinks.get("pe_to_router_bw_gbs", 256.0) hbm_eff = float(hbm_spec.get("attrs", {}).get("efficiency", 1.0)) hbm_to_router_bw = clinks.get("hbm_to_router_bw_gbs", 256.0) * hbm_eff sram_to_router_bw = clinks.get("sram_to_router_bw_gbs", 128.0) ucie_conn_bw = ucie_cfg.get("per_connection_bw_gbs", 128.0) n_rows = mesh_data["mesh"]["rows"] n_cols = mesh_data["mesh"]["cols"] # Create router nodes for rkey, rval in routers.items(): if rval is None: continue rid = f"{cp}.{rkey}" rx, ry = rval["pos_mm"] nodes[rid] = Node( id=rid, kind=router_spec["kind"], impl=router_spec["impl"], attrs=router_spec["attrs"], pos_mm=(ox + rx, oy + ry), label=rkey.upper(), ) # Router ↔ router XY mesh edges (adjacent non-null routers) for r in range(n_rows): for c in range(n_cols): rkey = f"r{r}c{c}" if routers.get(rkey) is None: continue src_id = f"{cp}.{rkey}" src_pos = routers[rkey]["pos_mm"] # Horizontal neighbor (same row, next col) for nc in range(c + 1, n_cols): nkey = f"r{r}c{nc}" if routers.get(nkey) is None: continue dst_id = f"{cp}.{nkey}" dst_pos = routers[nkey]["pos_mm"] dist = abs(dst_pos[0] - src_pos[0]) edges.append(Edge( src=src_id, dst=dst_id, distance_mm=round(dist, 2), bw_gbs=router_bw, kind="router_mesh", )) edges.append(Edge( src=dst_id, dst=src_id, distance_mm=round(dist, 2), bw_gbs=router_bw, kind="router_mesh", )) break # only immediate neighbor # Vertical neighbor (same col, next row) for nr in range(r + 1, n_rows): nkey = f"r{nr}c{c}" if routers.get(nkey) is None: continue dst_id = f"{cp}.{nkey}" dst_pos = routers[nkey]["pos_mm"] dist = abs(dst_pos[1] - src_pos[1]) edges.append(Edge( src=src_id, dst=dst_id, distance_mm=round(dist, 2), bw_gbs=router_bw, kind="router_mesh", )) edges.append(Edge( src=dst_id, dst=src_id, distance_mm=round(dist, 2), bw_gbs=router_bw, kind="router_mesh", )) break # only immediate neighbor # ── PE instances ── corners = cube["pe_layout"]["corners"] pe_per_corner = cube["pe_layout"]["pe_per_corner"] corner_pos = _corner_pe_positions(cube_w, cube_h) pe_tmpl = cube["pe_template"] pe_links = pe_tmpl["links"] pe_idx = 0 for corner in corners: for ci in range(pe_per_corner): pp = f"{cp}.pe{pe_idx}" pe_cx, pe_cy = corner_pos[corner][ci] # PE template components for comp_name, comp_spec in pe_tmpl["components"].items(): cid = f"{pp}.{comp_name}" dx, dy = _PE_COMP_OFFSETS.get(comp_name, (0.0, 0.0)) nodes[cid] = Node( id=cid, kind=comp_spec["kind"], impl=comp_spec["impl"], attrs=comp_spec["attrs"], pos_mm=(ox + pe_cx + dx, oy + pe_cy + dy), label=comp_name.upper().replace("_", " "), ) # PE-internal edges _add_pe_internal_edges(edges, pp, pe_links) pe_idx += 1 # ── Component ↔ router edges (based on cube_mesh.yaml attach) ── for rkey, rval in routers.items(): if rval is None: continue rid = f"{cp}.{rkey}" for item in rval.get("attach", []): if item.endswith(".dma"): # PE_DMA ↔ router pe_prefix = item.rsplit(".", 1)[0] dma_id = f"{cp}.{pe_prefix}.pe_dma" if dma_id in nodes: edges.append(Edge( src=dma_id, dst=rid, distance_mm=0.0, bw_gbs=pe_to_router_bw, kind="pe_to_router", )) edges.append(Edge( src=rid, dst=dma_id, distance_mm=0.0, bw_gbs=pe_to_router_bw, kind="router_to_pe", )) elif item.endswith(".cpu"): # PE_CPU ↔ router (command path) pe_prefix = item.rsplit(".", 1)[0] cpu_id = f"{cp}.{pe_prefix}.pe_cpu" if cpu_id in nodes: edges.append(Edge( src=rid, dst=cpu_id, distance_mm=clinks.get("noc_to_pe_cpu_mm", 0.0), kind="command", )) edges.append(Edge( src=cpu_id, dst=rid, distance_mm=clinks.get("noc_to_pe_cpu_mm", 0.0), kind="pe_response", )) # PE_MMU ↔ router (mapping install path) mmu_id = f"{cp}.{pe_prefix}.pe_mmu" if mmu_id in nodes: edges.append(Edge( src=rid, dst=mmu_id, distance_mm=0.0, kind="command", )) elif item.endswith(".hbm"): # peX.hbm: router rXcY owns the entry to hbm_ctrl.peX. # (ADR-0019 D1/D4 — per-PE HBM partitioning.) pe_prefix = item.rsplit(".", 1)[0] pe_idx = int(pe_prefix.replace("pe", "")) pe_hbm_id = f"{cp}.hbm_ctrl.pe{pe_idx}" if pe_hbm_id in nodes: edges.append(Edge( src=rid, dst=pe_hbm_id, distance_mm=0.0, bw_gbs=hbm_to_router_bw, kind="router_to_hbm", )) edges.append(Edge( src=pe_hbm_id, dst=rid, distance_mm=0.0, bw_gbs=hbm_to_router_bw, kind="hbm_to_router", )) elif item == "m_cpu": # M_CPU ↔ router mcpu_id = f"{cp}.m_cpu" edges.append(Edge( src=mcpu_id, dst=rid, distance_mm=clinks.get("m_cpu_to_router_mm", 0.0), kind="command", )) edges.append(Edge( src=rid, dst=mcpu_id, distance_mm=clinks.get("m_cpu_to_router_mm", 0.0), kind="command", )) elif item == "sram": # SRAM ↔ router sram_id = f"{cp}.sram" edges.append(Edge( src=sram_id, dst=rid, distance_mm=0.0, bw_gbs=sram_to_router_bw, kind="sram_to_router", )) edges.append(Edge( src=rid, dst=sram_id, distance_mm=0.0, bw_gbs=sram_to_router_bw, kind="router_to_sram", )) elif item.startswith("ucie_"): # UCIe conn ↔ router # item format: "ucie_{dir}.c{i}" e.g. "ucie_n.c0" parts = item.split(".") direction = parts[0].replace("ucie_", "").upper() conn_num = parts[1].replace("c", "") # "0", "1", etc. conn_id = f"{cp}.ucie-{direction}.conn{conn_num}" ucie_id = f"{cp}.ucie-{direction}" # conn ↔ ucie port if conn_id in nodes: edges.append(Edge( src=ucie_id, dst=conn_id, distance_mm=0.0, kind="ucie_internal", )) edges.append(Edge( src=conn_id, dst=ucie_id, distance_mm=0.0, kind="ucie_internal", )) # conn ↔ router edges.append(Edge( src=conn_id, dst=rid, distance_mm=0.0, bw_gbs=ucie_conn_bw, kind="ucie_conn_to_router", )) edges.append(Edge( src=rid, dst=conn_id, distance_mm=0.0, bw_gbs=ucie_conn_bw, kind="router_to_ucie_conn", )) # NOTE: HBM↔router edges are created in the per-router attach loop # above (peX.hbm items map router → hbm_ctrl.peX). Removed the # legacy "all routers → single hbm_ctrl" loop that bypassed the # ADR-0019 D4 per-PE partition. def _add_pe_internal_edges(edges: list[Edge], pp: str, pe_links: dict) -> None: """Add PE-internal edges for a single PE instance (ADR-0021).""" edges.append(Edge( src=f"{pp}.pe_cpu", dst=f"{pp}.pe_scheduler", distance_mm=pe_links["pe_cpu_to_scheduler_mm"], kind="pe_internal", )) # Scheduler → engines (initial dispatch) for eng, key in [("pe_dma", "scheduler_to_dma_mm"), ("pe_gemm", "scheduler_to_gemm_mm"), ("pe_math", "scheduler_to_math_mm")]: edges.append(Edge( src=f"{pp}.pe_scheduler", dst=f"{pp}.{eng}", distance_mm=pe_links[key], kind="pe_internal", )) # Scheduler → fetch_store (initial dispatch) if "scheduler_to_fetch_store_mm" in pe_links: edges.append(Edge( src=f"{pp}.pe_scheduler", dst=f"{pp}.pe_fetch_store", distance_mm=pe_links["scheduler_to_fetch_store_mm"], kind="pe_internal", )) # Engine → TCM (legacy BW edges) for eng, mm_key, bw_key in [("pe_dma", "dma_to_tcm_mm", "dma_to_tcm_bw_gbs"), ("pe_gemm", "gemm_to_tcm_mm", "gemm_to_tcm_bw_gbs"), ("pe_math", "math_to_tcm_mm", "math_to_tcm_bw_gbs")]: edges.append(Edge( src=f"{pp}.{eng}", dst=f"{pp}.pe_tcm", distance_mm=pe_links[mm_key], bw_gbs=pe_links[bw_key], kind="pe_internal", )) # Fetch/Store → TCM (ADR-0021 D5) if "fetch_store_to_tcm_mm" in pe_links: edges.append(Edge( src=f"{pp}.pe_fetch_store", dst=f"{pp}.pe_tcm", distance_mm=pe_links["fetch_store_to_tcm_mm"], bw_gbs=pe_links.get("fetch_store_to_tcm_bw_gbs", 512.0), kind="pe_internal", )) # Chaining edges (ADR-0021 D4 — token self-routing) chaining = [ ("pe_dma", "pe_fetch_store", "dma_to_fetch_store_mm"), ("pe_fetch_store", "pe_gemm", "fetch_store_to_gemm_mm"), ("pe_fetch_store", "pe_math", "fetch_store_to_math_mm"), ("pe_gemm", "pe_fetch_store", "gemm_to_fetch_store_mm"), ("pe_math", "pe_fetch_store", "math_to_fetch_store_mm"), ("pe_fetch_store", "pe_dma", "fetch_store_to_dma_mm"), ] for src_eng, dst_eng, mm_key in chaining: if mm_key in pe_links: edges.append(Edge( src=f"{pp}.{src_eng}", dst=f"{pp}.{dst_eng}", distance_mm=pe_links[mm_key], kind="pe_internal", )) # PE_IPCQ edges (ADR-0023 D1, D9 D10) ipcq_edges = [ ("pe_cpu", "pe_ipcq", "cpu_to_ipcq_mm"), # IpcqRequest ("pe_ipcq", "pe_dma", "ipcq_to_dma_mm"), # IpcqDmaToken outbound ("pe_dma", "pe_ipcq", "dma_to_ipcq_mm"), # IpcqMetaArrival inbound ] for src_c, dst_c, mm_key in ipcq_edges: if mm_key in pe_links: edges.append(Edge( src=f"{pp}.{src_c}", dst=f"{pp}.{dst_c}", distance_mm=pe_links[mm_key], kind="pe_internal", )) # ── Inter-cube / IO / system edges ────────────────────────────────── def _add_inter_cube_edges( edges: list[Edge], sp: str, mesh_w: int, mesh_h: int, sip_spec: dict, ) -> None: """Add UCIe mesh edges between adjacent cubes within a SIP.""" mesh = sip_spec["links"]["inter_cube_mesh"] bw = mesh["bw_gbs_per_ucie_phy"] dist = mesh["distance_mm_across_seam"] for row in range(mesh_h): for col in range(mesh_w): cid = row * mesh_w + col if col + 1 < mesh_w: nid = row * mesh_w + (col + 1) edges.append(Edge( src=f"{sp}.cube{cid}.ucie-E", dst=f"{sp}.cube{nid}.ucie-W", distance_mm=dist, bw_gbs=bw, kind="ucie_mesh", )) edges.append(Edge( src=f"{sp}.cube{nid}.ucie-W", dst=f"{sp}.cube{cid}.ucie-E", distance_mm=dist, bw_gbs=bw, kind="ucie_mesh", )) if row + 1 < mesh_h: nid = (row + 1) * mesh_w + col edges.append(Edge( src=f"{sp}.cube{cid}.ucie-S", dst=f"{sp}.cube{nid}.ucie-N", distance_mm=dist, bw_gbs=bw, kind="ucie_mesh", )) edges.append(Edge( src=f"{sp}.cube{nid}.ucie-N", dst=f"{sp}.cube{cid}.ucie-S", distance_mm=dist, bw_gbs=bw, kind="ucie_mesh", )) def _add_io_to_cube_edges( edges: list[Edge], sp: str, sip_spec: dict, mesh_w: int, ) -> None: """Add IO chiplet io_ucie ↔ cube UCIe edges (bidirectional).""" for inst in sip_spec["iochiplet"]["instances"]: iid = inst["id"] phy_bw = float(inst["ucie"]["phy_bw_gbs"]) for port in inst["cube_ports"]: cube_col, cube_row = port["cube"]["xy"] cube_id = cube_row * mesh_w + cube_col cube_side = port["cube_side"] phy = port["phy"] io_ucie_id = f"{sp}.{iid}.ucie-{phy}" cube_ucie_id = f"{sp}.cube{cube_id}.ucie-{cube_side}" edges.append(Edge( src=io_ucie_id, dst=cube_ucie_id, distance_mm=port["distance_mm"], bw_gbs=phy_bw, kind="io_to_cube", )) edges.append(Edge( src=cube_ucie_id, dst=io_ucie_id, distance_mm=port["distance_mm"], bw_gbs=phy_bw, kind="cube_to_io", )) def _add_system_to_io_edges( edges: list[Edge], sp: str, sip_spec: dict, system: dict, ) -> None: """Add bidirectional fabric switch ↔ IO chiplet PCIe edges. Both directions are needed: switch → pcie_ep for host→device traffic (memory writes, kernel launch) pcie_ep → switch for device-side outbound traffic (cross-SIP IPCQ send between PE_DMAs through the system switch). """ sw_id = "fabric.switch0" sys_link = system["links"]["io_ep_to_switch"] for inst in sip_spec["iochiplet"]["instances"]: pcie_ep_id = f"{sp}.{inst['id']}.pcie_ep" edges.append(Edge( src=sw_id, dst=pcie_ep_id, distance_mm=sys_link["distance_mm"], bw_gbs=sys_link["bw_gbs_per_ep"], kind="pcie", )) edges.append(Edge( src=pcie_ep_id, dst=sw_id, distance_mm=sys_link["distance_mm"], bw_gbs=sys_link["bw_gbs_per_ep"], kind="pcie", )) # ── View builders ──────────────────────────────────────────────────── def _build_system_view(spec: dict) -> ViewGraph: """System-level view: SIP blocks, IO chiplets, fabric switch.""" system = spec["system"] sip_count = system["sips"]["count"] sip_w, sip_h = 71.0, 59.0 gap = 30.0 canvas_w = sip_count * sip_w + (sip_count - 1) * gap canvas_h = sip_h + 20.0 nodes: dict[str, Node] = {} view_edges: list[Edge] = [] sw = system["components"]["switch"] sw_id = "fabric.switch0" nodes[sw_id] = Node( id=sw_id, kind=sw["kind"], impl=sw["impl"], attrs=sw.get("attrs", {}), pos_mm=(canvas_w / 2, 5.0), label="Fabric Switch", ) for s in range(sip_count): sx = s * (sip_w + gap) sy = 20.0 sip_id = f"sip{s}" nodes[sip_id] = Node( id=sip_id, kind="sip", impl="", attrs={"w_mm": sip_w, "h_mm": sip_h}, pos_mm=(sx + sip_w / 2, sy + sip_h / 2), label=f"SIP {s}", ) for inst in spec["sip"]["iochiplet"]["instances"]: iid = inst["id"] io_nid = f"{sip_id}.{iid}" side = inst["place"]["side"] iy = sy if side == "N" else sy + sip_h nodes[io_nid] = Node( id=io_nid, kind="iochiplet", impl="", attrs={}, pos_mm=(sx + sip_w / 2, iy), label=f"IO {iid}", ) view_edges.append(Edge( src=sw_id, dst=io_nid, distance_mm=system["links"]["io_ep_to_switch"]["distance_mm"], bw_gbs=system["links"]["io_ep_to_switch"]["bw_gbs_per_ep"], kind="pcie", )) return ViewGraph( name="system", nodes=nodes, edges=view_edges, width_mm=canvas_w, height_mm=canvas_h, ) def _build_sip_view(spec: dict) -> ViewGraph: """SIP-level view: cube mesh + IO chiplets (representative, sip0).""" sip_spec = spec["sip"] cube_spec = spec["cube"] mesh_w = sip_spec["cube_mesh"]["w"] mesh_h = sip_spec["cube_mesh"]["h"] cube_w = cube_spec["geometry"]["cube_mm"]["w"] cube_h = cube_spec["geometry"]["cube_mm"]["h"] seam = sip_spec["links"]["inter_cube_mesh"]["distance_mm_across_seam"] stride_x = cube_w + seam stride_y = cube_h + seam mesh_total_w = mesh_w * cube_w + (mesh_w - 1) * seam mesh_total_h = mesh_h * cube_h + (mesh_h - 1) * seam io_margin = 6.0 canvas_w = mesh_total_w canvas_h = mesh_total_h + 2 * io_margin nodes: dict[str, Node] = {} view_edges: list[Edge] = [] # Cubes as opaque blocks for row in range(mesh_h): for col in range(mesh_w): cid = row * mesh_w + col cx = col * stride_x + cube_w / 2 cy = io_margin + row * stride_y + cube_h / 2 nid = f"cube{cid}" nodes[nid] = Node( id=nid, kind="cube", impl="", attrs={"w_mm": cube_w, "h_mm": cube_h, "col": col, "row": row}, pos_mm=(cx, cy), label=f"CUBE ({col},{row})", ) # Inter-cube mesh edges mesh_link = sip_spec["links"]["inter_cube_mesh"] for row in range(mesh_h): for col in range(mesh_w): cid = row * mesh_w + col if col + 1 < mesh_w: nid = row * mesh_w + (col + 1) view_edges.append(Edge( src=f"cube{cid}", dst=f"cube{nid}", distance_mm=mesh_link["distance_mm_across_seam"], bw_gbs=mesh_link["bw_gbs_per_ucie_phy"], kind="ucie_mesh", )) if row + 1 < mesh_h: nid = (row + 1) * mesh_w + col view_edges.append(Edge( src=f"cube{cid}", dst=f"cube{nid}", distance_mm=mesh_link["distance_mm_across_seam"], bw_gbs=mesh_link["bw_gbs_per_ucie_phy"], kind="ucie_mesh", )) # IO chiplets io_ucie_cfg = sip_spec["iochiplet"].get("ucie", {}) io_noc_to_ucie_mm = float(io_ucie_cfg.get("noc_to_ucie_mm", 0.5)) for inst in sip_spec["iochiplet"]["instances"]: iid = inst["id"] side = inst["place"]["side"] iy = 2.0 if side == "N" else canvas_h - 2.0 phy_bw = float(inst["ucie"]["phy_bw_gbs"]) nodes[iid] = Node( id=iid, kind="iochiplet", impl="", attrs={}, pos_mm=(mesh_total_w / 2, iy), label=f"IO {iid}", ) for port in inst["cube_ports"]: cube_col, cube_row = port["cube"]["xy"] cube_id = cube_row * mesh_w + cube_col view_edges.append(Edge( src=iid, dst=f"cube{cube_id}", distance_mm=io_noc_to_ucie_mm + port["distance_mm"], bw_gbs=phy_bw, kind="io_to_cube", )) return ViewGraph( name="sip", nodes=nodes, edges=view_edges, width_mm=canvas_w, height_mm=canvas_h, ) def _build_cube_view(spec: dict) -> ViewGraph: """Cube-level view: representative single cube, PEs as opaque blocks.""" cube = spec["cube"] cube_w = cube["geometry"]["cube_mm"]["w"] cube_h = cube["geometry"]["cube_mm"]["h"] local_pos = _cube_local_positions(cube_w, cube_h) clinks = cube["links"] n_slices = cube["memory_map"]["hbm_slices_per_cube"] half = n_slices // 2 nodes: dict[str, Node] = {} view_edges: list[Edge] = [] # UCIe ports + connection nodes ucie_cfg = cube["ucie"] ucie_n_conn = ucie_cfg.get("n_connections", 1) for port in ucie_cfg["ports"]: pid = f"ucie-{port}" lx, ly = local_pos[pid] nodes[pid] = Node( id=pid, kind="ucie_port", impl="builtin.ucie", attrs={}, pos_mm=(lx, ly), label=f"UCIe-{port}", ) for ci in range(ucie_n_conn): conn_id = f"ucie-{port}.conn{ci}" nodes[conn_id] = Node( id=conn_id, kind="ucie_conn", impl="builtin.ucie", attrs={"overhead_ns": 0.0}, pos_mm=(lx, ly), label=f"UCIe-{port} C{ci}", ) # Named components (hbm_ctrl as single node in view) for name in ("m_cpu", "hbm_ctrl", "sram"): c = cube["components"][name] lx, ly = local_pos.get(name, local_pos.get("hbm_ctrl")) nodes[name] = Node( id=name, kind=c["kind"], impl=c["impl"], attrs=c["attrs"], pos_mm=(lx, ly), label=name.upper().replace("_", " "), ) # Load mesh data early (needed for router nodes + PE distances) mesh_data = spec.get("_mesh", {}) # Router nodes from cube_mesh.yaml (explicit in view) router_spec = cube["components"]["noc_router"] routers = mesh_data.get("routers", {}) for rkey, rval in routers.items(): if rval is None: continue rx, ry = rval["pos_mm"] nodes[rkey] = Node( id=rkey, kind=router_spec["kind"], impl=router_spec["impl"], attrs=router_spec["attrs"], pos_mm=(rx, ry), label=rkey.upper(), ) # PEs as opaque blocks corners = cube["pe_layout"]["corners"] pe_per_corner = cube["pe_layout"]["pe_per_corner"] corner_pos = _corner_pe_positions(cube_w, cube_h) pe_noc_distances = _compute_pe_noc_distances( mesh_data, corner_pos, corners, pe_per_corner, ) if mesh_data else {} pe_idx = 0 pe_offset_y = 1.2 # mm offset to avoid overlapping router node for corner in corners: is_top = corner in ("NW", "NE") for ci in range(pe_per_corner): pid = f"pe{pe_idx}" px, py = corner_pos[corner][ci] # Offset PE above (top) or below (bottom) its router py_view = py - pe_offset_y if is_top else py + pe_offset_y nodes[pid] = Node( id=pid, kind="pe", impl="", attrs={"corner": corner}, pos_mm=(px, py_view), label=f"PE{pe_idx}", ) pe_idx += 1 # View edges based on cube_mesh.yaml attach (mirrors _instantiate_cube logic) pe_to_router_bw = clinks.get("pe_to_router_bw_gbs", 256.0) hbm_to_router_bw = clinks.get("hbm_to_router_bw_gbs", 256.0) sram_bw = clinks.get("sram_to_router_bw_gbs", 128.0) ucie_conn_bw_v = ucie_cfg.get("per_connection_bw_gbs", 128.0) n_rows = mesh_data.get("mesh", {}).get("rows", 6) n_cols = mesh_data.get("mesh", {}).get("cols", 6) # Router ↔ router mesh edges for r in range(n_rows): for c in range(n_cols): rkey = f"r{r}c{c}" if routers.get(rkey) is None: continue src_pos = routers[rkey]["pos_mm"] # Horizontal neighbor for nc in range(c + 1, n_cols): nkey = f"r{r}c{nc}" if routers.get(nkey) is None: continue dist = abs(routers[nkey]["pos_mm"][0] - src_pos[0]) view_edges.append(Edge( src=rkey, dst=nkey, distance_mm=round(dist, 2), kind="router_mesh", )) break # Vertical neighbor for nr in range(r + 1, n_rows): nkey = f"r{nr}c{c}" if routers.get(nkey) is None: continue dist = abs(routers[nkey]["pos_mm"][1] - src_pos[1]) view_edges.append(Edge( src=rkey, dst=nkey, distance_mm=round(dist, 2), kind="router_mesh", )) break # Component ↔ router edges from attach lists for rkey, rval in routers.items(): if rval is None: continue for item in rval.get("attach", []): if item.endswith(".dma"): pe_prefix = item.rsplit(".", 1)[0] pid = pe_prefix.replace("pe", "pe") # "pe0" → "pe0" if pid in nodes: view_edges.append(Edge( src=pid, dst=rkey, distance_mm=0.0, bw_gbs=pe_to_router_bw, kind="pe_to_router", )) view_edges.append(Edge( src=rkey, dst=pid, distance_mm=0.0, kind="command", )) elif item.endswith(".hbm"): view_edges.append(Edge( src=rkey, dst="hbm_ctrl", distance_mm=0.0, bw_gbs=hbm_to_router_bw, kind="router_to_hbm", )) elif item == "m_cpu": view_edges.append(Edge( src="m_cpu", dst=rkey, distance_mm=0.0, kind="command", )) view_edges.append(Edge( src=rkey, dst="m_cpu", distance_mm=0.0, kind="command", )) elif item == "sram": view_edges.append(Edge( src="sram", dst=rkey, distance_mm=0.0, bw_gbs=sram_bw, kind="router_to_sram", )) elif item.startswith("ucie_"): parts = item.split(".") direction = parts[0].replace("ucie_", "").upper() conn_num = parts[1].replace("c", "") conn_id = f"ucie-{direction}.conn{conn_num}" view_edges.append(Edge( src=rkey, dst=conn_id, distance_mm=0.0, bw_gbs=ucie_conn_bw_v, kind="router_to_ucie_conn", )) view_edges.append(Edge( src=conn_id, dst=rkey, distance_mm=0.0, bw_gbs=ucie_conn_bw_v, kind="ucie_conn_to_router", )) view_edges.append(Edge( src=conn_id, dst=f"ucie-{direction}", distance_mm=0.0, kind="ucie_internal", )) view_edges.append(Edge( src=f"ucie-{direction}", dst=conn_id, distance_mm=0.0, kind="ucie_internal", )) return ViewGraph( name="cube", nodes=nodes, edges=view_edges, width_mm=cube_w, height_mm=cube_h, ) def _build_pe_view(spec: dict) -> ViewGraph: """PE-level view: representative single PE with all template components.""" pe_tmpl = spec["cube"]["pe_template"] pe_links = pe_tmpl["links"] canvas_w, canvas_h = 12.0, 8.0 positions = { "pe_cpu": (1.5, 4.0), "pe_scheduler": (4.0, 4.0), "pe_dma": (7.0, 1.5), "pe_fetch_store": (8.5, 4.0), "pe_gemm": (7.0, 4.0), "pe_math": (7.0, 6.5), "pe_mmu": (4.0, 1.5), "pe_tcm": (10.0, 4.0), "pe_ipcq": (4.0, 6.5), } nodes: dict[str, Node] = {} view_edges: list[Edge] = [] for comp_name, comp_spec in pe_tmpl["components"].items(): px, py = positions.get(comp_name, (1.0, 1.0)) nodes[comp_name] = Node( id=comp_name, kind=comp_spec["kind"], impl=comp_spec["impl"], attrs=comp_spec["attrs"], pos_mm=(px, py), label=comp_name.upper().replace("_", " "), ) view_edges.append(Edge( src="pe_cpu", dst="pe_scheduler", distance_mm=pe_links["pe_cpu_to_scheduler_mm"], kind="pe_internal", )) for eng, key in [("pe_dma", "scheduler_to_dma_mm"), ("pe_gemm", "scheduler_to_gemm_mm"), ("pe_math", "scheduler_to_math_mm")]: view_edges.append(Edge( src="pe_scheduler", dst=eng, distance_mm=pe_links[key], kind="pe_internal", )) if "scheduler_to_fetch_store_mm" in pe_links: view_edges.append(Edge( src="pe_scheduler", dst="pe_fetch_store", distance_mm=pe_links["scheduler_to_fetch_store_mm"], kind="pe_internal", )) for eng, mm_key, bw_key in [("pe_dma", "dma_to_tcm_mm", "dma_to_tcm_bw_gbs"), ("pe_gemm", "gemm_to_tcm_mm", "gemm_to_tcm_bw_gbs"), ("pe_math", "math_to_tcm_mm", "math_to_tcm_bw_gbs")]: view_edges.append(Edge( src=eng, dst="pe_tcm", distance_mm=pe_links[mm_key], bw_gbs=pe_links[bw_key], kind="pe_internal", )) if "fetch_store_to_tcm_mm" in pe_links: view_edges.append(Edge( src="pe_fetch_store", dst="pe_tcm", distance_mm=pe_links["fetch_store_to_tcm_mm"], bw_gbs=pe_links.get("fetch_store_to_tcm_bw_gbs", 512.0), kind="pe_internal", )) return ViewGraph( name="pe", nodes=nodes, edges=view_edges, width_mm=canvas_w, height_mm=canvas_h, )