docs: add ADRs 0024–0031 for SIP-TP launcher stack

ADR-0024 (SIP-level TP launcher): rank = SIP abstraction, engine-routed
  install, mp.spawn parity, epoch barrier, ShardSpec structural coords.
ADR-0025 (IPCQ direction addressing): address-based matching for meta
  arrival and credit return; fixes 2-rank bidirectional ring deadlock.
ADR-0026 (DPPolicy intra-device only): remove sip/num_sips fields;
  ShardSpec uses structural (sip, cube, pe); pe_index property removed.
ADR-0027 (Megatron-style TP API): ColumnParallelLinear / RowParallelLinear
  on top of ADR-0024 launcher. Backlog until 0024/0025/0026 land.
ADR-0028 (DTensor support): stub / future work.
ADR-0029 (Hierarchical all-reduce): 3-level reduce using all_pes mapper
  and multi_pe_sip_local validator from ADR-0024. Backlog.
ADR-0030 (IPCQ PhysAddr integration): blocked on ADR-0031.
ADR-0031 (PhysAddr PE-resource extension): stub; local_offset range-based
  partition approach; specific ranges TBD.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-14 00:38:27 -07:00
parent b2c52f0e34
commit e1084800ab
8 changed files with 3366 additions and 0 deletions
+990
View File
@@ -0,0 +1,990 @@
# ADR-0024: SIP-level TP Launcher — rank = SIP (host-driven dispatch)
## Status
Proposed (Revision 8 — Hierarchical content split out to ADR-0029)
## Context
### 목표
`torch.distributed` collective 호출의 참여 단위(rank)를 **SIP**(device)
경계에 맞춘다. 실제 PyTorch DDP/TP 스크립트와 **호스트 레벨에서 구분 없이**
읽히는 bench 코드를 목표로 한다.
real PyTorch와 비교:
| 차원 | real PyTorch | KernBench (이 ADR 이후) |
|---|---|---|
| 프로세스 모델 | N개 프로세스, 각 1 GPU | 1 프로세스, N greenlet, 각 1 SIP |
| `get_rank()` | `RANK` env var | greenlet-local 레지스트리 |
| `get_world_size()` | `WORLD_SIZE` env var | topology의 SIP 수 |
| `torch.cuda.set_device(r)` (real) / `torch.ahbm.set_device(r)` (KernBench) | rank → GPU | rank → SIP |
| `mp.spawn` | OS 프로세스 fork | greenlet fan-out |
### 설계 원칙 — 공개 API의 추상화, 내부는 기존 path 활용
**공개 API (bench worker) 수준의 추상화**:
```
rank = SIP
DPPolicy = intra-device (cube × PE) 분산만
dist.all_reduce, torch.ahbm.set_device, mp.spawn 등 PyTorch-style 표면
```
**Framework 내부 구현**:
```
build_install_plans (host): topology + mapper + algorithm → SipInstallPlan
backend (host): plan의 per-PE spec을 engine.submit으로 IpcqInitMsg 디스패치
engine: 기존 PE-scoped routing (MmuMapMsg 등과 동일 경로)
PE_IPCQ: 자체 message loop에서 IpcqInitMsg 처리 (기존 capability)
```
**핵심**: 새 message 타입이나 IO_CPU 확장 없음. 기존 engine routing과 기존
`IpcqInitMsg` 타입을 그대로 사용. 기존의 "sideband direct call" 우회만
제거하여 convention 일원화.
### 현재 상태
- `DistributedContext` facade 존재
- `init_process_group("ahbm")``AhbmCCLBackend``ctx.install_ipcq` 호출
`ccl/install.py`**sideband direct call** (`pe_ipcq._install_neighbors`)로
PE_IPCQ에 neighbor table 설치
- `get_rank()` 항상 `0` (single-driver)
- `get_world_size()` fallback: 총 PE 수 (rank = PE)
- `benches/ccl_allreduce.py`: `worker(rank=0, world_size=total_PEs)` 1회 호출
### 풀어야 할 문제
1. **공개 API에서 rank = SIP** — bench worker가 PE 개념을 알지 않도록.
2. **Multi-worker 실행** — N개 rank가 독립 worker 코드 실행. 1 프로세스 제약
하에서 greenlet + barrier 동기화.
3. **Cross-rank collective submit 동기화** — 첫 rank가 혼자 wait하면 peer 부재로
SimPy deadlock. 모든 rank submit 후 drain 보장.
4. **기존 sideband install 제거** — IpcqInitMsg를 engine.submit으로 일원화.
MmuMapMsg 등 다른 control-plane 메시지와 동일 패턴.
5. **Algorithm / mapper / validator 분리** — 알고리즘 모듈은 kernel 코드만
담고, topology / mapping / validation은 registry + 선언.
### Non-problem (이 ADR 밖)
- IPCQ direction addressing fix → **ADR-0025**
- `DPPolicy.sip`/`num_sips` 제거 → **ADR-0026**
- Megatron-style TP → **ADR-0027**
- DTensor → **ADR-0028 (future)**
- **IO_CPU를 SIP-level control-plane 단일 endpoint로 승격**: 이 ADR에서는
invariant으로 채택하지 않음. 현재 KernBench에 해당 원칙이 없고, 단독으로
도입하기엔 정당화가 약함. 미래에 control-plane latency 모델링 정밀도 요구가
생기면 별도 ADR.
### TODO (이 ADR 구현 이후)
- Tensor Parallelism (ADR-0027)
- Hierarchical all-reduce 알고리즘 설계 (ADR-0029) — 본 ADR의 mapper /
validator registry 인프라를 활용하는 첫 사례
---
## Decision
### D1. rank = SIP (world_size 해석)
```python
def _resolve_world_size(self) -> int:
if "world_size" in self._merged:
return int(self._merged["world_size"])
defaults = self._cfg_all.get("defaults", {})
if "world_size" in defaults:
return int(defaults["world_size"])
spec = self.ctx.spec or {}
return int(spec.get("system", {}).get("sips", {}).get("count", 1))
```
우선순위: 알고리즘 override > defaults override > SIP count. `ccl.yaml`
override는 legacy "rank = PE" 테스트 경로로 유지.
### D2. Install 경로 — engine.submit 일원화
`ccl/install.py`의 sideband direct call을 제거하고, `IpcqInitMsg`
`engine.submit`으로 보낸다. MmuMapMsg / MemoryWriteMsg 등이 이미 동일 패턴.
```python
# Backend (AhbmCCLBackend.__init__ 또는 init_process_group 시점)
from kernbench.ccl.install_plan import build_install_plans
plans = build_install_plans(
world_size=self._world_size,
algorithm=self._merged["algorithm"],
algorithm_config=self._merged,
spec=self.ctx.spec,
)
self._plans = plans
# Each PE_IPCQ가 자기 neighbor table을 받도록 engine 경유 submit
handles = []
for plan in plans:
for pe_install in plan.pe_installs:
h = self.ctx.submit(IpcqInitMsg(
correlation_id=self.ctx.correlation_id,
request_id=f"ipcq_init_s{plan.sip}c{pe_install.cube}p{pe_install.pe}",
target_sips=(plan.sip,),
target_cubes=(pe_install.cube,),
target_pe=pe_install.pe,
entries=pe_install.neighbors,
buffer_kind=plan.buffer_kind,
n_slots=plan.n_slots,
slot_size=plan.slot_size,
# ... (기존 IpcqInitMsg 필드)
))
handles.append(h)
# Eager install — init_process_group이 반환하기 전에 완료 보장
for h in handles:
self.ctx.wait(h)
```
**PE_IPCQ 컴포넌트**는 이미 `IpcqInitMsg`를 main loop에서 처리 (`pe_ipcq.py`
라인 145-147). 변경 불필요. 유일한 차이는 "message가 sideband Python call이
아니라 engine queue를 거쳐 도착한다"는 점.
**Correctness invariant (equivalence)**: `init_process_group()`은 모든
install handle을 `wait()`한 후 반환하므로 launch-before-install 문제는
구조적으로 없다. 남는 correctness 질문은 단 하나:
> Engine-routed `IpcqInitMsg` 처리가 기존 sideband
> `pe_ipcq._install_neighbors(msg)` 호출과 **동일한 최종 PE_IPCQ 상태**를
> 생성하는가.
검증 포인트 (T3 참고):
1. **State equivalence**: `_install_neighbors()` 내부 상태 전이가 engine
dispatch path에서도 동일하게 일어나 최종 PE_IPCQ state
(`_queue_pairs`, `_installed`, `_credit_inbox` 등)가 일치.
2. **Sideband-only side effect 부재**: Sideband path에서만 있던 부수 효과가
없음 (예: engine.submit이 설정하는 request_id / correlation tracking 등이
install semantics를 왜곡하지 않음).
3. **Ordering independence**: 서로 다른 PE들의 install message가 engine
큐에서 임의 순서로 처리되어도 최종 상태가 동일. 즉 install은 **PE별
독립 연산**이어야 하고, cross-PE 순서 의존성이 있으면 안 됨.
4. **Idempotency**: 동일 PE에 대해 `IpcqInitMsg`가 두 번 도착하면? 현재
설계 전제는 "per-PE 단 한 번 install". 중복 install 시 동작은 정의되지
않음. 보수적 정책:
- 최초 install 시 `_installed = True`로 전이
- 이후 중복 install msg는 **에러** (raise) 또는 **silent idempotent**
(no-op) 둘 중 하나로 명시
- Recommend: **raise** (명시적 에러 → 버그 조기 검출). T3에 duplicate
install 케이스 추가.
5. **Partial install visibility**: 일부 PE만 install 완료된 중간 상태가
외부에 observable한가? 현재 구조에서는 `init_process_group()`의 eager
wait-all이 barrier 역할을 하므로 partial state는 bench 코드에 노출되지
않음. 단, debugging / introspection API는 중간 상태를 볼 수 있음 (문제
아님, 문서화만).
**Timing 영향**: Engine-routed install은 `init_process_group()`이 SimPy 시간을
소비하게 만든다. 기존 sideband install은 사실상 zero-cost. ADR 계약:
> Benchmarks must not rely on zero-cost initialization.
> `init_process_group()` consumes simulated time proportional to the number
> of participating PEs × per-PE install latency. First collective call
> starts at a well-defined but non-zero sim time.
### D3. Launch 경로 — non-CCL 커널과 동일 primitive
**CCL 커널은 non-CCL 커널과 동일한 `KernelLaunchMsg` submission path를 쓴다.**
Engine 내부의 IO_CPU/M_CPU transit 같은 것은 **기존 구현 세부이지 CCL-specific
장치가 아님**. Backend는 plan의 `participating_pes` 목록을 돌면서 `KernelLaunchMsg`
submit할 뿐이다. 새 메시지 타입 없음, 새 라우팅 경로 없음.
```python
# AhbmCCLBackend.all_reduce
def all_reduce(self, tensor, op="sum"):
if op != "sum":
raise NotImplementedError(...)
if tensor._handle is None or not tensor._handle.shards:
raise RuntimeError(...)
# Validator — global handle 기준 (D8)
validator_name = self._merged.get("validator")
if validator_name:
resolve_validator(validator_name)(tensor._handle, self._world_size, self.ctx.spec)
rank = self.ctx.distributed.get_rank()
plan = self._plans[rank]
tensor_view = _tensor_slice_for_sip(tensor._handle, plan.sip)
# Plan에서 kernel args 계산 (host-side)
import importlib
mod = importlib.import_module(plan.kernel_module)
n_elem = tensor_view.shards[0].nbytes // tensor.itemsize
kargs = mod.kernel_args(n_elem=n_elem, world_size=plan.world_size,
**plan.kernel_config)
def _submit():
out = []
for (cube, pe) in plan.participating_pes:
h = self.ctx.submit(KernelLaunchMsg(
correlation_id=self.ctx.correlation_id,
request_id=f"allreduce_r{rank}_c{cube}p{pe}",
kernel_ref=KernelRef(name=plan.algorithm_name, kind="builtin"),
args=(_tensor_arg_for_pe(tensor_view, cube, pe), *kargs),
target_sips=(plan.sip,),
target_cubes=(cube,),
target_pe=pe,
))
out.append(h)
return out
self._barrier.submit_and_drain(self.ctx, rank, _submit)
```
### D4. Algorithm ABI — 얇게 + 명시적 arg 계약
각 알고리즘 모듈은 **kernel + kernel_args만 필수**.
```python
# src/kernbench/ccl/algorithms/ring_allreduce.py
def kernel(t_ptr, n_elem, world_size, tl):
"""PE-side kernel code.
Signature convention: first positional arg is the tensor pointer
(per-PE slice), subsequent positional args are whatever
kernel_args() returns. `tl` is injected by the TLContext runtime.
"""
def kernel_args(*, n_elem: int, world_size: int, **kw) -> tuple:
"""Return the tuple of non-tensor positional args.
Signature contract:
- Called keyword-only with n_elem and world_size plus kernel_config.
- Returns a tuple (possibly empty) of scalar / metadata args.
- The backend constructs the final KernelLaunchMsg.args as:
(per_pe_tensor_arg, *kernel_args(...))
where per_pe_tensor_arg is a TensorArg containing only the shards
local to the receiving PE (derived from tensor_view).
"""
return (n_elem, world_size)
```
**Arg assembly in backend (reference)**:
```python
# AhbmCCLBackend.all_reduce (D3에서 발췌)
kargs = mod.kernel_args(n_elem=n_elem, world_size=plan.world_size,
**plan.kernel_config)
for (cube, pe) in plan.participating_pes:
pe_tensor_arg = _tensor_arg_for_pe(tensor_view, cube, pe)
self.ctx.submit(KernelLaunchMsg(
args=(pe_tensor_arg, *kargs), # tensor first, then kernel_args return
target_sips=(plan.sip,),
target_cubes=(cube,),
target_pe=pe,
...
))
```
**ccl.yaml**에서 선언적 metadata:
```yaml
algorithms:
ring_allreduce_tcm:
module: kernbench.ccl.algorithms.ring_allreduce
topology: ring_1d # kernbench/ccl/topologies.py
mapper: leader_only # kernbench/ccl/mappers.py (신규)
validator: single_shard_per_rank # kernbench/ccl/validators.py (신규)
buffer_kind: tcm
n_elem: 8
```
- `topology` (필수)
- `mapper` (선택, default `"leader_only"`)
- `validator` (선택)
알고리즘 모듈 자체에는 mapper/validator/participating_pes/neighbor
생성기가 **들어가지 않음**.
### D5. Mapper + validator — registry key **또는** import path
Host-side framework가 built-in registry 제공. 커스텀 확장은 dot-import path.
```python
# src/kernbench/ccl/mappers.py (new)
Mapper = Callable[[dict, int], list[tuple[int, int]]]
def leader_only(spec, rank):
"""Single leader PE per SIP. Ring/tree/mesh용."""
return [(0, 0)]
def all_pes(spec, rank):
"""Every PE in the SIP. 알고리즘이 intra-SIP 전체 PE를 참여시킬 때 사용
(e.g. intra-SIP reduction, intra-SIP broadcast, hierarchical collective
의 낮은 레벨 등)."""
cm = spec["sip"]["cube_mesh"]
pl = spec["cube"]["pe_layout"]
n_cubes = cm["w"] * cm["h"]
n_pes = pl["pe_per_corner"] * len(pl["corners"])
return [(c, p) for c in range(n_cubes) for p in range(n_pes)]
MAPPER_REGISTRY = {"leader_only": leader_only, "all_pes": all_pes}
def resolve_mapper(key_or_path: str) -> Mapper:
if key_or_path in MAPPER_REGISTRY:
return MAPPER_REGISTRY[key_or_path]
if "." in key_or_path:
import importlib
mod_path, fn_name = key_or_path.rsplit(".", 1)
return getattr(importlib.import_module(mod_path), fn_name)
raise ValueError(f"unknown mapper: {key_or_path!r}")
```
Validator도 동일 패턴 (`src/kernbench/ccl/validators.py`). 입력은 **global
TensorHandle** (D8 참고).
### D6. Host-side install plan builder
```python
# src/kernbench/ccl/install_plan.py (new; 기존 install.py의 재구성)
from dataclasses import dataclass
from typing import Any, Mapping
@dataclass(frozen=True)
class NeighborTableEntry:
direction: str
peer_direction: str # ADR-0025
peer_sip: int
peer_cube: int
peer_pe: int
rx_base_pa: int
# ... 기타 IPCQ 설정 ...
@dataclass(frozen=True)
class PeInstallSpec:
cube: int
pe: int
neighbors: tuple[NeighborTableEntry, ...]
@dataclass(frozen=True)
class SipInstallPlan:
algorithm_name: str # human-readable ("ring_allreduce_tcm")
sip: int
rank: int
world_size: int
pe_installs: tuple[PeInstallSpec, ...] # per-PE neighbor tables
buffer_kind: str
n_slots: int
slot_size: int
kernel_module: str
participating_pes: tuple[tuple[int, int], ...]
kernel_config: Mapping[str, Any]
def build_install_plans(
world_size: int,
algorithm: str,
algorithm_config: dict,
spec: dict,
) -> list[SipInstallPlan]:
"""Compose topology + mapper + algorithm into per-SIP plan list."""
topo_fn = _resolve_topology(algorithm_config["topology"])
mapper = resolve_mapper(algorithm_config.get("mapper", "leader_only"))
# kernel_config: launch 시 kernel_args에 전달할 algorithm-specific params
kernel_config = {
k: v for k, v in algorithm_config.items()
if k in {"n_elem", "reduce_op", "chunk_size"} or k.startswith("kernel_")
}
plans = []
for rank in range(world_size):
sip = rank # identity mapping (non-identity는 open question)
pes = mapper(spec, rank)
pe_installs = _build_pe_installs(
rank=rank, world_size=world_size, sip=sip,
pes=pes, topo_fn=topo_fn, algorithm_config=algorithm_config, spec=spec,
)
plans.append(SipInstallPlan(
algorithm_name=algorithm,
sip=sip, rank=rank, world_size=world_size,
pe_installs=pe_installs,
buffer_kind=algorithm_config["buffer_kind"],
n_slots=algorithm_config["n_slots"],
slot_size=algorithm_config["slot_size"],
kernel_module=algorithm_config["module"],
participating_pes=tuple(pes),
kernel_config=kernel_config,
))
return plans
```
`_build_pe_installs`는 기존 `ccl/install.py`의 neighbor 계산 로직을 재활용
(ADR-0025의 `reverse_direction` 개선 반영).
**Multi-PE 매퍼와 neighbor 생성 책임**: mapper가 SIP 내 여러 PE를 반환하는
경우 (`all_pes` 등), PE-level neighbor 그래프는 `_build_pe_installs` 내부에
형성된다. 즉 topology 모듈은 rank-level 관계만 제공하고, PE-level 연결은
builder에서 풀어낸다. 복잡한 multi-level 패턴을 쓰는 알고리즘은 이 책임
분산이 관리 부담이 될 수 있음 — 관련 논의는 ADR-0029 참고.
### D7. Epoch-based collective barrier
Cross-rank submit 동기화. 각 collective 호출은 독립 epoch. 같은 rank의
중복 join은 즉시 에러.
```python
# src/kernbench/runtime_api/distributed.py
@dataclass
class _EpochState:
participants: set[int] = field(default_factory=set)
pending: list = field(default_factory=list)
drained: bool = False
returned: int = 0
class _CollectiveBarrier:
"""Epoch-based barrier.
Contract:
- Each call joins the earliest non-drained epoch.
- Each rank may join a given epoch at most once. Duplicate join raises.
- Last arriver (participants == world_size) performs drain and advances
_next_epoch. Earlier arrivers yield and re-check drained on resume.
- Epoch state is GC'd when returned == world_size (success path).
- On failure paths, residual state is acceptable; reset() clears it.
"""
def __init__(self, world_size: int):
self._world_size = world_size
self._next_epoch = 0
self._state: dict[int, _EpochState] = {}
def submit_and_drain(self, ctx, rank: int, submit_fn) -> None:
epoch = self._next_epoch
state = self._state.setdefault(epoch, _EpochState())
if rank in state.participants:
raise RuntimeError(
f"rank {rank} attempted duplicate join to epoch {epoch}"
)
state.participants.add(rank)
handles = submit_fn()
state.pending.extend(handles)
is_last = len(state.participants) >= self._world_size
if is_last:
for h in state.pending:
ctx.wait(h)
state.drained = True
self._next_epoch = epoch + 1
else:
from greenlet import getcurrent
g = getcurrent()
if g.parent is None:
raise RuntimeError("barrier requires a bound worker greenlet")
while not state.drained:
g.parent.switch()
state.returned += 1
if state.returned >= self._world_size:
self._state.pop(epoch, None)
def reset(self) -> None:
"""Explicit cleanup on spawn exception unwinding."""
self._state.clear()
self._next_epoch = 0
```
### D8. Per-rank tensor view + validator contract
**Validator** (host-side, pre-slice, global handle 기준):
```python
# src/kernbench/ccl/validators.py
Validator = Callable[[TensorHandle, int, dict], None]
def single_shard_per_rank(handle, world_size, spec):
"""Ring 계열: 정확히 world_size개 shard, SIP당 1개."""
if len(handle.shards) != world_size:
raise ValueError(...)
per_sip = {}
for s in handle.shards:
per_sip[s.sip] = per_sip.get(s.sip, 0) + 1
if any(c != 1 for c in per_sip.values()):
raise ValueError(...)
def multi_pe_sip_local(handle, world_size, spec):
"""Multi-PE per SIP layout: 각 SIP에 intra-SIP PE 수만큼 shard 존재.
Intra-SIP 전체 PE를 참여시키는 알고리즘이 사용."""
cm = spec["sip"]["cube_mesh"]
pl = spec["cube"]["pe_layout"]
per_sip = cm["w"] * cm["h"] * pl["pe_per_corner"] * len(pl["corners"])
if len(handle.shards) != world_size * per_sip:
raise ValueError(...)
VALIDATOR_REGISTRY = {...}
def resolve_validator(key_or_path): ...
```
Validator는 world 전체의 shard layout 불변량을 본다. Per-rank view는
backend가 validator 호출 **후** `_tensor_slice_for_sip`로 생성.
**Per-rank tensor view** — SIP-local slice:
```python
def _tensor_slice_for_sip(handle, sip) -> TensorArg:
sip_shards = [s for s in handle.shards if s.sip == sip]
if not sip_shards:
raise RuntimeError(f"tensor has no shards on SIP {sip}")
# Deterministic ordering contract: (cube, pe, offset_bytes) ascending.
# Multi-PE mappers (hierarchical 등) rely on this ordering to align
# per-PE tensor arg construction with participating_pes enumeration.
sip_shards.sort(key=lambda s: (s.cube, s.pe, s.offset_bytes))
min_offset = min(s.offset_bytes for s in sip_shards)
local_va_base = handle.va_base + min_offset if handle.va_base else 0
return TensorArg(
shards=tuple(TensorArgShard(...) for s in sip_shards),
va_base=local_va_base,
)
```
**Ordering invariant**: slice의 shard는 `(cube, pe, offset_bytes)` 오름차순.
Backend가 `participating_pes`를 iterate하며 `_tensor_arg_for_pe(view, cube, pe)`
구성할 때, 결정론적 ordering을 전제할 수 있다. 특히 `all_pes` mapper +
hierarchical 알고리즘이 per-PE slice 조합을 순서 의존적으로 해석하는 경우에
중요.
### D9. Greenlet-local rank registry (+ debug warning)
```python
class DistributedContext:
def __init__(self):
self._backend = None
self._rank_by_greenlet: dict = {}
def _bind_rank(self, g, rank: int) -> None:
self._rank_by_greenlet[g] = int(rank)
def get_rank(self) -> int:
self._ensure_initialized()
from greenlet import getcurrent
g = getcurrent()
if g not in self._rank_by_greenlet:
if os.environ.get("KERNBENCH_DEBUG"):
warnings.warn(
"get_rank() called outside a bound greenlet — returning 0. "
"Likely a bug unless running single-driver."
)
return 0
return int(self._rank_by_greenlet[g])
```
### D10. `torch.ahbm.set_device(rank)` — SIP 바인딩
KernBench 백엔드 이름은 `ahbm` (ADR-0023 D10). Real PyTorch는
`torch.cuda.set_device(r)`이지만 우리는 CUDA가 아니므로 honestly-named
namespace를 사용한다.
```python
class _AhbmNamespace:
"""torch.ahbm — per-greenlet SIP device binding.
Real-PyTorch parity idiom: ``torch.cuda.set_device(rank)``. Since
KernBench's backend is 'ahbm' (not CUDA), we expose the equivalent
API under ``torch.ahbm`` to avoid pretending to be a CUDA runtime.
"""
def __init__(self):
self._device_by_greenlet: dict = {}
def set_device(self, device: int) -> None:
from greenlet import getcurrent
self._device_by_greenlet[getcurrent()] = int(device)
def current_device(self) -> int | None:
from greenlet import getcurrent
return self._device_by_greenlet.get(getcurrent())
# Attached to RuntimeContext as `self.ahbm = _AhbmNamespace()`.
# Bench code: `torch.ahbm.set_device(rank)` mirrors `torch.cuda.set_device`.
```
**PyTorch 2.x style 병행 지원**: 최신 PyTorch는 device-agnostic한
`torch.accelerator` 네임스페이스를 지향 (`torch.accelerator.set_device_index(r)`,
`torch.accelerator.current_device_index()`). Device vendor에 종속되지 않는
코드를 쓰려는 사용자를 위해 KernBench도 이 표면을 병행 지원한다.
```python
class _AcceleratorNamespace:
"""torch.accelerator — device-agnostic API (PyTorch 2.x style).
Aliases torch.ahbm for bench code that prefers device-neutral idiom:
torch.accelerator.set_device_index(rank)
torch.accelerator.current_device_index()
"""
def __init__(self, ahbm: _AhbmNamespace):
self._ahbm = ahbm
def set_device_index(self, device: int) -> None:
self._ahbm.set_device(device)
def current_device_index(self) -> int | None:
return self._ahbm.current_device()
# RuntimeContext
self.ahbm = _AhbmNamespace()
self.accelerator = _AcceleratorNamespace(self.ahbm) # alias
```
Bench 작성자는 다음 중 하나를 선택 — 둘 다 내부적으로 같은 레지스트리를 보유:
```python
torch.ahbm.set_device(rank) # KernBench-native, explicit backend
torch.accelerator.set_device_index(rank) # PyTorch 2.x device-agnostic
```
### D11. Tensor placement = structural (sip, cube, pe) 좌표
`resolve_dp_policy``target_sip`을 직접 받아 구조적 좌표로 placement 생성.
세부는 ADR-0026.
```python
# RuntimeContext._create_tensor
current_sip = self.ahbm.current_device() # (D10 naming)
if current_sip is None:
current_sip = 0 # single-driver fallback (D9와 일관)
placement = resolve_dp_policy(
dp, shape=shape_2d, itemsize=itemsize,
num_pe=eff_num_pe, num_cubes=eff_num_cubes,
target_sip=current_sip,
)
```
Post-hoc `pe_index` shifting 제거 — ShardSpec이 `(sip, cube, pe)` 구조적
좌표 보유.
### D12. `torch.multiprocessing.spawn`-compat surface
Bench 작성자 표면은 real PyTorch `mp.spawn`과 동일:
```python
# src/kernbench/runtime_api/multiprocessing.py (new)
def spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method="spawn"):
"""Drop-in for torch.multiprocessing.spawn.
Internal: greenlet fan-out + epoch-barrier sync + exception propagation.
"""
...
# torch namespace에 부착
torch.multiprocessing = SimpleNamespace(spawn=spawn)
```
Bench:
```python
import torch.multiprocessing as mp
mp.spawn(worker, nprocs=world_size, args=(world_size, torch))
```
### D13. Scheduler + exception handling
```python
def spawn(fn, args, nprocs, ...):
dist = torch.distributed
gs: list[greenlet] = []
errors: dict[int, Exception] = {}
for rank in range(nprocs):
def _entry(r=rank):
try:
fn(r, *args)
except Exception as e:
errors[r] = e
raise
g = greenlet(_entry)
dist._bind_rank(g, rank)
gs.append(g)
try:
while True:
alive = [g for g in gs if not g.dead]
if not alive:
break
for g in alive:
if not g.dead:
g.switch()
except Exception as outer:
for other in gs:
if not other.dead:
try:
other.throw(SystemExit)
except Exception:
pass
# Epoch barrier state 명시적 cleanup
backend = getattr(dist, "_backend", None)
if backend is not None and hasattr(backend, "_barrier"):
backend._barrier.reset()
raise SpawnException(errors) from outer
```
**Scheduler contract**:
- Deterministic round-robin over insertion order (rank 0, 1, ..., N-1).
- 동기화 지점은 epoch barrier (D7)만. Scheduler 순서에 의존하는 correctness 없음.
- 예외 발생 시 다른 greenlet 강제 종료 + `SpawnException` 전파.
**Starvation guideline**:
- 일반적으로 collective barrier가 workers를 동기화. 큰 편차 없음.
- 극단적 non-collective 루프 대비 cooperative yield 제공:
`torch.distributed.cooperative_yield()`.
### D14. Backward compatibility
1. **Single-driver 호출**: `get_rank()` 0 반환 (D9).
2. **`ccl.yaml` world_size override**: D1 fallback 우회 — legacy "rank = PE"
테스트 경로로 사용 가능.
3. **`DPPolicy.sip="column_wise"` 명시**: ADR-0026 scope.
4. **`install_ipcq()` compatibility wrapper**:
기존 `ccl/install.py``install_ipcq()` API는 곧바로 제거하지 않는다.
Thin compatibility wrapper로 남겨 기존 직접 호출자가 점진적으로 migration할
수 있게 한다.
```python
# src/kernbench/ccl/install.py (after this ADR)
def install_ipcq(engine, spec, merged, *, algo_module=None, rank_to_pe=None):
"""DEPRECATED: legacy host-side PE installer.
Internally delegates to build_install_plans + engine-routed IpcqInitMsg.
Use dist.init_process_group() instead.
"""
from kernbench.ccl.install_plan import build_install_plans
import warnings
warnings.warn(
"install_ipcq() is deprecated; use dist.init_process_group()",
DeprecationWarning, stacklevel=2,
)
plans = build_install_plans(
world_size=merged.get("world_size", 1),
algorithm=merged["algorithm"],
algorithm_config=merged,
spec=spec,
)
handles = []
for plan in plans:
for pe_install in plan.pe_installs:
h = engine.submit(IpcqInitMsg(
target_sips=(plan.sip,),
target_cubes=(pe_install.cube,),
target_pe=pe_install.pe,
entries=pe_install.neighbors,
buffer_kind=plan.buffer_kind,
n_slots=plan.n_slots,
slot_size=plan.slot_size,
))
handles.append(h)
for h in handles:
engine.wait(h)
return {"world_size": merged.get("world_size", 1), "plans": plans}
```
Migration 스케줄:
- Phase 1: wrapper로 유지 + DeprecationWarning
- Phase 2: 직접 호출자 grep-audit → 각각 `dist.init_process_group()` 또는
`build_install_plans()` 직접 사용으로 이관
- Phase 3: wrapper 제거 (별도 cleanup ADR 또는 PR)
---
## Dependencies
- **ADR-0023** (IPCQ): `IpcqInitMsg` 메시지 타입과 PE_IPCQ 핸들링을 그대로
활용. Engine-routed submit으로 전환하는 것이 유일한 변경.
- **ADR-0025** (IPCQ direction fix): `_build_pe_installs`의 neighbor 계산이
2-rank ring 등에서 정확히 동작하려면 필요.
- **ADR-0003 / 0016** (IO_CPU): IO_CPU는 기존 transit 역할 그대로. 본 ADR에서
IO_CPU 역할 변경 없음.
---
## Non-goals
- **IPCQ protocol 수정**: ADR-0023 유지.
- **DPPolicy 필드 정리**: ADR-0026.
- **Megatron-style TP**: ADR-0027.
- **Multi-node (프로세스 간)**: 단일 프로세스.
- **IO_CPU SIP control-plane 단일 endpoint 원칙 채택**: 본 ADR 범위 밖. 현재
KernBench에 이 원칙이 없고, 도입은 별도 ADR.
- **Hierarchical all-reduce 알고리즘 설계**: ADR-0029. 본 ADR은 그 알고리즘이
쓸 framework 인프라 (`all_pes` mapper, `multi_pe_sip_local` validator,
registry 확장점)만 제공.
---
## Open questions
### 🔴 Critical — 구현 blocker 가능성 (integration 전 반드시 검증)
- **`IpcqInitMsg`의 engine routing — primary implementation risk**: 현재
sideband만 쓰여서 engine routing path가 실사용 검증되지 않은 상태. **본
ADR 전체가 "engine routing이 동작한다"는 가정 위에 서 있다**. 이것이
실제로 안 되면 D2, D14, T3 등이 전부 영향 받음. 반드시 **ADR 구현 착수
전 스파이크 검증**:
- `engine.submit(IpcqInitMsg(target_sips=..., target_cubes=..., target_pe=...))`
가 PE_IPCQ로 정확히 배달되는지 (기존 `MmuMapMsg` / `MemoryWriteMsg` 라우팅
패턴과 비교)
- 미지원 시 minor hook: engine의 message-type → component-kind 매핑 테이블에
`IpcqInitMsg → "pe_ipcq"` 등록 (localized change, topology builder /
message schema 영향 없음)
- 결과에 따라 D2 채택 여부가 달라질 수 있음 — 만약 routing 불가 시 sideband
path 유지로 fallback 후 본 ADR 범위 재조정
- **Engine-routed install vs sideband equivalence** (D2 검증점 1-5): T3의
equivalence test가 실제 동작하는지 스파이크. 특히 ordering independence와
idempotency는 기존 테스트에 없는 속성이라 신규 검증 필요.
- **`install_ipcq()` 직접 호출자 audit** (구현 전 필수): deprecated wrapper
전략은 적절하지만 실제 migration 리스크는 호출자 목록에 따라 다름. 착수 전
grep audit:
- Pattern: `install_ipcq(` (cwd 전체)
- Scope: `src/`, `tests/`, `benches/`, `scripts/`, `src/kernbench/cli/`
- 각 호출자의 예상 migration path (→ `dist.init_process_group` vs
`build_install_plans` 직접)를 정리한 후 wrapper 도입
### 🟡 Nice-to-have — scope 경계 관련
- **Install timing 허용치**: SimPy 시간 상 install이 몇 ns~us 소모. 기존
sideband는 0ns. 기존 테스트가 t=0 시작을 전제로 하는지 확인 (audit 결과에
따라 테스트 교정 필요).
- **`IpcqInitMsg` 배치 가능성**: MmuMapMsg처럼 `target_pe="all"` 브로드캐스트
는 IPCQ에서는 부적합 (PE마다 neighbor가 다름). 현재는 per-PE 개별 submit.
Per-PE payload를 담는 batched IpcqInitMsg 타입은 future optimization.
- **`_rank_to_sip` 매핑**: 현재 identity. Non-trivial mapping 요구 시 별도.
- **Cooperative yield API 위치**: `torch.distributed.cooperative_yield()`
노출 예정. 실제 필요성은 Phase 2 이후 벤치 추가 시 판단.
(PE-level topology 일원화 관련 중장기 방향은 **ADR-0029** 참고 — 복잡한
multi-level 알고리즘이 driving force가 되는 framework 진화 방향.)
---
## Test strategy
### T1. Launcher infrastructure
`tests/test_ccl_ddp_launcher.py`:
- `test_world_size_equals_sip_count` — D1
- `test_ahbm_set_device_binds_tensor_to_single_sip` — D10/D11
- `test_get_rank_is_greenlet_local` — D9
- `test_run_spawns_one_worker_per_rank` — D12/D13
- `test_get_rank_debug_warning` — D9 warning path
### T2. Install plan builder
`tests/test_ccl_install_plan.py` (new):
- `build_install_plans` — ring_1d × leader_only 조합 (단일 PE per rank)
- `build_install_plans` — ring_1d × all_pes 조합 (multi-PE per rank; mapper
framework 동작 확인, 알고리즘-무관)
- Mapper / validator registry resolution (built-in key vs import path vs
unknown)
- Import path fallback (`"pkg.mod.fn"` 형식) 동작 검증
### T3. Engine-routed IpcqInitMsg (equivalence — 핵심 검증)
`tests/test_ipcq_init_routing.py` (new):
- **Routing**: `engine.submit(IpcqInitMsg)` → 지정 PE_IPCQ가 실제 설치 수행
- **Equivalence**: 동일한 IpcqInitMsg를 (a) sideband `_install_neighbors`
직접 호출, (b) engine.submit 두 경로로 보낸 뒤 PE_IPCQ 최종 state
(`_queue_pairs`, `_installed` 등) 동일성 비교
- **Ordering independence**: 서로 다른 PE의 install msg를 engine 큐에 임의
순서로 넣어도 최종 state가 동일
- **Idempotency (duplicate install)**: 동일 PE에 두 번 install msg → 두
번째는 에러 raise (policy: explicit error; D2 검증점 4 참고)
- **Multi-PE 병렬 install**: per-PE submit이 interference 없이 완료
- **Install 후 send 성공**: 설치 직후 `IpcqSendCmd` 실행해서 neighbor table
state가 실제로 유효한지 확인
### T4. Barrier correctness
`tests/test_collective_barrier.py` (new):
- Single collective 정상
- 다중 collective 연속 호출 (epoch 격리)
- 동일 rank의 duplicate join → RuntimeError
- Rank 1이 all_reduce 전 종료 → SpawnException + barrier.reset()
- Conditional branch 시 모든 rank 도달하면 정상
### T5. E2E
`tests/test_ccl_allreduce_matrix.py`:
- `ring_tcm` / `ring_hbm` / `ring_sram` @ ws=SIP_count
### T6. 회귀
기존 `test_ccl_framework`, `test_ccl_install`, `test_ccl_topologies`,
`test_ccl_mock_runtime`, `test_pe_ipcq`, `test_ipcq_e2e`, 기타 non-CCL
모두 통과.
---
## Consequences
### Positive
- **새 message 타입 0개**: 기존 `IpcqInitMsg` + `KernelLaunchMsg`만으로 구현.
- **IO_CPU / engine 변경 없음**: 기존 routing 그대로.
- **Sideband install convention 제거**: MmuMapMsg 등과 동일 패턴으로 일원화.
- **Plan state stale 문제 소멸**: Plan은 host 단일 소유.
- **Bench = real PyTorch DDP** (공개 API 관점).
- **Algorithm ABI 경량**: `kernel` + `kernel_args`만 필수.
- **Epoch-based barrier**: interleaved collective 안전.
- **Control/data plane 분리**: data plane(PE_IPCQ)은 ADR-0023 유지, control
plane은 host-driven.
- 장기 확장성: Megatron TP, DTensor 기반.
### Negative
- 신규 모듈: `install_plan.py`, `mappers.py`, `validators.py`,
`multiprocessing.py`.
- Engine이 `IpcqInitMsg`를 엔진-path로 라우팅할 수 있는지 구현 시 확인 필요
(minor hook 가능성).
- Install이 SimPy 시간을 소모 (positive로도 볼 수 있으나, 기존 sideband 시점
0ns 전제인 테스트가 있으면 교정 필요).
### Neutral
- IPCQ PE-level protocol (ADR-0023) 불변.
- `DPPolicy` 필드 변경은 ADR-0026.
- IO_CPU 역할 불변 (기존 transit 그대로).
---
## Affected files
| File | Change |
|------|--------|
| `src/kernbench/runtime_api/distributed.py` | D1/D2/D7/D9: world_size fallback, rank_to_sip, plan 소유, engine-routed install/launch, epoch barrier |
| `src/kernbench/runtime_api/context.py` | D10/D11: `_AhbmNamespace`, `ctx.ahbm`, `_create_tensor``target_sip` 전달 |
| `src/kernbench/runtime_api/multiprocessing.py` (new) | D12/D13: `spawn` + scheduler + exception |
| `src/kernbench/ccl/install_plan.py` (new) | D6: `build_install_plans`, `SipInstallPlan`, `PeInstallSpec`, `NeighborTableEntry` |
| `src/kernbench/ccl/mappers.py` (new) | D5: `leader_only`, `all_pes`, registry + resolver |
| `src/kernbench/ccl/validators.py` (new) | D5: validator registry + resolver |
| `src/kernbench/ccl/install.py` | Thin deprecated compat wrapper (D14) |
| `src/kernbench/ccl/algorithms/ring_allreduce.py` | D4: `kernel` + `kernel_args` 유지 (큰 변화 없음) |
| `src/kernbench/ccl/algorithms/mesh_allreduce.py` | D4 동일 |
| `src/kernbench/ccl/algorithms/tree_allreduce.py` | D4 동일 |
| `ccl.yaml` | 각 알고리즘에 `mapper` / `validator` 선언 추가 |
| `src/kernbench/sim_engine/engine.py` | (If needed) `IpcqInitMsg` → PE_IPCQ 라우팅 확인 hook |
| `benches/ccl_allreduce.py` | 새 launcher 기반 rewrite |
| `tests/test_ccl_ddp_launcher.py` (new) | T1 |
| `tests/test_ccl_install_plan.py` (new) | T2 |
| `tests/test_ipcq_init_routing.py` (new) | T3 |
| `tests/test_collective_barrier.py` (new) | T4 |
| `tests/test_ccl_allreduce_matrix.py` | T5: ws=SIP_count 단순화 |