ywkang 357cab525b ADR-0026: DPPolicy intra-device only + ShardSpec structural coords
DPPolicy no longer carries a cross-SIP axis. SIP-level placement is
solely controlled by torch.ahbm.set_device(rank) (ADR-0024); DPPolicy
itself describes only the cube × PE layout within one SIP. ShardSpec
switches to structural (sip, cube, pe) coordinates; the flat pe_index
field/property is fully removed — silent drift between global-flat and
SIP-local interpretations was a foot-gun flagged by ADR-0024 D11.

Breaking API (explicit TypeError / AttributeError):
- DPPolicy(sip=...) / DPPolicy(num_sips=...) -> TypeError
- ShardSpec.pe_index -> AttributeError
- ShardSpec(pe_index=...) -> TypeError
- resolve_dp_policy now takes target_sip= (required), no num_sips.

Downstream migration:
- PE allocator dict keyed by (sip, cube, pe) tuples, in both
  _ensure_allocators and _free_tensor. deploy_tensor uses tuple lookup.
- _create_tensor passes target_sip=current_sip; post-hoc pe_index
  shifting removed entirely.
- launch._compute_local_shape drops the dp.sip branch.
- Internal resolvers (column_wise / row_wise / replicate / tiled_*)
  return _LocalPeShard (cube-local identifier) instead of ShardSpec —
  resolve_dp_policy lifts them to full structural coords.

Tests:
- New tests/test_adr0026_dppolicy_intra_device.py (12 tests) pins the
  contract end-to-end.
- test_sip_parallel.py rewritten: SIP composition now modeled as two
  resolve_dp_policy(target_sip=...) calls (ADR-0024 launcher style).
- Call-site migration: test_tensor, test_va_integration, test_va_offset,
  test_runtime_api_tensor, test_tl_recv_async, test_ccl_* and benches
  gemm_single_pe, gpt3_qkv, va_offset_verify, ccl_allreduce (legacy
  branch) all use intra-device DPPolicy and structural ShardSpec.

Result: 523 passed, 1 strict xfail (ring_default_ws — unchanged
ADR-0024 Phase B blocker; architectural fix deferred to ADR-0027).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 13:02:19 -07:00

# CCL Algorithm Author Guide (English)
This document is a step-by-step guide for engineers writing CCL
(Collective Communication Library) algorithms in kernbench. The
internal system design and component structure live in
[ADR-0023](adr/ADR-0023-ipcq-pe-collective.md).

The goal here is to clearly separate **what an algorithm author has to
touch** from **what they can leave alone**, and to get a first
algorithm running through the shortest possible path.

---
## 0. Five-minute tour

| Things you touch | Location |
|------------------|----------|
| Algorithm module (kernel + optional `neighbors()`) | `src/kernbench/ccl/algorithms/<algo>.py` |
| Algorithm registration | `ccl.yaml` |
| Host bench (rank count, init, launch, verify) | `benches/<your_bench>.py` |
| (Optional) unit test | `tests/test_<algo>.py` |

| Things you do NOT touch | Location |
|--------------------------|----------|
| TLContext API | `src/kernbench/triton_emu/tl_context.py` (ADR-0022 spec) |
| Framework (topology generators, helpers, mock testing) | `src/kernbench/ccl/` |
| PE_IPCQ / PE_DMA components | `src/kernbench/components/builtin/` |
| Backend implementation (`install_ipcq`) | `src/kernbench/runtime_api/distributed.py` and `kernbench/ccl/install.py` |

Workflow:

1. Write a `kernel` function in the algorithm module.
2. Register an entry in `ccl.yaml`.
3. Write a host bench using `torch.distributed.init_process_group` /
   `torch.distributed.all_reduce` (the unified `benches/ccl_allreduce.py`
   handles the common case).
4. (Optional) Run the mock runtime for fast unit tests (a few ms each).
5. Run `kernbench run --bench <name> --verify-data` for full SimPy verification.

---
## 1. Hello World — the simplest send/recv
Each PE sends its tile to its E neighbor once and receives a tile from
its W neighbor once. The reference code lives in
[`src/kernbench/ccl/algorithms/hello_send.py`](../src/kernbench/ccl/algorithms/hello_send.py).
### Step 1: write the kernel
New file `src/kernbench/ccl/algorithms/hello_send.py`:
```python
"""Hello world: send your tile to the next rank, receive from the previous one."""


def kernel(t_ptr, n_elem, tl):
    # Global rank is computed from program_id(0/1) (ADR-0022).
    local_pe = tl.program_id(axis=0)
    cube_id = tl.program_id(axis=1)
    pes_per_cube = tl.num_programs(axis=0)
    rank = cube_id * pes_per_cube + local_pe

    nbytes = n_elem * 2  # f16
    pe_addr = t_ptr + rank * nbytes

    # Load our slice and send it east.
    src = tl.load(pe_addr, shape=(n_elem,), dtype="f16")
    tl.send(dir="E", src=src)

    # Receive from west and store directly back into our slice.
    recv = tl.recv(dir="W", shape=(n_elem,), dtype="f16")
    tl.store(pe_addr, recv)


def kernel_args(world_size: int, n_elem: int) -> tuple:
    """Positional kernel args used by the ahbm backend (after t_ptr)."""
    return (n_elem,)
```
Key points:
- **Global rank is computed from `program_id(axis=0)` + `program_id(axis=1)`.**
TL has no contractually-supported `tl.rank` / `tl.world_size`. If the
host needs to pass `world_size` or anything else as an algorithm
parameter, it goes through ordinary `torch.launch` arguments.
- **`tl.send` takes a `TensorHandle`.** PE_IPCQ reads
`addr`/`space`/`shape`/`dtype`/`nbytes` from the handle to issue an
`IpcqDmaToken` to PE_DMA.
- **`tl.recv` requires `shape` and `dtype`.** The returned TensorHandle
points at the IPCQ ring slot and can be used directly as a `dst`
handle (e.g. `tl.store(pe_addr, recv)`). Phase 2's `dma_write` replay
handles the (slot → hbm) copy, so user code never has to touch
`recv.data`.
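
The rank and address arithmetic in the kernel can be sanity-checked on the host without TL. The sketch below assumes a layout of 2 cubes × 4 PEs per cube within one SIP and 2-byte f16 elements; `global_rank` and `tile_addr` are illustrative helper names, not kernbench APIs.

```python
def global_rank(local_pe: int, cube_id: int, pes_per_cube: int) -> int:
    """Same formula as the kernel: cube-major, PE-minor."""
    return cube_id * pes_per_cube + local_pe


def tile_addr(t_ptr: int, rank: int, n_elem: int, elem_bytes: int = 2) -> int:
    """Byte address of a rank's slice; elem_bytes=2 assumes f16."""
    return t_ptr + rank * n_elem * elem_bytes


PES_PER_CUBE = 4  # assumed value of tl.num_programs(axis=0)
NUM_CUBES = 2     # assumed value of tl.num_programs(axis=1)

# (cube_id, local_pe) -> global rank for every slot in this assumed layout.
layout = {
    (cube, pe): global_rank(pe, cube, PES_PER_CUBE)
    for cube in range(NUM_CUBES)
    for pe in range(PES_PER_CUBE)
}

# Rank 5 lives on cube 1, PE 1; with n_elem=8 its slice starts 80 bytes in.
assert layout[(1, 1)] == 5
assert tile_addr(0x1000, 5, n_elem=8) == 0x1000 + 80
```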
### Step 2: register in `ccl.yaml`
```yaml
algorithms:
  hello_send:
    module: kernbench.ccl.algorithms.hello_send
    topology: ring_1d
    buffer_kind: tcm
    world_size: 8
```
`world_size` here is optional. If absent, `AhbmCCLBackend` derives it
from the topology spec (`sips × cubes_per_sip × pes_per_cube`).
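
The fallback rule amounts to "use the explicit value if present, otherwise the product of the topology counts". A minimal sketch of that rule; `effective_world_size` is a hypothetical name, not the actual `AhbmCCLBackend` code, which may also validate the value:

```python
def effective_world_size(entry: dict, sips: int, cubes_per_sip: int,
                         pes_per_cube: int) -> int:
    """Explicit ccl.yaml world_size wins; otherwise use the topology product."""
    explicit = entry.get("world_size")
    if explicit is not None:
        return explicit
    return sips * cubes_per_sip * pes_per_cube


# An entry without world_size on an assumed 1 SIP x 2 cubes x 4 PEs topology.
hello_entry = {"module": "kernbench.ccl.algorithms.hello_send", "topology": "ring_1d"}
assert effective_world_size(hello_entry, 1, 2, 4) == 8
```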
### Step 3: write a host bench (optional — the unified bench may suffice)
For most CCL benchmarks the existing `benches/ccl_allreduce.py` is
sufficient: it reads `ccl.yaml`, picks the algorithm, sets up the
process group, and runs the collective. If your algorithm needs custom
host logic, write a new bench file along the same lines.
The host code looks like a real PyTorch DDP worker:
```python
"""benches/ccl_hello.py"""
from __future__ import annotations

import numpy as np

from kernbench.policy.placement.dp import DPPolicy

N_ELEM = 8


def worker(rank: int, world_size: int, torch) -> None:
    """Per-rank business logic; mirrors a real PyTorch DDP worker."""
    dp = DPPolicy(
        cube="replicate", pe="column_wise",
        num_cubes=1, num_pes=world_size,
    )
    tensor = torch.zeros(
        (1, world_size * N_ELEM), dtype="f16", dp=dp, name="hello_in",
    )

    # Per-rank initialization via the real PyTorch idiom.
    init = np.zeros((1, world_size * N_ELEM), dtype=np.float16)
    for r in range(world_size):
        init[0, r * N_ELEM : (r + 1) * N_ELEM] = float(r + 1)
    tensor.copy_(torch.from_numpy(init))

    # The collective itself. The backend runs whichever algorithm ccl.yaml
    # selects; with hello_send, rank r ends up holding rank (r - 1)'s slice,
    # which is what the check below expects.
    torch.distributed.all_reduce(tensor, op="sum")

    # Verify on rank 0 (real PyTorch DDP idiom).
    if rank == 0:
        result = tensor.numpy()
        for r in range(world_size):
            expected = float(((r - 1) % world_size) + 1)
            slice_r = result[0, r * N_ELEM : (r + 1) * N_ELEM]
            status = "OK" if np.allclose(slice_r, expected) else "MISMATCH"
            print(
                f"  rank {r}: got {float(slice_r.mean()):.1f}, "
                f"expected {expected:.1f} [{status}]"
            )


def run(torch) -> None:
    """CLI entry point. Initializes dist, dispatches to worker."""
    dist = torch.distributed
    dist.init_process_group(backend="ahbm")
    worker(
        rank=dist.get_rank(),
        world_size=dist.get_world_size(),
        torch=torch,
    )
```
### Step 4: unit test (optional but strongly recommended)
`tests/test_hello_send.py`:
```python
import numpy as np

from kernbench.ccl.algorithms.hello_send import kernel
from kernbench.ccl.testing import run_kernel_in_mock


def test_hello_send_4_ranks():
    n_elem = 8
    inputs = [
        np.full((n_elem,), float(r + 1), dtype=np.float16)
        for r in range(4)
    ]
    outputs = run_kernel_in_mock(
        kernel_fn=kernel,
        world_size=4,
        topology="ring_1d",
        inputs=inputs,
        kernel_args=(n_elem,),
    )
    # rank r should now hold rank (r-1) % 4's data.
    for r in range(4):
        assert np.array_equal(outputs[r], inputs[(r - 1) % 4])
```
`run_kernel_in_mock` runs every rank concurrently in pure Python (no
SimPy), so a unit test like this finishes in **milliseconds**. It only
verifies algorithmic correctness — no latency, no DMA, no fabric.
### Step 5: SimPy validation
```bash
kernbench run --topology topology.yaml --bench ccl_hello --verify-data
```
Phase 1 runs the SimPy simulation together with MemoryStore data
movement; Phase 2 replays the op_log to check correctness. The bench's
`print` lines should show OK for every rank.

---
## 2. Ring all-reduce — the second algorithm
Slightly more complex. Each PE runs `world_size - 1` rounds, sending
its current tile east and accumulating the tile received from the west.
After all rounds, every PE holds the global sum.
The reference implementation lives in
[`src/kernbench/ccl/algorithms/ring_allreduce.py`](../src/kernbench/ccl/algorithms/ring_allreduce.py).
The core flow:
```python
"""Ring all-reduce."""


def kernel(t_ptr, n_elem, world_size, tl):
    local_pe = tl.program_id(axis=0)
    cube_id = tl.program_id(axis=1)
    pes_per_cube = tl.num_programs(axis=0)
    rank = cube_id * pes_per_cube + local_pe

    nbytes = n_elem * 2  # f16
    pe_addr = t_ptr + rank * nbytes

    # The handle points at HBM[pe_addr]. In greenlet mode .data is
    # populated, but the kernel never has to touch .data directly.
    acc = tl.load(pe_addr, shape=(n_elem,), dtype="f16")
    current = acc  # source for the first send

    for _step in range(world_size - 1):
        tl.send(dir="E", src=current)
        recv = tl.recv(dir="W", shape=(n_elem,), dtype="f16")
        # TensorHandle operator overload → MathCmd → PE_MATH dispatch.
        # Phase 1 only models timing; Phase 2 DataExecutor replays the
        # actual numpy accumulation.
        acc = acc + recv
        current = recv  # forward the received slot to the next round

    # Store the final accumulator back to HBM. Source is acc (a PE-local
    # scratch addr); dst is HBM. The op_log dma_write entry records both
    # ends so Phase 2 copies the math result into HBM at verify time.
    tl.store(pe_addr, acc)


def kernel_args(world_size: int, n_elem: int) -> tuple:
    return (n_elem, world_size)
```
Four key points:
1. **Accumulation goes through TensorHandle operators.** `acc + recv`
emits a `MathCmd` and dispatches it through PE_MATH — i.e. the
real hardware path, so the latency model stays accurate. Per
ADR-0020 D3, Phase 1 only simulates timing; Phase 2's `DataExecutor`
replays the op_log and runs the actual numpy accumulation.
2. **Use `current = recv` to forward.** Each round must update the send
source to the just-received slot handle so the same data circulates
exactly once around the ring. Setting `current = acc` would resend
the cumulative sum, inflating the result.
3. **`tl.store(pe_addr, acc)` exactly once at the end.** Do not use a
store→reload pattern in the middle. `acc` lives in PE-local scratch;
the op_log records `(src=scratch, dst=hbm)` and Phase 2 first runs
math (filling scratch) then copies via the dma_write snapshot.
4. **`world_size` is passed by the host explicitly.** TL only knows the
topology slot count (e.g. `num_programs(axis=0)` is "PEs per cube"),
not the participating CCL group size. The host bench knows
`world_size` and forwards it as an explicit kernel argument.
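
Point 2 is easy to check by replaying the kernel's loop on the host. The sketch below is a hypothetical helper, not part of kernbench; it models each rank's tile as one scalar and each round as "every rank sends `current` east, so rank r receives rank r-1's value". With tiles `[1, 2, 3, 4]`, forwarding `recv` gives 10 on every rank, while the `current = acc` variant gives `[24, 20, 16, 20]`.

```python
def simulate_ring_allreduce(tiles, forward_recv=True):
    """Host-side replay of the ring all-reduce loop, one scalar per rank.

    forward_recv=True mirrors `current = recv`; False mirrors the buggy
    `current = acc`.
    """
    ws = len(tiles)
    acc = list(tiles)
    current = list(tiles)
    for _step in range(ws - 1):
        # Rank r receives whatever its W neighbor (rank r - 1) sent this round.
        recv = [current[(r - 1) % ws] for r in range(ws)]
        acc = [acc[r] + recv[r] for r in range(ws)]
        current = recv if forward_recv else acc
    return acc


correct = simulate_ring_allreduce([1, 2, 3, 4])
inflated = simulate_ring_allreduce([1, 2, 3, 4], forward_recv=False)
```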

For registration in `ccl.yaml` and wiring through the unified bench,
look at the existing `ring_allreduce_tcm/_hbm/_sram` entries plus
[`benches/ccl_allreduce.py`](../benches/ccl_allreduce.py). Mock unit
tests live in
[`tests/test_ccl_mock_runtime.py`](../tests/test_ccl_mock_runtime.py)
and follow the `kernel_args=(n_elem, world_size)` convention.

---
## 3. `neighbors()` override — custom topology
Most algorithms are happy with the builtin topologies (`ring_1d`,
`mesh_2d`, `tree_binary`, `ring_1d_unidir`, `none`). If you want to
modify a builtin or define a brand-new connectivity pattern, define a
`neighbors()` function in your algorithm module.
### Signature
```python
def neighbors(
    rank: int, world_size: int, neighbor_map: dict[str, int],
) -> dict[str, int] | None:
    """Override the neighbor map produced by the builtin topology.

    Args:
        rank: this PE's global rank.
        world_size: number of ranks in the CCL group.
        neighbor_map: the mapping the ccl.yaml ``topology`` field built.
            For ring_1d this is {"E": (rank+1)%ws, "W": (rank-1)%ws}.
            The dict is mutable; you may modify it in place.

    Returns:
        dict: the new neighbor map (or the modified-in-place dict).
        None: do not override; use neighbor_map as-is.
    """
    return None
```
### Pattern A: tweak a builtin
```python
def neighbors(rank, world_size, neighbor_map):
    # Only even ranks use W; remove W from odd ranks.
    if rank % 2 == 1:
        neighbor_map.pop("W", None)
    return neighbor_map
```
### Pattern B: replace entirely (skip-connection ring)
```python
def neighbors(rank, world_size, neighbor_map):
    return {"E": (rank + 2) % world_size}
```
### Pattern C: keep builtin
Either omit `neighbors` entirely or return None:
```python
def neighbors(rank, world_size, neighbor_map):
    return None  # explicit "use the builtin"
```
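
The three patterns differ only in what the hook returns. The sketch below shows one plausible resolution order (builtin map first, then the hook, with `None` meaning "keep the builtin"); `ring_1d` and `resolve_neighbors` are hypothetical names for illustration, not the framework's actual code.

```python
from __future__ import annotations


def ring_1d(rank: int, world_size: int) -> dict[str, int]:
    """Builtin ring_1d map as described in the neighbors() docstring."""
    return {"E": (rank + 1) % world_size, "W": (rank - 1) % world_size}


def resolve_neighbors(rank, world_size, override=None):
    """Hypothetical resolution: build the builtin map, then apply the hook."""
    builtin = ring_1d(rank, world_size)
    if override is None:  # module defines no neighbors()
        return builtin
    result = override(rank, world_size, builtin)
    return builtin if result is None else result


def drop_w_on_odd(rank, world_size, neighbor_map):  # Pattern A
    if rank % 2 == 1:
        neighbor_map.pop("W", None)
    return neighbor_map


def skip_ring(rank, world_size, neighbor_map):  # Pattern B
    return {"E": (rank + 2) % world_size}


def keep_builtin(rank, world_size, neighbor_map):  # Pattern C
    return None
```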
---
## 4. PE kernel API reference (ADR-0023 D4)
### IPCQ API
| API | Description | Blocking? |
|-----|-------------|-----------|
| `tl.send(dir, src=TensorHandle)` | Send to a peer in the given direction. | Yes (waits if peer slots are full) |
| `tl.send(dir, src_addr=..., nbytes=..., shape=..., dtype=..., space=...)` | Same, keyword form. | Yes |
| `tl.recv(dir, shape=..., dtype=...)` | Blocking recv from one direction. | Yes |
| `tl.recv(shape=..., dtype=...)` | Round-robin recv across all four directions. | Yes |
| `tl.recv_async(dir, shape=..., dtype=...) → RecvFuture` | Non-blocking recv. | No |
| `tl.wait(future)` | Wait for a non-blocking recv future → returns the resolved TensorHandle. | Yes |
### Existing TL API (ADR-0020/0022, unchanged)
| API | Description |
|-----|-------------|
| `tl.load(addr, shape, dtype) → TensorHandle` | DMA read; in greenlet mode `.data` carries the ndarray. |
| `tl.store(addr, handle)` | DMA write — when `handle.data` is set the runner propagates it to MemoryStore. |
| `tl.composite(op, ...)` | Submit a GEMM/Math composite (non-blocking). |
| `tl.program_id(axis=0)` | Local PE id within the cube. |
| `tl.program_id(axis=1)` | Cube id (ADR-0022). |
| `tl.num_programs(axis=0/1)` | Topology slot counts (NOT the participating-rank count). |
### Two recv modes
The default is `return_slot` (zero-copy): the IPCQ slot address is
returned in `handle.addr`. To force a copy into a custom destination,
pass `dst_addr` + `dst_space`:
```python
recv = tl.recv(
    dir="W", shape=(8,), dtype="f16",
    dst_addr=my_scratch_addr,
    dst_space="hbm",
)
# After this call recv.addr == my_scratch_addr (copy_to_dst mode).
```
---
## 5. Helpers (`kernbench.ccl.helpers`)
Convenience helpers to keep algorithm code short:
```python
from kernbench.ccl.helpers import chunked, ring_step, tree_step
```
### `chunked(base_addr, n_chunks, n_elem, dtype="f16") → list[Chunk]`
Split a tile of `n_elem` elements into `n_chunks` equal-size views.
Each `Chunk` has `addr`, `n_elem`, `nbytes` fields.
```python
chunks = chunked(t_ptr, n_chunks=4, n_elem=64, dtype="f16")
# chunks[0..3] are 16-element views with consecutive addresses.
```
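
The address arithmetic behind `chunked` is simple enough to sketch. This is a hypothetical re-implementation for illustration (`chunked_sketch`, the `DTYPE_BYTES` table, and the divisibility check are assumptions); the real helper may handle remainders or dtypes differently.

```python
from dataclasses import dataclass

# Element sizes in bytes (assumed; f16 is the dtype used throughout this guide).
DTYPE_BYTES = {"f16": 2, "f32": 4}


@dataclass(frozen=True)
class Chunk:
    addr: int
    n_elem: int
    nbytes: int


def chunked_sketch(base_addr, n_chunks, n_elem, dtype="f16"):
    """Split n_elem elements at base_addr into n_chunks equal, contiguous views."""
    if n_elem % n_chunks:
        raise ValueError("this sketch requires n_elem divisible by n_chunks")
    per_chunk = n_elem // n_chunks
    nbytes = per_chunk * DTYPE_BYTES[dtype]
    return [Chunk(base_addr + i * nbytes, per_chunk, nbytes) for i in range(n_chunks)]


chunks = chunked_sketch(0x1000, n_chunks=4, n_elem=64)
```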
### `ring_step(rank, step, world_size) → (send_idx, recv_idx)`
Per-step chunk indices for a ring algorithm (reduce-scatter / all-gather):
```python
for step in range(world_size - 1):
    send_idx, recv_idx = ring_step(rank, step, world_size)
    tl.send(
        dir="E", src_addr=chunks[send_idx].addr,
        nbytes=chunks[send_idx].nbytes,
        shape=(chunks[send_idx].n_elem,), dtype="f16",
    )
    recv = tl.recv(
        dir="W", shape=(chunks[recv_idx].n_elem,), dtype="f16",
    )
    # accumulate ...
```
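
The guide does not spell out `ring_step`'s exact index convention. The sketch below assumes the common ring reduce-scatter convention (send chunk `(rank - step) % world_size`, receive and accumulate chunk `(rank - step - 1) % world_size`) and replays it on the host with one scalar per chunk. `ring_step_sketch` and `reduce_scatter_sketch` are hypothetical names; under this convention, after `world_size - 1` steps rank `r` owns the fully reduced chunk `(r + 1) % world_size`.

```python
from __future__ import annotations


def ring_step_sketch(rank: int, step: int, world_size: int) -> tuple[int, int]:
    """Assumed convention: send chunk rank - step, receive chunk rank - step - 1."""
    return (rank - step) % world_size, (rank - step - 1) % world_size


def reduce_scatter_sketch(data: list[list[int]]) -> list[list[int]]:
    """Replay ring reduce-scatter; data[r][i] is chunk i on rank r."""
    ws = len(data)
    buf = [list(row) for row in data]
    for step in range(ws - 1):
        # All ranks send at once, so snapshot every outgoing chunk first.
        outgoing = [buf[r][ring_step_sketch(r, step, ws)[0]] for r in range(ws)]
        for r in range(ws):
            _, recv_idx = ring_step_sketch(r, step, ws)
            # Rank r receives from its W neighbor, rank r - 1, and accumulates.
            buf[r][recv_idx] += outgoing[(r - 1) % ws]
    return buf


data = [[10 * (r + 1) + i for i in range(4)] for r in range(4)]
out = reduce_scatter_sketch(data)
```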
### `tree_step(rank, world_size) → {"parent": int|None, "children": list[int]}`
Parent / children rank ids for a binary tree:
```python
info = tree_step(rank, world_size)
if info["parent"] is None:
    print(f"rank {rank} is the root")
for child in info["children"]:
    ...
```
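
One plausible numbering for a binary tree is the heap layout (root 0, children `2r + 1` and `2r + 2`). The sketch below assumes that layout; `heap_tree_step` is a hypothetical stand-in that returns the same dict shape as `tree_step`, and the real helper may number ranks differently.

```python
def heap_tree_step(rank: int, world_size: int) -> dict:
    """Assumed heap numbering: root 0, children 2r+1 and 2r+2 (if in range)."""
    parent = None if rank == 0 else (rank - 1) // 2
    children = [c for c in (2 * rank + 1, 2 * rank + 2) if c < world_size]
    return {"parent": parent, "children": children}


# Consistency check: each non-root rank is listed as a child of its parent.
WS = 6
for r in range(1, WS):
    p = heap_tree_step(r, WS)["parent"]
    assert r in heap_tree_step(p, WS)["children"]
```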
---
## 6. Unit testing — Mock runtime
`kernbench.ccl.testing.run_kernel_in_mock` runs an algorithm without
SimPy for fast feedback.
### Basic usage
```python
import numpy as np

from kernbench.ccl.testing import run_kernel_in_mock
from kernbench.ccl.algorithms.my_algo import kernel


def test_my_algo():
    n_elem = 16
    # Note: numpy spells half precision np.float16 ("f2"); "f16" would mean
    # a 16-byte float, not f16.
    inputs = [np.arange(n_elem, dtype=np.float16) + r for r in range(4)]
    expected = sum(inputs)
    outputs = run_kernel_in_mock(
        kernel_fn=kernel,
        world_size=4,
        topology="ring_1d",
        inputs=inputs,
        kernel_args=(n_elem, 4),  # positional args after t_ptr
    )
    for r in range(4):
        assert np.allclose(outputs[r], expected, rtol=1e-3)
```
### Behavior
- All ranks run their kernels concurrently as cooperative greenlets.
- `tl.send` / `tl.recv` are serviced by in-memory FIFOs (no DMA, no
latency).
- Each rank's last `store` is what the helper returns as a numpy array.
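
The cooperative model can be illustrated with plain Python generators instead of greenlets. This is a toy for intuition only, not `run_kernel_in_mock`: `run_toy_ring`, `ToyTL`, and the generator-style `yield from tl.recv(...)` are all hypothetical (real kernels call `tl.recv` directly and the greenlet switch is hidden). A rank whose FIFO is empty yields control; if a full pass makes no progress, every remaining rank is stuck, which is the toy analogue of a deadlock. Running `hello_send` on `[[1], [2], [3], [4]]` returns `[[4], [1], [2], [3]]`.

```python
from collections import deque


def run_toy_ring(kernel, inputs):
    """Run one generator-based kernel per rank over in-memory E/W FIFOs."""
    ws = len(inputs)
    fifos = {(src, dst): deque() for src in range(ws) for dst in range(ws)}
    progress = [0]  # bumped on every send and every completed recv

    class ToyTL:
        def __init__(self, rank):
            self.rank = rank

        def _peer(self, direction):
            return (self.rank + (1 if direction == "E" else -1)) % ws

        def send(self, direction, data):
            fifos[(self.rank, self._peer(direction))].append(data)
            progress[0] += 1

        def recv(self, direction):
            queue = fifos[(self._peer(direction), self.rank)]
            while not queue:
                yield  # would block: give the other ranks a turn
            progress[0] += 1
            return queue.popleft()

    pending = {r: kernel(ToyTL(r), inputs[r]) for r in range(ws)}
    outputs = [None] * ws
    while pending:
        before, finished = progress[0], False
        for r in list(pending):
            try:
                next(pending[r])
            except StopIteration as done:
                outputs[r] = done.value
                del pending[r]
                finished = True
        if pending and not finished and progress[0] == before:
            raise RuntimeError(f"deadlock: ranks {sorted(pending)} are stuck")
    return outputs


def hello_send(tl, tile):
    """Generator version of the hello_send kernel: send E, receive W."""
    tl.send("E", tile)
    received = yield from tl.recv("W")
    return received
```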
### Limitations
- No latency or performance numbers (it is not a simulation).
- No PE_DMA, fabric, or BW model.
- Correctness only.
- One cube assumed: `program_id(axis=1)` is always 0.
---
## 7. Debugging
### CCL trace
```bash
KERNBENCH_CCL_TRACE=1 kernbench run --topology topology.yaml \
--bench ccl_allreduce --verify-data
```
Per-rank send/recv events appear on stdout:
```
[ccl t=346.4 send] sip0.cube0.pe1 dir=E nbytes=64 seq=0
[ccl t=360.4 recv] sip0.cube0.pe2 dir=W nbytes=64
```
### Pointer dump
`kernbench.ccl.diagnostics.pointer_dump(engine)` returns a multi-line
dump of every PE_IPCQ ring buffer's `my_head`, `my_tail`,
`peer_head_cache`, `peer_tail_cache`. When something hangs, this shows
which rank is stuck and on what.
### Deadlock detection
When the SimPy schedule empties because of unmatched send/recv pairs,
the engine raises `IpcqDeadlock` and embeds the pointer dump in the
message (ADR-0023 D14 F3). Wait-for-graph visualization is future
work.

---
## 8. Common mistakes
### 1. Using a direction that wasn't installed
`topology: ring_1d` only installs E and W. Trying:
```python
tl.send(dir="N", ...) # → IpcqInvalidDirection
```
Fix: switch to `topology: mesh_2d`, or add N/S in a `neighbors()` override.
### 2. `send` without a matching `recv`
```python
def kernel(..., tl):
    for _ in range(100):
        tl.send(dir="E", ...)
    # The peer never recvs → ring buffer fills → backpressure → deadlock.
```
Fix: every `send` needs a matching `recv` on the receiver side.
Otherwise `IpcqDeadlock` is raised.
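
When a hang does happen, it helps to think of it as a wait-for graph: each blocked rank points at the peer it is waiting on (for free slots on `send`, for data on `recv`). The deadlock section lists wait-for-graph visualization as future work; the sketch below is a hypothetical helper, not part of kernbench, that finds a cycle in a `{rank: awaited_rank}` map. For example, if every rank on a 4-rank ring is blocked in `recv` from its W neighbor, the edges form the cycle 0 → 3 → 2 → 1 → 0. A chain that ends at a rank that has already finished (like the sender above waiting on a peer that never receives) also hangs, but is not a cycle.

```python
def find_wait_cycle(waits_on):
    """Return one cycle in a wait-for graph {rank: awaited_rank}, or None."""
    for start in waits_on:
        path = []
        node = start
        # Follow wait edges until we leave the graph or revisit a rank.
        while node in waits_on and node not in path:
            path.append(node)
            node = waits_on[node]
        if node in path:
            return path[path.index(node):]
    return None


all_recv_from_w = {0: 3, 1: 0, 2: 1, 3: 2}
cycle = find_wait_cycle(all_recv_from_w)
```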
### 3. dtype/shape mismatch
By default, dtype/shape mismatches between a `send` and its matching
`recv` are not validated; keeping them consistent is the author's
responsibility. Set `strict_validation: true` on a PE_IPCQ node's attrs
to enable D14 F2 strict mode and catch them immediately.
### 4. Assuming round-robin recv fairness
`tl.recv()` (no direction) polls all four directions in round-robin
order and returns the first slot that is ready, but **arrival order is
not predictable**, so the direction a given call returns from can vary
between runs. If your algorithm depends on a particular direction, name
it explicitly: `tl.recv(dir="N", ...)`.
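
A toy model makes the point concrete: with identical kernel code, whether a direction-less recv returns E's or W's data depends only on which slots have arrived when it polls. `RoundRobinRecv` and the N, E, S, W polling order are assumptions for illustration, not PE_IPCQ's implementation.

```python
from collections import deque

DIRECTIONS = ("N", "E", "S", "W")  # assumed polling order


class RoundRobinRecv:
    """Toy model of a direction-less recv: poll queues round-robin."""

    def __init__(self):
        self.queues = {d: deque() for d in DIRECTIONS}
        self._start = 0  # index of the direction to poll first

    def deliver(self, direction, payload):
        self.queues[direction].append(payload)

    def try_recv(self):
        """Return (direction, payload) from the first non-empty queue, else None."""
        for offset in range(len(DIRECTIONS)):
            idx = (self._start + offset) % len(DIRECTIONS)
            queue = self.queues[DIRECTIONS[idx]]
            if queue:
                self._start = (idx + 1) % len(DIRECTIONS)  # resume after the winner
                return DIRECTIONS[idx], queue.popleft()
        return None


# Same code, different arrival timing, different first result.
only_w_arrived = RoundRobinRecv()
only_w_arrived.deliver("W", "w0")

both_arrived = RoundRobinRecv()
both_arrived.deliver("E", "e0")
both_arrived.deliver("W", "w0")
```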
### 5. Confusing `num_programs` with the CCL group size
`tl.num_programs(axis=0/1)` reports topology slot counts, not the
number of ranks participating in the collective. The host bench knows
`world_size` and must pass it through as a kernel argument.
### 6. Overwriting the send source before it's actually sent
PE_DMA snapshots the source data into the IpcqDmaToken at send time,
preserving in-flight semantics: once the send is queued, later writes
to the source address do not change what the peer receives. Even so,
the safest pattern is to call `tl.send` first and only modify the data
at the source address afterwards. If you modify it before `tl.send`
reaches the PE_DMA queue, the snapshot captures the modified data
instead of what you meant to send.
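
The ordering rule can be illustrated with a plain Python analogy in which `send` copies the buffer into a queue. `send` and `send_queue` are illustrative names only; the real snapshot happens inside PE_DMA, not in host Python.

```python
send_queue = []


def send(buf):
    """Illustrative send: snapshot (copy) the buffer when it is enqueued."""
    send_queue.append(bytes(buf))


# Safe ordering: send first, then reuse the buffer.
buf = bytearray(b"\x01\x02")
send(buf)
buf[0] = 0xFF  # does not affect the queued snapshot

# Unsafe ordering: the buffer is overwritten before the send is enqueued.
buf2 = bytearray(b"\x01\x02")
buf2[0] = 0xEE
send(buf2)  # the snapshot captures the overwritten byte
```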

---
## 9. Next steps
- Try other topologies (`mesh_2d`, `tree_binary`).
- Faster algorithms (recursive halving / doubling).
- Compare `buffer_kind` (tcm/hbm/sram) and `backpressure` (poll/sleep)
modes for latency.
- Larger-scale validation through the unified `ccl_allreduce` bench
with different `ccl.yaml` overlays.

If you add a new algorithm or pattern, please send a PR.

---
## References
- [ADR-0023](adr/ADR-0023-ipcq-pe-collective.md): IPCQ + PE-level collective design.
- [ADR-0022](adr/ADR-0022-program-id-2d-grid.md): 2D grid program_id (axis=0/1).
- [ADR-0020](adr/ADR-0020-data-execution-two-pass.md): 2-pass data execution.
- [ADR-0021](adr/ADR-0021-pe-pipeline-refactor.md): PE pipeline refactor.

Existing algorithm examples:
- [`src/kernbench/ccl/algorithms/hello_send.py`](../src/kernbench/ccl/algorithms/hello_send.py) — simplest send/recv
- [`src/kernbench/ccl/algorithms/ring_allreduce.py`](../src/kernbench/ccl/algorithms/ring_allreduce.py) — ring all-reduce
- [`src/kernbench/ccl/algorithms/mesh_allreduce.py`](../src/kernbench/ccl/algorithms/mesh_allreduce.py) — 2D mesh all-reduce
- [`src/kernbench/ccl/algorithms/tree_allreduce.py`](../src/kernbench/ccl/algorithms/tree_allreduce.py) — binary tree all-reduce