feat: make async engine the default execution path #592
Conversation
The async engine has been hardened as opt-in for several releases. Make it the default and address the prerequisites flagged for the flip.

Default flip
- DATA_DESIGNER_ASYNC_ENGINE defaults to "1" at both consumption sites
- Set DATA_DESIGNER_ASYNC_ENGINE=0 for one transitional release to opt out
- allow_resize=True still falls back to sync with a DeprecationWarning

Python 3.10 support
- Replace asyncio.TaskGroup (3.11+) in async_concurrency.py with gather-with-explicit-cancel; semantics are preserved because _run_task already swallows its own exceptions and uses _shutdown_event for sibling cancellation
- Remove the sys.version_info < (3, 11) runtime guard
- Remove the matching pytest skipif so the executor tests run on 3.10 too

Derived timeouts (replaces two hardcoded 300s constants)
- ThrottleManager.acquire_sync/async default to timeout=None (no deadline) instead of DEFAULT_ACQUIRE_TIMEOUT=300; the HTTP request timeout already bounds actual work, and queue waits scale with provider speed and AIMD
- _AsyncBridgedModelFacade derives the sync->async bridge timeout from the model's inference_parameters.timeout and the call's max_correction_steps; one knob (the per-model timeout) drives both deadlines, with no new config surface
- Add a ModelFacade.request_timeout property so the bridge can read the effective timeout the client is configured with

Root-cause surfacing
- AsyncTaskScheduler captures the first non-retryable error and exposes it via first_non_retryable_error
- The interface threads it through DataDesignerGenerationError when 0 records are produced without early-shutdown, so deterministic failures (e.g. bad seed sources) surface their original message instead of a wrapped FileNotFoundError on the parquet path

Tests
- New: throttle no-deadline default behavior (sync+async), parametrized derived bridge timeout, restored async_concurrency tests on 3.10
- Updated: test_dataset_builder.py uses an autouse fixture to pin its Mock-based tests to the sync engine they cover; existing bridge tests set facade.request_timeout for the new derivation

Docs
- Replace the stale LiteLLM security notice in README with a short async-default heads-up and a link to the migration guide
- Add docs/migration-async-default.md covering per-model timeouts, custom-column thread safety, mocking model calls, run outcomes, and the opt-out
- Append a short Update section to the async-all-the-way-down dev note
Docs preview: https://2a16db98.dd-docs-preview.pages.dev
Review: PR #592 — feat: make async engine the default execution path

Summary

Flips …

Also adds …

Findings

Correctness

Code quality / consistency

Tests

Docs

Security / Risk

Verdict

Looks good to merge. The scope is cohesive (flip + the three pre-flip …). None are blocking.
Greptile Summary

This PR promotes the async engine to default (…
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py | Replaces asyncio.TaskGroup with gather+cancel for Python 3.10 compat; semantics preserved because _run_task catches Exception (not BaseException), so CancelledError still propagates correctly through the except-BaseException cancel path |
| packages/data-designer-engine/src/data_designer/engine/models/clients/throttle_manager.py | Removes 300 s hard deadline on acquire_sync/acquire_async; timeout=None now waits indefinitely. Intentional design: per-request HTTP timeout is the only live deadline; test coverage added for both sync and async no-deadline paths |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py | Bridge timeout now derived via _compute_bridge_timeout from facade.request_timeout * attempts * 1.5, floored at 60 s; honors per-call timeout= override; removes hardcoded 300 s SYNC_BRIDGE_TIMEOUT |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Adds first_non_retryable_error capture (first non-retryable exception only); fixes non-FromScratch seed dispatch to pass rg_size-row buffer snapshot instead of empty DataFrame |
| packages/data-designer/src/data_designer/interface/data_designer.py | Adds _resolve_client_concurrency_mode to align client mode with actual engine path (allow_resize forces SYNC); threads first_non_retryable_error into DataDesignerGenerationError when 0 records produced without early_shutdown |
| packages/data-designer-engine/src/data_designer/engine/models/facade.py | New request_timeout property mirrors 60 s factory.py fallback; used by bridge timeout derivation |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Defaults DATA_DESIGNER_ASYNC_ENGINE to "1"; removes Python 3.11 version guard; threads first_non_retryable_error from scheduler into builder; resets state on each run |
| packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py | Adds client_concurrency_mode kwarg; defaults env-var check from "0" to "1" for direct callers; interface passes explicit value computed by _resolve_client_concurrency_mode |
| tests_e2e/src/data_designer_e2e_tests/plugins/regex_filter/impl.py | Moves regex filter from process_before_batch to process_after_generation to comply with async engine's row-count invariance requirement on pre/post-batch stages |
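The routing the rows above describe (env var plus allow_resize deciding the client mode) reduces to a small decision function. This is an illustrative sketch, not the interface's actual `_resolve_client_concurrency_mode`; the enum and function names are stand-ins:

```python
import enum

class ClientConcurrencyMode(enum.Enum):
    SYNC = "sync"
    ASYNC = "async"

def resolve_client_concurrency_mode(env_value: str, any_allow_resize: bool) -> ClientConcurrencyMode:
    # Explicit opt-out wins: DATA_DESIGNER_ASYNC_ENGINE=0 forces the sync engine.
    if env_value == "0":
        return ClientConcurrencyMode.SYNC
    # allow_resize columns fall back to the sync engine, so the clients must be sync too.
    if any_allow_resize:
        return ClientConcurrencyMode.SYNC
    return ClientConcurrencyMode.ASYNC
```

The point of centralizing this at the interface is that the client mode and the engine path can never disagree, which is the bug the PR fixed.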
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[DataDesigner.create / preview] --> B[_resolve_client_concurrency_mode]
B --> C{DATA_DESIGNER_ASYNC_ENGINE?}
C -- "=0 env var" --> D[ClientConcurrencyMode.SYNC\n+ DeprecationWarning]
C -- "=1 default" --> E{any allow_resize columns?}
E -- yes --> F[ClientConcurrencyMode.SYNC\n builder-layer warning]
E -- no --> G[ClientConcurrencyMode.ASYNC]
D & F & G --> H[create_resource_provider\nexplicit client_concurrency_mode]
H --> I[DatasetBuilder.build]
I --> J{DATA_DESIGNER_ASYNC_ENGINE?}
J -- sync --> K[Sync engine\ncolumn-at-a-time]
J -- async default --> L[AsyncTaskScheduler]
L --> M[asyncio.gather + cancel loop\nPython 3.10 compat]
L --> Q[_handle_task_failure]
Q --> R{retryable?}
R -- yes --> S[defer/retry]
R -- no --> T[capture first_non_retryable_error\ndrop row]
I --> U{0 records produced?}
U -- early_shutdown=True --> V[DataDesignerEarlyShutdownError]
U -- root_cause captured --> W[DataDesignerGenerationError\nwith original cause]
U -- no cause --> X[generic DataDesignerGenerationError]
Reviews (12): Last reviewed commit: "docs: breadcrumb explaining why SYNC_BRI..."
The parametrized bridge-timeout test was patching ``concurrent.futures.Future.result`` to capture the timeout the bridge passed in. That reaches into stdlib internals (DEVELOPMENT.md "Mock at boundaries: Keep mocking shallow") and the ``ids=`` argument on the parametrize was missing. Extracts the formula into a module-level ``_compute_bridge_timeout`` helper. The test now calls the helper directly with no mocking, and the parametrize gets readable ids. Behavior is unchanged.
The e2e demo plugins exercise plugin discovery and the full DD lifecycle. Two of them were written against sync-engine semantics that the async engine restricts:

- DemoColumnGeneratorImpl was a ColumnGeneratorFullColumn with no required_columns. The async engine routes ``no-upstream`` columns through the from-scratch path, which passes an empty DataFrame to generators that aren't FromScratchColumnGenerator subclasses. The generator then produces 0 rows and the scheduler raises ``update_batch received 0 values``. Switching the plugin to FromScratchColumnGenerator with generate_from_scratch(num_records) matches what the plugin actually does (it produces a constant column without input) and works on both engines.
- RegexFilterProcessor implemented process_before_batch with row-count changes. The async engine enforces row-count invariance in the pre- and post-batch processor stages by design. Moving the filter to process_after_generation preserves the plugin's purpose (regex-based row filtering) at a stage that supports row-count changes on both engines. Test assertions check the final dataset, so the stage shift is transparent.

Both changes are demo-plugin updates only; no production code changes.
Three bugs and two test-quality concerns surfaced by an independent review of
the prior commits. Each was real and worth fixing in the flip PR.
Bug fixes
- Sync-fallback path was creating async-only model clients. The default flip
meant ``client_concurrency_mode = ASYNC`` for every default run, but the
``allow_resize=True`` path falls back to the sync engine — sync ``model.generate()``
calls then hit ``SyncClientUnavailableError``. The resolution decision now
lives at the DataDesigner interface level via
``_resolve_client_concurrency_mode``: it considers both the env var and the
config (allow_resize forces sync clients) and is passed explicitly to
``create_resource_provider``. Direct callers of the factory still get the
env-var default.
- Sync→async bridge timeout ignored the per-call ``timeout=`` override. A
custom column calling ``model.generate(timeout=600)`` against a slow endpoint
was being cancelled at the model-config default, not 600s. The bridge now
prefers ``kwargs.get("timeout")`` over ``facade.request_timeout``.
- Bridge timeout formula missed ``max_conversation_restarts``. One logical
generation can do ``(1 + max_conversation_restarts) × (1 + max_correction_steps)``
HTTP requests; the formula now multiplies both, matching the worst-case
attempt budget.
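The worst-case attempt budget described in this fix can be written out directly. This is a sketch using the constants named elsewhere in the thread (`_BRIDGE_TIMEOUT_FLOOR_S = 60.0`, the 1.5 overhead factor); the function name and signature are illustrative, not the exact `custom.py` helper:

```python
_BRIDGE_TIMEOUT_FLOOR_S: float = 60.0
_BRIDGE_TIMEOUT_OVERHEAD_FACTOR: float = 1.5  # HTTP connect/teardown + serialization buffer

def compute_bridge_timeout(
    request_timeout: float,
    max_correction_steps: int,
    max_conversation_restarts: int,
) -> float:
    # One logical generation can do (1 + restarts) * (1 + corrections) HTTP requests.
    attempts = (1 + max_conversation_restarts) * (1 + max_correction_steps)
    # Scale the per-request timeout by the attempt budget, with a floor.
    return max(_BRIDGE_TIMEOUT_FLOOR_S, attempts * request_timeout * _BRIDGE_TIMEOUT_OVERHEAD_FACTOR)
```

For example, a 60 s per-request timeout with one restart and one correction step budgets 4 attempts, so the bridge deadline becomes 360 s rather than a fixed 300 s.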
Engine routing fix (also surfaced by failing e2e plugin tests)
- ``_run_from_scratch`` else-branch passed an empty DataFrame to non-FromScratch
generators classified as seeds (no upstream columns), so ``ColumnGeneratorFullColumn``
with no required_columns produced 0 rows for an ``rg_size``-row buffer. Now
passes an ``rg_size``-row snapshot of the row-group buffer, mirroring the
sync engine's FULL_COLUMN contract.
- The earlier ``DemoColumnGeneratorImpl`` workaround (rewrite as ``FromScratchColumnGenerator``)
is reverted; the engine fix subsumes it. The processor-plugin fix
(``process_after_generation`` for the regex filter) stays — pre-batch
row-count change is intentionally rejected by the async engine.
Test improvements
- Throttle no-deadline tests are parametrized over ``(timeout=0.0, raises)``
and ``(timeout=None, waits)``, pinning that ``None`` is genuinely distinct
from any finite default. Sync and async counterparts mirror.
- New regression tests for ``first_non_retryable_error`` surfacing covering
both load-raises and load-returns-empty paths, asserting the original
exception is chained via ``__cause__`` and that the typed
``DataDesignerEarlyShutdownError`` doesn't fire in this branch.
- New parametrized regression test for ``_resolve_client_concurrency_mode``
covering all four (env × allow_resize) combinations.
- New parametrized test for the per-call ``timeout=`` override flowing into
the bridge timeout calculation.
- Bridge formula tests extended with ``max_conversation_restarts`` cases.
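The no-deadline semantics these tests pin down can be illustrated with a plain semaphore. This is a stand-in sketch, not `ThrottleManager` itself; the function name is hypothetical:

```python
import threading
import time

def acquire_waits_for_release():
    # Stand-in for the throttle's "no deadline by default" behavior:
    # acquire(timeout=None) blocks until capacity frees up, rather than
    # failing at an arbitrary fixed ceiling.
    sem = threading.Semaphore(1)
    sem.acquire()  # saturate capacity: the next acquire must wait

    def release_later():
        time.sleep(0.05)
        sem.release()

    # Keep a strong reference to the releasing thread (mirrors the review's
    # note about retaining the release task/thread so it isn't collected).
    t = threading.Thread(target=release_later, daemon=True)
    t.start()

    start = time.monotonic()
    acquired = sem.acquire(timeout=None)  # no deadline: waits for the release
    elapsed = time.monotonic() - start
    t.join()
    return acquired, elapsed
```

The pinned distinction is that `timeout=None` genuinely waits, while any finite timeout (including `0.0`) can fail at capacity.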
…sync-default-prereqs

# Conflicts:
#	packages/data-designer/src/data_designer/interface/data_designer.py
Three parametrize cases were duplicating coverage already provided by existing standalone tests:

- ``test_acquire_*_timeout_branches`` parametrized over ``(0.0, raises)`` and ``(None, waits)``. The ``raises`` half duplicates ``test_acquire_*_raises_timeout_when_at_capacity``. Replaced with two focused ``..._default_no_deadline_waits_for_release`` tests covering only the no-deadline branch.
- ``test_resolve_client_concurrency_mode_matches_engine_choice`` had four cases. The ``async-off + allow-resize`` case asserts ``SYNC`` because the env var alone forces it; the allow_resize input is moot. Dropped.
- ``test_async_bridge_honors_per_call_timeout`` had three cases. The "override below floor" case cross-products the per-call override flow with the floor-clamping behavior already covered by ``test_compute_bridge_timeout``. Dropped.

Net: -25 lines of test code with no loss of essential coverage.
The standalone ``Migrating to the async default`` page didn't fit the existing docs style — present tense, behavior over comparisons, content in its natural concept home. Folding it in:

- ``architecture-and-performance.md`` gets a new ``Async Engine`` section covering per-model timeouts, run outcomes (partial completion + ``DataDesignerEarlyShutdownError``), and the transitional opt-out. Three stale ``async engine is landing soon`` callouts updated to reflect the flip.
- ``custom_columns.md`` gets two short notes: a thread-safety callout near Generation Strategies, and a mocking-with-spec note in Development Testing.
- ``async-all-the-way-down.md``'s Update section now points at the new arch-and-perf section.
- The README heads-up links to the same anchor.
- ``migration-async-default.md`` removed; mkdocs.yml entry dropped.
Small targeted edits to make the user-facing concept docs consistent with the post-flip state. No restructuring.

- ``architecture-and-performance.md``: the ``Execution Model`` callout now opens with two engines, links to the new ``Async Engine`` section, and frames the existing column-at-a-time description as sync-engine semantics. The ``Step 2: Process columns sequentially`` paragraph notes that the async engine relaxes this. The ``Key Concepts`` table differentiates per-engine for ``Batching`` and ``Sequential columns``; ``Parallel cells`` is the same on both.
- ``processors.md``: added a warning callout about the async engine's row-count invariance in the pre- and post-batch stages, with guidance to use ``process_after_generation()`` for row filtering or expansion.
nabinchha
left a comment
Thanks for putting this together, @andreatgretel — this is a thoughtful, well-scoped flip with the prerequisites carefully addressed and the test coverage to back it up.
Summary
Makes the async engine the default execution path and lands the Python 3.10 compat, derived-timeout, and root-cause-surfacing prerequisites alongside it. Implementation matches the stated intent across all four "Attention Areas" — the bridge timeout derivation, the throttle no-deadline default, the _resolve_client_concurrency_mode alignment, and the first_non_retryable_error chaining all behave as documented.
Findings
Warnings — Worth addressing
packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py:19 — SYNC_BRIDGE_TIMEOUT = 300 still hardcoded in _run_coroutine_sync
- What: The PR replaces the ``SYNC_BRIDGE_TIMEOUT`` import in ``custom.py`` with the derived ``_compute_bridge_timeout``, but the original constant is still defined in ``base.py`` and still used by the default ``ColumnGenerator.generate()`` fallback (``_run_coroutine_sync``). Any subclass that only implements ``agenerate()`` and is then called from sync context (e.g. user code outside the engine) hits the same 300s ceiling the PR explicitly set out to remove.
- Why: The PR description calls out "two hardcoded 300s timeouts that broke slow self-hosted endpoints." The throttle one and the custom-column bridge one are addressed; this one is structurally identical and would bite the same slow-endpoint scenarios for users who call ``generator.generate(...)`` directly. Since async-first generators are about to become the norm, this code path will likely see more, not less, traffic.
- Suggestion: Either route this bridge through the same derivation (if a ``request_timeout`` is reachable from the generator's resource provider) or, at minimum, hoist the floor up to match ``_BRIDGE_TIMEOUT_FLOOR_S`` and accept the 60s minimum here too. If neither fits in this PR, an explicit follow-up issue would be enough — happy to leave it for the next pass, but the PR description should probably acknowledge the third 300s hidden in ``base.py``.
packages/data-designer-engine/tests/engine/models/clients/test_throttle_manager.py:524 — asyncio.create_task reference dropped
- What: ``asyncio.create_task(release_after(0.05))`` doesn't keep a reference to the resulting task. Python's asyncio docs note that the loop only holds weak references to tasks, so a GC cycle between scheduling and the inner ``await`` can collect the task before it runs.
- Why: In practice the test almost certainly passes because the ``asyncio.sleep(0.05)`` keeps the task alive long enough for ``acquire_async`` to acquire the lock, but it's a known flake source on slower CI runners or under heavy load. The new sync counterpart correctly retains its ``Thread`` via the daemon thread reference.
- Suggestion:

      release_task = asyncio.create_task(release_after(0.05))
      try:
          await asyncio.wait_for(
              tm.acquire_async(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN),
              timeout=2.0,
          )
      finally:
          await release_task
Suggestions — Take it or leave it
packages/data-designer/src/data_designer/interface/data_designer.py:267 — Error formatting may obscure exception type
- What: ``raise DataDesignerGenerationError(f"🛑 {root_cause}") from root_cause`` calls ``__str__`` on the exception, which drops the type name. A ``ValueError("invalid seed source: no rows after hydration")`` becomes "🛑 invalid seed source: no rows after hydration" — the chained ``__cause__`` still has full context, but the surface message is less informative than it could be for users reading the top frame.
- Why: This is the user-facing message we just went out of our way to surface. Including the exception type makes it clearer that the underlying failure came from outside the framework.
- Suggestion: Consider ``f"🛑 {type(root_cause).__name__}: {root_cause}"`` for a small readability win. Take it or leave it — chaining via ``__cause__`` is the contract anyway.
packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py:47-48 — Magic numbers in _compute_bridge_timeout
- What: The 1.5x buffer factor is a literal in the function body. The formula is well-explained in the docstring, but the constant itself doesn't have a name.
- Why: A reader scanning the function for the buffer multiplier has to read the docstring; a named constant would make the formula self-documenting and easier to tune.
- Suggestion: Optional —

      _BRIDGE_TIMEOUT_OVERHEAD_FACTOR: float = 1.5  # HTTP connect/teardown + serialization buffer

      def _compute_bridge_timeout(...):
          attempts = (1 + max_conversation_restarts) * (1 + max_correction_steps)
          return max(_BRIDGE_TIMEOUT_FLOOR_S, attempts * request_timeout * _BRIDGE_TIMEOUT_OVERHEAD_FACTOR)
packages/data-designer/src/data_designer/interface/data_designer.py:580 — Defensive getattr likely unnecessary
- What: ``any(getattr(c, "allow_resize", False) for c in config_builder.get_column_configs())`` uses ``getattr`` with a fallback, but every member of ``ColumnConfigT`` (which is what the builder accepts) inherits ``allow_resize: bool = False`` from ``SingleColumnConfig``. The builder rejects anything that isn't in ``ColumnConfigT.__args__`` at ``add_column`` time.
- Why: Mirroring the engine's check style would make it obvious that this catches every user-reachable column config; the current form reads as if some configs might not have the field.
- Suggestion: Optional — ``any(c.allow_resize for c in config_builder.get_column_configs())``. If you want to keep the defensive form for symmetry with ``_resolve_async_compatibility``, that's fine too.
packages/data-designer/src/data_designer/interface/data_designer.py:265-267 — Sentinel-driven branch is implicit
- What: ``if root_cause is not None and builder.actual_num_records == 0:`` works correctly because the sync engine never sets ``_first_non_retryable_error`` and leaves ``actual_num_records = -1``. The interface code reads as if it might also fire on the sync path, but it doesn't.
- Why: A future contributor adding sync-engine error capture (e.g. for symmetry) might wire ``actual_num_records = 0`` for the sync path and accidentally double-trigger this branch.
- Suggestion: A one-line comment near each call site would make the async-only assumption explicit:

      # actual_num_records is only set on the async path; sync runs leave it at -1,
      # so this branch never fires for the sync engine.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:886-887 — First-error capture loses subsequent context
- What: Only the first non-retryable error is captured; later ones are dropped (the warning log line is the only trace). For a run where the first failure is misleading (e.g. a single corrupt seed row that fails with ``KeyError``) and the systemic failure (e.g. wrong provider auth) shows up later, the surfaced root cause may not be the most actionable one.
- Why: "First wins" is a reasonable heuristic, especially for the deterministic-failure case the PR targets, but it's worth being explicit about the trade-off.
- Suggestion: Take it or leave it — for a follow-up, you might track per-column counts of distinct error types and surface the most frequent. For now the docstring on the property already communicates "first" clearly enough.
What Looks Good
- Test coverage matches the surface area. The parametrized ``_compute_bridge_timeout`` cases (with the ``corrections × restarts compound`` and ``small-clamped-to-floor`` ids) make the formula concrete; the regression tests for ``first_non_retryable_error`` cover both the load-raises and load-returns-empty paths and explicitly assert the typed early-shutdown error doesn't fire spuriously. The ``test_resolve_client_concurrency_mode_matches_engine_choice`` matrix is exactly the right shape.
- The gather-with-cancel rewrite in ``async_concurrency.py`` is conservative and well-commented. The defensive cancel block calls out that ``_run_task`` swallows its own exceptions today, so the explicit cancel only matters under future change. That's the right framing.
- Smart capture of the root cause through the scheduler. The mechanism — capture the first non-retryable error in the scheduler, expose it via a property, thread it through the interface only when the run produced 0 records — is a clean way to recover the context the sync engine got "for free" via raises. The triple guard on the interface (``builder.first_non_retryable_error is not None`` and ``builder.actual_num_records == 0`` plus the early-shutdown check first) correctly prevents partial-salvage runs from masking unrelated load failures.
- Documentation lands in the right places. Folding the migration guidance into ``architecture-and-performance.md`` and ``processors.md`` (rather than a standalone migration page) matches the existing docs style. The thread-safety callout in ``custom_columns.md`` and the ``MagicMock(spec=ModelFacade)`` note are the kind of "you'll wish you knew this before debugging" footnotes that pay back fast.
- Backward compatibility preserved at the factory boundary. ``create_resource_provider`` accepting an explicit ``client_concurrency_mode`` while keeping the env-var default for direct callers is the right shape — the interface owns the policy, the factory stays mechanical.
Verdict
Ship it (with nits) — the only Warning-level item (the lingering SYNC_BRIDGE_TIMEOUT in base.py) is arguably scope-adjacent and fine as a follow-up issue if you'd rather keep this PR focused. Everything else is suggestions.
This review was generated by an AI assistant.
Four targeted fixes from the review.
Worth-addressing (warning):
- ``test_acquire_async_default_no_deadline_waits_for_release`` was
spawning the release task without holding a strong reference. The
loop's weak-ref bookkeeping could GC it before the inner ``await``
observes the release, producing a CI flake. Hold the task and
``await`` it in ``finally``.
Take-it-or-leave-it (applied):
- Root-cause error surfacing now includes the exception type name:
``f"🛑 {type(root_cause).__name__}: {root_cause}"`` so users see
``ValueError: ...`` instead of just the message string. The
``__cause__`` chain is preserved either way.
- Drop the defensive ``getattr(c, "allow_resize", False)`` in
``_resolve_client_concurrency_mode`` — every member of
``ColumnConfigT`` inherits ``allow_resize: bool = False`` from
``SingleColumnConfig``.
- One-line comment near the root-cause surfacing branch noting that
``actual_num_records == 0`` is async-only (sync runs leave it at
``-1``), so the branch is async-only by construction.
Not addressed in this PR (filing as follow-ups):
- ``SYNC_BRIDGE_TIMEOUT = 300`` still hardcoded in
``column_generators/generators/base.py:_run_coroutine_sync``. That
bridge has no model-facade context to derive a timeout from, so the
fix is a structural refactor outside this PR's scope.
- First-error capture loses subsequent-error context. The "first wins"
heuristic is documented; richer aggregation is a follow-up.
This was the third hardcoded 300s timeout (Nabin flagged it on PR #592). The path is the generic sync→async bridge in ``ColumnGenerator.generate()``: when a subclass overrides only ``agenerate()``, the sync entry point runs the coroutine in a background thread.

Same philosophy we applied to the throttle queue wait elsewhere in the PR: a defensive deadline on top of work that's already bounded by the HTTP timeout doesn't add safety, it just produces spurious failures on slow self-hosted endpoints.

Drop the constant, the timeout exception handling, and the ``timed_out`` bookkeeping. ``pool.shutdown(wait=True)`` becomes the simple cleanup. Tests in ``test_async_generators.py`` exercise the happy path only and don't depend on the timeout firing.
This reverts commit 7a0b77d.
Thanks for the careful review. Pushed.

Applied:
Deferred to a follow-up PR:
Skipped (pure style):
### Opting out

The legacy sync engine is available as a transitional opt-out:
Should we add "deprecation warning" language here?
Did both. The doc now uses a !!! warning "Deprecated" admonition naming the env var as a deprecated escape hatch (43f71130), and the runtime emits a DeprecationWarning plus logger.warning when DATA_DESIGNER_ASYNC_ENGINE=0 is set, mirroring the existing allow_resize shape (4dfa200e). Both signals now match the precedent in _resolve_async_compatibility.
Nabin asked whether the docs should adopt explicit "deprecation" language
on the opt-out path. Doing both:
- Doc: ``architecture-and-performance.md``'s ``Opting out`` section now
uses an ``!!! warning "Deprecated"`` admonition that names the env var
as a deprecated escape hatch and notes the run-time warning.
- Code: ``DataDesigner._resolve_client_concurrency_mode`` emits a
``DeprecationWarning`` when ``DATA_DESIGNER_ASYNC_ENGINE=0`` is detected.
Same precedent as the existing ``allow_resize=True`` warning. Auto-fallback
via ``allow_resize`` does not double-warn here; the builder layer emits
its own warning later.
- Test: parametrized regression now asserts ``pytest.warns(DeprecationWarning)``
on the opt-out branch and treats any warning on the async-on branches as
a failure (``simplefilter("error")`` inside the ``catch_warnings`` block).
nabinchha
left a comment
Thanks for putting this together, @andreatgretel — I re-walked the PR with the latest commits in hand, and the prior round of feedback has been thoroughly addressed (with the trade-offs you couldn't apply spelled out cleanly in bf36d256's body). At this point the change reads as a careful, well-scoped flip: async by default, prerequisite gaps filled in, and follow-ups explicitly punted with rationale.
Summary
Flips DATA_DESIGNER_ASYNC_ENGINE to "1" at both consumption sites and lands the prerequisites that surface alongside the flip — Python 3.10 compatibility for the async stack (gather-with-cancel in place of TaskGroup), removal of two hardcoded 300 s timeouts that broke slow self-hosted endpoints (sync bridge and throttle queue-wait), and root-cause surfacing for runs that produce 0 records. Implementation matches the stated intent.
Findings
Suggestions — Take it or leave it
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py:72 — Silent opt-out leaves DATA_DESIGNER_ASYNC_ENGINE=0 users in the dark
- What: The README and ``architecture-and-performance.md`` both call the env var a "transitional opt-out" that "will be removed in a future release", but at runtime there's no ``DeprecationWarning`` emitted when the user sets ``DATA_DESIGNER_ASYNC_ENGINE=0``. The only deprecation signal in the engine flip is the ``allow_resize=True`` auto-fallback path in ``_resolve_async_compatibility`` (dataset_builder.py:330). Users who explicitly opt out via the env var get the legacy behavior silently and won't have a programmatic signal to migrate before the switch is removed.
- Why: This was the spirit of the inline comment I left on ``architecture-and-performance.md:304`` last round. The docs now lean harder on "transitional"/"will be removed" wording, which makes the runtime silence more noticeable.
- Suggestion: Emit a one-time ``DeprecationWarning`` (and a ``logger.warning``, mirroring the ``allow_resize`` shape) at the env-var read site when ``DATA_DESIGNER_ASYNC_ENGINE`` is explicitly ``"0"``. Something like:

      if os.environ.get("DATA_DESIGNER_ASYNC_ENGINE") == "0":
          msg = (
              "DATA_DESIGNER_ASYNC_ENGINE=0 forces the legacy sync engine. "
              "This switch is transitional and will be removed in a future release; "
              "please run on the async engine and report any regressions."
          )
          logger.warning(f"⚠️ {msg}")
          warnings.warn(msg, DeprecationWarning, stacklevel=2)

      DATA_DESIGNER_ASYNC_ENGINE = os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "1") == "1"

  The same logic could live once in a small helper consumed by both ``dataset_builder.py`` and ``resource_provider.py``. Happy to defer to a follow-up issue if you'd rather keep this PR tight.
packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py:19 — SYNC_BRIDGE_TIMEOUT = 300 survives without a follow-up breadcrumb
- What: The constant and the timeout-handling block in ``_run_coroutine_sync`` are unchanged on this branch (7a0b77d4 dropped them, cf0826d1 reverted with no body). The rationale for keeping it lives only in bf36d256's commit message (no facade context to derive a timeout). That's a fine call for this PR — the path is the generic ``ColumnGenerator.generate()`` fallback, and there's no model facade reachable from there to derive a bounded deadline.
- Why: A future maintainer reading ``base.py`` will see the same 300 s pattern that the rest of the PR removed and not know it was deliberately preserved. The revert commit body is also empty, so ``git blame`` won't lead them to the rationale either.
- Suggestion: Two small breadcrumbs would close the loop:
  - A short comment above ``SYNC_BRIDGE_TIMEOUT = 300`` (or above ``_run_coroutine_sync``) noting "Preserved deliberately: this is the generic sync→async bridge for ``ColumnGenerator.generate()`` overrides; it has no ``ModelFacade`` context to derive a per-call deadline. Tracked for structural follow-up in #."
  - File the follow-up issue (or link an existing one) so the breadcrumb has a destination.

  The same nit applies to the empty revert message in cf0826d1 — even just "Reverting; preserved as-is for follow-up — see bf36d25 for context" in the body would help future archaeologists.
packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py:30,47 — Bridge-timeout magic numbers could be explained inline
- What: ``_BRIDGE_TIMEOUT_FLOOR_S = 60.0`` has a docstring; the ``1.5`` multiplier in ``attempts * request_timeout * 1.5`` does not. The PR description explains it ("absorbs HTTP connect/teardown and serialization overhead") and the function docstring partially mirrors it.
- Why: Future tuners (or anyone debugging a flaky bridge timeout) will reach for the multiplier first. A named constant or an inline comment makes the intent self-documenting.
- Suggestion: Extract the buffer to a named constant alongside the floor:

      _BRIDGE_TIMEOUT_FLOOR_S: float = 60.0
      _BRIDGE_TIMEOUT_OVERHEAD_FACTOR: float = 1.5  # HTTP connect/teardown + serialization buffer

  Then ``attempts * request_timeout * _BRIDGE_TIMEOUT_OVERHEAD_FACTOR``. Tiny, but it makes intent obvious and keeps test fixtures explicit if the buffer ever needs adjusting.
What Looks Good
- `bf36d256` is exemplary "address review feedback" hygiene. Each item is labelled by severity, applied changes are described, and the items not applied are filed as follow-ups with concrete rationale ("no facade context to derive a timeout from", "richer aggregation is a follow-up"). That's exactly the level of context that makes a re-review fast.
- Root-cause surfacing is well-thought-through. The async-only-by-construction comment at `data_designer.py:265-267` makes the load/empty branches readable, and `f"🛑 {type(root_cause).__name__}: {root_cause}"` plus `from root_cause` gives users both a useful one-liner and the full chain for debugging. The triple-site pattern (`create()` load failure, `create()` empty dataset, `preview()` empty dataset) is consistent.
- Test fix at `test_throttle_manager.py:524-534` reads cleanly. Strong reference + `await … finally` defends against the asyncio loop's weak-ref bookkeeping, and the inline comment explains why for anyone reading it later — the same defense maintainers should reach for elsewhere.
- Clean Python 3.10 fallback. The gather-with-cancel pattern in `async_concurrency.py:_run_all` is genuinely behavior-equivalent to `TaskGroup` for this use (since `_run_task` swallows its own exceptions), and the comment block explicitly says so. Removing the runtime guard and the matching `pytestmark.skipif` keeps this minimal.
- Engine/client-mode alignment via `_resolve_client_concurrency_mode` is the right shape. Single point of truth, doesn't leak the env var into the interface, and correctly accounts for `allow_resize` forcing sync. Dropping the defensive `getattr` was the right call once we confirmed every `ColumnConfigT` member has the field.
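The gather-with-cancel pattern discussed above can be sketched roughly as follows. This is a simplified stand-in, not the repo's `_run_all`: it assumes, as the review notes, that each worker swallows its own exceptions, so `gather` only raises for cancellation or a truly unexpected `BaseException`.

```python
import asyncio


async def run_all(workers) -> None:
    # 3.10-compatible stand-in for asyncio.TaskGroup: run all workers
    # concurrently, and on any unexpected error cancel surviving siblings.
    tasks = [asyncio.ensure_future(w) for w in workers]
    try:
        await asyncio.gather(*tasks)
    except BaseException:
        # Mirror what TaskGroup would do: cancel the siblings that are
        # still running, then let the cancellations settle quietly.
        for t in tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
```

Because the cancel loop only fires when something escapes into `gather`, well-behaved workers never hit it, which is why the rewrite stays behavior-equivalent for this codebase's use.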
Verdict
Ship it (with nits) — The Suggestions above are all optional polish; nothing is blocking. The deprecation-warning gap is the most user-facing of them and would close a small loop between docs and runtime, but it's reasonable to file as a follow-up if you'd rather not expand the scope of this PR. Linter and formatter both pass on the changed files (ruff check clean, ruff format --check clean).
This review was generated by an AI assistant.
Parity fix from Nabin's re-review of PR #592. The ``allow_resize=True`` auto-fallback path in ``_resolve_async_compatibility`` emits both a ``logger.warning("⚠️ ...")`` and a ``DeprecationWarning``. The new ``DATA_DESIGNER_ASYNC_ENGINE=0`` opt-out path was only emitting the ``DeprecationWarning``, leaving users who run with default warning filters with no visible signal, inconsistent with the established precedent. Match the pattern: same message body, both signals, same stacklevel.
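The "both signals" pattern described in this commit could look roughly like this. The function name and message text below are illustrative assumptions; only the shape (one message body, emitted through both channels, with `stacklevel` pointing at the caller) reflects the commit.

```python
import logging
import warnings

logger = logging.getLogger(__name__)

# One message body, two signals: a log WARNING is visible under default
# logging config, while the DeprecationWarning reaches warning filters.
_OPT_OUT_MSG = (
    "⚠️ DATA_DESIGNER_ASYNC_ENGINE=0 falls back to the sync engine; "
    "this opt-out will be removed after one transitional release."
)


def warn_sync_opt_out() -> None:
    logger.warning(_OPT_OUT_MSG)
    warnings.warn(_OPT_OUT_MSG, DeprecationWarning, stacklevel=2)
```

Users who only watch logs and users who run with `-W error::DeprecationWarning` both see the same message, which is the parity the re-review asked for.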
Nabin's re-review pointed out that ``base.py`` is the lone place where the 300s pattern survives, while ``custom.py`` and ``throttle_manager.py`` both retired theirs. Without a comment, a future reader (or a lint sweep) could mistake this for an oversight and "consistency-fix" it the wrong way. Add a short note at the constant naming the two retired siblings, the reason this one stayed (no ``ModelFacade`` context to derive from), and the fact that it's tracked for a structural follow-up.
Thanks for the re-review, Nabin. Pushed two more small commits addressing the first two suggestions:
Skipping the rest for now. Re-requesting your review since the push will dismiss the prior approval. Sorry for the dance.
Summary
Makes the async execution engine the default path and addresses the prerequisites that surface alongside the flip: Python 3.10 compatibility for the async stack, two hardcoded 300s timeouts that broke slow self-hosted endpoints, and root-cause surfacing for runs that produce 0 records. Closes #551.
Changes
Added
- `Async Engine` section in `architecture-and-performance.md` covering per-model timeouts, run outcomes (partial completion + `DataDesignerEarlyShutdownError`), and the transitional opt-out.
- `MagicMock(spec=ModelFacade)` mocking note in `custom_columns.md`.
- `ModelFacade.request_timeout` property exposing the effective per-request HTTP timeout.
- `AsyncTaskScheduler.first_non_retryable_error` — captures the first task error so the interface can surface the original cause when a run produces 0 records.
- `DataDesigner._resolve_client_concurrency_mode` — picks the model-client mode that matches the engine the run will actually use, so an `allow_resize` sync-fallback doesn't end up with async-only clients.
- `Update` section on the existing `async-all-the-way-down` dev note pointing at the new arch-and-perf section.

Changed
- `DATA_DESIGNER_ASYNC_ENGINE` defaults to `"1"` at both consumption sites. Set to `"0"` to fall back for one transitional release. `allow_resize=True` still triggers an automatic sync-engine fallback with a `DeprecationWarning`.
- `ThrottleManager.acquire_sync/async` default to `timeout=None` (no queue-wait deadline). Per-request HTTP timeout is the only deadline that bounds actual work; queue waits scale with provider speed and AIMD. Existing tests that pass explicit `timeout=` floats (`0.0`, `0.5`) are unaffected.
- `_AsyncBridgedModelFacade` derives the sync→async bridge timeout via the new `_compute_bridge_timeout` helper as `(1 + max_conversation_restarts) × (1 + max_correction_steps) × request_timeout × 1.5`, floored at 60s. Honors per-call `timeout=` overrides. One knob — the per-model `inference_parameters.timeout` — drives both the HTTP deadline and the bridge deadline. No new user-facing config surface.
- `_run_from_scratch` for non-`FromScratchColumnGenerator` generators dispatched as seeds (no upstream columns) now passes an `rg_size`-row buffer snapshot instead of an empty DataFrame. Fixes a routing footgun for plugin authors writing `ColumnGenerator` `FullColumn` with no `required_columns`.
- `first_non_retryable_error` is threaded into `DataDesignerGenerationError` when 0 records are produced without early-shutdown, so deterministic seed/generator failures surface the original message instead of a wrapped parquet `FileNotFoundError`.
- `DataDesigner._resolve_client_concurrency_mode`, threaded into `create_resource_provider` via a new explicit `client_concurrency_mode` kwarg. Direct callers of the factory still get the env-var default.
- `async engine is landing soon` callouts in arch-and-perf updated to reflect the flip.

Fixed
- `asyncio.TaskGroup` use in `async_concurrency.py:_run_all` is replaced with `asyncio.gather` plus an explicit cancel-on-`BaseException` loop. `_run_task` already swallows its own exceptions and uses `_shutdown_event` for sibling cancellation, so the rewrite is behavior-equivalent. Removed the `sys.version_info < (3, 11)` runtime guard and the matching `pytestmark` skipif.

Attention Areas
- `async_concurrency.py` — the gather-with-cancel rewrite preserves TaskGroup semantics for our use only because `_run_task` doesn't raise into the parent. The explicit cancel loop in the `except BaseException:` branch is defensive — it kicks in only if a future change causes children to raise into gather, mirroring what TaskGroup would have done.
- `throttle_manager.py` — `timeout=None` semantics: when no deadline is passed, the loop waits indefinitely for permits. The HTTP timeout is the meaningful deadline.
- `custom.py` — bridge timeout derivation honors `kwargs.get("timeout")` over `facade.request_timeout`, and `max_conversation_restarts` is in the formula. Sync customs that call `model.generate(timeout=600)` get the override.
- `data_designer.py` — root-cause surfacing fires only when `actual_num_records == 0` AND not `early_shutdown`. Partial-salvage runs that fail to load for unrelated reasons still surface the original load error. `_resolve_client_concurrency_mode` is the single point where the engine-vs-client mismatch is prevented.

Verification
- `make test` passes on Python 3.10 and 3.12 (3117/3117 across all three packages).
- `make test-e2e` passes locally (6 passed, 2 skipped).
- `integrate.api.nvidia.com` (Build) and `inference-api.nvidia.com` (Inference) with no env var set: 13/13 across single-model, multi-model, mixed-provider, multi-row-group, preview, embeddings, native async customs, sync→async bridge, allow_resize fallback, and non-retryable error path.

Follow-ups (out of scope)
- `architecture-and-performance.md` still describes the sync engine's column-at-a-time execution model in its main `Execution Model` section. Worth a separate refresh PR alongside the new `Async Engine` section added here.
- Once Python 3.10 support is dropped, `_run_all` can return to `asyncio.TaskGroup`.
- `LoggingConfig.default()` still swallows scheduler WARNs (called out as a follow-up in fix(async): pack of fixes for async engine under degraded providers #585). Orthogonal to the flip; should ship soon to make the new degraded-provider WARN visible by default.
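As a closing illustration, the zero-records root-cause surfacing described in this PR can be sketched as follows. The class and attribute names mirror the PR description, but the wiring below is a simplified assumption, not the actual interface code.

```python
class DataDesignerGenerationError(Exception):
    """Stand-in for the interface's generation error."""


class SchedulerState:
    """Simplified stand-in for AsyncTaskScheduler's error capture."""

    def __init__(self) -> None:
        self.first_non_retryable_error: BaseException | None = None

    def record_failure(self, exc: BaseException) -> None:
        # Keep only the first non-retryable error; later ones are noise.
        if self.first_non_retryable_error is None:
            self.first_non_retryable_error = exc


def finalize(state: SchedulerState, actual_num_records: int, early_shutdown: bool) -> None:
    # Fires only for deterministic zero-record failures, never for
    # partial-salvage runs or early shutdown.
    if actual_num_records == 0 and not early_shutdown:
        root_cause = state.first_non_retryable_error
        if root_cause is not None:
            raise DataDesignerGenerationError(
                f"🛑 {type(root_cause).__name__}: {root_cause}"
            ) from root_cause
```

The `raise ... from root_cause` keeps the full chain attached, so a bad seed source surfaces as its original `FileNotFoundError` rather than a wrapped error on the parquet path.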