Skip to content

Commit 9d29754

Browse files
bokelleyclaude
andauthored
feat(decisioning): F12 — auto-emit completion webhook on sync mutating responses (#331)
* feat(decisioning): F12 — auto-emit completion webhook on sync mutating responses Sync ``create_media_buy``, ``update_media_buy``, ``sync_creatives`` responses now auto-fire a completion webhook when the buyer supplied ``push_notification_config.url``. Previously only the HITL TaskHandoff path emitted, so sync responses left buyers polling. Mirrors the JS-side ``emitSyncCompletionWebhook`` implementation (commits ``8dc427f9`` and ``7a887dfa`` on ``src/lib/server/decisioning/runtime/from-platform.ts``). Wire-format is identical: ``task_type``, ``status: 'completed'``, ``result`` field carrying the projected sync response, echoed ``token`` via ``X-AdCP-Push-Token`` header. ``task_id`` is synthesized as ``f"sync-{uuid4()}"`` since sync responses don't allocate a registry task; buyers correlate via the resource ids embedded in ``result``. New module ``adcp.decisioning.webhook_emit``: * ``SPEC_WEBHOOK_TASK_TYPES`` — closed 20-value set mirroring the on-disk spec enum at ``schemas/cache/enums/task-type.json``. The ``test_spec_webhook_task_types_matches_schema_cache`` test pins the constant so out-of-band drift surfaces in CI. * ``maybe_emit_sync_completion`` — fire-and-forget gate. Skips when disabled, no sender wired, no push URL on the request, or the tool isn't in the spec enum (logged warning so adopters notice they extended the surface beyond spec). * ``_BACKGROUND_WEBHOOK_TASKS`` — module-level strong-ref pin so the asyncio loop's weak-ref behavior doesn't garbage-collect in-flight emissions mid-flight. Mirrors the same pattern in ``dispatch._BACKGROUND_HANDOFF_TASKS``. **Fire-and-forget posture (DoS defense).** Webhook delivery runs in a background asyncio task; the sync response returns inline immediately. A buyer-supplied slowloris webhook URL must not be able to hold the seller's request worker for the full retry budget — the JS round-2 fix at ``7a887dfa`` documented this DoS vector and Python preserves the same posture from the start. **TaskHandoff path doesn't double-fire.** The ``_maybe_auto_emit_sync_completion`` helper detects the projected Submitted envelope (``status == 'submitted'`` shape) and skips delivery. The HITL path's registry completion emits its own webhook on terminal state. Configuration on ``create_adcp_server_from_platform`` and ``serve``: * ``webhook_sender: WebhookSender | None = None`` — BYO emitter. ``None`` silently disables auto-emit. * ``auto_emit_completion_webhooks: bool = True`` — default-on. Adopters who emit webhooks manually inside their handlers pass ``False`` to avoid duplicate delivery. 21 new tests cover: drift-guard against the on-disk schema cache, URL+token extraction (incl. dict-params test fixtures), gate skips (disabled, no sender, no URL, tool outside spec enum, no running loop), happy-path delivery via ``WebhookSender.send_mcp``, token echo via ``X-AdCP-Push-Token`` header, delivery-failure swallow, sync-success fires, TaskHandoff doesn't double-fire, opt-out suppresses, default-on, no-sender silent, sync_creatives fires too. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(decisioning,webhooks): F12 round-2 — token via payload + exception isolation Two P0 expert-review findings on PR #331: P0-1 (cross-language wire divergence): the buyer's ``push_notification_config.token`` was being echoed via ``X-AdCP-Push-Token`` HTTP header. Per ``schemas/cache/core/push_notification_config.json`` ("Echoed back in webhook payload to validate request authenticity") AND the JS reference impl (``buildTaskWebhookPayload`` in ``src/lib/server/decisioning/runtime/from-platform.ts``), the token belongs on ``payload.token``. Buyers validating against the spec read ``body.token``, not custom headers — header echo would silently fail their auth check. Fix: extend ``create_mcp_webhook_payload`` and ``WebhookSender.send_mcp`` to accept ``token`` and write it onto the payload. Update F12's ``_emit_sync_completion_webhook`` to pass ``token=`` through instead of building ``extra_headers``. Cross-language wire-parity restored. P0-2 (exception isolation): ``maybe_emit_sync_completion`` runs AFTER the platform method's successful return. ANY exception in the gate body — extraction quirk on a weird ``params`` shape, ``loop.create_task`` failure — would propagate to the handler shim and lose the buyer's sync response. Fix: wrap the entire gate body in ``try/except Exception``; logged-and-swallowed. Last-line defense ensures the post-success path can never poison the buyer's response. P1 fixes folded in: * Submitted-shape detection tightened to the EXACT 2-key dict ``{"task_id", "status"}`` (not the loose ``status == "submitted"`` predicate). An adopter who legitimately returns a sync ``{"status": "submitted", ...}`` with extra metadata (queue acceptance) now correctly gets the auto-emit fired. * No-running-loop branch bumped from ``logger.debug`` to ``logger.warning`` — production code landing here is mis-wired and should be visible. Round-2 tests added: * ``test_handler_returns_before_webhook_delivers`` — pins the non-blocking invariant (sync response returns before webhook delivery completes). * ``test_concurrent_emissions_dont_corrupt_strong_ref_set`` — 100 concurrent emissions exercising the ``_BACKGROUND_WEBHOOK_TASKS`` add/discard pattern. * ``test_handler_does_not_skip_loose_submitted_shape`` — pins the tightened submitted-shape detection. * ``test_gate_swallows_unexpected_exceptions`` — pins the exception-isolation invariant via a sender that raises on attribute access. 25 F12 tests pass total (up from 21); 2208 total tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6490948 commit 9d29754

6 files changed

Lines changed: 1106 additions & 31 deletions

File tree

src/adcp/decisioning/handler.py

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
_build_request_context,
4040
_invoke_platform_method,
4141
)
42+
from adcp.decisioning.webhook_emit import maybe_emit_sync_completion
4243
from adcp.server.base import ADCPHandler, ToolContext
4344

4445
if TYPE_CHECKING:
@@ -70,6 +71,7 @@
7071
UpdateMediaBuyRequest,
7172
UpdateMediaBuySuccessResponse,
7273
)
74+
from adcp.webhook_sender import WebhookSender
7375

7476

7577
# ---------------------------------------------------------------------------
@@ -141,13 +143,17 @@ def __init__(
141143
registry: TaskRegistry,
142144
state_reader: StateReader | None = None,
143145
resource_resolver: ResourceResolver | None = None,
146+
webhook_sender: WebhookSender | None = None,
147+
auto_emit_completion_webhooks: bool = True,
144148
) -> None:
145149
super().__init__()
146150
self._platform = platform
147151
self._executor = executor
148152
self._registry = registry
149153
self._state_reader = state_reader
150154
self._resource_resolver = resource_resolver
155+
self._webhook_sender = webhook_sender
156+
self._auto_emit_completion_webhooks = auto_emit_completion_webhooks
151157

152158
# ----- account resolution helper -----
153159

@@ -211,6 +217,43 @@ def _extract_auth_info(ctx: ToolContext) -> AuthInfo | None:
211217
)
212218
return None
213219

220+
def _maybe_auto_emit_sync_completion(
221+
self,
222+
method_name: str,
223+
params: Any,
224+
result: Any,
225+
) -> None:
226+
"""Fire the F12 sync-completion webhook if applicable.
227+
228+
Skips TaskHandoff projections — those go through the registry
229+
completion path which emits its own webhook on terminal state.
230+
The auto-emit fires on the sync-success arm only, mirroring the
231+
JS-side ``routeIfHandoff`` logic at
232+
``src/lib/server/decisioning/runtime/from-platform.ts``.
233+
234+
TaskHandoff projection returns the exact 2-key dict ``{"task_id":
235+
..., "status": "submitted"}`` from ``_project_handoff``; we
236+
match the full key set rather than the loose ``status ==
237+
"submitted"`` predicate so an adopter who legitimately returns a
238+
sync ``{"status": "submitted", ...}`` (e.g., synchronous queue
239+
acceptance with extra metadata) still gets the auto-emit.
240+
"""
241+
if (
242+
isinstance(result, dict)
243+
and set(result.keys()) == {"task_id", "status"}
244+
and result.get("status") == "submitted"
245+
):
246+
# TaskHandoff projection — registry completion path emits
247+
# its own webhook on terminal state.
248+
return
249+
maybe_emit_sync_completion(
250+
sender=self._webhook_sender,
251+
enabled=self._auto_emit_completion_webhooks,
252+
method_name=method_name,
253+
params=params,
254+
result=result,
255+
)
256+
214257
def _build_ctx(
215258
self,
216259
tool_ctx: ToolContext,
@@ -260,17 +303,16 @@ async def create_media_buy( # type: ignore[override]
260303
tool_ctx = context or ToolContext()
261304
account = await self._resolve_account(params.account, tool_ctx)
262305
ctx = self._build_ctx(tool_ctx, account)
263-
return cast(
264-
"CreateMediaBuySuccessResponse",
265-
await _invoke_platform_method(
266-
self._platform,
267-
"create_media_buy",
268-
params,
269-
ctx,
270-
executor=self._executor,
271-
registry=self._registry,
272-
),
306+
result = await _invoke_platform_method(
307+
self._platform,
308+
"create_media_buy",
309+
params,
310+
ctx,
311+
executor=self._executor,
312+
registry=self._registry,
273313
)
314+
self._maybe_auto_emit_sync_completion("create_media_buy", params, result)
315+
return cast("CreateMediaBuySuccessResponse", result)
274316

275317
async def update_media_buy( # type: ignore[override]
276318
self,
@@ -285,18 +327,17 @@ async def update_media_buy( # type: ignore[override]
285327
tool_ctx = context or ToolContext()
286328
account = await self._resolve_account(params.account, tool_ctx)
287329
ctx = self._build_ctx(tool_ctx, account)
288-
return cast(
289-
"UpdateMediaBuySuccessResponse",
290-
await _invoke_platform_method(
291-
self._platform,
292-
"update_media_buy",
293-
params,
294-
ctx,
295-
executor=self._executor,
296-
registry=self._registry,
297-
arg_projector={"media_buy_id": params.media_buy_id, "patch": params},
298-
),
330+
result = await _invoke_platform_method(
331+
self._platform,
332+
"update_media_buy",
333+
params,
334+
ctx,
335+
executor=self._executor,
336+
registry=self._registry,
337+
arg_projector={"media_buy_id": params.media_buy_id, "patch": params},
299338
)
339+
self._maybe_auto_emit_sync_completion("update_media_buy", params, result)
340+
return cast("UpdateMediaBuySuccessResponse", result)
300341

301342
async def sync_creatives( # type: ignore[override]
302343
self,
@@ -306,17 +347,16 @@ async def sync_creatives( # type: ignore[override]
306347
tool_ctx = context or ToolContext()
307348
account = await self._resolve_account(params.account, tool_ctx)
308349
ctx = self._build_ctx(tool_ctx, account)
309-
return cast(
310-
"SyncCreativesSuccessResponse",
311-
await _invoke_platform_method(
312-
self._platform,
313-
"sync_creatives",
314-
params,
315-
ctx,
316-
executor=self._executor,
317-
registry=self._registry,
318-
),
350+
result = await _invoke_platform_method(
351+
self._platform,
352+
"sync_creatives",
353+
params,
354+
ctx,
355+
executor=self._executor,
356+
registry=self._registry,
319357
)
358+
self._maybe_auto_emit_sync_completion("sync_creatives", params, result)
359+
return cast("SyncCreativesSuccessResponse", result)
320360

321361
async def get_media_buy_delivery( # type: ignore[override]
322362
self,

src/adcp/decisioning/serve.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from adcp.decisioning.resolve import ResourceResolver
4444
from adcp.decisioning.state import StateReader
4545
from adcp.decisioning.task_registry import TaskRegistry
46+
from adcp.webhook_sender import WebhookSender
4647

4748

4849
def _is_production_env() -> bool:
@@ -75,6 +76,8 @@ def create_adcp_server_from_platform(
7576
registry: TaskRegistry | None = None,
7677
state_reader: StateReader | None = None,
7778
resource_resolver: ResourceResolver | None = None,
79+
webhook_sender: WebhookSender | None = None,
80+
auto_emit_completion_webhooks: bool = True,
7881
) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]:
7982
"""Build the :class:`PlatformHandler` + supporting wiring from a
8083
:class:`DecisioningPlatform`.
@@ -117,6 +120,24 @@ def create_adcp_server_from_platform(
117120
(D15 — async framework-mediated fetches). Default is the
118121
v6.0 stub (raises ``NotImplementedError`` with a pointer to
119122
v6.1).
123+
:param webhook_sender: Bring-your-own
124+
:class:`adcp.webhook_sender.WebhookSender` for sync-completion
125+
and HITL-completion webhook delivery. Default ``None`` — when
126+
unset, sync-completion auto-emit is a silent no-op (no URL to
127+
deliver to, framework can't synthesize a sender). Adopters
128+
wiring webhook delivery pass a configured sender (with their
129+
signing key, IP-pinned transport, etc.).
130+
:param auto_emit_completion_webhooks: F12 feature gate. When
131+
``True`` (default), the framework auto-fires a completion
132+
webhook on the sync-success arm of mutating tools whenever the
133+
request supplied ``push_notification_config.url`` AND the tool
134+
is in :data:`adcp.decisioning.webhook_emit.SPEC_WEBHOOK_TASK_TYPES`.
135+
Buyers passing the URL expect notification regardless of
136+
whether the seller routed sync vs HITL. Set ``False`` for
137+
adopters who emit webhooks manually inside their handlers
138+
(avoid duplicate delivery; idempotency-key dedup at the
139+
receiver would handle it but explicit suppression matches the
140+
v5 manual-emit posture for adopters mid-migration).
120141
121142
:raises ValueError: when ``executor`` and ``thread_pool_size`` are
122143
both supplied (D5 mutually-exclusive validation).
@@ -213,6 +234,8 @@ def create_adcp_server_from_platform(
213234
registry=registry,
214235
state_reader=state_reader,
215236
resource_resolver=resource_resolver,
237+
webhook_sender=webhook_sender,
238+
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
216239
)
217240
return handler, executor, registry
218241

@@ -226,6 +249,8 @@ def serve(
226249
registry: TaskRegistry | None = None,
227250
state_reader: StateReader | None = None,
228251
resource_resolver: ResourceResolver | None = None,
252+
webhook_sender: WebhookSender | None = None,
253+
auto_emit_completion_webhooks: bool = True,
229254
advertise_all: bool = False,
230255
**serve_kwargs: Any,
231256
) -> None:
@@ -246,6 +271,14 @@ def serve(
246271
:class:`InMemoryTaskRegistry` (gated for production).
247272
:param state_reader: Custom :class:`StateReader` impl (D15).
248273
:param resource_resolver: Custom :class:`ResourceResolver` impl (D15).
274+
:param webhook_sender: BYO :class:`adcp.webhook_sender.WebhookSender`
275+
for completion webhook delivery (sync auto-emit + HITL terminal).
276+
``None`` disables auto-emit silently.
277+
:param auto_emit_completion_webhooks: F12 — auto-fire a completion
278+
webhook on the sync-success arm of mutating tools when the
279+
request supplied ``push_notification_config.url``. Default
280+
``True``. Set ``False`` for adopters who emit webhooks
281+
manually inside their handlers.
249282
:param advertise_all: Forwarded to :func:`adcp.server.serve`. When
250283
``True``, ``tools/list`` advertises every method on the
251284
handler regardless of override status. Default ``False`` —
@@ -267,6 +300,8 @@ def serve(
267300
registry=registry,
268301
state_reader=state_reader,
269302
resource_resolver=resource_resolver,
303+
webhook_sender=webhook_sender,
304+
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
270305
)
271306

272307
server_name = name or type(platform).__name__

0 commit comments

Comments
 (0)