22fd0d2b9d
- CLAUDE.md: add ADR Lifecycle subsection (superseded → docs/history/, immutable numbering, no renumber) - ADR-0011: merge ADR-0018 content as "Address Model: LA" section alongside PA / VA; status notes VA model is currently implemented - ADR-0018 / 0029 / 0031: moved to docs/history/ with status updates (0018 merged into 0011, 0029 superseded by 0032, 0031 absorbed into 0001 rev 2) - ADR-0019: rewrite Context as PE-HBM connectivity decision (self-contained, no LA model framing) - ADR-0019/0020/0021/0023/0025/0027: Status Proposed → Accepted (code verified) and prune Implementation Notes / Affected files / Test strategy / "현재 상태" sub-sections describing pre-impl state - ADR-0024/0026: same migration-flavor cleanup; 0026 also drops D6 Migration and D8 docs-update sub-decisions - ADR-0030: status simplified (blocker ADR-0031 now superseded) - SPEC.md: R10 + §0.2 reflect PA / VA / LA model names - ADR-0008/0012/0013: refresh ADR-0011 subtitle in Links 21 files changed, 553 insertions(+), 1290 deletions(-). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
869 lines
32 KiB
Markdown
869 lines
32 KiB
Markdown
# ADR-0024: SIP-level TP Launcher — rank = SIP (host-driven dispatch)
|
||
|
||
## Status
|
||
|
||
Accepted. rank = SIP process-group model stands. The allreduce algorithm
|
||
path (mapper / validator / per-PE install machinery originally targeted at
|
||
ADR-0029) has been replaced by ADR-0032: `AhbmCCLBackend` now calls
|
||
`configure_sfr_intercube_multisip` at `init_process_group` time and the
|
||
intercube kernel receives `(sip_rank, sip_topo_kind, sip_topo_w,
|
||
sip_topo_h)` appended after the module's `kernel_args()`. The
|
||
`leader_only` / `all_pes` mapper concepts in this document are no longer
|
||
used by the default allreduce path.
|
||
|
||
## Context
|
||
|
||
### 목표
|
||
|
||
`torch.distributed` collective 호출의 참여 단위(rank)를 **SIP**(device)
|
||
경계에 맞춘다. 실제 PyTorch DDP/TP 스크립트와 **호스트 레벨에서 구분 없이**
|
||
읽히는 bench 코드를 목표로 한다.
|
||
|
||
real PyTorch와 비교:
|
||
|
||
| 차원 | real PyTorch | KernBench (이 ADR 이후) |
|
||
|---|---|---|
|
||
| 프로세스 모델 | N개 프로세스, 각 1 GPU | 1 프로세스, N greenlet, 각 1 SIP |
|
||
| `get_rank()` | `RANK` env var | greenlet-local 레지스트리 |
|
||
| `get_world_size()` | `WORLD_SIZE` env var | topology의 SIP 수 |
|
||
| `torch.cuda.set_device(r)` (real) / `torch.ahbm.set_device(r)` (KernBench) | rank → GPU | rank → SIP |
|
||
| `mp.spawn` | OS 프로세스 fork | greenlet fan-out |
|
||
|
||
### 설계 원칙 — 공개 API의 추상화, 내부는 기존 path 활용
|
||
|
||
**공개 API (bench worker) 수준의 추상화**:
|
||
```
|
||
rank = SIP
|
||
DPPolicy = intra-device (cube × PE) 분산만
|
||
dist.all_reduce, torch.ahbm.set_device, mp.spawn 등 PyTorch-style 표면
|
||
```
|
||
|
||
**Framework 내부 구현**:
|
||
```
|
||
build_install_plans (host): topology + mapper + algorithm → SipInstallPlan
|
||
↓
|
||
backend (host): plan의 per-PE spec을 engine.submit으로 IpcqInitMsg 디스패치
|
||
↓
|
||
engine: 기존 PE-scoped routing (MmuMapMsg 등과 동일 경로)
|
||
↓
|
||
PE_IPCQ: 자체 message loop에서 IpcqInitMsg 처리 (기존 capability)
|
||
```
|
||
|
||
**핵심**: 새 message 타입이나 IO_CPU 확장 없음. 기존 engine routing과 기존
|
||
`IpcqInitMsg` 타입을 그대로 사용. 기존의 "sideband direct call" 우회만
|
||
제거하여 convention 일원화.
|
||
|
||
### 풀어야 할 문제
|
||
|
||
1. **공개 API에서 rank = SIP** — bench worker가 PE 개념을 알지 않도록.
|
||
2. **Multi-worker 실행** — N개 rank가 독립 worker 코드 실행. 1 프로세스 제약
|
||
하에서 greenlet + barrier 동기화.
|
||
3. **Cross-rank collective submit 동기화** — 첫 rank가 혼자 wait하면 peer 부재로
|
||
SimPy deadlock. 모든 rank submit 후 drain 보장.
|
||
4. **기존 sideband install 제거** — IpcqInitMsg를 engine.submit으로 일원화.
|
||
MmuMapMsg 등 다른 control-plane 메시지와 동일 패턴.
|
||
5. **Algorithm / mapper / validator 분리** — 알고리즘 모듈은 kernel 코드만
|
||
담고, topology / mapping / validation은 registry + 선언.
|
||
|
||
### Non-problem (이 ADR 밖)
|
||
|
||
- IPCQ direction addressing fix → **ADR-0025**
|
||
- `DPPolicy.sip`/`num_sips` 제거 → **ADR-0026**
|
||
- Megatron-style TP → **ADR-0027**
|
||
- DTensor → **ADR-0028 (future)**
|
||
- **IO_CPU를 SIP-level control-plane 단일 endpoint로 승격**: 이 ADR에서는
|
||
invariant으로 채택하지 않음. 현재 KernBench에 해당 원칙이 없고, 단독으로
|
||
도입하기엔 정당화가 약함. 미래에 control-plane latency 모델링 정밀도 요구가
|
||
생기면 별도 ADR.
|
||
|
||
## Decision
|
||
|
||
### D1. rank = SIP (world_size 해석)
|
||
|
||
```python
|
||
def _resolve_world_size(self) -> int:
|
||
if "world_size" in self._merged:
|
||
return int(self._merged["world_size"])
|
||
defaults = self._cfg_all.get("defaults", {})
|
||
if "world_size" in defaults:
|
||
return int(defaults["world_size"])
|
||
spec = self.ctx.spec or {}
|
||
return int(spec.get("system", {}).get("sips", {}).get("count", 1))
|
||
```
|
||
|
||
우선순위: 알고리즘 override > defaults override > SIP count. `ccl.yaml`
|
||
override는 legacy "rank = PE" 테스트 경로로 유지.
|
||
|
||
### D2. Install 경로 — engine.submit 일원화
|
||
|
||
`ccl/install.py`의 sideband direct call을 제거하고, `IpcqInitMsg`를
|
||
`engine.submit`으로 보낸다. MmuMapMsg / MemoryWriteMsg 등이 이미 동일 패턴.
|
||
|
||
```python
|
||
# Backend (AhbmCCLBackend.__init__ 또는 init_process_group 시점)
|
||
from kernbench.ccl.install_plan import build_install_plans
|
||
|
||
plans = build_install_plans(
|
||
world_size=self._world_size,
|
||
algorithm=self._merged["algorithm"],
|
||
algorithm_config=self._merged,
|
||
spec=self.ctx.spec,
|
||
)
|
||
self._plans = plans
|
||
|
||
# Each PE_IPCQ가 자기 neighbor table을 받도록 engine 경유 submit
|
||
handles = []
|
||
for plan in plans:
|
||
for pe_install in plan.pe_installs:
|
||
h = self.ctx.submit(IpcqInitMsg(
|
||
correlation_id=self.ctx.correlation_id,
|
||
request_id=f"ipcq_init_s{plan.sip}c{pe_install.cube}p{pe_install.pe}",
|
||
target_sips=(plan.sip,),
|
||
target_cubes=(pe_install.cube,),
|
||
target_pe=pe_install.pe,
|
||
entries=pe_install.neighbors,
|
||
buffer_kind=plan.buffer_kind,
|
||
n_slots=plan.n_slots,
|
||
slot_size=plan.slot_size,
|
||
# ... (기존 IpcqInitMsg 필드)
|
||
))
|
||
handles.append(h)
|
||
|
||
# Eager install — init_process_group이 반환하기 전에 완료 보장
|
||
for h in handles:
|
||
self.ctx.wait(h)
|
||
```
|
||
|
||
**PE_IPCQ 컴포넌트**는 이미 `IpcqInitMsg`를 main loop에서 처리 (`pe_ipcq.py`
|
||
라인 145-147). 변경 불필요. 유일한 차이는 "message가 sideband Python call이
|
||
아니라 engine queue를 거쳐 도착한다"는 점.
|
||
|
||
**Correctness invariant (equivalence)**: `init_process_group()`은 모든
|
||
install handle을 `wait()`한 후 반환하므로 launch-before-install 문제는
|
||
구조적으로 없다. 남는 correctness 질문은 단 하나:
|
||
|
||
> Engine-routed `IpcqInitMsg` 처리가 기존 sideband
|
||
> `pe_ipcq._install_neighbors(msg)` 호출과 **동일한 최종 PE_IPCQ 상태**를
|
||
> 생성하는가.
|
||
|
||
검증 포인트 (T3 참고):
|
||
|
||
1. **State equivalence**: `_install_neighbors()` 내부 상태 전이가 engine
|
||
dispatch path에서도 동일하게 일어나 최종 PE_IPCQ state
|
||
(`_queue_pairs`, `_installed`, `_credit_inbox` 등)가 일치.
|
||
|
||
2. **Sideband-only side effect 부재**: Sideband path에서만 있던 부수 효과가
|
||
없음 (예: engine.submit이 설정하는 request_id / correlation tracking 등이
|
||
install semantics를 왜곡하지 않음).
|
||
|
||
3. **Ordering independence**: 서로 다른 PE들의 install message가 engine
|
||
큐에서 임의 순서로 처리되어도 최종 상태가 동일. 즉 install은 **PE별
|
||
독립 연산**이어야 하고, cross-PE 순서 의존성이 있으면 안 됨.
|
||
|
||
4. **Idempotency**: 동일 PE에 대해 `IpcqInitMsg`가 두 번 도착하면? 현재
|
||
설계 전제는 "per-PE 단 한 번 install". 중복 install 시 동작은 정의되지
|
||
않음. 보수적 정책:
|
||
- 최초 install 시 `_installed = True`로 전이
|
||
- 이후 중복 install msg는 **에러** (raise) 또는 **silent idempotent**
|
||
(no-op) 둘 중 하나로 명시
|
||
- Recommend: **raise** (명시적 에러 → 버그 조기 검출). T3에 duplicate
|
||
install 케이스 추가.
|
||
|
||
5. **Partial install visibility**: 일부 PE만 install 완료된 중간 상태가
|
||
외부에 observable한가? 현재 구조에서는 `init_process_group()`의 eager
|
||
wait-all이 barrier 역할을 하므로 partial state는 bench 코드에 노출되지
|
||
않음. 단, debugging / introspection API는 중간 상태를 볼 수 있음 (문제
|
||
아님, 문서화만).
|
||
|
||
**Timing 영향**: Engine-routed install은 `init_process_group()`이 SimPy 시간을
|
||
소비하게 만든다. 기존 sideband install은 사실상 zero-cost. ADR 계약:
|
||
|
||
> Benchmarks must not rely on zero-cost initialization.
|
||
> `init_process_group()` consumes simulated time proportional to the number
|
||
> of participating PEs × per-PE install latency. First collective call
|
||
> starts at a well-defined but non-zero sim time.
|
||
|
||
### D3. Launch 경로 — non-CCL 커널과 동일 primitive
|
||
|
||
**CCL 커널은 non-CCL 커널과 동일한 `KernelLaunchMsg` submission path를 쓴다.**
|
||
Engine 내부의 IO_CPU/M_CPU transit 같은 것은 **기존 구현 세부이지 CCL-specific
|
||
장치가 아님**. Backend는 plan의 `participating_pes` 목록을 돌면서 `KernelLaunchMsg`를
|
||
submit할 뿐이다. 새 메시지 타입 없음, 새 라우팅 경로 없음.
|
||
|
||
```python
|
||
# AhbmCCLBackend.all_reduce
|
||
def all_reduce(self, tensor, op="sum"):
|
||
if op != "sum":
|
||
raise NotImplementedError(...)
|
||
if tensor._handle is None or not tensor._handle.shards:
|
||
raise RuntimeError(...)
|
||
|
||
# Validator — global handle 기준 (D8)
|
||
validator_name = self._merged.get("validator")
|
||
if validator_name:
|
||
resolve_validator(validator_name)(tensor._handle, self._world_size, self.ctx.spec)
|
||
|
||
rank = self.ctx.distributed.get_rank()
|
||
plan = self._plans[rank]
|
||
tensor_view = _tensor_slice_for_sip(tensor._handle, plan.sip)
|
||
|
||
# Plan에서 kernel args 계산 (host-side)
|
||
import importlib
|
||
mod = importlib.import_module(plan.kernel_module)
|
||
n_elem = tensor_view.shards[0].nbytes // tensor.itemsize
|
||
kargs = mod.kernel_args(n_elem=n_elem, world_size=plan.world_size,
|
||
**plan.kernel_config)
|
||
|
||
def _submit():
|
||
out = []
|
||
for (cube, pe) in plan.participating_pes:
|
||
h = self.ctx.submit(KernelLaunchMsg(
|
||
correlation_id=self.ctx.correlation_id,
|
||
request_id=f"allreduce_r{rank}_c{cube}p{pe}",
|
||
kernel_ref=KernelRef(name=plan.algorithm_name, kind="builtin"),
|
||
args=(_tensor_arg_for_pe(tensor_view, cube, pe), *kargs),
|
||
target_sips=(plan.sip,),
|
||
target_cubes=(cube,),
|
||
target_pe=pe,
|
||
))
|
||
out.append(h)
|
||
return out
|
||
|
||
self._barrier.submit_and_drain(self.ctx, rank, _submit)
|
||
```
|
||
|
||
### D4. Algorithm ABI — 얇게 + 명시적 arg 계약
|
||
|
||
각 알고리즘 모듈은 **kernel + kernel_args만 필수**.
|
||
|
||
```python
|
||
# src/kernbench/ccl/algorithms/ring_allreduce.py
|
||
def kernel(t_ptr, n_elem, world_size, tl):
|
||
"""PE-side kernel code.
|
||
|
||
Signature convention: first positional arg is the tensor pointer
|
||
(per-PE slice), subsequent positional args are whatever
|
||
kernel_args() returns. `tl` is injected by the TLContext runtime.
|
||
"""
|
||
|
||
def kernel_args(*, n_elem: int, world_size: int, **kw) -> tuple:
|
||
"""Return the tuple of non-tensor positional args.
|
||
|
||
Signature contract:
|
||
- Called keyword-only with n_elem and world_size plus kernel_config.
|
||
- Returns a tuple (possibly empty) of scalar / metadata args.
|
||
- The backend constructs the final KernelLaunchMsg.args as:
|
||
(per_pe_tensor_arg, *kernel_args(...))
|
||
where per_pe_tensor_arg is a TensorArg containing only the shards
|
||
local to the receiving PE (derived from tensor_view).
|
||
"""
|
||
return (n_elem, world_size)
|
||
```
|
||
|
||
**Arg assembly in backend (reference)**:
|
||
|
||
```python
|
||
# AhbmCCLBackend.all_reduce (D3에서 발췌)
|
||
kargs = mod.kernel_args(n_elem=n_elem, world_size=plan.world_size,
|
||
**plan.kernel_config)
|
||
for (cube, pe) in plan.participating_pes:
|
||
pe_tensor_arg = _tensor_arg_for_pe(tensor_view, cube, pe)
|
||
self.ctx.submit(KernelLaunchMsg(
|
||
args=(pe_tensor_arg, *kargs), # tensor first, then kernel_args return
|
||
target_sips=(plan.sip,),
|
||
target_cubes=(cube,),
|
||
target_pe=pe,
|
||
...
|
||
))
|
||
```
|
||
|
||
**ccl.yaml**에서 선언적 metadata:
|
||
|
||
```yaml
|
||
algorithms:
|
||
ring_allreduce_tcm:
|
||
module: kernbench.ccl.algorithms.ring_allreduce
|
||
topology: ring_1d # kernbench/ccl/topologies.py
|
||
mapper: leader_only # kernbench/ccl/mappers.py (신규)
|
||
validator: single_shard_per_rank # kernbench/ccl/validators.py (신규)
|
||
buffer_kind: tcm
|
||
n_elem: 8
|
||
```
|
||
|
||
- `topology` (필수)
|
||
- `mapper` (선택, default `"leader_only"`)
|
||
- `validator` (선택)
|
||
|
||
알고리즘 모듈 자체에는 mapper/validator/participating_pes/neighbor
|
||
생성기가 **들어가지 않음**.
|
||
|
||
### D5. Mapper + validator — registry key **또는** import path
|
||
|
||
Host-side framework가 built-in registry 제공. 커스텀 확장은 dot-import path.
|
||
|
||
```python
|
||
# src/kernbench/ccl/mappers.py (new)
|
||
Mapper = Callable[[dict, int], list[tuple[int, int]]]
|
||
|
||
def leader_only(spec, rank):
|
||
"""Single leader PE per SIP. Ring/tree/mesh용."""
|
||
return [(0, 0)]
|
||
|
||
def all_pes(spec, rank):
|
||
"""Every PE in the SIP. 알고리즘이 intra-SIP 전체 PE를 참여시킬 때 사용
|
||
(e.g. intra-SIP reduction, intra-SIP broadcast, hierarchical collective
|
||
의 낮은 레벨 등)."""
|
||
cm = spec["sip"]["cube_mesh"]
|
||
pl = spec["cube"]["pe_layout"]
|
||
n_cubes = cm["w"] * cm["h"]
|
||
n_pes = pl["pe_per_corner"] * len(pl["corners"])
|
||
return [(c, p) for c in range(n_cubes) for p in range(n_pes)]
|
||
|
||
MAPPER_REGISTRY = {"leader_only": leader_only, "all_pes": all_pes}
|
||
|
||
def resolve_mapper(key_or_path: str) -> Mapper:
|
||
if key_or_path in MAPPER_REGISTRY:
|
||
return MAPPER_REGISTRY[key_or_path]
|
||
if "." in key_or_path:
|
||
import importlib
|
||
mod_path, fn_name = key_or_path.rsplit(".", 1)
|
||
return getattr(importlib.import_module(mod_path), fn_name)
|
||
raise ValueError(f"unknown mapper: {key_or_path!r}")
|
||
```
|
||
|
||
Validator도 동일 패턴 (`src/kernbench/ccl/validators.py`). 입력은 **global
|
||
TensorHandle** (D8 참고).
|
||
|
||
### D6. Host-side install plan builder
|
||
|
||
```python
|
||
# src/kernbench/ccl/install_plan.py (new; 기존 install.py의 재구성)
|
||
from dataclasses import dataclass
|
||
from typing import Any, Mapping
|
||
|
||
@dataclass(frozen=True)
|
||
class NeighborTableEntry:
|
||
direction: str
|
||
peer_direction: str # ADR-0025
|
||
peer_sip: int
|
||
peer_cube: int
|
||
peer_pe: int
|
||
rx_base_pa: int
|
||
# ... 기타 IPCQ 설정 ...
|
||
|
||
@dataclass(frozen=True)
|
||
class PeInstallSpec:
|
||
cube: int
|
||
pe: int
|
||
neighbors: tuple[NeighborTableEntry, ...]
|
||
|
||
@dataclass(frozen=True)
|
||
class SipInstallPlan:
|
||
algorithm_name: str # human-readable ("ring_allreduce_tcm")
|
||
sip: int
|
||
rank: int
|
||
world_size: int
|
||
pe_installs: tuple[PeInstallSpec, ...] # per-PE neighbor tables
|
||
buffer_kind: str
|
||
n_slots: int
|
||
slot_size: int
|
||
kernel_module: str
|
||
participating_pes: tuple[tuple[int, int], ...]
|
||
kernel_config: Mapping[str, Any]
|
||
|
||
|
||
def build_install_plans(
|
||
world_size: int,
|
||
algorithm: str,
|
||
algorithm_config: dict,
|
||
spec: dict,
|
||
) -> list[SipInstallPlan]:
|
||
"""Compose topology + mapper + algorithm into per-SIP plan list."""
|
||
topo_fn = _resolve_topology(algorithm_config["topology"])
|
||
mapper = resolve_mapper(algorithm_config.get("mapper", "leader_only"))
|
||
|
||
# kernel_config: launch 시 kernel_args에 전달할 algorithm-specific params
|
||
kernel_config = {
|
||
k: v for k, v in algorithm_config.items()
|
||
if k in {"n_elem", "reduce_op", "chunk_size"} or k.startswith("kernel_")
|
||
}
|
||
|
||
plans = []
|
||
for rank in range(world_size):
|
||
sip = rank # identity mapping (non-identity는 open question)
|
||
pes = mapper(spec, rank)
|
||
pe_installs = _build_pe_installs(
|
||
rank=rank, world_size=world_size, sip=sip,
|
||
pes=pes, topo_fn=topo_fn, algorithm_config=algorithm_config, spec=spec,
|
||
)
|
||
plans.append(SipInstallPlan(
|
||
algorithm_name=algorithm,
|
||
sip=sip, rank=rank, world_size=world_size,
|
||
pe_installs=pe_installs,
|
||
buffer_kind=algorithm_config["buffer_kind"],
|
||
n_slots=algorithm_config["n_slots"],
|
||
slot_size=algorithm_config["slot_size"],
|
||
kernel_module=algorithm_config["module"],
|
||
participating_pes=tuple(pes),
|
||
kernel_config=kernel_config,
|
||
))
|
||
return plans
|
||
```
|
||
|
||
`_build_pe_installs`는 기존 `ccl/install.py`의 neighbor 계산 로직을 재활용
|
||
(ADR-0025의 `reverse_direction` 개선 반영).
|
||
|
||
**Multi-PE 매퍼와 neighbor 생성 책임**: mapper가 SIP 내 여러 PE를 반환하는
|
||
경우 (`all_pes` 등), PE-level neighbor 그래프는 `_build_pe_installs` 내부에
|
||
형성된다. 즉 topology 모듈은 rank-level 관계만 제공하고, PE-level 연결은
|
||
builder에서 풀어낸다. 복잡한 multi-level 패턴을 쓰는 알고리즘은 이 책임
|
||
분산이 관리 부담이 될 수 있음 — 관련 논의는 ADR-0029 참고.
|
||
|
||
### D7. Epoch-based collective barrier
|
||
|
||
Cross-rank submit 동기화. 각 collective 호출은 독립 epoch. 같은 rank의
|
||
중복 join은 즉시 에러.
|
||
|
||
```python
|
||
# src/kernbench/runtime_api/distributed.py
|
||
@dataclass
|
||
class _EpochState:
|
||
participants: set[int] = field(default_factory=set)
|
||
pending: list = field(default_factory=list)
|
||
drained: bool = False
|
||
returned: int = 0
|
||
|
||
|
||
class _CollectiveBarrier:
|
||
"""Epoch-based barrier.
|
||
|
||
Contract:
|
||
- Each call joins the earliest non-drained epoch.
|
||
- Each rank may join a given epoch at most once. Duplicate join raises.
|
||
- Last arriver (participants == world_size) performs drain and advances
|
||
_next_epoch. Earlier arrivers yield and re-check drained on resume.
|
||
- Epoch state is GC'd when returned == world_size (success path).
|
||
- On failure paths, residual state is acceptable; reset() clears it.
|
||
"""
|
||
|
||
def __init__(self, world_size: int):
|
||
self._world_size = world_size
|
||
self._next_epoch = 0
|
||
self._state: dict[int, _EpochState] = {}
|
||
|
||
def submit_and_drain(self, ctx, rank: int, submit_fn) -> None:
|
||
epoch = self._next_epoch
|
||
state = self._state.setdefault(epoch, _EpochState())
|
||
|
||
if rank in state.participants:
|
||
raise RuntimeError(
|
||
f"rank {rank} attempted duplicate join to epoch {epoch}"
|
||
)
|
||
state.participants.add(rank)
|
||
|
||
handles = submit_fn()
|
||
state.pending.extend(handles)
|
||
|
||
is_last = len(state.participants) >= self._world_size
|
||
|
||
if is_last:
|
||
for h in state.pending:
|
||
ctx.wait(h)
|
||
state.drained = True
|
||
self._next_epoch = epoch + 1
|
||
else:
|
||
from greenlet import getcurrent
|
||
g = getcurrent()
|
||
if g.parent is None:
|
||
raise RuntimeError("barrier requires a bound worker greenlet")
|
||
while not state.drained:
|
||
g.parent.switch()
|
||
|
||
state.returned += 1
|
||
if state.returned >= self._world_size:
|
||
self._state.pop(epoch, None)
|
||
|
||
def reset(self) -> None:
|
||
"""Explicit cleanup on spawn exception unwinding."""
|
||
self._state.clear()
|
||
self._next_epoch = 0
|
||
```
|
||
|
||
### D8. Per-rank tensor view + validator contract
|
||
|
||
**Validator** (host-side, pre-slice, global handle 기준):
|
||
|
||
```python
|
||
# src/kernbench/ccl/validators.py
|
||
Validator = Callable[[TensorHandle, int, dict], None]
|
||
|
||
def single_shard_per_rank(handle, world_size, spec):
|
||
"""Ring 계열: 정확히 world_size개 shard, SIP당 1개."""
|
||
if len(handle.shards) != world_size:
|
||
raise ValueError(...)
|
||
per_sip = {}
|
||
for s in handle.shards:
|
||
per_sip[s.sip] = per_sip.get(s.sip, 0) + 1
|
||
if any(c != 1 for c in per_sip.values()):
|
||
raise ValueError(...)
|
||
|
||
def multi_pe_sip_local(handle, world_size, spec):
|
||
"""Multi-PE per SIP layout: 각 SIP에 intra-SIP PE 수만큼 shard 존재.
|
||
Intra-SIP 전체 PE를 참여시키는 알고리즘이 사용."""
|
||
cm = spec["sip"]["cube_mesh"]
|
||
pl = spec["cube"]["pe_layout"]
|
||
per_sip = cm["w"] * cm["h"] * pl["pe_per_corner"] * len(pl["corners"])
|
||
if len(handle.shards) != world_size * per_sip:
|
||
raise ValueError(...)
|
||
|
||
VALIDATOR_REGISTRY = {...}
|
||
def resolve_validator(key_or_path): ...
|
||
```
|
||
|
||
Validator는 world 전체의 shard layout 불변량을 본다. Per-rank view는
|
||
backend가 validator 호출 **후** `_tensor_slice_for_sip`로 생성.
|
||
|
||
**Per-rank tensor view** — SIP-local slice:
|
||
|
||
```python
|
||
def _tensor_slice_for_sip(handle, sip) -> TensorArg:
|
||
sip_shards = [s for s in handle.shards if s.sip == sip]
|
||
if not sip_shards:
|
||
raise RuntimeError(f"tensor has no shards on SIP {sip}")
|
||
# Deterministic ordering contract: (cube, pe, offset_bytes) ascending.
|
||
# Multi-PE mappers (hierarchical 등) rely on this ordering to align
|
||
# per-PE tensor arg construction with participating_pes enumeration.
|
||
sip_shards.sort(key=lambda s: (s.cube, s.pe, s.offset_bytes))
|
||
min_offset = min(s.offset_bytes for s in sip_shards)
|
||
local_va_base = handle.va_base + min_offset if handle.va_base else 0
|
||
return TensorArg(
|
||
shards=tuple(TensorArgShard(...) for s in sip_shards),
|
||
va_base=local_va_base,
|
||
)
|
||
```
|
||
|
||
**Ordering invariant**: slice의 shard는 `(cube, pe, offset_bytes)` 오름차순.
|
||
Backend가 `participating_pes`를 iterate하며 `_tensor_arg_for_pe(view, cube, pe)`를
|
||
구성할 때, 결정론적 ordering을 전제할 수 있다. 특히 `all_pes` mapper +
|
||
hierarchical 알고리즘이 per-PE slice 조합을 순서 의존적으로 해석하는 경우에
|
||
중요.
|
||
|
||
### D9. Greenlet-local rank registry (+ debug warning)
|
||
|
||
```python
|
||
class DistributedContext:
|
||
def __init__(self):
|
||
self._backend = None
|
||
self._rank_by_greenlet: dict = {}
|
||
|
||
def _bind_rank(self, g, rank: int) -> None:
|
||
self._rank_by_greenlet[g] = int(rank)
|
||
|
||
def get_rank(self) -> int:
|
||
self._ensure_initialized()
|
||
from greenlet import getcurrent
|
||
g = getcurrent()
|
||
if g not in self._rank_by_greenlet:
|
||
if os.environ.get("KERNBENCH_DEBUG"):
|
||
warnings.warn(
|
||
"get_rank() called outside a bound greenlet — returning 0. "
|
||
"Likely a bug unless running single-driver."
|
||
)
|
||
return 0
|
||
return int(self._rank_by_greenlet[g])
|
||
```
|
||
|
||
### D10. `torch.ahbm.set_device(rank)` — SIP 바인딩
|
||
|
||
KernBench 백엔드 이름은 `ahbm` (ADR-0023 D10). Real PyTorch는
|
||
`torch.cuda.set_device(r)`이지만 우리는 CUDA가 아니므로 honestly-named
|
||
namespace를 사용한다.
|
||
|
||
```python
|
||
class _AhbmNamespace:
|
||
"""torch.ahbm — per-greenlet SIP device binding.
|
||
|
||
Real-PyTorch parity idiom: ``torch.cuda.set_device(rank)``. Since
|
||
KernBench's backend is 'ahbm' (not CUDA), we expose the equivalent
|
||
API under ``torch.ahbm`` to avoid pretending to be a CUDA runtime.
|
||
"""
|
||
|
||
def __init__(self):
|
||
self._device_by_greenlet: dict = {}
|
||
|
||
def set_device(self, device: int) -> None:
|
||
from greenlet import getcurrent
|
||
self._device_by_greenlet[getcurrent()] = int(device)
|
||
|
||
def current_device(self) -> int | None:
|
||
from greenlet import getcurrent
|
||
return self._device_by_greenlet.get(getcurrent())
|
||
|
||
# Attached to RuntimeContext as `self.ahbm = _AhbmNamespace()`.
|
||
# Bench code: `torch.ahbm.set_device(rank)` mirrors `torch.cuda.set_device`.
|
||
```
|
||
|
||
**PyTorch 2.x style 병행 지원**: 최신 PyTorch는 device-agnostic한
|
||
`torch.accelerator` 네임스페이스를 지향 (`torch.accelerator.set_device_index(r)`,
|
||
`torch.accelerator.current_device_index()`). Device vendor에 종속되지 않는
|
||
코드를 쓰려는 사용자를 위해 KernBench도 이 표면을 병행 지원한다.
|
||
|
||
```python
|
||
class _AcceleratorNamespace:
|
||
"""torch.accelerator — device-agnostic API (PyTorch 2.x style).
|
||
|
||
Aliases torch.ahbm for bench code that prefers device-neutral idiom:
|
||
torch.accelerator.set_device_index(rank)
|
||
torch.accelerator.current_device_index()
|
||
"""
|
||
|
||
def __init__(self, ahbm: _AhbmNamespace):
|
||
self._ahbm = ahbm
|
||
|
||
def set_device_index(self, device: int) -> None:
|
||
self._ahbm.set_device(device)
|
||
|
||
def current_device_index(self) -> int | None:
|
||
return self._ahbm.current_device()
|
||
|
||
# RuntimeContext
|
||
self.ahbm = _AhbmNamespace()
|
||
self.accelerator = _AcceleratorNamespace(self.ahbm) # alias
|
||
```
|
||
|
||
Bench 작성자는 다음 중 하나를 선택 — 둘 다 내부적으로 같은 레지스트리를 보유:
|
||
|
||
```python
|
||
torch.ahbm.set_device(rank) # KernBench-native, explicit backend
|
||
torch.accelerator.set_device_index(rank) # PyTorch 2.x device-agnostic
|
||
```
|
||
|
||
### D11. Tensor placement = structural (sip, cube, pe) 좌표
|
||
|
||
`resolve_dp_policy`가 `target_sip`을 직접 받아 구조적 좌표로 placement 생성.
|
||
세부는 ADR-0026.
|
||
|
||
```python
|
||
# RuntimeContext._create_tensor
|
||
current_sip = self.ahbm.current_device() # (D10 naming)
|
||
if current_sip is None:
|
||
current_sip = 0 # single-driver fallback (D9와 일관)
|
||
placement = resolve_dp_policy(
|
||
dp, shape=shape_2d, itemsize=itemsize,
|
||
num_pe=eff_num_pe, num_cubes=eff_num_cubes,
|
||
target_sip=current_sip,
|
||
)
|
||
```
|
||
|
||
Post-hoc `pe_index` shifting 제거 — ShardSpec이 `(sip, cube, pe)` 구조적
|
||
좌표 보유.
|
||
|
||
### D12. `torch.multiprocessing.spawn`-compat surface
|
||
|
||
Bench 작성자 표면은 real PyTorch `mp.spawn`과 동일:
|
||
|
||
```python
|
||
# src/kernbench/runtime_api/multiprocessing.py (new)
|
||
def spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method="spawn"):
|
||
"""Drop-in for torch.multiprocessing.spawn.
|
||
Internal: greenlet fan-out + epoch-barrier sync + exception propagation.
|
||
"""
|
||
...
|
||
|
||
# torch namespace에 부착
|
||
torch.multiprocessing = SimpleNamespace(spawn=spawn)
|
||
```
|
||
|
||
Bench:
|
||
|
||
```python
|
||
import torch.multiprocessing as mp
|
||
mp.spawn(worker, nprocs=world_size, args=(world_size, torch))
|
||
```
|
||
|
||
### D13. Scheduler + exception handling
|
||
|
||
```python
|
||
def spawn(fn, args, nprocs, ...):
|
||
dist = torch.distributed
|
||
gs: list[greenlet] = []
|
||
errors: dict[int, Exception] = {}
|
||
|
||
for rank in range(nprocs):
|
||
def _entry(r=rank):
|
||
try:
|
||
fn(r, *args)
|
||
except Exception as e:
|
||
errors[r] = e
|
||
raise
|
||
g = greenlet(_entry)
|
||
dist._bind_rank(g, rank)
|
||
gs.append(g)
|
||
|
||
try:
|
||
while True:
|
||
alive = [g for g in gs if not g.dead]
|
||
if not alive:
|
||
break
|
||
for g in alive:
|
||
if not g.dead:
|
||
g.switch()
|
||
except Exception as outer:
|
||
for other in gs:
|
||
if not other.dead:
|
||
try:
|
||
other.throw(SystemExit)
|
||
except Exception:
|
||
pass
|
||
# Epoch barrier state 명시적 cleanup
|
||
backend = getattr(dist, "_backend", None)
|
||
if backend is not None and hasattr(backend, "_barrier"):
|
||
backend._barrier.reset()
|
||
raise SpawnException(errors) from outer
|
||
```
|
||
|
||
**Scheduler contract**:
|
||
- Deterministic round-robin over insertion order (rank 0, 1, ..., N-1).
|
||
- 동기화 지점은 epoch barrier (D7)만. Scheduler 순서에 의존하는 correctness 없음.
|
||
- 예외 발생 시 다른 greenlet 강제 종료 + `SpawnException` 전파.
|
||
|
||
**Starvation guideline**:
|
||
- 일반적으로 collective barrier가 workers를 동기화. 큰 편차 없음.
|
||
- 극단적 non-collective 루프 대비 cooperative yield 제공:
|
||
`torch.distributed.cooperative_yield()`.
|
||
|
||
### D14. Backward compatibility
|
||
|
||
1. **Single-driver 호출**: `get_rank()` 0 반환 (D9).
|
||
2. **`ccl.yaml` world_size override**: D1 fallback 우회 — legacy "rank = PE"
|
||
테스트 경로로 사용 가능.
|
||
3. **`DPPolicy.sip="column_wise"` 명시**: ADR-0026 scope.
|
||
4. **`install_ipcq()` compatibility wrapper**:
|
||
|
||
기존 `ccl/install.py`의 `install_ipcq()` API는 곧바로 제거하지 않는다.
|
||
Thin compatibility wrapper로 남겨 기존 직접 호출자가 점진적으로 migration할
|
||
수 있게 한다.
|
||
|
||
```python
|
||
# src/kernbench/ccl/install.py (after this ADR)
|
||
def install_ipcq(engine, spec, merged, *, algo_module=None, rank_to_pe=None):
|
||
"""DEPRECATED: legacy host-side PE installer.
|
||
|
||
Internally delegates to build_install_plans + engine-routed IpcqInitMsg.
|
||
Use dist.init_process_group() instead.
|
||
"""
|
||
from kernbench.ccl.install_plan import build_install_plans
|
||
import warnings
|
||
warnings.warn(
|
||
"install_ipcq() is deprecated; use dist.init_process_group()",
|
||
DeprecationWarning, stacklevel=2,
|
||
)
|
||
plans = build_install_plans(
|
||
world_size=merged.get("world_size", 1),
|
||
algorithm=merged["algorithm"],
|
||
algorithm_config=merged,
|
||
spec=spec,
|
||
)
|
||
handles = []
|
||
for plan in plans:
|
||
for pe_install in plan.pe_installs:
|
||
h = engine.submit(IpcqInitMsg(
|
||
target_sips=(plan.sip,),
|
||
target_cubes=(pe_install.cube,),
|
||
target_pe=pe_install.pe,
|
||
entries=pe_install.neighbors,
|
||
buffer_kind=plan.buffer_kind,
|
||
n_slots=plan.n_slots,
|
||
slot_size=plan.slot_size,
|
||
))
|
||
handles.append(h)
|
||
for h in handles:
|
||
engine.wait(h)
|
||
return {"world_size": merged.get("world_size", 1), "plans": plans}
|
||
```
|
||
|
||
Migration 스케줄:
|
||
- Phase 1: wrapper로 유지 + DeprecationWarning
|
||
- Phase 2: 직접 호출자 grep-audit → 각각 `dist.init_process_group()` 또는
|
||
`build_install_plans()` 직접 사용으로 이관
|
||
- Phase 3: wrapper 제거 (별도 cleanup ADR 또는 PR)
|
||
|
||
---
|
||
|
||
## Dependencies
|
||
|
||
- **ADR-0023** (IPCQ): `IpcqInitMsg` 메시지 타입과 PE_IPCQ 핸들링을 그대로
|
||
활용. Engine-routed submit으로 전환하는 것이 유일한 변경.
|
||
- **ADR-0025** (IPCQ direction fix): `_build_pe_installs`의 neighbor 계산이
|
||
2-rank ring 등에서 정확히 동작하려면 필요.
|
||
- **ADR-0003 / 0016** (IO_CPU): IO_CPU는 기존 transit 역할 그대로. 본 ADR에서
|
||
IO_CPU 역할 변경 없음.
|
||
|
||
---
|
||
|
||
## Non-goals
|
||
|
||
- **IPCQ protocol 수정**: ADR-0023 유지.
|
||
- **DPPolicy 필드 정리**: ADR-0026.
|
||
- **Megatron-style TP**: ADR-0027.
|
||
- **Multi-node (프로세스 간)**: 단일 프로세스.
|
||
- **IO_CPU SIP control-plane 단일 endpoint 원칙 채택**: 본 ADR 범위 밖. 현재
|
||
KernBench에 이 원칙이 없고, 도입은 별도 ADR.
|
||
- **Hierarchical all-reduce 알고리즘 설계**: ADR-0029. 본 ADR은 그 알고리즘이
|
||
쓸 framework 인프라 (`all_pes` mapper, `multi_pe_sip_local` validator,
|
||
registry 확장점)만 제공.
|
||
|
||
---
|
||
|
||
## Open questions
|
||
|
||
### 🟡 Nice-to-have — scope 경계 관련
|
||
|
||
- **Install timing 허용치**: SimPy 시간 상 install이 몇 ns~us 소모. 기존
|
||
sideband는 0ns. 기존 테스트가 t=0 시작을 전제로 하는지 확인 (audit 결과에
|
||
따라 테스트 교정 필요).
|
||
|
||
- **`IpcqInitMsg` 배치 가능성**: MmuMapMsg처럼 `target_pe="all"` 브로드캐스트
|
||
는 IPCQ에서는 부적합 (PE마다 neighbor가 다름). 현재는 per-PE 개별 submit.
|
||
Per-PE payload를 담는 batched IpcqInitMsg 타입은 future optimization.
|
||
|
||
- **`_rank_to_sip` 매핑**: 현재 identity. Non-trivial mapping 요구 시 별도.
|
||
|
||
- **Cooperative yield API 위치**: `torch.distributed.cooperative_yield()`로
|
||
노출 예정. 실제 필요성은 Phase 2 이후 벤치 추가 시 판단.
|
||
|
||
(PE-level topology 일원화 관련 중장기 방향은 **ADR-0029** 참고 — 복잡한
|
||
multi-level 알고리즘이 driving force가 되는 framework 진화 방향.)
|
||
|
||
---
|
||
|
||
## Consequences
|
||
|
||
### Positive
|
||
|
||
- **새 message 타입 0개**: 기존 `IpcqInitMsg` + `KernelLaunchMsg`만으로 구현.
|
||
- **IO_CPU / engine 변경 없음**: 기존 routing 그대로.
|
||
- **Sideband install convention 제거**: MmuMapMsg 등과 동일 패턴으로 일원화.
|
||
- **Plan state stale 문제 소멸**: Plan은 host 단일 소유.
|
||
- **Bench = real PyTorch DDP** (공개 API 관점).
|
||
- **Algorithm ABI 경량**: `kernel` + `kernel_args`만 필수.
|
||
- **Epoch-based barrier**: interleaved collective 안전.
|
||
- **Control/data plane 분리**: data plane(PE_IPCQ)은 ADR-0023 유지, control
|
||
plane은 host-driven.
|
||
- 장기 확장성: Megatron TP, DTensor 기반.
|
||
|
||
### Negative
|
||
|
||
- 신규 모듈: `install_plan.py`, `mappers.py`, `validators.py`,
|
||
`multiprocessing.py`.
|
||
- Engine이 `IpcqInitMsg`를 엔진-path로 라우팅할 수 있는지 구현 시 확인 필요
|
||
(minor hook 가능성).
|
||
- Install이 SimPy 시간을 소모 (positive로도 볼 수 있으나, 기존 sideband 시점
|
||
0ns 전제인 테스트가 있으면 교정 필요).
|
||
|
||
### Neutral
|
||
|
||
- IPCQ PE-level protocol (ADR-0023) 불변.
|
||
- `DPPolicy` 필드 변경은 ADR-0026.
|
||
- IO_CPU 역할 불변 (기존 transit 그대로).
|