Priority Level
Medium (Annoying but has workaround)
Describe the bug
When running the async engine against a slow inference endpoint (e.g. a large model on DGX Spark), the ThrottleManager.acquire_async() timeout causes spurious ModelTimeoutError failures that cascade into early shutdown and incomplete datasets.
The root cause is that DEFAULT_ACQUIRE_TIMEOUT is hardcoded to 300s in throttle_manager.py:31 and not exposed through ThrottleConfig or RunConfig. With a slow model (e.g. 60-120s per request) and AIMD starting at concurrency=1, tasks queued behind in-flight requests can wait 300s+ for a permit without ever starting a request. When the deadline expires, the throttle layer raises TimeoutError, which propagates as:
```
TimeoutError (throttle_manager.py:397)
  -> ProviderError(kind=TIMEOUT) (throttled.py:166)
    -> ModelTimeoutError (errors.py:291)
```
ModelTimeoutError is retryable, so tasks get deferred to salvage rounds. But salvage rounds hit the same timeout, the error rate crosses the threshold, and the scheduler triggers early shutdown - dropping rows and producing an incomplete dataset.
Importantly, the model itself responds correctly (just slowly). No actual request times out - the tasks fail waiting in the concurrency queue before they ever send a request.
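The queue-wait failure is easy to see in isolation. Below is a minimal, self-contained asyncio sketch of the failure mode - not the engine's actual code: the timeout and latency are scaled down, and a plain `asyncio.Semaphore` stands in for the throttle's permit bookkeeping.

```python
import asyncio

ACQUIRE_TIMEOUT = 2.5  # stands in for the hardcoded 300s acquire deadline
REQUEST_LATENCY = 1.0  # stands in for a 60-120s slow model


async def task(permit: asyncio.Semaphore, i: int) -> str:
    try:
        # Equivalent of ThrottleManager.acquire_async(): wait for a permit,
        # bounded by the acquire timeout.
        await asyncio.wait_for(permit.acquire(), timeout=ACQUIRE_TIMEOUT)
    except asyncio.TimeoutError:
        return f"task {i}: timed out waiting for a permit (no request sent)"
    try:
        await asyncio.sleep(REQUEST_LATENCY)  # the request itself succeeds
        return f"task {i}: ok"
    finally:
        permit.release()


async def main() -> None:
    permit = asyncio.Semaphore(1)  # AIMD starts at concurrency=1
    results = await asyncio.gather(*(task(permit, i) for i in range(6)))
    print("\n".join(results))


asyncio.run(main())
```

With these numbers, tasks 0-2 complete and tasks 3-5 raise inside `wait_for` without ever reaching the request body - the same shape as the production failure described above.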
Steps/Code to reproduce bug
The scenario can be reproduced with a mock slow generator that raises ModelTimeoutError after a few successful calls, simulating what the throttle layer does when permits are exhausted:
Mock generators and reproduction test
```python
import asyncio
import time
from typing import Any
from unittest.mock import MagicMock

import pytest

# ColumnGenerator, ExpressionColumnConfig, GenerationStrategy,
# ModelTimeoutError, etc. come from the engine's internal modules
# (import paths omitted here).


class MockSlowCellGenerator(ColumnGenerator[ExpressionColumnConfig]):
    """Cell generator that simulates DGX Spark-class latency.

    Each call sleeps for ``latency`` seconds before returning. When used behind
    the throttle layer with a tight concurrency limit and the default 300s
    acquire timeout, queued tasks time out before getting a permit.

    For testing, ``timeout_after`` makes the generator raise ModelTimeoutError
    after the Nth call, reproducing what the ThrottledModelClient does when the
    throttle acquire deadline expires.
    """

    def __init__(
        self,
        *args: Any,
        latency: float = 0.0,
        timeout_after: int | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self._latency = latency
        self._timeout_after = timeout_after
        self._call_count = 0

    @staticmethod
    def get_generation_strategy() -> GenerationStrategy:
        return GenerationStrategy.CELL_BY_CELL

    def generate(self, data: dict) -> dict:
        self._call_count += 1
        if self._timeout_after is not None and self._call_count > self._timeout_after:
            raise ModelTimeoutError(
                "Throttle acquire timed out after 300s for slow-provider/large-model [chat]"
            )
        time.sleep(self._latency)
        data[self.config.name] = f"slow_{data.get('seed', '?')}"
        return data

    async def agenerate(self, data: dict) -> dict:
        self._call_count += 1
        if self._timeout_after is not None and self._call_count > self._timeout_after:
            raise ModelTimeoutError(
                "Throttle acquire timed out after 300s for slow-provider/large-model [chat]"
            )
        await asyncio.sleep(self._latency)
        data[self.config.name] = f"slow_{data.get('seed', '?')}"
        return data
```
```python
# Test helpers (_mock_provider, _expr_config, MockSeedGenerator, MODEL_ALIAS)
# are defined alongside this test and omitted here for brevity.


@pytest.mark.asyncio(loop_scope="session")
async def test_scheduler_slow_model_timeout_triggers_early_shutdown() -> None:
    provider = _mock_provider()
    num_records = 6
    configs = [
        SamplerColumnConfig(name="seed", sampler_type=SamplerType.CATEGORY, params={"values": ["A"]}),
        LLMTextColumnConfig(name="answer", prompt="{{ seed }}", model_alias=MODEL_ALIAS),
    ]
    strategies = {
        "seed": GenerationStrategy.FULL_COLUMN,
        "answer": GenerationStrategy.CELL_BY_CELL,
    }
    slow_gen = MockSlowCellGenerator(
        config=_expr_config("answer"),
        resource_provider=provider,
        timeout_after=2,  # first 2 calls succeed, then every call times out
    )
    generators: dict[str, ColumnGenerator] = {
        "seed": MockSeedGenerator(config=_expr_config("seed"), resource_provider=provider),
        "answer": slow_gen,
    }

    graph = ExecutionGraph.create(configs, strategies)
    row_groups = [(0, num_records)]
    tracker = CompletionTracker.with_graph(graph, row_groups)
    buffer_manager = RowGroupBufferManager(MagicMock())
    scheduler = AsyncTaskScheduler(
        generators=generators,
        graph=graph,
        tracker=tracker,
        row_groups=row_groups,
        buffer_manager=buffer_manager,
        shutdown_error_window=4,
        shutdown_error_rate=0.5,
    )

    await asyncio.wait_for(scheduler.run(), timeout=10.0)

    assert scheduler._early_shutdown, "Early shutdown flag should be set"
    dropped = sum(1 for ri in range(num_records) if tracker.is_dropped(0, ri))
    assert dropped > 0, "Expected some rows to be dropped due to timeout errors"
```
Expected behavior
Users should be able to configure the throttle acquire timeout via ThrottleConfig (e.g. acquire_timeout: float = 300.0) so that slow-inference endpoints don't trigger spurious timeouts. The current workaround is disable_early_shutdown: true in RunConfig, but that masks all errors, not just throttle queue timeouts.
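To make the ask concrete, here is a hypothetical sketch of the config surface. `acquire_timeout` does not exist today; the field name, default, and plumbing below are illustrative, not the project's actual API:

```python
from dataclasses import dataclass


@dataclass
class ThrottleConfig:
    # ... existing AIMD tuning knobs ...

    # Hypothetical new field: seconds a task may wait for a concurrency
    # permit before the throttle layer gives up. Defaulting to today's
    # hardcoded value keeps existing behavior unchanged.
    acquire_timeout: float = 300.0
```

acquire_sync/acquire_async would then read this value instead of the module-level DEFAULT_ACQUIRE_TIMEOUT, so users with slow endpoints could raise it per run without turning off early shutdown entirely.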
Agent Diagnostic / Prior Investigation
- DEFAULT_ACQUIRE_TIMEOUT is defined at throttle_manager.py:31 (300s)
- It is used by acquire_sync (line 324) and acquire_async (line 373) with no override path from config
- ThrottleConfig in run_config.py has AIMD tuning knobs but no acquire timeout field
- ThrottledModelClient._athrottled() catches TimeoutError and wraps it as ProviderError(kind=TIMEOUT) (throttled.py:165-171)
- The model facade maps ProviderErrorKind.TIMEOUT to ModelTimeoutError (errors.py:291)
- ModelTimeoutError is in _RETRYABLE_MODEL_ERRORS (async_scheduler.py:49-54), so it gets deferred and retried, but retries hit the same timeout (the full chain is paraphrased in the sketch below)
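For readers without the codebase handy, the chain above can be paraphrased as follows. This is a simplified sketch: the real classes in throttled.py and errors.py carry more state, and the helper names here are illustrative.

```python
import asyncio


# Simplified stand-ins for the real error types in errors.py.
class ProviderError(Exception):
    def __init__(self, kind: str, message: str) -> None:
        super().__init__(message)
        self.kind = kind


class ModelTimeoutError(Exception):
    pass


async def athrottled(acquire_permit, send_request):
    # Shape of the wrapping in ThrottledModelClient._athrottled()
    # (throttled.py:165-171): only the permit wait times out here;
    # the request itself is never sent.
    try:
        await acquire_permit()
    except asyncio.TimeoutError as exc:
        raise ProviderError(kind="TIMEOUT", message=str(exc)) from exc
    return await send_request()


def map_provider_error(err: ProviderError) -> Exception:
    # Model facade mapping (errors.py:291): TIMEOUT becomes the retryable
    # ModelTimeoutError, which is what routes these tasks into salvage rounds.
    if err.kind == "TIMEOUT":
        return ModelTimeoutError(str(err))
    return err
```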
Additional context
Reported by @mvansegbroeck running on DGX Spark with large models where per-request latency is 60-120s. The issue is specific to slow endpoints with low concurrency - fast endpoints or high-concurrency setups won't saturate the queue long enough to hit the 300s deadline.
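A back-of-envelope check of those numbers, assuming requests are served strictly one at a time at concurrency=1, so task i waits roughly i * latency seconds for a permit:

```python
latency = 90.0           # mid-range of the reported 60-120s per request
acquire_timeout = 300.0  # hardcoded DEFAULT_ACQUIRE_TIMEOUT

# A task survives only if its queue wait (depth * latency) stays under
# the acquire timeout.
max_safe_depth = int(acquire_timeout // latency)  # -> 3
print(f"tasks queued at depth 0..{max_safe_depth} get a permit in time")
print("everything queued deeper times out without ever sending a request")
```

So at 90s per request only four tasks can clear the queue before the 300s deadline; any realistic row count guarantees queue-wait timeouts.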
Checklist