Add worker middleware#1579
Conversation
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ntics Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Now that the reference lists both TaskMiddleware and WorkerMiddleware, clarify that kind-matching applies only to task middleware; worker middleware is always async and exempt. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The page now documents both task and worker middleware, so retitle it "Add middleware" and split it into parallel "Task middleware" and "Worker middleware" sections. Pull the cross-cutting "Failures and retries" and "Stopping the worker" guidance into shared sections that apply to both kinds. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The Django integration already closes Django's per-thread DB connections via a worker-wide task-middleware pair (contrib/django/db_cleanup.py), so present it as how Procrastinate does it rather than something the user wires up, and link to the Django integration how-to. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Caution Review failedAn error occurred during the review process. Please try again later. 📝 WalkthroughWalkthroughIntroduces ChangesWorker-level middleware feature
Sequence Diagram(s)sequenceDiagram
rect rgba(173, 216, 230, 0.5)
note right of Worker: Worker middleware layer (always async, event loop)
Worker->>WorkerMiddleware: await call_next(context, worker)
end
rect rgba(144, 238, 144, 0.5)
note right of WorkerMiddleware: Task middleware layer (sync/async matched)
WorkerMiddleware->>TaskMiddleware: call_next(context, worker)
TaskMiddleware->>TaskFunction: execute task
TaskFunction-->>TaskMiddleware: result
TaskMiddleware-->>WorkerMiddleware: result
end
WorkerMiddleware-->>Worker: job_result.result
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Coverage reportClick to see where and how coverage changed
This report was generated by python-coverage-comment-action |
||||||||||||||||||||||||||||||||||||
A worker middleware must be async. Rejecting a sync one with MiddlewareKindMismatch instead of TypeError makes worker-middleware kind validation parallel to task middleware: non-callable raises TypeError, wrong sync/async kind raises MiddlewareKindMismatch. One ProcrastinateException now covers a wrong-kind middleware on either registration path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Implements the event-loop
worker_middlewarechain reserved as a follow-up in #1556. Builds on the task middleware from #1556 (which closed #1292).What
A worker middleware: an always-async callable
(call_next, context, worker)that wraps every job's run on the event loop — both sync and async tasks — registered worker-wide only.Unlike task middleware, a single worker middleware wraps both sync and async tasks, so there's no sync/async kind-matching. The motivating case is OpenTelemetry-style tracing around every job without monkey-patching (#1027).
Design
ensure_asynccall site inWorker._process_job, staying inside the existingtry/except/finally. The task's exception propagates throughcall_next; status classification and the DB status write stay outside the middleware. (The alternative — wrapping_process_jobincl. the status write — was rejected: it would overlap the futurejob_processing_endedhook and force a non-Pythonic outcome API, since_process_jobswallows exceptions internally.)TypeErrorat worker construction. No per-task parameter, noTask/Blueprintchanges.worker.stop(), not aStopWorkerexception (already thread-safe/cooperative since Add task middleware #1556).compose/is_async_middlewarefrom Add task middleware #1556 unchanged.Tests
tests/unit/test_middleware.py: wraps async task, wraps sync task (the headline), nests outside task middleware, result transform, exception→failed,JobAborted→aborted, retry-through-middleware,stop()from inside,worker_defaultsinjection, rejects sync/non-callable, no-op. Full unit suite 538 passing; ruff + basedpyright clean; docs-Wclean.Docs
Restructured
docs/howto/advanced/middleware.mdto present task and worker middleware as parallel sections with a comparison;WorkerMiddlewareadded to the API reference.Follow-up (out of scope)
The worker hooks from #1308 (
before_fetching_jobfor rate limiting;job_processing_ended(JobResult)for resolved-status observation) — complementary and deliberately separate. Keeping the middleware at boundary A keeps the hooks orthogonal.🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Documentation