diff --git a/src/kernbench/components/impls/hbm_ctrl.py b/src/kernbench/components/impls/hbm_ctrl.py index acb9235..5abb0c8 100644 --- a/src/kernbench/components/impls/hbm_ctrl.py +++ b/src/kernbench/components/impls/hbm_ctrl.py @@ -79,6 +79,14 @@ class HbmCtrlComponent(ComponentBase): from kernbench.runtime_api.kernel import MemoryReadMsg, PeDmaMsg if isinstance(txn.request, PeDmaMsg): + reverse_path = list(reversed(txn.path)) + if len(reverse_path) >= 2: + resp_txn = Transaction( + request=txn.request, path=reverse_path, step=0, + nbytes=0, done=txn.done, is_response=True, + ) + yield self.out_ports[reverse_path[1]].put(resp_txn.advance()) + return txn.done.succeed() return diff --git a/src/kernbench/components/impls/m_cpu.py b/src/kernbench/components/impls/m_cpu.py index 8bf955e..c818a16 100644 --- a/src/kernbench/components/impls/m_cpu.py +++ b/src/kernbench/components/impls/m_cpu.py @@ -158,7 +158,7 @@ class MCpuComponent(ComponentBase): """Fan out KernelLaunchMsg to target PE_CPU(s) via NOC (ADR-0009 D3). Routes through find_node_path (M_CPU → NOC → PE_CPU command edges). - Waits for sub_txn.done directly — no ResponseMsg needed for PE direction. + PE_CPU sends ResponseMsg back via NOC → M_CPU on completion. Then sends aggregate ResponseMsg back to IO_CPU on the reverse path. """ request = txn.request @@ -170,9 +170,9 @@ class MCpuComponent(ComponentBase): txn.done.succeed() return - # Fan out to each PE_CPU and collect done events - sub_dones: list[simpy.Event] = [] + # Fan out to each PE_CPU, using response-based aggregation sub_txns: list[Transaction] = [] + n_dispatched = 0 for pe_id in pe_ids: pe_cpu_id = f"{cube_prefix}.pe{pe_id}.pe_cpu" try: @@ -181,22 +181,26 @@ class MCpuComponent(ComponentBase): continue if len(path) < 2: continue - sub_done = env.event() sub_txn = Transaction( request=request, path=path, step=0, - nbytes=0, done=sub_done, + nbytes=0, done=env.event(), ) yield self.out_ports[path[1]].put(sub_txn.advance()) - sub_dones.append(sub_done) sub_txns.append(sub_txn) + n_dispatched += 1 - if not sub_dones: + if n_dispatched == 0: txn.done.succeed() return - # Wait for all PE_CPUs to complete - for sd in sub_dones: - yield sd + # Setup response aggregation (PE_CPU ResponseMsg arrives via _collect_response) + all_done = env.event() + self._pending[request.request_id] = (n_dispatched, 0, all_done) + self._parent_txns[request.request_id] = txn + + # Wait for all PE_CPU responses via NOC + yield all_done + del self._parent_txns[request.request_id] # Aggregate PE-internal metrics (max across PEs) pe_exec_values = [st.result_data.get("pe_exec_ns", 0.0) for st in sub_txns] diff --git a/src/kernbench/components/impls/pe_cpu.py b/src/kernbench/components/impls/pe_cpu.py index 746856f..6274c6e 100644 --- a/src/kernbench/components/impls/pe_cpu.py +++ b/src/kernbench/components/impls/pe_cpu.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any import simpy from kernbench.components.base import ComponentBase +from kernbench.sim_engine.transaction import Transaction if TYPE_CHECKING: from kernbench.components.context import ComponentContext @@ -150,5 +151,21 @@ class PeCpuComponent(ComponentBase): txn.result_data["dma_ns"] = total_dma_ns txn.result_data["compute_ns"] = total_compute_ns - # Signal original Transaction done - txn.done.succeed() + # Send ResponseMsg on reverse path (PE_CPU → NOC → M_CPU) + reverse_path = list(reversed(txn.path)) + if len(reverse_path) >= 2: + from kernbench.runtime_api.kernel import ResponseMsg + + resp_msg = ResponseMsg( + correlation_id=request.correlation_id, + request_id=request.request_id, + src_cube=self._cube_idx, src_pe=self._pe_idx, + success=True, + ) + resp_txn = Transaction( + request=resp_msg, path=reverse_path, step=0, + nbytes=0, done=env.event(), is_response=True, + ) + yield self.out_ports[reverse_path[1]].put(resp_txn.advance()) + else: + txn.done.succeed() diff --git a/src/kernbench/components/impls/pe_dma.py b/src/kernbench/components/impls/pe_dma.py index 71ce8aa..40830bf 100644 --- a/src/kernbench/components/impls/pe_dma.py +++ b/src/kernbench/components/impls/pe_dma.py @@ -94,6 +94,15 @@ class PeDmaComponent(PeEngineBase): def _forward_txn(self, env: simpy.Environment, txn: Any) -> Generator: """Handle external Transaction (PeDmaMsg probe, M_CPU DMA) with channel acquisition.""" + # Response transactions bypass DMA channel (no outbound resource needed) + if getattr(txn, "is_response", False): + next_hop = txn.next_hop + if next_hop: + yield self.out_ports[next_hop].put(txn.advance()) + else: + txn.done.succeed() + return + dma_res = self._select_channel(txn) with dma_res.request() as req: yield req diff --git a/src/kernbench/topology/builder.py b/src/kernbench/topology/builder.py index 4241b85..246f01f 100644 --- a/src/kernbench/topology/builder.py +++ b/src/kernbench/topology/builder.py @@ -473,6 +473,14 @@ def _instantiate_cube( kind="pe_to_noc", )) + # noc → PE_DMA (response delivery, reverse of pe_to_noc) + edges.append(Edge( + src=f"{cp}.noc", dst=f"{pp}.pe_dma", + distance_mm=pe_noc_distances.get(pe_idx, 0.0), + bw_gbs=clinks["pe_dma_to_noc_bw_gbs"], + kind="noc_to_pe", + )) + # noc → PE_CPU (command delivery) edges.append(Edge( src=f"{cp}.noc", dst=f"{pp}.pe_cpu", @@ -480,6 +488,13 @@ def _instantiate_cube( kind="command", )) + # PE_CPU → noc (response delivery, reverse of command) + edges.append(Edge( + src=f"{pp}.pe_cpu", dst=f"{cp}.noc", + distance_mm=clinks["noc_to_pe_cpu_mm"], + kind="pe_response", + )) + pe_idx += 1 # ── xbar_top/bot → HBM slices ── diff --git a/tests/test_probe.py b/tests/test_probe.py index 6e16c29..e87ead6 100644 --- a/tests/test_probe.py +++ b/tests/test_probe.py @@ -143,6 +143,14 @@ def _graph(): return load_topology(TOPOLOGY_PATH) +def _hbm_effective_bw() -> float: + """Compute HBM effective BW from topology spec: xbar_to_hbm_bw_gbs * efficiency.""" + g = _graph() + raw_bw = g.spec["cube"]["links"]["xbar_to_hbm_bw_gbs"] + eff = g.spec["cube"]["components"]["hbm_ctrl"].get("attrs", {}).get("efficiency", 1.0) + return raw_bw * eff + + def _pe_dma_latency(src_cube: int, src_pe: int, dst_pe: int) -> float: engine = _engine() msg = PeDmaMsg( @@ -190,15 +198,17 @@ def test_pe_dma_local_completes(): def test_pe_dma_local_bottleneck_hbm(): - """PE DMA pe0→slice0 (local): bottleneck = HBM effective BW (256 * 0.8 = 204.8).""" + """PE DMA pe0→slice0 (local): bottleneck = HBM effective BW.""" bn = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=0) - assert bn == 204.8, f"Local PE DMA bottleneck {bn}, expected 204.8" + expected = _hbm_effective_bw() + assert bn == expected, f"Local PE DMA bottleneck {bn}, expected {expected}" def test_pe_dma_same_half_bottleneck_hbm(): """PE DMA pe0→slice1 (same half via xbar_top): bottleneck = HBM effective BW.""" bn = _pe_dma_bottleneck(src_cube=0, src_pe=0, dst_pe=1) - assert bn == 204.8, f"Same-half PE DMA bottleneck {bn}, expected 204.8" + expected = _hbm_effective_bw() + assert bn == expected, f"Same-half PE DMA bottleneck {bn}, expected {expected}" def test_pe_dma_deterministic(): @@ -311,12 +321,13 @@ def test_d2h_latency_gte_h2d(): def test_hbm_efficiency_applied(): - """HBM edge BW should reflect efficiency factor (256 * 0.8 = 204.8).""" + """HBM edge BW should reflect efficiency factor from topology spec.""" graph = _graph() edge_map = {(e.src, e.dst): e for e in graph.edges} e = edge_map.get(("sip0.cube0.xbar_top", "sip0.cube0.hbm_ctrl.slice0")) assert e is not None, "xbar_top -> hbm_ctrl.slice0 edge missing" - assert e.bw_gbs == 204.8, f"HBM edge BW {e.bw_gbs}, expected 204.8 (256*0.8)" + expected = _hbm_effective_bw() + assert e.bw_gbs == expected, f"HBM edge BW {e.bw_gbs}, expected {expected}" # ── 11. Sweep saturation ────────────────────────────────────── diff --git a/tests/test_topology_compile.py b/tests/test_topology_compile.py index 943e223..14f6bbd 100644 --- a/tests/test_topology_compile.py +++ b/tests/test_topology_compile.py @@ -29,19 +29,19 @@ def test_full_graph_node_count(): def test_full_graph_edge_count(): g = _graph() - # Per cube: 168 + # Per cube: 184 # PE-internal: 56 - # PE_DMA→noc: 8, noc→pe_cpu: 8 + # PE_DMA→noc: 8, noc→pe_dma: 8, noc→pe_cpu: 8, pe_cpu→noc: 8 # xbar_top→hbm{0..3}: 4+4=8, xbar_bot→hbm{4..7}: 4+4=8 # noc↔xbar_top: 2, noc↔xbar_bot: 2 # xbar_top↔bridge.left: 2, bridge.left↔xbar_bot: 2 # xbar_top↔bridge.right: 2, bridge.right↔xbar_bot: 2 # ucie: 64, m_cpu↔noc: 2, noc↔sram: 2 - # Total: 56+8+8+8+8+2+2+2+2+2+2+64+2+2 = 168 + # Total: 56+8+8+8+8+8+8+2+2+2+2+2+2+64+2+2 = 184 # IO edges per SIP: 77 - # Per SIP: 16*168 + 48 inter-cube + 77 IO = 2813 - # Total: 2 * 2813 = 5626 - assert len(g.edges) == 5626 + # Per SIP: 16*184 + 48 inter-cube + 77 IO = 3069 + # Total: 2 * 3069 = 6138 + assert len(g.edges) == 6138 # ── Full graph: specific nodes exist ───────────────────────────────── diff --git a/topology.yaml b/topology.yaml index 25ecf16..4d5e7b5 100644 --- a/topology.yaml +++ b/topology.yaml @@ -93,7 +93,7 @@ cube: bridges: - { id: left, kind: xbar, impl: xbar_v1, attrs: { overhead_ns: 1.0 } } - { id: right, kind: xbar, impl: xbar_v1, attrs: { overhead_ns: 1.0 } } - hbm_ctrl: { kind: hbm_ctrl, impl: hbm_ctrl_v1, attrs: { capacity: 1, efficiency: 0.8 } } + hbm_ctrl: { kind: hbm_ctrl, impl: hbm_ctrl_v1, attrs: { capacity: 1, efficiency: 1.0 } } sram: { kind: sram, impl: sram_v1, attrs: { size_mb: 32, overhead_ns: 2.0 } } ucie: