Skip to content

Commit 9596356

Browse files
committed
feat(server): add register_lazy + async resolve to TenantRegistry
Adds lazy platform construction support requested by @bokelley (#628): - `register_lazy(factory=...)` — defers per-tenant DecisioningPlatform construction to first `resolve()` call; avoids paying KMS/GAM auth costs for all N tenants at boot - `async resolve(host)` — handles both eager and lazy tenants; invokes the factory on first hit, caches the result, serializes concurrent first-hit resolves with the per-tenant lock (single factory invocation per tenant) - `PlatformFactory` type alias exported from `adcp.server` - `register()` clears any lazy factory on eager re-registration; `register_lazy()` clears any cached platform on lazy re-registration; `unregister()` clears both - Docstring fixes: `_normalize_host` load-balancer port note, `serve_options` multi-tenant clarification, lock lifecycle docs - 14 new tests (39 total, all passing): lazy lifecycle, concurrent first-hit, factory/validator failures, unregister-during-resolve zombie guard, eager↔lazy re-registration https://claude.ai/code/session_01DRv6qahN7Jjt3Q4oxGBXkd
1 parent 787749d commit 9596356

3 files changed

Lines changed: 530 additions & 26 deletions

File tree

src/adcp/server/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ async def get_products(params, context=None):
142142
)
143143
from adcp.server.sponsored_intelligence import SponsoredIntelligenceHandler
144144
from adcp.server.tenant_registry import (
145+
PlatformFactory,
145146
TenantHealthState,
146147
TenantRegistry,
147148
TenantResolution,
@@ -218,6 +219,7 @@ async def get_products(params, context=None):
218219
"IdempotencyStore",
219220
"MemoryBackend",
220221
# Multi-tenant registry with health tracking
222+
"PlatformFactory",
221223
"TenantHealthState",
222224
"TenantRegistry",
223225
"TenantResolution",

src/adcp/server/tenant_registry.py

Lines changed: 250 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,25 @@
66
most-touched server-side primitive.
77
88
Adopters pre-build per-tenant :class:`~adcp.decisioning.DecisioningPlatform`
9-
instances and register them here. The registry tracks health state and
10-
surfaces :meth:`TenantRegistry.resolve_by_host` for the request path.
9+
instances and register them here (eager mode), or supply a factory callable
10+
that is invoked on first request (lazy mode). The registry tracks health
11+
state and surfaces :meth:`TenantRegistry.resolve_by_host` (sync, eager) or
12+
:meth:`TenantRegistry.resolve` (async, both modes) for the request path.
1113
1214
Comparison with lower-level building blocks:
1315
1416
* :class:`~adcp.server.CallableSubdomainTenantRouter` — host→Tenant lookup
1517
with TTL cache. Suitable when tenant routing is all you need.
1618
* :class:`~adcp.decisioning.LazyPlatformRouter` — per-tenant
1719
:class:`~adcp.decisioning.DecisioningPlatform` factory with LRU+TTL
18-
cache. Suitable when you need lazy platform construction.
20+
cache. Suitable when you need lazy platform construction without health
21+
tracking.
1922
* :class:`TenantRegistry` — combines health tracking + runtime mutation
20-
(register/unregister/recheck) into one object. Reach for this when
21-
multi-tenant SaaS topology requires observability into per-tenant
22-
health state and runtime admin operations without a process restart.
23+
(register/unregister/recheck) with optional lazy platform construction.
24+
Reach for this when multi-tenant SaaS topology requires observability
25+
into per-tenant health state and runtime admin operations without a
26+
process restart. Lazy mode (``register_lazy``) avoids paying per-tenant
27+
platform-build cost at boot.
2328
"""
2429

2530
from __future__ import annotations
@@ -45,6 +50,12 @@
4550
#: May be sync or async — the registry awaits at call time.
4651
TenantValidator = Callable[[str, str], "bool | Awaitable[bool]"]
4752

53+
#: Lazy platform factory callable. Takes ``tenant_id`` and returns an
54+
#: awaitable :class:`~adcp.decisioning.DecisioningPlatform`. Used with
55+
#: :meth:`TenantRegistry.register_lazy` to defer per-tenant platform
56+
#: construction until first request.
57+
PlatformFactory = Callable[[str], "Awaitable[DecisioningPlatform]"]
58+
4859

4960
@dataclass(frozen=True)
5061
class TenantResolution:
@@ -68,15 +79,23 @@ class TenantRegistry:
6879
"""Higher-level multi-tenant primitive with health tracking.
6980
7081
Mirrors JS SDK ``createTenantRegistry`` for Python deployments.
71-
Adopters pre-build per-tenant
72-
:class:`~adcp.decisioning.DecisioningPlatform` instances and register
73-
them here; the registry tracks health state and surfaces
74-
:meth:`resolve_by_host` for the request path.
82+
Supports two registration modes:
83+
84+
* **Eager** (:meth:`register`) — caller pre-builds the
85+
:class:`~adcp.decisioning.DecisioningPlatform` and passes it in.
86+
:meth:`resolve_by_host` (sync) and :meth:`resolve` (async) both
87+
return a resolution immediately.
88+
* **Lazy** (:meth:`register_lazy`) — caller supplies a factory
89+
callable; the platform is built on the first :meth:`resolve` call
90+
and cached. Avoids paying per-tenant construction costs (network
91+
handshakes, KMS credential fetches) at boot. Suitable for
92+
deployments with many tenants.
7593
7694
**Health states:**
7795
78-
* ``pending`` — registered, not yet validated. Adopters should 503
79-
traffic to pending tenants until validation completes.
96+
* ``pending`` — registered, not yet validated (or lazy factory not
97+
yet invoked). Adopters should 503 traffic until validation
98+
completes.
8099
* ``healthy`` — validated and serving.
81100
* ``unverified`` — was healthy; a subsequent :meth:`recheck` failed
82101
(transient failure). The tenant still serves (graceful-degrade).
@@ -90,20 +109,20 @@ class TenantRegistry:
90109
``await_first_validation=True`` transitions the tenant to
91110
``healthy`` without a network round-trip.
92111
112+
**Per-tenant locks:** Each tenant gets an ``asyncio.Lock`` on first
113+
use. Locks are removed when the tenant is unregistered. Any
114+
in-flight :meth:`recheck` or :meth:`resolve` that held the lock
115+
before ``unregister()`` was called completes safely — zombie-entry
116+
guards in both methods prevent stale writes after removal.
117+
93118
:param validator: Optional validation callable (sync or async).
94119
``None`` → principal-token mode; validation always succeeds.
95-
:param default_serve_options: Optional dict of defaults to pass to
96-
:func:`adcp.decisioning.serve` — stored for adopter convenience,
97-
not interpreted by the registry itself. Retrieve via
98-
:attr:`serve_options` and spread into ``serve()``:
120+
:param default_serve_options: Optional dict of defaults to store for
121+
adopter convenience. Retrieve via :attr:`serve_options`.
99122
100-
.. code-block:: python
101-
102-
serve(platform, **registry.serve_options)
123+
Example (eager boot-time registration)::
103124
104-
Example (boot-time registration)::
105-
106-
from adcp.server import TenantRegistry, BearerTokenAuth
125+
from adcp.server import TenantRegistry
107126
108127
registry = TenantRegistry(validator=None)
109128
@@ -115,8 +134,25 @@ class TenantRegistry:
115134
await_first_validation=True,
116135
)
117136
118-
def resolve(ctx):
119-
resolved = registry.resolve_by_host(ctx.host)
137+
async def resolve(ctx):
138+
resolved = await registry.resolve(ctx.host)
139+
if resolved is None or resolved.health in ("pending", "disabled"):
140+
raise HTTPException(503)
141+
return resolved.platform
142+
143+
Example (lazy registration — defers platform construction to first request)::
144+
145+
registry = TenantRegistry(validator=check_jwks)
146+
147+
for tenant in load_tenants_from_db():
148+
await registry.register_lazy(
149+
tenant.id,
150+
agent_url=tenant.agent_url,
151+
factory=build_platform_for_tenant, # called on first resolve()
152+
)
153+
154+
async def resolve(ctx):
155+
resolved = await registry.resolve(ctx.host) # triggers factory on first hit
120156
if resolved is None or resolved.health in ("pending", "disabled"):
121157
raise HTTPException(503)
122158
return resolved.platform
@@ -156,6 +192,9 @@ def __init__(
156192
# without a lock two concurrent rechecks for the same tenant could
157193
# both read, both await, and both commit — racing on the final state.
158194
self._locks: dict[str, asyncio.Lock] = {}
195+
# Per-tenant lazy platform factory. Set by register_lazy(); absent
196+
# for tenants registered eagerly via register().
197+
self._factories: dict[str, PlatformFactory] = {}
159198

160199
# ----- internal helpers ------------------------------------------------
161200

@@ -172,6 +211,12 @@ def _normalize_host(raw: str) -> str:
172211
173212
Accepts both full URLs (``https://acme.example.com``) and raw
174213
Host-header values (``acme.example.com``, ``acme.example.com:443``).
214+
215+
Note: port stripping is correct for ``Host`` headers where the port
216+
matches the scheme default. Some load-balancers forward
217+
``X-Forwarded-Host`` with non-default ports preserved; callers
218+
using that header should strip the port themselves before passing
219+
the value to :meth:`resolve_by_host` or :meth:`resolve`.
175220
"""
176221
if "://" in raw:
177222
host = urlparse(raw).netloc or raw
@@ -236,6 +281,8 @@ async def register(
236281
self._agent_urls[tenant_id] = agent_url
237282
self._host_map[self._normalize_host(agent_url)] = tenant_id
238283
self._health[tenant_id] = "pending"
284+
# Clear any lazy factory if re-registering as eager.
285+
self._factories.pop(tenant_id, None)
239286

240287
if await_first_validation:
241288
try:
@@ -251,6 +298,76 @@ async def register(
251298
return
252299
self._health[tenant_id] = "healthy" if ok else "disabled"
253300

301+
async def register_lazy(
302+
self,
303+
tenant_id: str,
304+
*,
305+
agent_url: str,
306+
factory: PlatformFactory,
307+
await_first_validation: bool = False,
308+
) -> None:
309+
"""Register a tenant with a lazy platform factory.
310+
311+
The platform is built on the first :meth:`resolve` call for this
312+
tenant's host, then cached. Subsequent resolves return the cached
313+
instance. Suitable for deployments with many tenants where eager
314+
construction is too expensive at boot — network handshakes, KMS
315+
credential fetches, inventory-manager construction, etc.
316+
317+
Health starts as ``pending``. When ``await_first_validation=True``
318+
the factory is invoked immediately, the platform is built, and
319+
validation completes before returning — the next :meth:`resolve`
320+
call sees the final state without triggering the factory again.
321+
322+
Use :meth:`resolve` (async) to get a :class:`TenantResolution`
323+
for lazy-registered tenants; the synchronous :meth:`resolve_by_host`
324+
returns ``None`` until the platform is built.
325+
326+
Lazy and eager tenants share the same health state machine:
327+
:meth:`health`, :meth:`unregister`, :meth:`recheck`, and
328+
:attr:`registered_tenants` work identically regardless of
329+
registration mode.
330+
331+
Re-registering an existing tenant (eager or lazy) atomically
332+
replaces its factory and agent_url under the per-tenant lock.
333+
334+
:param tenant_id: Stable identifier (e.g. DB primary key).
335+
:param agent_url: The tenant's agent endpoint URL. The host
336+
component is extracted for :meth:`resolve` / :meth:`resolve_by_host`.
337+
:param factory: Async callable ``(tenant_id) -> DecisioningPlatform``.
338+
Called at most once per registration (not once per request).
339+
:param await_first_validation: When ``True``, invokes the factory
340+
and validator immediately before returning.
341+
"""
342+
lock = self._get_lock(tenant_id)
343+
async with lock:
344+
old_url = self._agent_urls.get(tenant_id)
345+
if old_url is not None and old_url != agent_url:
346+
self._host_map.pop(self._normalize_host(old_url), None)
347+
348+
self._factories[tenant_id] = factory
349+
# Clear any eagerly-built platform if re-registering as lazy.
350+
self._platforms.pop(tenant_id, None)
351+
self._agent_urls[tenant_id] = agent_url
352+
self._host_map[self._normalize_host(agent_url)] = tenant_id
353+
self._health[tenant_id] = "pending"
354+
355+
if await_first_validation:
356+
try:
357+
platform = await factory(tenant_id)
358+
ok = await self._run_validator(tenant_id)
359+
except Exception:
360+
logger.warning(
361+
"TenantRegistry.register_lazy: factory/validator raised for "
362+
"tenant %r; health=disabled",
363+
tenant_id,
364+
exc_info=True,
365+
)
366+
self._health[tenant_id] = "disabled"
367+
return
368+
self._platforms[tenant_id] = platform
369+
self._health[tenant_id] = "healthy" if ok else "disabled"
370+
254371
def unregister(self, tenant_id: str) -> None:
255372
"""Remove a tenant from the registry.
256373
@@ -266,6 +383,7 @@ def unregister(self, tenant_id: str) -> None:
266383
if agent_url is not None:
267384
self._host_map.pop(self._normalize_host(agent_url), None)
268385
self._platforms.pop(tenant_id, None)
386+
self._factories.pop(tenant_id, None)
269387
self._health.pop(tenant_id, None)
270388
self._locks.pop(tenant_id, None)
271389

@@ -363,15 +481,120 @@ def resolve_by_host(self, host: str) -> TenantResolution | None:
363481
health = self._health.get(tenant_id, "pending")
364482
return TenantResolution(tenant_id=tenant_id, health=health, platform=platform)
365483

484+
async def resolve(self, host: str) -> TenantResolution | None:
485+
"""Async lookup by ``Host`` header value; builds lazy platforms on first hit.
486+
487+
For eager tenants (registered via :meth:`register`), equivalent to
488+
:meth:`resolve_by_host` at an async call site — no I/O occurs.
489+
490+
For lazy tenants (registered via :meth:`register_lazy`), the
491+
platform factory is invoked on the first call, then cached.
492+
Concurrent first-hit resolves for the same tenant serialize on
493+
the per-tenant lock — only one factory invocation occurs.
494+
495+
Returns ``None`` when:
496+
497+
* No tenant is registered for this host.
498+
* The factory raised (health is set to ``disabled``).
499+
* The validator returned ``False`` (health is set to ``disabled``).
500+
501+
The caller is responsible for checking ``result.health`` and
502+
gating traffic — the registry does not 503 automatically.
503+
504+
:param host: Raw ``Host`` header value. Port suffixes are stripped;
505+
full URLs are also accepted. See :meth:`_normalize_host` for
506+
load-balancer caveats.
507+
"""
508+
normalized = self._normalize_host(host)
509+
tenant_id = self._host_map.get(normalized)
510+
if tenant_id is None:
511+
return None
512+
513+
# Fast path: platform already built (eager or previously-resolved lazy).
514+
platform = self._platforms.get(tenant_id)
515+
if platform is not None:
516+
health = self._health.get(tenant_id, "pending")
517+
return TenantResolution(tenant_id=tenant_id, health=health, platform=platform)
518+
519+
# If there is no factory either, nothing to do.
520+
if tenant_id not in self._factories:
521+
return None
522+
523+
# Lazy path: acquire per-tenant lock to serialize concurrent first-hit
524+
# resolves — only one factory invocation per tenant.
525+
lock = self._get_lock(tenant_id)
526+
async with lock:
527+
# Double-check: another coroutine may have built the platform
528+
# while we waited for the lock.
529+
platform = self._platforms.get(tenant_id)
530+
if platform is not None:
531+
health = self._health.get(tenant_id, "pending")
532+
return TenantResolution(
533+
tenant_id=tenant_id, health=health, platform=platform
534+
)
535+
536+
# Guard: unregister() may have run while we waited.
537+
if tenant_id not in self._health:
538+
return None
539+
540+
factory = self._factories.get(tenant_id)
541+
if factory is None:
542+
return None
543+
544+
try:
545+
platform = await factory(tenant_id)
546+
except Exception:
547+
logger.warning(
548+
"TenantRegistry.resolve: factory raised for tenant %r; "
549+
"health=disabled",
550+
tenant_id,
551+
exc_info=True,
552+
)
553+
if tenant_id in self._health:
554+
self._health[tenant_id] = "disabled"
555+
return None
556+
557+
try:
558+
ok = await self._run_validator(tenant_id)
559+
except Exception:
560+
logger.warning(
561+
"TenantRegistry.resolve: validator raised for tenant %r; "
562+
"health=disabled",
563+
tenant_id,
564+
exc_info=True,
565+
)
566+
if tenant_id in self._health:
567+
self._health[tenant_id] = "disabled"
568+
return None
569+
570+
# Guard: unregister() may have run while we awaited factory/validator.
571+
if tenant_id not in self._health:
572+
return None
573+
574+
if ok:
575+
self._platforms[tenant_id] = platform
576+
self._health[tenant_id] = "healthy"
577+
return TenantResolution(
578+
tenant_id=tenant_id, health="healthy", platform=platform
579+
)
580+
else:
581+
self._health[tenant_id] = "disabled"
582+
return None
583+
366584
@property
367585
def serve_options(self) -> dict[str, Any]:
368586
"""The ``default_serve_options`` dict passed at construction.
369587
370-
Spread into :func:`adcp.decisioning.serve` for consistent per-host
371-
configuration::
588+
Convenience accessor for single-tenant setups or when spreading
589+
common options into :func:`adcp.decisioning.serve`::
372590
373591
serve(platform, **registry.serve_options)
374592
593+
Multi-tenant deployments typically pass a router (not a single
594+
platform) to ``serve()``; in that case these options are consumed
595+
by the per-request dispatch layer rather than passed to ``serve``
596+
directly.
597+
375598
Returns an empty dict when no options were passed at construction.
376599
Returns a shallow copy — mutations to the returned dict do not
377600
affect the registry's stored options.
@@ -389,6 +612,7 @@ def registered_tenants(self) -> frozenset[str]:
389612

390613

391614
__all__ = [
615+
"PlatformFactory",
392616
"TenantHealthState",
393617
"TenantRegistry",
394618
"TenantResolution",

0 commit comments

Comments
 (0)