diff --git a/docs/adr/ADR-0026-dppolicy-intra-device.md b/docs/adr/ADR-0026-dppolicy-intra-device.md index 0b93535..964e202 100644 --- a/docs/adr/ADR-0026-dppolicy-intra-device.md +++ b/docs/adr/ADR-0026-dppolicy-intra-device.md @@ -69,9 +69,9 @@ class DPPolicy: class DPPolicy: """Intra-device (cube × PE) data-parallel policy. - SIP-level placement is controlled by ``torch.cuda.set_device(rank)`` - (ADR-0024) and, for model-level TP, by Megatron-style parallel layers - (ADR-0027). DPPolicy does not cross SIP boundaries. + SIP-level placement is controlled by ``torch.ahbm.set_device(rank)`` + (ADR-0024 D10) and, for model-level TP, by Megatron-style parallel + layers (ADR-0027). DPPolicy does not cross SIP boundaries. """ cube: Literal["replicate", "column_wise", "row_wise"] = "replicate" pe: Literal["replicate", "column_wise", "row_wise"] = "replicate" diff --git a/docs/adr/ADR-0027-megatron-tp.md b/docs/adr/ADR-0027-megatron-tp.md index a6c8ab3..4c28d53 100644 --- a/docs/adr/ADR-0027-megatron-tp.md +++ b/docs/adr/ADR-0027-megatron-tp.md @@ -2,7 +2,9 @@ ## Status -Proposed +Proposed (Revision 7 — resume invariant / main-context wait 비재귀 invariant / +global barrier over-serialization tradeoff / TP forward yield-safety 명시, +2026-04-14) ## Context @@ -11,75 +13,564 @@ Proposed SIP 간 tensor parallelism(TP)을 **Megatron-LM 스타일의 명시적 parallel layer** API로 지원한다. DTensor 같은 선언적 추상화는 별도 ADR(0028) future work. -Megatron-style을 선택한 이유 (사용자 지시): -- TP는 일반적으로 **model의 특정 layer 경계**에서 발생. 명시적 primitive가 - mental model에 자연스러움. -- NVIDIA Megatron / DeepSpeed가 확립한 인더스트리 표준 패턴. -- DTensor는 선언적이라 **디자인 공간이 더 크다** → 단계적으로 접근. +Megatron-style을 선택한 이유: +- TP는 model의 특정 layer 경계에서 발생. 명시적 primitive가 mental model에 + 자연스러움. +- NVIDIA Megatron / DeepSpeed가 확립한 인더스트리 표준. +- DTensor는 선언적이라 디자인 공간이 더 크다 → 단계적. ### 현재 상태 -- KernBench는 TP가 없음. 기존 `DPPolicy.sip="column_wise"` 경로가 "SIP 간 - column sharding"을 흉내 냈으나 DP와 TP가 섞인 상태 (ADR-0026에서 정리). -- ADR-0024가 launcher 인프라 (rank = SIP, `set_device`, greenlet-local) 제공. -- 이 인프라 위에 TP primitive를 얹는다. +- KernBench는 TP가 없음. 기존 `DPPolicy.sip="column_wise"` 경로는 ADR-0026에서 + 제거됨. rank = SIP launcher (ADR-0024) 위에 TP primitive를 얹는다. +- ADR-0024 Phase B에서 **worker-greenlet env.run 재진입 버그**가 드러남: + worker가 `ctx.wait(h)` (tensor 생성 시 MmuMapMsg 등)를 호출하면 `env.run`이 + worker 컨텍스트에서 돌고, 이때 spawn되는 kernel greenlet의 `_parent`가 + worker가 되어 orphan 발생. `ring_default_ws` strict xfail의 근본 원인. +- `dist.all_reduce`는 이미 `_defer_wait=True` + worker yield 패턴으로 이 문제를 + 피함 ([distributed.py:119-134](src/kernbench/runtime_api/distributed.py#L119-L134)). +- TP layer의 forward는 매번 `torch.launch("gemm", ...)`를 호출하고, 그 뒤에 + `dist.all_reduce`가 따라오는 패턴이 반복됨. worker-wait 문제를 **반드시** + 해결하지 않으면 TP 샘플이 첫 실행에서 실패. ### TP primitive 스펙 (Megatron-LM 참조) -- **ColumnParallelLinear**: weight의 **column** 축을 TP ranks에 분산. 입력 - full-replicated, 출력 column-sharded. 후속에 row-parallel이 올 때 all-reduce - 없음. -- **RowParallelLinear**: weight의 **row** 축을 TP ranks에 분산. 입력이 이미 - column-sharded (ColumnParallel의 출력). forward 끝에 **all-reduce** 필요. -- **VocabParallelEmbedding**: embedding을 vocab 축에 분산. forward 끝에 all-reduce. +- **ColumnParallelLinear**: weight의 **column(out_features)** 축을 TP ranks에 + 분산. 입력 full-replicated, 출력 column-sharded. 후속 RowParallelLinear가 + 올 때 forward all-reduce 없음. +- **RowParallelLinear**: weight의 **row(in_features)** 축을 TP ranks에 분산. + 입력이 이미 column-sharded (ColumnParallel의 출력). forward 끝에 + **all-reduce** 필요. +- **VocabParallelEmbedding**: embedding을 vocab 축에 분산. forward 끝에 + all-reduce. (초기 scope에서는 stub, 실제 구현은 all-gather kernel 선행 필요.) - **`copy_to_tp_region`**, **`reduce_from_tp_region`**, **`scatter_to_tp_region`**, - **`gather_from_tp_region`** — 기본 primitive (`identity` forward, `all-reduce` - backward 등). + **`gather_from_tp_region`** — 기본 primitive. ### 풀어야 할 문제 -1. **Per-rank weight 분산 표현**: 각 worker(rank)가 weight tensor의 자기 - slice를 소유. ADR-0024의 `set_device(rank)` + ADR-0026의 intra-device - DPPolicy로 자연스러운 표현. +1. **Worker-wait 일반화 (D0)**: `dist.all_reduce`의 defer/yield/drain 패턴을 + 모든 `ctx.wait` 경로로 확장. **이 ADR의 가장 큰 아키텍처 결정**. -2. **Forward / backward activation 흐름**: 현재 KernBench는 backward가 없음 - (simulation 목적). 본 ADR은 **forward만** 우선 지원. Training simulation이 - 추가되면 확장. +2. **런처 API 정규화 (D1)**: 현 bench들이 hand-rolled greenlet loop을 사용. + `torch.multiprocessing.spawn(fn, args, nprocs)`로 흡수해 real-PyTorch API 면 + 유지 + D0의 scheduler drain을 단일 구현 위치에 집중. -3. **Collective 호출 지점**: RowParallelLinear가 forward 끝에 `all_reduce`를 - 호출. ADR-0024의 multi-greenlet 구조에서 자연스럽게 동작 (각 rank가 동시에 - 호출). +3. **Per-rank weight 분산 표현**: 각 worker가 weight tensor의 자기 slice를 + 소유. ADR-0024의 `set_device(rank)` + ADR-0026의 intra-device DPPolicy로 + 자연스럽게 표현. -4. **TP group 개념**: Megatron은 일반적으로 data_parallel × tensor_parallel × - pipeline_parallel group을 교차 사용. 초기 scope는 **TP group = 전체 SIP**로 - 단순화. Mixed DP+TP는 future. +4. **Forward-only scope**: 현재 KernBench는 backward가 없음 (simulation 목적). + 본 ADR은 **forward만** 우선 지원. Training simulation은 별도 ADR. + +5. **Collective 호출 지점**: RowParallelLinear가 forward 끝에 `all_reduce` 호출. + ADR-0024의 multi-greenlet 구조 + D0 generalization에서 자연스럽게 동작. + +6. **TP group 개념**: Megatron은 DP × TP × PP group을 교차 사용. 초기 scope는 + **TP group = 전체 SIP** 단순화. Mixed DP+TP는 future. --- ## Decision -### D1. 새 패키지 `kernbench.tp` +### D0. Worker-wait 일반화 — `ctx.wait`가 worker 컨텍스트면 main으로 defer + +**문제 재확인**. `kernel_runner.run`은 spawn 시점의 `greenlet.getcurrent()`를 +kernel greenlet의 `_parent`로 캡처한다 +([kernel_runner.py:94](src/kernbench/triton_emu/kernel_runner.py#L94)). +main 컨텍스트에서 `env.run`이 돌면 parent=main이라 safe. worker 컨텍스트에서 +`env.run`이 돌면 parent=worker가 되고, worker가 yield/finish하는 순간 kernel +greenlet은 orphan → `GreenletExit` → ADR-0024 Phase B의 `ring_default_ws` 실패. + +**해결**. worker greenlet이 `ctx.wait(h)`를 호출하면 직접 `env.run`을 driving +하는 대신 **main scheduler로 yield**. main이 env.run을 drive해 handle이 완료 +되면 worker로 control return. + +#### D0.1 `RuntimeContext` 확장 + +```python +# context.py +@dataclass +class RuntimeContext: + ... + _pending_worker_waits: list[RequestHandle] = field(default_factory=list, init=False) +``` + +#### D0.2 `ctx.wait`의 worker fork + +```python +def wait(self, handle, *, _meta=None): + # Fast-path: already completed — skip enqueue + switch (consistent with + # D0.4-(3) idempotency). Avoids needless worker→main→worker round-trip + # and prevents redundant _pending_worker_waits growth. + if handle in self._completed: + completion, _trace = self.engine.get_completion(handle) + return completion + + from greenlet import getcurrent + g = getcurrent() + if g.parent is not None and not g.parent.dead: + # Worker greenlet: defer to main. Push handle, yield to parent. + # Parent (scheduler loop) drains env.run, then switches back. + self._pending_worker_waits.append(handle) + g.parent.switch() + # On resume: handle must have completed (main drained the list). + # Fall through to the status-quo completion/trace assembly. + + # Main context (or single-driver): drive engine directly. + wait_fn = getattr(self.engine, "wait", None) + if wait_fn is not None: + wait_fn(handle) + completion, trace = self.engine.get_completion(handle) + self._completed.add(handle) + if _meta is not None and trace is not None: + entry = dict(trace) if isinstance(trace, dict) else {"raw": trace} + entry.update(_meta) + self._traces.append(entry) + return completion +``` + +#### D0.3 `ctx.wait`의 worker-context 세만틱 contract (normative) + +본 ADR은 `ctx.wait`의 세만틱을 worker 컨텍스트에서 **명시적으로 변경**한다. + +- **Submit-vs-complete 분리**: `ctx.wait(h)`는 worker에서 호출될 때 "즉시 완료 + 보장"이 아니라 "**다음 scheduler drain 이후** 완료 보장"이다. worker가 + `wait()`에서 return하는 시점 = main이 해당 handle에 대해 `engine.wait`을 + 마친 시점. Main context 호출은 기존대로 즉시-동기 (status quo). +- **Resume invariant (normative)**: worker-deferred `ctx.wait(h)`에서 + `g.parent.switch()`가 return해 worker가 resume되는 시점에는 **반드시 + `h in ctx._completed`가 True여야 한다**. 이 invariant가 깨지면 worker가 + stale 상태에서 이후 단계를 진행하므로 `_drain_pending` / scheduler loop / + `ctx.wait` 어느 부분을 수정하든 이 불변식을 지켜야 한다. T3.b가 이 + invariant를 직접 assert한다. +- **관찰 가능 변화**: worker 안에서 `h = ctx.submit(msg); ctx.wait(h); + read(handle_result)` 패턴은 여전히 성립 — 단 `wait()`와 `read` 사이에는 + 자동으로 main-drain이 삽입되었다는 사실을 세만틱 명세로 포함한다. +- **Host 객체 직접 read는 D0.5 참조**: `ctx.wait` 없이 `tensor.numpy()`를 + 부르는 경우의 계약은 D0.5에서 별도로 규정. + +#### D0.4 Main scheduler drain — 규약 (normative) + +(D1의 `multiprocessing.spawn` 내부 구현. 아래는 세만틱 정의.) + +```python +while alive: + for g in alive: # (1) round-based worker switch + g.switch() + _drain_pending(ctx) # (2) drain in main context +``` + +(`_drain_pending`의 실제 정의는 D0.5 참조 — outer while-loop으로 두 큐가 +모두 빌 때까지 drain.) + +**규약**: + +1. **Round-based cooperative scheduling & yield 의무 (worker contract)**. + `g.switch()`는 해당 worker가 **자발적으로 yield**할 때까지 return하지 않는다 + (cooperative greenlet 세만틱). 따라서: + - Worker가 yield 없이 `while True: do_compute()` 같은 pure-compute loop를 + 돌면 `g.switch()`는 영원히 return하지 않고 **scheduler loop 자체가 hard + block**된다 (다른 worker는 switch 기회를 못 얻음, drain도 안 일어남). 이는 + starvation이 아니라 **scheduler non-progress (deadlock 등가)**이며 본 + ADR이 **unsupported**로 규정한다. + - Worker는 **반드시** `ctx.wait(h)`, `dist.all_reduce`, host-read barrier + (D0.5) 중 하나를 유한 step 내에 호출해야 한다. TP layer의 `forward`는 + 매 layer 끝에서 launch→wait 쌍을 포함하므로 자연스럽게 이 조건을 만족. + CCL kernel도 `dist.all_reduce` 내부에서 yield한다. + - 구현이 이를 **감지**할 필요는 없다 (타임아웃/steps-since-yield 카운터 + 등). 이는 user contract이며 위반 시 증상은 "simulation hang"이다. + - **Future extension**: non-collective 긴 계산 경로가 자주 나오면 + ADR-0024 D13의 `torch.distributed.cooperative_yield()` primitive (명시적 + no-op yield)를 도입할 수 있다. 현 ADR 범위 밖. Breaking change 아님 — + 필요 시 추가하면 됨. + - Round 내에서는 alive worker 전체가 한 번씩 `switch`를 받는다. 단일 round + 안에서 한 worker가 여러 번 wait를 호출해도 그 turn 안에서 순차적으로 + enqueue된 뒤 scheduler drain 한 번에 일괄 처리 (FIFO). + +2. **Drain 순서 = submission 순서 (FIFO)**. `_pending_worker_waits`는 list + append/pop(0)로 엄격한 FIFO. 완료 순서가 아니라 submission 순서로 drain되며, + SimPy scheduler 자체가 인과적으로 올바른 완료 순서를 보장하므로 submission + 순서 drain이 안전하다. `completion order`와 `drain order`는 혼동하지 말 것. + + **Two-queue ordering (worker waits → collectives)**: `_drain_pending`은 + worker wait 큐를 먼저, collective 큐를 나중에 drain한다. 이 순서의 근거: + - **두 큐는 서로 다른 dependency source**: worker wait은 worker가 직접 + `submit + wait` 쌍으로 만들어낸 handle (tensor deploy, MmuMap 등). collective + 큐는 `dist.all_reduce`가 내부적으로 enqueue한 kernel launch handle이며 + worker는 이걸 직접 wait하지 않는다 (ADR-0024 D7). + - **Correctness 관점 독립**: collective는 worker 관점에선 "이미 submit된 + 후 yield한" 상태. 그 완료 타이밍은 worker의 다음 action 시점 이전이기만 + 하면 됨. worker wait 큐와의 순서 dependency 없음. + - **단일 drain barrier 안에서 둘 다 완료**: D0.5의 loop-until-empty 규약에 + 따라 한 barrier invocation에서 worker → collective → (새로 생긴 것이 + 있으면 반복) 순으로 모두 빠짐. worker가 resume될 땐 양쪽 모두 drained. + - **대안 (collective 먼저)도 가능**: 본 ADR은 현 구현 단순성을 위해 worker + 먼저를 고정했을 뿐 의미상 동치. 성능 프로파일 차이가 관찰되면 재조정. + +3. **중복 enqueue — correctness는 idempotent drain, dedup은 non-guaranteed**. + `ctx.wait(h)`는 `h in ctx._completed`면 즉시 return. `_drain_pending`도 + 동일 guard. 같은 handle이 `_pending_worker_waits`에 여러 번 appended + 되더라도 실제 `engine.wait`는 한 번만 호출된다 (idempotent). + - **Correctness**: idempotent drain에 의존 → safe. + - **Memory/성능**: 본 ADR은 `_pending_worker_waits`의 **dedup을 보장하지 + 않는다**. 같은 handle이 N번 enqueue되면 큐에 N개 element가 보관되고 + drain 시 N번 pop + in-set guard가 돈다. 단일 worker가 같은 handle을 + 반복 wait하는 비정상 패턴이 아니면 N은 1~수 수준. + - **Implementation freedom**: 구현은 선택적으로 dedup (예: `set`을 side + index로 두거나 append 전 `h not in pending_set` 검사) 가능. correctness + 를 바꾸지 않는 최적화로 분류. + +4. **Exception propagation + sibling cleanup (ADR-0024 D13 방식 채택)**. + worker greenlet이 raise하면 `g.switch()`가 main으로 예외를 전달한다. + scheduler loop은 즉시 중단되고 다음 cleanup을 **명시적으로** 수행: + + ```python + try: + while True: + alive = [g for g in gs if not g.dead] + if not alive: + break + for g in alive: + if not g.dead: + g.switch() + _drain_pending(ctx) + except Exception as outer: + # (a) 살아남은 sibling worker greenlet 강제 종료. + for other in gs: + if not other.dead: + try: + other.throw(SystemExit) + except Exception: + pass # 사일런트 — 이미 예외 상황 + # (b) Backend barrier / pending 상태 초기화 (장래 epoch barrier 도입 대비). + backend = getattr(ctx.distributed, "_backend", None) + if backend is not None and hasattr(backend, "_barrier"): + backend._barrier.reset() + backend_pending = getattr(backend, "_pending_collective_handles", None) + if backend_pending is not None: + backend_pending.clear() + ctx._pending_worker_waits.clear() + # (c) 원인 예외는 SpawnException으로 래핑. + raise SpawnException(errors) from outer + ``` + + 규약: + - **Sibling abort 보장**: worker 하나가 raise하면 모든 sibling greenlet에 + `SystemExit`을 throw — greenlet은 즉시 terminate된다. greenlet leak 없음. + - **Pending queue 명시적 clear**: worker-wait + collective-pending 두 큐를 + 비움. 재사용 시 오염 방지. + - **`SpawnException(errors)` 래핑**: `errors: dict[int, Exception]`에 각 + rank의 원래 예외를 담는다. real-PyTorch `torch.multiprocessing.spawn`의 + failure 패턴과 호환. + - **Scope 제한**: `errors`에는 **자기 코드로 raise한 rank (root cause)만** + 포함된다. Sibling cleanup 과정에서 `throw(SystemExit)`으로 종료된 rank는 + `errors`에 나타나지 않는다 (SystemExit은 D1.2의 entry 래퍼 `try/except + Exception`에 걸리지 않음 — 의도된 설계: sibling 종료는 실패가 아니라 + cleanup signal). 독자가 "모든 failed rank가 다 들어올 것"으로 기대하지 + 않도록 명시. + - **`ctx._traces`는 예외 이전 시점까지의 partial 상태**. trace completeness + 는 보장되지 않음 (일부 launch/all_reduce가 entry를 남기지 못한 채 종료 + 가능). + - **Allocator / MemoryStore**는 예외 이전 상태 유지 — 재사용은 non-goal, + 새 `RuntimeContext` 생성 권장. + - **`join=False` / retry / partial recovery**는 본 ADR의 non-goal. + + `SpawnException`은 `runtime_api/multiprocessing.py`에 정의: + + ```python + class SpawnException(RuntimeError): + def __init__(self, errors: dict[int, Exception]): + self.errors = errors + first = next(iter(errors.items()), None) + msg = (f"spawn failed on ranks {sorted(errors.keys())}" + + (f": rank {first[0]} raised {first[1]!r}" if first else "")) + super().__init__(msg) + ``` + +5. **Single-driver 호환**. `g.parent is None`인 main-only 실행 (legacy 단일 + 드라이버 테스트)에서는 D0.2의 worker-fork 조건이 거짓 → 기존 즉시-동기 + 경로 유지. `_drain_pending`은 호출되지 않는다. + +#### D0.5 Host-read barrier — 결정 (normative) + +Worker 안에서 `tensor.numpy()`, `tensor.__getitem__`, `tensor.data` 등 +**host-observable read**는 **자동 drain barrier**로 정의한다. 호출 직전: + +1. `ctx._pending_worker_waits`와 `backend._pending_collective_handles`가 비어 + 있지 않으면 `g.parent.switch()`로 main에 yield → main은 `_drain_pending` + 실행 → 완료 후 worker resume. +2. 두 큐가 모두 비어 있으면 즉시 read. + +**Barrier 반복 규약 (normative — re-entrance)**: `_drain_pending`은 while-loop +로 **두 큐가 모두 완전히 비어질 때까지** drain한다. 단일 pass가 아님: + +```python +def _drain_pending(ctx): + while ctx._pending_worker_waits or ( + ctx.distributed._backend + and ctx.distributed._backend._pending_collective_handles + ): + while ctx._pending_worker_waits: + h = ctx._pending_worker_waits.pop(0) + if h not in ctx._completed: + ctx.engine.wait(h) + backend = ctx.distributed._backend + if backend is not None: + while backend._pending_collective_handles: + h, _sip_id, meta = backend._pending_collective_handles.pop(0) + ctx.wait(h, _meta=meta) # main context: safe; ctx.wait가 + # 다시 pending에 push하지 않음 +``` + +**Main-context ctx.wait 비재귀 invariant (normative)**: `_drain_pending` 내부의 +`ctx.wait(h, _meta=meta)` 호출은 main greenlet 컨텍스트에서 실행된다. D0.2의 +worker-fork 조건(`g.parent is not None and not g.parent.dead`)이 False이므로 +즉시-동기 경로로 진입 → **`_pending_worker_waits`에 절대 enqueue하지 않는다**. +이 invariant 덕분에 drain loop은 재귀/큐 재증가 없이 끝난다. 구현 시 +`g.parent is None`을 단일 main greenlet 보장으로 유지하는 것이 중요. + +**왜 loop인가**: `ctx.wait(h, _meta=meta)`는 main 컨텍스트에서 호출되므로 D0.2 +경로에 따라 engine을 **직접 drive**한다 (추가 enqueue 없음 — 위 invariant). +따라서 이론적으로는 single pass로 충분하지만 — 규약은 **loop-until-empty**로 +고정한다. 이유: + +1. **미래 확장 안전성**: 향후 drain 중 새 pending이 enqueue되는 구현 (예: + collective가 sub-handle을 가진 tree-reduce)이 생길 수 있다. loop 규약이면 + 이때도 correctness 유지. +2. **가독성**: "barrier는 pending이 빌 때까지 drain"이라는 단일 문장으로 + 의미가 닫힘. `ctx.wait` 호출이 새 enqueue를 안 한다는 non-trivial invariant + 에 의존하지 않음. +3. **Barrier의 세만틱은 "해당 read에 필요한 모든 dependency 완료"**: 현 모델 + 에선 모든 pending이 곧 모든 dependency이므로 둘은 동일. 사용자 mental model + 은 전자. + +**Termination 보증**: 두 체제로 분리해 서술한다. + +- **현재 구현**: `ctx.wait`는 main context에서 호출 시 engine을 직접 drive + (D0.2) → 새 pending을 enqueue하지 않는다. 한 iteration마다 pending의 크기가 + `pop(0)` + `engine.wait`로 엄격히 감소. iteration 수는 **초기 pending 크기 + 자체가 상한** → 유한 종료. +- **Future extension (loop 규약을 정당화하는 상한)**: 향후 drain 중 새 pending이 + enqueue되는 구현 (예: tree-reduce sub-handle)이 도입되면 초기 크기 상한은 + 깨진다. 그러나 SimPy causality는 handle의 dependency가 유한 DAG임을 보장하므로 + **nested depth가 finite**. loop 규약이 이 경우까지 자동 수용한다. + +두 체제 모두 무한 루프가 불가능함을 보장. 현 구현의 단일-pass 상한은 공격적 +최적화 시 참고 값일 뿐 규약은 loop-until-empty로 고정. + +**왜 implicit drain at read가 맞는가**: + +- 기존 open question에서 (a) implicit drain, (b) explicit barrier 둘 중 선택 + 문제였다. (b)는 명확하지만 TP layer 사용자가 `out = fc1.forward(x); + ctx.drain(); result = out.numpy()` 3-step을 매번 써야 하는 부담. (a)는 + "읽을 때 반영된 값을 보장"하는 단일 규약으로 CUDA의 `cudaDeviceSynchronize + before host copy` 패턴과 동일 — 숨은 규칙이 아닌 **명명된 entry-point의 + contract**이다. +- 본 ADR은 (a)를 채택하되 그 entry-point 목록을 **명시적으로 닫는다**: + `Tensor.numpy()`, `Tensor.data` (numpy alias), `Tensor.__getitem__`, + `Tensor.__repr__` (data가 포함되는 경우), 그 외 공식 host-read API는 본 + ADR 구현 시점에 코드베이스 검색으로 확정. 추가되는 host-read API는 반드시 + 이 contract를 따라야 한다 (테스트로 회귀 방지). +- `ctx.submit`만 하고 `wait` 없이 `numpy`를 직접 호출하는 경우도 drain + barrier가 동작 (pending queue에 handle이 있기 때문). 사용자가 explicit + wait을 생략해도 read 시점에 invariant가 복원된다. + +**`Tensor.copy_(source)` — write barrier 규정**: + +`copy_`는 semantically "target에 write"이지만 내부적으로 `source.numpy()`를 +호출하여 host에서 source 데이터를 가져온 뒤 `target._memory_store.write(...)` +로 각 shard에 쓴다. 두 방향 모두 barrier 처리: + +1. **Source-side (read barrier)**: `source.numpy()`가 D0.5 read barrier를 + 트리거 (source 자체가 deployed tensor이고 pending이 있을 때). +2. **Target-side (write barrier — global pending 기준)**: `copy_` 진입 시 + `ctx._pending_worker_waits` 또는 `backend._pending_collective_handles`가 + 비어 있지 않으면 write 전에 `g.parent.switch()`로 drain. **Per-tensor / + per-shard dependency tracking이 아니라 global pending queue 기준**. + - 왜 global인가: KernBench의 handle 표현에는 "이 handle이 target의 어느 + shard를 write한다"는 역추적 정보가 없다. 안전한 보수적 규약으로 "전역 + pending이 있으면 drain". 이 결과로 **unrelated tensor의 pending도 copy_를 + 막을 수 있다** — drop-in invariant 우선. + - **명시적 tradeoff**: 이 규약은 서로 독립적인 tensor 사이에도 불필요한 + serialization을 도입할 수 있다. 그러나 현 single-queue execution model + 하에서는 이 비용이 허용 가능 — cross-rank correctness와 "읽을 때 최신" + invariant를 단순한 규칙으로 보장하는 편이 우선. + - 실질적 영향: 단일 worker는 대부분 한 layer step 안에서 pending이 주로 + 자기 작업 — over-barrier로 인한 추가 context switch는 round 끝 scheduler + drain 시점과 일치하는 경우가 많아 큰 문제 안 됨. + - Future refinement: per-tensor pending tracking을 도입하면 이 규약을 + 좁힐 수 있으나 본 ADR scope 밖. + +**Non-barrier**: + +- `tensor.shape`, `tensor.dtype`, `tensor.name` 등 **metadata-only** 접근은 + drain하지 않음. 데이터 의존성이 없음. +- `tensor.pa`, `tensor.va` 등 raw address accessor도 drain하지 않음 (주소만, + 내용 아님). + +**공식 barrier entry-point (closed set)**: + +| API | Kind | Rationale | +|---|---|---| +| `Tensor.numpy()` | read | host-observable copy | +| `Tensor.data` | read | `numpy()` alias | +| `Tensor.__getitem__` | read | shard-aligned read | +| `Tensor.__repr__` (data 포함 시) | read | debugging/log | +| `Tensor.copy_(source)` | read + write | source read + target write | + +이 contract를 T5/T6에서 직접 검증. + +#### D0.6 왜 worker 함수 API는 불변인가 (informative) + +- `torch.zeros(...)` 내부는 `self.submit(msg)` + `self.wait(h)` 쌍. `wait`가 + D0.2/D0.3에 따라 자동으로 main-defer → 겉보기 동기적으로 보이지만 한 번 + yield. +- `tensor.numpy()`는 D0.5에 따라 host-read barrier → pending이 있으면 + drain→read, 없으면 즉시 read. +- `dist.all_reduce`는 기존 `_defer_wait=True` + `_pending_collective_handles` + 경로를 그대로 사용. D0.4의 drain이 두 큐를 함께 처리. + +#### D0.7 불변 조건 (invariants) + +- **kernel greenlet의 `_parent`는 항상 main**: env.run이 worker 컨텍스트에서 + 절대 돌지 않기 때문. (T3의 핵심 assertion.) +- **cross-rank 동기 지점**: 모든 worker가 yield한 뒤에만 drain → 모든 rank의 + kernel이 한 라운드에 함께 진행 (cross-rank IPCQ 교환의 필수 조건). +- **Single-driver 호환**: D0.4-(5). + +### D1. `torch.multiprocessing.spawn(fn, args, nprocs)` + +Real-PyTorch API 파리티 + D0의 scheduler loop의 단일 구현 위치. + +#### D1.0 API parity only — execution parity 아님 (normative) + +`torch.multiprocessing.spawn` 이름은 **API signature parity**에 한정된다. +실제 실행 모델은 **cooperative greenlet scheduler** (단일 Python 프로세스, +단일 OS 스레드, D0.4의 round-robin drive)이다. 다음은 **본 ADR이 제공하지 +않는 속성** — real-PyTorch `torch.multiprocessing.spawn`이 보장하는 것 중 +명시적으로 **non-goal**: + +- 프로세스 격리 (independent OS process per rank). +- 독립 address space (각 rank가 자기 Python heap 보유). +- Failure isolation (한 rank의 hard crash가 다른 rank 영향 없음). +- OS-level scheduler fairness (rank 간 preemptive time slicing). +- `mp.Queue`, `mp.Lock` 등 inter-process primitive. + +이 구현의 실제 성질: + +- 모든 rank는 같은 Python 프로세스 안의 greenlet. shared global state가 + 그대로 보임 (의도된 simulation convenience). +- GIL 하의 단일 스레드 → parallel execution 아님. SimPy 이벤트 순서로 + "논리적 동시성"만 재현. +- 한 worker에서 unhandled exception → 전체 simulation 중단 (D0.4-(4)). + +**호출자 의무**: real-PyTorch multi-process 샘플을 KernBench로 이식할 때 +프로세스 격리에 의존하는 로직 (예: `os.getpid`, 독립 임시 파일, 신호 처리 +등)은 지워야 한다. Namespace 이름은 코드 이식성을 위해 유지 — 세만틱은 +다르다. + +#### D1.1 Public surface + +```python +# runtime_api/multiprocessing.py (new) +class _MultiprocessingNamespace: + def __init__(self, ctx): + self._ctx = ctx + + def spawn(self, fn, args: tuple, nprocs: int, join: bool = True) -> None: + """Spawn `nprocs` worker greenlets, each calling fn(rank, *args). + + Mirrors torch.multiprocessing.spawn signature (minus `daemon`). + Drives the D0 scheduler loop until all workers finish. + """ + ... +``` + +#### D1.2 구현 + +```python +def spawn(self, fn, args, nprocs, join=True): + from greenlet import greenlet + ctx = self._ctx + dist = ctx.distributed + gs: list[greenlet] = [] + errors: dict[int, Exception] = {} + for rank in range(nprocs): + def _entry(r=rank): + try: + fn(r, *args) + except Exception as e: + errors[r] = e + raise + g = greenlet(_entry) + dist._bind_rank(g, rank) + gs.append(g) + + try: + while True: + alive = [g for g in gs if not g.dead] + if not alive: + break + for g in alive: + if not g.dead: + g.switch() + _drain_pending(ctx) # D0.5 + except Exception as outer: + # Sibling cleanup per D0.4-(4) + for other in gs: + if not other.dead: + try: + other.throw(SystemExit) + except Exception: + pass + backend = getattr(dist, "_backend", None) + if backend is not None: + if hasattr(backend, "_barrier"): + backend._barrier.reset() + if getattr(backend, "_pending_collective_handles", None) is not None: + backend._pending_collective_handles.clear() + ctx._pending_worker_waits.clear() + raise SpawnException(errors) from outer + # `join=True` semantics: we already wait for all workers. +``` + +#### D1.3 `torch` namespace attach + +`runtime_api/context.py` `__post_init__`에서: +```python +self.multiprocessing = _MultiprocessingNamespace(self) +``` + +→ bench 코드에서 `torch.multiprocessing.spawn(worker, args=(ws,), nprocs=ws)`. + +#### D1.4 기존 bench 마이그레이션 + +`benches/ccl_allreduce.py`의 hand-rolled loop은 `torch.multiprocessing.spawn` +한 줄로 축소. 기존 matrix 회귀는 그대로 유지. 현재 xfail인 `ring_default_ws`는 +D0 덕분에 PASS로 전환 예상 (worker가 kernel greenlet orphan을 발생시키지 않음). + +### D2. 새 패키지 `kernbench.tp` ``` src/kernbench/tp/ __init__.py — public API re-exports - parallel_state.py — TP group 관리 (현재 단일 global group) + parallel_state.py — TP group 관리 (현재 single global group) layers.py — ColumnParallelLinear, RowParallelLinear, VocabParallelEmbedding primitives.py — copy/reduce/scatter/gather_to/from_tp_region - mappings.py — identity/all_reduce forward, all_reduce/identity backward (stub) + kernels.py — TP layer가 launch하는 gemm kernel (재사용 가능) + mappings.py — forward identity/all_reduce, backward stub ``` -### D2. `parallel_state` — TP group +### D3. `parallel_state` — TP group ```python # parallel_state.py _TP_WORLD_SIZE = None -_TP_RANK = None # greenlet-local via dist.get_rank() def initialize_model_parallel(tensor_model_parallel_size: int) -> None: """Initialize TP group. Must be called after dist.init_process_group.""" global _TP_WORLD_SIZE - from kernbench.runtime_api.distributed import get_dist + from kernbench.runtime_api.distributed import get_dist # or torch.distributed dist = get_dist() total = dist.get_world_size() if tensor_model_parallel_size != total: @@ -93,52 +584,115 @@ def get_tensor_model_parallel_world_size() -> int: def get_tensor_model_parallel_rank() -> int: from kernbench.runtime_api.distributed import get_dist - return get_dist().get_rank() # ADR-0024의 greenlet-local rank + return get_dist().get_rank() # ADR-0024 greenlet-local rank ``` -초기 scope: **TP 사이즈 = world_size = topology SIP 수**. Pure TP 모델. +초기 scope: TP size = world_size = topology SIP count. Pure TP 모델. -### D3. `ColumnParallelLinear` +### D4-pre. TP shard ownership vs DPPolicy — 역할 분리 (normative) + +TP layer의 weight/output 표현에서 두 개념을 명확히 분리한다: + +| 개념 | 결정 주체 | 범위 | +|---|---|---| +| **TP shard ownership** (어느 rank가 weight의 어떤 slice를 소유하는가) | greenlet-local rank + `torch.ahbm.set_device(rank)` (ADR-0024 D9/D10) | **cross-rank, cross-SIP** | +| **Intra-rank placement** (소유된 slice를 rank 내부에서 cube × PE로 어떻게 분산하는가) | `DPPolicy(cube=..., pe=...)` (ADR-0026) | **한 rank 내부 (SIP 경계 안)** | + +따라서 `ColumnParallelLinear`가 `(in_features, out_features // ws)` shape로 +weight를 생성하고 `DPPolicy(cube="column_wise", pe="column_wise")`를 부여 +하면: + +- **Rank r**이 소유하는 slice = weight의 column 축 [r * k_local, (r+1) * + k_local) — **set_device(r)**가 이걸 결정 (해당 rank가 SIP r에 존재). +- **그 slice 내부**에서 cube × PE column-wise 분산 — **DPPolicy**가 이걸 + 결정. + +두 축은 **독립적**이다. 같은 DPPolicy로 두 rank가 자기 slice를 만들면 +slice 자체는 다른 SIP에 있지만 intra-SIP placement 패턴은 동일. 반대로 +DPPolicy를 `cube="replicate", pe="replicate"`로 바꿔도 TP shard ownership은 +유지되고 intra-rank placement만 달라짐. + +**이 경계가 흐려지는 실수** (본 ADR이 금지): + +- DPPolicy에 "SIP 축"이 다시 등장 (ADR-0026에서 제거됨). +- TP layer가 `set_device` 없이 `DPPolicy`만으로 cross-rank sharding을 + 표현 → 단일 rank 안에서 세로로 자른 것과 구분 안 됨. + +본 ADR의 TP layer는 항상 "rank = SIP = one slice 소유 + DPPolicy intra-SIP +분산" 관점에서만 weight/output을 다룬다. + +### D4. `ColumnParallelLinear` + +**중요**: host-side `torch.matmul` 추상화를 신규 도입하지 않는다. layer의 +forward는 `torch.launch("gemm", gemm_kernel, ...)`로 기존 gemm kernel을 +호출 — KernBench bench들이 이미 쓰는 패턴 +([benches/gemm_single_pe.py](benches/gemm_single_pe.py), +[benches/gpt3_qkv.py](benches/gpt3_qkv.py)). ```python # layers.py +from kernbench.policy.placement.dp import DPPolicy +from kernbench.tp.kernels import _gemm_kernel +from kernbench.tp.parallel_state import ( + get_tensor_model_parallel_rank, + get_tensor_model_parallel_world_size, +) + class ColumnParallelLinear: """Weight의 K(out_features) 축을 TP rank에 분산. forward(x): x: (M, N) — full-replicated across ranks - W_k: (N, K / world_size) — rank-local slice + W_k: (N, K / world_size) — rank-local slice (set_device로 SIP r에 거주) y_k = x @ W_k → (M, K / world_size) — rank-local output - 출력은 sharded. 후속 RowParallelLinear가 기대하는 입력 형태. + 출력은 column-sharded. RowParallelLinear가 기대하는 입력 형태. """ def __init__(self, in_features: int, out_features: int, bias: bool = False, dtype: str = "f16", torch=None): ws = get_tensor_model_parallel_world_size() assert out_features % ws == 0 - k_local = out_features // ws - # 각 rank가 자기 slice 소유 (ADR-0024 set_device + ADR-0026 DPPolicy) + self.in_features = in_features + self.k_local = out_features // ws + self._torch = torch + # 각 rank가 자기 slice 소유 — set_device(rank)에 의해 SIP r에 배치. self.weight = torch.zeros( - (in_features, k_local), dtype=dtype, + (in_features, self.k_local), dtype=dtype, dp=DPPolicy(cube="column_wise", pe="column_wise"), name="col_parallel_w", ) - # init with something sensible — TODO + self.bias = None if bias: - self.bias = torch.zeros((k_local,), ...) - else: - self.bias = None + self.bias = torch.zeros( + (self.k_local,), dtype=dtype, + dp=DPPolicy(cube="replicate", pe="replicate"), + name="col_parallel_b", + ) def forward(self, x): - # x는 full-replicated (caller 보장). 단순 local matmul. - y = torch.matmul(x, self.weight) - if self.bias is not None: - y = y + self.bias - return y + # x는 full-replicated (caller 보장). 단순 local gemm. + M = x.shape[0] + out = self._torch.empty( + (M, self.k_local), dtype=x.dtype, + dp=DPPolicy(cube="column_wise", pe="column_wise"), + name="col_parallel_out", + ) + self._torch.launch( + "col_parallel_gemm", _gemm_kernel, + x, self.weight, out, M, self.in_features, self.k_local, + ) + # bias add는 별도 kernel 혹은 composite gemm의 fused bias. + # 초기 scope에서는 bias=False만 충분히 검증. + return out ``` -### D4. `RowParallelLinear` +**Yield-safety contract (normative)**: `ColumnParallelLinear.forward`는 한 번의 +`torch.launch` 호출로 kernel launch → 내부 `ctx.wait` 쌍을 포함한다. 이는 +D0.4-(1)의 "worker는 유한 step 내 yield" 조건을 자동으로 만족 — TP layer +사용자가 yield 패턴을 수동으로 삽입할 필요 없음. + +### D5. `RowParallelLinear` ```python class RowParallelLinear: @@ -155,29 +709,42 @@ class RowParallelLinear: dtype: str = "f16", torch=None): ws = get_tensor_model_parallel_world_size() assert in_features % ws == 0 - n_local = in_features // ws + self.n_local = in_features // ws + self.out_features = out_features + self._torch = torch self.weight = torch.zeros( - (n_local, out_features), dtype=dtype, + (self.n_local, out_features), dtype=dtype, dp=DPPolicy(cube="column_wise", pe="column_wise"), name="row_parallel_w", ) - # bias는 rank 0에만 (Megatron convention) - self.bias = torch.zeros(...) if bias else None - self._torch = torch + # bias는 rank 0에만 (Megatron convention). 초기 scope에서는 생략. + self.bias = None def forward(self, x): - y_partial = torch.matmul(x, self.weight) - # Final all-reduce sums partial products across ranks + M = x.shape[0] + y_partial = self._torch.empty( + (M, self.out_features), dtype=x.dtype, + dp=DPPolicy(cube="column_wise", pe="column_wise"), + name="row_parallel_partial", + ) + self._torch.launch( + "row_parallel_gemm", _gemm_kernel, + x, self.weight, y_partial, M, self.n_local, self.out_features, + ) + # Cross-rank reduce. ADR-0024의 dist.all_reduce는 D0 + mp.spawn 하에서 + # 정상 동작 (kernel parent = main 유지). self._torch.distributed.all_reduce(y_partial, op="sum") - if self.bias is not None: - # bias는 reduce 이후에만 추가 (rank 0 보유) - rank = get_tensor_model_parallel_rank() - if rank == 0: - y_partial = y_partial + self.bias return y_partial ``` -### D5. Primitive 함수 +**Yield-safety contract (normative)**: `RowParallelLinear.forward`는 launch → +내부 wait에 이어 `all_reduce` (defer + worker yield 패턴)까지 포함하므로 forward +한 번당 **최소 2회 yield**가 보장됨. D0.4-(1)의 scheduler progress 조건 자동 +만족. 모든 본 ADR의 TP layer forward는 "최소 하나의 wait 또는 collective를 +포함해 yield-safe하다"를 invariant로 유지한다 — 이후 추가되는 TP primitive +(VocabParallelEmbedding 등)도 동일 계약 필수. + +### D6. Primitive 함수 ```python # primitives.py @@ -185,123 +752,307 @@ def copy_to_tp_region(x): """Forward: identity. Backward: all-reduce. (Training 추가 시 구현).""" return x -def reduce_from_tp_region(x): +def reduce_from_tp_region(x, torch): """Forward: all-reduce. Backward: identity.""" torch.distributed.all_reduce(x, op="sum") return x def scatter_to_tp_region(x): - """x를 K 축으로 scatter. Forward: split. Backward: all-gather.""" - # 초기 scope에서는 사용자가 이미 sharded tensor를 만들었다고 가정 → - # no-op 또는 metadata 추가 - raise NotImplementedError("Phase 2 feature") + raise NotImplementedError( + "Phase 2: 사용자가 이미 sharded tensor를 생성하는 것으로 대체" + ) def gather_from_tp_region(x): - """x를 K 축으로 all-gather. Forward: all-gather. Backward: split.""" - raise NotImplementedError("all-gather kernel이 먼저 필요 (future)") + raise NotImplementedError( + "Phase 2: all-gather kernel 선행 필요 (future)" + ) ``` -### D6. 샘플 bench — 2-layer MLP with TP +### D7. 샘플 bench — 2-layer MLP with TP ```python -# benches/tp_mlp.py (새 파일) -def worker(rank, world_size, torch): - torch.cuda.set_device(rank) +# benches/tp_mlp.py (신규) +from kernbench.policy.placement.dp import DPPolicy +import kernbench.tp as tp +import numpy as np + + +def worker(rank: int, world_size: int, torch): + torch.ahbm.set_device(rank) tp.initialize_model_parallel(world_size) B, D_in, D_hidden, D_out = 1, 512, 2048, 512 fc1 = tp.ColumnParallelLinear(D_in, D_hidden, torch=torch) fc2 = tp.RowParallelLinear(D_hidden, D_out, torch=torch) - x = torch.zeros((B, D_in), dtype="f16", - dp=DPPolicy(cube="column_wise", pe="column_wise"), - name="x") - # ... init x ... + x = torch.zeros( + (B, D_in), dtype="f16", + dp=DPPolicy(cube="replicate", pe="replicate"), + name="x", + ) + # init x with some pattern (e.g., constant) + x.copy_(torch.from_numpy(np.full((B, D_in), 0.1, dtype=np.float16))) - h = fc1.forward(x) # column-sharded output - y = fc2.forward(h) # all-reduced output, full on every rank + h = fc1.forward(x) # column-sharded (B, D_hidden / ws) + y = fc2.forward(h) # all-reduced (B, D_out) on every rank + + # rank 0만 결과 출력 / 검증 + if rank == 0: + result = y.numpy() + # 실제 검증 값은 zero-init weight이면 전부 0 — scope에서는 "완료 자체" 검증 + print(f" tp_mlp: shape={result.shape}, mean={float(result.mean()):.4f}") - # verify... def run(torch): torch.distributed.init_process_group(backend="ahbm") - torch.distributed.spawn(worker, nprocs=torch.distributed.get_world_size(), - args=(...,)) + ws = torch.distributed.get_world_size() + torch.multiprocessing.spawn(worker, args=(ws,), nprocs=ws) ``` -### D7. Non-functional — training 미지원 +### D8. Non-functional — training 미지원 본 ADR은 **inference/forward only**. Backward / gradient / optimizer는 future. 기존 KernBench가 training이 아니므로 자연스러움. -### D8. 초기 scope 제약 +### D9. 초기 scope 제약 -- TP 사이즈 = world_size (mixed DP+TP 없음) -- `scatter_to_tp_region`, `gather_from_tp_region`은 unimplemented (별도 kernel - 필요) -- Weight init은 단순 zero (적절한 init은 future) -- Pipeline parallelism은 scope 밖 +- TP size = world_size (mixed DP+TP 없음). +- `scatter_to_tp_region`, `gather_from_tp_region`은 unimplemented. +- **Weight 기본값은 zero**. 적절한 init scheme (Xavier, Kaiming 등)은 future. + 단 테스트는 `tensor.copy_`로 결정론적 non-zero pattern을 주입해 numerical + correctness를 검증 (T2/T6). 즉 "production default = zero, 검증 = 결정론적 + non-zero"로 운영 분리. +- Bias 초기 scope에서 생략 (Megatron의 rank 0-only bias 정책은 future). +- Pipeline parallelism은 scope 밖. +- VocabParallelEmbedding은 all-gather 선행 필요 → stub only. -### D9. `distributed.all_reduce` 기반 +### D10. 회귀: `ring_default_ws` xfail 해제 — 필수 acceptance -RowParallelLinear의 모든 collective는 ADR-0024의 `dist.all_reduce`를 사용. -별도 TP-전용 collective 엔진 불필요. +D0 (worker-wait 일반화) + D0.5 (host-read barrier) 덕분에 모든 worker-driven +`ctx.wait` 및 host-read가 main-drain 경로로 routing됨 → ADR-0024 Phase B의 +kernel-greenlet orphan 원인이 소멸. 기존 matrix test의 `ring_default_ws` +strict-xfail 케이스를 본 ADR 구현 이후 **PASS**로 전환하는 것을 **필수 회귀 +기준**으로 포함. Observable acceptance criteria는 **T7**에 명시 (deadlock +부재, GreenletExit 부재, numerical tolerance 등). --- ## Dependencies - **ADR-0024** (launcher): rank = SIP, greenlet-local rank, `dist.all_reduce`, - `torch.cuda.set_device(rank)`, `spawn_workers` 제공. + `torch.ahbm.set_device(rank)`. 본 ADR의 D0/D1이 이 인프라를 확장. - **ADR-0026** (DPPolicy intra-device): weight tensor의 per-rank slice 표현. - **ADR-0023 / ADR-0025** (IPCQ): `dist.all_reduce` 구현의 기반. +### Supersedes (partial) + +ADR-0024의 다음 섹션은 **미구현 상태의 설계**이며, 본 ADR이 더 단순한 모델로 +대체한다: + +- **ADR-0024 D7 (`_CollectiveBarrier.submit_and_drain`)** — epoch 기반 last- + arriver-drains 패턴. 문제: last arriver가 **worker 컨텍스트에서** `ctx.wait`을 + 호출해 env.run을 drive → D0.2가 막으려는 orphan 원인을 재현한다. 본 ADR의 + **D0.4 two-queue drain** (worker가 모두 yield한 뒤 main이 drain)이 동일한 + "모든 rank가 submit 완료 전까지 어떤 rank의 collective도 진행되지 않음" + invariant를 **worker-safe하게** 제공한다. `_CollectiveBarrier` 클래스는 + 구현하지 않는다. +- **ADR-0024 D12/D13 (`spawn_workers` skeleton)** — signature / scheduler + loop / exception handling 설계. 본 ADR의 **D1**이 real-PyTorch API와 일치하는 + signature (`spawn(fn, args, nprocs)`)로 재정의하며, D0 scheduler drain을 단일 + 위치에서 수행한다. ADR-0024 D13의 exception cleanup (siblings + `throw(SystemExit)` + `SpawnException` 래핑)은 본 ADR에 그대로 흡수 + (D0.4-(4) 참조). + +현 구현은 ADR-0024의 D7/D12/D13 어느 것도 landing하지 않았으므로 supersede에 +따른 마이그레이션 비용은 없음. 향후 `docs/adr/ADR-0024`에 "superseded by +ADR-0027 D0/D1" 주석만 추가하면 정합. + +**Source of truth (normative, 구현자 대상)**: worker scheduling / collective +drain / spawn / exception cleanup의 구현 기준은 **ADR-0027 D0/D1이다**. 구현 +시 ADR-0024 D7/D12/D13의 pseudocode / contract / signature를 참고하지 말 것 — +두 ADR이 다른 결론을 낼 때는 항상 ADR-0027이 우선한다. 리뷰어도 이 원칙으로 +PR을 심사. + --- ## Non-goals - **Backward pass / training**: inference only. Training simulation은 별도 ADR. -- **Mixed parallelism (DP + TP + PP)**: 초기엔 pure TP만. -- **Weight init schemes**: 단순 zero / debug pattern. 실제 training init는 future. -- **Fused ops**: Megatron의 fused matmul+bias+gelu 등은 KernBench kernel 수준 - 문제. 본 ADR은 host-side API만. +- **Mixed parallelism (DP + TP + PP)**: 초기엔 pure TP only. +- **Weight init schemes**: 단순 zero / debug pattern. +- **Fused ops**: Megatron의 fused matmul+bias+gelu는 kernel 레벨 문제. - **DTensor 통합**: ADR-0028 future. +- **Host-side `torch.matmul` 추상화**: TP layer는 `torch.launch(gemm_kernel, ...)` + 로 기존 gemm kernel을 호출. 신규 matmul host-op 도입 안 함. --- ## Open questions - **`initialize_model_parallel` 위치**: `kernbench.tp.initialize_model_parallel` - vs `torch.distributed.init_tp(...)` 확장. real PyTorch는 `torch.distributed. - init_device_mesh` 등을 권장. 우리는 당분간 TP-전용 모듈. -- **Weight의 DP 전략**: 본 ADR은 `DPPolicy(cube="column_wise", pe="column_wise")` - 를 가정. Intra-SIP DP를 다르게 주면? 성능 벤치마크로 결정. -- **Bias 배치 정책**: Megatron은 bias를 split하지 않음. RowParallelLinear는 - rank 0에만. 이게 항상 맞는가? 대안: replicate across ranks. -- **`VocabParallelEmbedding`**: 처음 몇 벤치엔 불필요할 수도. 샘플 구현은 넣되 - scope에서 제외할 수도. + (현 결정) vs real-PyTorch의 `torch.distributed.init_device_mesh`. TP 전용 + 모듈에 유지. +- **Weight init**: ADR은 zero. Debug pattern (e.g., identity)이 유효 검증에 + 필요할 수 있음 — Phase 1 test에서 필요 시 추가. +- **bias 배치 정책**: Megatron은 RowParallelLinear bias를 rank 0에만. 초기 + scope에서는 bias=False로 회피. +- **GEMM kernel 위치**: `kernbench.tp.kernels._gemm_kernel` vs 기존 + `benches/gemm_single_pe.py`에서 import. TP가 bench 의존을 가지면 안 되므로 + tp 내부에 복제. 향후 `kernbench.kernels` 공용 패키지로 이관 가능. + +**Resolved (이전 rev에서 open이었던 것들)**: +- ~~`tensor.numpy()` 호출 시 drain 타이밍~~ → **D0.5에서 결정**: 공식 host-read + entry-point(`numpy`, `data`, `__getitem__`, data-포함 `__repr__`)는 자동 + drain barrier. metadata-only accessor는 barrier 아님. --- ## Test strategy -### T1. Unit — `tests/test_tp_layers.py` (신규) +### T1. Unit — `tests/test_tp_parallel_state.py` (신규) -- `ColumnParallelLinear` forward: rank별 weight slice, 출력이 `(M, K / ws)`. -- `RowParallelLinear` forward: 입력이 sharded, all_reduce 후 `(M, K)` 일치. -- `VocabParallelEmbedding` forward (if implemented). -- `parallel_state` 초기화 / rank 조회. +- `initialize_model_parallel(ws)`가 world_size와 일치하는 경우만 통과. +- `get_tensor_model_parallel_rank()`가 greenlet-local rank 반환 (ADR-0024 D9 + 회귀). +- 미초기화 상태에서 `get_tensor_model_parallel_world_size()`가 적절히 실패. -### T2. E2E — `tests/test_tp_mlp.py` (신규) +### T2. Unit — `tests/test_tp_layers.py` (신규) -- 2-layer MLP (ColumnParallel → RowParallel) forward가 single-driver reference - 와 일치 (numerical check, rtol/atol). -- ws = SIP count (current: 2). +**Shape / structural checks**: -### T3. 회귀 +- `ColumnParallelLinear(in=256, out=512).weight.shape` per-rank가 `(256, 512/ws)`. +- `RowParallelLinear(in=512, out=256).weight.shape` per-rank가 `(512/ws, 256)`. +- `ColumnParallelLinear.forward(x)`의 출력 텐서 shape이 `(M, K/ws)`. -- ADR-0024의 `test_ccl_allreduce_matrix` 그대로 통과 (TP가 호출하는 - `dist.all_reduce`의 기반). +**Numerical correctness (weight ≠ zero)**: 단순 shape assert는 대수적 오류를 +놓치므로, 결정론적 non-zero 입력/weight으로 실제 연산 결과 검증: + +- **T2.a (ColumnParallel, deterministic)**: weight를 per-rank identity + (또는 `(i, j) → i + rank * k_local + j` 같은 결정론적 패턴)으로 초기화 + (`tensor.copy_`). 입력 `x`를 상수 벡터로 둔 뒤 forward. 각 rank의 출력이 + **기대치 `x @ W_rank_local`와 rtol/atol 1e-2 이내로 일치** (gemm kernel의 + fp16 round-off 고려). +- **T2.b (RowParallel, reduced output equality — primary)**: 모든 rank의 + forward 결과가 동일 전역 행렬 곱 `concat([x_0..x_{ws-1}]) @ concat([W_0.. + W_{ws-1}])`과 일치하는지 검증. rank-별 `y.numpy()` 비교로 (i) all-reduce 후 + elementwise equality와 (ii) 기대치(host-side numpy로 계산) 일치 **둘 다** + assert. observable-only 검증 — internal hook 불필요. + + *Optional implementation note*: partial-sum 단계를 더 세밀히 관찰하고 싶으면 + `_pending_collective_handles` enqueue 직전 intercept hook을 쓸 수 있으나, + 이는 내부 구현 detail에 결합되므로 ADR 수준의 test contract는 T2.b의 + observable equality만 요구한다. +- **T2.c (rank-identity after all_reduce)**: 모든 rank의 `y.numpy()`이 elementwise + identical (mean뿐 아니라 full array equality, rtol 1e-2). + +**기존 weak assertion 금지**: `output mean이 identical` 같은 aggregate-only +검증은 silently 깨지기 쉽기에 **main assertion으로 쓰지 말 것** — 보조 +sanity로만 사용. + +### T3. Worker-wait 일반화 + orphan regression — `tests/test_worker_wait_drain.py` (신규) + +본 테스트의 핵심 목적은 queue 동작이 아니라 **ADR-0024 Phase B orphan +regression의 직접 방지**이다. 다음을 assert: + +- **T3.a**: Worker가 `ctx.wait(h)`을 호출하면 `_pending_worker_waits`에 + handle이 enqueue되고 main이 drain하기 전까지 worker는 resume되지 않는다. +- **T3.b**: `_drain_pending` 직후 worker가 resume되고 handle은 `_completed` + 상태. +- **T3.c**: Multi-worker에서 모든 worker가 같은 drain 지점에서 resume. +- **T3.d (orphan invariant, 핵심)**: Worker 함수가 `torch.launch(...)`를 + 호출한 뒤, SimPy engine이 실제로 돌기 시작하는 시점에 **kernel greenlet의 + `_parent`는 main greenlet**이다. 테스트는 `kernel_runner.run`을 monkey-patch + 하거나 `KernelRunner._parent` capture 시점에 assertion hook을 걸어 이 + invariant를 직접 검증. +- **T3.e (symptom regression)**: D0 없이는 T3.d와 등가인 GreenletExit 실패가 + 재현되어야 함 (historical failure mode 문서화 — 실제 테스트는 D0 도입 후 + skip 또는 xfail 처리). +- **T3.f (idempotency)**: 같은 handle을 `ctx.wait(h)`로 두 번 호출해도 + `engine.wait`은 한 번만 불린다 (D0.4-(3)). +- **T3.g (exception propagation)**: Worker가 `wait` 호출 후 raise하면 main + scheduler loop이 즉시 중단되고 예외가 위로 전파. 남은 `_pending_worker_waits`는 + drain되지 않는다 (D0.4-(4)). + +### T4. `torch.multiprocessing.spawn` — `tests/test_mp_spawn.py` (신규) + +- `spawn(fn, args, nprocs)`이 nprocs 개의 greenlet을 생성하고 각각 rank로 bind. +- 모든 worker 완료 후 return. +- 기존 bench `ccl_allreduce.py`의 hand-rolled loop을 `mp.spawn`으로 교체해도 + matrix 회귀 통과. + +### T5. Host-read barrier — `tests/test_host_read_barrier.py` (신규) + +D0.5 contract를 직접 검증: + +- **T5.a**: Worker가 `launch → tensor.numpy()`를 연속 호출하면 barrier가 동작, + numpy 결과는 kernel 완료 후 값 (post-drain). +- **T5.b**: `launch → tensor.shape` (metadata)는 barrier 발동 안 함 (pending + queue 그대로 유지). +- **T5.c**: Pending 큐가 비어 있는 상태의 `numpy()` 호출은 yield 없이 즉시 + read (불필요한 context switch 방지). +- **T5.d**: `__getitem__`, `data` 역시 T5.a와 동일한 barrier 발동. +- **T5.e**: Collective pending (all_reduce) 진행 중 상태에서 `numpy()` 호출 시 + collective drain까지 기다린 뒤 read. +- **T5.f (copy_ write barrier)**: target tensor에 미완료 pending handle이 + 있는 상태에서 `target.copy_(source)` 호출 시, write 전에 drain 발동. + 주입한 host source가 drain-이후 상태에 덮어써지는지 확인 (stale-overwrite + 없음). +- **T5.g (closed-set via registry)**: barrier entry-point의 closed-set은 + **명시적 registry** (예: `tensor.py` 상단의 `_HOST_READ_BARRIERS = frozenset + ({"numpy", "data", "__getitem__", "__repr__", "copy_"})`)로 유지한다. + 테스트는: + 1. registry에 나열된 각 entry-point에 **실제 barrier 주입이 되어 있는지** + (invocation 시 pending queue를 확인하고 yield 경로를 거치는지) 관찰. + 2. 새 host-read semantic API 추가는 code review에서 registry 업데이트를 + 의무화 (CODEOWNERS / review checklist로 운영). + + **Non-goal**: Python introspection (method 시그니처, docstring 분석 등)으로 + barrier-부재 API를 자동 탐지하는 것은 정밀도 문제로 ADR scope 밖. registry + + review 접근으로 충분. + +### T6. E2E — `tests/test_tp_mlp.py` (신규) + +2-layer MLP (ColumnParallel → RowParallel) forward: + +**Structural / liveness**: + +- `ws = SIP count` (topology.yaml 기준 current 2) 모델로 실행 완료. +- **Deadlock 없음**: scheduler loop이 유한 시간 내 종료 (pytest-timeout 등). +- **Completion trace**: 각 `launch` 및 `all_reduce`가 `ctx._traces`에 entry + 남김 (count = 예상 layer 수). + +**Numerical correctness (필수)**: + +- **T6.a (zero-weight sanity)**: weight 전부 0 → 출력 전부 0. 파이프라인이 + 돌긴 하는지 확인용 smoke test. **이것만으로는 불충분 — T6.b/T6.c와 함께 + 채택**. +- **T6.b (deterministic pattern)**: 모든 weight를 결정론적 non-zero pattern + (예: all 0.01, 또는 per-rank identity에서 파생된 값)으로 `copy_`. 입력도 + 상수. 기대 출력을 host-side numpy로 계산한 뒤 각 rank의 `y.numpy()`와 rtol + 1e-2로 비교. +- **T6.c (rank-consistency post all-reduce)**: RowParallel의 all-reduce + 이후 **모든 rank의 output이 elementwise identical** (T2.c와 동일 기준). + 단순 mean 일치가 아니라 full array equality. +- **T6.d (shape contract)**: ColumnParallel 출력이 `(B, D_hidden / ws)`, + RowParallel 출력이 `(B, D_out)`. + +### T7. 회귀 — `ring_default_ws` xfail 해제 + +- `tests/test_ccl_allreduce_matrix.py::test_ccl_allreduce_matrix[ring_default_ws]`의 + `@pytest.mark.xfail(strict=True)` 제거 → **PASS**여야 함. +- Acceptance criteria (observable): + - **Deadlock 없음**: bench가 유한 시간 내 종료. + - **GreenletExit 없음**: stderr/log에 GreenletExit trace 없음. + - **Rank 0 산출**: `ring_allreduce_tcm (ws=2): 2 OK` 문자열이 출력. + - **Completion trace**: `all_reduce` trace entry 존재. + - **Numerical**: 각 rank의 입력 `r+1`에 대한 sum(1..ws)=3 결과를 tolerance + 1e-1 이내로 달성. + +### T8. 회귀 — 기존 전체 test suite + +- ADR-0026까지 통과하던 모든 test가 그대로 통과 (523 passed + 1 xfail). +- Phase 2 완료 기준: 524 passed (xfail 해제 포함) + 0 xfail + 위 T1~T7 신규 + 테스트 전부 통과. --- @@ -312,17 +1063,23 @@ RowParallelLinear의 모든 collective는 ADR-0024의 `dist.all_reduce`를 사 - **Megatron 코드 이식 용이**: real training code와 API 일치. - **TP 벤치마크 가능**: scaling, communication-compute overlap 등 HW 특성 연구. -- **DPPolicy 의미 명확화** (ADR-0026과 시너지). +- **`ring_default_ws` xfail 해제**: D0의 부산물로 ADR-0024 Phase B 블로커 해소. +- **Scheduler loop 단일화**: D1 (`mp.spawn`) 도입으로 hand-rolled loop 제거. + 후속 collective/TP 벤치가 동일 패턴 재사용. +- **DPPolicy 의미 명확화** (ADR-0026 시너지): TP layer가 intra-device DPPolicy + 만 사용하는 모범 사례. ### Negative - 새 모듈 (`kernbench.tp`) 유지보수 비용. -- 초기 scope가 제한적 (pure TP only). +- 초기 scope가 제한적 (pure TP only, forward only). +- D0 generalization이 `ctx.wait`의 세만틱을 바꿈 — 단일 드라이버 테스트와의 + 호환성을 명시적으로 검증 필요 (T7). ### Neutral - ADR-0024/0026 기반 위에 순수한 상위 레이어 추가. Hardware simulation - stack에 영향 없음. + stack에 영향 없음 (D0 제외). --- @@ -330,12 +1087,22 @@ RowParallelLinear의 모든 collective는 ADR-0024의 `dist.all_reduce`를 사 | File | Change | |------|--------| +| `src/kernbench/runtime_api/context.py` | D0.1/D0.2: `_pending_worker_waits` + `ctx.wait`의 worker fork, D1.3: `self.multiprocessing` namespace attach | +| `src/kernbench/runtime_api/multiprocessing.py` | 신규 (D1): `_MultiprocessingNamespace.spawn` + `_drain_pending` + `SpawnException` | +| `src/kernbench/runtime_api/distributed.py` | `_pending_collective_handles` 타입 annotation 보강 (`list[tuple[RequestHandle, int, dict]]`); spawn exception cleanup에서 clear 호출 지점 노출 | +| `src/kernbench/runtime_api/tensor.py` | D0.5 barrier 주입: `numpy`, `__getitem__`, `data`, `__repr__`, `copy_` (source read + target write) | | `src/kernbench/tp/__init__.py` | 신규: public API re-export | -| `src/kernbench/tp/parallel_state.py` | 신규: D2 | -| `src/kernbench/tp/layers.py` | 신규: D3/D4 | -| `src/kernbench/tp/primitives.py` | 신규: D5 | +| `src/kernbench/tp/parallel_state.py` | 신규: D3 | +| `src/kernbench/tp/layers.py` | 신규: D4/D5 | +| `src/kernbench/tp/primitives.py` | 신규: D6 | +| `src/kernbench/tp/kernels.py` | 신규: TP layer용 `_gemm_kernel` (bench 복제) | | `src/kernbench/tp/mappings.py` | 신규 stub (backward TODO) | -| `benches/tp_mlp.py` | 신규: D6 샘플 | -| `tests/test_tp_layers.py` | 신규: T1 | -| `tests/test_tp_mlp.py` | 신규: T2 | -| `docs/tp-author-guide.md` | 신규 (선택): 사용자 가이드 | +| `benches/tp_mlp.py` | 신규 샘플 (D7) | +| `benches/ccl_allreduce.py` | hand-rolled loop → `torch.multiprocessing.spawn`으로 교체 (D1.4) | +| `tests/test_tp_parallel_state.py` | 신규 (T1) | +| `tests/test_tp_layers.py` | 신규 (T2) | +| `tests/test_worker_wait_drain.py` | 신규 (T3): orphan invariant 직접 검증 포함 | +| `tests/test_mp_spawn.py` | 신규 (T4) | +| `tests/test_host_read_barrier.py` | 신규 (T5): D0.5 host-read barrier contract | +| `tests/test_tp_mlp.py` | 신규 (T6) | +| `tests/test_ccl_allreduce_matrix.py` | `ring_default_ws` xfail 제거 (T7) |