Add semaphore registry and admission gate REPLACE for adaptive concurrency (#1391)#1391
Open
justinvjoseph wants to merge 3 commits into
Open
Add semaphore registry and admission gate REPLACE for adaptive concurrency (#1391)#1391justinvjoseph wants to merge 3 commits into
justinvjoseph wants to merge 3 commits into
Conversation
Contributor
|
@justinvjoseph has exported this pull request. If you are a Meta employee, you can view the originating Diff in D102929216. |
…earch#1389) Summary: Add `ResizableSemaphore` — an asyncio-compatible semaphore whose max permit count can be adjusted at runtime. This is the foundational primitive for dynamic concurrency control in SPDL pipelines. Key features: - `resize(new_max)` adjusts permits at runtime; resize-up wakes blocked waiters immediately, resize-down drains gracefully (no preemption) - `acquire()`/`release()` semantics match `asyncio.Semaphore` - `max_value` and `active` properties for observability - Thread-safe within asyncio's single-threaded model - Comprehensive error handling for invalid values This is Diff 1 of a 5-diff series implementing a unified adaptive scheduler for SPDL pipelines (T262755626). Differential Revision: D99920401
…ops (facebookresearch#1390) Summary: Introduces an opt-in priority scheduler that replaces the ThreadPoolExecutor's FIFO dispatch with a PriorityQueue. Deeper pipeline stages (closer to sink) are dispatched first, reducing pipeline bubble time and WIP. Key changes: - New `_scheduler.py` with `AdaptiveScheduler` class - `_PipeArgs` gains `nice` and `_depth` fields for priority control - `convert_to_async()` routes through scheduler when provided - `_build_node()` registers stages and intercepts sync ops - `build_pipeline()` gains `use_scheduler=True` flag - `PipelineBuilder.pipe()` gains `nice` parameter When `use_scheduler=False` (default), behavior is identical to today. Differential Revision: D99935461
…rency (facebookresearch#1391) Summary: This is Diff 3a in the T262755626 stack. Adds the foundation for runtime concurrency adjustment in SPDL pipelines: 1. `_PipelineImpl._semaphore_registry: dict[str, ResizableSemaphore]` — sibling registry keyed on the qualified stage name. Populated at pipeline build time when a stage opts in. NO modification to `StageInfo` (preserves frozen=True / hashability for third-party code). 2. `_PipelineImpl._dynamic_concurrency: dict[str, int]` — current value per stage; updated by future `Pipeline.resize_concurrency()` calls. 3. `_PipelineImpl._stage_info_by_name: dict[str, StageInfo]` — for error messages and future Track B logging hooks. 4. `_pipe()` admission gate REPLACE (V5.6): when a `ResizableSemaphore` is registered for a stage, `await semaphore.acquire()` becomes the admission gate, REPLACING the static `len(tasks) >= concurrency` check. When no semaphore is registered (the default), behaviour is unchanged — ZERO per-task overhead added to the existing fast path. The branch is taken ONCE outside the hot loop (dispatching to `_pipe_with_semaphore`). 5. `build_pipeline(_install_semaphores_for_test: bool = False)` test-only knob: when True, every Pipe stage is built with a `ResizableSemaphore` and registered. Production code MUST NOT pass this — it bypasses the selective opt-in semantics of `Pipeline.resize_concurrency` (Diff 3b). 6. V5.5 throughput regression test (`admission_gate_perf_test.py`): 4-stage sync pipeline, N=10000 items, 5 trials. Regression > 2% on either p50 or p99 fails the diff. Includes registry wiring smoke tests and an in-flight-cap assertion. Note (perf-reviewer WARN-1): the V5.5 perf test covers sync stages only — extending to async/sync_iter handlers is a follow-up after Diff 3a lands. Note (perf-reviewer WARN-2): qualified stage names == `info.stage_name` for non-MultiPipe stages. LCA's MultiPipe is a single SPDL Pipe with internal dispatcher; the `_qualified_name(branch_label=...)` scheme is forward-compatible for true SPDL fan-out. The user-facing API (`Pipeline.resize_concurrency`) lands in Diff 3b. Differential Revision: D102929216
e20fa16 to
826c0bd
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary:
This is Diff 3a in the T262755626 stack. Adds the foundation for runtime concurrency adjustment in SPDL pipelines:
_PipelineImpl._semaphore_registry: dict[str, ResizableSemaphore]— sibling registry keyed on the qualified stage name. Populated at pipeline build time when a stage opts in. NO modification toStageInfo(preserves frozen=True / hashability for third-party code)._PipelineImpl._dynamic_concurrency: dict[str, int]— current value per stage; updated by futurePipeline.resize_concurrency()calls._PipelineImpl._stage_info_by_name: dict[str, StageInfo]— for error messages and future Track B logging hooks._pipe()admission gate REPLACE (V5.6): when aResizableSemaphoreis registered for a stage,await semaphore.acquire()becomes the admission gate, REPLACING the staticlen(tasks) >= concurrencycheck. When no semaphore is registered (the default), behaviour is unchanged — ZERO per-task overhead added to the existing fast path. The branch is taken ONCE outside the hot loop (dispatching to_pipe_with_semaphore).build_pipeline(_install_semaphores_for_test: bool = False)test-only knob: when True, every Pipe stage is built with aResizableSemaphoreand registered. Production code MUST NOT pass this — it bypasses the selective opt-in semantics ofPipeline.resize_concurrency(Diff 3b).admission_gate_perf_test.py): 4-stage sync pipeline, N=10000 items, 5 trials. Regression > 2% on either p50 or p99 fails the diff. Includes registry wiring smoke tests and an in-flight-cap assertion.Note (perf-reviewer WARN-1): the V5.5 perf test covers sync stages only — extending to async/sync_iter handlers is a follow-up after Diff 3a lands.
Note (perf-reviewer WARN-2): qualified stage names ==
info.stage_namefor non-MultiPipe stages. LCA's MultiPipe is a single SPDL Pipe with internal dispatcher; the_qualified_name(branch_label=...)scheme is forward-compatible for true SPDL fan-out.The user-facing API (
Pipeline.resize_concurrency) lands in Diff 3b.Differential Revision: D102929216