feat(py): generate middleware#5253

Open
huangjeff5 wants to merge 57 commits into main from jh-mw

Contributor

@huangjeff5 huangjeff5 commented May 7, 2026

Python middleware for generate(): use=[...]

Adds a middleware system for Python that lets you intercept and wrap generate() calls at three granularities: the full generate iteration, each model API call, and each tool execution. Replaces the old ModelMiddleware abstraction.

What it does

Middleware is applied per generate() call via a use=[...] parameter. Entries are chained in list order: the first entry is outermost, so its pre-call logic runs first and its post-call logic runs last. Three hooks are available, plus a tools() method for contributing tools:

  • wrap_generate — wraps each iteration of the tool loop (model call + tool resolution). Runs once per agentic turn.
  • wrap_model — wraps each raw model API call. Use for retry, fallback, logging latency.
  • wrap_tool — wraps each individual tool execution. Use for approval gates, sandboxing, error enrichment.
  • tools() — contribute extra tools dynamically per generate() call (e.g. skills libraries, sandboxed filesystem ops). Tools are scoped to the call and don't pollute the root registry.
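
The ordering rule above (first entry is outermost) can be illustrated with a minimal, framework-free sketch. It does not use Genkit: Tag, build_chain, and fake_model are hypothetical stand-ins, and the only thing borrowed from this PR is the wrap_model(params, next_fn) hook shape.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]


class Tag:
    """Hypothetical middleware that records when it runs around the next handler."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def wrap_model(self, params: str, next_fn: Handler) -> str:
        self.log.append(f'{self.name}:before')
        result = await next_fn(params)
        self.log.append(f'{self.name}:after')
        return result


def build_chain(middleware: list[Tag], core: Handler) -> Handler:
    """Fold the list right-to-left so that entry 0 ends up outermost."""
    handler = core
    for mw in reversed(middleware):
        # Bind mw and the current handler as defaults to avoid late binding.
        async def wrapped(params: str, mw: Tag = mw, nxt: Handler = handler) -> str:
            return await mw.wrap_model(params, nxt)

        handler = wrapped
    return handler


async def fake_model(params: str) -> str:
    """Stand-in for the raw model call at the center of the chain."""
    return f'echo: {params}'


def run_demo() -> tuple[str, list[str]]:
    log: list[str] = []
    chain = build_chain([Tag('outer', log), Tag('inner', log)], fake_model)
    return asyncio.run(chain('Hello')), log
```

Calling run_demo() returns the model result together with the recorded order: the first list entry logs before and after everything else, which is the behavior described for use=[...].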

Defining middleware inline (app developers)

Subclass BaseMiddleware (a Pydantic model) — config fields and hook overrides in one class. Pass instances directly in use=[...]:

import time

from genkit import Genkit
from genkit.middleware import BaseMiddleware, ModelHookParams, middleware
from genkit.plugins.middleware import Retry

@middleware(name='latency_logger')
class LatencyLogger(BaseMiddleware):
    prefix: str = '[trace]'

    async def wrap_model(self, params: ModelHookParams, next_fn):
        t = time.monotonic()
        resp = await next_fn(params)
        print(f'{self.prefix} model call took {time.monotonic() - t:.3f}s')
        return resp

ai = Genkit(plugins=[...])

response = await ai.generate(
    model='googleai/gemini-2.0-flash',
    prompt='Hello',
    use=[
        Retry(max_retries=5),
        LatencyLogger(prefix='[myapp]'),
    ],
)

Dev UI integration

Plugin middleware (e.g. anything registered via middleware_bundle() or a custom middleware_plugin(...)) is automatically available in the Dev UI — no extra step needed.

For middleware you define yourself in app code, call ai.define_middleware() to publish it to the registry so the Dev UI can discover it by name:

from genkit import Genkit
from genkit.middleware import BaseMiddleware, ModelHookParams, middleware

@middleware(name='latency_logger')
class LatencyLogger(BaseMiddleware):
    prefix: str = '[trace]'

    async def wrap_model(self, params: ModelHookParams, next_fn):
        ...

ai = Genkit(plugins=[...])
ai.define_middleware(LatencyLogger)  # now visible in Dev UI

Once registered, the middleware shows up on the Model Runner page in the Dev UI, where you can mix-and-match middleware and set config values interactively. When you run a generate call from there, the Dev UI passes a MiddlewareRef — a name plus a config dict — into generate_action. The framework resolves that ref against the registry, instantiates the middleware class with the provided config (cls(**config)), and runs the chain exactly as it would inline. The MiddlewareRef wire format is what makes dynamic, config-driven dispatch possible in the Dev UI case without requiring code changes.
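
The ref resolution described above can be sketched as follows. This is an illustration under stated assumptions, not the framework's code: MiddlewareRef and LatencyLogger here are plain dataclass stand-ins, and REGISTRY and resolve are hypothetical names. Only the name-plus-config shape and the cls(**config) step come from the description.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiddlewareRef:
    """Stand-in for the wire format: a registered name plus a config dict."""

    name: str
    config: dict[str, Any] = field(default_factory=dict)


@dataclass
class LatencyLogger:
    """Stand-in middleware class with a single config field."""

    prefix: str = '[trace]'


# Hypothetical registry: maps middleware names to classes.
REGISTRY: dict[str, type] = {'latency_logger': LatencyLogger}


def resolve(entry: Any) -> Any:
    """Turn a ref into an instance; pass ready-made instances through unchanged."""
    if not isinstance(entry, MiddlewareRef):
        return entry
    cls = REGISTRY.get(entry.name)
    if cls is None:
        raise LookupError(f'No middleware named "{entry.name}" is registered')
    # Same step as in the description: instantiate the class from the config.
    return cls(**entry.config)
```

With this shape, an inline instance and a ref sent from the Dev UI both end up as ordinary middleware objects before the chain is built, so the rest of the pipeline does not need to know which path was taken.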

Pre-packaging middleware through a plugin (plugin authors)

Use new_middleware to build a MiddlewareDesc from each BaseMiddleware subclass, then wrap the resulting descriptors with middleware_plugin to produce a standard Plugin for plugins=[...]:

mylib/middleware.py (plugin author):

from genkit.middleware import BaseMiddleware, ModelHookParams, middleware
from genkit.plugin_api import new_middleware, middleware_plugin

@middleware(name='retry', description='Retries model calls on transient failures')
class Retry(BaseMiddleware):
    max_retries: int = 3

    async def wrap_model(self, params: ModelHookParams, next_fn):
        for attempt in range(self.max_retries + 1):
            try:
                return await next_fn(params)
            except Exception:
                if attempt == self.max_retries:
                    raise

@middleware(name='fallback')
class Fallback(BaseMiddleware):
    models: list[str] = []

    async def wrap_model(self, params: ModelHookParams, next_fn):
        ...  # try primary, then each fallback model

def my_middleware_plugin():
    return middleware_plugin([
        new_middleware(Retry),
        new_middleware(Fallback),
    ])

app.py (app developer):

from genkit import Genkit
from mylib.middleware import Retry, Fallback, my_middleware_plugin

ai = Genkit(plugins=[my_middleware_plugin()])

await ai.generate(
    model='googleai/gemini-2.0-flash',
    prompt='Hello',
    use=[
        Retry(max_retries=5),
        Fallback(models=['googleai/gemini-1.5-flash']),
    ],
)

github-actions bot added the docs, python, and config labels May 7, 2026
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements a robust middleware system for Genkit, enabling interception and modification of model generation, API calls, and tool executions. It introduces the genkit-plugin-middleware package, providing standard middleware for retries, fallbacks, tool approval, skills, and filesystem operations. Core generation logic was refactored to handle middleware normalization and per-call scoping. Feedback identifies redundant model copies in the asynchronous generation methods and a design conflict between validation tests and the normalization implementation. Furthermore, the reviewer noted blocking synchronous I/O in asynchronous tool implementations and issues with the jitter calculation in the retry middleware that could cause delays to exceed configured maximums.
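
One way to address the jitter concern raised in the summary is to apply the cap after adding jitter. The sketch below is not the code in _retry.py: the function name and every parameter (initial_delay, backoff_factor, max_delay, jitter) are assumptions chosen for illustration.

```python
import random


def backoff_delay(
    attempt: int,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0,
    jitter: float = 1.0,
) -> float:
    """Return the sleep time in seconds before retry number `attempt` (0-based).

    Jitter is added first and the cap is applied last, so the result can
    never exceed max_delay.
    """
    base = initial_delay * (backoff_factor ** attempt)
    return min(max_delay, base + random.uniform(0.0, jitter))
```

Capping last trades a little randomness at the top of the range (late attempts all sleep exactly max_delay) for a hard upper bound on the wait.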

Comment thread py/packages/genkit/src/genkit/_ai/_aio.py Outdated
Comment thread py/packages/genkit/src/genkit/_ai/_aio.py Outdated
Comment thread py/packages/genkit/tests/genkit/ai/generate_test.py Outdated
Comment thread py/plugins/middleware/src/genkit/plugins/middleware/_filesystem.py Outdated
Comment thread py/plugins/middleware/src/genkit/plugins/middleware/_retry.py Outdated
Comment thread py/plugins/middleware/src/genkit/plugins/middleware/_skills.py Outdated
Comment thread py/packages/genkit/src/genkit/_core/_plugin.py Outdated
Comment thread py/packages/genkit/src/genkit/_ai/_generate.py Outdated
Comment thread py/packages/genkit/tests/genkit/ai/generate_test.py Outdated
Comment thread py/packages/genkit/tests/genkit/ai/generate_test.py Outdated
Comment thread py/packages/genkit/tests/genkit/ai/generate_test.py Outdated
Comment thread py/packages/genkit/src/genkit/_core/_plugin.py Outdated
Base automatically changed from jh-dynamic-tools to main May 14, 2026 12:54
@huangjeff5 huangjeff5 marked this pull request as ready for review May 15, 2026 15:09
# FunctionResponse.response must be a dict, not a raw value.
output = tool_output if isinstance(tool_output, dict) else {'result': tool_output}

# --- Primary path: ToolResponse.content (set by MultipartToolResponse.content) ---
Contributor Author

add a more descriptive comment

is covariant: ``list[Tool]`` or ``list[str]`` are both assignable to
``Sequence[str | Tool]``, but not to ``list[str | Tool]``.
"""
registry = await registry_with_inline_tools(self.registry, tools)
Contributor Author

child_registry = registry.new_child()
await register_tools(child_registry, tools)
refs = register_middleware(child_registry, middleware) # unnamed middleware get auto-generated names

raise GenkitError(
status='NOT_FOUND',
message=(
f'No middleware named "{entry.name}" is registered on this app. '
Contributor Author

"A middleware with the name "{entry.name}" cannot be found. Please make sure the middleware is registered correctly via the @ai.middleware(...) decorator if you defined one inline, or pass a middleware instance directly."
