From b90facbe79e87f7bde23bab3bdfa83eeada0f55d Mon Sep 17 00:00:00 2001 From: ekirimlioglu <80229265+ekirimlioglu@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:12:08 -0700 Subject: [PATCH 1/5] fix(evals): prevent scheduler wedge on dispatch failure or actor death Two changes to evals/scheduling/controller.run_multi_pipeline_inference that close a deterministic hang we hit on a 10-config grid run. Symptom: MainThread parks in `time.sleep(0.5)` at the "Check if all actors busy" branch while Ray reports 0% GPU/CPU usage. py-spy stack shows the loop spinning; the bookkeeping says actors are busy but Ray says they are idle. The hang is permanent. Root cause has two triggers: (1) Exception during the batch-submission block. The init step (`actor.initialize_for_pipeline.remote(...)`) is already wrapped by an existing try/except that calls `scheduler.remove_pipeline`, but the subsequent `actor.process_batch.remote(...)` loop, the `active_tasks[actor_id] = {...}` assignment, and the three `db.set_actor_task_*` writes are not. If any of these raise synchronously (serialization error, transient DB hiccup, etc.) the actor was already marked busy by `scheduler.schedule()` and leaks busy state; the controller never calls `set_completed_task` or `remove_pipeline` for it. (2) Actor death after dispatch. Once batches are submitted and tracked in `active_tasks[actor_id]["futures"]`, the only completion-reaping path is `ray.wait(futures, timeout=0)` -- a non-blocking poll. If the actor process dies (OOM-kill, segfault from a broken native dep, etc.) the futures become orphaned: Ray will eventually surface them as failed, but `ray.wait(timeout=0)` only catches what is already ready in this Python tick. The controller falls into the "all actors busy" branch, sleeps 0.5s with no health check, and repeats forever. Fixes: - Wrap the batch-submission + bookkeeping block in try/except that mirrors the existing init-failure handler: mark the pipeline FAILED, drop any partial active_tasks entry, call `scheduler.remove_pipeline` so the actor is freed, and continue. - Replace the bare `time.sleep(0.5)` in the busy-loop with `ray.wait(all_pending, num_returns=1, timeout=0.5)`. ray.wait surfaces both successful and failed futures (including RayActorError from dead actors), so the next iteration's reap path at the "Check for completed tasks" block frees the actor via the existing exception handler. Falls back to time.sleep when no futures are dispatched yet (initial loop entry). The two changes are independent and complementary: either alone fixes a subset of triggers; together they close the wedge. --- rapidfireai/evals/scheduling/controller.py | 93 +++++++++++++++------- 1 file changed, 64 insertions(+), 29 deletions(-) diff --git a/rapidfireai/evals/scheduling/controller.py b/rapidfireai/evals/scheduling/controller.py index 7688abf3..02ae0428 100644 --- a/rapidfireai/evals/scheduling/controller.py +++ b/rapidfireai/evals/scheduling/controller.py @@ -1789,7 +1789,18 @@ def _format_limits(limits: dict) -> str: f"Busy actors: {status['busy_actors']}, " f"Gen: {status['current_generation']}" ) - time.sleep(0.5) + # Block on any in-flight future instead of a blind sleep. ray.wait + # surfaces both successful and failed futures (incl. RayActorError + # from a dead actor), so the next iteration's reap path will free + # the actor via remove_pipeline. Avoids wedging when an actor dies + # silently after dispatch and its futures never become ready. + all_pending = [] + for task_info in active_tasks.values(): + all_pending.extend(task_info["futures"]) + if all_pending: + ray.wait(all_pending, num_returns=1, timeout=0.5) + else: + time.sleep(0.5) continue # Execute schedule @@ -1921,35 +1932,59 @@ def _format_limits(limits: dict) -> str: self.logger.debug(f"Initialized actor {actor_id} for pipeline {pipeline_id} ({pipeline_name})") - futures = [] - preprocess_fn = pipeline_config.get("preprocess_fn") - postprocess_fn = pipeline_config.get("postprocess_fn") - compute_metrics_fn = pipeline_config.get("compute_metrics_fn") - accumulate_metrics_fn = pipeline_config.get("accumulate_metrics_fn") - for batch in batches: - future = actor.process_batch.remote( - batch, - preprocess_fn=preprocess_fn, - postprocess_fn=postprocess_fn, - compute_metrics_fn=compute_metrics_fn if accumulate_metrics_fn else None, - ) - futures.append(future) - - # Track task - task_start_time = time.time() - active_tasks[actor_id] = { - "futures": futures, - "pipeline_id": pipeline_id, - "shard_id": shard_id, - "task_id": task_id, - "batch_count": len(batches), - "start_time": task_start_time, - } + # Submit all batches and register the task. If anything raises + # synchronously here (e.g. a serialization error inside + # process_batch.remote, or a transient DB failure), the actor was + # marked busy by scheduler.schedule() at pipeline_scheduler.py and + # would leak busy state forever. Mirror the init-failure handler + # above: mark the pipeline FAILED and free the actor via + # remove_pipeline so the rest of the experiment can proceed. + try: + futures = [] + preprocess_fn = pipeline_config.get("preprocess_fn") + postprocess_fn = pipeline_config.get("postprocess_fn") + compute_metrics_fn = pipeline_config.get("compute_metrics_fn") + accumulate_metrics_fn = pipeline_config.get("accumulate_metrics_fn") + for batch in batches: + future = actor.process_batch.remote( + batch, + preprocess_fn=preprocess_fn, + postprocess_fn=postprocess_fn, + compute_metrics_fn=compute_metrics_fn if accumulate_metrics_fn else None, + ) + futures.append(future) + + # Track task + task_start_time = time.time() + active_tasks[actor_id] = { + "futures": futures, + "pipeline_id": pipeline_id, + "shard_id": shard_id, + "task_id": task_id, + "batch_count": len(batches), + "start_time": task_start_time, + } - # Update task status to in-progress - db.set_actor_task_start_time(task_id, task_start_time) - db.set_actor_task_status(task_id, TaskStatus.IN_PROGRESS) - db.set_pipeline_current_shard(pipeline_id, shard_id) + # Update task status to in-progress + db.set_actor_task_start_time(task_id, task_start_time) + db.set_actor_task_status(task_id, TaskStatus.IN_PROGRESS) + db.set_pipeline_current_shard(pipeline_id, shard_id) + except Exception as dispatch_err: + error_msg = str(dispatch_err) + self.logger.exception( + f"Pipeline {pipeline_id} ({pipeline_name}) failed during batch " + f"dispatch on actor {actor_id}: {error_msg}" + ) + db.set_actor_task_status(task_id, TaskStatus.FAILED) + db.set_actor_task_error(task_id, error_msg) + db.set_pipeline_status(pipeline_id, PipelineStatus.FAILED) + db.set_pipeline_error(pipeline_id, error_msg) + # Drop any partial entry so the reap loop doesn't ray.wait on it + active_tasks.pop(actor_id, None) + scheduler.remove_pipeline(pipeline_id) + if progress_display: + progress_display.update_pipeline(pipeline_id, status="FAILED") + continue # Finalize Optuna shard callback with final metrics from all pipelines. # We must accumulate the raw per-shard metric lists into flat dicts From 024f8251880c8e04b05716e305eb21d18aea506a Mon Sep 17 00:00:00 2001 From: ekirimlioglu <80229265+ekirimlioglu@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:12:37 -0700 Subject: [PATCH 2/5] test(evals): cover PipelineScheduler bookkeeping + add wedge repro Adds tests/test_pipeline_scheduler.py with 6 unit tests for the busy/free transitions in PipelineScheduler. They document the invariant the controller's scheduling loop relies on: an actor is marked busy by schedule() and MUST be freed by either set_completed_task() (success) or remove_pipeline() (failure). If the controller dispatch path fails without calling one of those, the actor leaks busy state and the loop wedges. Also adds tools/repro_scheduler_wedge.py, a Ray-free standalone reproduction of the bug. Scenario 1 simulates the buggy controller path (dispatch fails, no remove_pipeline call) and detects the wedge by counting consecutive "all actors busy" returns from schedule(). Scenario 2 demonstrates recovery via remove_pipeline. Useful for maintainers to confirm the bookkeeping contract without spinning up the full evals stack. Existing tests on this branch: pytest tests/test_pipeline_scheduler.py -> 6 passed python tools/repro_scheduler_wedge.py -> exits 0, prints "BUG REPRODUCED + FIX VERIFIED" --- tests/test_pipeline_scheduler.py | 115 +++++++++++++++++++++++ tools/repro_scheduler_wedge.py | 155 +++++++++++++++++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 tests/test_pipeline_scheduler.py create mode 100644 tools/repro_scheduler_wedge.py diff --git a/tests/test_pipeline_scheduler.py b/tests/test_pipeline_scheduler.py new file mode 100644 index 00000000..a2490203 --- /dev/null +++ b/tests/test_pipeline_scheduler.py @@ -0,0 +1,115 @@ +""" +Tests for the evals PipelineScheduler bookkeeping. + +These tests document the invariant the controller's main scheduling loop relies +on: an actor is marked busy by `schedule()` and MUST be freed by either +`set_completed_task(actor_id)` (success path) or `remove_pipeline(pipeline_id)` +(failure path). If neither is called, the actor leaks busy state and the +controller's busy-loop wedges indefinitely (see +`run_multi_pipeline_inference` in +`rapidfireai/evals/scheduling/controller.py`). +""" + +import pytest + +from rapidfireai.evals.scheduling.pipeline_scheduler import PipelineScheduler + + +class TestPipelineSchedulerBookkeeping: + """Cover the busy/free transitions in PipelineScheduler.""" + + def test_schedule_marks_actor_busy(self): + """schedule() must record actor->pipeline assignment.""" + scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=2, num_shards=3) + result = scheduler.schedule() + actor_id = result["actor_id"] + pipeline_id = result["pipeline_id"] + assert actor_id in (0, 1) + assert pipeline_id in (1, 2) + assert scheduler.actor_current_pipeline[actor_id] == pipeline_id + + def test_set_completed_task_frees_actor_and_advances_progress(self): + """Success path: set_completed_task frees actor and counts the shard.""" + scheduler = PipelineScheduler(pipeline_ids=[1], num_actors=1, num_shards=2) + result = scheduler.schedule() + actor_id = result["actor_id"] + assert scheduler.actor_current_pipeline[actor_id] == 1 + assert scheduler.pipeline_shards_completed[1] == 0 + + scheduler.set_completed_task(actor_id) + + assert scheduler.actor_current_pipeline[actor_id] == -1 + assert scheduler.pipeline_shards_completed[1] == 1 + + def test_remove_pipeline_frees_actor_on_dispatch_failure(self): + """ + Failure path: when the controller cannot dispatch (e.g. actor init throws, + a process_batch.remote raises synchronously, or the actor dies before any + future is created), the controller must call remove_pipeline so the actor + is freed and the rest of the experiment can proceed. + """ + scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=2, num_shards=2) + + first = scheduler.schedule() + actor_id = first["actor_id"] + pipeline_id = first["pipeline_id"] + assert scheduler.actor_current_pipeline[actor_id] == pipeline_id + + # Simulate dispatch failure: controller catches the exception and + # decides to drop the pipeline. The bookkeeping fix is that the actor + # gets freed, not left wedged. + scheduler.remove_pipeline(pipeline_id) + + assert scheduler.actor_current_pipeline[actor_id] == -1 + assert pipeline_id not in scheduler.pipeline_ids + + # The next schedule() must hand out the freed actor (with the surviving + # pipeline), not return the busy sentinel. + second = scheduler.schedule() + assert second["pipeline_id"] != -1 + assert second["actor_id"] != -1 + + def test_dispatch_failure_does_not_wedge_remaining_pipelines(self): + """ + End-to-end bookkeeping replay: 1 actor, 2 pipelines, the first + dispatch fails. The freed actor must serve the surviving pipeline + through completion -- the symptom of the bug we are fixing was an + infinite loop of `{pipeline_id: -1}` returns from schedule(). + """ + scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) + + first = scheduler.schedule() + failed_pipeline = first["pipeline_id"] + actor_id = first["actor_id"] + # Controller hits an exception and drops the pipeline. + scheduler.remove_pipeline(failed_pipeline) + assert scheduler.actor_current_pipeline[actor_id] == -1 + + # Now the surviving pipeline should run on the freed actor. + second = scheduler.schedule() + survivor = second["pipeline_id"] + assert survivor != -1 + assert survivor != failed_pipeline + scheduler.set_completed_task(second["actor_id"]) + + # All shards completed -- termination signal. + terminal = scheduler.schedule() + assert terminal["pipeline_id"] is None + + def test_all_actors_busy_returns_busy_sentinel(self): + """When every actor is busy, schedule() must return {-1, -1, -1}.""" + scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=2, num_shards=4) + scheduler.schedule() + scheduler.schedule() + # Both actors busy now. + result = scheduler.schedule() + assert result == {"pipeline_id": -1, "actor_id": -1, "shard_id": -1} + + def test_set_completed_task_idempotent_on_free_actor(self): + """Calling set_completed_task on an already-free actor is a no-op.""" + scheduler = PipelineScheduler(pipeline_ids=[1], num_actors=1, num_shards=1) + # Actor 0 starts free + assert scheduler.actor_current_pipeline[0] == -1 + scheduler.set_completed_task(0) + assert scheduler.actor_current_pipeline[0] == -1 + assert scheduler.pipeline_shards_completed[1] == 0 diff --git a/tools/repro_scheduler_wedge.py b/tools/repro_scheduler_wedge.py new file mode 100644 index 00000000..070b4273 --- /dev/null +++ b/tools/repro_scheduler_wedge.py @@ -0,0 +1,155 @@ +""" +Standalone reproduction for the evals scheduler actor-wedge bug. + +Symptom (before fix): + controller.run_multi_pipeline_inference parks MainThread in + `time.sleep(0.5)` at controller.py "Check if all actors busy" branch + while Ray reports 0% GPU/CPU usage. Bookkeeping says actors are busy; + Ray says they are idle. The hang is permanent. + +Root cause: + PipelineScheduler.schedule() marks an actor busy *before* the controller + dispatches Ray work. If anything between "schedule() returned" and + "active_tasks[actor_id] = {...}" raises (init failure, batch serialization + error, transient DB hiccup, actor death) AND the controller does not + call scheduler.remove_pipeline(pipeline_id), the actor stays busy forever. + + All subsequent schedule() calls return {pipeline_id: -1, actor_id: -1} + ("all actors busy"). The controller's busy-branch was a bare + `time.sleep(0.5)` -- no health check, no progress -- so the loop spins + forever. (See controller.py "Check if all actors busy" block.) + +How this script demonstrates it: + + 1. Without the fix: simulate a dispatch failure that does NOT call + remove_pipeline. Show that PipelineScheduler.schedule() returns the + "all busy" sentinel forever. + 2. With the fix: same setup, but the controller catches the dispatch + exception and calls remove_pipeline. Show that schedule() recovers + and serves the surviving pipeline. + +Run: + python tools/repro_scheduler_wedge.py + +Exits 0 with PASS lines if the fix is in place (Change 2 in controller.py), +exits 1 with FAIL if the bookkeeping invariant is violated. + +This is a deliberately small, Ray-free reproduction. Running the real +controller end-to-end would also wedge but requires the full evals stack; +the bookkeeping invariant alone is enough to expose the bug. +""" + +from __future__ import annotations + +import sys +import time + +from rapidfireai.evals.scheduling.pipeline_scheduler import PipelineScheduler + +DISPATCH_FAILURES_TO_SIMULATE = 1 +WEDGE_DETECTION_ITERS = 50 # 50 * 0.01s = 500ms; matches controller's busy-loop cadence + + +def _print(tag: str, msg: str) -> None: + print(f"[{tag}] {msg}", flush=True) + + +def reproduce_wedge_without_fix() -> bool: + """Replay the buggy path: actor marked busy, dispatch fails, no cleanup.""" + _print("repro", "scenario 1: dispatch fails AND controller forgets to call remove_pipeline") + scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) + + schedule = scheduler.schedule() + actor_id = schedule["actor_id"] + pipeline_id = schedule["pipeline_id"] + _print( + "repro", + f"schedule() -> actor {actor_id} <- pipeline {pipeline_id}; " + f"actor_current_pipeline={scheduler.actor_current_pipeline}", + ) + + _print("repro", "simulating dispatch failure WITHOUT cleanup...") + # ^^ This is the bug: an exception thrown between schedule() and + # active_tasks[actor_id] = {...} would land here. If the controller + # does not call scheduler.remove_pipeline(pipeline_id) in its except + # block, the actor stays busy forever. + + # Try to schedule the next pipeline. With the actor leaked busy, this + # should return the "all actors busy" sentinel forever. + busy_sentinel_count = 0 + for _ in range(WEDGE_DETECTION_ITERS): + result = scheduler.schedule() + if result["pipeline_id"] == -1: + busy_sentinel_count += 1 + else: + break + time.sleep(0.01) + + if busy_sentinel_count == WEDGE_DETECTION_ITERS: + _print("FAIL", f"WEDGED -- {WEDGE_DETECTION_ITERS} consecutive 'all actors busy' returns") + _print("FAIL", "this is the bug: actor leaked busy state, scheduler can never recover") + return False + _print("OK", "scheduler recovered without explicit cleanup -- bug not reproduced") + return True + + +def reproduce_recovery_with_fix() -> bool: + """Replay the fixed path: dispatch fails, controller calls remove_pipeline.""" + _print("repro", "scenario 2: dispatch fails AND controller calls remove_pipeline") + scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) + + schedule = scheduler.schedule() + actor_id = schedule["actor_id"] + pipeline_id = schedule["pipeline_id"] + _print( + "repro", + f"schedule() -> actor {actor_id} <- pipeline {pipeline_id}; " + f"actor_current_pipeline={scheduler.actor_current_pipeline}", + ) + + _print("repro", "simulating dispatch failure -- controller catches and calls remove_pipeline...") + scheduler.remove_pipeline(pipeline_id) + _print( + "repro", + f"after remove_pipeline: actor_current_pipeline={scheduler.actor_current_pipeline}, " + f"surviving pipelines={scheduler.pipeline_ids}", + ) + + if scheduler.actor_current_pipeline[actor_id] != -1: + _print("FAIL", f"actor {actor_id} is still marked busy after remove_pipeline") + return False + + next_schedule = scheduler.schedule() + if next_schedule["pipeline_id"] == -1: + _print("FAIL", "scheduler still returns 'all actors busy' after remove_pipeline") + return False + _print( + "PASS", + f"scheduler recovered: next assignment is actor {next_schedule['actor_id']} " + f"<- pipeline {next_schedule['pipeline_id']}", + ) + return True + + +def main() -> int: + print("=" * 72) + print("rapidfireai evals scheduler -- actor-wedge reproduction") + print("=" * 72) + + bug_present = not reproduce_wedge_without_fix() + print("-" * 72) + fix_works = reproduce_recovery_with_fix() + print("=" * 72) + + if bug_present and fix_works: + print("BUG REPRODUCED + FIX VERIFIED") + print(" - without remove_pipeline: scheduler wedges (returns 'all busy' forever)") + print(" - with remove_pipeline: scheduler recovers and serves surviving pipelines") + return 0 + print("UNEXPECTED STATE") + print(f" bug_present={bug_present} fix_works={fix_works}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) From 8d93bd9b1d16642c9de63a84f4ca2825bf42664a Mon Sep 17 00:00:00 2001 From: ekirimlioglu <80229265+ekirimlioglu@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:27:09 -0700 Subject: [PATCH 3/5] chore(evals): trim verbose comments and prints in scheduler wedge work Tighten the new comments in controller.py, drop narrative docstrings from test_pipeline_scheduler.py, and simplify repro_scheduler_wedge.py output. No behavior change. --- rapidfireai/evals/scheduling/controller.py | 18 +-- tests/test_pipeline_scheduler.py | 45 +------- tools/repro_scheduler_wedge.py | 124 +++++---------------- 3 files changed, 39 insertions(+), 148 deletions(-) diff --git a/rapidfireai/evals/scheduling/controller.py b/rapidfireai/evals/scheduling/controller.py index 02ae0428..e2953e5d 100644 --- a/rapidfireai/evals/scheduling/controller.py +++ b/rapidfireai/evals/scheduling/controller.py @@ -1789,11 +1789,8 @@ def _format_limits(limits: dict) -> str: f"Busy actors: {status['busy_actors']}, " f"Gen: {status['current_generation']}" ) - # Block on any in-flight future instead of a blind sleep. ray.wait - # surfaces both successful and failed futures (incl. RayActorError - # from a dead actor), so the next iteration's reap path will free - # the actor via remove_pipeline. Avoids wedging when an actor dies - # silently after dispatch and its futures never become ready. + # Block on any in-flight future so a dead actor's failed futures + # surface here and get reaped on the next iteration. all_pending = [] for task_info in active_tasks.values(): all_pending.extend(task_info["futures"]) @@ -1932,13 +1929,9 @@ def _format_limits(limits: dict) -> str: self.logger.debug(f"Initialized actor {actor_id} for pipeline {pipeline_id} ({pipeline_name})") - # Submit all batches and register the task. If anything raises - # synchronously here (e.g. a serialization error inside - # process_batch.remote, or a transient DB failure), the actor was - # marked busy by scheduler.schedule() at pipeline_scheduler.py and - # would leak busy state forever. Mirror the init-failure handler - # above: mark the pipeline FAILED and free the actor via - # remove_pipeline so the rest of the experiment can proceed. + # Mirror the init-failure handler: if dispatch or bookkeeping + # raises, free the actor via remove_pipeline so it doesn't leak + # busy state. try: futures = [] preprocess_fn = pipeline_config.get("preprocess_fn") @@ -1979,7 +1972,6 @@ def _format_limits(limits: dict) -> str: db.set_actor_task_error(task_id, error_msg) db.set_pipeline_status(pipeline_id, PipelineStatus.FAILED) db.set_pipeline_error(pipeline_id, error_msg) - # Drop any partial entry so the reap loop doesn't ray.wait on it active_tasks.pop(actor_id, None) scheduler.remove_pipeline(pipeline_id) if progress_display: diff --git a/tests/test_pipeline_scheduler.py b/tests/test_pipeline_scheduler.py index a2490203..b5cc0aa8 100644 --- a/tests/test_pipeline_scheduler.py +++ b/tests/test_pipeline_scheduler.py @@ -1,25 +1,15 @@ -""" -Tests for the evals PipelineScheduler bookkeeping. - -These tests document the invariant the controller's main scheduling loop relies -on: an actor is marked busy by `schedule()` and MUST be freed by either -`set_completed_task(actor_id)` (success path) or `remove_pipeline(pipeline_id)` -(failure path). If neither is called, the actor leaks busy state and the -controller's busy-loop wedges indefinitely (see -`run_multi_pipeline_inference` in -`rapidfireai/evals/scheduling/controller.py`). -""" +"""Tests for PipelineScheduler bookkeeping. -import pytest +Invariant: an actor marked busy by schedule() must be freed by either +set_completed_task(actor_id) or remove_pipeline(pipeline_id). If neither +is called, the controller's busy-loop wedges. +""" from rapidfireai.evals.scheduling.pipeline_scheduler import PipelineScheduler class TestPipelineSchedulerBookkeeping: - """Cover the busy/free transitions in PipelineScheduler.""" - def test_schedule_marks_actor_busy(self): - """schedule() must record actor->pipeline assignment.""" scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=2, num_shards=3) result = scheduler.schedule() actor_id = result["actor_id"] @@ -29,7 +19,6 @@ def test_schedule_marks_actor_busy(self): assert scheduler.actor_current_pipeline[actor_id] == pipeline_id def test_set_completed_task_frees_actor_and_advances_progress(self): - """Success path: set_completed_task frees actor and counts the shard.""" scheduler = PipelineScheduler(pipeline_ids=[1], num_actors=1, num_shards=2) result = scheduler.schedule() actor_id = result["actor_id"] @@ -42,12 +31,6 @@ def test_set_completed_task_frees_actor_and_advances_progress(self): assert scheduler.pipeline_shards_completed[1] == 1 def test_remove_pipeline_frees_actor_on_dispatch_failure(self): - """ - Failure path: when the controller cannot dispatch (e.g. actor init throws, - a process_batch.remote raises synchronously, or the actor dies before any - future is created), the controller must call remove_pipeline so the actor - is freed and the rest of the experiment can proceed. - """ scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=2, num_shards=2) first = scheduler.schedule() @@ -55,60 +38,42 @@ def test_remove_pipeline_frees_actor_on_dispatch_failure(self): pipeline_id = first["pipeline_id"] assert scheduler.actor_current_pipeline[actor_id] == pipeline_id - # Simulate dispatch failure: controller catches the exception and - # decides to drop the pipeline. The bookkeeping fix is that the actor - # gets freed, not left wedged. scheduler.remove_pipeline(pipeline_id) assert scheduler.actor_current_pipeline[actor_id] == -1 assert pipeline_id not in scheduler.pipeline_ids - # The next schedule() must hand out the freed actor (with the surviving - # pipeline), not return the busy sentinel. second = scheduler.schedule() assert second["pipeline_id"] != -1 assert second["actor_id"] != -1 def test_dispatch_failure_does_not_wedge_remaining_pipelines(self): - """ - End-to-end bookkeeping replay: 1 actor, 2 pipelines, the first - dispatch fails. The freed actor must serve the surviving pipeline - through completion -- the symptom of the bug we are fixing was an - infinite loop of `{pipeline_id: -1}` returns from schedule(). - """ scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) first = scheduler.schedule() failed_pipeline = first["pipeline_id"] actor_id = first["actor_id"] - # Controller hits an exception and drops the pipeline. scheduler.remove_pipeline(failed_pipeline) assert scheduler.actor_current_pipeline[actor_id] == -1 - # Now the surviving pipeline should run on the freed actor. second = scheduler.schedule() survivor = second["pipeline_id"] assert survivor != -1 assert survivor != failed_pipeline scheduler.set_completed_task(second["actor_id"]) - # All shards completed -- termination signal. terminal = scheduler.schedule() assert terminal["pipeline_id"] is None def test_all_actors_busy_returns_busy_sentinel(self): - """When every actor is busy, schedule() must return {-1, -1, -1}.""" scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=2, num_shards=4) scheduler.schedule() scheduler.schedule() - # Both actors busy now. result = scheduler.schedule() assert result == {"pipeline_id": -1, "actor_id": -1, "shard_id": -1} def test_set_completed_task_idempotent_on_free_actor(self): - """Calling set_completed_task on an already-free actor is a no-op.""" scheduler = PipelineScheduler(pipeline_ids=[1], num_actors=1, num_shards=1) - # Actor 0 starts free assert scheduler.actor_current_pipeline[0] == -1 scheduler.set_completed_task(0) assert scheduler.actor_current_pipeline[0] == -1 diff --git a/tools/repro_scheduler_wedge.py b/tools/repro_scheduler_wedge.py index 070b4273..a2c48475 100644 --- a/tools/repro_scheduler_wedge.py +++ b/tools/repro_scheduler_wedge.py @@ -1,42 +1,13 @@ """ -Standalone reproduction for the evals scheduler actor-wedge bug. - -Symptom (before fix): - controller.run_multi_pipeline_inference parks MainThread in - `time.sleep(0.5)` at controller.py "Check if all actors busy" branch - while Ray reports 0% GPU/CPU usage. Bookkeeping says actors are busy; - Ray says they are idle. The hang is permanent. - -Root cause: - PipelineScheduler.schedule() marks an actor busy *before* the controller - dispatches Ray work. If anything between "schedule() returned" and - "active_tasks[actor_id] = {...}" raises (init failure, batch serialization - error, transient DB hiccup, actor death) AND the controller does not - call scheduler.remove_pipeline(pipeline_id), the actor stays busy forever. - - All subsequent schedule() calls return {pipeline_id: -1, actor_id: -1} - ("all actors busy"). The controller's busy-branch was a bare - `time.sleep(0.5)` -- no health check, no progress -- so the loop spins - forever. (See controller.py "Check if all actors busy" block.) - -How this script demonstrates it: - - 1. Without the fix: simulate a dispatch failure that does NOT call - remove_pipeline. Show that PipelineScheduler.schedule() returns the - "all busy" sentinel forever. - 2. With the fix: same setup, but the controller catches the dispatch - exception and calls remove_pipeline. Show that schedule() recovers - and serves the surviving pipeline. +Reproduction for the evals scheduler actor-wedge bug. + +PipelineScheduler.schedule() marks an actor busy before the controller +dispatches Ray work. If dispatch raises and the controller doesn't call +scheduler.remove_pipeline(pipeline_id), the actor leaks busy state and +schedule() returns {-1, -1, -1} forever. Run: python tools/repro_scheduler_wedge.py - -Exits 0 with PASS lines if the fix is in place (Change 2 in controller.py), -exits 1 with FAIL if the bookkeeping invariant is violated. - -This is a deliberately small, Ray-free reproduction. Running the real -controller end-to-end would also wedge but requires the full evals stack; -the bookkeeping invariant alone is enough to expose the bug. """ from __future__ import annotations @@ -46,36 +17,19 @@ from rapidfireai.evals.scheduling.pipeline_scheduler import PipelineScheduler -DISPATCH_FAILURES_TO_SIMULATE = 1 -WEDGE_DETECTION_ITERS = 50 # 50 * 0.01s = 500ms; matches controller's busy-loop cadence - +WEDGE_DETECTION_ITERS = 50 -def _print(tag: str, msg: str) -> None: - print(f"[{tag}] {msg}", flush=True) - -def reproduce_wedge_without_fix() -> bool: - """Replay the buggy path: actor marked busy, dispatch fails, no cleanup.""" - _print("repro", "scenario 1: dispatch fails AND controller forgets to call remove_pipeline") +def reproduce_wedge_without_cleanup() -> bool: + """Dispatch fails, controller does not call remove_pipeline. Scheduler wedges.""" scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) schedule = scheduler.schedule() actor_id = schedule["actor_id"] pipeline_id = schedule["pipeline_id"] - _print( - "repro", - f"schedule() -> actor {actor_id} <- pipeline {pipeline_id}; " - f"actor_current_pipeline={scheduler.actor_current_pipeline}", - ) - - _print("repro", "simulating dispatch failure WITHOUT cleanup...") - # ^^ This is the bug: an exception thrown between schedule() and - # active_tasks[actor_id] = {...} would land here. If the controller - # does not call scheduler.remove_pipeline(pipeline_id) in its except - # block, the actor stays busy forever. - - # Try to schedule the next pipeline. With the actor leaked busy, this - # should return the "all actors busy" sentinel forever. + print(f"scheduled actor {actor_id} <- pipeline {pipeline_id}") + print("simulating dispatch failure with no cleanup") + busy_sentinel_count = 0 for _ in range(WEDGE_DETECTION_ITERS): result = scheduler.schedule() @@ -86,68 +40,48 @@ def reproduce_wedge_without_fix() -> bool: time.sleep(0.01) if busy_sentinel_count == WEDGE_DETECTION_ITERS: - _print("FAIL", f"WEDGED -- {WEDGE_DETECTION_ITERS} consecutive 'all actors busy' returns") - _print("FAIL", "this is the bug: actor leaked busy state, scheduler can never recover") + print(f"wedged: {WEDGE_DETECTION_ITERS} consecutive 'all actors busy' returns") return False - _print("OK", "scheduler recovered without explicit cleanup -- bug not reproduced") + print("recovered without cleanup (bug not reproduced)") return True -def reproduce_recovery_with_fix() -> bool: - """Replay the fixed path: dispatch fails, controller calls remove_pipeline.""" - _print("repro", "scenario 2: dispatch fails AND controller calls remove_pipeline") +def reproduce_recovery_with_cleanup() -> bool: + """Dispatch fails, controller calls remove_pipeline. Scheduler recovers.""" scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) schedule = scheduler.schedule() actor_id = schedule["actor_id"] pipeline_id = schedule["pipeline_id"] - _print( - "repro", - f"schedule() -> actor {actor_id} <- pipeline {pipeline_id}; " - f"actor_current_pipeline={scheduler.actor_current_pipeline}", - ) + print(f"scheduled actor {actor_id} <- pipeline {pipeline_id}") - _print("repro", "simulating dispatch failure -- controller catches and calls remove_pipeline...") scheduler.remove_pipeline(pipeline_id) - _print( - "repro", - f"after remove_pipeline: actor_current_pipeline={scheduler.actor_current_pipeline}, " - f"surviving pipelines={scheduler.pipeline_ids}", - ) + print(f"removed pipeline {pipeline_id}; surviving={scheduler.pipeline_ids}") if scheduler.actor_current_pipeline[actor_id] != -1: - _print("FAIL", f"actor {actor_id} is still marked busy after remove_pipeline") + print(f"actor {actor_id} still busy after remove_pipeline") return False next_schedule = scheduler.schedule() if next_schedule["pipeline_id"] == -1: - _print("FAIL", "scheduler still returns 'all actors busy' after remove_pipeline") + print("scheduler still returns 'all actors busy' after remove_pipeline") return False - _print( - "PASS", - f"scheduler recovered: next assignment is actor {next_schedule['actor_id']} " - f"<- pipeline {next_schedule['pipeline_id']}", - ) + print(f"recovered: actor {next_schedule['actor_id']} <- pipeline {next_schedule['pipeline_id']}") return True def main() -> int: - print("=" * 72) - print("rapidfireai evals scheduler -- actor-wedge reproduction") - print("=" * 72) - - bug_present = not reproduce_wedge_without_fix() - print("-" * 72) - fix_works = reproduce_recovery_with_fix() - print("=" * 72) + print("scenario 1: no cleanup") + bug_present = not reproduce_wedge_without_cleanup() + print() + print("scenario 2: with remove_pipeline") + fix_works = reproduce_recovery_with_cleanup() + print() if bug_present and fix_works: - print("BUG REPRODUCED + FIX VERIFIED") - print(" - without remove_pipeline: scheduler wedges (returns 'all busy' forever)") - print(" - with remove_pipeline: scheduler recovers and serves surviving pipelines") + print("bug reproduced and fix verified") return 0 - print("UNEXPECTED STATE") - print(f" bug_present={bug_present} fix_works={fix_works}") + print(f"unexpected: bug_present={bug_present} fix_works={fix_works}") return 1 From 5788117bbf233ea558ab3fe79f278634ae885aa2 Mon Sep 17 00:00:00 2001 From: ekirimlioglu <80229265+ekirimlioglu@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:30:30 -0700 Subject: [PATCH 4/5] test(evals): fold wedge repro into pipeline_scheduler tests Drop the standalone tools/repro_scheduler_wedge.py and add the wedge scenario as a regression test in test_pipeline_scheduler.py. The unit test encodes the same invariant deterministically and runs in CI, so the standalone script no longer pulls its weight. --- tests/test_pipeline_scheduler.py | 11 ++++ tools/repro_scheduler_wedge.py | 89 -------------------------------- 2 files changed, 11 insertions(+), 89 deletions(-) delete mode 100644 tools/repro_scheduler_wedge.py diff --git a/tests/test_pipeline_scheduler.py b/tests/test_pipeline_scheduler.py index b5cc0aa8..f6a80520 100644 --- a/tests/test_pipeline_scheduler.py +++ b/tests/test_pipeline_scheduler.py @@ -78,3 +78,14 @@ def test_set_completed_task_idempotent_on_free_actor(self): scheduler.set_completed_task(0) assert scheduler.actor_current_pipeline[0] == -1 assert scheduler.pipeline_shards_completed[1] == 0 + + def test_actor_leaks_busy_when_neither_completion_nor_removal_called(self): + """Regression: if dispatch fails and the controller forgets remove_pipeline, + schedule() returns the busy sentinel indefinitely. This is the wedge the + controller fix in run_multi_pipeline_inference prevents.""" + scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) + scheduler.schedule() + + for _ in range(20): + result = scheduler.schedule() + assert result == {"pipeline_id": -1, "actor_id": -1, "shard_id": -1} diff --git a/tools/repro_scheduler_wedge.py b/tools/repro_scheduler_wedge.py deleted file mode 100644 index a2c48475..00000000 --- a/tools/repro_scheduler_wedge.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -Reproduction for the evals scheduler actor-wedge bug. - -PipelineScheduler.schedule() marks an actor busy before the controller -dispatches Ray work. If dispatch raises and the controller doesn't call -scheduler.remove_pipeline(pipeline_id), the actor leaks busy state and -schedule() returns {-1, -1, -1} forever. - -Run: - python tools/repro_scheduler_wedge.py -""" - -from __future__ import annotations - -import sys -import time - -from rapidfireai.evals.scheduling.pipeline_scheduler import PipelineScheduler - -WEDGE_DETECTION_ITERS = 50 - - -def reproduce_wedge_without_cleanup() -> bool: - """Dispatch fails, controller does not call remove_pipeline. Scheduler wedges.""" - scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) - - schedule = scheduler.schedule() - actor_id = schedule["actor_id"] - pipeline_id = schedule["pipeline_id"] - print(f"scheduled actor {actor_id} <- pipeline {pipeline_id}") - print("simulating dispatch failure with no cleanup") - - busy_sentinel_count = 0 - for _ in range(WEDGE_DETECTION_ITERS): - result = scheduler.schedule() - if result["pipeline_id"] == -1: - busy_sentinel_count += 1 - else: - break - time.sleep(0.01) - - if busy_sentinel_count == WEDGE_DETECTION_ITERS: - print(f"wedged: {WEDGE_DETECTION_ITERS} consecutive 'all actors busy' returns") - return False - print("recovered without cleanup (bug not reproduced)") - return True - - -def reproduce_recovery_with_cleanup() -> bool: - """Dispatch fails, controller calls remove_pipeline. Scheduler recovers.""" - scheduler = PipelineScheduler(pipeline_ids=[1, 2], num_actors=1, num_shards=1) - - schedule = scheduler.schedule() - actor_id = schedule["actor_id"] - pipeline_id = schedule["pipeline_id"] - print(f"scheduled actor {actor_id} <- pipeline {pipeline_id}") - - scheduler.remove_pipeline(pipeline_id) - print(f"removed pipeline {pipeline_id}; surviving={scheduler.pipeline_ids}") - - if scheduler.actor_current_pipeline[actor_id] != -1: - print(f"actor {actor_id} still busy after remove_pipeline") - return False - - next_schedule = scheduler.schedule() - if next_schedule["pipeline_id"] == -1: - print("scheduler still returns 'all actors busy' after remove_pipeline") - return False - print(f"recovered: actor {next_schedule['actor_id']} <- pipeline {next_schedule['pipeline_id']}") - return True - - -def main() -> int: - print("scenario 1: no cleanup") - bug_present = not reproduce_wedge_without_cleanup() - print() - print("scenario 2: with remove_pipeline") - fix_works = reproduce_recovery_with_cleanup() - print() - - if bug_present and fix_works: - print("bug reproduced and fix verified") - return 0 - print(f"unexpected: bug_present={bug_present} fix_works={fix_works}") - return 1 - - -if __name__ == "__main__": - sys.exit(main()) From 1d33b88d4377c7cdb941eac844a3c5da8740e3ae Mon Sep 17 00:00:00 2001 From: ekirimlioglu <80229265+ekirimlioglu@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:59:57 -0700 Subject: [PATCH 5/5] fix(evals): avoid busy-spin in scheduler busy-loop on partial batch completion The previous busy-branch passed every in-flight batch future to ray.wait(num_returns=1, timeout=0.5). ray.wait is a snapshot of readiness, not an event consumer -- once any single batch resolved (which happens early in a multi-batch task), that already-ready ref permanently satisfied num_returns=1 and the 0.5s timeout never engaged. Combined with the reap path only draining when all of an actor's futures are ready, the all-actors-busy branch hot-spun the IC poll and scheduler.schedule() at thousands of iterations per second instead of the original two per second. Partition first via a non-blocking ray.wait and only block on the not-yet-ready set. Dead-actor detection is preserved: a RayActorError-bearing future is "ready" to ray.wait, lands in the ready partition, and gets drained by the next iteration's reap path via the existing handler at controller.py:1312. --- rapidfireai/evals/scheduling/controller.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/rapidfireai/evals/scheduling/controller.py b/rapidfireai/evals/scheduling/controller.py index e2953e5d..f25e61f8 100644 --- a/rapidfireai/evals/scheduling/controller.py +++ b/rapidfireai/evals/scheduling/controller.py @@ -1789,13 +1789,17 @@ def _format_limits(limits: dict) -> str: f"Busy actors: {status['busy_actors']}, " f"Gen: {status['current_generation']}" ) - # Block on any in-flight future so a dead actor's failed futures - # surface here and get reaped on the next iteration. - all_pending = [] + # Block until something new happens so a dead actor's failed + # futures surface here. ray.wait must be filtered to not-yet-ready + # futures: any already-resolved batch ref would satisfy + # num_returns=1 and turn this into a tight spin. + all_futures = [] for task_info in active_tasks.values(): - all_pending.extend(task_info["futures"]) - if all_pending: - ray.wait(all_pending, num_returns=1, timeout=0.5) + all_futures.extend(task_info["futures"]) + if all_futures: + _ready, not_ready = ray.wait(all_futures, num_returns=len(all_futures), timeout=0) + if not_ready: + ray.wait(not_ready, num_returns=1, timeout=0.5) else: time.sleep(0.5) continue