Files
kernbench2/docs/adr/ADR-0024-sip-tp-launcher.md
T
mukesh 1c33afec55 ADR-0032 + intra_* opposite directions in IPCQ install
Add intra_N/S/E/W to install.py _OPPOSITE_DIR table so the intra-cube
PE-to-PE namespace is symmetrical with intercube N/S/E/W. ADR-0032
documents the intercube allreduce algorithm (supersedes ADR-0029).
Refresh ADR-0024/0025/0029 cross-refs and update
test_intercube_sfr_config.py to cover the new intra_* mappings. Drop
the obsolete test_ccl_round_robin_recv.py (replaced by intercube tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 16:43:01 -07:00

998 lines
38 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ADR-0024: SIP-level TP Launcher — rank = SIP (host-driven dispatch)
## Status
Accepted. rank = SIP process-group model stands. The allreduce algorithm
path (mapper / validator / per-PE install machinery originally targeted at
ADR-0029) has been replaced by ADR-0032: `AhbmCCLBackend` now calls
`configure_sfr_intercube_multisip` at `init_process_group` time and the
intercube kernel receives `(sip_rank, sip_topo_kind, sip_topo_w,
sip_topo_h)` appended after the module's `kernel_args()`. The
`leader_only` / `all_pes` mapper concepts in this document are no longer
used by the default allreduce path.
## Context
### 목표
`torch.distributed` collective 호출의 참여 단위(rank)를 **SIP**(device)
경계에 맞춘다. 실제 PyTorch DDP/TP 스크립트와 **호스트 레벨에서 구분 없이**
읽히는 bench 코드를 목표로 한다.
real PyTorch와 비교:
| 차원 | real PyTorch | KernBench (이 ADR 이후) |
|---|---|---|
| 프로세스 모델 | N개 프로세스, 각 1 GPU | 1 프로세스, N greenlet, 각 1 SIP |
| `get_rank()` | `RANK` env var | greenlet-local 레지스트리 |
| `get_world_size()` | `WORLD_SIZE` env var | topology의 SIP 수 |
| `torch.cuda.set_device(r)` (real) / `torch.ahbm.set_device(r)` (KernBench) | rank → GPU | rank → SIP |
| `mp.spawn` | OS 프로세스 fork | greenlet fan-out |
### 설계 원칙 — 공개 API의 추상화, 내부는 기존 path 활용
**공개 API (bench worker) 수준의 추상화**:
```
rank = SIP
DPPolicy = intra-device (cube × PE) 분산만
dist.all_reduce, torch.ahbm.set_device, mp.spawn 등 PyTorch-style 표면
```
**Framework 내부 구현**:
```
build_install_plans (host): topology + mapper + algorithm → SipInstallPlan
backend (host): plan의 per-PE spec을 engine.submit으로 IpcqInitMsg 디스패치
engine: 기존 PE-scoped routing (MmuMapMsg 등과 동일 경로)
PE_IPCQ: 자체 message loop에서 IpcqInitMsg 처리 (기존 capability)
```
**핵심**: 새 message 타입이나 IO_CPU 확장 없음. 기존 engine routing과 기존
`IpcqInitMsg` 타입을 그대로 사용. 기존의 "sideband direct call" 우회만
제거하여 convention 일원화.
### 현재 상태
- `DistributedContext` facade 존재
- `init_process_group("ahbm")``AhbmCCLBackend``ctx.install_ipcq` 호출
`ccl/install.py`**sideband direct call** (`pe_ipcq._install_neighbors`)로
PE_IPCQ에 neighbor table 설치
- `get_rank()` 항상 `0` (single-driver)
- `get_world_size()` fallback: 총 PE 수 (rank = PE)
- `benches/ccl_allreduce.py`: `worker(rank=0, world_size=total_PEs)` 1회 호출
### 풀어야 할 문제
1. **공개 API에서 rank = SIP** — bench worker가 PE 개념을 알지 않도록.
2. **Multi-worker 실행** — N개 rank가 독립 worker 코드 실행. 1 프로세스 제약
하에서 greenlet + barrier 동기화.
3. **Cross-rank collective submit 동기화** — 첫 rank가 혼자 wait하면 peer 부재로
SimPy deadlock. 모든 rank submit 후 drain 보장.
4. **기존 sideband install 제거** — IpcqInitMsg를 engine.submit으로 일원화.
MmuMapMsg 등 다른 control-plane 메시지와 동일 패턴.
5. **Algorithm / mapper / validator 분리** — 알고리즘 모듈은 kernel 코드만
담고, topology / mapping / validation은 registry + 선언.
### Non-problem (이 ADR 밖)
- IPCQ direction addressing fix → **ADR-0025**
- `DPPolicy.sip`/`num_sips` 제거 → **ADR-0026**
- Megatron-style TP → **ADR-0027**
- DTensor → **ADR-0028 (future)**
- **IO_CPU를 SIP-level control-plane 단일 endpoint로 승격**: 이 ADR에서는
invariant으로 채택하지 않음. 현재 KernBench에 해당 원칙이 없고, 단독으로
도입하기엔 정당화가 약함. 미래에 control-plane latency 모델링 정밀도 요구가
생기면 별도 ADR.
### TODO (이 ADR 구현 이후)
- Tensor Parallelism (ADR-0027)
- Hierarchical all-reduce 알고리즘 설계 (ADR-0029) — 본 ADR의 mapper /
validator registry 인프라를 활용하는 첫 사례
---
## Decision
### D1. rank = SIP (world_size 해석)
```python
def _resolve_world_size(self) -> int:
if "world_size" in self._merged:
return int(self._merged["world_size"])
defaults = self._cfg_all.get("defaults", {})
if "world_size" in defaults:
return int(defaults["world_size"])
spec = self.ctx.spec or {}
return int(spec.get("system", {}).get("sips", {}).get("count", 1))
```
우선순위: 알고리즘 override > defaults override > SIP count. `ccl.yaml`
override는 legacy "rank = PE" 테스트 경로로 유지.
### D2. Install 경로 — engine.submit 일원화
`ccl/install.py`의 sideband direct call을 제거하고, `IpcqInitMsg`
`engine.submit`으로 보낸다. MmuMapMsg / MemoryWriteMsg 등이 이미 동일 패턴.
```python
# Backend (AhbmCCLBackend.__init__ 또는 init_process_group 시점)
from kernbench.ccl.install_plan import build_install_plans
plans = build_install_plans(
world_size=self._world_size,
algorithm=self._merged["algorithm"],
algorithm_config=self._merged,
spec=self.ctx.spec,
)
self._plans = plans
# Each PE_IPCQ가 자기 neighbor table을 받도록 engine 경유 submit
handles = []
for plan in plans:
for pe_install in plan.pe_installs:
h = self.ctx.submit(IpcqInitMsg(
correlation_id=self.ctx.correlation_id,
request_id=f"ipcq_init_s{plan.sip}c{pe_install.cube}p{pe_install.pe}",
target_sips=(plan.sip,),
target_cubes=(pe_install.cube,),
target_pe=pe_install.pe,
entries=pe_install.neighbors,
buffer_kind=plan.buffer_kind,
n_slots=plan.n_slots,
slot_size=plan.slot_size,
# ... (기존 IpcqInitMsg 필드)
))
handles.append(h)
# Eager install — init_process_group이 반환하기 전에 완료 보장
for h in handles:
self.ctx.wait(h)
```
**PE_IPCQ 컴포넌트**는 이미 `IpcqInitMsg`를 main loop에서 처리 (`pe_ipcq.py`
라인 145-147). 변경 불필요. 유일한 차이는 "message가 sideband Python call이
아니라 engine queue를 거쳐 도착한다"는 점.
**Correctness invariant (equivalence)**: `init_process_group()`은 모든
install handle을 `wait()`한 후 반환하므로 launch-before-install 문제는
구조적으로 없다. 남는 correctness 질문은 단 하나:
> Engine-routed `IpcqInitMsg` 처리가 기존 sideband
> `pe_ipcq._install_neighbors(msg)` 호출과 **동일한 최종 PE_IPCQ 상태**를
> 생성하는가.
검증 포인트 (T3 참고):
1. **State equivalence**: `_install_neighbors()` 내부 상태 전이가 engine
dispatch path에서도 동일하게 일어나 최종 PE_IPCQ state
(`_queue_pairs`, `_installed`, `_credit_inbox` 등)가 일치.
2. **Sideband-only side effect 부재**: Sideband path에서만 있던 부수 효과가
없음 (예: engine.submit이 설정하는 request_id / correlation tracking 등이
install semantics를 왜곡하지 않음).
3. **Ordering independence**: 서로 다른 PE들의 install message가 engine
큐에서 임의 순서로 처리되어도 최종 상태가 동일. 즉 install은 **PE별
독립 연산**이어야 하고, cross-PE 순서 의존성이 있으면 안 됨.
4. **Idempotency**: 동일 PE에 대해 `IpcqInitMsg`가 두 번 도착하면? 현재
설계 전제는 "per-PE 단 한 번 install". 중복 install 시 동작은 정의되지
않음. 보수적 정책:
- 최초 install 시 `_installed = True`로 전이
- 이후 중복 install msg는 **에러** (raise) 또는 **silent idempotent**
(no-op) 둘 중 하나로 명시
- Recommend: **raise** (명시적 에러 → 버그 조기 검출). T3에 duplicate
install 케이스 추가.
5. **Partial install visibility**: 일부 PE만 install 완료된 중간 상태가
외부에 observable한가? 현재 구조에서는 `init_process_group()`의 eager
wait-all이 barrier 역할을 하므로 partial state는 bench 코드에 노출되지
않음. 단, debugging / introspection API는 중간 상태를 볼 수 있음 (문제
아님, 문서화만).
**Timing 영향**: Engine-routed install은 `init_process_group()`이 SimPy 시간을
소비하게 만든다. 기존 sideband install은 사실상 zero-cost. ADR 계약:
> Benchmarks must not rely on zero-cost initialization.
> `init_process_group()` consumes simulated time proportional to the number
> of participating PEs × per-PE install latency. First collective call
> starts at a well-defined but non-zero sim time.
### D3. Launch 경로 — non-CCL 커널과 동일 primitive
**CCL 커널은 non-CCL 커널과 동일한 `KernelLaunchMsg` submission path를 쓴다.**
Engine 내부의 IO_CPU/M_CPU transit 같은 것은 **기존 구현 세부이지 CCL-specific
장치가 아님**. Backend는 plan의 `participating_pes` 목록을 돌면서 `KernelLaunchMsg`
submit할 뿐이다. 새 메시지 타입 없음, 새 라우팅 경로 없음.
```python
# AhbmCCLBackend.all_reduce
def all_reduce(self, tensor, op="sum"):
if op != "sum":
raise NotImplementedError(...)
if tensor._handle is None or not tensor._handle.shards:
raise RuntimeError(...)
# Validator — global handle 기준 (D8)
validator_name = self._merged.get("validator")
if validator_name:
resolve_validator(validator_name)(tensor._handle, self._world_size, self.ctx.spec)
rank = self.ctx.distributed.get_rank()
plan = self._plans[rank]
tensor_view = _tensor_slice_for_sip(tensor._handle, plan.sip)
# Plan에서 kernel args 계산 (host-side)
import importlib
mod = importlib.import_module(plan.kernel_module)
n_elem = tensor_view.shards[0].nbytes // tensor.itemsize
kargs = mod.kernel_args(n_elem=n_elem, world_size=plan.world_size,
**plan.kernel_config)
def _submit():
out = []
for (cube, pe) in plan.participating_pes:
h = self.ctx.submit(KernelLaunchMsg(
correlation_id=self.ctx.correlation_id,
request_id=f"allreduce_r{rank}_c{cube}p{pe}",
kernel_ref=KernelRef(name=plan.algorithm_name, kind="builtin"),
args=(_tensor_arg_for_pe(tensor_view, cube, pe), *kargs),
target_sips=(plan.sip,),
target_cubes=(cube,),
target_pe=pe,
))
out.append(h)
return out
self._barrier.submit_and_drain(self.ctx, rank, _submit)
```
### D4. Algorithm ABI — 얇게 + 명시적 arg 계약
각 알고리즘 모듈은 **kernel + kernel_args만 필수**.
```python
# src/kernbench/ccl/algorithms/ring_allreduce.py
def kernel(t_ptr, n_elem, world_size, tl):
"""PE-side kernel code.
Signature convention: first positional arg is the tensor pointer
(per-PE slice), subsequent positional args are whatever
kernel_args() returns. `tl` is injected by the TLContext runtime.
"""
def kernel_args(*, n_elem: int, world_size: int, **kw) -> tuple:
"""Return the tuple of non-tensor positional args.
Signature contract:
- Called keyword-only with n_elem and world_size plus kernel_config.
- Returns a tuple (possibly empty) of scalar / metadata args.
- The backend constructs the final KernelLaunchMsg.args as:
(per_pe_tensor_arg, *kernel_args(...))
where per_pe_tensor_arg is a TensorArg containing only the shards
local to the receiving PE (derived from tensor_view).
"""
return (n_elem, world_size)
```
**Arg assembly in backend (reference)**:
```python
# AhbmCCLBackend.all_reduce (D3에서 발췌)
kargs = mod.kernel_args(n_elem=n_elem, world_size=plan.world_size,
**plan.kernel_config)
for (cube, pe) in plan.participating_pes:
pe_tensor_arg = _tensor_arg_for_pe(tensor_view, cube, pe)
self.ctx.submit(KernelLaunchMsg(
args=(pe_tensor_arg, *kargs), # tensor first, then kernel_args return
target_sips=(plan.sip,),
target_cubes=(cube,),
target_pe=pe,
...
))
```
**ccl.yaml**에서 선언적 metadata:
```yaml
algorithms:
ring_allreduce_tcm:
module: kernbench.ccl.algorithms.ring_allreduce
topology: ring_1d # kernbench/ccl/topologies.py
mapper: leader_only # kernbench/ccl/mappers.py (신규)
validator: single_shard_per_rank # kernbench/ccl/validators.py (신규)
buffer_kind: tcm
n_elem: 8
```
- `topology` (필수)
- `mapper` (선택, default `"leader_only"`)
- `validator` (선택)
알고리즘 모듈 자체에는 mapper/validator/participating_pes/neighbor
생성기가 **들어가지 않음**.
### D5. Mapper + validator — registry key **또는** import path
Host-side framework가 built-in registry 제공. 커스텀 확장은 dot-import path.
```python
# src/kernbench/ccl/mappers.py (new)
Mapper = Callable[[dict, int], list[tuple[int, int]]]
def leader_only(spec, rank):
"""Single leader PE per SIP. Ring/tree/mesh용."""
return [(0, 0)]
def all_pes(spec, rank):
"""Every PE in the SIP. 알고리즘이 intra-SIP 전체 PE를 참여시킬 때 사용
(e.g. intra-SIP reduction, intra-SIP broadcast, hierarchical collective
의 낮은 레벨 등)."""
cm = spec["sip"]["cube_mesh"]
pl = spec["cube"]["pe_layout"]
n_cubes = cm["w"] * cm["h"]
n_pes = pl["pe_per_corner"] * len(pl["corners"])
return [(c, p) for c in range(n_cubes) for p in range(n_pes)]
MAPPER_REGISTRY = {"leader_only": leader_only, "all_pes": all_pes}
def resolve_mapper(key_or_path: str) -> Mapper:
if key_or_path in MAPPER_REGISTRY:
return MAPPER_REGISTRY[key_or_path]
if "." in key_or_path:
import importlib
mod_path, fn_name = key_or_path.rsplit(".", 1)
return getattr(importlib.import_module(mod_path), fn_name)
raise ValueError(f"unknown mapper: {key_or_path!r}")
```
Validator도 동일 패턴 (`src/kernbench/ccl/validators.py`). 입력은 **global
TensorHandle** (D8 참고).
### D6. Host-side install plan builder
```python
# src/kernbench/ccl/install_plan.py (new; 기존 install.py의 재구성)
from dataclasses import dataclass
from typing import Any, Mapping
@dataclass(frozen=True)
class NeighborTableEntry:
direction: str
peer_direction: str # ADR-0025
peer_sip: int
peer_cube: int
peer_pe: int
rx_base_pa: int
# ... 기타 IPCQ 설정 ...
@dataclass(frozen=True)
class PeInstallSpec:
cube: int
pe: int
neighbors: tuple[NeighborTableEntry, ...]
@dataclass(frozen=True)
class SipInstallPlan:
algorithm_name: str # human-readable ("ring_allreduce_tcm")
sip: int
rank: int
world_size: int
pe_installs: tuple[PeInstallSpec, ...] # per-PE neighbor tables
buffer_kind: str
n_slots: int
slot_size: int
kernel_module: str
participating_pes: tuple[tuple[int, int], ...]
kernel_config: Mapping[str, Any]
def build_install_plans(
world_size: int,
algorithm: str,
algorithm_config: dict,
spec: dict,
) -> list[SipInstallPlan]:
"""Compose topology + mapper + algorithm into per-SIP plan list."""
topo_fn = _resolve_topology(algorithm_config["topology"])
mapper = resolve_mapper(algorithm_config.get("mapper", "leader_only"))
# kernel_config: launch 시 kernel_args에 전달할 algorithm-specific params
kernel_config = {
k: v for k, v in algorithm_config.items()
if k in {"n_elem", "reduce_op", "chunk_size"} or k.startswith("kernel_")
}
plans = []
for rank in range(world_size):
sip = rank # identity mapping (non-identity는 open question)
pes = mapper(spec, rank)
pe_installs = _build_pe_installs(
rank=rank, world_size=world_size, sip=sip,
pes=pes, topo_fn=topo_fn, algorithm_config=algorithm_config, spec=spec,
)
plans.append(SipInstallPlan(
algorithm_name=algorithm,
sip=sip, rank=rank, world_size=world_size,
pe_installs=pe_installs,
buffer_kind=algorithm_config["buffer_kind"],
n_slots=algorithm_config["n_slots"],
slot_size=algorithm_config["slot_size"],
kernel_module=algorithm_config["module"],
participating_pes=tuple(pes),
kernel_config=kernel_config,
))
return plans
```
`_build_pe_installs`는 기존 `ccl/install.py`의 neighbor 계산 로직을 재활용
(ADR-0025의 `reverse_direction` 개선 반영).
**Multi-PE 매퍼와 neighbor 생성 책임**: mapper가 SIP 내 여러 PE를 반환하는
경우 (`all_pes` 등), PE-level neighbor 그래프는 `_build_pe_installs` 내부에
형성된다. 즉 topology 모듈은 rank-level 관계만 제공하고, PE-level 연결은
builder에서 풀어낸다. 복잡한 multi-level 패턴을 쓰는 알고리즘은 이 책임
분산이 관리 부담이 될 수 있음 — 관련 논의는 ADR-0029 참고.
### D7. Epoch-based collective barrier
Cross-rank submit 동기화. 각 collective 호출은 독립 epoch. 같은 rank의
중복 join은 즉시 에러.
```python
# src/kernbench/runtime_api/distributed.py
@dataclass
class _EpochState:
participants: set[int] = field(default_factory=set)
pending: list = field(default_factory=list)
drained: bool = False
returned: int = 0
class _CollectiveBarrier:
"""Epoch-based barrier.
Contract:
- Each call joins the earliest non-drained epoch.
- Each rank may join a given epoch at most once. Duplicate join raises.
- Last arriver (participants == world_size) performs drain and advances
_next_epoch. Earlier arrivers yield and re-check drained on resume.
- Epoch state is GC'd when returned == world_size (success path).
- On failure paths, residual state is acceptable; reset() clears it.
"""
def __init__(self, world_size: int):
self._world_size = world_size
self._next_epoch = 0
self._state: dict[int, _EpochState] = {}
def submit_and_drain(self, ctx, rank: int, submit_fn) -> None:
epoch = self._next_epoch
state = self._state.setdefault(epoch, _EpochState())
if rank in state.participants:
raise RuntimeError(
f"rank {rank} attempted duplicate join to epoch {epoch}"
)
state.participants.add(rank)
handles = submit_fn()
state.pending.extend(handles)
is_last = len(state.participants) >= self._world_size
if is_last:
for h in state.pending:
ctx.wait(h)
state.drained = True
self._next_epoch = epoch + 1
else:
from greenlet import getcurrent
g = getcurrent()
if g.parent is None:
raise RuntimeError("barrier requires a bound worker greenlet")
while not state.drained:
g.parent.switch()
state.returned += 1
if state.returned >= self._world_size:
self._state.pop(epoch, None)
def reset(self) -> None:
"""Explicit cleanup on spawn exception unwinding."""
self._state.clear()
self._next_epoch = 0
```
### D8. Per-rank tensor view + validator contract
**Validator** (host-side, pre-slice, global handle 기준):
```python
# src/kernbench/ccl/validators.py
Validator = Callable[[TensorHandle, int, dict], None]
def single_shard_per_rank(handle, world_size, spec):
"""Ring 계열: 정확히 world_size개 shard, SIP당 1개."""
if len(handle.shards) != world_size:
raise ValueError(...)
per_sip = {}
for s in handle.shards:
per_sip[s.sip] = per_sip.get(s.sip, 0) + 1
if any(c != 1 for c in per_sip.values()):
raise ValueError(...)
def multi_pe_sip_local(handle, world_size, spec):
"""Multi-PE per SIP layout: 각 SIP에 intra-SIP PE 수만큼 shard 존재.
Intra-SIP 전체 PE를 참여시키는 알고리즘이 사용."""
cm = spec["sip"]["cube_mesh"]
pl = spec["cube"]["pe_layout"]
per_sip = cm["w"] * cm["h"] * pl["pe_per_corner"] * len(pl["corners"])
if len(handle.shards) != world_size * per_sip:
raise ValueError(...)
VALIDATOR_REGISTRY = {...}
def resolve_validator(key_or_path): ...
```
Validator는 world 전체의 shard layout 불변량을 본다. Per-rank view는
backend가 validator 호출 **후** `_tensor_slice_for_sip`로 생성.
**Per-rank tensor view** — SIP-local slice:
```python
def _tensor_slice_for_sip(handle, sip) -> TensorArg:
sip_shards = [s for s in handle.shards if s.sip == sip]
if not sip_shards:
raise RuntimeError(f"tensor has no shards on SIP {sip}")
# Deterministic ordering contract: (cube, pe, offset_bytes) ascending.
# Multi-PE mappers (hierarchical 등) rely on this ordering to align
# per-PE tensor arg construction with participating_pes enumeration.
sip_shards.sort(key=lambda s: (s.cube, s.pe, s.offset_bytes))
min_offset = min(s.offset_bytes for s in sip_shards)
local_va_base = handle.va_base + min_offset if handle.va_base else 0
return TensorArg(
shards=tuple(TensorArgShard(...) for s in sip_shards),
va_base=local_va_base,
)
```
**Ordering invariant**: slice의 shard는 `(cube, pe, offset_bytes)` 오름차순.
Backend가 `participating_pes`를 iterate하며 `_tensor_arg_for_pe(view, cube, pe)`
구성할 때, 결정론적 ordering을 전제할 수 있다. 특히 `all_pes` mapper +
hierarchical 알고리즘이 per-PE slice 조합을 순서 의존적으로 해석하는 경우에
중요.
### D9. Greenlet-local rank registry (+ debug warning)
```python
class DistributedContext:
def __init__(self):
self._backend = None
self._rank_by_greenlet: dict = {}
def _bind_rank(self, g, rank: int) -> None:
self._rank_by_greenlet[g] = int(rank)
def get_rank(self) -> int:
self._ensure_initialized()
from greenlet import getcurrent
g = getcurrent()
if g not in self._rank_by_greenlet:
if os.environ.get("KERNBENCH_DEBUG"):
warnings.warn(
"get_rank() called outside a bound greenlet — returning 0. "
"Likely a bug unless running single-driver."
)
return 0
return int(self._rank_by_greenlet[g])
```
### D10. `torch.ahbm.set_device(rank)` — SIP 바인딩
KernBench 백엔드 이름은 `ahbm` (ADR-0023 D10). Real PyTorch는
`torch.cuda.set_device(r)`이지만 우리는 CUDA가 아니므로 honestly-named
namespace를 사용한다.
```python
class _AhbmNamespace:
"""torch.ahbm — per-greenlet SIP device binding.
Real-PyTorch parity idiom: ``torch.cuda.set_device(rank)``. Since
KernBench's backend is 'ahbm' (not CUDA), we expose the equivalent
API under ``torch.ahbm`` to avoid pretending to be a CUDA runtime.
"""
def __init__(self):
self._device_by_greenlet: dict = {}
def set_device(self, device: int) -> None:
from greenlet import getcurrent
self._device_by_greenlet[getcurrent()] = int(device)
def current_device(self) -> int | None:
from greenlet import getcurrent
return self._device_by_greenlet.get(getcurrent())
# Attached to RuntimeContext as `self.ahbm = _AhbmNamespace()`.
# Bench code: `torch.ahbm.set_device(rank)` mirrors `torch.cuda.set_device`.
```
**PyTorch 2.x style 병행 지원**: 최신 PyTorch는 device-agnostic한
`torch.accelerator` 네임스페이스를 지향 (`torch.accelerator.set_device_index(r)`,
`torch.accelerator.current_device_index()`). Device vendor에 종속되지 않는
코드를 쓰려는 사용자를 위해 KernBench도 이 표면을 병행 지원한다.
```python
class _AcceleratorNamespace:
"""torch.accelerator — device-agnostic API (PyTorch 2.x style).
Aliases torch.ahbm for bench code that prefers device-neutral idiom:
torch.accelerator.set_device_index(rank)
torch.accelerator.current_device_index()
"""
def __init__(self, ahbm: _AhbmNamespace):
self._ahbm = ahbm
def set_device_index(self, device: int) -> None:
self._ahbm.set_device(device)
def current_device_index(self) -> int | None:
return self._ahbm.current_device()
# RuntimeContext
self.ahbm = _AhbmNamespace()
self.accelerator = _AcceleratorNamespace(self.ahbm) # alias
```
Bench 작성자는 다음 중 하나를 선택 — 둘 다 내부적으로 같은 레지스트리를 보유:
```python
torch.ahbm.set_device(rank) # KernBench-native, explicit backend
torch.accelerator.set_device_index(rank) # PyTorch 2.x device-agnostic
```
### D11. Tensor placement = structural (sip, cube, pe) 좌표
`resolve_dp_policy``target_sip`을 직접 받아 구조적 좌표로 placement 생성.
세부는 ADR-0026.
```python
# RuntimeContext._create_tensor
current_sip = self.ahbm.current_device() # (D10 naming)
if current_sip is None:
current_sip = 0 # single-driver fallback (D9와 일관)
placement = resolve_dp_policy(
dp, shape=shape_2d, itemsize=itemsize,
num_pe=eff_num_pe, num_cubes=eff_num_cubes,
target_sip=current_sip,
)
```
Post-hoc `pe_index` shifting 제거 — ShardSpec이 `(sip, cube, pe)` 구조적
좌표 보유.
### D12. `torch.multiprocessing.spawn`-compat surface
Bench 작성자 표면은 real PyTorch `mp.spawn`과 동일:
```python
# src/kernbench/runtime_api/multiprocessing.py (new)
def spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method="spawn"):
"""Drop-in for torch.multiprocessing.spawn.
Internal: greenlet fan-out + epoch-barrier sync + exception propagation.
"""
...
# torch namespace에 부착
torch.multiprocessing = SimpleNamespace(spawn=spawn)
```
Bench:
```python
import torch.multiprocessing as mp
mp.spawn(worker, nprocs=world_size, args=(world_size, torch))
```
### D13. Scheduler + exception handling
```python
def spawn(fn, args, nprocs, ...):
dist = torch.distributed
gs: list[greenlet] = []
errors: dict[int, Exception] = {}
for rank in range(nprocs):
def _entry(r=rank):
try:
fn(r, *args)
except Exception as e:
errors[r] = e
raise
g = greenlet(_entry)
dist._bind_rank(g, rank)
gs.append(g)
try:
while True:
alive = [g for g in gs if not g.dead]
if not alive:
break
for g in alive:
if not g.dead:
g.switch()
except Exception as outer:
for other in gs:
if not other.dead:
try:
other.throw(SystemExit)
except Exception:
pass
# Epoch barrier state 명시적 cleanup
backend = getattr(dist, "_backend", None)
if backend is not None and hasattr(backend, "_barrier"):
backend._barrier.reset()
raise SpawnException(errors) from outer
```
**Scheduler contract**:
- Deterministic round-robin over insertion order (rank 0, 1, ..., N-1).
- 동기화 지점은 epoch barrier (D7)만. Scheduler 순서에 의존하는 correctness 없음.
- 예외 발생 시 다른 greenlet 강제 종료 + `SpawnException` 전파.
**Starvation guideline**:
- 일반적으로 collective barrier가 workers를 동기화. 큰 편차 없음.
- 극단적 non-collective 루프 대비 cooperative yield 제공:
`torch.distributed.cooperative_yield()`.
### D14. Backward compatibility
1. **Single-driver 호출**: `get_rank()` 0 반환 (D9).
2. **`ccl.yaml` world_size override**: D1 fallback 우회 — legacy "rank = PE"
테스트 경로로 사용 가능.
3. **`DPPolicy.sip="column_wise"` 명시**: ADR-0026 scope.
4. **`install_ipcq()` compatibility wrapper**:
기존 `ccl/install.py``install_ipcq()` API는 곧바로 제거하지 않는다.
Thin compatibility wrapper로 남겨 기존 직접 호출자가 점진적으로 migration할
수 있게 한다.
```python
# src/kernbench/ccl/install.py (after this ADR)
def install_ipcq(engine, spec, merged, *, algo_module=None, rank_to_pe=None):
"""DEPRECATED: legacy host-side PE installer.
Internally delegates to build_install_plans + engine-routed IpcqInitMsg.
Use dist.init_process_group() instead.
"""
from kernbench.ccl.install_plan import build_install_plans
import warnings
warnings.warn(
"install_ipcq() is deprecated; use dist.init_process_group()",
DeprecationWarning, stacklevel=2,
)
plans = build_install_plans(
world_size=merged.get("world_size", 1),
algorithm=merged["algorithm"],
algorithm_config=merged,
spec=spec,
)
handles = []
for plan in plans:
for pe_install in plan.pe_installs:
h = engine.submit(IpcqInitMsg(
target_sips=(plan.sip,),
target_cubes=(pe_install.cube,),
target_pe=pe_install.pe,
entries=pe_install.neighbors,
buffer_kind=plan.buffer_kind,
n_slots=plan.n_slots,
slot_size=plan.slot_size,
))
handles.append(h)
for h in handles:
engine.wait(h)
return {"world_size": merged.get("world_size", 1), "plans": plans}
```
Migration 스케줄:
- Phase 1: wrapper로 유지 + DeprecationWarning
- Phase 2: 직접 호출자 grep-audit → 각각 `dist.init_process_group()` 또는
`build_install_plans()` 직접 사용으로 이관
- Phase 3: wrapper 제거 (별도 cleanup ADR 또는 PR)
---
## Dependencies
- **ADR-0023** (IPCQ): `IpcqInitMsg` 메시지 타입과 PE_IPCQ 핸들링을 그대로
활용. Engine-routed submit으로 전환하는 것이 유일한 변경.
- **ADR-0025** (IPCQ direction fix): `_build_pe_installs`의 neighbor 계산이
2-rank ring 등에서 정확히 동작하려면 필요.
- **ADR-0003 / 0016** (IO_CPU): IO_CPU는 기존 transit 역할 그대로. 본 ADR에서
IO_CPU 역할 변경 없음.
---
## Non-goals
- **IPCQ protocol 수정**: ADR-0023 유지.
- **DPPolicy 필드 정리**: ADR-0026.
- **Megatron-style TP**: ADR-0027.
- **Multi-node (프로세스 간)**: 단일 프로세스.
- **IO_CPU SIP control-plane 단일 endpoint 원칙 채택**: 본 ADR 범위 밖. 현재
KernBench에 이 원칙이 없고, 도입은 별도 ADR.
- **Hierarchical all-reduce 알고리즘 설계**: ADR-0029. 본 ADR은 그 알고리즘이
쓸 framework 인프라 (`all_pes` mapper, `multi_pe_sip_local` validator,
registry 확장점)만 제공.
---
## Open questions
### 🔴 Critical — 구현 blocker 가능성 (integration 전 반드시 검증)
- **`IpcqInitMsg`의 engine routing — primary implementation risk**: 현재
sideband만 쓰여서 engine routing path가 실사용 검증되지 않은 상태. **본
ADR 전체가 "engine routing이 동작한다"는 가정 위에 서 있다**. 이것이
실제로 안 되면 D2, D14, T3 등이 전부 영향 받음. 반드시 **ADR 구현 착수
전 스파이크 검증**:
- `engine.submit(IpcqInitMsg(target_sips=..., target_cubes=..., target_pe=...))`
가 PE_IPCQ로 정확히 배달되는지 (기존 `MmuMapMsg` / `MemoryWriteMsg` 라우팅
패턴과 비교)
- 미지원 시 minor hook: engine의 message-type → component-kind 매핑 테이블에
`IpcqInitMsg → "pe_ipcq"` 등록 (localized change, topology builder /
message schema 영향 없음)
- 결과에 따라 D2 채택 여부가 달라질 수 있음 — 만약 routing 불가 시 sideband
path 유지로 fallback 후 본 ADR 범위 재조정
- **Engine-routed install vs sideband equivalence** (D2 검증점 1-5): T3의
equivalence test가 실제 동작하는지 스파이크. 특히 ordering independence와
idempotency는 기존 테스트에 없는 속성이라 신규 검증 필요.
- **`install_ipcq()` 직접 호출자 audit** (구현 전 필수): deprecated wrapper
전략은 적절하지만 실제 migration 리스크는 호출자 목록에 따라 다름. 착수 전
grep audit:
- Pattern: `install_ipcq(` (cwd 전체)
- Scope: `src/`, `tests/`, `benches/`, `scripts/`, `src/kernbench/cli/`
- 각 호출자의 예상 migration path (→ `dist.init_process_group` vs
`build_install_plans` 직접)를 정리한 후 wrapper 도입
### 🟡 Nice-to-have — scope 경계 관련
- **Install timing 허용치**: SimPy 시간 상 install이 몇 ns~us 소모. 기존
sideband는 0ns. 기존 테스트가 t=0 시작을 전제로 하는지 확인 (audit 결과에
따라 테스트 교정 필요).
- **`IpcqInitMsg` 배치 가능성**: MmuMapMsg처럼 `target_pe="all"` 브로드캐스트
는 IPCQ에서는 부적합 (PE마다 neighbor가 다름). 현재는 per-PE 개별 submit.
Per-PE payload를 담는 batched IpcqInitMsg 타입은 future optimization.
- **`_rank_to_sip` 매핑**: 현재 identity. Non-trivial mapping 요구 시 별도.
- **Cooperative yield API 위치**: `torch.distributed.cooperative_yield()`
노출 예정. 실제 필요성은 Phase 2 이후 벤치 추가 시 판단.
(PE-level topology 일원화 관련 중장기 방향은 **ADR-0029** 참고 — 복잡한
multi-level 알고리즘이 driving force가 되는 framework 진화 방향.)
---
## Test strategy
### T1. Launcher infrastructure
`tests/test_ccl_ddp_launcher.py`:
- `test_world_size_equals_sip_count` — D1
- `test_ahbm_set_device_binds_tensor_to_single_sip` — D10/D11
- `test_get_rank_is_greenlet_local` — D9
- `test_run_spawns_one_worker_per_rank` — D12/D13
- `test_get_rank_debug_warning` — D9 warning path
### T2. Install plan builder
`tests/test_ccl_install_plan.py` (new):
- `build_install_plans` — ring_1d × leader_only 조합 (단일 PE per rank)
- `build_install_plans` — ring_1d × all_pes 조합 (multi-PE per rank; mapper
framework 동작 확인, 알고리즘-무관)
- Mapper / validator registry resolution (built-in key vs import path vs
unknown)
- Import path fallback (`"pkg.mod.fn"` 형식) 동작 검증
### T3. Engine-routed IpcqInitMsg (equivalence — 핵심 검증)
`tests/test_ipcq_init_routing.py` (new):
- **Routing**: `engine.submit(IpcqInitMsg)` → 지정 PE_IPCQ가 실제 설치 수행
- **Equivalence**: 동일한 IpcqInitMsg를 (a) sideband `_install_neighbors`
직접 호출, (b) engine.submit 두 경로로 보낸 뒤 PE_IPCQ 최종 state
(`_queue_pairs`, `_installed` 등) 동일성 비교
- **Ordering independence**: 서로 다른 PE의 install msg를 engine 큐에 임의
순서로 넣어도 최종 state가 동일
- **Idempotency (duplicate install)**: 동일 PE에 두 번 install msg → 두
번째는 에러 raise (policy: explicit error; D2 검증점 4 참고)
- **Multi-PE 병렬 install**: per-PE submit이 interference 없이 완료
- **Install 후 send 성공**: 설치 직후 `IpcqSendCmd` 실행해서 neighbor table
state가 실제로 유효한지 확인
### T4. Barrier correctness
`tests/test_collective_barrier.py` (new):
- Single collective 정상
- 다중 collective 연속 호출 (epoch 격리)
- 동일 rank의 duplicate join → RuntimeError
- Rank 1이 all_reduce 전 종료 → SpawnException + barrier.reset()
- Conditional branch 시 모든 rank 도달하면 정상
### T5. E2E
`tests/test_ccl_allreduce_matrix.py`:
- `ring_tcm` / `ring_hbm` / `ring_sram` @ ws=SIP_count
### T6. 회귀
기존 `test_ccl_framework`, `test_ccl_install`, `test_ccl_topologies`,
`test_ccl_mock_runtime`, `test_pe_ipcq`, `test_ipcq_e2e`, 기타 non-CCL
모두 통과.
---
## Consequences
### Positive
- **새 message 타입 0개**: 기존 `IpcqInitMsg` + `KernelLaunchMsg`만으로 구현.
- **IO_CPU / engine 변경 없음**: 기존 routing 그대로.
- **Sideband install convention 제거**: MmuMapMsg 등과 동일 패턴으로 일원화.
- **Plan state stale 문제 소멸**: Plan은 host 단일 소유.
- **Bench = real PyTorch DDP** (공개 API 관점).
- **Algorithm ABI 경량**: `kernel` + `kernel_args`만 필수.
- **Epoch-based barrier**: interleaved collective 안전.
- **Control/data plane 분리**: data plane(PE_IPCQ)은 ADR-0023 유지, control
plane은 host-driven.
- 장기 확장성: Megatron TP, DTensor 기반.
### Negative
- 신규 모듈: `install_plan.py`, `mappers.py`, `validators.py`,
`multiprocessing.py`.
- Engine이 `IpcqInitMsg`를 엔진-path로 라우팅할 수 있는지 구현 시 확인 필요
(minor hook 가능성).
- Install이 SimPy 시간을 소모 (positive로도 볼 수 있으나, 기존 sideband 시점
0ns 전제인 테스트가 있으면 교정 필요).
### Neutral
- IPCQ PE-level protocol (ADR-0023) 불변.
- `DPPolicy` 필드 변경은 ADR-0026.
- IO_CPU 역할 불변 (기존 transit 그대로).
---
## Affected files
| File | Change |
|------|--------|
| `src/kernbench/runtime_api/distributed.py` | D1/D2/D7/D9: world_size fallback, rank_to_sip, plan 소유, engine-routed install/launch, epoch barrier |
| `src/kernbench/runtime_api/context.py` | D10/D11: `_AhbmNamespace`, `ctx.ahbm`, `_create_tensor``target_sip` 전달 |
| `src/kernbench/runtime_api/multiprocessing.py` (new) | D12/D13: `spawn` + scheduler + exception |
| `src/kernbench/ccl/install_plan.py` (new) | D6: `build_install_plans`, `SipInstallPlan`, `PeInstallSpec`, `NeighborTableEntry` |
| `src/kernbench/ccl/mappers.py` (new) | D5: `leader_only`, `all_pes`, registry + resolver |
| `src/kernbench/ccl/validators.py` (new) | D5: validator registry + resolver |
| `src/kernbench/ccl/install.py` | Thin deprecated compat wrapper (D14) |
| `src/kernbench/ccl/algorithms/ring_allreduce.py` | D4: `kernel` + `kernel_args` 유지 (큰 변화 없음) |
| `src/kernbench/ccl/algorithms/mesh_allreduce.py` | D4 동일 |
| `src/kernbench/ccl/algorithms/tree_allreduce.py` | D4 동일 |
| `ccl.yaml` | 각 알고리즘에 `mapper` / `validator` 선언 추가 |
| `src/kernbench/sim_engine/engine.py` | (If needed) `IpcqInitMsg` → PE_IPCQ 라우팅 확인 hook |
| `benches/ccl_allreduce.py` | 새 launcher 기반 rewrite |
| `tests/test_ccl_ddp_launcher.py` (new) | T1 |
| `tests/test_ccl_install_plan.py` (new) | T2 |
| `tests/test_ipcq_init_routing.py` (new) | T3 |
| `tests/test_collective_barrier.py` (new) | T4 |
| `tests/test_ccl_allreduce_matrix.py` | T5: ws=SIP_count 단순화 |