
bug: throttle acquire timeout is hardcoded at 300s, causes early shutdown on slow inference endpoints #551

@andreatgretel

Description

Priority Level

Medium (Annoying but has workaround)

Describe the bug

When running the async engine against a slow inference endpoint (e.g. a large model on DGX Spark), the ThrottleManager.acquire_async() timeout causes spurious ModelTimeoutError failures that cascade into early shutdown and incomplete datasets.

The root cause is that DEFAULT_ACQUIRE_TIMEOUT is hardcoded to 300s in throttle_manager.py:31 and not exposed through ThrottleConfig or RunConfig. With a slow model (e.g. 60-120s per request) and AIMD starting at concurrency=1, tasks queued behind in-flight requests can wait 300s+ for a permit without ever starting a request. When the deadline expires, the throttle layer raises TimeoutError, which propagates as:

TimeoutError (throttle_manager.py:397)
  -> ProviderError(kind=TIMEOUT) (throttled.py:166)
    -> ModelTimeoutError (errors.py:291)

ModelTimeoutError is retryable, so tasks get deferred to salvage rounds. But salvage rounds hit the same timeout, the error rate crosses the threshold, and the scheduler triggers early shutdown - dropping rows and producing an incomplete dataset.

Importantly, the model itself responds correctly (just slowly). No actual request times out - the tasks fail waiting in the concurrency queue before they ever send a request.
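The arithmetic is easy to see in isolation. The snippet below is a self-contained illustration of the failure mode (scaled to milliseconds; it is not the library's ThrottleManager, and all names in it are made up): with a single permit and a fixed acquire deadline, the tasks at the back of the queue exhaust the deadline before they ever issue a request, even though every request that does run succeeds.

import asyncio

# Illustrative only - not the library's throttle layer. Times are scaled down
# 1000x, so 0.090 here stands in for a ~90s request and 0.300 for the 300s
# acquire deadline.
PER_REQUEST_LATENCY = 0.090
ACQUIRE_DEADLINE = 0.300
NUM_TASKS = 6  # all competing for one permit, as AIMD does at concurrency=1


async def worker(i: int, permit: asyncio.Semaphore) -> str:
    try:
        # This is where the real tasks die: waiting for a permit, not waiting
        # on a model response.
        await asyncio.wait_for(permit.acquire(), timeout=ACQUIRE_DEADLINE)
    except asyncio.TimeoutError:
        return f"task {i}: timed out in the queue, no request ever sent"
    try:
        await asyncio.sleep(PER_REQUEST_LATENCY)  # the "model" answers, just slowly
        return f"task {i}: completed"
    finally:
        permit.release()


async def main() -> None:
    permit = asyncio.Semaphore(1)
    results = await asyncio.gather(*(worker(i, permit) for i in range(NUM_TASKS)))
    print("\n".join(results))


asyncio.run(main())

With these numbers the first four tasks finish (the fourth waits 3 x 90 = 270 < 300), while tasks five and six hit the deadline still queued - exactly the pattern in the trace above, where no request has been sent when the TimeoutError fires.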

Steps/Code to reproduce bug

The scenario can be reproduced with a mock slow generator that raises ModelTimeoutError after a few successful calls, simulating what the throttle layer does when permits are exhausted:

Mock generators and reproduction test
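import asyncio
from typing import Any
from unittest.mock import MagicMock

import pytest

# Project-internal names used below (ColumnGenerator, GenerationStrategy,
# ModelTimeoutError, ExecutionGraph, CompletionTracker, RowGroupBufferManager,
# AsyncTaskScheduler, the column configs, MODEL_ALIAS, and the _mock_provider /
# _expr_config / MockSeedGenerator helpers) come from the package and its
# existing test suite; their imports are omitted here.
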
class MockSlowCellGenerator(ColumnGenerator[ExpressionColumnConfig]):
    """Cell generator that simulates DGX Spark-class latency.

    Each call sleeps for ``latency`` seconds before returning. When used behind
    the throttle layer with a tight concurrency limit and the default 300s
    acquire timeout, queued tasks time out before getting a permit.

    For testing, ``timeout_after`` makes the generator raise ModelTimeoutError
    after the Nth call, reproducing what the ThrottledModelClient does when the
    throttle acquire deadline expires.
    """

    def __init__(
        self,
        *args: Any,
        latency: float = 0.0,
        timeout_after: int | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self._latency = latency
        self._timeout_after = timeout_after
        self._call_count = 0

    @staticmethod
    def get_generation_strategy() -> GenerationStrategy:
        return GenerationStrategy.CELL_BY_CELL

    def generate(self, data: dict) -> dict:
        self._call_count += 1
        if self._timeout_after is not None and self._call_count > self._timeout_after:
            raise ModelTimeoutError(
                f"Throttle acquire timed out after 300s for slow-provider/large-model [chat]"
            )
        data[self.config.name] = f"slow_{data.get('seed', '?')}"
        return data

    async def agenerate(self, data: dict) -> dict:
        self._call_count += 1
        if self._timeout_after is not None and self._call_count > self._timeout_after:
            raise ModelTimeoutError(
                f"Throttle acquire timed out after 300s for slow-provider/large-model [chat]"
            )
        await asyncio.sleep(self._latency)
        data[self.config.name] = f"slow_{data.get('seed', '?')}"
        return data


@pytest.mark.asyncio(loop_scope="session")
async def test_scheduler_slow_model_timeout_triggers_early_shutdown() -> None:
    provider = _mock_provider()
    num_records = 6

    configs = [
        SamplerColumnConfig(name="seed", sampler_type=SamplerType.CATEGORY, params={"values": ["A"]}),
        LLMTextColumnConfig(name="answer", prompt="{{ seed }}", model_alias=MODEL_ALIAS),
    ]
    strategies = {
        "seed": GenerationStrategy.FULL_COLUMN,
        "answer": GenerationStrategy.CELL_BY_CELL,
    }

    slow_gen = MockSlowCellGenerator(
        config=_expr_config("answer"),
        resource_provider=provider,
        timeout_after=2,  # First 2 calls succeed, then every call times out
    )
    generators: dict[str, ColumnGenerator] = {
        "seed": MockSeedGenerator(config=_expr_config("seed"), resource_provider=provider),
        "answer": slow_gen,
    }

    graph = ExecutionGraph.create(configs, strategies)
    row_groups = [(0, num_records)]
    tracker = CompletionTracker.with_graph(graph, row_groups)
    buffer_manager = RowGroupBufferManager(MagicMock())

    scheduler = AsyncTaskScheduler(
        generators=generators,
        graph=graph,
        tracker=tracker,
        row_groups=row_groups,
        buffer_manager=buffer_manager,
        shutdown_error_window=4,
        shutdown_error_rate=0.5,
    )
    await asyncio.wait_for(scheduler.run(), timeout=10.0)

    assert scheduler._early_shutdown, "Early shutdown flag should be set"
    dropped = sum(1 for ri in range(num_records) if tracker.is_dropped(0, ri))
    assert dropped > 0, "Expected some rows to be dropped due to timeout errors"

Expected behavior

Users should be able to configure the throttle acquire timeout via ThrottleConfig (e.g. acquire_timeout: float = 300.0) so that slow-inference endpoints don't trigger spurious timeouts. The current workaround is disable_early_shutdown: true in RunConfig, but that masks all errors, not just throttle queue timeouts.
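For concreteness, a sketch of the shape the change could take - the acquire_timeout field matches the suggestion above, but everything else (names, defaults, dataclass vs. pydantic) is assumed here rather than taken from the codebase:

from dataclasses import dataclass

DEFAULT_ACQUIRE_TIMEOUT = 300.0  # mirrors today's hardcoded constant


@dataclass
class ThrottleConfig:
    # ...existing AIMD tuning fields elided...
    acquire_timeout: float = DEFAULT_ACQUIRE_TIMEOUT  # proposed new knob


class ThrottleManager:
    """Sketch of only the relevant plumbing, not the real class."""

    def __init__(self, config: ThrottleConfig) -> None:
        # Proposed: take the acquire deadline from config instead of the
        # module-level constant, keeping 300s as the default so existing
        # configs behave exactly as they do today.
        self._acquire_timeout = config.acquire_timeout


# A DGX Spark-class endpoint (60-120s per request) could then simply raise it:
manager = ThrottleManager(ThrottleConfig(acquire_timeout=900.0))

acquire_sync and acquire_async would then read self._acquire_timeout where they currently reference DEFAULT_ACQUIRE_TIMEOUT.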

Agent Diagnostic / Prior Investigation

  • DEFAULT_ACQUIRE_TIMEOUT is defined at throttle_manager.py:31 (300s)
  • Used by acquire_sync (line 324) and acquire_async (line 373) with no override path from config
  • ThrottleConfig in run_config.py has AIMD tuning knobs but no acquire timeout field
  • ThrottledModelClient._athrottled() catches TimeoutError and wraps it as ProviderError(kind=TIMEOUT) (throttled.py:165-171)
  • The model facade maps ProviderErrorKind.TIMEOUT to ModelTimeoutError (errors.py:291)
  • ModelTimeoutError is in _RETRYABLE_MODEL_ERRORS (async_scheduler.py:49-54), so it gets deferred and retried - but retries hit the same timeout

Additional context

Reported by @mvansegbroeck running on DGX Spark with large models where per-request latency is 60-120s. The issue is specific to slow endpoints with low concurrency - fast endpoints or high-concurrency setups won't saturate the queue long enough to hit the 300s deadline.

Checklist

  • I reproduced this issue or provided a minimal example
  • I searched the docs/issues myself, or had my agent do so
  • If I used an agent, I included its diagnostics above
