Skip to content

Add semaphore registry and admission gate REPLACE for adaptive concurrency (#1391)#1391

Open
justinvjoseph wants to merge 3 commits into
facebookresearch:mainfrom
justinvjoseph:export-D102929216
Open

Add semaphore registry and admission gate REPLACE for adaptive concurrency (#1391)#1391
justinvjoseph wants to merge 3 commits into
facebookresearch:mainfrom
justinvjoseph:export-D102929216

Conversation

@justinvjoseph
Copy link
Copy Markdown

@justinvjoseph justinvjoseph commented Apr 30, 2026

Summary:

This is Diff 3a in the T262755626 stack. Adds the foundation for runtime concurrency adjustment in SPDL pipelines:

  1. _PipelineImpl._semaphore_registry: dict[str, ResizableSemaphore] — sibling registry keyed on the qualified stage name. Populated at pipeline build time when a stage opts in. NO modification to StageInfo (preserves frozen=True / hashability for third-party code).
  2. _PipelineImpl._dynamic_concurrency: dict[str, int] — current value per stage; updated by future Pipeline.resize_concurrency() calls.
  3. _PipelineImpl._stage_info_by_name: dict[str, StageInfo] — for error messages and future Track B logging hooks.
  4. _pipe() admission gate REPLACE (V5.6): when a ResizableSemaphore is registered for a stage, await semaphore.acquire() becomes the admission gate, REPLACING the static len(tasks) >= concurrency check. When no semaphore is registered (the default), behaviour is unchanged — ZERO per-task overhead added to the existing fast path. The branch is taken ONCE outside the hot loop (dispatching to _pipe_with_semaphore).
  5. build_pipeline(_install_semaphores_for_test: bool = False) test-only knob: when True, every Pipe stage is built with a ResizableSemaphore and registered. Production code MUST NOT pass this — it bypasses the selective opt-in semantics of Pipeline.resize_concurrency (Diff 3b).
  6. V5.5 throughput regression test (admission_gate_perf_test.py): 4-stage sync pipeline, N=10000 items, 5 trials. Regression > 2% on either p50 or p99 fails the diff. Includes registry wiring smoke tests and an in-flight-cap assertion.

Note (perf-reviewer WARN-1): the V5.5 perf test covers sync stages only — extending to async/sync_iter handlers is a follow-up after Diff 3a lands.

Note (perf-reviewer WARN-2): qualified stage names == info.stage_name for non-MultiPipe stages. LCA's MultiPipe is a single SPDL Pipe with internal dispatcher; the _qualified_name(branch_label=...) scheme is forward-compatible for true SPDL fan-out.

The user-facing API (Pipeline.resize_concurrency) lands in Diff 3b.

Differential Revision: D102929216

@meta-cla meta-cla Bot added the CLA Signed This label is managed by the Meta Open Source bot. label Apr 30, 2026
@meta-codesync
Copy link
Copy Markdown
Contributor

meta-codesync Bot commented Apr 30, 2026

@justinvjoseph has exported this pull request. If you are a Meta employee, you can view the originating Diff in D102929216.

…earch#1389)

Summary:

Add `ResizableSemaphore` — an asyncio-compatible semaphore whose max permit count can be adjusted at runtime. This is the foundational primitive for dynamic concurrency control in SPDL pipelines.

Key features:
- `resize(new_max)` adjusts permits at runtime; resize-up wakes blocked waiters immediately, resize-down drains gracefully (no preemption)
- `acquire()`/`release()` semantics match `asyncio.Semaphore`
- `max_value` and `active` properties for observability
- Thread-safe within asyncio's single-threaded model
- Comprehensive error handling for invalid values

This is Diff 1 of a 5-diff series implementing a unified adaptive scheduler for SPDL pipelines (T262755626).

Differential Revision: D99920401
…ops (facebookresearch#1390)

Summary:

Introduces an opt-in priority scheduler that replaces the ThreadPoolExecutor's
FIFO dispatch with a PriorityQueue. Deeper pipeline stages (closer to sink)
are dispatched first, reducing pipeline bubble time and WIP.

Key changes:
- New `_scheduler.py` with `AdaptiveScheduler` class
- `_PipeArgs` gains `nice` and `_depth` fields for priority control
- `convert_to_async()` routes through scheduler when provided
- `_build_node()` registers stages and intercepts sync ops
- `build_pipeline()` gains `use_scheduler=True` flag
- `PipelineBuilder.pipe()` gains `nice` parameter

When `use_scheduler=False` (default), behavior is identical to today.

Differential Revision: D99935461
…rency (facebookresearch#1391)

Summary:

This is Diff 3a in the T262755626 stack. Adds the foundation for runtime concurrency adjustment in SPDL pipelines:

1. `_PipelineImpl._semaphore_registry: dict[str, ResizableSemaphore]` — sibling registry keyed on the qualified stage name. Populated at pipeline build time when a stage opts in. NO modification to `StageInfo` (preserves frozen=True / hashability for third-party code).
2. `_PipelineImpl._dynamic_concurrency: dict[str, int]` — current value per stage; updated by future `Pipeline.resize_concurrency()` calls.
3. `_PipelineImpl._stage_info_by_name: dict[str, StageInfo]` — for error messages and future Track B logging hooks.
4. `_pipe()` admission gate REPLACE (V5.6): when a `ResizableSemaphore` is registered for a stage, `await semaphore.acquire()` becomes the admission gate, REPLACING the static `len(tasks) >= concurrency` check. When no semaphore is registered (the default), behaviour is unchanged — ZERO per-task overhead added to the existing fast path. The branch is taken ONCE outside the hot loop (dispatching to `_pipe_with_semaphore`).
5. `build_pipeline(_install_semaphores_for_test: bool = False)` test-only knob: when True, every Pipe stage is built with a `ResizableSemaphore` and registered. Production code MUST NOT pass this — it bypasses the selective opt-in semantics of `Pipeline.resize_concurrency` (Diff 3b).
6. V5.5 throughput regression test (`admission_gate_perf_test.py`): 4-stage sync pipeline, N=10000 items, 5 trials. Regression > 2% on either p50 or p99 fails the diff. Includes registry wiring smoke tests and an in-flight-cap assertion.

Note (perf-reviewer WARN-1): the V5.5 perf test covers sync stages only — extending to async/sync_iter handlers is a follow-up after Diff 3a lands.

Note (perf-reviewer WARN-2): qualified stage names == `info.stage_name` for non-MultiPipe stages. LCA's MultiPipe is a single SPDL Pipe with internal dispatcher; the `_qualified_name(branch_label=...)` scheme is forward-compatible for true SPDL fan-out.

The user-facing API (`Pipeline.resize_concurrency`) lands in Diff 3b.

Differential Revision: D102929216
@meta-codesync meta-codesync Bot changed the title Add semaphore registry and admission gate REPLACE for adaptive concurrency Add semaphore registry and admission gate REPLACE for adaptive concurrency (#1391) Apr 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CLA Signed This label is managed by the Meta Open Source bot. fb-exported meta-exported

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant