kernbench2/tests/test_topology_compile.py
ywkang 998cc85762 Add PE-level IPCQ collective infra + unified ccl_allreduce bench (ADR-0023)
Major changes:

PE-level IPCQ infrastructure:
- New PE_IPCQ component: ring-buffer control plane with 4-direction
  neighbor mapping, head/tail pointers, backpressure (poll/sleep).
- PE_DMA extended with vc_comm channel for IPCQ outbound/inbound DMA,
  including in-flight data snapshot (D9) and op_log recording at
  outbound time for Phase 2 replay correctness.
- IpcqDmaToken piggyback model: data + metadata travel together,
  atomic visibility at receiver (invariant I6).
- Credit return fast path: bottleneck-BW latency, no fabric vc_comm.

Phase 2 data execution (ADR-0020 integration):
- op_log extended: DmaWriteCmd now captures src_space/src_addr for
  Phase 2 dma_write copy; ipcq_copy ops recorded at outbound time.
- DataExecutor replays dma_write + ipcq_copy in t_start order.
- Engine._flush_data_phase: incremental cursor-based replay after
  each engine.wait() so host reads see post-Phase-2 data.
- KernelRunner Phase 1 writes disabled when op_log is active to
  prevent stale data from corrupting the MemoryStore snapshot.

TLContext / kernel API:
- tl.send(dir, src=TensorHandle), tl.recv(dir, shape, dtype),
  tl.recv_async, tl.wait(RecvFuture), copy_to_dst mode.
- TensorHandle operator overloading (add/sub/mul/div) via thread-local
  active TLContext → MathCmd dispatch through PE_MATH.
- PE-local scratch allocator for math output handles.
- tl.load returns space="hbm" handles for correct Phase 2 addressing.
- Additional math functions: maximum, minimum, fma, clamp, softmax, cdiv.

Unified ccl_allreduce bench (PyTorch-compat host code):
- Single benches/ccl_allreduce.py with run() + worker(rank, ws, torch)
  split matching real PyTorch DDP worker pattern.
- torch.distributed facade: init_process_group, get_world_size,
  get_rank, get_backend, all_reduce, barrier — only real PyTorch names.
- AhbmCCLBackend: eager install_ipcq at init, all_reduce dispatches
  kernel via tensor shard metadata (n_elem from shards[0].nbytes).
- world_size derived from topology spec (sips × cubes × pes_per_cube)
  with optional algorithm-level override in ccl.yaml.
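
The world_size derivation above can be sketched as follows. This is a
minimal illustration, not the project's code: the helper name
derive_world_size, the spec keys, and the range check on the override are
assumptions made for the example. The counts (2 SIPs, 16 cubes, 8 PEs per
cube) are the ones the topology test in this file asserts.

```python
def derive_world_size(spec, override=None):
    """Hypothetical helper: sips x cubes x pes_per_cube, unless overridden."""
    full = spec["sips"] * spec["cubes"] * spec["pes_per_cube"]
    if override is None:
        return full
    # Assumption: an override may only shrink the world, never exceed the topology.
    if not 1 <= override <= full:
        raise ValueError(f"override {override} outside 1..{full}")
    return override


# Counts taken from the topology asserted in test_topology_compile.py.
spec = {"sips": 2, "cubes": 16, "pes_per_cube": 8}
full_world = derive_world_size(spec)
ring8_world = derive_world_size(spec, override=8)
```

With the full topology this gives 256 ranks; the override path is what a
small case such as an 8-rank ring would use.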

Tensor API (PyTorch-compat surface):
- Tensor.numpy(): gather-aware (all shards via VA-based addressing).
- Tensor.copy_(source): scatter from host tensor into sharded target.
- RuntimeContext.from_numpy(arr): host-side staging tensor.
- Tensor.data property fixed to use numpy() (was shards[0]-only).

Algorithm modules moved to src/kernbench/ccl/algorithms/:
- ring_allreduce, mesh_allreduce, tree_allreduce, hello_send.
- Each module exports kernel_args(world_size, n_elem) helper.
- ccl.yaml module paths updated to kernbench.ccl.algorithms.*.

Dead code removed:
- 7 per-variant bench files (ccl_allreduce_{tcm,hbm,sram}, etc.).
- _run_ccl_bench greenlet-per-SIP scheduler.
- benches.loader.is_ccl_bench + run_rank detection.
- benches/ccl/ directory.

Tests:
- New test_ccl_allreduce_matrix.py: 7 parametrized cases
  (ring×3 buffers, ring 8/16, mesh 4, tree 7).
- New test_runtime_api_tensor.py: copy_/numpy/from_numpy unit tests.
- Existing tests updated for new import paths + world_size_override.

Docs:
- Korean ccl-author-guide.md and ADR-0023 paths updated.
- New English versions: ccl-author-guide.en.md, ADR-0023.en.md.

502 tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 19:36:59 -07:00

428 lines
14 KiB
Python
from pathlib import Path

from kernbench.policy.routing.router import PathRouter
from kernbench.topology.builder import load_topology

TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"


def _graph():
    return load_topology(TOPOLOGY_PATH)


# -- Full graph: node counts --------------------------------------------------


def test_full_graph_node_count():
    g = _graph()
    # 1 switch
    # + 2 SIPs x (1 IO x 23 io_nodes
    #             + 16 cubes x (32 routers + 1 hbm_ctrl + 1 m_cpu + 1 sram
    #                           + 20 ucie (4 ports x (1 port + 4 conn))
    #                           + 8 PEs x 9 pe_comps))  (ADR-0023: +pe_ipcq)
    # IO: pcie_ep + io_cpu + noc + 4 io_ucie_ports + 4*4 io_ucie_conn = 23
    # cube: 32 + 3 + 20 + 72 = 127
    # = 1 + 2*(23 + 16*127) = 1 + 2*(23+2032) = 1 + 4110 = 4111
    assert len(g.nodes) == 4111


def test_full_graph_edge_count():
    g = _graph()
    # ADR-0023: +3 IPCQ edges per PE (cpu→ipcq, ipcq→dma, dma→ipcq)
    # 2 SIPs × 16 cubes × 8 PEs × 3 = 768 new edges
    # Cross-SIP routing: +1 reverse pcie_ep→switch edge per SIP = +2
    assert len(g.edges) == 13692


# -- Full graph: specific nodes exist -----------------------------------------


def test_system_switch_exists():
    g = _graph()
    assert "fabric.switch0" in g.nodes
    assert g.nodes["fabric.switch0"].kind == "switch"
    assert g.nodes["fabric.switch0"].pos_mm is None  # abstract


def test_io_chiplet_nodes_exist():
    g = _graph()
    for s in range(2):
        assert f"sip{s}.io0.pcie_ep" in g.nodes
        assert f"sip{s}.io0.io_cpu" in g.nodes


def test_cube_component_nodes_exist():
    g = _graph()
    cp = "sip0.cube0"
    # Core cube components (no more noc, xbar, bridge)
    for name in ("m_cpu", "sram", "hbm_ctrl",
                 "ucie-N", "ucie-S", "ucie-E", "ucie-W"):
        assert f"{cp}.{name}" in g.nodes
    # Old nodes must not exist
    for old in ("noc", "xbar_top", "xbar_bot", "bridge.left", "bridge.right"):
        assert f"{cp}.{old}" not in g.nodes
    # Router mesh nodes (32 routers in 6x6 grid minus 4 null holes)
    router_nodes = [n for n in g.nodes if n.startswith(f"{cp}.r")]
    assert len(router_nodes) == 32
    # Spot-check specific routers
    assert f"{cp}.r0c0" in g.nodes
    assert g.nodes[f"{cp}.r0c0"].kind == "noc_router"
    assert f"{cp}.r5c5" in g.nodes
    # Null holes must not exist
    for null_rc in ("r2c2", "r2c3", "r3c2", "r3c3"):
        assert f"{cp}.{null_rc}" not in g.nodes
    # Single hbm_ctrl (no more slices)
    assert g.nodes[f"{cp}.hbm_ctrl"].kind == "hbm_ctrl"
    for s in range(8):
        assert f"{cp}.hbm_ctrl.slice{s}" not in g.nodes


def test_pe_component_nodes_exist():
    g = _graph()
    for comp in ("pe_cpu", "pe_scheduler", "pe_dma", "pe_gemm", "pe_math", "pe_tcm"):
        assert f"sip0.cube0.pe0.{comp}" in g.nodes
        assert f"sip1.cube15.pe7.{comp}" in g.nodes


# -- Full graph: positions ----------------------------------------------------


def test_hbm_ctrl_at_cube_center():
    g = _graph()
    # Single hbm_ctrl per cube; cube0 origin = (0, 0), hbm at (6.5, 7.0)
    node = g.nodes["sip0.cube0.hbm_ctrl"]
    assert node.pos_mm == (6.5, 7.0)


def test_hbm_ctrl_cube5_position():
    g = _graph()
    # cube5 = col=1, row=1 -> origin = (1*18, 1*15) = (18, 15)
    # hbm_ctrl = (18 + 6.5, 15 + 7.0) = (24.5, 22.0)
    node = g.nodes["sip0.cube5.hbm_ctrl"]
    assert node.pos_mm == (24.5, 22.0)


def test_ucie_ports_at_cube_edges():
    g = _graph()
    # cube0 origin = (0, 0), cube_w=17, cube_h=14
    # UCIe nodes inset by half-size so edges touch boundary
    assert g.nodes["sip0.cube0.ucie-N"].pos_mm == (8.5, 0.6)
    assert g.nodes["sip0.cube0.ucie-S"].pos_mm == (8.5, 13.4)
    assert g.nodes["sip0.cube0.ucie-W"].pos_mm == (1.0, 7.0)
    assert g.nodes["sip0.cube0.ucie-E"].pos_mm == (16.0, 7.0)


# -- Full graph: edges --------------------------------------------------------


def _edge_set(g):
    return {(e.src, e.dst) for e in g.edges}


def test_inter_cube_ucie_edges():
    es = _edge_set(_graph())
    # cube0 (0,0) E -> cube1 (1,0) W
    assert ("sip0.cube0.ucie-E", "sip0.cube1.ucie-W") in es
    # cube0 (0,0) S -> cube4 (0,1) N
    assert ("sip0.cube0.ucie-S", "sip0.cube4.ucie-N") in es


def test_io_to_cube_edges():
    es = _edge_set(_graph())
    # io0 connects io_ucie PHYs to cube UCIe ports on N side
    assert ("sip0.io0.ucie-P0", "sip0.cube0.ucie-N") in es
    assert ("sip0.io0.ucie-P3", "sip0.cube3.ucie-N") in es


def test_switch_to_io_edges():
    es = _edge_set(_graph())
    assert ("fabric.switch0", "sip0.io0.pcie_ep") in es
    assert ("fabric.switch0", "sip1.io0.pcie_ep") in es


def test_pe_dma_to_router():
    """PE_DMA connects to its local router (pe_to_router kind)."""
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # PE0 at r0c0, PE1 at r0c1
    assert (f"{cp}.pe0.pe_dma", f"{cp}.r0c0") in es
    assert (f"{cp}.pe1.pe_dma", f"{cp}.r0c1") in es
    # PE2 at r1c4, PE3 at r1c5
    assert (f"{cp}.pe2.pe_dma", f"{cp}.r1c4") in es
    assert (f"{cp}.pe3.pe_dma", f"{cp}.r1c5") in es
    # PE4 at r4c0, PE5 at r4c1
    assert (f"{cp}.pe4.pe_dma", f"{cp}.r4c0") in es
    assert (f"{cp}.pe5.pe_dma", f"{cp}.r4c1") in es
    # PE6 at r5c4, PE7 at r5c5
    assert (f"{cp}.pe6.pe_dma", f"{cp}.r5c4") in es
    assert (f"{cp}.pe7.pe_dma", f"{cp}.r5c5") in es


def test_command_path_m_cpu_router_pe_cpu():
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # m_cpu <-> r1c2 (bidirectional command)
    assert (f"{cp}.m_cpu", f"{cp}.r1c2") in es
    assert (f"{cp}.r1c2", f"{cp}.m_cpu") in es
    # router -> pe_cpu for each PE (command kind)
    assert (f"{cp}.r0c0", f"{cp}.pe0.pe_cpu") in es
    assert (f"{cp}.r5c5", f"{cp}.pe7.pe_cpu") in es


def test_pe_internal_edges():
    es = _edge_set(_graph())
    pp = "sip0.cube0.pe0"
    assert (f"{pp}.pe_cpu", f"{pp}.pe_scheduler") in es
    assert (f"{pp}.pe_scheduler", f"{pp}.pe_dma") in es
    assert (f"{pp}.pe_scheduler", f"{pp}.pe_gemm") in es
    assert (f"{pp}.pe_scheduler", f"{pp}.pe_math") in es
    assert (f"{pp}.pe_dma", f"{pp}.pe_tcm") in es
    assert (f"{pp}.pe_gemm", f"{pp}.pe_tcm") in es
    assert (f"{pp}.pe_math", f"{pp}.pe_tcm") in es


def test_hbm_ctrl_connects_all_routers():
    """HBM_CTRL connects to every router (router_to_hbm / hbm_to_router)."""
    g = _graph()
    es = _edge_set(g)
    cp = "sip0.cube0"
    routers = sorted(n for n in g.nodes if n.startswith(f"{cp}.r"))
    assert len(routers) == 32
    for r in routers:
        assert (r, f"{cp}.hbm_ctrl") in es, f"missing {r}->hbm_ctrl"
        assert (f"{cp}.hbm_ctrl", r) in es, f"missing hbm_ctrl->{r}"


def test_router_mesh_edges():
    """Adjacent routers are connected by router_mesh edges."""
    g = _graph()
    edge_kinds = {(e.src, e.dst): e.kind for e in g.edges}
    cp = "sip0.cube0"
    # r0c0 <-> r0c1 (horizontal neighbors)
    assert edge_kinds.get((f"{cp}.r0c0", f"{cp}.r0c1")) == "router_mesh"
    assert edge_kinds.get((f"{cp}.r0c1", f"{cp}.r0c0")) == "router_mesh"
    # r0c0 <-> r1c0 (vertical neighbors)
    assert edge_kinds.get((f"{cp}.r0c0", f"{cp}.r1c0")) == "router_mesh"
    assert edge_kinds.get((f"{cp}.r1c0", f"{cp}.r0c0")) == "router_mesh"


# -- Views: system ------------------------------------------------------------


def test_system_view_nodes():
    v = _graph().system_view
    assert "fabric.switch0" in v.nodes
    assert "sip0" in v.nodes
    assert "sip1" in v.nodes
    assert "sip0.io0" in v.nodes
    assert "sip1.io0" in v.nodes


# -- Views: SIP ---------------------------------------------------------------


def test_sip_view_cube_count():
    v = _graph().sip_view
    cube_nodes = [n for n in v.nodes if n.startswith("cube")]
    assert len(cube_nodes) == 16


def test_sip_view_io_chiplets():
    v = _graph().sip_view
    assert "io0" in v.nodes


def test_sip_view_cube_positions():
    v = _graph().sip_view
    # cube0 (0,0): center = (8.5, 6+7.0) = (8.5, 13.0) [io_margin=6]
    x, y = v.nodes["cube0"].pos_mm
    assert x == 8.5
    assert y == 13.0
    # cube1 (1,0): center = (18+8.5, 13.0) = (26.5, 13.0)
    x1, y1 = v.nodes["cube1"].pos_mm
    assert x1 == 26.5
    assert y1 == 13.0


# -- Views: cube ---------------------------------------------------------------


def test_cube_view_has_all_components():
    v = _graph().cube_view
    expected = {"ucie-N", "ucie-S", "ucie-W", "ucie-E",
                "m_cpu", "hbm_ctrl", "sram",
                "pe0", "pe1", "pe2", "pe3", "pe4", "pe5", "pe6", "pe7",
                "r0c0", "r0c1", "r0c2", "r0c3", "r0c4", "r0c5",
                "r1c0", "r1c1", "r1c2", "r1c3", "r1c4", "r1c5",
                "r2c0", "r2c1", "r2c4", "r2c5",
                "r3c0", "r3c1", "r3c4", "r3c5",
                "r4c0", "r4c1", "r4c2", "r4c3", "r4c4", "r4c5",
                "r5c0", "r5c1", "r5c2", "r5c3", "r5c4", "r5c5"}
    # Add UCIe connection nodes (4 ports x 4 connections)
    for port in ("N", "S", "E", "W"):
        for ci in range(4):
            expected.add(f"ucie-{port}.conn{ci}")
    assert set(v.nodes.keys()) == expected


def test_cube_view_hbm_at_center():
    v = _graph().cube_view
    assert v.nodes["hbm_ctrl"].pos_mm == (6.5, 7.0)
    assert "r0c0" in v.nodes  # routers exist in cube view
    assert v.width_mm == 17.0
    assert v.height_mm == 14.0


def test_cube_view_pe_to_router():
    """PEs connect to their assigned routers in cube view."""
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    pe_router_map = {"pe0": "r0c0", "pe1": "r0c1", "pe2": "r1c4", "pe3": "r1c5",
                     "pe4": "r4c0", "pe5": "r4c1", "pe6": "r5c4", "pe7": "r5c5"}
    for pe, router in pe_router_map.items():
        assert (pe, router) in ves, f"{pe} should connect to {router}"


# -- Views: PE ----------------------------------------------------------------


def test_pe_view_has_all_components():
    v = _graph().pe_view
    assert set(v.nodes.keys()) == {
        "pe_cpu", "pe_scheduler", "pe_dma", "pe_fetch_store",
        "pe_gemm", "pe_math", "pe_mmu", "pe_tcm", "pe_ipcq",
    }


def test_pe_view_edges():
    v = _graph().pe_view
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("pe_cpu", "pe_scheduler") in ves
    assert ("pe_scheduler", "pe_dma") in ves
    assert ("pe_scheduler", "pe_gemm") in ves
    assert ("pe_scheduler", "pe_math") in ves
    assert ("pe_dma", "pe_tcm") in ves
    assert ("pe_gemm", "pe_tcm") in ves
    assert ("pe_math", "pe_tcm") in ves


# -- SRAM ----------------------------------------------------------------------


def test_sram_node_exists():
    g = _graph()
    assert "sip0.cube0.sram" in g.nodes
    assert g.nodes["sip0.cube0.sram"].kind == "sram"


def test_sram_to_router_edges():
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # SRAM connects to router r3c0
    assert (f"{cp}.sram", f"{cp}.r3c0") in es
    assert (f"{cp}.r3c0", f"{cp}.sram") in es


# -- PE_DMA -> Router (data path) ---------------------------------------------


def test_pe_dma_to_router_edges():
    es = _edge_set(_graph())
    cp = "sip0.cube0"
    # Each PE DMA connects to its local router
    pe_router_map = {
        0: "r0c0", 1: "r0c1", 2: "r1c4", 3: "r1c5",
        4: "r4c0", 5: "r4c1", 6: "r5c4", 7: "r5c5",
    }
    for i, router in pe_router_map.items():
        assert (f"{cp}.pe{i}.pe_dma", f"{cp}.{router}") in es


# -- UCIe conn nodes connect to routers (not NOC) -----------------------------


def test_ucie_noc_reverse_edges():
    """UCIe ports connect to routers via conn nodes (bidirectional)."""
    es = _edge_set(_graph())
    cp = "sip0.cube1"  # non-edge cube to avoid io-cube edges
    for port in ("N", "S", "E", "W"):
        # Each conn sits between its port and a router. Only the ucie<->conn
        # pair is asserted here; the conn->router edge is covered by
        # test_ucie_conn_edge_bw.
        for ci in range(4):
            conn = f"{cp}.ucie-{port}.conn{ci}"
            assert (f"{cp}.ucie-{port}", conn) in es, \
                f"missing ucie-{port}->conn{ci}"
            assert (conn, f"{cp}.ucie-{port}") in es, \
                f"missing conn{ci}->ucie-{port}"


def test_ucie_conn_nodes_exist():
    """Each UCIe port must have n_connections independent conn nodes."""
    g = _graph()
    cp = "sip0.cube0"
    for port in ("N", "S", "E", "W"):
        for ci in range(4):
            conn_id = f"{cp}.ucie-{port}.conn{ci}"
            assert conn_id in g.nodes, f"missing {conn_id}"
            assert g.nodes[conn_id].kind == "ucie_conn"
            assert g.nodes[conn_id].attrs["overhead_ns"] == 0.0


def test_ucie_conn_edge_bw():
    """conn->router edges must have per_connection_bw_gbs (128 GB/s)."""
    g = _graph()
    edge_map = {(e.src, e.dst): e for e in g.edges}
    cp = "sip0.cube0"
    # Check that every conn of each port connects to a router with correct bw
    for port in ("N", "S", "E", "W"):
        for ci in range(4):
            conn_id = f"{cp}.ucie-{port}.conn{ci}"
            # Find the ucie_conn_to_router edge
            conn_edges = [e for e in g.edges
                          if e.src == conn_id and e.kind == "ucie_conn_to_router"]
            assert len(conn_edges) == 1, f"expected 1 ucie_conn_to_router from {conn_id}"
            assert conn_edges[0].bw_gbs == 128.0


def test_cross_cube_path_includes_conn():
    """PE cross-cube path must traverse conn nodes."""
    g = _graph()
    router = PathRouter(g)
    path = router.find_path("sip0.cube0.pe0", "sip0.cube1.hbm_ctrl")
    conn_nodes = [n for n in path if ".conn" in n]
    assert len(conn_nodes) >= 2, f"Expected >=2 conn nodes in path, got {conn_nodes}"


# -- Cube view: edges ---------------------------------------------------------


def test_cube_view_pe_to_router_edges():
    """All PEs connect to their routers in cube view."""
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    pe_router_map = {"pe0": "r0c0", "pe1": "r0c1", "pe2": "r1c4", "pe3": "r1c5",
                     "pe4": "r4c0", "pe5": "r4c1", "pe6": "r5c4", "pe7": "r5c5"}
    for pe, router in pe_router_map.items():
        assert (pe, router) in ves, f"{pe} should connect to {router}"


def test_cube_view_sram():
    v = _graph().cube_view
    assert "sram" in v.nodes
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("sram", "r3c0") in ves


def test_cube_view_hbm_router():
    """Cube view: PE routers connect to hbm_ctrl."""
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("r0c0", "hbm_ctrl") in ves  # PE0's router → HBM


def test_cube_view_m_cpu_router():
    """Cube view: m_cpu connects to its router r1c2."""
    v = _graph().cube_view
    ves = {(e.src, e.dst) for e in v.edges}
    assert ("m_cpu", "r1c2") in ves
    assert ("r1c2", "m_cpu") in ves
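
The hard-coded counts in test_full_graph_node_count and
test_cube_component_nodes_exist can be re-derived with a small standalone
sketch. It is not part of the test file: the helper names
expected_node_count and router_names are hypothetical, and the per-component
counts are copied from the comments and assertions above rather than read
from topology.yaml.

```python
def router_names(rows=6, cols=6, holes=((2, 2), (2, 3), (3, 2), (3, 3))):
    """Hypothetical helper: router ids of a rows x cols grid minus null holes."""
    hole_set = set(holes)
    return [f"r{r}c{c}"
            for r in range(rows) for c in range(cols)
            if (r, c) not in hole_set]


def expected_node_count(n_sips=2, cubes_per_sip=16, pes_per_cube=8, pe_comps=9):
    """Hypothetical helper mirroring the arithmetic in the test comment."""
    routers = len(router_names())          # 6x6 grid minus 4 holes
    singletons = 3                         # hbm_ctrl + m_cpu + sram
    ucie = 4 * (1 + 4)                     # 4 ports x (1 port + 4 conn nodes)
    cube = routers + singletons + ucie + pes_per_cube * pe_comps
    io = 1 + 1 + 1 + 4 + 4 * 4             # pcie_ep, io_cpu, noc, ports, conns
    return 1 + n_sips * (io + cubes_per_sip * cube)   # leading 1 = switch


# IPCQ edges added by ADR-0023: 3 per PE across the whole system.
ipcq_edges = 2 * 16 * 8 * 3
total_nodes = expected_node_count()
```

Keeping the derivation in one place like this makes it easier to see which
constant changes when a component is added to the PE or the cube.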