Skip to content

Add worker middleware#1579

Merged
medihack merged 8 commits into
mainfrom
worker-middleware-feature
Jun 19, 2026
Merged

Add worker middleware#1579
medihack merged 8 commits into
mainfrom
worker-middleware-feature

Conversation

@medihack

@medihack medihack commented Jun 18, 2026

Copy link
Copy Markdown
Member

Implements the event-loop worker_middleware chain reserved as a follow-up in #1556. Builds on the task middleware from #1556 (which closed #1292).

What

A worker middleware: an always-async callable (call_next, context, worker) that wraps every job's run on the event loop — both sync and async tasks — registered worker-wide only.

async def otel_mw(call_next, context, worker):
    with tracer.start_as_current_span(f"run/{context.task.name}"):
        return await call_next()

app.run_worker(worker_middleware=[otel_mw])

Unlike task middleware, a single worker middleware wraps both sync and async tasks, so there's no sync/async kind-matching. The motivating case is OpenTelemetry-style tracing around every job without monkey-patching (#1027).

Design

  • Boundary A — wraps the task run, not the bookkeeping. Composed as the outermost layer around the task-middleware chain at the ensure_async call site in Worker._process_job, staying inside the existing try/except/finally. The task's exception propagates through call_next; status classification and the DB status write stay outside the middleware. (The alternative — wrapping _process_job incl. the status write — was rejected: it would overlap the future job_processing_ended hook and force a non-Pythonic outcome API, since _process_job swallows exceptions internally.)
  • Always async, worker-wide only. Non-async / non-callable entries raise TypeError at worker construction. No per-task parameter, no Task/Blueprint changes.
  • worker.stop(), not a StopWorker exception (already thread-safe/cooperative since Add task middleware #1556).
  • Nesting: worker middleware → worker-wide task middleware → per-task task middleware → task.
  • No-op when unset; reuses the generic compose / is_async_middleware from Add task middleware #1556 unchanged.

Tests

tests/unit/test_middleware.py: wraps async task, wraps sync task (the headline), nests outside task middleware, result transform, exception→failed, JobAborted→aborted, retry-through-middleware, stop() from inside, worker_defaults injection, rejects sync/non-callable, no-op. Full unit suite 538 passing; ruff + basedpyright clean; docs -W clean.

Docs

Restructured docs/howto/advanced/middleware.md to present task and worker middleware as parallel sections with a comparison; WorkerMiddleware added to the API reference.

Follow-up (out of scope)

The worker hooks from #1308 (before_fetching_job for rate limiting; job_processing_ended(JobResult) for resolved-status observation) — complementary and deliberately separate. Keeping the middleware at boundary A keeps the hooks orthogonal.

🤖 Generated with Claude Code

Summary by CodeRabbit

Release Notes

  • New Features

    • Added worker-level middleware support to wrap jobs on the event loop, enabling middleware to run in the event loop context for both sync and async tasks.
  • Documentation

    • Reorganized and clarified middleware concepts, distinguishing between task middleware and worker middleware with improved explanations of execution contexts and ordering.
    • Updated API reference to include the new worker middleware type.

medihack and others added 7 commits June 18, 2026 11:22
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ntics

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Now that the reference lists both TaskMiddleware and WorkerMiddleware, clarify
that kind-matching applies only to task middleware; worker middleware is always
async and exempt.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The page now documents both task and worker middleware, so retitle it "Add
middleware" and split it into parallel "Task middleware" and "Worker middleware"
sections. Pull the cross-cutting "Failures and retries" and "Stopping the worker"
guidance into shared sections that apply to both kinds.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The Django integration already closes Django's per-thread DB connections via a
worker-wide task-middleware pair (contrib/django/db_cleanup.py), so present it as
how Procrastinate does it rather than something the user wires up, and link to the
Django integration how-to.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@medihack medihack requested a review from a team as a code owner June 18, 2026 13:40
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown

Review Change Stack

Caution

Review failed

An error occurred during the review process. Please try again later.

📝 Walkthrough

Walkthrough

Introduces WorkerMiddleware, an always-async middleware type that wraps every job at the worker level on the event loop. The type alias is added to middleware.py, exposed in WorkerOptions, validated at Worker construction time, and applied in _process_job via middleware.compose. Documentation and tests are updated accordingly.

Changes

Worker-level middleware feature

Layer / File(s) Summary
WorkerMiddleware type alias and WorkerOptions field
procrastinate/middleware.py, procrastinate/app.py
Defines WorkerMiddleware as an always-async callable (AsyncCallNext, JobContext, Worker) -> Awaitable[Any] and adds worker_middleware: NotRequired[list[WorkerMiddleware]] to WorkerOptions with matching run_worker_async docstring.
Worker constructor validation and _process_job routing
procrastinate/worker.py
Worker.__init__ accepts worker_middleware, stores it, and raises TypeError for non-callable or non-async entries. _process_job wraps the inner run_job callable through middleware.compose(self.worker_middleware, ...) and assigns the result to job_result.result.
Worker middleware tests
tests/unit/test_middleware.py
Adds tests for init defaults, TypeError rejection of invalid entries, correct before/after ordering for async and sync tasks, nesting outside per-task middleware, result transformation, failed/aborted status propagation, retry observation, worker.stop() from middleware, and worker_defaults wiring.
How-to and reference documentation
docs/howto/advanced/middleware.md, docs/reference.rst
Rewrites the middleware how-to to distinguish task vs worker middleware, add worker-wide task middleware, ordering, comparison, failures/retries, and worker-stopping sections. Updates reference.rst to include WorkerMiddleware in autodoc and clarify MiddlewareKindMismatch exemption.

Sequence Diagram(s)

sequenceDiagram
  rect rgba(173, 216, 230, 0.5)
    note right of Worker: Worker middleware layer (always async, event loop)
    Worker->>WorkerMiddleware: await call_next(context, worker)
  end
  rect rgba(144, 238, 144, 0.5)
    note right of WorkerMiddleware: Task middleware layer (sync/async matched)
    WorkerMiddleware->>TaskMiddleware: call_next(context, worker)
    TaskMiddleware->>TaskFunction: execute task
    TaskFunction-->>TaskMiddleware: result
    TaskMiddleware-->>WorkerMiddleware: result
  end
  WorkerMiddleware-->>Worker: job_result.result
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • procrastinate-org/procrastinate#1556: This PR builds directly on the task middleware infrastructure introduced there, reusing middleware.compose and is_async_middleware to add the new worker-level middleware layer on top.

Poem

A rabbit hops through jobs with ease,
Wrapped in middleware layers, if you please!
Worker middleware runs on the loop so bright,
Async all the way from morning to night.
call_next() leads the task to its place,
Every job now runs with middleware grace! 🐇✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 6.38% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and concisely summarizes the main feature added in this PR: worker middleware.
Linked Issues check ✅ Passed The PR implements worker middleware following design pattern 2 from issue #1292, wrapping complete job processing including database status writes.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing worker middleware as outlined in the linked issue.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch worker-middleware-feature

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions

github-actions Bot commented Jun 18, 2026

Copy link
Copy Markdown

Coverage report

Click to see where and how coverage changed

FileStatementsMissingCoverageCoverage
(new stmts)
Lines missing
  procrastinate
  app.py
  middleware.py
  worker.py
Project Total  

This report was generated by python-coverage-comment-action

A worker middleware must be async. Rejecting a sync one with
MiddlewareKindMismatch instead of TypeError makes worker-middleware kind
validation parallel to task middleware: non-callable raises TypeError, wrong
sync/async kind raises MiddlewareKindMismatch. One ProcrastinateException now
covers a wrong-kind middleware on either registration path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@ewjoachim ewjoachim left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thank you !

@medihack medihack merged commit 3d82ddb into main Jun 19, 2026
14 checks passed
@medihack medihack deleted the worker-middleware-feature branch June 19, 2026 16:52
@medihack medihack added the PR type: feature ⭐️ Contains new features label Jun 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

PR type: feature ⭐️ Contains new features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Middleware feature

2 participants