3 Commits

Author SHA1 Message Date
mukesh fd56b6cacd adr: add ADR-0043/0044 (eval harnesses); reconcile ADR-0024/0032 for SIP w/h
Document the allreduce + GEMM evaluation harnesses and bring the affected
allreduce ADRs in line with the refactored code.

New (Accepted, EN + KO):
- ADR-0043 — allreduce evaluation harness (tests/sccl/): distributed-driven
  correctness, latency/buffer-kind sweeps, sessionfinish plot aggregators,
  topology + FSIM-comparison figures. Verified against the implementation.
- ADR-0044 — GEMM evaluation harness (scripts/gemm_sweep.py + tests/gemm/):
  heavy-script data gen vs. fast test-rendered figures, slow regenerator,
  the 3-figure set. Records two limitations as open questions: the
  theoretical-model constants are inherited (not yet traced to ADR-0033/
  0014), and the *_measured figure is a naming misnomer.

Updated (EN + KO):
- ADR-0024 — add D5: SIP grid w/h resolution (explicit sips.w/h, square
  fallback, fail-loud), documenting the AhbmCCLBackend fix.
- ADR-0032 — D4/D5/Non-goals reconciled: rectangular SIP grids (e.g. 6 SIPs
  as 3x2) are supported via explicit w/h; the square requirement now
  applies only to the fallback. Affected-files repointed to tests/sccl/.

Verification: ADR-0023 and ADR-0042 confirmed still matching the code (no
change). verify_adr_lang_pairs.py passes (EN/KO Status blocks byte-equal).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 10:26:25 -07:00
mukesh 0e346b939d gemm: test-generated GEMM plots under tests/gemm/ + docs/diagrams/gemm_plots/
Mirror the sccl pattern for GEMM figures: a tests/gemm/ package renders the
GEMM bar charts as PNGs from the committed docs/diagrams/gemm_sweep.json, so
the figures are fast test artifacts (run by default) while the heavy sim sweep
stays a manual script (scripts/gemm_sweep.py, kept) wrapped by a slow
regenerator test.

tests/gemm/:
- _gemm_plot_helpers.py: matplotlib renderers (series logic mirrors the
  GEMM _render_* functions in scripts/build_overview_slides.py).
- test_plot_gemm_stage_breakdown.py: gemm_stage_breakdown.png (load_ref).
- test_plot_gemm_mac_utilization.py: gemm_mac_utilization_measured.png +
  gemm_mac_utilization_theoretical_vs_measured.png (load_ref).
- test_gemm_sweep.py: @pytest.mark.slow regenerator (runs scripts/gemm_sweep.py).

Chart set trimmed to three (stage breakdown, MAC util, theoretical-vs-measured);
"formula" relabeled to "theoretical" throughout the comparison chart.

Known follow-ups (not blocking):
- gemm_mac_utilization_measured.png currently plots the theoretical ideal-
  pipeline model, not simulator-measured data; the name is a misnomer pending
  a decision to repoint its content or retitle.
- The theoretical-model constants (HBM 256 GB/s, T_stage 16 ns, 3 stages) are
  inherited verbatim from build_overview_slides.py and not yet verified against
  ADR-0033 / ADR-0014 / topology.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 09:58:08 -07:00
mukesh b610cb0d9a sccl: drive allreduce tests via torch.distributed; reorganize into tests/sccl/
Convert the multidevice allreduce correctness + latency/buffer-kind sweeps
to run through the real PyTorch-distributed path
(init_process_group(backend="ahbm") -> mp.spawn -> dist.all_reduce) instead
of direct ctx.launch, and reorganize the CCL/allreduce tests into a
tests/sccl/ package split one test per file.

Production change (required for the distributed path on non-square SIP grids):
- AhbmCCLBackend now reads explicit system.sips.w/h from the spec, with a
  square-only sqrt fallback that raises on ambiguity, instead of silently
  guessing round(sqrt(count)). This fixes the 2x3 / 3x2 torus + mesh cases,
  which previously resolved to a wrong 2x2 grid. Mirrors the test helper's
  _sip_topo_dims precedence (explicit w/h > square fallback > raise).

Test reorganization (tests/sccl/):
- _allreduce_helpers.py: shared plumbing (distributed driver, config writers,
  direct-launch run_allreduce parity reference, sweep/buffer-kind constants,
  plot aggregators, topology-diagram + FSIM-comparison emitters).
- test_allreduce_ring_torus_mesh.py: correctness across ring/torus/mesh.
- test_distributed_default_topology.py: full distributed path on topology.yaml.
- test_plot_latency_sweep.py / test_plot_buffer_kind_sweep.py: sweep rows.
- test_plot_topology_diagram.py / test_plot_comparison_fsim.py: plot emitters.
- test_intercube_root_center.py: moved in (ADR-0032 center-root latency guard).

Also:
- Move the FSIM comparison plot generator out of scripts/ into the sccl suite.
- Delete superseded test files (test_allreduce_multidevice,
  test_distributed_lrab_hierarchical_allreduce, test_allreduce_buffer_kind_sweep)
  and repoint conftest aggregators + the ipcq buffer-kind importers.
- Regenerate the allreduce_latency_plots derived artifacts from the full sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 22:24:43 -07:00
37 changed files with 1729 additions and 782 deletions
@@ -168,6 +168,36 @@ placement = resolve_dp_policy(
No post-hoc `pe_index` shifting: ShardSpec carries the `(sip, cube, pe)`
structural coordinates directly. ShardSpec details are in ADR-0026.
### D5. SIP grid dimensions: explicit `sips.w/h` resolution
For 2D inter-SIP topologies (`torus_2d`, `mesh_2d_no_wrap`), the SIP grid
shape (width × height) is resolved from `system.sips.w` / `system.sips.h`,
in the same way that D1 resolves `world_size` from `sips.count`. Precedence:
explicit `w/h` (validated `w*h == count`) > square fallback
(`round(sqrt(count))²`, only when `w/h` is not given) > error.
```python
import math

sips = spec.get("system", {}).get("sips", {})
if sip_topo == "ring_1d":
    w, h = 0, 0  # 1D sentinel (no grid)
elif sips.get("w") is not None and sips.get("h") is not None:
    w, h = int(sips["w"]), int(sips["h"])
    if w * h != n_sips:
        raise ValueError(f"sip layout {w}x{h} != sips.count ({n_sips})")
else:
    side = int(round(math.sqrt(n_sips)))
    if side * side != n_sips:
        raise ValueError("non-square sips.count requires explicit sips.w/h")
    w, h = side, side
```
This removes the earlier assumption that a 2D SIP grid must be a perfect
square: a 6-SIP `torus_2d` / `mesh_2d_no_wrap` can now be expressed as
`w: 3, h: 2` (or `2x3`). The derived `(w, h)` is passed to the algorithm's
inter-SIP exchange (consumed in ADR-0032 D5). The previous code path
silently took `round(sqrt(count))²` for every non-ring topology and
produced a wrong grid (e.g. 2×2 for 6 SIPs). The explicit `w/h` path with a
fail-loud fallback replaces it.

---
## Dependencies
@@ -135,21 +135,24 @@ system:
``` ```
 - `ring_1d`: n_sips-1 rounds of `send global_E / recv global_W`.
-- `torus_2d`: sqrt(n_sips)×sqrt(n_sips) wrapping mesh. Row ring on
-  `global_E/W`, then col ring on `global_S/N`.
-- `mesh_2d_no_wrap`: square mesh without wrap-around. Chain reduce +
-  broadcast per dimension.
+- `torus_2d`: `w × h` wrapping mesh. Row ring on `global_E/W`, then col
+  ring on `global_S/N`.
+- `mesh_2d_no_wrap`: `w × h` mesh without wrap-around. Chain reduce +
+  broadcast per dimension.
-2D variants require `n_sips` to be a perfect square.
+The 2D grid dims `(w, h)` come from `system.sips.w/h` (ADR-0024 D5). The
+square fallback (`round(sqrt(n_sips))²`) applies only when `w/h` are
+omitted, so rectangular grids (e.g. 6 SIPs as `3×2`) are supported by
+giving explicit `w/h`.
 ### D5. Process-group integration: `AhbmCCLBackend`
 At `init_process_group` time the backend:
 1. Loads `ccl.yaml` + `topology.yaml`.
-2. Derives `sip_topo_kind, sip_topo_w, sip_topo_h` from
-   `system.sips.topology` using the algorithm module's `TOPO_NAME_TO_KIND`.
+2. Derives `sip_topo_kind` from `system.sips.topology` via the algorithm
+   module's `TOPO_NAME_TO_KIND`, and `sip_topo_w, sip_topo_h` from
+   `system.sips.w/h` with the square fallback (ADR-0024 D5).
 3. Calls `configure_sfr_intercube_multisip(engine, spec, cfg)`: one-time
    SFR wiring, mirroring NCCL communicator creation.
@@ -221,8 +224,10 @@ sip:
 - **Per-PE allreduce** (intra-cube PE-to-PE reduce). Out of scope: the
   workload for this algorithm is per-cube DP.
-- **Asymmetric SIP topologies** (non-square mesh/torus). `torus_2d` and
-  `mesh_2d_no_wrap` require `n_sips = k²`.
+- **Square-grid fallback requires `n_sips = k²`**: rectangular SIP grids
+  (non-square mesh/torus) are supported, but only when `system.sips.w/h`
+  are given explicitly (ADR-0024 D5). With `w/h` omitted, 2D topologies
+  fall back to a square grid and still require `n_sips = k²`.
 - **Pipelined chunks**: single tile per cube, no pipelining yet.
 - **Root cube runtime election**: to minimize the intra-SIP critical
   path, the kernel currently uses the geometric center
@@ -269,7 +274,6 @@ sip:
 | `ccl.yaml` | Single `lrab_hierarchical_allreduce` entry |
 | `topology.yaml` | Added `system.sips.topology` |
 | `benches/ccl_allreduce.py` | Row-wise cube-mesh tensor layout |
-| `tests/test_allreduce_multidevice.py` (new) | Config-driven ring/torus/mesh |
-| `tests/test_distributed_lrab_hierarchical_allreduce.py` (new) | Full `dist.all_reduce` path |
-| `tests/test_intercube_sfr_config.py` (new) | SFR wiring verification |
+| `tests/sccl/` (test package) | Config-driven ring/torus/mesh correctness + full `dist.all_reduce` path + latency/buffer-kind sweeps (evaluation harness, ADR-0043) |
+| `tests/test_intercube_sfr_config.py` | SFR wiring verification |
 | Removed | `ring_allreduce.py`, `mesh_allreduce.py`, `tree_allreduce.py`, `hierarchical_allreduce.py`, `hello_send.py`, `testing.py` and their tests |
@@ -0,0 +1,126 @@
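# ADR-0043: Allreduce Evaluation Harness (`tests/sccl/`)
## Status
Accepted
Documents the `tests/sccl/` evaluation harness; verified against the
implementation (constants, file set, and sweep dimensions cross-checked).
## Context
ADR-0032 defines the intercube all-reduce *algorithm*; ADR-0023/0024/0027
define the IPCQ backend, the rank=SIP launcher, and `mp.spawn`. None of
them describes **how the allreduce is exercised and characterized**: the
correctness tests, the latency/buffer-kind sweeps, and the derived plots.
ADR-0013 (verification strategy) is the general policy; this ADR pins the
concrete allreduce harness so that the "evaluation" half of the work is
documented alongside the implementation.
The harness lives under `tests/sccl/` (the package created when the
allreduce tests were consolidated). It supersedes the earlier flat
`tests/test_allreduce_multidevice.py` + `tests/test_distributed_*` layout.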
## Decision
### D1. Drive evaluation through the public `torch.distributed` path
Correctness and the sweeps run the collective through the real DDP-shaped
path, `init_process_group(backend="ahbm") → mp.spawn → dist.all_reduce`
(ADR-0024/0027), and do not use the lower-level `ctx.launch`. The shared
helper `_run_distributed(tmp_path, monkeypatch, topo_path, corr_id, n_elem)`
in `tests/sccl/_allreduce_helpers.py` builds the engine, runs the workers,
and returns `(engine, n_cubes)`. `monkeypatch.chdir` points the backend's
`load_ccl_config()` (cwd lookup) at a per-case temp `ccl.yaml`.
A direct-launch reference (`run_allreduce`) is retained in the same helper
module. It is not used by the distributed tests; it stays because the IPCQ
buffer-kind / root-center micro-tests under `tests/` import it.
### D2. One file per evaluation concern
| File | Concern | `torch.distributed`? |
|---|---|---|
| `test_allreduce_ring_torus_mesh.py` | correctness across ring_1d / torus_2d (2×3) / mesh_2d_no_wrap (2×3) | yes |
| `test_distributed_default_topology.py` | full path on `topology.yaml` as-is | yes |
| `test_plot_latency_sweep.py` | latency sweep rows (n_elem × topology) | yes |
| `test_plot_buffer_kind_sweep.py` | TCM/SRAM/HBM sweep rows | yes |
| `test_plot_topology_diagram.py` | topology.png (pure matplotlib) | no |
| `test_plot_comparison_fsim.py` | broken-axis model-vs-FSIM comparison | no |
| `test_intercube_root_center.py` | ADR-0032 center-root latency guard (direct path) | no |
`_allreduce_helpers.py` holds the shared plumbing (driver, config writers,
sweep/buffer-kind constants, plot aggregators, topology-diagram + FSIM
comparison emitters). It is not collected (no `test_` prefix).
### D3. Latency metric: critical-path `pe_exec_ns`
The latency reported per config is `crit_ns = max(pe_exec_ns)` over
`engine._results`, i.e. the PE execution time of the slowest rank. This is
the value plotted on every latency chart and recorded in `summary.csv`.
### D4. Sweep dimensions
- **Latency sweep**: `n_elem ∈ {8, 32, 64, 128, 512, 1024, 2048, 4096,
8192, 16384, 32768, 49152}` (16 excluded because it collides with
`n_cubes`) × topology ∈ {ring_1d (6), torus_2d 2×3 (6),
mesh_2d_no_wrap 2×3 (6)}.
- **Buffer-kind sweep**: `buffer_kind ∈ {tcm, sram, hbm}` × a smaller
`n_elem` grid, on torus_2d 6-SIP (3×2). buffer_kind is set in the temp
`ccl.yaml`, which the backend reads at `init_process_group` time
(ADR-0023 D6).
The 2×3 / 3×2 grids exercise the explicit-`w/h` SIP resolution (ADR-0024 D5).
### D5. Derived plots via `pytest_sessionfinish` aggregators
Sweep tests are xdist-friendly: each parametrized case writes one JSON row
to a staging directory. The conftest `pytest_sessionfinish` hook (controller
node only) calls the aggregators in `_allreduce_helpers.py`:
- `_aggregate_sweep_plots()` → per-topology PNGs + `summary.csv`
- `aggregate_buffer_kind_plot()` → the TCM/SRAM/HBM comparison PNG + csv
The topology-diagram and FSIM-comparison figures are emitted directly by
their own `test_plot_*` tests (no row staging; they are pure functions of
`topology.yaml` and `summary.csv` respectively). All outputs land in
`docs/diagrams/allreduce_latency_plots/` and are **derived artifacts** per
CLAUDE.md (consistent with the ADRs, no Phase-2 gate).
### D6. The FSIM comparison reference is a hardcoded constant
`emit_comparison_fsim_plot()` overlays the model curves against a single
external FSIM single-device reference (`366 µs`), held as a literal; there
is no external data file. The "measured" series comes from the simulator
(`op_log` GEMM count, `composite_window_ns`); the "theoretical" series is a
hand-derived analytical model (the same one ADR-0044 D5 flags as
ADR-unverified).
## Consequences
### Positive
- The allreduce is evaluated through the same API a real DDP script uses,
so the harness doubles as an integration test of ADR-0024/0027.
- Figures regenerate on every `pytest` run from committed data; there is no
manual plot step.
- Rectangular-grid sweeps provided the regression coverage that surfaced
the need for the ADR-0024 D5 `w/h` fix.
### Negative / limitations
- The full latency sweep runs in the default `pytest` (on the order of
minutes); it is not marked `slow`. (Contrast ADR-0044, where the GEMM sweep
is `slow`.)
- `test_intercube_root_center.py` carries a latency *threshold* assertion
(the ADR-0032 center-root guard). It is the only absolute-latency assertion
in the suite and is sensitive to latency-model changes (ADR-0033).
## Dependencies
- **ADR-0013**: verification strategy (the general policy this ADR specializes).
- **ADR-0023 / ADR-0024 / ADR-0027**: IPCQ backend, rank=SIP launcher,
`mp.spawn`; the path D1 drives.
- **ADR-0032**: the algorithm under evaluation; the D4 grids exercise its
topology branches.
- **ADR-0044**: the sibling GEMM evaluation harness.
## Open questions
- Should the latency sweep be marked `slow` for parity with the GEMM sweep?
- Should the FSIM reference move from a hardcoded constant to a versioned
data file?
@@ -0,0 +1,127 @@
# ADR-0044: GEMM Evaluation Harness (`scripts/gemm_sweep.py` + `tests/gemm/`)
## Status
Accepted
Documents the GEMM evaluation/characterization harness; verified against the
implementation (constants, tile sizes, figure set, and the script↔test
split cross-checked). The D5/D6 caveats are recorded limitations, not
inaccuracies.
## Context
ADR-0014 (PE pipeline) and ADR-0042 (tile-plan generators) define the GEMM
*implementation*; ADR-0033 defines the latency model. None of them describes
**how GEMM performance is swept and characterized**: the shape/variant
sweep that produces the timing data, and the figures that interpret it.
This ADR pins that harness.
Unlike the allreduce harness (ADR-0043), the GEMM sweep is **heavy** (24
sim runs: 8 shapes × 3 operand-staging variants; the `512` shape alone is
2048 tiles). That weight drives the split below.
## Decision
### D1. Two-layer split: heavy data generation (script) vs. fast figures (tests)
- **Data generation stays a manual script**: `scripts/gemm_sweep.py` runs
`matmul-composite` (ADR-0042 plans) across shapes × variants via the same
`run_bench` path the CLI uses, harvests `result.engine.op_log`, and
writes `docs/diagrams/gemm_sweep.json` (per-stage / per-engine wall-clock
+ occupancy + record counts + pe/composite windows).
- **Figure rendering is test-generated**: `tests/gemm/` reads the committed
`gemm_sweep.json` and renders matplotlib PNGs into
`docs/diagrams/gemm_plots/`. These tests are fast and run by default.
Rationale: a slide-deck-scale sim sweep does not belong in every `pytest`
run, but the figures (cheap, deterministic) should regenerate freely and be
guarded by CI. This mirrors CLAUDE.md's script-vs-test split (scripts for
heavy/manual generation; tests for fast assertions).
### D2. Slow regenerator test wraps the script
`tests/gemm/test_gemm_sweep.py` is marked `@pytest.mark.slow` (excluded by
the default `addopts: -m "not slow"`). It invokes `scripts/gemm_sweep.py`
via subprocess to regenerate `gemm_sweep.json` on demand
(`pytest -m slow tests/gemm/test_gemm_sweep.py`). The sweep logic has a
single home (the script) and the test only wraps it, so there is no
duplicated sim-driving code.
### D3. Figure set (3 charts, `load_ref` variant)
| Test | PNG | Content |
|---|---|---|
| `test_plot_gemm_stage_breakdown.py` | `gemm_stage_breakdown.png` | per-stage engine wall-clock (DMA in / Fetch / GEMM / DMA out) |
| `test_plot_gemm_mac_utilization.py` | `gemm_mac_utilization_measured.png` | GEMM util % + useful eff % |
| `test_plot_gemm_mac_utilization.py` | `gemm_mac_utilization_theoretical_vs_measured.png` | theoretical vs simulator-measured util/eff |
`tests/gemm/_gemm_plot_helpers.py` holds the shared renderers (series logic
mirrors the GEMM `_render_*` functions in `scripts/build_overview_slides.py`,
which still draws these natively in the PPTX). Not collected (no `test_`
prefix). Each `test_plot_*` skips if `gemm_sweep.json` is absent.
### D4. Tile sizes are data-driven; under-tile shapes are flagged
Tile sizes are read from `gemm_sweep.json` (`tile_sizes`), which the sweep
records from `PeSchedulerComponent.TILE_M/K/N = 32/64/32`, the authoritative
source. Shapes with `M<TILE_M K<TILE_K N<TILE_N` are flagged
("under-tile") on the charts. The `512³` shape is excluded from the figures
(`EXCLUDED_SHAPES`).
### D5. Theoretical model: inherited constants, not yet ADR-verified
The "theoretical" curves use an analytical ideal-pipeline model with
constants copied verbatim from `scripts/build_overview_slides.py`:
```
HBM_GBS  = 256.0  # GB/s
T_STAGE  = 16.0   # ns
D_STAGES = 3
BPE      = 2
```
**These values are not yet sourced against the ADRs.** Notably, ADR-0033's
`256` is `burst_bytes` (256 B), a *different* quantity from this
`256 GB/s`, and ADR-0033 derives bandwidth as
`pc_bw_gbs = hbm_to_router_bw_gbs / num_pcs`. `T_STAGE` and the stage count
are not traced to ADR-0014 here either. The model is therefore **consistent
with the existing deck script but not verified against the ADRs**, and the
constants are duplicated (deck + helper). Reconciling them (sourcing from
topology/ADR-0033/0014, de-duplicating) is deferred; see Open questions.
### D6. Known naming caveat: the `_measured` chart
`gemm_mac_utilization_measured.png` currently plots the *theoretical*
ideal-pipeline numbers (its footnote says so); only the filename says
"measured". This is a known misnomer, pending a decision to either repoint
its content to the simulator-measured series or retitle it.
## Consequences
### Positive
- GEMM figures are test-generated and CI-guarded, like the allreduce figures.
- The heavy sweep stays opt-in, so the default test run stays fast.
- The slow test reuses the single source of sweep logic (the script).
### Negative / limitations
- The theoretical-model constants (D5) are unverified and duplicated.
- The `_measured` figure is a misnomer (D6).
- `build_overview_slides.py` still does not embed these PNGs; it draws the
GEMM bars natively from `gemm_sweep.json`. Rewiring the deck to consume the
test artifacts has not been done.
## Dependencies
- **ADR-0013**: verification strategy.
- **ADR-0014 / ADR-0042**: PE pipeline + tile-plan generators, i.e. the GEMM
implementation the sweep measures; D4's stage record counts come from
ADR-0042 D2/D3.
- **ADR-0033**: the latency model, the source the D5 constants should be
(but are not yet) traced to.
- **ADR-0043**: the sibling allreduce evaluation harness.
## Open questions
- Should the D5 constants be reconciled against `topology.yaml` / ADR-0033 /
ADR-0014 and de-duplicated (a single source for model parameters)?
- How should the D6 `_measured` naming be resolved (repoint the content vs.
retitle)?
- Should `build_overview_slides.py` be rewired to embed the `gemm_plots/`
PNGs instead of drawing the bars natively?
@@ -173,6 +173,37 @@ placement = resolve_dp_policy(
No post-hoc `pe_index` shifting — ShardSpec carries the `(sip, cube, pe)`
structural coordinates directly. ShardSpec details in ADR-0026.
### D5. SIP grid dimensions — explicit `sips.w/h` resolution
For 2D inter-SIP topologies (`torus_2d`, `mesh_2d_no_wrap`) the SIP grid
shape (width × height) is resolved from `system.sips.w` / `system.sips.h`,
mirroring how D1 resolves `world_size` from `sips.count`. Precedence:
explicit `w/h` (validated `w*h == count`) > square fallback
(`round(sqrt(count))²`, used only when no `w/h` is given) > error.
```python
import math

sips = spec.get("system", {}).get("sips", {})
if sip_topo == "ring_1d":
    w, h = 0, 0  # 1D sentinel (no grid)
elif sips.get("w") is not None and sips.get("h") is not None:
    w, h = int(sips["w"]), int(sips["h"])
    if w * h != n_sips:
        raise ValueError(f"sip layout {w}x{h} != sips.count ({n_sips})")
else:
    side = int(round(math.sqrt(n_sips)))
    if side * side != n_sips:
        raise ValueError("non-square sips.count requires explicit sips.w/h")
    w, h = side, side
```
This lifts the earlier assumption that 2D SIP grids must be perfect
squares: a 6-SIP `torus_2d` / `mesh_2d_no_wrap` is now expressible as
`w: 3, h: 2` (or `2x3`). The derived `(w, h)` feed the algorithm's
inter-SIP exchange (consumed in ADR-0032 D5). The prior code path silently
took `round(sqrt(count))²` for any non-ring topology, which produced a
wrong grid (e.g. 2×2 for 6 SIPs); the explicit-`w/h` path with a
fail-loud fallback replaces that.
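To make the precedence concrete, the resolution logic can be wrapped in a small function and run against a few specs. This is a minimal sketch: the function name `resolve_sip_grid` and the inline spec dicts are assumptions for illustration, not the `AhbmCCLBackend` code itself.

```python
import math


def resolve_sip_grid(spec: dict, sip_topo: str) -> tuple:
    """Hypothetical wrapper: explicit w/h > square fallback > raise."""
    sips = spec.get("system", {}).get("sips", {})
    n_sips = int(sips["count"])
    if sip_topo == "ring_1d":
        return 0, 0  # 1D sentinel (no grid)
    if sips.get("w") is not None and sips.get("h") is not None:
        w, h = int(sips["w"]), int(sips["h"])
        if w * h != n_sips:
            raise ValueError(f"sip layout {w}x{h} != sips.count ({n_sips})")
        return w, h
    side = int(round(math.sqrt(n_sips)))
    if side * side != n_sips:
        raise ValueError("non-square sips.count requires explicit sips.w/h")
    return side, side


# Explicit w/h: a 6-SIP torus resolves to the rectangular 3x2 grid.
explicit = {"system": {"sips": {"count": 6, "w": 3, "h": 2}}}
print(resolve_sip_grid(explicit, "torus_2d"))

# No w/h and a square count: the fallback applies.
square = {"system": {"sips": {"count": 4}}}
print(resolve_sip_grid(square, "mesh_2d_no_wrap"))

# No w/h and a non-square count: fails loudly instead of guessing 2x2.
try:
    resolve_sip_grid({"system": {"sips": {"count": 6}}}, "torus_2d")
except ValueError as exc:
    print(f"rejected: {exc}")
```

The third case is the one the old code path got wrong: `round(sqrt(6))` is 2, so a silent fallback would have built a 2×2 grid for 6 SIPs.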

---
## Dependencies
@@ -138,20 +138,24 @@ system:
``` ```
 - `ring_1d`: n_sips-1 rounds of `send global_E / recv global_W`.
-- `torus_2d`: sqrt(n_sips)×sqrt(n_sips) wrapping mesh. Row ring on
-  `global_E/W` then col ring on `global_S/N`.
-- `mesh_2d_no_wrap`: square mesh without wrap-around. Chain reduce +
-  broadcast per dimension.
+- `torus_2d`: `w × h` wrapping mesh. Row ring on `global_E/W` then col
+  ring on `global_S/N`.
+- `mesh_2d_no_wrap`: `w × h` mesh without wrap-around. Chain reduce +
+  broadcast per dimension.
-2D variants require `n_sips` to be a perfect square.
+2D grid dims `(w, h)` come from `system.sips.w/h` (ADR-0024 D5). A square
+fallback (`round(sqrt(n_sips))²`) applies **only** when `w/h` are omitted,
+so rectangular grids (e.g. 6 SIPs as `3×2`) are supported by giving
+explicit `w/h`.
 ### D5. Process-group integration — `AhbmCCLBackend`
 At `init_process_group` time the backend:
 1. Loads `ccl.yaml` + `topology.yaml`.
-2. Derives `sip_topo_kind, sip_topo_w, sip_topo_h` from
-   `system.sips.topology` using the algorithm module's `TOPO_NAME_TO_KIND`.
+2. Derives `sip_topo_kind` from `system.sips.topology` via the algorithm
+   module's `TOPO_NAME_TO_KIND`, and `sip_topo_w, sip_topo_h` from
+   `system.sips.w/h` with a square-only fallback (ADR-0024 D5).
 3. Calls `configure_sfr_intercube_multisip(engine, spec, cfg)` — one-time
    SFR wiring, mirrors NCCL communicator creation.
@@ -222,8 +226,10 @@ Modules loaded via `cfg["module"]` must export:
 - **Per-PE allreduce** (intra-cube PE-to-PE reduce). Out of scope — the
   workload for this algorithm is per-cube DP.
-- **Asymmetric SIP topologies** (non-square mesh/torus). `torus_2d` and
-  `mesh_2d_no_wrap` require `n_sips = k²`.
+- **Square-grid fallback requires `n_sips = k²`**: rectangular SIP grids
+  (non-square mesh/torus) are supported, but only when `system.sips.w/h`
+  are given explicitly (ADR-0024 D5). With `w/h` omitted, 2D topologies
+  fall back to a square grid and still require `n_sips = k²`.
 - **Pipelined chunks**: single-tile per cube, no pipelining yet.
 - **Root cube runtime election**: the kernel currently uses
   `root_cube = (mesh_h // 2) * mesh_w + (mesh_w // 2)` — the geometric
@@ -270,7 +276,6 @@ Modules loaded via `cfg["module"]` must export:
 | `ccl.yaml` | Single `lrab_hierarchical_allreduce` entry |
 | `topology.yaml` | Added `system.sips.topology` |
 | `benches/ccl_allreduce.py` | Row-wise cube-mesh tensor layout |
-| `tests/test_allreduce_multidevice.py` (new) | Config-driven ring/torus/mesh |
-| `tests/test_distributed_lrab_hierarchical_allreduce.py` (new) | Full `dist.all_reduce` path |
-| `tests/test_intercube_sfr_config.py` (new) | SFR wiring verification |
+| `tests/sccl/` (test package) | Config-driven ring/torus/mesh correctness + full `dist.all_reduce` path + latency/buffer-kind sweeps (evaluation harness — ADR-0043) |
+| `tests/test_intercube_sfr_config.py` | SFR wiring verification |
 | Removed | `ring_allreduce.py`, `mesh_allreduce.py`, `tree_allreduce.py`, `hierarchical_allreduce.py`, `hello_send.py`, `testing.py` and their tests |
@@ -0,0 +1,130 @@
# ADR-0043: Allreduce Evaluation Harness — `tests/sccl/`
## Status
Accepted
Documents the `tests/sccl/` evaluation harness; verified against the
implementation (constants, file set, and sweep dimensions cross-checked).
## Context
ADR-0032 defines the intercube all-reduce *algorithm*; ADR-0023/0024/0027
define the IPCQ backend, the rank=SIP launcher, and `mp.spawn`. None of
them describe **how the allreduce is exercised and characterized** — the
correctness tests, the latency/buffer-kind sweeps, and the derived plots.
ADR-0013 (verification strategy) is the general policy; this ADR pins the
concrete allreduce harness so the "evaluation" half of the work is
documented, not just the implementation.
The harness lives under `tests/sccl/` (the package created when the
allreduce tests were consolidated). It supersedes the earlier flat
`tests/test_allreduce_multidevice.py` + `tests/test_distributed_*` layout.
## Decision
### D1. Drive evaluation through the public `torch.distributed` path
Correctness and the sweeps run the collective through the real DDP-shaped
path — `init_process_group(backend="ahbm") → mp.spawn → dist.all_reduce`
(ADR-0024/0027) — not the lower-level `ctx.launch`. A shared helper
`_run_distributed(tmp_path, monkeypatch, topo_path, corr_id, n_elem)` in
`tests/sccl/_allreduce_helpers.py` builds the engine, runs the workers, and
returns `(engine, n_cubes)`. `monkeypatch.chdir` points the backend's
`load_ccl_config()` (cwd lookup) at a per-case temp `ccl.yaml`.
A direct-launch reference (`run_allreduce`) is retained in the same helper
module — not for the distributed tests, but because the IPCQ buffer-kind /
root-center micro-tests under `tests/` import it.
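
The cwd-based config lookup described in D1 can be illustrated with the standard library alone. The sketch below is an assumption-laden stand-in: `load_ccl_config_stub`, the `chdir` context manager, and the `buffer_kind` key are hypothetical names used only to show why changing the working directory redirects the lookup to a per-case `ccl.yaml`. The real helper uses pytest's `monkeypatch.chdir` and the backend's own loader.

```python
import contextlib
import os
import tempfile
from pathlib import Path


def load_ccl_config_stub() -> str:
    """Stand-in for a cwd-based config lookup (hypothetical, not the backend)."""
    return Path("ccl.yaml").read_text()


@contextlib.contextmanager
def chdir(path: Path):
    """Temporarily switch cwd, similar in effect to pytest's monkeypatch.chdir."""
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)


def read_config_for_case(buffer_kind: str) -> str:
    """Write a per-case ccl.yaml into a temp dir and read it via the cwd lookup."""
    with tempfile.TemporaryDirectory() as tmp:
        case_dir = Path(tmp)
        # The key name is illustrative; the real ccl.yaml schema may differ.
        (case_dir / "ccl.yaml").write_text(f"buffer_kind: {buffer_kind}\n")
        with chdir(case_dir):
            return load_ccl_config_stub()


print(read_config_for_case("sram"))
```

Because each case gets its own directory, parallel cases cannot overwrite one another's config, which is presumably why the helper takes `tmp_path` rather than writing into the repository root.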
### D2. One file per evaluation concern
| File | Concern | `torch.distributed`? |
|---|---|---|
| `test_allreduce_ring_torus_mesh.py` | correctness across ring_1d / torus_2d (2×3) / mesh_2d_no_wrap (2×3) | yes |
| `test_distributed_default_topology.py` | full path on `topology.yaml` as-is | yes |
| `test_plot_latency_sweep.py` | latency sweep rows (n_elem × topology) | yes |
| `test_plot_buffer_kind_sweep.py` | TCM/SRAM/HBM sweep rows | yes |
| `test_plot_topology_diagram.py` | topology.png (pure matplotlib) | no |
| `test_plot_comparison_fsim.py` | broken-axis model-vs-FSIM comparison | no |
| `test_intercube_root_center.py` | ADR-0032 center-root latency guard (direct path) | no |
`_allreduce_helpers.py` holds the shared plumbing (driver, config writers,
sweep/buffer-kind constants, plot aggregators, topology-diagram + FSIM
comparison emitters). It is not collected (no `test_` prefix).
### D3. Latency metric — critical-path `pe_exec_ns`
The reported latency per config is `crit_ns = max(pe_exec_ns)` over
`engine._results` — the slowest rank's PE execution time. This is the
number plotted on every latency chart and recorded in `summary.csv`.
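
As a small worked example of the metric, the sketch below takes the maximum `pe_exec_ns` over a list of per-rank records. The record layout (a list of dicts with a `pe_exec_ns` key) and the numbers are made up for illustration; the actual shape of `engine._results` is not shown in this ADR.

```python
def critical_path_ns(results):
    """Return the largest pe_exec_ns across ranks (the slowest rank)."""
    return max(record["pe_exec_ns"] for record in results)


# Hypothetical per-rank records with invented timings.
results = [
    {"rank": 0, "pe_exec_ns": 1200.0},
    {"rank": 1, "pe_exec_ns": 1850.0},
    {"rank": 2, "pe_exec_ns": 1430.0},
]
crit_ns = critical_path_ns(results)
print(crit_ns)
```

Taking the maximum rather than the mean reflects that a collective only completes when its slowest participant does.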
### D4. Sweep dimensions
- **Latency sweep**: `n_elem ∈ {8, 32, 64, 128, 512, 1024, 2048, 4096,
8192, 16384, 32768, 49152}` (16 excluded — collides with `n_cubes`) ×
topology ∈ {ring_1d (6), torus_2d 2×3 (6), mesh_2d_no_wrap 2×3 (6)}.
- **Buffer-kind sweep**: `buffer_kind ∈ {tcm, sram, hbm}` × a smaller
`n_elem` grid, on torus_2d 6-SIP (3×2). buffer_kind is set in the temp
`ccl.yaml` (read by the backend at `init_process_group`, ADR-0023 D6).
The 2×3 / 3×2 grids exercise the explicit-`w/h` SIP resolution
(ADR-0024 D5).
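
The latency sweep grid above is a plain Cartesian product, which can be sketched as follows. The constant names `N_ELEMS` and `TOPOLOGIES` are assumptions; the real constants live in `_allreduce_helpers.py` under names this ADR does not give.

```python
import itertools

# n_elem values listed in D4; 16 is deliberately absent.
N_ELEMS = (8, 32, 64, 128, 512, 1024, 2048, 4096, 8192, 16384, 32768, 49152)

# (topology name, grid label); every entry uses 6 SIPs.
TOPOLOGIES = (
    ("ring_1d", "1D"),
    ("torus_2d", "2x3"),
    ("mesh_2d_no_wrap", "2x3"),
)

cases = [
    {"topology": topo, "grid": grid, "n_elem": n_elem}
    for (topo, grid), n_elem in itertools.product(TOPOLOGIES, N_ELEMS)
]
print(len(cases))
```

With 12 sizes and 3 topologies this yields 36 parametrized cases, each of which would stage one row under the scheme in D5.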
### D5. Derived plots via `pytest_sessionfinish` aggregators
Sweep tests are xdist-friendly: each parametrized case writes one JSON row
to a staging dir. The conftest `pytest_sessionfinish` hook (controller node
only) calls the aggregators in `_allreduce_helpers.py`:
- `_aggregate_sweep_plots()` → per-topology PNGs + `summary.csv`
- `aggregate_buffer_kind_plot()` → the TCM/SRAM/HBM comparison PNG + csv
The topology-diagram and FSIM-comparison figures are emitted directly by
their own `test_plot_*` tests (no row staging — they are pure functions of
`topology.yaml` and `summary.csv` respectively). All outputs land in
`docs/diagrams/allreduce_latency_plots/` and are **derived artifacts** per
CLAUDE.md (consistent-with-ADRs, no Phase-2 gate).
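
The stage-then-aggregate pattern can be sketched with the standard library. The function names, row fields, and file naming below are hypothetical and the timings are invented; the real aggregators also render PNGs, which this sketch omits.

```python
import csv
import json
import tempfile
from pathlib import Path


def stage_row(staging_dir: Path, topology: str, n_elem: int, crit_ns: float) -> Path:
    """Write one JSON row per case; unique filenames avoid cross-worker clashes."""
    path = staging_dir / f"{topology}_{n_elem}.json"
    row = {"topology": topology, "n_elem": n_elem, "crit_ns": crit_ns}
    path.write_text(json.dumps(row))
    return path


def aggregate_rows(staging_dir: Path, out_csv: Path) -> int:
    """Collect staged rows into one CSV, sorted for deterministic output."""
    rows = [json.loads(p.read_text()) for p in staging_dir.glob("*.json")]
    rows.sort(key=lambda r: (r["topology"], r["n_elem"]))
    with out_csv.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=["topology", "n_elem", "crit_ns"])
        writer.writeheader()
        writer.writerows(rows)
    return len(rows)


with tempfile.TemporaryDirectory() as tmp:
    staging = Path(tmp) / "staging"
    staging.mkdir()
    stage_row(staging, "torus_2d", 64, 1500.0)
    stage_row(staging, "ring_1d", 8, 900.0)
    n_rows = aggregate_rows(staging, Path(tmp) / "summary.csv")
    summary_text = (Path(tmp) / "summary.csv").read_text()

print(n_rows)
print(summary_text)
```

In a real conftest the aggregation call would sit in `pytest_sessionfinish` and be skipped on xdist workers, so that only the controller writes the final files after every worker has staged its rows.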
### D6. The FSIM comparison reference is a hardcoded constant
`emit_comparison_fsim_plot()` overlays the model curves against a single
external FSIM single-device reference (`366 µs`), held as a literal — there
is no external data file. The "measured" series comes from the simulator
(`op_log` GEMM count, `composite_window_ns`); the "theoretical" series is a
hand-derived analytical model (the same one ADR-0044 D5 flags as
ADR-unverified).
## Consequences
### Positive
- The allreduce is evaluated through the same API a real DDP script uses,
so the harness doubles as an integration test of ADR-0024/0027.
- Figures regenerate on every `pytest` run from committed data; no manual
plot step.
- Rectangular-grid sweeps provided the regression coverage that surfaced
  the need for the ADR-0024 D5 `w/h` fix.
### Negative / limitations
- The full latency sweep runs in the default `pytest` (~minutes); it is not
marked `slow`. (Contrast ADR-0044, where the GEMM sweep is `slow`.)
- `test_intercube_root_center.py` carries a latency *threshold* assertion
(ADR-0032 center-root guard) — the only absolute-latency assertion in the
suite; it is sensitive to latency-model changes (ADR-0033).
## Dependencies
- **ADR-0013**: verification strategy (general policy this specializes).
- **ADR-0023 / ADR-0024 / ADR-0027**: IPCQ backend, rank=SIP launcher,
`mp.spawn` — the path D1 drives.
- **ADR-0032**: the algorithm under evaluation; D4 grids exercise its
topology branches.
- **ADR-0044**: the sibling GEMM evaluation harness.
## Open questions
- Should the latency sweep be marked `slow` for parity with the GEMM sweep?
- Should the FSIM reference move from a hardcoded constant to a versioned
data file?
@@ -0,0 +1,130 @@
# ADR-0044: GEMM Evaluation Harness — `scripts/gemm_sweep.py` + `tests/gemm/`
## Status
Accepted
Documents the GEMM evaluation/characterization harness; verified against the
implementation (constants, tile sizes, figure set, and the script↔test
split cross-checked). The D5/D6 caveats are recorded limitations, not
inaccuracies.
## Context
ADR-0014 (PE pipeline) and ADR-0042 (tile-plan generators) define the GEMM
*implementation*; ADR-0033 defines the latency model. None of them describe
**how GEMM performance is swept and characterized** — the shape/variant
sweep that produces the timing data, and the figures that interpret it.
This ADR pins that harness.
Unlike the allreduce harness (ADR-0043), the GEMM sweep is **heavy** (24
sim runs: 8 shapes × 3 operand-staging variants; the `512` shape alone is
2048 tiles). That weight drives the split below.
## Decision
### D1. Two-layer split — heavy data generation (script) vs. fast figures (tests)
- **Data generation stays a manual script**: `scripts/gemm_sweep.py` runs
`matmul-composite` (ADR-0042 plans) across shapes × variants via the same
`run_bench` path the CLI uses, harvests `result.engine.op_log`, and
writes `docs/diagrams/gemm_sweep.json` (per-stage / per-engine wall-clock
+ occupancy + record counts + pe/composite windows).
- **Figure rendering is test-generated**: `tests/gemm/` reads the committed
`gemm_sweep.json` and renders matplotlib PNGs into
`docs/diagrams/gemm_plots/`. These tests are fast and run by default.
Rationale: a slide-deck-scale sim sweep does not belong in every `pytest`
run, but the figures (cheap, deterministic) should regenerate freely and be
guarded by CI. This mirrors CLAUDE.md's script-vs-test split (scripts for
heavy/manual generation; tests for fast assertions).
### D2. Slow regenerator test wraps the script
`tests/gemm/test_gemm_sweep.py` is marked `@pytest.mark.slow` (excluded by
the default `addopts: -m "not slow"`). It invokes `scripts/gemm_sweep.py`
via subprocess to regenerate `gemm_sweep.json` on demand
(`pytest -m slow tests/gemm/test_gemm_sweep.py`). The sweep logic has a
single home (the script); the test only wraps it, so there is no duplicated
sim-driving code.
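
The wrap-the-script pattern in D2 can be sketched as a thin subprocess call. The helper name `regenerate` and the stand-in script are assumptions made so that the example runs without the simulator; in the real suite the call would sit inside a test decorated with `@pytest.mark.slow` and point at `scripts/gemm_sweep.py`.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def regenerate(script: Path, cwd: Path) -> subprocess.CompletedProcess:
    """Run a generator script in a child interpreter; raise on non-zero exit."""
    return subprocess.run(
        [sys.executable, str(script)],
        cwd=cwd,
        check=True,
        capture_output=True,
        text=True,
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Stand-in for the heavy sweep script: it only writes an empty JSON file.
    script = root / "fake_sweep.py"
    script.write_text(
        "from pathlib import Path\n"
        "Path('sweep.json').write_text('{}')\n"
    )
    result = regenerate(script, root)
    regenerated = (root / "sweep.json").exists()

print(result.returncode, regenerated)
```

Using `check=True` means a failing sweep surfaces as a test failure rather than leaving a stale data file in place unnoticed.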
### D3. Figure set (3 charts, `load_ref` variant)
| Test | PNG | Content |
|---|---|---|
| `test_plot_gemm_stage_breakdown.py` | `gemm_stage_breakdown.png` | per-stage engine wall-clock (DMA in / Fetch / GEMM / DMA out) |
| `test_plot_gemm_mac_utilization.py` | `gemm_mac_utilization_measured.png` | GEMM util % + useful eff % |
| `test_plot_gemm_mac_utilization.py` | `gemm_mac_utilization_theoretical_vs_measured.png` | theoretical vs simulator-measured util/eff |
`tests/gemm/_gemm_plot_helpers.py` holds the shared renderers (series logic
mirrors the GEMM `_render_*` functions in `scripts/build_overview_slides.py`,
which still draws these natively in the PPTX). The helper module is not
collected by pytest (no `test_` prefix). Each `test_plot_*` test skips if
`gemm_sweep.json` is absent.
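As a rough illustration of how the simulator-side bars in the theoretical-vs-measured chart are derived, the sketch below follows the formulas in the helper module added by this commit. Measured utilization divides useful FLOPs by the FLOPs of the recorded GEMM stages. Measured efficiency divides the achieved FLOPs per ns over the composite window by the peak rate. The inputs (`record_count=32`, `composite_window_ns=800.0`) are made up for the example and do not come from `gemm_sweep.json`. The name `measured_series` is also hypothetical.

```python
# Tile sizes from D4; T_STAGE_NS is the inherited constant discussed in D5.
TILE_M, TILE_K, TILE_N = 32, 64, 32
T_STAGE_NS = 16.0


def measured_series(m, k, n, record_count, composite_window_ns):
    """Hypothetical helper returning (util %, eff %) for one shape."""
    tile_flops = 2 * TILE_M * TILE_K * TILE_N
    useful_flops = 2 * m * k * n
    peak_flops_per_ns = tile_flops / T_STAGE_NS

    # Utilization: useful work over the work of the recorded GEMM stages.
    util = (useful_flops / (tile_flops * record_count) * 100
            if record_count else 0.0)
    # Efficiency: achieved FLOPs/ns over the window, relative to peak.
    eff = (useful_flops / composite_window_ns / peak_flops_per_ns * 100
           if composite_window_ns > 0 else 0.0)
    return util, eff


# Illustrative inputs only (not simulator output).
util, eff = measured_series(128, 128, 128,
                            record_count=32, composite_window_ns=800.0)
print(f"util={util:.1f}% eff={eff:.1f}%")
```

With these invented inputs the shape fills its tiles exactly, so utilization is 100%, while the 800 ns window yields roughly 64% efficiency.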
### D4. Tile sizes are data-driven; under-tile shapes are flagged
Tile sizes are read from `gemm_sweep.json` (`tile_sizes`), which the sweep
records from `PeSchedulerComponent.TILE_M/K/N = 32/64/32` — the authoritative
source. Shapes with `M<TILE_M`, `K<TILE_K`, or `N<TILE_N` are flagged
("under-tile") on the charts. The `512³` shape is excluded from the figures
(`EXCLUDED_SHAPES`).
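The flagging rule can be sketched as a one-line predicate: a shape is under-tile as soon as any one dimension is smaller than its tile size. The shapes below are illustrative only and are not taken from the sweep, and `is_under_tile` is a hypothetical name for this example.

```python
# Tile sizes as recorded in this section (TILE_M/K/N = 32/64/32).
TILE_M, TILE_K, TILE_N = 32, 64, 32


def is_under_tile(m: int, k: int, n: int) -> bool:
    """True if any dimension is smaller than its tile size."""
    return m < TILE_M or k < TILE_K or n < TILE_N


# Illustrative shapes (assumed, not the sweep's shape list).
for shape in [(16, 16, 16), (32, 64, 32), (64, 32, 64)]:
    print(shape, is_under_tile(*shape))
```

Note that a shape exactly equal to the tile size is not flagged, while `(64, 32, 64)` is flagged because `K` alone falls below `TILE_K`.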
### D5. Theoretical model — inherited constants, NOT yet ADR-verified
The "theoretical" curves use an analytical ideal-pipeline model with
constants copied verbatim from `scripts/build_overview_slides.py`:
```
HBM_GBS = 256.0 # GB/s T_STAGE = 16.0 ns
D_STAGES = 3 BPE = 2
```
**These are not yet sourced against the ADRs.** Notably ADR-0033's `256`
is `burst_bytes` (256 B), a *different* quantity than this `256 GB/s`, and
ADR-0033 derives bandwidth as `pc_bw_gbs = hbm_to_router_bw_gbs / num_pcs`.
`T_STAGE`/stage-count are not traced to ADR-0014 here. The model is
therefore **consistent with the existing deck script, not verified against
the ADRs**, and the constants are duplicated (deck + helper). Reconciling
them (source from topology/ADR-0033/0014, de-duplicate) is deferred — see
Open questions.
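To make the model concrete, here is a minimal sketch of the ideal-pipeline wall-clock and efficiency calculation, following the formulas in the helper module from this commit and the inherited constants above. It assumes 1 GB/s equals 1 byte/ns, so bytes divided by `HBM_GBS` gives nanoseconds. It also assumes the tile count is the product of ceiling divisions along M, K, and N (the helper itself reads `tile_count_expected` from the JSON). The function name and the 128-cubed shape are chosen for illustration.

```python
# Inherited constants from D5 (not yet verified against the ADRs).
HBM_GBS = 256.0   # GB/s, treated here as bytes per ns
T_STAGE = 16.0    # ns per pipeline stage
D_STAGES = 3
BPE = 2           # bytes per element
TILE_M, TILE_K, TILE_N = 32, 64, 32


def theoretical_wall_and_eff(m: int, k: int, n: int):
    """Hypothetical helper returning (wall ns, useful eff %)."""
    m_tiles = -(-m // TILE_M)   # ceiling division
    k_tiles = -(-k // TILE_K)
    n_tiles = -(-n // TILE_N)
    tiles = m_tiles * k_tiles * n_tiles   # assumed tile count

    tile_flops = 2 * TILE_M * TILE_K * TILE_N
    util = (2 * m * k * n) / (tile_flops * tiles)

    head_ns = (D_STAGES - 1) * T_STAGE                 # pipeline fill
    dma_w_per_pair = (TILE_M * TILE_N * BPE) / HBM_GBS # output write per (m, n) tile
    compute_total = tiles * T_STAGE
    wall = head_ns + compute_total + max(0, m_tiles * n_tiles - 1) * dma_w_per_pair
    eff = compute_total * util / wall * 100 if wall > 0 else 0.0
    return wall, eff


wall_ns, eff_pct = theoretical_wall_and_eff(128, 128, 128)
print(wall_ns)             # 32 + 512 + 15 * 8 = 664.0
print(round(eff_pct, 1))   # 77.1
```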
### D6. Known naming caveat — `_measured` chart
`gemm_mac_utilization_measured.png` currently plots the *theoretical*
ideal-pipeline numbers (its footnote says so); only the filename says
"measured". This is a known misnomer pending a decision to either repoint
its content to the simulator-measured series or retitle it.
## Consequences
### Positive
- GEMM figures are test-generated and CI-guarded, like allreduce.
- The heavy sweep stays opt-in, keeping the default test run fast.
- Single source for the sweep logic (the script), reused by the slow test.
### Negative / limitations
- The theoretical-model constants (D5) are unverified and duplicated.
- The `_measured` figure is a misnomer (D6).
- `build_overview_slides.py` still renders the GEMM bars natively from
`gemm_sweep.json` rather than embedding these PNGs — the deck rewiring to
consume the test artifacts is not done.
## Dependencies
- **ADR-0013**: verification strategy.
- **ADR-0014 / ADR-0042**: PE pipeline + tile-plan generators — the GEMM
implementation the sweep measures; D4's stage record counts come from
ADR-0042 D2/D3.
- **ADR-0033**: latency model — the source the D5 constants should (but do
not yet) trace to.
- **ADR-0043**: the sibling allreduce evaluation harness.
## Open questions
- Reconcile D5 constants against `topology.yaml` / ADR-0033 / ADR-0014 and
de-duplicate (one source for the model parameters)?
- Resolve the D6 `_measured` naming (repoint content vs. retitle)?
- Rewire `build_overview_slides.py` to embed the `gemm_plots/` PNGs instead
of native bar-drawing?
Binary file not shown. Size: 38 KiB before, 38 KiB after.

Binary file not shown. Size: 36 KiB before, 36 KiB after.

@@ -1,13 +1,13 @@
| buffer_kind | sip_topology | n_sips | n_elem | bytes_per_pe | latency_ns (before) | latency_ns (after) |
|---|---|---|---|---|---|---|
| hbm | torus_2d | 6 | 128 | 256 | 2120.0399999999754 | 2120.040000000012 |
| hbm | torus_2d | 6 | 1024 | 2048 | 2716.74499999995 | 2717.2783333333473 |
| hbm | torus_2d | 6 | 8192 | 16384 | 7315.185000000081 | 7315.184999999989 |
| hbm | torus_2d | 6 | 32768 | 65536 | 23081.265000008738 | 23081.26500000037 |
| sram | torus_2d | 6 | 128 | 256 | 2060.0399999999754 | 2060.040000000012 |
| sram | torus_2d | 6 | 1024 | 2048 | 2908.74499999995 | 2909.2783333333473 |
| sram | torus_2d | 6 | 8192 | 16384 | 9523.185000000081 | 9523.184999999869 |
| sram | torus_2d | 6 | 32768 | 65536 | 32201.265000008752 | 32201.265000000385 |
| tcm | torus_2d | 6 | 128 | 256 | 1964.0399999999754 | 1964.040000000012 |
| tcm | torus_2d | 6 | 1024 | 2048 | 2476.74499999995 | 2477.2783333333473 |
| tcm | torus_2d | 6 | 8192 | 16384 | 6403.185000000081 | 6403.185000000109 |
| tcm | torus_2d | 6 | 32768 | 65536 | 19865.265000008738 | 19865.265000000378 |
Binary file not shown. Size: 75 KiB before, 75 KiB after.

Binary file not shown. Size: 37 KiB before, 37 KiB after.

Binary file not shown. Size: 86 KiB before, 86 KiB after.

@@ -1,37 +1,37 @@
In every row the `algorithm` column changes from `intercube_allreduce` to `lrab_hierarchical_allreduce`; the remaining columns are:

| sip_topology | n_sips | n_elem | bytes_per_pe | bytes_per_sip | latency_ns (before) | latency_ns (after) |
|---|---|---|---|---|---|---|
| mesh_2d_no_wrap | 6 | 8 | 16 | 256 | 2666.5524999999725 | 2666.552500000015 |
| mesh_2d_no_wrap | 6 | 32 | 64 | 1024 | 2747.7399999999725 | 2747.7400000000152 |
| mesh_2d_no_wrap | 6 | 64 | 128 | 2048 | 2855.98999999998 | 2855.990000000018 |
| mesh_2d_no_wrap | 6 | 128 | 256 | 4096 | 3072.4899999999725 | 3072.490000000019 |
| mesh_2d_no_wrap | 6 | 512 | 1024 | 16384 | 3336.579999999951 | 3337.1133333333582 |
| mesh_2d_no_wrap | 6 | 1024 | 2048 | 32768 | 3707.49999999992 | 3708.0333333333692 |
| mesh_2d_no_wrap | 6 | 2048 | 4096 | 65536 | 4449.339999999875 | 4449.873333333393 |
| mesh_2d_no_wrap | 6 | 4096 | 8192 | 131072 | 5933.020000000055 | 5933.020000000124 |
| mesh_2d_no_wrap | 6 | 8192 | 16384 | 262144 | 8900.380000000157 | 8900.379999999863 |
| mesh_2d_no_wrap | 6 | 16384 | 32768 | 524288 | 14835.099999997583 | 14835.099999999224 |
| mesh_2d_no_wrap | 6 | 32768 | 65536 | 1048576 | 26704.540000017492 | 26704.540000000765 |
| mesh_2d_no_wrap | 6 | 49152 | 98304 | 1572864 | 38573.980000026335 | 38573.97999999701 |
| ring_1d | 6 | 8 | 16 | 256 | 2365.2558333333036 | 2365.255833333347 |
| ring_1d | 6 | 32 | 64 | 1024 | 2436.9433333333036 | 2436.9433333333473 |
| ring_1d | 6 | 64 | 128 | 2048 | 2532.526666666643 | 2532.526666666683 |
| ring_1d | 6 | 128 | 256 | 4096 | 2723.6933333333036 | 2723.693333333349 |
| ring_1d | 6 | 512 | 1024 | 16384 | 3042.0349999999544 | 3048.635000000021 |
| ring_1d | 6 | 1024 | 2048 | 32768 | 3390.201666666597 | 3393.4016666666957 |
| ring_1d | 6 | 2048 | 4096 | 65536 | 4079.7349999998714 | 4082.401666666714 |
| ring_1d | 6 | 4096 | 8192 | 131072 | 5458.801666666721 | 5458.80166666677 |
| ring_1d | 6 | 8192 | 16384 | 262144 | 8216.93500000014 | 8216.934999999943 |
| ring_1d | 6 | 16384 | 32768 | 524288 | 13733.201666664638 | 13733.201666665835 |
| ring_1d | 6 | 32768 | 65536 | 1048576 | 24765.735000014545 | 24765.73500000064 |
| ring_1d | 6 | 49152 | 98304 | 1572864 | 35798.268333355256 | 35798.268333331536 |
| torus_2d | 6 | 8 | 16 | 256 | 1700.6024999999754 | 1700.6025000000095 |
| torus_2d | 6 | 32 | 64 | 1024 | 1753.2899999999754 | 1753.2900000000102 |
| torus_2d | 6 | 64 | 128 | 2048 | 1823.539999999979 | 1823.540000000012 |
| torus_2d | 6 | 128 | 256 | 4096 | 1964.0399999999754 | 1964.040000000012 |
| torus_2d | 6 | 512 | 1024 | 16384 | 2196.2849999999653 | 2196.8183333333463 |
| torus_2d | 6 | 1024 | 2048 | 32768 | 2476.74499999995 | 2477.2783333333473 |
| torus_2d | 6 | 2048 | 4096 | 65536 | 3037.664999999919 | 3038.1983333333583 |
| torus_2d | 6 | 4096 | 8192 | 131072 | 4159.50500000003 | 4159.5050000000665 |
| torus_2d | 6 | 8192 | 16384 | 262144 | 6403.185000000081 | 6403.185000000109 |
| torus_2d | 6 | 16384 | 32768 | 524288 | 10890.544999998769 | 10890.5449999995 |
| torus_2d | 6 | 32768 | 65536 | 1048576 | 19865.265000008738 | 19865.265000000378 |
| torus_2d | 6 | 49152 | 98304 | 1572864 | 28839.985000013185 | 28839.98500000059 |
Binary file not shown. Size: 40 KiB (after only).

Binary file not shown. Size: 46 KiB (after only).

Binary file not shown. Size: 42 KiB (after only).

-176
@@ -1,176 +0,0 @@
"""One-shot: render the broken-y-axis allreduce comparison with the FSIM
single-device reference. Reads docs/diagrams/allreduce_latency_plots/summary.csv
and writes comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png
alongside it.

This is a derived-artifact generator (per CLAUDE.md): plotting only, no production
or test logic touched.
"""
from __future__ import annotations

import csv
from pathlib import Path

import matplotlib.pyplot as plt
import matplotlib.ticker as mticker

ROOT = Path(__file__).resolve().parent.parent
PLOT_DIR = ROOT / "docs" / "diagrams" / "allreduce_latency_plots"
CSV_PATH = PLOT_DIR / "summary.csv"

EXT_LABEL = "FSIM (single device): 366 µs"
EXT_LATENCY_NS = 366_000.0

COLORS = {
    "ring_1d": "tab:blue",
    "torus_2d": "tab:orange",
    "mesh_2d_no_wrap": "tab:green",
}

# Display labels (data keys above stay as the summary.csv sip_topology
# values; these are only the human-readable legend strings). All non-FSIM
# runs use 6 devices; the grid differs per topology.
DISPLAY = {
    "ring_1d": "Ring 1x6 (6 devices)",
    "torus_2d": "2D Torus 2x3 (6 devices)",
    "mesh_2d_no_wrap": "2D Mesh 2x3 (6 devices)",
}

# Hand-derived theoretical model for torus_2d (6 SIPs). Mirrors
# _aggregate_sweep_plots in tests/test_allreduce_multidevice.py.
NOC_PACKET_BYTES = 128
PES_PER_CUBE = 8
T_STARTUP_NS = 1346.0
TAU_NS = (8741.0 - 1346.0) / (6144 - 1)
def _theoretical_torus_2d_ns(bytes_per_pe: int) -> float:
    bytes_per_cube = int(bytes_per_pe) * PES_PER_CUBE
    n_packets = max(1, -(-bytes_per_cube // NOC_PACKET_BYTES))
    return T_STARTUP_NS + (n_packets - 1) * TAU_NS


def _plot_theoretical(ax, records):
    torus_rs = sorted(
        [r for r in records if r["sip_topology"] == "torus_2d"],
        key=lambda r: r["bytes_per_pe"],
    )
    if not torus_rs:
        return
    ax.plot(
        [r["bytes_per_pe"] for r in torus_rs],
        [_theoretical_torus_2d_ns(r["bytes_per_pe"]) for r in torus_rs],
        color="tab:red", linestyle="--", linewidth=1.6, marker="x",
        label="Theoretical 2D Torus 2x3",
    )


def _bytes_fmt(x, _pos):
    if x >= 1024 * 1024:
        return f"{x / (1024 * 1024):.0f}M"
    if x >= 1024:
        return f"{x / 1024:.0f}K"
    return f"{int(x)}"
def _load_records():
    rows = []
    with open(CSV_PATH, newline="") as f:
        r = csv.DictReader(f)
        for row in r:
            rows.append({
                "sip_topology": row["sip_topology"],
                "bytes_per_pe": int(row["bytes_per_pe"]),
                "latency_ns": float(row["latency_ns"]),
            })
    return rows


def _ext_x(records):
    """Anchor the external reference at the largest payload (96 KB / PE)."""
    return max(r["bytes_per_pe"] for r in records)


def _plot_curves(ax, records, topologies):
    for topo in topologies:
        rs = sorted([r for r in records if r["sip_topology"] == topo],
                    key=lambda r: r["bytes_per_pe"])
        if not rs:
            continue
        ax.plot(
            [r["bytes_per_pe"] for r in rs],
            [r["latency_ns"] for r in rs],
            marker="o",
            label=DISPLAY.get(topo, topo),
            color=COLORS.get(topo),
        )
def emit_broken(records):
    topologies = sorted({r["sip_topology"] for r in records})
    max_local = max(r["latency_ns"] for r in records)

    fig, (ax_top, ax_bot) = plt.subplots(
        2, 1, sharex=True,
        gridspec_kw={"height_ratios": [1, 4], "hspace": 0.05},
        figsize=(9, 6.5),
    )

    # Bottom panel: today's three curves + theoretical, linear y.
    _plot_curves(ax_bot, records, topologies)
    _plot_theoretical(ax_bot, records)
    ax_bot.set_ylim(0, max_local * 1.10)

    # Top panel: only the external reference marker, linear y around 366 µs.
    ax_top.scatter(
        [_ext_x(records)], [EXT_LATENCY_NS],
        marker="*", s=240, color="tab:red", zorder=5,
        label=EXT_LABEL,
    )
    ax_top.set_ylim(EXT_LATENCY_NS * 0.93, EXT_LATENCY_NS * 1.05)

    # Hide the spine between the two panels and draw diagonal "break" ticks.
    ax_top.spines["bottom"].set_visible(False)
    ax_bot.spines["top"].set_visible(False)
    ax_top.tick_params(labeltop=False, bottom=False)
    ax_bot.xaxis.tick_bottom()

    d = 0.012  # diagonal-tick size, in axis-fraction
    kw = dict(transform=ax_top.transAxes, color="k", clip_on=False, lw=1)
    ax_top.plot((-d, +d), (-d, +d), **kw)
    ax_top.plot((1 - d, 1 + d), (-d, +d), **kw)
    kw.update(transform=ax_bot.transAxes)
    ax_bot.plot((-d, +d), (1 - d * 4, 1 + d * 4), **kw)
    ax_bot.plot((1 - d, 1 + d), (1 - d * 4, 1 + d * 4), **kw)

    ax_bot.set_xscale("log", base=2)
    ax_bot.set_xlabel("Bytes per PE (log scale)")
    ax_bot.set_ylabel("Time (ns)")
    ax_top.set_ylabel("Time (ns)")
    ax_bot.grid(True, alpha=0.3)
    ax_top.grid(True, alpha=0.3)
    ax_bot.xaxis.set_major_formatter(mticker.FuncFormatter(_bytes_fmt))

    # One legend covering both axes.
    handles_bot, labels_bot = ax_bot.get_legend_handles_labels()
    handles_top, labels_top = ax_top.get_legend_handles_labels()
    ax_bot.legend(handles_bot + handles_top, labels_bot + labels_top,
                  loc="upper left")

    fig.suptitle("Multidevice allreduce (ring, Mesh, 2DTorus) vs FSIM latency")
    fig.tight_layout()
    out = PLOT_DIR / "comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png"
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")


def main():
    records = _load_records()
    if not records:
        raise SystemExit(f"no rows in {CSV_PATH}")
    emit_broken(records)


if __name__ == "__main__":
    main()
+13
@@ -59,10 +59,23 @@ class AhbmCCLBackend:
             self._sip_topo_kind = topo_map.get(self._sip_topo, 0)
         else:
             self._sip_topo_kind = 0
+        sips = spec.get("system", {}).get("sips", {})
         if self._sip_topo == "ring_1d":
             self._sip_topo_w, self._sip_topo_h = 0, 0
+        elif sips.get("w") is not None and sips.get("h") is not None:
+            w, h = int(sips["w"]), int(sips["h"])
+            if w * h != self._n_sips:
+                raise ValueError(
+                    f"sip layout {w}x{h} != sips.count ({self._n_sips})"
+                )
+            self._sip_topo_w, self._sip_topo_h = w, h
         else:
             side = int(round(math.sqrt(self._n_sips)))
+            if side * side != self._n_sips:
+                raise ValueError(
+                    f"SIP topology '{self._sip_topo}' requires square "
+                    f"sips.count or explicit sips.w/h, got {self._n_sips}"
+                )
             self._sip_topo_w, self._sip_topo_h = side, side
         # IPCQ install: wire all pe0s across all cubes and SIPs
+2 -2
@@ -46,8 +46,8 @@ def pytest_sessionfinish(session, exitstatus):
         except Exception as e:
             print(f"[conftest] aggregator {attr}() in {name} failed: {e}")
-    _exec("test_allreduce_multidevice.py", "_aggregate_sweep_plots")
-    _exec("test_allreduce_buffer_kind_sweep.py", "aggregate_buffer_kind_plot")
+    _exec("sccl/_allreduce_helpers.py", "_aggregate_sweep_plots")
+    _exec("sccl/_allreduce_helpers.py", "aggregate_buffer_kind_plot")
 @pytest.fixture(scope="session")
+283
@@ -0,0 +1,283 @@
"""Shared plotting plumbing for the GEMM figure tests.

Not a test module (no ``test_`` prefix -> pytest does not collect it).
Reads the committed ``docs/diagrams/gemm_sweep.json`` (produced by the heavy
``scripts/gemm_sweep.py`` sim sweep) and renders matplotlib PNGs into
``docs/diagrams/gemm_plots/``. No simulation here -> the figure tests are fast
and run by default; regenerating the underlying data stays a manual script.

Chart set (mirrors the GEMM MAC slides in scripts/build_overview_slides.py):
  - stage breakdown (load_ref operand staging)
  - MAC utilization — measured (load_ref)
  - MAC utilization — theoretical vs measured (load_ref)
"""
from __future__ import annotations

import json
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent.parent
GEMM_SWEEP_JSON = ROOT / "docs" / "diagrams" / "gemm_sweep.json"
GEMM_PLOTS_DIR = ROOT / "docs" / "diagrams" / "gemm_plots"

# Shapes excluded from the figures (mirrors build_overview_slides).
EXCLUDED_SHAPES = {(512, 512, 512)}

# Stage bars shown (raw op_log stage_type keys) + display names + colors.
STAGE_KEYS = ["DMA_READ", "FETCH", "GEMM", "DMA_WRITE"]
STAGE_DISPLAY = {
    "DMA_READ": "DMA in",
    "FETCH": "Fetch",
    "GEMM": "GEMM",
    "DMA_WRITE": "DMA out",
}
STAGE_COLORS = {
    "DMA_READ": "#3B82F6",
    "FETCH": "#10B981",
    "GEMM": "#F59E0B",
    "DMA_WRITE": "#A855F7",
}

# MAC-utilization model constants (mirror build_overview_slides).
_HBM_GBS = 256.0
_BPE = 2
_T_STAGE = 16.0
_D_STAGES = 3

_PLOT_VARIANT = "load_ref"
def _load_sweep_data() -> dict:
    if not GEMM_SWEEP_JSON.exists():
        return {"rows": []}
    data = json.loads(GEMM_SWEEP_JSON.read_text())
    data["rows"] = [
        r for r in data.get("rows", [])
        if (r["M"], r["K"], r["N"]) not in EXCLUDED_SHAPES
    ]
    return data


def _shape_label(r: dict) -> str:
    if r["M"] == r["K"] == r["N"]:
        return f"M=K=N={r['M']}"
    return f"M={r['M']} K={r['K']} N={r['N']}"


def _under_tile(M, K, N, tile_M, tile_K, tile_N) -> bool:
    return M < tile_M or K < tile_K or N < tile_N


def _xtick_labels(shape_labels, tile_counts, flagged) -> list[str]:
    out = []
    for lbl, tc, fl in zip(shape_labels, tile_counts, flagged):
        s = f"{lbl}\n({tc} tiles)"
        if fl:
            s += " *"
        out.append(s)
    return out
def _grouped_bar_png(
    out_name: str, *, title: str, subtitle: str | None,
    shape_labels, tile_counts, flagged, series: dict, colors: dict,
    y_label: str, threshold: float | None = None, footnote: str | None = None,
) -> str:
    """Render one grouped-bar chart to GEMM_PLOTS_DIR/out_name; return the path."""
    import matplotlib.pyplot as plt
    import numpy as np

    n_groups = len(shape_labels)
    n_series = max(1, len(series))
    x = np.arange(n_groups)
    width = 0.8 / n_series

    fig, ax = plt.subplots(figsize=(11, 6))
    for i, (name, vals) in enumerate(series.items()):
        offset = (i - (n_series - 1) / 2) * width
        ax.bar(x + offset, vals, width, label=name, color=colors.get(name))

    ax.set_xticks(x)
    ax.set_xticklabels(
        _xtick_labels(shape_labels, tile_counts, flagged), fontsize=8,
    )
    ax.set_ylabel(y_label)
    ax.set_title(title, fontsize=13, fontweight="bold")
    if subtitle:
        ax.text(0.5, 1.01, subtitle, transform=ax.transAxes, ha="center",
                va="bottom", fontsize=8, color="#475569")
    if threshold is not None:
        ax.axhline(threshold, ls="--", color="gray", lw=1.0)
    ax.legend(fontsize=8, loc="upper right")
    ax.grid(True, axis="y", alpha=0.3)

    caption = "* = under-tile shape (M<TILE_M, K<TILE_K, or N<TILE_N)"
    if footnote:
        caption = footnote + "\n" + caption
    fig.text(0.5, 0.01, caption, ha="center", fontsize=7, color="gray",
             wrap=True)
    fig.tight_layout(rect=(0, 0.05, 1, 1))

    GEMM_PLOTS_DIR.mkdir(parents=True, exist_ok=True)
    out = GEMM_PLOTS_DIR / out_name
    fig.savefig(out, dpi=120)
    plt.close(fig)
    return str(out)
# ── individual chart renderers (read sweep JSON, emit one PNG each) ─────
def emit_stage_breakdown() -> str | None:
    """Per-stage engine wall-clock per shape (load_ref operand staging)."""
    data = _load_sweep_data()
    rows = [r for r in data["rows"] if r.get("variant") == _PLOT_VARIANT]
    if not rows:
        return None
    tile = data["tile_sizes"]
    shape_labels = [_shape_label(r) for r in rows]
    flagged = [_under_tile(r["M"], r["K"], r["N"], tile["M"], tile["K"], tile["N"])
               for r in rows]
    tile_counts = [r["tile_count_expected"] for r in rows]
    series = {
        STAGE_DISPLAY[s]: [r.get("stages", {}).get(s, {}).get("wall_ns", 0.0)
                           for r in rows]
        for s in STAGE_KEYS
    }
    colors = {STAGE_DISPLAY[s]: STAGE_COLORS[s] for s in STAGE_KEYS}
    return _grouped_bar_png(
        "gemm_stage_breakdown.png",
        title="GEMM stage breakdown",
        subtitle=(f"Per-stage engine wall-clock (DMA in / Fetch / GEMM / "
                  f"DMA out), {_PLOT_VARIANT} staging. "
                  f"Tile {tile['M']}x{tile['K']}x{tile['N']}."),
        shape_labels=shape_labels, tile_counts=tile_counts, flagged=flagged,
        series=series, colors=colors, y_label="ns",
        footnote="Bars = engine wall-clock interval (merged overlaps).",
    )
def emit_mac_utilization_measured() -> str | None:
    """GEMM util % and useful pipeline-eff % (analytical model, load_ref)."""
    data = _load_sweep_data()
    rows = data["rows"]
    if not rows:
        return None
    tile = data["tile_sizes"]
    TILE_M, TILE_K, TILE_N = tile["M"], tile["K"], tile["N"]
    tile_flops = 2 * TILE_M * TILE_K * TILE_N
    dma_w_per_pair = (TILE_M * TILE_N * _BPE) / _HBM_GBS
    head_ns = (_D_STAGES - 1) * _T_STAGE

    by_shape = {(r["M"], r["K"], r["N"]): r
                for r in rows if r["variant"] == _PLOT_VARIANT}
    shapes = list(by_shape)
    if not shapes:
        return None
    shape_labels = [_shape_label(by_shape[k]) for k in shapes]
    flagged = [_under_tile(*k, TILE_M, TILE_K, TILE_N) for k in shapes]
    tile_counts = [by_shape[k]["tile_count_expected"] for k in shapes]

    gemm_util, useful_eff = [], []
    for k in shapes:
        r = by_shape[k]
        M, K, N = r["M"], r["K"], r["N"]
        useful = 2 * M * K * N
        tiles = r["tile_count_expected"]
        gu = useful / (tile_flops * tiles) * 100
        gemm_util.append(gu)
        m_tiles = (M + TILE_M - 1) // TILE_M
        n_tiles = (N + TILE_N - 1) // TILE_N
        n_mn = m_tiles * n_tiles
        compute_total = tiles * _T_STAGE
        wall = head_ns + tiles * _T_STAGE + max(0, n_mn - 1) * dma_w_per_pair
        ueff = (compute_total * (gu / 100.0) / wall) * 100 if wall > 0 else 0.0
        useful_eff.append(ueff)

    series = {"GEMM util %": gemm_util, "Useful eff %": useful_eff}
    colors = {"GEMM util %": "#10B981", "Useful eff %": "#F59E0B"}
    return _grouped_bar_png(
        "gemm_mac_utilization_measured.png",
        title="GEMM MAC utilization — load_ref",
        subtitle=("GEMM util = useful FLOPs / (tile FLOPs x tiles); "
                  "Useful eff = GEMM util x ideal pipeline efficiency."),
        shape_labels=shape_labels, tile_counts=tile_counts, flagged=flagged,
        series=series, colors=colors, y_label="%", threshold=100.0,
        footnote="Theoretical ideal-pipeline model (not simulator data).",
    )
def emit_mac_utilization_theoretical_vs_measured() -> str | None:
    """Theoretical vs simulator-measured GEMM util / useful eff (load_ref)."""
    data = _load_sweep_data()
    rows = data["rows"]
    if not rows:
        return None
    tile = data["tile_sizes"]
    TILE_M, TILE_K, TILE_N = tile["M"], tile["K"], tile["N"]
    tile_flops = 2 * TILE_M * TILE_K * TILE_N
    dma_w_per_pair = (TILE_M * TILE_N * _BPE) / _HBM_GBS
    head_ns = (_D_STAGES - 1) * _T_STAGE
    peak_per_ns = tile_flops / _T_STAGE

    by_shape = {(r["M"], r["K"], r["N"]): r
                for r in rows if r["variant"] == _PLOT_VARIANT}
    shapes = list(by_shape)
    if not shapes:
        return None
    shape_labels = [_shape_label(by_shape[k]) for k in shapes]
    flagged = [_under_tile(*k, TILE_M, TILE_K, TILE_N) for k in shapes]
    tile_counts = [by_shape[k]["tile_count_expected"] for k in shapes]

    gu_t, gu_m, eff_t, eff_m = [], [], [], []
    for k in shapes:
        r = by_shape[k]
        M, K, N = r["M"], r["K"], r["N"]
        useful = 2 * M * K * N
        tiles = r["tile_count_expected"]
        gut = useful / (tile_flops * tiles)
        gu_t.append(gut * 100)
        rec = r.get("stages", {}).get("GEMM", {}).get("record_count", 0) or tiles
        gu_m.append((useful / (tile_flops * rec) * 100) if rec else 0.0)
        m_tiles = (M + TILE_M - 1) // TILE_M
        n_tiles = (N + TILE_N - 1) // TILE_N
        n_mn = m_tiles * n_tiles
        compute_total = tiles * _T_STAGE
        wall_t = head_ns + compute_total + max(0, n_mn - 1) * dma_w_per_pair
        eff_t.append((compute_total * gut / wall_t * 100) if wall_t > 0 else 0.0)
        cw = r.get("composite_window_ns", 0.0) or 0.0
        eff_m.append((useful / cw / peak_per_ns * 100) if cw > 0 else 0.0)

    series = {
        "GEMM util % (theoretical)": gu_t,
        "GEMM util % (measured)": gu_m,
        "Theoretical eff %": eff_t,
        "Measured eff %": eff_m,
    }
    colors = {
        "GEMM util % (theoretical)": "#10B981",
        "GEMM util % (measured)": "#6EE7B7",
        "Theoretical eff %": "#F59E0B",
        "Measured eff %": "#3B82F6",
    }
    return _grouped_bar_png(
        "gemm_mac_utilization_theoretical_vs_measured.png",
        title="GEMM MAC utilization — theoretical vs measured (load_ref)",
        subtitle=("theoretical model vs simulator op_log; agreement "
                  "validates the analytical pipeline model."),
        shape_labels=shape_labels, tile_counts=tile_counts, flagged=flagged,
        series=series, colors=colors, y_label="%", threshold=100.0,
    )
def emit_all_gemm_plots() -> list[str]:
    """Render every GEMM figure that has data; return the list of paths written."""
    paths = []
    for fn in (emit_stage_breakdown,
               emit_mac_utilization_measured,
               emit_mac_utilization_theoretical_vs_measured):
        p = fn()
        if p:
            paths.append(p)
    return paths
+36
@@ -0,0 +1,36 @@
"""Regenerate docs/diagrams/gemm_sweep.json by running the GEMM sweep.
Heavy: drives matmul-composite across all shapes x variants through the
simulator (24 runs; the 512 shape alone is 2048 tiles). Marked ``slow`` so it
is excluded from the default ``pytest`` run (addopts: -m "not slow") and runs
on demand:
pytest -m slow tests/gemm/test_gemm_sweep.py
Delegates to scripts/gemm_sweep.py (the single source of the sweep logic) via
subprocess so there is no duplicated sim-driving code.
"""
from __future__ import annotations
import subprocess
import sys
from pathlib import Path
import pytest
from tests.gemm._gemm_plot_helpers import GEMM_SWEEP_JSON, ROOT
@pytest.mark.slow
def test_gemm_sweep_regenerates_json():
script = ROOT / "scripts" / "gemm_sweep.py"
assert script.exists(), f"missing {script}"
proc = subprocess.run(
[sys.executable, str(script)],
cwd=str(ROOT), capture_output=True, text=True,
)
assert proc.returncode == 0, (
f"gemm_sweep.py failed (rc={proc.returncode})\n"
f"stdout:\n{proc.stdout[-2000:]}\nstderr:\n{proc.stderr[-2000:]}"
)
assert Path(GEMM_SWEEP_JSON).exists()
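Note on the ``slow`` marker: the docstring above relies on ``addopts: -m "not slow"`` to keep this test out of the default run. A minimal sketch of registering the marker from a conftest.py follows. This is an assumption about one possible setup; the repository may declare the marker in its pytest configuration file instead, and that file is not part of this diff.

```python
# conftest.py sketch (assumption: not taken from this commit).
SLOW_MARKER = "slow: heavy simulator sweeps; run with `pytest -m slow`"


def pytest_configure(config):
    # addinivalue_line appends to the "markers" ini option, so
    # @pytest.mark.slow is a registered marker and does not trigger
    # unknown-marker warnings.
    config.addinivalue_line("markers", SLOW_MARKER)
```

Registration only declares the marker; the exclusion itself still comes from the ``-m "not slow"`` expression in addopts.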
@@ -0,0 +1,35 @@
"""Emit the GEMM MAC-utilization bar charts.
A measured chart (load_ref) plus the theoretical-vs-measured overlay (load_ref).
Reads docs/diagrams/gemm_sweep.json and writes gemm_mac_utilization*.png into
docs/diagrams/gemm_plots/.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from tests.gemm._gemm_plot_helpers import (
GEMM_SWEEP_JSON,
emit_mac_utilization_measured,
emit_mac_utilization_theoretical_vs_measured,
)
@pytest.mark.skipif(
not GEMM_SWEEP_JSON.exists(),
reason="gemm_sweep.json absent; run scripts/gemm_sweep.py first",
)
def test_plot_gemm_mac_utilization_measured():
out = emit_mac_utilization_measured()
assert out is not None and Path(out).exists()
@pytest.mark.skipif(
not GEMM_SWEEP_JSON.exists(),
reason="gemm_sweep.json absent; run scripts/gemm_sweep.py first",
)
def test_plot_gemm_mac_utilization_theoretical_vs_measured():
out = emit_mac_utilization_theoretical_vs_measured()
assert out is not None and Path(out).exists()
@@ -0,0 +1,24 @@
"""Emit the GEMM per-stage engine wall-clock bar chart (load_ref).
Reads docs/diagrams/gemm_sweep.json (run scripts/gemm_sweep.py to refresh it)
and writes gemm_stage_breakdown.png into docs/diagrams/gemm_plots/.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from tests.gemm._gemm_plot_helpers import (
GEMM_SWEEP_JSON,
emit_stage_breakdown,
)
@pytest.mark.skipif(
not GEMM_SWEEP_JSON.exists(),
reason="gemm_sweep.json absent; run scripts/gemm_sweep.py first",
)
def test_plot_gemm_stage_breakdown():
out = emit_stage_breakdown()
assert out is not None and Path(out).exists()
@@ -1,25 +1,193 @@
"""Config-driven multi-device allreduce test application. """Shared plumbing for the sccl allreduce tests.
Reads ``ccl.yaml`` + ``topology.yaml``, dynamically loads the kernel Not a test module (no ``test_`` prefix, so pytest does not collect it).
module from ``ccl.yaml module``, and picks the inter-SIP exchange Holds the distributed driver, the direct-launch parity reference, the
pattern from ``topology.yaml system.sips.topology``. config writers, the sweep/buffer-kind constants, the plot aggregators
(called from ``conftest.pytest_sessionfinish``), and the topology-diagram
Run directly:: emitter. The per-test files under ``tests/sccl/`` import from here, as do
the external buffer-kind / root-center tests under ``tests/``.
python -m pytest tests/allreduce_app.py -v -s
""" """
from __future__ import annotations from __future__ import annotations
import importlib import importlib
import math import math
import textwrap
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
import numpy as np import numpy as np
import pytest
import yaml
from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config from kernbench.ccl.install import load_ccl_config, resolve_algorithm_config
from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip from kernbench.ccl.sfr_config import configure_sfr_intercube_multisip
from kernbench.policy.placement.dp import DPPolicy from kernbench.policy.placement.dp import DPPolicy
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent.parent / "topology.yaml"
DEFAULT_N_ELEM = 8
# ── config writers ────────────────────────────────────────────────────
def _write_ccl_yaml(tmp_path) -> str:
body = textwrap.dedent("""\
defaults:
algorithm: lrab_hierarchical_allreduce
buffer_kind: tcm
backpressure: sleep
n_slots: 4
slot_size: 4096
vc_chunk_size: 256
ipcq_credit_size_bytes: 16
algorithms:
lrab_hierarchical_allreduce:
module: kernbench.ccl.algorithms.lrab_hierarchical_allreduce
topology: none
buffer_kind: tcm
n_elem: 8
root_cube: 15
""")
(tmp_path / "ccl.yaml").write_text(body)
return str(tmp_path)
def _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None,
sip_w=None, sip_h=None,
):
"""Write temp topology.yaml and ccl.yaml with the given overrides."""
with open(TOPOLOGY_PATH) as f:
topo_cfg = yaml.safe_load(f)
topo_cfg["system"]["sips"]["count"] = n_sips
topo_cfg["system"]["sips"]["topology"] = sip_topology
if sip_w is not None and sip_h is not None:
topo_cfg["system"]["sips"]["w"] = int(sip_w)
topo_cfg["system"]["sips"]["h"] = int(sip_h)
else:
topo_cfg["system"]["sips"].pop("w", None)
topo_cfg["system"]["sips"].pop("h", None)
topo_path = tmp_path / "topology.yaml"
with open(topo_path, "w") as f:
yaml.dump(topo_cfg, f, default_flow_style=False)
ccl_path = Path(__file__).parent.parent.parent / "ccl.yaml"
with open(ccl_path) as f:
ccl_cfg = yaml.safe_load(f)
ccl_cfg["defaults"]["algorithm"] = algorithm
if n_elem_override is not None:
ccl_cfg.setdefault("algorithms", {}).setdefault(
algorithm, {},
)["n_elem"] = int(n_elem_override)
# Ensure IPCQ slot is big enough for the per-message payload.
per_msg_bytes = int(n_elem_override) * 2 # f16
default_slot = int(ccl_cfg["defaults"].get("slot_size", 4096))
if per_msg_bytes > default_slot:
ccl_cfg["defaults"]["slot_size"] = per_msg_bytes
tmp_ccl = tmp_path / "ccl.yaml"
with open(tmp_ccl, "w") as f:
yaml.dump(ccl_cfg, f, default_flow_style=False)
return str(topo_path), str(tmp_ccl)
# ── distributed driver (init_process_group → mp.spawn → all_reduce) ────
def _worker(rank: int, n_cubes: int, n_elem: int, n_sips: int, torch) -> None:
"""Per-SIP worker: allocate, fill, all_reduce, verify."""
torch.ahbm.set_device(rank)
dp = DPPolicy(
cube="row_wise", pe="replicate",
num_pes=1, num_cubes=n_cubes,
)
tensor = torch.zeros(
(n_cubes, n_elem), dtype="f16", dp=dp,
name=f"sip{rank}",
)
tensor.copy_(torch.from_numpy(
np.full((n_cubes, n_elem), float(rank + 1), dtype=np.float16)
))
torch.distributed.all_reduce(tensor, op="sum")
arr = tensor.numpy()
expected = float(n_cubes * sum(range(1, n_sips + 1)))
for cube_id in range(n_cubes):
assert np.allclose(arr[cube_id], expected, rtol=1e-1, atol=1e-1), (
f"SIP{rank} cube {cube_id}: "
f"got {arr[cube_id][:4]}, expected {expected}"
)
if rank == 0:
print(f"\n lrab_hierarchical_allreduce (ws={n_sips}): "
f"{n_sips * n_cubes} OK")
def _crit_ns(engine) -> float:
"""Critical-path latency = max per-result pe_exec_ns over engine results."""
vals = [
float(tr.get("pe_exec_ns", 0.0) or 0.0)
for _, (_, tr) in engine._results.items()
if isinstance(tr, dict)
]
return max(vals) if vals else 0.0
def _run_distributed(tmp_path, monkeypatch, topo_path, correlation_id, n_elem):
"""Build engine + run the collective via the full distributed path.
Returns ``(engine, n_cubes)``. ``monkeypatch.chdir`` points the backend's
``load_ccl_config()`` (cwd lookup) at the temp ``ccl.yaml``.
"""
monkeypatch.chdir(tmp_path)
topo = resolve_topology(topo_path)
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
n_sips = int(spec["system"]["sips"]["count"])
cm = spec["sip"]["cube_mesh"]
n_cubes = int(cm["w"]) * int(cm["h"])
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=correlation_id,
spec=spec,
) as ctx:
ctx.distributed.init_process_group(backend="ahbm")
assert ctx.distributed.get_world_size() == n_sips
ctx.multiprocessing.spawn(
_worker, args=(n_cubes, n_elem, n_sips, ctx), nprocs=n_sips,
)
return engine, n_cubes
# ── correctness config matrix (used by test_allreduce) ─────────────────
CONFIGS = [
pytest.param(
"lrab_hierarchical_allreduce", "ring_1d", 6, None, None,
id="ring_6sip",
),
pytest.param(
"lrab_hierarchical_allreduce", "torus_2d", 6, 2, 3,
id="torus_6sip_2x3",
),
pytest.param(
"lrab_hierarchical_allreduce", "mesh_2d_no_wrap", 6, 2, 3,
id="mesh_6sip_2x3",
),
]
# ── direct-launch helper (parity reference only) ───────────────────────
def _sip_topo_dims( def _sip_topo_dims(
@@ -51,14 +219,14 @@ def run_allreduce(
algorithm: str | None = None, algorithm: str | None = None,
ccl_yaml: str | None = None, ccl_yaml: str | None = None,
) -> dict: ) -> dict:
"""Config-driven allreduce: read yaml, load kernel, run. """Config-driven allreduce via direct ctx.launch (no distributed wrapper).
Everything is resolved from config; no hardcoded kernel imports. Retained as the parity reference for the distributed path and reused by
the external buffer-kind / root-center micro-tests.
""" """
cfg_all = load_ccl_config(ccl_yaml) cfg_all = load_ccl_config(ccl_yaml)
cfg = resolve_algorithm_config(cfg_all, algorithm) cfg = resolve_algorithm_config(cfg_all, algorithm)
# Dynamic import from ccl.yaml → module
algo_module = importlib.import_module(cfg["module"]) algo_module = importlib.import_module(cfg["module"])
kernel_fn = algo_module.kernel kernel_fn = algo_module.kernel
topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND topo_name_to_kind = algo_module.TOPO_NAME_TO_KIND
@@ -83,15 +251,6 @@ def run_allreduce(
) )
algo_name = cfg.get("algorithm", "allreduce") algo_name = cfg.get("algorithm", "allreduce")
print(f"\n{'=' * 60}")
print(f"algorithm: {algo_name}")
print(f"module: {cfg['module']}")
print(f"sip_topology: {sip_topo}")
print(f"kernel: {kernel_fn.__name__}")
print(f"n_sips: {n_sips}")
print(f"n_cubes: {n_cubes}")
print(f"n_elem: {n_elem}")
print(f"{'=' * 60}")
configure_sfr_intercube_multisip(engine, spec, cfg) configure_sfr_intercube_multisip(engine, spec, cfg)
@@ -112,11 +271,6 @@ def run_allreduce(
)) ))
tensors.append(t) tensors.append(t)
for sip in range(n_sips):
arr = tensors[sip].numpy()
print(f"[SIP {sip}] input cube0[:4] = {arr[0][:4].tolist()} "
f"cube{n_cubes - 1}[:4] = {arr[-1][:4].tolist()}")
t_start = engine._env.now t_start = engine._env.now
all_pending = [] all_pending = []
@@ -129,31 +283,14 @@ def run_allreduce(
) )
all_pending.extend(pending) all_pending.extend(pending)
for h, sip_id, meta in all_pending: for h, _sip_id, meta in all_pending:
ctx.wait(h, _meta=meta) ctx.wait(h, _meta=meta)
t_end = engine._env.now t_end = engine._env.now
latency_ns = t_end - t_start latency_ns = t_end - t_start
print(f"\n[{algo_name} ws={n_sips}] sim latency = "
f"{latency_ns:.1f} ns ({latency_ns / 1000:.3f} us)")
for key, (_, trace) in engine._results.items():
if not isinstance(trace, dict):
continue
total = trace.get("total_ns", 0.0)
pe_exec = trace.get("pe_exec_ns", 0.0) or 0.0
network = total - pe_exec
print(f" [{key}] total={total:.1f} ns "
f"pe_exec={pe_exec:.1f} ns network={network:.1f} ns")
expected = float(n_cubes * sum(range(1, n_sips + 1))) expected = float(n_cubes * sum(range(1, n_sips + 1)))
print()
for sip in range(n_sips):
arr = tensors[sip].numpy()
print(f"[SIP {sip}] output cube0[:4] = {arr[0][:4].tolist()}")
print(f"[SIP {sip}] output cube{n_cubes - 1}[:4] = {arr[-1][:4].tolist()}")
ok_cubes = 0 ok_cubes = 0
for sip in range(n_sips): for sip in range(n_sips):
arr = tensors[sip].numpy() arr = tensors[sip].numpy()
@@ -166,8 +303,6 @@ def run_allreduce(
) )
ok_cubes += 1 ok_cubes += 1
print(f"\n {algo_name} (ws={n_sips}): {ok_cubes} OK")
return { return {
"expected": expected, "expected": expected,
"latency_ns": latency_ns, "latency_ns": latency_ns,
@@ -175,101 +310,7 @@ def run_allreduce(
} }
# ── pytest entry point ─────────────────────────────────────────────── # ── Latency sweep constants + aggregator ──────────────────────────────
import pytest
import yaml
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
CONFIGS = [
pytest.param(
"lrab_hierarchical_allreduce", "ring_1d", 6, None, None,
id="ring_6sip",
),
pytest.param(
"lrab_hierarchical_allreduce", "torus_2d", 6, 2, 3,
id="torus_6sip_2x3",
),
pytest.param(
"lrab_hierarchical_allreduce", "mesh_2d_no_wrap", 6, 2, 3,
id="mesh_6sip_2x3",
),
]
def _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm, n_elem_override=None,
sip_w=None, sip_h=None,
):
"""Write temp topology.yaml and ccl.yaml with the given overrides."""
with open(TOPOLOGY_PATH) as f:
topo_cfg = yaml.safe_load(f)
topo_cfg["system"]["sips"]["count"] = n_sips
topo_cfg["system"]["sips"]["topology"] = sip_topology
if sip_w is not None and sip_h is not None:
topo_cfg["system"]["sips"]["w"] = int(sip_w)
topo_cfg["system"]["sips"]["h"] = int(sip_h)
else:
topo_cfg["system"]["sips"].pop("w", None)
topo_cfg["system"]["sips"].pop("h", None)
topo_path = tmp_path / "topology.yaml"
with open(topo_path, "w") as f:
yaml.dump(topo_cfg, f, default_flow_style=False)
ccl_path = Path(__file__).parent.parent / "ccl.yaml"
with open(ccl_path) as f:
ccl_cfg = yaml.safe_load(f)
ccl_cfg["defaults"]["algorithm"] = algorithm
if n_elem_override is not None:
ccl_cfg.setdefault("algorithms", {}).setdefault(
algorithm, {},
)["n_elem"] = int(n_elem_override)
# Ensure IPCQ slot is big enough for the per-message payload.
per_msg_bytes = int(n_elem_override) * 2 # f16
default_slot = int(ccl_cfg["defaults"].get("slot_size", 4096))
if per_msg_bytes > default_slot:
ccl_cfg["defaults"]["slot_size"] = per_msg_bytes
tmp_ccl = tmp_path / "ccl.yaml"
with open(tmp_ccl, "w") as f:
yaml.dump(ccl_cfg, f, default_flow_style=False)
return str(topo_path), str(tmp_ccl)
@pytest.mark.parametrize(
"algorithm,sip_topology,n_sips,sip_w,sip_h", CONFIGS,
)
def test_allreduce(
tmp_path, algorithm, sip_topology, n_sips, sip_w, sip_h,
):
topo_path, ccl_path = _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm,
sip_w=sip_w, sip_h=sip_h,
)
topo = resolve_topology(topo_path)
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"test_{algorithm}_{sip_topology}",
spec=spec,
) as ctx:
result = run_allreduce(
ctx, engine, spec,
algorithm=algorithm, ccl_yaml=ccl_path,
)
assert result["ok_cubes"] > 0
# ── Latency sweep (parametrized + xdist-friendly) ─────────────────────
# avoid 16 (== n_cubes, dim_map collision). Goes up to 96 KB per PE: # avoid 16 (== n_cubes, dim_map collision). Goes up to 96 KB per PE:
# bytes_per_pe = n_elem * 2 (f16). 49152 elem * 2 = 96 KB / PE. # bytes_per_pe = n_elem * 2 (f16). 49152 elem * 2 = 96 KB / PE.
@@ -289,7 +330,7 @@ _SWEEP_TOPOLOGIES = [
# parametrized invocation writes one JSON file here; the aggregator # parametrized invocation writes one JSON file here; the aggregator
# (run from conftest.pytest_sessionfinish) reads them and emits the # (run from conftest.pytest_sessionfinish) reads them and emits the
# combined CSV + PNG plots. # combined CSV + PNG plots.
_SWEEP_OUT_DIR = (Path(__file__).parent.parent / "docs" / "diagrams" _SWEEP_OUT_DIR = (Path(__file__).parent.parent.parent / "docs" / "diagrams"
/ "allreduce_latency_plots") / "allreduce_latency_plots")
_SWEEP_ROWS_DIR = _SWEEP_OUT_DIR / "_rows" _SWEEP_ROWS_DIR = _SWEEP_OUT_DIR / "_rows"
@@ -305,69 +346,6 @@ def _sweep_params():
return out return out
@pytest.mark.parametrize(
"algorithm,sip_topology,n_sips,sip_w,sip_h,n_elem", _sweep_params(),
)
def test_allreduce_latency_one(
tmp_path, algorithm, sip_topology, n_sips, sip_w, sip_h, n_elem,
):
"""One config of the latency sweep. xdist parallelizes across params.
Writes a single JSON row to ``_SWEEP_ROWS_DIR``. The conftest
sessionfinish hook aggregates rows into CSV + plots after all
parametrized cases finish.
"""
import json
topo_path, ccl_path = _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm,
sip_w=sip_w, sip_h=sip_h,
n_elem_override=n_elem,
)
topo = resolve_topology(topo_path)
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id=f"sweep_{algorithm}_{sip_topology}_{n_elem}",
spec=spec,
) as ctx:
result = run_allreduce(
ctx, engine, spec,
algorithm=algorithm, ccl_yaml=ccl_path,
)
assert result["ok_cubes"] > 0
pe_exec_vals = [
float(tr.get("pe_exec_ns", 0.0) or 0.0)
for _, (_, tr) in engine._results.items()
if isinstance(tr, dict)
]
crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0
cm = spec["sip"]["cube_mesh"]
n_cubes = int(cm["w"]) * int(cm["h"])
bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16
bytes_per_pe = n_elem * _ELEM_BYTES_F16
record = {
"algorithm": algorithm,
"sip_topology": sip_topology,
"n_sips": n_sips,
"n_elem": n_elem,
"bytes_per_pe": bytes_per_pe,
"bytes_per_sip": bytes_per_sip,
"latency_ns": crit_ns,
}
_SWEEP_ROWS_DIR.mkdir(parents=True, exist_ok=True)
row_path = _SWEEP_ROWS_DIR / f"{sip_topology}_{n_elem}.json"
with open(row_path, "w", encoding="utf-8") as f:
json.dump(record, f)
def _aggregate_sweep_plots() -> bool: def _aggregate_sweep_plots() -> bool:
"""Read all per-config rows and emit CSV + PNG plots. """Read all per-config rows and emit CSV + PNG plots.
@@ -469,7 +447,7 @@ def _aggregate_sweep_plots() -> bool:
plt.close(fig) plt.close(fig)
# Combined overview.png is no longer emitted — the broken-y-axis # Combined overview.png is no longer emitted — the broken-y-axis
# comparison (scripts/emit_overview_with_external_ref.py # comparison (emit_comparison_fsim_plot() below
# comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png) # comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png)
# supersedes it. Per-topology plots above and summary.csv are still # supersedes it. Per-topology plots above and summary.csv are still
# produced. # produced.
@@ -491,6 +469,118 @@ def _aggregate_sweep_plots() -> bool:
return True return True
# ── Buffer-kind sweep constants + aggregator ──────────────────────────
#
# Parametrized over (buffer_kind, n_elem) on torus_2d with 6 SIPs
# (3×2, i.e. sip_w=3, sip_h=2). Until slot-latency modeling lands, the three
# lines overlap exactly (slot access is latency-free today); they spread out
# once tcm/sram/hbm carry distinct access costs.
_BUFFER_KINDS = ["tcm", "sram", "hbm"]
_BK_N_ELEM_GRID = [128, 1024, 8192, 32768] # 256 B → 64 KB per slot
_BK_ROWS_DIR = _SWEEP_OUT_DIR / "_buffer_kind_rows"
# Descriptive output stem (shared by the .png and .csv).
_BK_OUT_STEM = "AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM"
def _bk_params():
out = []
for bk in _BUFFER_KINDS:
for n_elem in _BK_N_ELEM_GRID:
out.append(pytest.param(bk, n_elem, id=f"{bk}-n_elem{n_elem}"))
return out
def aggregate_buffer_kind_plot() -> bool:
"""Read per-config rows and emit the descriptive .png + .csv (_BK_OUT_STEM).
Called from conftest.pytest_sessionfinish (controller-only).
Returns True if rows were aggregated.
"""
import csv
import json
if not _BK_ROWS_DIR.exists():
return False
row_files = sorted(_BK_ROWS_DIR.glob("*.json"))
if not row_files:
return False
records = []
for p in row_files:
with open(p, encoding="utf-8") as f:
records.append(json.load(f))
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
def _fmt_bytes(x, _pos):
if x <= 0:
return "0"
if x >= 1024 * 1024:
return f"{x / (1024 * 1024):.0f} MB"
if x >= 1024:
return f"{x / 1024:.0f} KB"
return f"{x:.0f} B"
_bytes_fmt = FuncFormatter(_fmt_bytes)
_SWEEP_OUT_DIR.mkdir(parents=True, exist_ok=True)
with open(_SWEEP_OUT_DIR / f"{_BK_OUT_STEM}.csv", "w",
newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=[
"buffer_kind", "sip_topology", "n_sips", "n_elem",
"bytes_per_pe", "latency_ns",
])
w.writeheader()
for r in sorted(records, key=lambda r: (
r["buffer_kind"], r["bytes_per_pe"],
)):
w.writerow(r)
colors = {"tcm": "tab:blue", "sram": "tab:orange", "hbm": "tab:red"}
fig, ax = plt.subplots(figsize=(10, 6))
for bk in ["tcm", "sram", "hbm"]:
rs = sorted(
[r for r in records if r["buffer_kind"] == bk],
key=lambda r: r["bytes_per_pe"],
)
if not rs:
continue
ax.plot(
[r["bytes_per_pe"] for r in rs],
[r["latency_ns"] for r in rs],
marker="o", lw=2.0,
color=colors[bk], label=f"buffer_kind = {bk}",
)
ax.set_xscale("log", base=2)
ax.set_xlabel("Bytes per PE (log scale)")
ax.set_ylabel("Time (ns)")
ax.set_title(
"AllReduce_LRAB_2Dtorus_6SiP(2x3) — IPCQ memory (SRAM, TCM, HBM)"
)
ax.grid(True, alpha=0.3)
ax.legend()
ax.xaxis.set_major_formatter(_bytes_fmt)
fig.tight_layout()
fig.savefig(_SWEEP_OUT_DIR / f"{_BK_OUT_STEM}.png", dpi=130)
plt.close(fig)
for p in row_files:
try:
p.unlink()
except OSError:
pass
try:
_BK_ROWS_DIR.rmdir()
except OSError:
pass
print(f"\nWrote {_SWEEP_OUT_DIR / f'{_BK_OUT_STEM}.png'} "
f"from {len(records)} rows")
return True
# ── Topology diagram (device-level + cube-level reduction) ──────────── # ── Topology diagram (device-level + cube-level reduction) ────────────
# Convention: "rows × cols" everywhere, row-major rank assignment # Convention: "rows × cols" everywhere, row-major rank assignment
@@ -781,7 +871,143 @@ def emit_topology_diagram() -> str:
return str(out_path) return str(out_path)
def test_emit_topology_diagram(): # ── Comparison vs FSIM (broken-y-axis) ────────────────────────────────
"""Emit topology.png alongside the sweep plots. Pure plotting; no sim.""" #
out = emit_topology_diagram() # Post-processes summary.csv: today's three model curves + a hand-derived
assert Path(out).exists() # theoretical torus_2d line in the bottom panel, and a single external FSIM
# single-device reference marker in the top panel (hardcoded 366 µs; no
# external data file). Reads summary.csv written by _aggregate_sweep_plots.
_FSIM_EXT_LABEL = "FSIM (single device): 366 µs"
_FSIM_EXT_LATENCY_NS = 366_000.0
_CMP_COLORS = {
"ring_1d": "tab:blue",
"torus_2d": "tab:orange",
"mesh_2d_no_wrap": "tab:green",
}
_CMP_DISPLAY = {
"ring_1d": "Ring 1x6 (6 devices)",
"torus_2d": "2D Torus 2x3 (6 devices)",
"mesh_2d_no_wrap": "2D Mesh 2x3 (6 devices)",
}
# Hand-derived theoretical model for torus_2d (6 SIPs): per-PE NOC-packet
# count fit to the simulated startup + per-packet tau.
_CMP_NOC_PACKET_BYTES = 128
_CMP_PES_PER_CUBE = 8
_CMP_T_STARTUP_NS = 1346.0
_CMP_TAU_NS = (8741.0 - 1346.0) / (6144 - 1)
def emit_comparison_fsim_plot() -> str | None:
"""Render comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png.
Reads ``summary.csv`` (written by ``_aggregate_sweep_plots``). Returns the
output path, or ``None`` if summary.csv is absent / empty.
"""
import csv
csv_path = _SWEEP_OUT_DIR / "summary.csv"
if not csv_path.exists():
return None
records = []
with open(csv_path, newline="", encoding="utf-8") as f:
for row in csv.DictReader(f):
records.append({
"sip_topology": row["sip_topology"],
"bytes_per_pe": int(row["bytes_per_pe"]),
"latency_ns": float(row["latency_ns"]),
})
if not records:
return None
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
def _theoretical_torus_2d_ns(bytes_per_pe: int) -> float:
bytes_per_cube = int(bytes_per_pe) * _CMP_PES_PER_CUBE
n_packets = max(1, -(-bytes_per_cube // _CMP_NOC_PACKET_BYTES))
return _CMP_T_STARTUP_NS + (n_packets - 1) * _CMP_TAU_NS
def _bytes_fmt(x, _pos):
if x >= 1024 * 1024:
return f"{x / (1024 * 1024):.0f}M"
if x >= 1024:
return f"{x / 1024:.0f}K"
return f"{int(x)}"
topologies = sorted({r["sip_topology"] for r in records})
max_local = max(r["latency_ns"] for r in records)
ext_x = max(r["bytes_per_pe"] for r in records)
fig, (ax_top, ax_bot) = plt.subplots(
2, 1, sharex=True,
gridspec_kw={"height_ratios": [1, 4], "hspace": 0.05},
figsize=(9, 6.5),
)
# Bottom panel: model curves + theoretical torus, linear y.
for topo in topologies:
rs = sorted([r for r in records if r["sip_topology"] == topo],
key=lambda r: r["bytes_per_pe"])
if not rs:
continue
ax_bot.plot(
[r["bytes_per_pe"] for r in rs],
[r["latency_ns"] for r in rs],
marker="o", label=_CMP_DISPLAY.get(topo, topo),
color=_CMP_COLORS.get(topo),
)
torus_rs = sorted(
[r for r in records if r["sip_topology"] == "torus_2d"],
key=lambda r: r["bytes_per_pe"],
)
if torus_rs:
ax_bot.plot(
[r["bytes_per_pe"] for r in torus_rs],
[_theoretical_torus_2d_ns(r["bytes_per_pe"]) for r in torus_rs],
color="tab:red", linestyle="--", linewidth=1.6, marker="x",
label="Theoretical 2D Torus 2x3",
)
ax_bot.set_ylim(0, max_local * 1.10)
# Top panel: external FSIM single-device reference marker.
ax_top.scatter(
[ext_x], [_FSIM_EXT_LATENCY_NS],
marker="*", s=240, color="tab:red", zorder=5,
label=_FSIM_EXT_LABEL,
)
ax_top.set_ylim(_FSIM_EXT_LATENCY_NS * 0.93, _FSIM_EXT_LATENCY_NS * 1.05)
# Hide spine between panels; draw diagonal break ticks.
ax_top.spines["bottom"].set_visible(False)
ax_bot.spines["top"].set_visible(False)
ax_top.tick_params(labeltop=False, bottom=False)
ax_bot.xaxis.tick_bottom()
d = 0.012
kw = dict(transform=ax_top.transAxes, color="k", clip_on=False, lw=1)
ax_top.plot((-d, +d), (-d, +d), **kw)
ax_top.plot((1 - d, 1 + d), (-d, +d), **kw)
kw.update(transform=ax_bot.transAxes)
ax_bot.plot((-d, +d), (1 - d * 4, 1 + d * 4), **kw)
ax_bot.plot((1 - d, 1 + d), (1 - d * 4, 1 + d * 4), **kw)
ax_bot.set_xscale("log", base=2)
ax_bot.set_xlabel("Bytes per PE (log scale)")
ax_bot.set_ylabel("Time (ns)")
ax_top.set_ylabel("Time (ns)")
ax_bot.grid(True, alpha=0.3)
ax_top.grid(True, alpha=0.3)
ax_bot.xaxis.set_major_formatter(mticker.FuncFormatter(_bytes_fmt))
handles_bot, labels_bot = ax_bot.get_legend_handles_labels()
handles_top, labels_top = ax_top.get_legend_handles_labels()
ax_bot.legend(handles_bot + handles_top, labels_bot + labels_top,
loc="upper left")
fig.suptitle("Multidevice allreduce (ring, Mesh, 2DTorus) vs FSIM latency")
fig.tight_layout()
out = (_SWEEP_OUT_DIR
/ "comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png")
fig.savefig(out, dpi=120)
plt.close(fig)
return str(out)
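Worked example for the hand-derived torus_2d model above: the sketch below copies the four ``_CMP_*`` constants and the packet-count formula from the diff (renamed without the prefix) and evaluates them at the two ends of the sweep. The chosen n_elem values are illustrative; 49152 is the 96 KB / PE upper bound mentioned in the sweep comments.

```python
NOC_PACKET_BYTES = 128
PES_PER_CUBE = 8
T_STARTUP_NS = 1346.0
# Per-packet cost fitted between 1 packet (1346 ns) and 6144 packets (8741 ns).
TAU_NS = (8741.0 - 1346.0) / (6144 - 1)


def theoretical_torus_2d_ns(bytes_per_pe: int) -> float:
    """Startup latency plus one tau for every NOC packet after the first."""
    bytes_per_cube = int(bytes_per_pe) * PES_PER_CUBE
    # -(-a // b) is ceiling division; at least one packet is always sent.
    n_packets = max(1, -(-bytes_per_cube // NOC_PACKET_BYTES))
    return T_STARTUP_NS + (n_packets - 1) * TAU_NS


for n_elem in (8, 1024, 49152):
    bytes_per_pe = n_elem * 2  # f16
    print(n_elem, bytes_per_pe, round(theoretical_torus_2d_ns(bytes_per_pe), 1))
```

At n_elem = 8 a cube carries exactly 128 bytes, one packet, so the model returns the startup term alone. At n_elem = 49152 a cube carries 786432 bytes, 6144 packets, and the model returns the 8741 ns fit point up to floating-point rounding.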
@@ -0,0 +1,35 @@
"""Correctness of intercube allreduce across SIP topologies (distributed path).
Routes through init_process_group → mp.spawn → dist.all_reduce for ring_1d,
torus_2d (2×3), and mesh_2d_no_wrap (2×3). Per-rank correctness is asserted
inside the worker; spawn raises on failure.
"""
from __future__ import annotations
import pytest
from tests.sccl._allreduce_helpers import (
CONFIGS,
DEFAULT_N_ELEM,
_crit_ns,
_run_distributed,
_write_temp_configs,
)
@pytest.mark.parametrize(
"algorithm,sip_topology,n_sips,sip_w,sip_h", CONFIGS,
)
def test_allreduce(
tmp_path, monkeypatch, algorithm, sip_topology, n_sips, sip_w, sip_h,
):
topo_path, _ = _write_temp_configs(
tmp_path, sip_topology, n_sips, algorithm,
sip_w=sip_w, sip_h=sip_h,
)
engine, _n_cubes = _run_distributed(
tmp_path, monkeypatch, topo_path,
f"test_{algorithm}_{sip_topology}", DEFAULT_N_ELEM,
)
# A positive critical path confirms the kernel actually ran.
assert _crit_ns(engine) > 0.0
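Note on the per-rank check inside ``_worker``: every element on SIP ``rank`` starts as ``rank + 1`` and the worker expects ``n_cubes * sum(1..n_sips)`` everywhere afterwards. The pure-Python reference below reproduces that value. It assumes, as the expected value implies, that the collective sums element-wise over every cube of every SIP; n_cubes = 16 is an illustrative value here, not read from topology.yaml.

```python
def expected_allreduce_value(n_cubes: int, n_sips: int) -> float:
    """Closed form used by the worker's assertion."""
    return float(n_cubes * sum(range(1, n_sips + 1)))


def reference_allreduce(n_cubes: int, n_sips: int, n_elem: int) -> list[float]:
    """Brute-force sum over all (sip, cube) rows, one value per element."""
    inputs = [
        [[float(rank + 1)] * n_elem for _ in range(n_cubes)]
        for rank in range(n_sips)
    ]
    return [
        sum(inputs[r][c][e] for r in range(n_sips) for c in range(n_cubes))
        for e in range(n_elem)
    ]


# 6 SIPs as in CONFIGS; 16 cubes per SIP is an assumption for illustration.
print(expected_allreduce_value(16, 6))
```

The result stays small enough to be exactly representable in f16 for this configuration, which is consistent with the loose ``rtol``/``atol`` of 1e-1 being a safety margin rather than a necessity.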
@@ -0,0 +1,47 @@
"""Full distributed path against topology.yaml as-is (no overrides).
The same flow a real DDP training script would use:
init_process_group(backend="ahbm") → mp.spawn → dist.all_reduce.
"""
from __future__ import annotations
from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology
from tests.sccl._allreduce_helpers import (
DEFAULT_N_ELEM,
TOPOLOGY_PATH,
_worker,
_write_ccl_yaml,
)
def test_distributed_lrab_hierarchical_allreduce(tmp_path, monkeypatch):
monkeypatch.chdir(_write_ccl_yaml(tmp_path))
topo = resolve_topology(str(TOPOLOGY_PATH))
engine = GraphEngine(topo.topology_obj, enable_data=True)
spec = topo.topology_obj.spec
n_sips = int(spec["system"]["sips"]["count"])
cm = spec["sip"]["cube_mesh"]
n_cubes = int(cm["w"]) * int(cm["h"])
with RuntimeContext(
engine=engine,
target_device=DeviceSelector("all"),
correlation_id="dist_intercube_ar",
spec=spec,
) as ctx:
ctx.distributed.init_process_group(backend="ahbm")
assert ctx.distributed.get_world_size() == n_sips
t_start = engine._env.now
ctx.multiprocessing.spawn(
_worker, args=(n_cubes, DEFAULT_N_ELEM, n_sips, ctx),
nprocs=n_sips,
)
t_end = engine._env.now
print(f"\n[distributed] sim latency = "
f"{t_end - t_start:.1f} ns ({(t_end - t_start) / 1000:.3f} us)")
@@ -40,7 +40,7 @@ from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology from kernbench.topology.builder import resolve_topology
from tests.test_allreduce_multidevice import ( from tests.sccl._allreduce_helpers import (
_write_temp_configs, _write_temp_configs,
run_allreduce, run_allreduce,
) )
@@ -0,0 +1,66 @@
"""Buffer-kind sweep (TCM / SRAM / HBM) on torus_2d 6 SIPs (3×2), distributed.
Each parametrized case writes one JSON row; the conftest sessionfinish hook
calls ``aggregate_buffer_kind_plot`` to emit the comparison PNG + CSV. Until
slot-latency modeling lands, the three lines overlap exactly (slot access is
latency-free today).
"""
from __future__ import annotations
import json
import pytest
import yaml
from tests.sccl._allreduce_helpers import (
_BK_ROWS_DIR,
_ELEM_BYTES_F16,
_bk_params,
_crit_ns,
_run_distributed,
_write_temp_configs,
)
@pytest.mark.parametrize("buffer_kind,n_elem", _bk_params())
def test_buffer_kind_allreduce_one(tmp_path, monkeypatch, buffer_kind, n_elem):
sub = tmp_path / f"{buffer_kind}_{n_elem}"
sub.mkdir()
topo_path, ccl_path = _write_temp_configs(
sub,
sip_topology="torus_2d",
n_sips=6,
algorithm="lrab_hierarchical_allreduce",
sip_w=3, sip_h=2,
n_elem_override=n_elem,
)
# Override buffer_kind in the temp ccl.yaml (read by the ahbm backend
# at init_process_group time via load_ccl_config()).
with open(ccl_path) as f:
ccl_cfg = yaml.safe_load(f)
ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind
ccl_cfg.setdefault("algorithms", {}).setdefault(
"lrab_hierarchical_allreduce", {},
)["buffer_kind"] = buffer_kind
with open(ccl_path, "w") as f:
yaml.dump(ccl_cfg, f, default_flow_style=False)
engine, _ = _run_distributed(
sub, monkeypatch, topo_path,
f"bk_sweep_{buffer_kind}_{n_elem}", n_elem,
)
crit_ns = _crit_ns(engine)
bytes_per_pe = n_elem * _ELEM_BYTES_F16
record = {
"buffer_kind": buffer_kind,
"sip_topology": "torus_2d",
"n_sips": 6,
"n_elem": n_elem,
"bytes_per_pe": bytes_per_pe,
"latency_ns": crit_ns,
}
_BK_ROWS_DIR.mkdir(parents=True, exist_ok=True)
row_path = _BK_ROWS_DIR / f"{buffer_kind}_{n_elem}.json"
with open(row_path, "w", encoding="utf-8") as f:
json.dump(record, f)
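Note on the row-staging pattern: each parametrized case writes one JSON file and a session-level hook merges them, which keeps xdist workers from contending on a shared CSV. The sketch below shows the same shape with the standard library only. Function names, the field list, and the latency numbers are hypothetical placeholders, not the repository's helpers or measured results.

```python
import csv
import json
import tempfile
from pathlib import Path

FIELDS = ["buffer_kind", "n_elem", "bytes_per_pe", "latency_ns"]


def write_row(rows_dir: Path, record: dict) -> Path:
    """One file per (buffer_kind, n_elem), so parallel workers never collide."""
    rows_dir.mkdir(parents=True, exist_ok=True)
    path = rows_dir / f"{record['buffer_kind']}_{record['n_elem']}.json"
    path.write_text(json.dumps(record), encoding="utf-8")
    return path


def aggregate(rows_dir: Path, out_csv: Path) -> int:
    """Merge all staged rows into one CSV, sorted like the real aggregator."""
    records = [
        json.loads(p.read_text(encoding="utf-8"))
        for p in sorted(rows_dir.glob("*.json"))
    ]
    records.sort(key=lambda r: (r["buffer_kind"], r["bytes_per_pe"]))
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(records)
    return len(records)


with tempfile.TemporaryDirectory() as tmp:
    rows = Path(tmp) / "_rows"
    for bk in ("tcm", "sram", "hbm"):
        # latency_ns = 0.0 is a placeholder, not a measurement.
        write_row(rows, {"buffer_kind": bk, "n_elem": 128,
                         "bytes_per_pe": 256, "latency_ns": 0.0})
    print(aggregate(rows, Path(tmp) / "summary.csv"))
```

Keying the file name on the parameters also makes reruns idempotent: a repeated case overwrites its own row instead of appending a duplicate.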
@@ -0,0 +1,23 @@
"""Emit the broken-y-axis allreduce-vs-FSIM comparison plot.
Post-processes summary.csv (written by the latency sweep) into
comparison_mesh_vs_ring_vs_2DTorus_vs_theoretical_vs_fsim.png. Pure
plotting; reads the on-disk summary.csv (skips if the sweep has never run).
"""
from __future__ import annotations
from pathlib import Path
import pytest
from tests.sccl._allreduce_helpers import (
_SWEEP_OUT_DIR,
emit_comparison_fsim_plot,
)
def test_emit_comparison_fsim_plot():
if not (_SWEEP_OUT_DIR / "summary.csv").exists():
pytest.skip("summary.csv absent; run the latency sweep first")
out = emit_comparison_fsim_plot()
assert out is not None and Path(out).exists()
@@ -0,0 +1,58 @@
"""Allreduce latency sweep (distributed path), xdist-friendly.
Each parametrized case writes one JSON row to the shared staging dir; the
conftest sessionfinish hook calls ``_aggregate_sweep_plots`` to emit the
per-topology PNGs + summary.csv after all cases finish.
"""
from __future__ import annotations

import json

import pytest

from tests.sccl._allreduce_helpers import (
    _ELEM_BYTES_F16,
    _SWEEP_ROWS_DIR,
    _crit_ns,
    _run_distributed,
    _sweep_params,
    _write_temp_configs,
)


@pytest.mark.parametrize(
    "algorithm,sip_topology,n_sips,sip_w,sip_h,n_elem", _sweep_params(),
)
def test_allreduce_latency_one(
    tmp_path, monkeypatch, algorithm, sip_topology, n_sips, sip_w, sip_h,
    n_elem,
):
    topo_path, _ = _write_temp_configs(
        tmp_path, sip_topology, n_sips, algorithm,
        sip_w=sip_w, sip_h=sip_h,
        n_elem_override=n_elem,
    )

    engine, n_cubes = _run_distributed(
        tmp_path, monkeypatch, topo_path,
        f"sweep_{algorithm}_{sip_topology}_{n_elem}", n_elem,
    )

    crit_ns = _crit_ns(engine)
    bytes_per_sip = n_cubes * n_elem * _ELEM_BYTES_F16
    bytes_per_pe = n_elem * _ELEM_BYTES_F16

    record = {
        "algorithm": algorithm,
        "sip_topology": sip_topology,
        "n_sips": n_sips,
        "n_elem": n_elem,
        "bytes_per_pe": bytes_per_pe,
        "bytes_per_sip": bytes_per_sip,
        "latency_ns": crit_ns,
    }

    _SWEEP_ROWS_DIR.mkdir(parents=True, exist_ok=True)
    row_path = _SWEEP_ROWS_DIR / f"{sip_topology}_{n_elem}.json"
    with open(row_path, "w", encoding="utf-8") as f:
        json.dump(record, f)
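Reviewer note: the module docstring describes a two-stage pattern in which each parametrized case writes one JSON row and a session-finish hook merges the rows afterwards. The sketch below illustrates only the merge stage, under stated assumptions: `aggregate_rows` is a hypothetical stand-in, not the repository's `_aggregate_sweep_plots`, and the two sample rows are made up. One file per case means parallel workers never write to the same file, provided each case's file name is unique.

```python
import csv
import json
import tempfile
from pathlib import Path


def aggregate_rows(rows_dir: Path, out_csv: Path) -> int:
    """Merge per-case JSON rows into one CSV; return the number of rows."""
    records = [
        json.loads(p.read_text(encoding="utf-8"))
        for p in sorted(rows_dir.glob("*.json"))
    ]
    if not records:
        return 0
    # Stable ordering so the CSV does not depend on worker scheduling.
    records.sort(key=lambda r: (r["sip_topology"], r["bytes_per_pe"]))
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(records[0]))
        writer.writeheader()
        writer.writerows(records)
    return len(records)


# Illustration with two made-up rows in a temporary staging directory.
with tempfile.TemporaryDirectory() as tmp:
    rows = Path(tmp) / "rows"
    rows.mkdir()
    for topo, n_elem, lat in [("ring", 128, 10.0), ("mesh", 1024, 20.0)]:
        row = {"sip_topology": topo, "n_elem": n_elem,
               "bytes_per_pe": n_elem * 2, "latency_ns": lat}
        (rows / f"{topo}_{n_elem}.json").write_text(json.dumps(row))
    n_rows = aggregate_rows(rows, Path(tmp) / "summary.csv")
    csv_lines = (Path(tmp) / "summary.csv").read_text(encoding="utf-8").splitlines()
```

In a real conftest the merge would run only on the controller process; with pytest-xdist, worker processes can be recognised by the `workerinput` attribute on their config object.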
+11
@@ -0,0 +1,11 @@
"""Emit topology.png (device-level + cube-level reduction). Pure plotting; no sim."""
from __future__ import annotations

from pathlib import Path

from tests.sccl._allreduce_helpers import emit_topology_diagram


def test_emit_topology_diagram():
    out = emit_topology_diagram()
    assert Path(out).exists()
-199
@@ -1,199 +0,0 @@
"""Phase 1 buffer-kind allreduce sweep — torus_2d 6 SIPs.
Parametrized over (buffer_kind, n_elem). Each case runs the standard
config-driven allreduce app and writes a JSON row to a shared staging
dir; the conftest sessionfinish hook (added in Phase 1) aggregates
rows into ``docs/diagrams/allreduce_latency_plots/
AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM.png``.
Pre-Phase-2: the three buffer-kind lines overlap exactly because slot
access is latency-free today. Post-Phase-2 they spread out (tcm
fastest, hbm slowest).
"""
from __future__ import annotations

import json
from pathlib import Path

import pytest
import yaml

from kernbench.runtime_api.context import RuntimeContext
from kernbench.runtime_api.types import DeviceSelector
from kernbench.sim_engine.engine import GraphEngine
from kernbench.topology.builder import resolve_topology

# Reuse the allreduce app helpers.
from tests.test_allreduce_multidevice import (
    _write_temp_configs,
    run_allreduce,
)

_BUFFER_KINDS = ["tcm", "sram", "hbm"]
_N_ELEM_GRID = [128, 1024, 8192, 32768]  # 256 B → 64 KB per slot
_ELEM_BYTES_F16 = 2

_OUT_DIR = (Path(__file__).parent.parent / "docs" / "diagrams"
            / "allreduce_latency_plots")
_ROWS_DIR = _OUT_DIR / "_buffer_kind_rows"
# Descriptive output stem (shared by the .png and .csv).
_OUT_STEM = "AllReduce_LRAB_2Dtorus_6SiP_2x3_with_TCM_SRAM_HBM"


def _bk_params():
    out = []
    for bk in _BUFFER_KINDS:
        for n_elem in _N_ELEM_GRID:
            out.append(pytest.param(bk, n_elem, id=f"{bk}-n_elem{n_elem}"))
    return out
@pytest.mark.parametrize("buffer_kind,n_elem", _bk_params())
def test_buffer_kind_allreduce_one(tmp_path, buffer_kind, n_elem):
    """One config of the buffer-kind sweep. xdist parallelizes."""
    sub = tmp_path / f"{buffer_kind}_{n_elem}"
    sub.mkdir()
    topo_path, ccl_path = _write_temp_configs(
        sub,
        sip_topology="torus_2d",
        n_sips=6,
        algorithm="lrab_hierarchical_allreduce",
        sip_w=3, sip_h=2,
        n_elem_override=n_elem,
    )
    # Override buffer_kind in the temp ccl.yaml.
    with open(ccl_path) as f:
        ccl_cfg = yaml.safe_load(f)
    ccl_cfg.setdefault("defaults", {})["buffer_kind"] = buffer_kind
    ccl_cfg.setdefault("algorithms", {}).setdefault(
        "lrab_hierarchical_allreduce", {},
    )["buffer_kind"] = buffer_kind
    with open(ccl_path, "w") as f:
        yaml.dump(ccl_cfg, f, default_flow_style=False)

    topo = resolve_topology(topo_path)
    engine = GraphEngine(topo.topology_obj, enable_data=True)
    spec = topo.topology_obj.spec

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id=f"bk_sweep_{buffer_kind}_{n_elem}",
        spec=spec,
    ) as ctx:
        result = run_allreduce(
            ctx, engine, spec,
            algorithm="lrab_hierarchical_allreduce", ccl_yaml=ccl_path,
        )
    assert result["ok_cubes"] > 0

    pe_exec_vals = [
        float(tr.get("pe_exec_ns", 0.0) or 0.0)
        for _, (_, tr) in engine._results.items()
        if isinstance(tr, dict)
    ]
    crit_ns = max(pe_exec_vals) if pe_exec_vals else 0.0
    bytes_per_pe = n_elem * _ELEM_BYTES_F16

    record = {
        "buffer_kind": buffer_kind,
        "sip_topology": "torus_2d",
        "n_sips": 6,
        "n_elem": n_elem,
        "bytes_per_pe": bytes_per_pe,
        "latency_ns": crit_ns,
    }
    _ROWS_DIR.mkdir(parents=True, exist_ok=True)
    row_path = _ROWS_DIR / f"{buffer_kind}_{n_elem}.json"
    with open(row_path, "w", encoding="utf-8") as f:
        json.dump(record, f)
def aggregate_buffer_kind_plot() -> bool:
    """Read per-config rows and emit the descriptive .png + .csv (_OUT_STEM).

    Called from conftest.pytest_sessionfinish (controller-only).
    Returns True if rows were aggregated.
    """
    import csv

    if not _ROWS_DIR.exists():
        return False
    row_files = sorted(_ROWS_DIR.glob("*.json"))
    if not row_files:
        return False
    records = []
    for p in row_files:
        with open(p, encoding="utf-8") as f:
            records.append(json.load(f))

    import matplotlib.pyplot as plt
    from matplotlib.ticker import FuncFormatter

    def _fmt_bytes(x, _pos):
        if x <= 0:
            return "0"
        if x >= 1024 * 1024:
            return f"{x / (1024 * 1024):.0f} MB"
        if x >= 1024:
            return f"{x / 1024:.0f} KB"
        return f"{x:.0f} B"

    _bytes_fmt = FuncFormatter(_fmt_bytes)

    _OUT_DIR.mkdir(parents=True, exist_ok=True)
    with open(_OUT_DIR / f"{_OUT_STEM}.csv", "w",
              newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=[
            "buffer_kind", "sip_topology", "n_sips", "n_elem",
            "bytes_per_pe", "latency_ns",
        ])
        w.writeheader()
        for r in sorted(records, key=lambda r: (
            r["buffer_kind"], r["bytes_per_pe"],
        )):
            w.writerow(r)

    colors = {"tcm": "tab:blue", "sram": "tab:orange", "hbm": "tab:red"}
    fig, ax = plt.subplots(figsize=(10, 6))
    for bk in ["tcm", "sram", "hbm"]:
        rs = sorted(
            [r for r in records if r["buffer_kind"] == bk],
            key=lambda r: r["bytes_per_pe"],
        )
        if not rs:
            continue
        ax.plot(
            [r["bytes_per_pe"] for r in rs],
            [r["latency_ns"] for r in rs],
            marker="o", lw=2.0,
            color=colors[bk], label=f"buffer_kind = {bk}",
        )
    ax.set_xscale("log", base=2)
    ax.set_xlabel("Bytes per PE (log scale)")
    ax.set_ylabel("Time (ns)")
    ax.set_title(
        "AllReduce_LRAB_2Dtorus_6SiP(2x3) — IPCQ memory (SRAM, TCM, HBM)"
    )
    ax.grid(True, alpha=0.3)
    ax.legend()
    ax.xaxis.set_major_formatter(_bytes_fmt)
    fig.tight_layout()
    fig.savefig(_OUT_DIR / f"{_OUT_STEM}.png", dpi=130)
    plt.close(fig)

    for p in row_files:
        try:
            p.unlink()
        except OSError:
            pass
    try:
        _ROWS_DIR.rmdir()
    except OSError:
        pass

    print(f"\nWrote {_OUT_DIR / f'{_OUT_STEM}.png'} "
          f"from {len(records)} rows")
    return True
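Reviewer note: the x-axis labels in the removed aggregator come from the nested `_fmt_bytes` tick formatter. The standalone sketch below repeats that threshold logic without matplotlib so the labels for the sweep grid can be checked by hand. The name `fmt_bytes` and the dropped `_pos` argument are illustrative changes; matplotlib's `FuncFormatter` calls the wrapped function with both the value and the tick position.

```python
def fmt_bytes(x: float) -> str:
    """Format a byte count as B / KB / MB using binary (1024) thresholds."""
    if x <= 0:
        return "0"
    if x >= 1024 * 1024:
        return f"{x / (1024 * 1024):.0f} MB"
    if x >= 1024:
        return f"{x / 1024:.0f} KB"
    return f"{x:.0f} B"


ELEM_BYTES_F16 = 2                      # two bytes per f16 element
n_elem_grid = [128, 1024, 8192, 32768]  # same grid as the removed sweep
labels = [fmt_bytes(n * ELEM_BYTES_F16) for n in n_elem_grid]
print(labels)
```

The first and last labels correspond to the "256 B → 64 KB per slot" comment next to `_N_ELEM_GRID`.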
@@ -1,119 +0,0 @@
"""End-to-end distributed test for intercube allreduce.
Exercises the full process-group path:
dist.init_process_group(backend="ahbm")
→ mp.spawn(nprocs=n_sips)
→ each worker: set_device → allocate → fill → dist.all_reduce → verify
This is the same flow a real DDP training script would use.
"""
from __future__ import annotations

import os
import textwrap
from pathlib import Path

import numpy as np
import pytest

TOPOLOGY_PATH = Path(__file__).parent.parent / "topology.yaml"
N_CUBES = 16
N_ELEM = 8


def _write_ccl_yaml(tmp_path) -> str:
    body = textwrap.dedent("""\
        defaults:
          algorithm: lrab_hierarchical_allreduce
          buffer_kind: tcm
          backpressure: sleep
          n_slots: 4
          slot_size: 4096
          vc_chunk_size: 256
          ipcq_credit_size_bytes: 16
        algorithms:
          lrab_hierarchical_allreduce:
            module: kernbench.ccl.algorithms.lrab_hierarchical_allreduce
            topology: none
            buffer_kind: tcm
            n_elem: 8
            root_cube: 15
    """)
    (tmp_path / "ccl.yaml").write_text(body)
    return str(tmp_path)


def _worker(rank: int, n_sips: int, torch) -> None:
    """Per-SIP worker: allocate, fill, all_reduce, verify."""
    from kernbench.policy.placement.dp import DPPolicy

    torch.ahbm.set_device(rank)
    dp = DPPolicy(
        cube="row_wise", pe="replicate",
        num_pes=1, num_cubes=N_CUBES,
    )
    tensor = torch.zeros(
        (N_CUBES, N_ELEM), dtype="f16", dp=dp,
        name=f"sip{rank}",
    )
    init_arr = np.full((N_CUBES, N_ELEM), float(rank + 1), dtype=np.float16)
    tensor.copy_(torch.from_numpy(init_arr))
    print(f"[SIP {rank}] input cube0[:4] = {tensor.numpy()[0][:4].tolist()}")

    torch.distributed.all_reduce(tensor, op="sum")

    arr = tensor.numpy()
    expected = float(N_CUBES * sum(range(1, n_sips + 1)))
    print(f"[SIP {rank}] output cube0[:4] = {arr[0][:4].tolist()}")
    print(f"[SIP {rank}] output cube15[:4] = {arr[15][:4].tolist()}")
    for cube_id in range(N_CUBES):
        assert np.allclose(arr[cube_id], expected, rtol=1e-1, atol=1e-1), (
            f"SIP{rank} cube {cube_id}: "
            f"got {arr[cube_id][:4]}, expected {expected}"
        )
    if rank == 0:
        print(f"\n lrab_hierarchical_allreduce (ws={n_sips}): "
              f"{n_sips * N_CUBES} OK")


def test_distributed_lrab_hierarchical_allreduce(tmp_path, monkeypatch):
    """Full distributed path: init_process_group → mp.spawn → all_reduce."""
    from kernbench.runtime_api.context import RuntimeContext
    from kernbench.runtime_api.types import DeviceSelector
    from kernbench.sim_engine.engine import GraphEngine
    from kernbench.topology.builder import resolve_topology

    monkeypatch.chdir(_write_ccl_yaml(tmp_path))
    topo = resolve_topology(str(TOPOLOGY_PATH))
    engine = GraphEngine(topo.topology_obj, enable_data=True)
    spec = topo.topology_obj.spec
    n_sips = int(spec["system"]["sips"]["count"])

    with RuntimeContext(
        engine=engine,
        target_device=DeviceSelector("all"),
        correlation_id="dist_intercube_ar",
        spec=spec,
    ) as ctx:
        ctx.distributed.init_process_group(backend="ahbm")
        assert ctx.distributed.get_world_size() == n_sips
        t_start = engine._env.now
        ctx.multiprocessing.spawn(
            _worker, args=(n_sips, ctx), nprocs=n_sips,
        )
        t_end = engine._env.now
        print(f"\n[distributed] sim latency = "
              f"{t_end - t_start:.1f} ns ({(t_end - t_start) / 1000:.3f} us)")
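Reviewer note: the worker's `expected` value follows from the fill pattern. SIP `rank` fills every cube row with `rank + 1`, and the hierarchical allreduce sums over all cubes and all SIPs, so every element ends up as `N_CUBES * (1 + 2 + ... + n_sips)`. The sketch below checks that closed form against a brute-force sum. The helper names are hypothetical, and `n_sips = 6` is an assumption for illustration, since the real count is read from topology.yaml.

```python
def expected_allreduce_value(n_cubes: int, n_sips: int) -> float:
    """Closed form used by the worker: n_cubes * (1 + 2 + ... + n_sips)."""
    return float(n_cubes * sum(range(1, n_sips + 1)))


def brute_force_sum(n_cubes: int, n_sips: int) -> float:
    """Add the fill value (rank + 1) once per cube and per SIP."""
    total = 0.0
    for rank in range(n_sips):
        for _cube in range(n_cubes):
            total += rank + 1
    return total


N_CUBES = 16   # from the removed test module
n_sips = 6     # assumed for illustration only

value = expected_allreduce_value(N_CUBES, n_sips)
print(value, brute_force_sum(N_CUBES, n_sips))
```

For these sizes the result is a small integer, which float16 can represent exactly (integers up to 2048 are exact), so the loose `rtol=1e-1` tolerance in the test leaves plenty of margin.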
+2 -2
@@ -20,7 +20,7 @@ Reference (Phase 2 will edit these):
 - ccl.yaml — algorithm.buffer_kind

 The tests reuse the existing config-driven allreduce app
-(``run_allreduce`` in tests/test_allreduce_multidevice.py) with a 2-SIP
+(``run_allreduce`` in tests/sccl/_allreduce_helpers.py) with a 2-SIP
 ring topology and a SMALL n_elem so they finish fast (~3-5 s each).
 """
 from __future__ import annotations
@@ -37,7 +37,7 @@ from kernbench.topology.builder import resolve_topology
 # Reuse the test app's helpers so this micro-test file does not
 # duplicate the run-allreduce + write-temp-configs plumbing.
-from tests.test_allreduce_multidevice import (
+from tests.sccl._allreduce_helpers import (
     _write_temp_configs,
     run_allreduce,
 )
+4 -3
@@ -47,7 +47,7 @@ from kernbench.runtime_api.types import DeviceSelector
 from kernbench.sim_engine.engine import GraphEngine
 from kernbench.topology.builder import resolve_topology

-from tests.test_allreduce_multidevice import (
+from tests.sccl._allreduce_helpers import (
     _write_temp_configs,
     run_allreduce,
 )
@@ -59,8 +59,9 @@ def _run_allreduce_with_buffer_kind(
"""Run one torus_2d 6-SIP allreduce with the given buffer_kind and """Run one torus_2d 6-SIP allreduce with the given buffer_kind and
     return critical-path pe_exec_ns (max across all PEs).

-    Mirrors the sweep harness in test_allreduce_buffer_kind_sweep.py
-    so the assertions below compare apples-to-apples against that PNG.
+    Mirrors the buffer-kind sweep harness in
+    tests/sccl/test_plot_buffer_kind_sweep.py so the assertions
+    below compare apples-to-apples against that PNG.
     """
     sub = tmp_path / f"{buffer_kind}_{n_elem}"
     sub.mkdir()