diff --git a/src/adcp/server/__init__.py b/src/adcp/server/__init__.py index 8521ab93a..8adb70670 100644 --- a/src/adcp/server/__init__.py +++ b/src/adcp/server/__init__.py @@ -144,6 +144,13 @@ async def get_products(params, context=None): serve, ) from adcp.server.sponsored_intelligence import SponsoredIntelligenceHandler +from adcp.server.tenant_registry import ( + PlatformFactory, + TenantHealthState, + TenantRegistry, + TenantResolution, + TenantValidator, +) from adcp.server.tenant_router import ( CallableSubdomainTenantRouter, InMemorySubdomainTenantRouter, @@ -217,6 +224,12 @@ async def get_products(params, context=None): # Idempotency middleware (AdCP #2315 seller side) "IdempotencyStore", "MemoryBackend", + # Multi-tenant registry with health tracking + "PlatformFactory", + "TenantHealthState", + "TenantRegistry", + "TenantResolution", + "TenantValidator", # Subdomain tenant routing "CallableSubdomainTenantRouter", "InMemorySubdomainTenantRouter", diff --git a/src/adcp/server/tenant_registry.py b/src/adcp/server/tenant_registry.py new file mode 100644 index 000000000..11cb82392 --- /dev/null +++ b/src/adcp/server/tenant_registry.py @@ -0,0 +1,661 @@ +"""TenantRegistry — higher-level multi-tenant management primitive. + +Provides JS ``createTenantRegistry`` parity for Python multi-tenant +deployments. Composes per-tenant health tracking with runtime +register/unregister/recheck, closing the JS↔Python parity gap on the +most-touched server-side primitive. + +Adopters pre-build per-tenant :class:`~adcp.decisioning.DecisioningPlatform` +instances and register them here (eager mode), or supply a factory callable +that is invoked on first request (lazy mode). The registry tracks health +state and surfaces :meth:`TenantRegistry.resolve_by_host` (sync, eager) or +:meth:`TenantRegistry.resolve` (async, both modes) for the request path. 
+ +Comparison with lower-level building blocks: + +* :class:`~adcp.server.CallableSubdomainTenantRouter` — host→Tenant lookup + with TTL cache. Suitable when tenant routing is all you need. +* :class:`~adcp.decisioning.LazyPlatformRouter` — per-tenant + :class:`~adcp.decisioning.DecisioningPlatform` factory with LRU+TTL + cache. Suitable when you need lazy platform construction without health + tracking. +* :class:`TenantRegistry` — combines health tracking + runtime mutation + (register/unregister/recheck) with optional lazy platform construction. + Reach for this when multi-tenant SaaS topology requires observability + into per-tenant health state and runtime admin operations without a + process restart. Lazy mode (``register_lazy``) avoids paying per-tenant + platform-build cost at boot. +""" + +from __future__ import annotations + +import asyncio +import inspect +import logging +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Literal +from urllib.parse import urlparse + +if TYPE_CHECKING: + from adcp.decisioning.platform import DecisioningPlatform + +logger = logging.getLogger(__name__) + +#: Per-tenant health state. See :class:`TenantRegistry` for semantics. +TenantHealthState = Literal["pending", "healthy", "unverified", "disabled"] + +#: Validator callable. Takes ``(tenant_id, agent_url)`` and returns +#: ``True`` when the tenant is valid, ``False`` otherwise. +#: May be sync or async — the registry awaits at call time. +TenantValidator = Callable[[str, str], "bool | Awaitable[bool]"] + +#: Lazy platform factory callable. Takes ``tenant_id`` and returns an +#: awaitable :class:`~adcp.decisioning.DecisioningPlatform`. Used with +#: :meth:`TenantRegistry.register_lazy` to defer per-tenant platform +#: construction until first request. 
+PlatformFactory = Callable[[str], "Awaitable[DecisioningPlatform]"] + + +@dataclass(frozen=True) +class TenantResolution: + """Result of :meth:`TenantRegistry.resolve_by_host`. + + :param tenant_id: Stable identifier for the resolved tenant. + :param health: Current health state. Callers gate traffic on this — + typically 503 for ``pending`` and ``disabled``, serve for + ``healthy`` and ``unverified``. + :param platform: The :class:`~adcp.decisioning.DecisioningPlatform` + for this tenant. Pass to :func:`adcp.decisioning.serve` or use + with a :class:`~adcp.decisioning.PlatformRouter`. + """ + + tenant_id: str + health: TenantHealthState + platform: DecisioningPlatform + + +class TenantRegistry: + """Higher-level multi-tenant primitive with health tracking. + + Mirrors JS SDK ``createTenantRegistry`` for Python deployments. + Supports two registration modes: + + * **Eager** (:meth:`register`) — caller pre-builds the + :class:`~adcp.decisioning.DecisioningPlatform` and passes it in. + :meth:`resolve_by_host` (sync) and :meth:`resolve` (async) both + return a resolution immediately. + * **Lazy** (:meth:`register_lazy`) — caller supplies a factory + callable; the platform is built on the first :meth:`resolve` call + and cached. Avoids paying per-tenant construction costs (network + handshakes, KMS credential fetches) at boot. Suitable for + deployments with many tenants. + + **Health states:** + + * ``pending`` — registered, not yet validated (or lazy factory not + yet invoked). Adopters should 503 traffic until validation + completes. + * ``healthy`` — validated and serving. + * ``unverified`` — was healthy; a subsequent :meth:`recheck` failed + (transient failure). The tenant still serves (graceful-degrade). + * ``disabled`` — persistent failure. 503 until an operator calls + :meth:`recheck` and validation succeeds. + + **Validator:** Optional callable ``(tenant_id, agent_url) -> bool``. 
+ Pass a JWKS health-check, a connectivity probe, or any custom + validation logic. Adopters using principal-token bearer auth (no + JWKS) pass ``None`` — validation always succeeds immediately so + ``await_first_validation=True`` transitions the tenant to + ``healthy`` without a network round-trip. + + **Per-tenant locks:** Each tenant gets an ``asyncio.Lock`` on first + use. Locks are removed when the tenant is unregistered. Any + in-flight :meth:`recheck` or :meth:`resolve` that held the lock + before ``unregister()`` was called completes safely — zombie-entry + guards in both methods prevent stale writes after removal. + + **Do not pass a TenantRegistry as a SubdomainTenantRouter.** + Both classes expose ``async def resolve(host)``, but the return types + are incompatible (:class:`TenantResolution` vs :class:`Tenant`). + Mypy will flag the mismatch; duck-typing and ``isinstance`` checks + will not. + + :param validator: Optional validation callable (sync or async). + ``None`` → principal-token mode; validation always succeeds. + :param default_serve_options: Optional dict of defaults to store for + adopter convenience. Retrieve via :attr:`serve_options`. 
+ + Example (eager boot-time registration):: + + from adcp.server import TenantRegistry + + registry = TenantRegistry(validator=None) + + for tenant in load_tenants_from_db(): + await registry.register( + tenant.id, + agent_url=tenant.agent_url, + platform=build_platform_for(tenant), + await_first_validation=True, + ) + + async def resolve(ctx): + resolved = await registry.resolve(ctx.host) + if resolved is None or resolved.health in ("pending", "disabled"): + raise HTTPException(503) + return resolved.platform + + Example (lazy registration — defers platform construction to first request):: + + registry = TenantRegistry(validator=check_jwks) + + for tenant in load_tenants_from_db(): + await registry.register_lazy( + tenant.id, + agent_url=tenant.agent_url, + factory=build_platform_for_tenant, # called on first resolve() + ) + + async def resolve(ctx): + resolved = await registry.resolve(ctx.host) # triggers factory on first hit + if resolved is None or resolved.health in ("pending", "disabled"): + raise HTTPException(503) + return resolved.platform + + Example (runtime admin operations):: + + # Hot-add a newly onboarded tenant + await registry.register(new_id, agent_url=..., platform=...) + + # Remove a deactivated tenant + registry.unregister(old_id) + + # Re-validate after key rotation + await registry.recheck(rotated_id) + status = registry.health(rotated_id) + """ + + def __init__( + self, + *, + validator: TenantValidator | None = None, + default_serve_options: dict[str, Any] | None = None, + ) -> None: + self._validator = validator + self._default_serve_options: dict[str, Any] = default_serve_options or {} + # Per-tenant health state. + self._health: dict[str, TenantHealthState] = {} + # Per-tenant DecisioningPlatform. Annotation uses TYPE_CHECKING import; + # safe because from __future__ import annotations makes it a lazy string. + self._platforms: dict[str, DecisioningPlatform] = {} + # Per-tenant agent_url — used to derive + update host_map entries. 
+ self._agent_urls: dict[str, str] = {} + # Normalized host → tenant_id for O(1) resolve_by_host lookups. + self._host_map: dict[str, str] = {} + # Per-tenant asyncio.Lock for TOCTOU-safe state transitions. + # State mutations (register, recheck) read, await I/O, then write; + # without a lock two concurrent rechecks for the same tenant could + # both read, both await, and both commit — racing on the final state. + self._locks: dict[str, asyncio.Lock] = {} + # Per-tenant lazy platform factory. Set by register_lazy(); absent + # for tenants registered eagerly via register(). + self._factories: dict[str, PlatformFactory] = {} + + # ----- internal helpers ------------------------------------------------ + + def _get_lock(self, tenant_id: str) -> asyncio.Lock: + # No await between the check and the insertion, so this is safe + # under asyncio cooperative scheduling (single event loop thread). + if tenant_id not in self._locks: + self._locks[tenant_id] = asyncio.Lock() + return self._locks[tenant_id] + + @staticmethod + def _normalize_host(raw: str) -> str: + """Lower-case and strip any port suffix from a host or URL. + + Accepts both full URLs (``https://acme.example.com``) and raw + Host-header values (``acme.example.com``, ``acme.example.com:443``). + + Note: port stripping is correct for ``Host`` headers where the port + matches the scheme default. Some load-balancers forward + ``X-Forwarded-Host`` with non-default ports preserved; callers + using that header should strip the port themselves before passing + the value to :meth:`resolve_by_host` or :meth:`resolve`. 
+ """ + if "://" in raw: + host = urlparse(raw).netloc or raw + else: + host = raw + if ":" in host: + host = host.rsplit(":", 1)[0] + return host.lower() + + async def _run_validator(self, tenant_id: str) -> bool: + """Invoke the configured validator; return True when valid.""" + if self._validator is None: + return True + agent_url = self._agent_urls.get(tenant_id, "") + result = self._validator(tenant_id, agent_url) + if inspect.isawaitable(result): + result = await result + return bool(result) + + # ----- public API ------------------------------------------------------ + + async def register( + self, + tenant_id: str, + *, + agent_url: str, + platform: DecisioningPlatform, + await_first_validation: bool = False, + ) -> None: + """Register a tenant. + + Health starts as ``pending``. When ``await_first_validation=True`` + the coroutine suspends until the validator resolves, then + transitions to ``healthy`` or ``disabled`` before returning — the + next :meth:`resolve_by_host` call sees the final state. + + Re-registering an existing tenant atomically replaces its platform + and agent_url under the per-tenant lock. The old host-map entry is + removed if the URL changed. + + :param tenant_id: Stable identifier (e.g. DB primary key). + :param agent_url: The tenant's agent endpoint URL. The host + component is extracted and used as the key for + :meth:`resolve_by_host` lookups. + :param platform: Pre-built + :class:`~adcp.decisioning.DecisioningPlatform` for this tenant. + :param await_first_validation: When ``True``, suspends the caller + until validation completes (not "blocks the event loop" — the + coroutine yields cooperatively while awaiting I/O). Useful at + boot so the first incoming request doesn't race the validation + roundtrip. The typical ``False`` default is correct for + background hot-add where traffic is gated on ``health != 'pending'``. + """ + lock = self._get_lock(tenant_id) + async with lock: + # Remove stale host-map entry when the URL changes. 
+ old_url = self._agent_urls.get(tenant_id) + if old_url is not None and old_url != agent_url: + self._host_map.pop(self._normalize_host(old_url), None) + + self._platforms[tenant_id] = platform + self._agent_urls[tenant_id] = agent_url + self._host_map[self._normalize_host(agent_url)] = tenant_id + self._health[tenant_id] = "pending" + # Clear any lazy factory if re-registering as eager. + self._factories.pop(tenant_id, None) + + if await_first_validation: + try: + ok = await self._run_validator(tenant_id) + except Exception: + logger.warning( + "TenantRegistry.register: validator raised for tenant %r; " + "health=disabled", + tenant_id, + exc_info=True, + ) + self._health[tenant_id] = "disabled" + return + self._health[tenant_id] = "healthy" if ok else "disabled" + + async def register_lazy( + self, + tenant_id: str, + *, + agent_url: str, + factory: PlatformFactory, + await_first_validation: bool = False, + ) -> None: + """Register a tenant with a lazy platform factory. + + The platform is built on the first :meth:`resolve` call for this + tenant's host, then cached. Subsequent resolves return the cached + instance. Suitable for deployments with many tenants where eager + construction is too expensive at boot — network handshakes, KMS + credential fetches, inventory-manager construction, etc. + + Health starts as ``pending``. When ``await_first_validation=True`` + the factory is invoked immediately, the platform is built, and + validation completes before returning — the next :meth:`resolve` + call sees the final state without triggering the factory again. + + Use :meth:`resolve` (async) to get a :class:`TenantResolution` + for lazy-registered tenants; the synchronous :meth:`resolve_by_host` + returns ``None`` until the platform is built. + + Lazy and eager tenants share the same health state machine: + :meth:`health`, :meth:`unregister`, :meth:`recheck`, and + :attr:`registered_tenants` work identically regardless of + registration mode. 
+ + Re-registering an existing tenant (eager or lazy) atomically + replaces its factory and agent_url under the per-tenant lock. + + :param tenant_id: Stable identifier (e.g. DB primary key). + :param agent_url: The tenant's agent endpoint URL. The host + component is extracted for :meth:`resolve` / :meth:`resolve_by_host`. + :param factory: Async callable ``(tenant_id) -> DecisioningPlatform``. + Called at most once per registration (not once per request). + :param await_first_validation: When ``True``, invokes the factory + and validator immediately before returning. + """ + lock = self._get_lock(tenant_id) + async with lock: + old_url = self._agent_urls.get(tenant_id) + if old_url is not None and old_url != agent_url: + self._host_map.pop(self._normalize_host(old_url), None) + + self._factories[tenant_id] = factory + # Clear any eagerly-built platform if re-registering as lazy. + self._platforms.pop(tenant_id, None) + self._agent_urls[tenant_id] = agent_url + self._host_map[self._normalize_host(agent_url)] = tenant_id + self._health[tenant_id] = "pending" + + if await_first_validation: + try: + platform = await factory(tenant_id) + ok = await self._run_validator(tenant_id) + except Exception: + logger.warning( + "TenantRegistry.register_lazy: factory/validator raised for " + "tenant %r; health=disabled", + tenant_id, + exc_info=True, + ) + self._health[tenant_id] = "disabled" + self._factories.pop(tenant_id, None) + return + if ok: + self._platforms[tenant_id] = platform + self._factories.pop(tenant_id, None) + self._health[tenant_id] = "healthy" + else: + # Validator rejected the platform — discard it and clear the + # factory to mirror resolve() cold-path behavior: a disabled + # lazy tenant needs register_lazy() + recheck() to recover. + self._health[tenant_id] = "disabled" + self._factories.pop(tenant_id, None) + + def unregister(self, tenant_id: str) -> None: + """Remove a tenant from the registry. 
+ + Callers that already hold a reference to the tenant's platform + (e.g. an in-flight request that called :meth:`resolve_by_host` + before this call) complete normally — the registry does not cancel + in-flight work. Subsequent :meth:`resolve_by_host` calls for this + host return ``None``. + + Safe to call when the tenant is not registered (no-op). + """ + agent_url = self._agent_urls.pop(tenant_id, None) + if agent_url is not None: + self._host_map.pop(self._normalize_host(agent_url), None) + self._platforms.pop(tenant_id, None) + self._factories.pop(tenant_id, None) + self._health.pop(tenant_id, None) + self._locks.pop(tenant_id, None) + + async def recheck(self, tenant_id: str) -> None: + """Re-validate a tenant after key rotation or config change. + + **State transitions on validator success:** any state → ``healthy``. + + **State transitions on validator failure or exception:** + + * ``healthy`` → ``unverified`` (was serving; graceful-degrade so + existing traffic keeps flowing while the operator investigates). + * ``pending`` / ``unverified`` / ``disabled`` → ``disabled`` + (no prior healthy baseline; fail closed). + + The health state is updated before any exception propagates, so + the state is always consistent even when the validator raises. + + **Lazy-tenant caveats:** + + * For a lazy tenant in ``pending`` state (factory never invoked), + ``recheck()`` runs the validator against the registered + ``agent_url`` only. If it succeeds, health advances to + ``healthy`` — but the platform has not been built yet. + :meth:`resolve_by_host` still returns ``None``; use the async + :meth:`resolve` which triggers the factory on first call. + * For a lazy tenant that reached ``disabled`` via factory failure, + the factory has been cleared. Calling ``recheck()`` alone is + insufficient to recover — the validator may succeed but there + is no platform to serve. 
To retry platform construction, call + :meth:`register_lazy` again with the same factory, then call + :meth:`recheck` if you also need to re-run the validator. + + :raises KeyError: when ``tenant_id`` is not registered. + :raises Exception: re-raises any exception from the validator + after updating the health state. + """ + if tenant_id not in self._health: + raise KeyError(f"Tenant {tenant_id!r} is not registered") + + lock = self._get_lock(tenant_id) + async with lock: + # Re-check inside the lock — unregister may have raced. + if tenant_id not in self._health: + raise KeyError(f"Tenant {tenant_id!r} is not registered") + prior = self._health[tenant_id] + try: + ok = await self._run_validator(tenant_id) + except Exception: + # Guard: unregister() may have run while we awaited the validator. + # If so, _health no longer has this tenant — writing back would + # create a zombie entry visible via health() / registered_tenants. + if tenant_id not in self._health: + return + self._health[tenant_id] = ( + "unverified" if prior == "healthy" else "disabled" + ) + logger.warning( + "TenantRegistry.recheck: validator raised for tenant %r; " + "health=%s", + tenant_id, + self._health[tenant_id], + exc_info=True, + ) + raise + # Same guard for the success path. + if tenant_id not in self._health: + return + if ok: + self._health[tenant_id] = "healthy" + else: + self._health[tenant_id] = ( + "unverified" if prior == "healthy" else "disabled" + ) + + def health(self, tenant_id: str) -> TenantHealthState | None: + """Return the current health state for ``tenant_id``. + + Returns ``None`` when the tenant is not registered (distinct from + any health state value — callers can use ``is None`` to detect + unknown tenants). + """ + return self._health.get(tenant_id) + + def resolve_by_host(self, host: str) -> TenantResolution | None: + """Synchronous lookup by ``Host`` header value. + + Returns ``None`` when no tenant is registered for this host. 
+ The caller is responsible for checking ``result.health`` and + gating traffic as appropriate — the registry does not 503 + automatically (health-gating belongs in the adopter's request + dispatch layer). + + The lookup is synchronous because the registry maintains its own + in-memory host → tenant mapping (updated eagerly by + :meth:`register` and :meth:`unregister`). This intentionally + departs from the JS SDK's async variant, which must call an + external resolver; the Python registry owns the mapping directly. + + :param host: Raw ``Host`` header value. Port suffixes are stripped + before lookup; the string may also be a full URL. + """ + normalized = self._normalize_host(host) + tenant_id = self._host_map.get(normalized) + if tenant_id is None: + return None + platform = self._platforms.get(tenant_id) + if platform is None: + return None + health = self._health.get(tenant_id, "pending") + return TenantResolution(tenant_id=tenant_id, health=health, platform=platform) + + async def resolve(self, host: str) -> TenantResolution | None: + """Async lookup by ``Host`` header value; builds lazy platforms on first hit. + + For eager tenants (registered via :meth:`register`), equivalent to + :meth:`resolve_by_host` at an async call site — no I/O occurs. + + For lazy tenants (registered via :meth:`register_lazy`), the + platform factory is invoked on the first call, then cached. + Concurrent first-hit resolves for the same tenant serialize on + the per-tenant lock — only one factory invocation occurs. + + Returns ``None`` when no tenant is registered for this host, or when + a lazy tenant's factory/validator fails on this call (health set to + ``disabled`` in both cases). + + Returns a :class:`TenantResolution` — which may have + ``health="disabled"`` — when the platform was already built (eager + registration, lazy + ``await_first_validation=True``, or a previous + :meth:`resolve` call). 
**Always check ``result.health`` before + serving; never gate solely on ``result is None``.** + + The caller is responsible for gating traffic — the registry does + not 503 automatically. + + :param host: Raw ``Host`` header value. Port suffixes are stripped; + full URLs are also accepted. See :meth:`_normalize_host` for + load-balancer caveats. + """ + normalized = self._normalize_host(host) + tenant_id = self._host_map.get(normalized) + if tenant_id is None: + return None + + # Fast path: platform already built (eager or previously-resolved lazy). + platform = self._platforms.get(tenant_id) + if platform is not None: + health = self._health.get(tenant_id, "pending") + return TenantResolution(tenant_id=tenant_id, health=health, platform=platform) + + # If there is no factory either, nothing to do. + if tenant_id not in self._factories: + return None + + # Lazy path: acquire per-tenant lock to serialize concurrent first-hit + # resolves — only one factory invocation per tenant. + lock = self._get_lock(tenant_id) + async with lock: + # Double-check: another coroutine may have built the platform + # while we waited for the lock. + platform = self._platforms.get(tenant_id) + if platform is not None: + health = self._health.get(tenant_id, "pending") + return TenantResolution( + tenant_id=tenant_id, health=health, platform=platform + ) + + # Guard: unregister() may have run while we waited. + if tenant_id not in self._health: + return None + + factory = self._factories.get(tenant_id) + if factory is None: + return None + + try: + platform = await factory(tenant_id) + except Exception: + logger.warning( + "TenantRegistry.resolve: factory raised for tenant %r; " + "health=disabled", + tenant_id, + exc_info=True, + ) + if tenant_id in self._health: + self._health[tenant_id] = "disabled" + # Drop the factory so subsequent resolve() calls don't re-invoke + # it — a disabled tenant needs operator intervention via recheck(). 
+ self._factories.pop(tenant_id, None) + return None + + try: + ok = await self._run_validator(tenant_id) + except Exception: + logger.warning( + "TenantRegistry.resolve: validator raised for tenant %r; " + "health=disabled", + tenant_id, + exc_info=True, + ) + if tenant_id in self._health: + self._health[tenant_id] = "disabled" + self._factories.pop(tenant_id, None) + return None + + # Guard: unregister() may have run while we awaited factory/validator. + if tenant_id not in self._health: + return None + + if ok: + self._platforms[tenant_id] = platform + self._health[tenant_id] = "healthy" + # Factory no longer needed — platform is cached in _platforms. + self._factories.pop(tenant_id, None) + return TenantResolution( + tenant_id=tenant_id, health="healthy", platform=platform + ) + else: + self._health[tenant_id] = "disabled" + self._factories.pop(tenant_id, None) + return None + + @property + def serve_options(self) -> dict[str, Any]: + """The ``default_serve_options`` dict passed at construction. + + Convenience accessor for single-tenant setups or when spreading + common options into :func:`adcp.decisioning.serve`:: + + serve(platform, **registry.serve_options) + + Multi-tenant deployments typically pass a router (not a single + platform) to ``serve()``; in that case these options are consumed + by the per-request dispatch layer rather than passed to ``serve`` + directly. + + Returns an empty dict when no options were passed at construction. + Returns a shallow copy — mutations to the returned dict do not + affect the registry's stored options. + """ + return dict(self._default_serve_options) + + @property + def registered_tenants(self) -> frozenset[str]: + """Snapshot of the currently registered tenant ids. + + Read-only — mutations to the registry after this property is read + are not reflected in the returned frozenset. 
+ """ + return frozenset(self._health) + + +__all__ = [ + "PlatformFactory", + "TenantHealthState", + "TenantRegistry", + "TenantResolution", + "TenantValidator", +] diff --git a/tests/test_tenant_registry.py b/tests/test_tenant_registry.py new file mode 100644 index 000000000..4a653dc04 --- /dev/null +++ b/tests/test_tenant_registry.py @@ -0,0 +1,760 @@ +"""Tests for :class:`adcp.server.TenantRegistry`. + +Covers: +* Basic register / unregister / resolve_by_host lifecycle +* Health state transitions (pending → healthy, disabled) +* recheck state machine (success and failure arms) +* Per-tenant asyncio.Lock — concurrent rechecks don't corrupt state +* Host normalization (port stripping, case folding) +* Validator (sync and async) is invoked correctly +* resolve_by_host returns None for unknown / unregistered hosts +* registered_tenants snapshot is correct after mutations +* register_lazy + resolve: lazy platform construction on first resolve() +* resolve() fast path for eagerly-registered tenants +* Lazy concurrent first-hit — only one factory invocation +* Factory failure → health=disabled, resolve returns None +* Lazy unregister-during-resolve — no zombie state +* Re-registering eager→lazy and lazy→eager +""" + +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from adcp.server import ( + TenantRegistry, + TenantResolution, +) + +# --------------------------------------------------------------------------- +# Minimal mock DecisioningPlatform — only needs to be an object. 
+# --------------------------------------------------------------------------- + + +def _mock_platform(name: str = "platform") -> Any: + p = MagicMock() + p.__repr__ = lambda self: f"" + return p + + +# --------------------------------------------------------------------------- +# Basic lifecycle +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_register_sets_pending_health() -> None: + registry = TenantRegistry() + await registry.register("acme", agent_url="https://acme.example.com", platform=_mock_platform()) + assert registry.health("acme") == "pending" + + +@pytest.mark.asyncio +async def test_register_with_await_validation_no_validator_goes_healthy() -> None: + registry = TenantRegistry(validator=None) + await registry.register( + "acme", + agent_url="https://acme.example.com", + platform=_mock_platform(), + await_first_validation=True, + ) + assert registry.health("acme") == "healthy" + + +@pytest.mark.asyncio +async def test_register_with_sync_validator_healthy() -> None: + registry = TenantRegistry(validator=lambda tid, url: True) + await registry.register( + "acme", + agent_url="https://acme.example.com", + platform=_mock_platform(), + await_first_validation=True, + ) + assert registry.health("acme") == "healthy" + + +@pytest.mark.asyncio +async def test_register_with_sync_validator_disabled() -> None: + registry = TenantRegistry(validator=lambda tid, url: False) + await registry.register( + "acme", + agent_url="https://acme.example.com", + platform=_mock_platform(), + await_first_validation=True, + ) + assert registry.health("acme") == "disabled" + + +@pytest.mark.asyncio +async def test_register_with_async_validator_healthy() -> None: + async def async_validator(tid: str, url: str) -> bool: + return True + + registry = TenantRegistry(validator=async_validator) + await registry.register( + "acme", + agent_url="https://acme.example.com", + platform=_mock_platform(), + 
# NOTE(review): the three lines below close a test whose opening lines sit above this chunk.
        await_first_validation=True,
    )
    assert registry.health("acme") == "healthy"


@pytest.mark.asyncio
async def test_register_validator_raises_sets_disabled() -> None:
    """A validator that raises during the awaited first validation leaves
    the tenant ``disabled``; ``register()`` itself does not raise here."""

    def bad_validator(tid: str, url: str) -> bool:
        raise RuntimeError("connection refused")

    registry = TenantRegistry(validator=bad_validator)
    await registry.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=_mock_platform(),
        await_first_validation=True,
    )
    assert registry.health("acme") == "disabled"


@pytest.mark.asyncio
async def test_unregister_removes_tenant() -> None:
    """After ``unregister()`` both the health lookup and the host lookup
    for the tenant return ``None``."""
    registry = TenantRegistry()
    await registry.register("acme", agent_url="https://acme.example.com", platform=_mock_platform())
    registry.unregister("acme")
    assert registry.health("acme") is None
    assert registry.resolve_by_host("acme.example.com") is None


@pytest.mark.asyncio
async def test_unregister_noop_for_unknown_tenant() -> None:
    """``unregister()`` on an id that was never registered does not raise."""
    registry = TenantRegistry()
    registry.unregister("nonexistent")  # must not raise


@pytest.mark.asyncio
async def test_health_returns_none_for_unknown_tenant() -> None:
    """``health()`` answers ``None`` for an id that was never registered."""
    registry = TenantRegistry()
    assert registry.health("ghost") is None


# ---------------------------------------------------------------------------
# resolve_by_host
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_resolve_by_host_returns_none_for_unknown() -> None:
    """An empty registry resolves no host."""
    registry = TenantRegistry()
    assert registry.resolve_by_host("unknown.example.com") is None


@pytest.mark.asyncio
async def test_resolve_by_host_returns_tenant_resolution() -> None:
    """An eagerly registered tenant resolves to a ``TenantResolution`` that
    carries its id, its current health and the very platform object passed
    to ``register()``."""
    platform = _mock_platform("acme")
    registry = TenantRegistry()
    await registry.register("acme", agent_url="https://acme.example.com", platform=platform)

    result = registry.resolve_by_host("acme.example.com")
    assert result is not None
    assert isinstance(result, TenantResolution)
    assert result.tenant_id == "acme"
    # No first validation was awaited, so the tenant is still "pending".
    assert result.health == "pending"
    assert result.platform is platform


@pytest.mark.asyncio
async def test_resolve_by_host_strips_port() -> None:
    """A ``host:port`` value matches the tenant registered under the bare host."""
    platform = _mock_platform()
    registry = TenantRegistry()
    await registry.register("acme", agent_url="https://acme.example.com", platform=platform)

    result = registry.resolve_by_host("acme.example.com:443")
    assert result is not None
    assert result.tenant_id == "acme"


@pytest.mark.asyncio
async def test_resolve_by_host_case_insensitive() -> None:
    """An upper-case host matches a tenant registered with a lower-case URL."""
    platform = _mock_platform()
    registry = TenantRegistry()
    await registry.register("acme", agent_url="https://acme.example.com", platform=platform)

    result = registry.resolve_by_host("ACME.EXAMPLE.COM")
    assert result is not None
    assert result.tenant_id == "acme"


@pytest.mark.asyncio
async def test_resolve_by_host_after_url_change() -> None:
    """Re-registering the same tenant id under a new ``agent_url`` drops the
    old host mapping and serves the new host."""
    platform = _mock_platform()
    registry = TenantRegistry()
    await registry.register("acme", agent_url="https://old.example.com", platform=platform)
    await registry.register("acme", agent_url="https://new.example.com", platform=platform)

    assert registry.resolve_by_host("old.example.com") is None
    result = registry.resolve_by_host("new.example.com")
    assert result is not None and result.tenant_id == "acme"


# ---------------------------------------------------------------------------
# recheck state machine
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_recheck_pending_to_healthy() -> None:
    """A passing recheck moves a ``pending`` tenant to ``healthy``."""
    registry = TenantRegistry(validator=lambda tid, url: True)
    await registry.register("acme", agent_url="https://acme.example.com", platform=_mock_platform())
    assert registry.health("acme") == "pending"

    await registry.recheck("acme")
    assert registry.health("acme") == "healthy"


@pytest.mark.asyncio
async def test_recheck_disabled_to_healthy() -> None:
    """A passing recheck brings a ``disabled`` tenant back to ``healthy``."""
    # Scripted verdicts: first validation fails, the recheck passes.
    calls = [False, True]

    def toggling(tid: str, url: str) -> bool:
        return calls.pop(0)  # consume verdicts in call order

    registry = TenantRegistry(validator=toggling)
    await registry.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=_mock_platform(),
        await_first_validation=True,
    )
    assert registry.health("acme") == "disabled"

    await registry.recheck("acme")
    assert registry.health("acme") == "healthy"


@pytest.mark.asyncio
async def test_recheck_healthy_failure_goes_unverified() -> None:
    """One failed recheck moves a ``healthy`` tenant to ``unverified``."""
    # Scripted verdicts: first validation passes, the recheck fails.
    calls = [True, False]

    def toggling(tid: str, url: str) -> bool:
        return calls.pop(0)  # consume verdicts in call order

    registry = TenantRegistry(validator=toggling)
    await registry.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=_mock_platform(),
        await_first_validation=True,
    )
    assert registry.health("acme") == "healthy"

    await registry.recheck("acme")
    assert registry.health("acme") == "unverified"


@pytest.mark.asyncio
async def test_recheck_unverified_failure_goes_disabled() -> None:
    """A second consecutive failed recheck moves ``unverified`` to ``disabled``."""
    # Start healthy → unverified → disabled
    calls = [True, False, False]

    def toggling(tid: str, url: str) -> bool:
        return calls.pop(0)  # consume verdicts in call order

    registry = TenantRegistry(validator=toggling)
    await registry.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=_mock_platform(),
        await_first_validation=True,
    )
    await registry.recheck("acme")  # healthy → unverified
    assert registry.health("acme") == "unverified"

    await registry.recheck("acme")  # unverified → disabled
    assert registry.health("acme") == "disabled"


@pytest.mark.asyncio
async def test_recheck_raises_for_unknown_tenant() -> None:
    """``recheck()`` on an unregistered id raises ``KeyError`` naming the id."""
    registry = TenantRegistry()
    with pytest.raises(KeyError, match="ghost"):
        await registry.recheck("ghost")


@pytest.mark.asyncio
async def test_recheck_validator_raises_updates_state_then_reraises() -> None:
    """When the validator raises inside ``recheck()`` the exception reaches
    the caller and the health state is still degraded."""
    call_count = 0

    def first_ok_then_raise(tid: str, url: str) -> bool:
        # First call (the awaited first validation) passes; every later call raises.
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            return True
        raise RuntimeError("validator exploded")

    registry = TenantRegistry(validator=first_ok_then_raise)
    await registry.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=_mock_platform(),
        await_first_validation=True,
    )
    assert registry.health("acme") == "healthy"

    with pytest.raises(RuntimeError, match="validator exploded"):
        await registry.recheck("acme")

    # Was healthy before the failed recheck → unverified (graceful-degrade).
    assert registry.health("acme") == "unverified"


# ---------------------------------------------------------------------------
# Validator receives correct arguments
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_validator_receives_tenant_id_and_agent_url() -> None:
    """The validator is called exactly once with ``(tenant_id, agent_url)``,
    the URL being passed through unchanged (path included)."""
    received: list[tuple[str, str]] = []

    def capture(tid: str, url: str) -> bool:
        received.append((tid, url))  # record the arguments for the assertion below
        return True

    registry = TenantRegistry(validator=capture)
    await registry.register(
        "acme",
        agent_url="https://acme.example.com/agent",
        platform=_mock_platform(),
        await_first_validation=True,
    )
    assert received == [("acme", "https://acme.example.com/agent")]


# ---------------------------------------------------------------------------
# registered_tenants
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_registered_tenants_reflects_mutations() -> None:
    """``registered_tenants`` tracks both registrations and removals."""
    registry = TenantRegistry()
    await registry.register("a", agent_url="https://a.example.com", platform=_mock_platform())
    await registry.register("b", agent_url="https://b.example.com", platform=_mock_platform())
    assert registry.registered_tenants == {"a", "b"}

    registry.unregister("a")
    assert registry.registered_tenants == {"b"}


# ---------------------------------------------------------------------------
# Concurrency — per-tenant lock prevents TOCTOU on state transitions
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_concurrent_rechecks_same_tenant_do_not_corrupt_state() -> None:
    """Overlapping rechecks of one tenant both run and end in ``healthy``.

    The validator awaits ``asyncio.sleep(0)`` before answering, which hands
    control back to the event loop so the two rechecks get a chance to
    interleave. Each validator call consumes one scripted verdict, so an
    empty script afterwards shows that both rechecks reached the validator.
    """
    scripted_verdicts = [True, True]

    async def yielding_validator(tid: str, url: str) -> bool:
        await asyncio.sleep(0)  # suspension point between entry and verdict
        return scripted_verdicts.pop(0)

    # Sync lambda that hands back the coroutine for the registry to await.
    reg = TenantRegistry(validator=lambda tid, url: yielding_validator(tid, url))
    await reg.register("acme", agent_url="https://acme.example.com", platform=_mock_platform())

    await asyncio.gather(*(reg.recheck("acme") for _ in range(2)))

    assert scripted_verdicts == []
    assert reg.health("acme") == "healthy"


@pytest.mark.asyncio
async def test_unregister_during_recheck_no_zombie_health_entry() -> None:
    """Removing a tenant while its recheck is parked inside the validator
    leaves nothing behind: the recheck finishes without raising, and both
    ``health()`` and ``registered_tenants`` show the tenant as gone."""
    validator_entered = asyncio.Event()
    validator_released = asyncio.Event()

    async def gated_validator(tid: str, url: str) -> bool:
        validator_entered.set()
        await validator_released.wait()
        return True

    reg = TenantRegistry(validator=gated_validator)
    await reg.register("acme", agent_url="https://acme.example.com", platform=_mock_platform())

    async def run_recheck() -> None:
        await reg.recheck("acme")

    in_flight = asyncio.create_task(run_recheck())
    await validator_entered.wait()  # the recheck is now suspended in the validator

    reg.unregister("acme")  # the race under test: removal mid-validation

    validator_released.set()
    await in_flight  # must complete without raising

    assert reg.health("acme") is None
    assert "acme" not in reg.registered_tenants


@pytest.mark.asyncio
async def test_multiple_tenants_independent() -> None:
    """Two tenants validated by the same validator keep separate health."""

    def only_good(tid: str, url: str) -> bool:
        return tid == "good"

    reg = TenantRegistry(validator=only_good)
    for tenant_id, tenant_url in (
        ("good", "https://good.example.com"),
        ("bad", "https://bad.example.com"),
    ):
        await reg.register(
            tenant_id,
            agent_url=tenant_url,
            platform=_mock_platform(),
            await_first_validation=True,
        )

    assert reg.health("good") == "healthy"
    assert reg.health("bad") == "disabled"


# ---------------------------------------------------------------------------
# Lazy registration — register_lazy + resolve
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_register_lazy_sets_pending_health() -> None:
    """A lazily registered tenant is listed immediately, with ``pending`` health."""

    async def factory(tid: str) -> Any:
        return _mock_platform(tid)

    reg = TenantRegistry()
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)

    assert reg.health("acme") == "pending"
    assert "acme" in reg.registered_tenants


@pytest.mark.asyncio
async def test_resolve_by_host_returns_none_for_lazy_unresolved() -> None:
    """The sync lookup yields ``None`` for a lazy tenant whose platform has
    not been built yet."""

    async def factory(tid: str) -> Any:
        return _mock_platform(tid)

    reg = TenantRegistry()
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)

    assert reg.resolve_by_host("acme.example.com") is None


@pytest.mark.asyncio
async def test_resolve_builds_lazy_platform_on_first_call() -> None:
    """The first ``resolve()`` invokes the factory; the second reuses its result."""
    expected = _mock_platform("acme")
    factory_calls: list[str] = []

    async def factory(tid: str) -> Any:
        factory_calls.append(tid)
        return expected

    reg = TenantRegistry(validator=None)
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)

    first = await reg.resolve("acme.example.com")
    assert first is not None
    assert first.tenant_id == "acme"
    assert first.health == "healthy"
    assert first.platform is expected
    assert len(factory_calls) == 1

    # A repeat lookup must be served from the cached platform.
    second = await reg.resolve("acme.example.com")
    assert second is not None
    assert second.platform is expected
    assert len(factory_calls) == 1


@pytest.mark.asyncio
async def test_resolve_fast_path_for_eager_tenant() -> None:
    """``resolve()`` returns an eagerly registered platform as-is."""
    eager_platform = _mock_platform()
    reg = TenantRegistry(validator=None)
    await reg.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=eager_platform,
        await_first_validation=True,
    )

    resolved = await reg.resolve("acme.example.com")
    assert resolved is not None
    assert resolved.health == "healthy"
    assert resolved.platform is eager_platform


@pytest.mark.asyncio
async def test_resolve_returns_none_for_unknown_host() -> None:
    """``resolve()`` on an empty registry yields ``None``."""
    reg = TenantRegistry()
    outcome = await reg.resolve("unknown.example.com")
    assert outcome is None


@pytest.mark.asyncio
async def test_register_lazy_await_first_validation_builds_immediately() -> None:
    """``register_lazy(await_first_validation=True)`` runs the factory during
    registration and leaves the tenant ``healthy``."""
    prebuilt = _mock_platform()
    factory_calls: list[str] = []

    async def factory(tid: str) -> Any:
        factory_calls.append(tid)
        return prebuilt

    reg = TenantRegistry(validator=None)
    await reg.register_lazy(
        "acme",
        agent_url="https://acme.example.com",
        factory=factory,
        await_first_validation=True,
    )

    assert factory_calls
    assert reg.health("acme") == "healthy"

    # The platform already exists, so resolve() still answers "healthy".
    resolved = await reg.resolve("acme.example.com")
    assert resolved is not None
    assert resolved.health == "healthy"


@pytest.mark.asyncio
async def test_register_lazy_await_first_validation_factory_raises_disabled() -> None:
    """A factory that raises during the awaited first build leaves the
    tenant ``disabled``; ``register_lazy()`` itself does not raise here."""

    async def failing_factory(tid: str) -> Any:
        raise RuntimeError("KMS unreachable")

    reg = TenantRegistry()
    await reg.register_lazy(
        "acme",
        agent_url="https://acme.example.com",
        factory=failing_factory,
        await_first_validation=True,
    )

    assert reg.health("acme") == "disabled"


@pytest.mark.asyncio
async def test_resolve_factory_raises_sets_disabled_returns_none() -> None:
    """A factory failure on first ``resolve()`` yields ``None`` and flips the
    tenant from ``pending`` to ``disabled``."""

    async def failing_factory(tid: str) -> Any:
        raise RuntimeError("factory exploded")

    reg = TenantRegistry()
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=failing_factory)
    assert reg.health("acme") == "pending"

    outcome = await reg.resolve("acme.example.com")
    assert outcome is None
    assert reg.health("acme") == "disabled"


@pytest.mark.asyncio
async def test_resolve_validator_fails_sets_disabled_returns_none() -> None:
    """A rejecting validator on first ``resolve()`` yields ``None`` and
    leaves the tenant ``disabled``."""

    def always_reject(tid: str, url: str) -> bool:
        return False

    async def factory(tid: str) -> Any:
        return _mock_platform(tid)

    reg = TenantRegistry(validator=always_reject)
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)

    outcome = await reg.resolve("acme.example.com")
    assert outcome is None
    assert reg.health("acme") == "disabled"


@pytest.mark.asyncio
async def test_resolve_concurrent_first_hit_invokes_factory_once() -> None:
    """Three simultaneous first-hit resolves all succeed while the factory
    runs exactly once."""
    factory_calls: list[str] = []

    async def factory(tid: str) -> Any:
        factory_calls.append(tid)
        await asyncio.sleep(0)  # give the other resolves a chance to run
        return _mock_platform(tid)

    reg = TenantRegistry(validator=None)
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)

    outcomes = await asyncio.gather(*(reg.resolve("acme.example.com") for _ in range(3)))

    assert all(outcome is not None for outcome in outcomes)
    assert len(factory_calls) == 1


@pytest.mark.asyncio
async def test_lazy_unregister_during_resolve_no_zombie() -> None:
    """Removing a lazy tenant while ``resolve()`` is parked inside the
    factory makes that resolve yield ``None`` and leaves no trace of the
    tenant in ``health()`` or ``registered_tenants``."""
    factory_entered = asyncio.Event()
    factory_released = asyncio.Event()

    async def gated_factory(tid: str) -> Any:
        factory_entered.set()
        await factory_released.wait()
        return _mock_platform(tid)

    reg = TenantRegistry(validator=None)
    await reg.register_lazy(
        "acme",
        agent_url="https://acme.example.com",
        factory=gated_factory,
    )

    in_flight = asyncio.create_task(reg.resolve("acme.example.com"))
    await factory_entered.wait()

    reg.unregister("acme")

    factory_released.set()
    outcome = await in_flight

    # Removed mid-flight: nothing to return, nothing left registered.
    assert outcome is None
    assert reg.health("acme") is None
    assert "acme" not in reg.registered_tenants


@pytest.mark.asyncio
async def test_reregister_eager_after_lazy_clears_factory() -> None:
    """An eager re-registration over a lazy one wins: ``resolve()`` returns
    the pre-built platform and the earlier factory is never invoked."""
    eager_platform = _mock_platform("eager")
    factory_calls: list[str] = []

    async def factory(tid: str) -> Any:
        factory_calls.append(tid)
        return _mock_platform("lazy")

    reg = TenantRegistry(validator=None)
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)
    # Same tenant id again, this time with a ready platform.
    await reg.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=eager_platform,
        await_first_validation=True,
    )

    resolved = await reg.resolve("acme.example.com")
    assert resolved is not None
    assert resolved.platform is eager_platform
    assert not factory_calls


@pytest.mark.asyncio
async def test_reregister_lazy_after_eager_clears_platform() -> None:
    """A lazy re-registration over an eager one drops the old platform: the
    sync lookup goes back to ``None`` and ``resolve()`` returns what the new
    factory produces."""
    stale_platform = _mock_platform("old-eager")
    fresh_platform = _mock_platform("new-lazy")

    async def factory(tid: str) -> Any:
        return fresh_platform

    reg = TenantRegistry(validator=None)
    await reg.register(
        "acme",
        agent_url="https://acme.example.com",
        platform=stale_platform,
        await_first_validation=True,
    )
    # While registered eagerly, the sync lookup finds the tenant.
    assert reg.resolve_by_host("acme.example.com") is not None

    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)
    # After the lazy re-registration the sync lookup has nothing to return.
    assert reg.resolve_by_host("acme.example.com") is None

    # The async lookup produces the factory's platform.
    resolved = await reg.resolve("acme.example.com")
    assert resolved is not None
    assert resolved.platform is fresh_platform


@pytest.mark.asyncio
async def test_register_lazy_await_first_validation_validator_false_does_not_cache() -> None:
    """If the validator rejects during ``register_lazy(await_first_validation=True)``
    the tenant ends up ``disabled`` and neither the sync nor the async lookup
    returns a resolution afterwards."""
    built_platform = _mock_platform()

    def always_reject(tid: str, url: str) -> bool:
        return False

    async def factory(tid: str) -> Any:
        return built_platform

    reg = TenantRegistry(validator=always_reject)
    await reg.register_lazy(
        "acme",
        agent_url="https://acme.example.com",
        factory=factory,
        await_first_validation=True,
    )

    assert reg.health("acme") == "disabled"
    # Sync lookup: nothing to return.
    assert reg.resolve_by_host("acme.example.com") is None
    # Async lookup: nothing to return either.
    assert await reg.resolve("acme.example.com") is None


@pytest.mark.asyncio
async def test_resolve_factory_failure_does_not_retry_on_subsequent_calls() -> None:
    """Once a factory failure has disabled the tenant, further ``resolve()``
    calls return ``None`` without invoking the factory again."""
    factory_calls: list[str] = []

    async def failing_factory(tid: str) -> Any:
        factory_calls.append(tid)
        raise RuntimeError("factory exploded")

    reg = TenantRegistry()
    await reg.register_lazy(
        "acme",
        agent_url="https://acme.example.com",
        factory=failing_factory,
    )

    # First lookup reaches the factory, which fails.
    first = await reg.resolve("acme.example.com")
    assert first is None
    assert reg.health("acme") == "disabled"
    assert len(factory_calls) == 1

    # Second lookup must leave the factory alone.
    second = await reg.resolve("acme.example.com")
    assert second is None
    assert len(factory_calls) == 1


@pytest.mark.asyncio
async def test_unregister_lazy_tenant_removes_factory() -> None:
    """After ``unregister()`` a lazy tenant has no health entry and its host
    no longer resolves."""

    async def factory(tid: str) -> Any:
        return _mock_platform(tid)

    reg = TenantRegistry()
    await reg.register_lazy("acme", agent_url="https://acme.example.com", factory=factory)
    reg.unregister("acme")

    assert reg.health("acme") is None
    assert await reg.resolve("acme.example.com") is None