Skip to content

Commit e01eef0

Browse files
bokelleyclaude
andauthored
feat(decisioning): PostgresTaskRegistry — durable HITL task state (v6.1) (#361)
* feat(decisioning): PostgresTaskRegistry — durable HITL task state (v6.1) Closes #353 Implements the durable `TaskRegistry` promised in the v6.1 plan. Production adopters running `ADCP_ENV=production` were blocked from HITL flows because `InMemoryTaskRegistry.is_durable=False` trips the production-mode gate in `create_adcp_server_from_platform`. This PR delivers the Postgres-backed alternative. **Implementation note:** Uses `psycopg_pool.AsyncConnectionPool` (the existing `[pg]` extra) rather than SQLAlchemy ORM. The `TaskRegistry` Protocol is fully async; the async psycopg pool is a drop-in with no new dependency. See PR body for rationale. https://claude.ai/code/session_01L8ig1NtL6WZQcYge3RKrb3 * fix(decisioning): address pre-PR review blockers in PostgresTaskRegistry - Add internal _table parameter + pre-formatted SQL (enables test isolation, mirrors PgReplayStore.table_name pattern without public API) - Add decisioning/pg/*.sql to pyproject.toml package-data so DDL ships in wheel - Fix conformance test fixture to use _table for per-test table isolation - Add complete/fail unknown-task raise tests to conformance suite - Fix stub ClassVar annotation in decisioning __init__.py - Remove unused logging import from task_registry.py - Update pg extra comment in pyproject.toml to reference PostgresTaskRegistry https://claude.ai/code/session_01L8ig1NtL6WZQcYge3RKrb3 --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent c0b8afc commit e01eef0

7 files changed

Lines changed: 796 additions & 5 deletions

File tree

pyproject.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ docs = [
100100
"pdoc3>=0.10.0",
101101
]
102102
pg = [
103-
# PostgreSQL-backed PgReplayStore (and future PgIdempotencyBackend).
104-
# psycopg3 gives both sync + async client interfaces so the same dep
105-
# serves the sync replay store today and an async one later.
103+
# PostgreSQL-backed adcp.signing.PgReplayStore,
104+
# adcp.decisioning.PostgresTaskRegistry (durable HITL task state), and
105+
# future PgIdempotencyBackend. psycopg3 ships both sync + async pool
106+
# interfaces so the single dep serves all three use cases.
106107
"psycopg[binary]>=3.1.0",
107108
"psycopg-pool>=3.2.0",
108109
]
@@ -121,6 +122,7 @@ adcp = [
121122
"py.typed",
122123
"ADCP_VERSION",
123124
"signing/pg/*.sql",
125+
"decisioning/pg/*.sql",
124126
# AdCP JSON schemas, mirrored from ``schemas/cache/`` by
125127
# ``scripts/bundle_schemas.py`` so the wheel ships them for
126128
# ``adcp.validation.schema_loader``.

src/adcp/decisioning/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,32 @@ def create_media_buy(
129129
WorkflowHandoff,
130130
)
131131

132+
# Conditional import: PostgresTaskRegistry needs the [pg] extra. Always expose
133+
# the name — when psycopg isn't installed we fall through to a stub class whose
134+
# constructor raises ImportError with the install hint. Matches the pattern
135+
# used by adcp.signing for PgReplayStore.
136+
try:
137+
from adcp.decisioning.pg import PostgresTaskRegistry # noqa: F401
138+
except ImportError: # pragma: no cover — exercised by the [pg] extra tests
139+
from typing import ClassVar as _ClassVar
140+
141+
class PostgresTaskRegistry: # type: ignore[no-redef]
142+
"""Stub raised when ``adcp[pg]`` isn't installed.
143+
144+
Attempting to instantiate raises :class:`ImportError` with the
145+
install-hint text from :mod:`adcp.decisioning.pg.task_registry`.
146+
"""
147+
148+
is_durable: _ClassVar[bool] = True
149+
150+
def __init__(self, *args: object, **kwargs: object) -> None:
151+
raise ImportError(
152+
"PostgresTaskRegistry requires psycopg3 and psycopg-pool. "
153+
"Install the 'pg' extra: `pip install 'adcp[pg]'` "
154+
"(Poetry: `poetry add 'adcp[pg]'`)."
155+
)
156+
157+
132158
__all__ = [
133159
"Account",
134160
"AccountStore",
@@ -161,6 +187,7 @@ def create_media_buy(
161187
"InMemoryTaskRegistry",
162188
"MaybeAsync",
163189
"OAuthCredential",
190+
"PostgresTaskRegistry",
164191
"Proposal",
165192
"PropertyList",
166193
"PropertyListReference",

src/adcp/decisioning/pg/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,17 @@
1111
request to gate dispatch on the seller's commercial relationship
1212
with the buyer agent (allowlist + onboarding state + billing
1313
capabilities).
14+
* :class:`PostgresTaskRegistry` — durable
15+
:class:`~adcp.decisioning.TaskRegistry` for HITL task state. Survives
16+
process restarts and is safe for multi-worker deployments sharing a
17+
single Postgres database. Drop-in replacement for
18+
:class:`~adcp.decisioning.InMemoryTaskRegistry` that satisfies the
19+
production-mode durability gate.
1420
1521
The schema DDL ships alongside the Python code (e.g.
16-
``adcp/decisioning/pg/buyer_agent_registry.sql``) so adopters can run
17-
it through whatever migration tool they use (Alembic, Flyway, psql).
22+
``adcp/decisioning/pg/buyer_agent_registry.sql``,
23+
``adcp/decisioning/pg/decisioning_tasks.sql``) so adopters can run it
24+
through whatever migration tool they use (Alembic, Flyway, psql).
1825
"""
1926

2027
from __future__ import annotations
@@ -24,9 +31,11 @@
2431
PG_AVAILABLE,
2532
PgBuyerAgentRegistry,
2633
)
34+
from adcp.decisioning.pg.task_registry import PostgresTaskRegistry
2735

2836
__all__ = [
2937
"DEFAULT_TABLE_NAME",
3038
"PG_AVAILABLE",
3139
"PgBuyerAgentRegistry",
40+
"PostgresTaskRegistry",
3241
]
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
-- AdCP decisioning task registry — durable HITL task state.
2+
--
3+
-- Run this once per deployment. Tracked by PostgresTaskRegistry;
4+
-- see src/adcp/decisioning/pg/task_registry.py for the query shapes
5+
-- the Python code executes.
6+
--
7+
-- COLLATE "C" on identifier columns avoids locale-dependent case
8+
-- folding — on some locales "Task-A" and "task-a" compare equal,
9+
-- which could collapse distinct task_ids or account_ids. "C" is the
10+
-- byte-for-byte comparison we actually want.
11+
--
12+
-- Alternatively, call PostgresTaskRegistry.create_schema() from
13+
-- application code — it runs the equivalent DDL idempotently on boot.
14+
15+
CREATE TABLE IF NOT EXISTS decisioning_tasks (
16+
task_id TEXT COLLATE "C" NOT NULL PRIMARY KEY,
17+
account_id TEXT COLLATE "C" NOT NULL,
18+
state TEXT NOT NULL DEFAULT 'submitted',
19+
task_type TEXT NOT NULL,
20+
progress JSONB,
21+
result JSONB,
22+
error JSONB,
23+
-- Unix epoch seconds (float), matches TaskRecord.created_at/updated_at
24+
-- so Python round-trips the value without lossy TIMESTAMPTZ conversion.
25+
created_at DOUBLE PRECISION NOT NULL,
26+
updated_at DOUBLE PRECISION NOT NULL
27+
);
28+
29+
-- Supports the cross-tenant get() query: WHERE task_id = $1 AND account_id = $2.
30+
-- Without this index, every tasks/get is a full-table scan on account_id.
31+
CREATE INDEX IF NOT EXISTS decisioning_tasks_account_idx
32+
ON decisioning_tasks (account_id);

0 commit comments

Comments
 (0)