"""Structural tests for the topology graph loaded from ``topology.yaml``.

The tests cover the full graph (node/edge counts, specific nodes, positions,
edges) and the derived system / SIP / cube / PE views.
"""

from pathlib import Path

from kernbench.policy.routing.router import PathRouter
from kernbench.topology.builder import load_topology

TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"


def _graph():
    """Return the graph produced by ``load_topology(TOPOLOGY_PATH)``.

    ``load_topology`` is invoked on every call; nothing is cached here.
    """
    return load_topology(TOPOLOGY_PATH)


# -- Full graph: node counts --------------------------------------------------


def test_full_graph_node_count():
    """The full graph contains exactly 3599 nodes."""
    g = _graph()
    # 1 switch
    # + 2 SIPs x (1 IO x 23 io_nodes
    #             + 16 cubes x (32 routers + 1 hbm_ctrl + 1 m_cpu + 1 sram
    #                           + 20 ucie (4 ports x (1 port + 4 conn))
    #                           + 8 PEs x 7 pe_comps))
    # IO: pcie_ep + io_cpu + noc + 4 io_ucie_ports + 4*4 io_ucie_conn = 23
    # cube: 32 + 3 + 20 + 56 = 111
    # = 1 + 2*(23 + 16*111) = 1 + 2*(23+1776) = 1 + 3598 = 3599
    assert len(g.nodes) == 3599


def test_full_graph_edge_count():
    """The full graph contains exactly 10874 edges."""
    g = _graph()
    assert len(g.edges) == 10874


# -- Full graph: specific nodes exist -----------------------------------------


def test_system_switch_exists():
    """The fabric switch exists, has kind "switch" and no position."""
    g = _graph()
    assert "fabric.switch0" in g.nodes
    assert g.nodes["fabric.switch0"].kind == "switch"
    assert g.nodes["fabric.switch0"].pos_mm is None  # abstract


def test_io_chiplet_nodes_exist():
    """Both SIPs expose pcie_ep and io_cpu nodes on io0."""
    g = _graph()
    for s in range(2):
        assert f"sip{s}.io0.pcie_ep" in g.nodes
        assert f"sip{s}.io0.io_cpu" in g.nodes


def test_cube_component_nodes_exist():
    """sip0.cube0 has the expected components and none of the removed ones."""
    g = _graph()
    cp = "sip0.cube0"

    # Core cube components (no more noc, xbar, bridge)
    for name in ("m_cpu", "sram", "hbm_ctrl",
                 "ucie-N", "ucie-S", "ucie-E", "ucie-W"):
        assert f"{cp}.{name}" in g.nodes

    # Old nodes must not exist
    for old in ("noc", "xbar_top", "xbar_bot", "bridge.left", "bridge.right"):
        assert f"{cp}.{old}" not in g.nodes

    # Router mesh nodes (32 routers in 6x6 grid minus 4 null holes)
    router_nodes = [n for n in g.nodes if n.startswith(f"{cp}.r")]
    assert len(router_nodes) == 32

    # Spot-check specific routers
    assert f"{cp}.r0c0" in g.nodes
    assert g.nodes[f"{cp}.r0c0"].kind == "noc_router"
    assert f"{cp}.r5c5" in g.nodes

    # Null holes must not exist
    for null_rc in ("r2c2", "r2c3", "r3c2", "r3c3"):
        assert f"{cp}.{null_rc}" not in g.nodes

    # Single hbm_ctrl (no more slices)
    assert g.nodes[f"{cp}.hbm_ctrl"].kind == "hbm_ctrl"
    for s in range(8):
        assert f"{cp}.hbm_ctrl.slice{s}" not in g.nodes


def test_pe_component_nodes_exist():
    """The listed PE components exist on the first and the last PE."""
    g = _graph()
    for comp in ("pe_cpu", "pe_scheduler", "pe_dma",
                 "pe_gemm", "pe_math", "pe_tcm"):
        assert f"sip0.cube0.pe0.{comp}" in g.nodes
        assert f"sip1.cube15.pe7.{comp}" in g.nodes


# -- Full graph: positions ----------------------------------------------------


def test_hbm_ctrl_at_cube_center():
    """hbm_ctrl of sip0.cube0 sits at (6.5, 7.0)."""
    g = _graph()
    # Single hbm_ctrl per cube; cube0 origin = (0, 0), hbm at (6.5, 7.0)
    node = g.nodes["sip0.cube0.hbm_ctrl"]
    assert node.pos_mm == (6.5, 7.0)


def test_hbm_ctrl_cube5_position():
    """hbm_ctrl of sip0.cube5 is offset by that cube's origin."""
    g = _graph()
    # cube5 = col=1, row=1 -> origin = (1*18, 1*15) = (18, 15)
    # hbm_ctrl = (18 + 6.5, 15 + 7.0) = (24.5, 22.0)
    node = g.nodes["sip0.cube5.hbm_ctrl"]
    assert node.pos_mm == (24.5, 22.0)


def test_ucie_ports_at_cube_edges():
    """The four UCIe ports of sip0.cube0 sit just inside the cube boundary."""
    g = _graph()
    # cube0 origin = (0, 0), cube_w=17, cube_h=14
    # UCIe nodes inset by half-size so edges touch boundary
    assert g.nodes["sip0.cube0.ucie-N"].pos_mm == (8.5, 0.6)
    assert g.nodes["sip0.cube0.ucie-S"].pos_mm == (8.5, 13.4)
    assert g.nodes["sip0.cube0.ucie-W"].pos_mm == (1.0, 7.0)
    assert g.nodes["sip0.cube0.ucie-E"].pos_mm == (16.0, 7.0)


# -- Full graph: edges --------------------------------------------------------


def _edge_set(g):
    """Return the set of ``(src, dst)`` pairs for every edge in ``g.edges``."""
    return {(e.src, e.dst) for e in g.edges}


def test_inter_cube_ucie_edges():
    """Neighbouring cubes are linked through their facing UCIe ports."""
    es = _edge_set(_graph())
    # cube0 (0,0) E -> cube1 (1,0) W
    assert ("sip0.cube0.ucie-E", "sip0.cube1.ucie-W") in es
    # cube0 (0,0) S -> cube4 (0,1) N
    assert ("sip0.cube0.ucie-S", "sip0.cube4.ucie-N") in es


def test_io_to_cube_edges():
    """io0 UCIe PHYs connect to the N ports of cube0 and cube3."""
    es = _edge_set(_graph())
    # io0 connects io_ucie PHYs to cube UCIe ports on N side
    assert ("sip0.io0.ucie-P0", "sip0.cube0.ucie-N") in es
    assert ("sip0.io0.ucie-P3", "sip0.cube3.ucie-N") in es


def test_switch_to_io_edges():
    """The fabric switch has an edge to the pcie_ep of each SIP."""
    es = _edge_set(_graph())
    assert ("fabric.switch0", "sip0.io0.pcie_ep") in es
    assert ("fabric.switch0", "sip1.io0.pcie_ep") in es


def test_pe_dma_to_router():
    """PE_DMA connects to its local router (pe_to_router kind)."""
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # PE0 at r0c0, PE1 at r0c1
    assert (f"{cp}.pe0.pe_dma", f"{cp}.r0c0") in es
    assert (f"{cp}.pe1.pe_dma", f"{cp}.r0c1") in es
    # PE2 at r1c4, PE3 at r1c5
    assert (f"{cp}.pe2.pe_dma", f"{cp}.r1c4") in es
    assert (f"{cp}.pe3.pe_dma", f"{cp}.r1c5") in es
    # PE4 at r4c0, PE5 at r4c1
    assert (f"{cp}.pe4.pe_dma", f"{cp}.r4c0") in es
    assert (f"{cp}.pe5.pe_dma", f"{cp}.r4c1") in es
    # PE6 at r5c4, PE7 at r5c5
    assert (f"{cp}.pe6.pe_dma", f"{cp}.r5c4") in es
    assert (f"{cp}.pe7.pe_dma", f"{cp}.r5c5") in es


def test_command_path_m_cpu_router_pe_cpu():
    """m_cpu and r2c0 are linked both ways; routers have edges to pe_cpu."""
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # m_cpu <-> r2c0 (bidirectional command)
    assert (f"{cp}.m_cpu", f"{cp}.r2c0") in es
    assert (f"{cp}.r2c0", f"{cp}.m_cpu") in es
    # router -> pe_cpu for each PE (command kind)
    assert (f"{cp}.r0c0", f"{cp}.pe0.pe_cpu") in es
    assert (f"{cp}.r5c5", f"{cp}.pe7.pe_cpu") in es


def test_pe_internal_edges():
    """The intra-PE edges of sip0.cube0.pe0 are present in the full graph."""
    es = _edge_set(_graph())
    pp = "sip0.cube0.pe0"
    assert (f"{pp}.pe_cpu", f"{pp}.pe_scheduler") in es
    assert (f"{pp}.pe_scheduler", f"{pp}.pe_dma") in es
    assert (f"{pp}.pe_scheduler", f"{pp}.pe_gemm") in es
    assert (f"{pp}.pe_scheduler", f"{pp}.pe_math") in es
    assert (f"{pp}.pe_dma", f"{pp}.pe_tcm") in es
    assert (f"{pp}.pe_gemm", f"{pp}.pe_tcm") in es
    assert (f"{pp}.pe_math", f"{pp}.pe_tcm") in es


def test_hbm_ctrl_connects_all_routers():
    """HBM_CTRL connects to every router (router_to_hbm / hbm_to_router)."""
    g = _graph()
    es = _edge_set(g)
    cp = "sip0.cube0"
    routers = sorted(n for n in g.nodes if n.startswith(f"{cp}.r"))
    assert len(routers) == 32
    for r in routers:
        assert (r, f"{cp}.hbm_ctrl") in es, f"missing {r}->hbm_ctrl"
        assert (f"{cp}.hbm_ctrl", r) in es, f"missing hbm_ctrl->{r}"


def test_router_mesh_edges():
    """Adjacent routers are connected by router_mesh edges."""
    g = _graph()
    edge_kinds = {(e.src, e.dst): e.kind for e in g.edges}
    cp = "sip0.cube0"
    # r0c0 <-> r0c1 (horizontal neighbors)
    assert edge_kinds.get((f"{cp}.r0c0", f"{cp}.r0c1")) == "router_mesh"
    assert edge_kinds.get((f"{cp}.r0c1", f"{cp}.r0c0")) == "router_mesh"
    # r0c0 <-> r1c0 (vertical neighbors)
    assert edge_kinds.get((f"{cp}.r0c0", f"{cp}.r1c0")) == "router_mesh"
    assert edge_kinds.get((f"{cp}.r1c0", f"{cp}.r0c0")) == "router_mesh"


# -- Views: system ------------------------------------------------------------


def test_system_view_nodes():
    """The system view lists the switch, both SIPs and both IO chiplets."""
    v = _graph().system_view
    assert "fabric.switch0" in v.nodes
    assert "sip0" in v.nodes
    assert "sip1" in v.nodes
    assert "sip0.io0" in v.nodes
    assert "sip1.io0" in v.nodes


# -- Views: SIP ---------------------------------------------------------------


def test_sip_view_cube_count():
    """The SIP view contains 16 nodes whose name starts with "cube"."""
    v = _graph().sip_view
    cube_nodes = [n for n in v.nodes if n.startswith("cube")]
    assert len(cube_nodes) == 16


def test_sip_view_io_chiplets():
    """The SIP view contains the io0 chiplet."""
    v = _graph().sip_view
    assert "io0" in v.nodes


def test_sip_view_cube_positions():
    """cube0 and cube1 are placed at the expected centers in the SIP view."""
    v = _graph().sip_view
    # cube0 (0,0): center = (8.5, 6+7.0) = (8.5, 13.0)  [io_margin=6]
    x, y = v.nodes["cube0"].pos_mm
    assert x == 8.5
    assert y == 13.0
    # cube1 (1,0): center = (18+8.5, 13.0) = (26.5, 13.0)
    x1, y1 = v.nodes["cube1"].pos_mm
    assert x1 == 26.5
    assert y1 == 13.0


# -- Views: cube ---------------------------------------------------------------


def test_cube_view_has_all_components():
    """The cube view's node set is exactly the expected component set."""
    v = _graph().cube_view
    expected = {
        "ucie-N", "ucie-S", "ucie-W", "ucie-E",
        "m_cpu", "hbm_ctrl", "router_mesh", "sram",
        "pe0", "pe1", "pe2", "pe3", "pe4", "pe5", "pe6", "pe7",
    }
    # Add UCIe connection nodes (4 ports x 4 connections)
    expected |= {
        f"ucie-{port}.conn{ci}"
        for port in ("N", "S", "E", "W")
        for ci in range(4)
    }
    assert set(v.nodes.keys()) == expected


def test_cube_view_hbm_at_center():
    """Cube view: hbm_ctrl / router_mesh positions and overall dimensions."""
    v = _graph().cube_view
    assert v.nodes["hbm_ctrl"].pos_mm == (6.5, 7.0)
    assert v.nodes["router_mesh"].pos_mm == (10.5, 7.0)
    assert v.width_mm == 17.0
    assert v.height_mm == 14.0


def test_cube_view_pe_to_router_mesh():
    """PEs connect to router_mesh in cube view."""
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    for i in range(8):
        assert (f"pe{i}", "router_mesh") in ves


# -- Views: PE ----------------------------------------------------------------


def test_pe_view_has_all_components():
    """The PE view's node set is exactly the seven PE components."""
    v = _graph().pe_view
    assert set(v.nodes.keys()) == {
        "pe_cpu", "pe_scheduler", "pe_dma", "pe_gemm",
        "pe_math", "pe_mmu", "pe_tcm",
    }


def test_pe_view_edges():
    """The PE view contains the expected intra-PE edges."""
    v = _graph().pe_view
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("pe_cpu", "pe_scheduler") in ves
    assert ("pe_scheduler", "pe_dma") in ves
    assert ("pe_scheduler", "pe_gemm") in ves
    assert ("pe_scheduler", "pe_math") in ves
    assert ("pe_dma", "pe_tcm") in ves
    assert ("pe_gemm", "pe_tcm") in ves
    assert ("pe_math", "pe_tcm") in ves


# -- SRAM ----------------------------------------------------------------------


def test_sram_node_exists():
    """sip0.cube0 has an sram node of kind "sram"."""
    g = _graph()
    assert "sip0.cube0.sram" in g.nodes
    assert g.nodes["sip0.cube0.sram"].kind == "sram"


def test_sram_to_router_edges():
    """SRAM and router r3c0 are linked in both directions."""
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # SRAM connects to router r3c0
    assert (f"{cp}.sram", f"{cp}.r3c0") in es
    assert (f"{cp}.r3c0", f"{cp}.sram") in es


# -- PE_DMA -> Router (data path) ---------------------------------------------


def test_pe_dma_to_router_edges():
    """Each PE's pe_dma has an edge to its local router.

    Asserts the same eight edges as ``test_pe_dma_to_router``, driven from a
    PE-index -> router table.
    """
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # Each PE DMA connects to its local router
    pe_router_map = {
        0: "r0c0", 1: "r0c1",
        2: "r1c4", 3: "r1c5",
        4: "r4c0", 5: "r4c1",
        6: "r5c4", 7: "r5c5",
    }
    for i, router in pe_router_map.items():
        assert (f"{cp}.pe{i}.pe_dma", f"{cp}.{router}") in es


# -- UCIe conn nodes connect to routers (not NOC) -----------------------------


def test_ucie_noc_reverse_edges():
    """Each UCIe port and its conn nodes are linked in both directions.

    Only the ucie<->conn hop is asserted here; the conn->router hop is
    checked in ``test_ucie_conn_edge_bw``.
    """
    es = _edge_set(_graph())
    cp = "sip0.cube1"  # non-edge cube to avoid io-cube edges
    # NOTE(review): test_inter_cube_ucie_edges places cube1 at grid (1,0);
    # confirm that it really has no IO-facing link as the comment above says.
    for port in ("N", "S", "E", "W"):
        # Each conn has edges: ucie<->conn, conn<->router
        for ci in range(4):
            conn = f"{cp}.ucie-{port}.conn{ci}"
            assert (f"{cp}.ucie-{port}", conn) in es, \
                f"missing ucie-{port}->conn{ci}"
            assert (conn, f"{cp}.ucie-{port}") in es, \
                f"missing conn{ci}->ucie-{port}"


def test_ucie_conn_nodes_exist():
    """Each UCIe port must have n_connections independent conn nodes."""
    g = _graph()
    cp = "sip0.cube0"
    for port in ("N", "S", "E", "W"):
        for ci in range(4):
            conn_id = f"{cp}.ucie-{port}.conn{ci}"
            assert conn_id in g.nodes, f"missing {conn_id}"
            assert g.nodes[conn_id].kind == "ucie_conn"
            assert g.nodes[conn_id].attrs["overhead_ns"] == 0.0


def test_ucie_conn_edge_bw():
    """conn->router edges must have per_connection_bw_gbs (128 GB/s).

    Every conn node of every port on sip0.cube0 must have exactly one
    outgoing ``ucie_conn_to_router`` edge, and that edge must carry 128 GB/s.
    """
    g = _graph()
    cp = "sip0.cube0"
    # Filter by kind once; the per-conn lookup below only matches on src.
    conn_to_router = [e for e in g.edges if e.kind == "ucie_conn_to_router"]
    # Check every conn of each port connects to a router with correct bw
    for port in ("N", "S", "E", "W"):
        for ci in range(4):
            conn_id = f"{cp}.ucie-{port}.conn{ci}"
            # Find the ucie_conn_to_router edge
            conn_edges = [e for e in conn_to_router if e.src == conn_id]
            assert len(conn_edges) == 1, \
                f"expected 1 ucie_conn_to_router from {conn_id}"
            assert conn_edges[0].bw_gbs == 128.0


def test_cross_cube_path_includes_conn():
    """PE cross-cube path must traverse conn nodes."""
    g = _graph()
    router = PathRouter(g)
    path = router.find_path("sip0.cube0.pe0", "sip0.cube1.hbm_ctrl")
    conn_nodes = [n for n in path if ".conn" in n]
    assert len(conn_nodes) >= 2, \
        f"Expected >=2 conn nodes in path, got {conn_nodes}"


# -- Cube view: edges ---------------------------------------------------------


def test_cube_view_pe_to_router_mesh_edges():
    """All PEs connect to router_mesh in cube view.

    Asserts the same edges as ``test_cube_view_pe_to_router_mesh``.
    """
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    for i in range(8):
        assert (f"pe{i}", "router_mesh") in ves


def test_cube_view_sram():
    """Cube view: sram exists and router_mesh has an edge to it."""
    v = _graph().cube_view
    assert "sram" in v.nodes
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("router_mesh", "sram") in ves


def test_cube_view_hbm_router_mesh():
    """Cube view: hbm_ctrl connects to router_mesh."""
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("router_mesh", "hbm_ctrl") in ves
    assert ("hbm_ctrl", "router_mesh") in ves


def test_cube_view_m_cpu_router_mesh():
    """Cube view: m_cpu connects to router_mesh."""
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("router_mesh", "m_cpu") in ves
    assert ("m_cpu", "router_mesh") in ves