Commit 4e95764

Merge pull request #233 from adcontextprotocol/bokelley/a2a-skill-middleware
feat(server): per-skill middleware hook in ADCPAgentExecutor (#226)
2 parents ea21864 + afbbfac commit 4e95764

5 files changed

Lines changed: 532 additions & 10 deletions

docs/handler-authoring.md

Lines changed: 87 additions & 6 deletions
@@ -328,14 +328,95 @@ share one push-notif bucket across every unauthenticated caller. The
 warning is the signal your auth middleware isn't populating the
 ContextVar — treat it as a P0.
 
-### Known gaps
+### Per-skill middleware (audit, activity feeds, rate limiting, tracing)
+
+Every A2A skill dispatch can be wrapped in a chain of middleware
+callables. Pass them as `middleware=[...]` to `create_a2a_server` /
+`serve` / `ADCPAgentExecutor` — first entry wraps outermost, matching
+Starlette/ASGI ordering:
+
+```python
+from adcp.server import SkillMiddleware, ToolContext, serve
+
+async def audit_middleware(
+    skill_name: str,
+    params: dict,
+    context: ToolContext,
+    call_next,
+) -> Any:
+    started = time.monotonic()
+    try:
+        result = await call_next()
+    except Exception as exc:
+        audit_log.failure(skill_name, context.caller_identity, exc)
+        raise
+    audit_log.success(
+        skill_name,
+        context.caller_identity,
+        elapsed_ms=(time.monotonic() - started) * 1000,
+    )
+    return result
 
-- Per-skill middleware hooks for audit logging / activity feeds don't
-  exist yet — tracked at
-  [#226](https://github.com/adcontextprotocol/adcp-client-python/issues/226).
+serve(MyAgent(), transport="a2a", middleware=[audit_middleware])
+```
+
+**Semantics worth knowing:**
+
+- **Composition — put audit outermost.** `middleware=[Audit(),
+  RateLimit(), Metrics()]` runs `Audit → RateLimit → Metrics →
+  handler` on the way in and unwinds in the opposite order. **If you
+  put rate-limiting before audit, rejected requests disappear from
+  your audit log** — often the most interesting events for security
+  review. Audit always outermost.
+- **Short-circuit — cache keys MUST include principal + tenant.** A
+  middleware that returns without calling `call_next()` stops the
+  chain; its return value becomes the dispatch result. Rate limiters
+  / feature flags use this. **Caching middleware that short-circuits
+  must key on `(skill_name, params, context.caller_identity,
+  context.tenant_id)`** — a cache keyed only on `skill_name + params`
+  serves principal A's data to principal B on a matching-params call.
+- **Exception observation — never swallow an `ADCPError`.** Catch
+  around `await call_next()` to log failures. Re-raise to let the
+  executor's normal error path take over (`ADCPError` → failed task
+  with `adcp_error` DataPart; other exceptions → opaque failed task).
+  Swallowing an `ADCPError` (especially `IdempotencyConflictError` or
+  `ADCPTaskError`) and returning a fake-success dict silently converts
+  a rejected mutation into a "completed" task — double-billing,
+  double-allocation, duplicated side effects. Don't.
+- **Exception messages end up in server logs.** Middleware-raised
+  exceptions flow through `logger.exception` in the executor before
+  client-facing sanitisation. Don't format `params` or
+  `context.caller_identity` into exception text — operators read those
+  logs.
+- **Retry is supported.** Call `call_next()` more than once (e.g.
+  retry-on-transient-error middleware). Each call gets a fresh
+  inner chain — composition is re-entrant by design.
+- **Transform on return, not on input.** `params` passed in is the
+  same dict every middleware sees. Mutating it doesn't change what
+  the next layer receives. Transforms happen on the *return* side by
+  modifying the value of `await call_next()`.
+- **Context access**: the middleware sees the `ToolContext` produced
+  by the `context_factory` (or the a2a-sdk fallback). Tenant id,
+  caller identity, anything your factory populates. `ContextVar`s set
+  before `call_next()` propagate to the handler — no `asyncio.create_task`
+  needed.
+
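To make the cache-key rule in the short-circuit bullet concrete, here is a minimal, self-contained sketch of a short-circuiting cache. It is an illustration, not code from this commit: `FakeContext` is a hypothetical stand-in for `ToolContext` that carries only `caller_identity` and `tenant_id`, `TenantScopedCache` is an invented name, and the sketch assumes `params` can be serialised with `json.dumps` (anything else falls back to `str`).

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass(frozen=True)
class FakeContext:
    """Hypothetical stand-in for ToolContext; only the fields the key needs."""

    caller_identity: Optional[str]
    tenant_id: Optional[str]


class TenantScopedCache:
    """Short-circuiting cache keyed on skill, params, principal, and tenant."""

    def __init__(self) -> None:
        self._entries = {}  # unbounded; a real cache would need eviction / TTLs

    async def __call__(
        self,
        skill_name: str,
        params: dict,
        context: FakeContext,
        call_next: Callable[[], Awaitable[Any]],
    ) -> Any:
        # Serialise params deterministically so dict key order doesn't matter.
        key = (
            skill_name,
            json.dumps(params, sort_keys=True, default=str),
            context.caller_identity,
            context.tenant_id,
        )
        if key in self._entries:
            # Short-circuit: call_next() is never invoked on a hit.
            return self._entries[key]
        result = await call_next()
        self._entries[key] = result
        return result


async def demo():
    cache = TenantScopedCache()
    handler_calls = []

    def handler_for(ctx: FakeContext):
        # Plays the role of the rest of the chain for one request.
        async def call_next():
            handler_calls.append(ctx.caller_identity)
            return {"products_for": ctx.caller_identity}

        return call_next

    alice = FakeContext("alice", "tenant-1")
    bob = FakeContext("bob", "tenant-1")
    params = {"brief": "sneakers"}
    first = await cache("get_products", params, alice, handler_for(alice))
    second = await cache("get_products", params, alice, handler_for(alice))
    other = await cache("get_products", params, bob, handler_for(bob))
    return first, second, other, handler_calls


first, second, other, handler_calls = asyncio.run(demo())
```

In the demo, the second call for the same principal is served from the cache, while the identical `params` from a different principal miss and reach the handler. Dropping `caller_identity` from the key would hand the first principal's result to the second.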
+**Security — middleware is a data processor for the full skill
+payload.** `params` carries decoded buyer briefs, budgets, brand
+refs, proposal text, PII in message parts. `context` carries
+`caller_identity` + `tenant_id`. Installing a third-party middleware
+(SaaS audit, observability vendor, bespoke tracing) hands that vendor
+the complete skill surface. Treat it as a data processor under your
+GDPR/CCPA controller-processor agreements.
+
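One way to narrow what a third-party middleware sees is to wrap it in an adapter that passes a redacted copy of `params`. The sketch below is an assumption-laden illustration, not part of this commit: `SENSITIVE_KEYS`, `redact`, `redacting`, and `vendor_audit` are all hypothetical names, and the key list is invented. It relies only on the four-argument middleware signature shown above. `context` is still passed through untouched, so `caller_identity` and `tenant_id` remain visible to the vendor.

```python
import asyncio
import copy
from typing import Any

# Hypothetical list of payload keys to mask; adjust to your own schema.
SENSITIVE_KEYS = frozenset({"brief", "budget", "proposal_text"})


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive keys masked at any depth."""
    if isinstance(value, dict):
        return {
            key: "[redacted]" if key in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return copy.deepcopy(value)


def redacting(vendor_middleware):
    """Wrap a middleware so it only ever receives redacted params."""

    async def wrapper(skill_name, params, context, call_next):
        # call_next() takes no arguments, so the handler still receives the
        # original params; only the vendor's view is masked.
        return await vendor_middleware(skill_name, redact(params), context, call_next)

    return wrapper


seen = []


async def vendor_audit(skill_name, params, context, call_next):
    """Stand-in for a third-party middleware that records what it is given."""
    seen.append(params)
    return await call_next()


async def demo():
    original = {"brief": "launch plan", "packages": [{"id": "p1", "budget": 5000}]}

    async def call_next():
        # Plays the role of the handler, which reads the unredacted params.
        return {"handled": original["brief"]}

    result = await redacting(vendor_audit)("get_products", original, None, call_next)
    return original, result


original, result = asyncio.run(demo())
```

Redaction narrows the exposure but does not remove the processor relationship described above; the vendor still observes skill names, identities, and timing.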
+MCP transport has its own middleware story (see "Pattern 2 —
+in-process HTTP middleware" above); `SkillMiddleware` is A2A-only.
+
+### Known gaps
 
-Once #226 lands, A2A adoption reaches parity with MCP for production
-agents.
+All three Phase-2 A2A hooks (#224 TaskStore, #225 PushNotificationConfigStore,
+#226 SkillMiddleware) have landed. A2A adoption now reaches parity with
+MCP for production agents.
 
 ## Testing
 
src/adcp/server/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -112,6 +112,7 @@ async def get_products(params, context=None):
 from adcp.server.serve import (
     ContextFactory,
     RequestMetadata,
+    SkillMiddleware,
     create_mcp_server,
     serve,
 )
@@ -153,6 +154,7 @@ async def get_products(params, context=None):
     "validate_discovery_set",
     # A2A integration
     "ADCPAgentExecutor",
+    "SkillMiddleware",
     "create_a2a_server",
     # Idempotency middleware (AdCP #2315 seller side)
     "IdempotencyStore",

src/adcp/server/a2a_server.py

Lines changed: 61 additions & 3 deletions
@@ -39,12 +39,14 @@
 from adcp.server.base import ADCPHandler, ToolContext
 
 if TYPE_CHECKING:
+    from collections.abc import Sequence
+
     from a2a.server.tasks.push_notification_config_store import (
         PushNotificationConfigStore,
     )
     from a2a.server.tasks.task_store import TaskStore
 
-    from adcp.server.serve import ContextFactory
+    from adcp.server.serve import ContextFactory, SkillMiddleware
 from adcp.server.helpers import STANDARD_ERROR_CODES
 from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler
 from adcp.server.test_controller import TestControllerStore, _handle_test_controller
@@ -69,9 +71,16 @@ def __init__(
         test_controller: TestControllerStore | None = None,
         *,
         context_factory: ContextFactory | None = None,
+        middleware: Sequence[SkillMiddleware] | None = None,
     ) -> None:
         self._handler = handler
         self._context_factory = context_factory
+        # Store as a tuple so the executor can't be mutated from underneath
+        # at runtime (a flaky test or a handler reaching self._middleware
+        # can't corrupt the dispatch chain). Tuple ordering = runtime
+        # ordering; first entry wraps outermost (see ``SkillMiddleware``
+        # docstring for the composition semantics).
+        self._middleware: tuple[SkillMiddleware, ...] = tuple(middleware or ())
         self._tool_callers: dict[str, Any] = {}
 
         # Build tool callers for all tools this handler supports.
@@ -117,7 +126,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
 
         tool_context = self._build_tool_context(skill_name, context)
         try:
-            result = await self._tool_callers[skill_name](params, tool_context)
+            result = await self._dispatch_with_middleware(skill_name, params, tool_context)
             await self._send_result(event_queue, context, skill_name, result)
         except ADCPError as exc:
             # Application-layer AdCP error (IdempotencyConflictError etc.).
@@ -131,6 +140,43 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
             logger.exception("Error executing skill %s", skill_name)
             await self._send_error(event_queue, context, f"Skill execution failed: {skill_name}")
 
+    async def _dispatch_with_middleware(
+        self,
+        skill_name: str,
+        params: dict[str, Any],
+        tool_context: ToolContext,
+    ) -> Any:
+        """Run the handler wrapped in the configured middleware chain.
+
+        Middleware composes outermost-first: the first entry in
+        ``self._middleware`` sees every call *before* the later entries
+        and *before* the handler. This matches Starlette / ASGI
+        conventions so sellers porting from those stacks aren't
+        surprised. Composition is done via a small recursive dispatcher
+        (no mutable indices, no lambdas closing over loop variables) —
+        the chain reads the same whether you have zero or ten
+        middlewares.
+
+        Middleware exceptions propagate to the executor's normal error
+        handling path in ``execute()``; this method does no try/except
+        so short-circuiting, transform, and exception-observation all
+        work the same way they do for the underlying handler.
+        """
+        if not self._middleware:
+            return await self._tool_callers[skill_name](params, tool_context)
+
+        async def _step(index: int) -> Any:
+            if index >= len(self._middleware):
+                return await self._tool_callers[skill_name](params, tool_context)
+            middleware = self._middleware[index]
+
+            async def call_next() -> Any:
+                return await _step(index + 1)
+
+            return await middleware(skill_name, params, tool_context, call_next)
+
+        return await _step(0)
+
     def _build_tool_context(self, skill_name: str, request: RequestContext) -> ToolContext:
         """Build the :class:`ToolContext` handed to the skill dispatcher.
 
@@ -445,6 +491,7 @@ def create_a2a_server(
     context_factory: ContextFactory | None = None,
     task_store: TaskStore | None = None,
     push_config_store: PushNotificationConfigStore | None = None,
+    middleware: Sequence[SkillMiddleware] | None = None,
 ) -> Any:
     """Create an A2A Starlette application from an ADCP handler.
 
@@ -492,6 +539,14 @@ def create_a2a_server(
             (via a ``ContextVar`` your auth middleware populates) or by
             composition with a tenant-scoped ``TaskStore`` — the reference
             impl shows the ContextVar pattern.
+        middleware: Optional sequence of :data:`~adcp.server.SkillMiddleware`
+            callables wrapping every A2A skill dispatch. Composes
+            outermost-first (first entry sees the call before later
+            entries and before the handler). Use for audit logging,
+            activity-feed hooks, rate limiting, per-skill tracing. See
+            :data:`~adcp.server.SkillMiddleware` for the signature,
+            composition semantics, and the exception-capture pattern
+            audit hooks need.
 
     Returns:
         A Starlette app ready to be run with uvicorn.
@@ -501,7 +556,10 @@ def create_a2a_server(
     resolved_port = port or int(os.environ.get("PORT", "3001"))
 
     executor = ADCPAgentExecutor(
-        handler, test_controller=test_controller, context_factory=context_factory
+        handler,
+        test_controller=test_controller,
+        context_factory=context_factory,
+        middleware=middleware,
     )
 
     agent_card = _build_agent_card(
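
The recursive dispatcher in `_dispatch_with_middleware` can be exercised outside the executor. The following is a standalone sketch under stated assumptions, not the library's API: `dispatch` is a hypothetical free-function mirror of the `_step` / `call_next` recursion above, and `TransientError`, `audit`, `retry_once`, `metrics`, and `flaky_handler` are invented for the demo. It shows outermost-first ordering and that a second `call_next()` re-runs the inner chain.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence

CallNext = Callable[[], Awaitable[Any]]
Middleware = Callable[[str, dict, Any, CallNext], Awaitable[Any]]
Handler = Callable[[dict, Any], Awaitable[Any]]


async def dispatch(
    middleware: Sequence[Middleware],
    handler: Handler,
    skill_name: str,
    params: dict,
    context: Any,
) -> Any:
    """Hypothetical mirror of the recursion in _dispatch_with_middleware."""

    async def step(index: int) -> Any:
        if index >= len(middleware):
            return await handler(params, context)

        async def call_next() -> Any:
            # Every call builds the remaining chain from index + 1 again.
            return await step(index + 1)

        return await middleware[index](skill_name, params, context, call_next)

    return await step(0)


class TransientError(Exception):
    """Hypothetical retryable failure used only by this demo."""


trace = []
state = {"attempts": 0}


async def audit(skill_name, params, context, call_next):
    trace.append("audit:in")
    result = await call_next()
    trace.append("audit:out")
    return result


async def retry_once(skill_name, params, context, call_next):
    try:
        return await call_next()
    except TransientError:
        return await call_next()  # second call walks the inner chain again


async def metrics(skill_name, params, context, call_next):
    trace.append("metrics")
    return await call_next()


async def flaky_handler(params, context):
    state["attempts"] += 1
    if state["attempts"] == 1:
        trace.append("handler:fail")
        raise TransientError("try again")
    trace.append("handler:ok")
    return {"status": "ok"}


result = asyncio.run(
    dispatch([audit, retry_once, metrics], flaky_handler, "get_products", {}, None)
)
```

After the run, `trace` reads `audit:in`, `metrics`, `handler:fail`, `metrics`, `handler:ok`, `audit:out`: the audit layer wraps both attempts once, while the layer inside the retry runs once per attempt.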
