diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index cd6e38708..c3b69e833 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -16,13 +16,18 @@ AdagentsFetchResult, AdagentsValidationReport, AdAgentsValidationResult, + AgentAuthorizationsDirectoryResult, AuthorizationContext, + DirectoryDiscoveryMethod, + DirectoryEdgeStatus, + DirectoryPublisherEntry, DiscoveryMethod, EntryErrorKind, domain_matches, fetch_adagents, fetch_adagents_with_cache, fetch_agent_authorizations, + fetch_agent_authorizations_from_directory, filter_revoked_selectors, get_all_properties, get_all_tags, @@ -821,12 +826,17 @@ def get_adcp_version() -> str: "AdagentsEntryError", "AdagentsFetchResult", "AdagentsValidationReport", + "AgentAuthorizationsDirectoryResult", "AuthorizationContext", + "DirectoryDiscoveryMethod", + "DirectoryEdgeStatus", + "DirectoryPublisherEntry", "DiscoveryMethod", "EntryErrorKind", "fetch_adagents", "fetch_adagents_with_cache", "fetch_agent_authorizations", + "fetch_agent_authorizations_from_directory", "filter_revoked_selectors", "validate_adagents_domain", "validate_adagents_structure", diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index 6dc69bf79..306ea08ec 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -14,12 +14,15 @@ import re import socket from dataclasses import dataclass, field +from datetime import datetime from typing import Any, Literal -from urllib.parse import urlparse +from urllib.parse import quote, urlparse import httpx +from pydantic import Field from adcp.exceptions import AdagentsNotFoundError, AdagentsTimeoutError, AdagentsValidationError +from adcp.types.base import AdCPBaseModel from adcp.validation import ValidationError, validate_adagents DiscoveryMethod = Literal["direct", "authoritative_location", "ads_txt_managerdomain"] @@ -1773,3 +1776,170 @@ async def fetch_authorization_for_domain( # Build result dictionary, filtering out None values return {domain: ctx for domain, ctx in results if ctx is not None} + + +# Wire schema for the AAO agent → publishers inverse-lookup endpoint +# (`schemas/aao/agent-publishers.json`, adcp#4828). The publisher's own +# adagents.json remains the trust root — these models describe a *discovery* +# response, and callers SHOULD verify each `publisher_domain` against its +# adagents.json via :func:`fetch_adagents` before trusting an authorization. +DirectoryDiscoveryMethod = Literal[ + "direct", + "authoritative_location", + "adagents_authoritative", + "ads_txt_managerdomain", +] + +DirectoryEdgeStatus = Literal["authorized", "revoked"] + + +class DirectoryPublisherEntry(AdCPBaseModel): + """One publisher row in an AAO directory inverse-lookup response.""" + + publisher_domain: str + discovery_method: DirectoryDiscoveryMethod + manager_domain: str | None = None + properties_authorized: int = Field(ge=0) + properties_total: int = Field(ge=0) + signing_keys_pinned: bool | None = None + status: DirectoryEdgeStatus + last_verified_at: datetime + + +class AgentAuthorizationsDirectoryResult(AdCPBaseModel): + """Response envelope for ``GET /v1/agents/{agent_url}/publishers``. + + Maps directly to ``schemas/aao/agent-publishers.json`` in the AdCP + bundle (adcp#4828). The directory is a discovery accelerator — each + ``publisher_domain`` row tells callers where to look; they SHOULD + verify the publisher's adagents.json directly before treating an + authorization as trusted. + """ + + agent_url: str + directory_indexed_at: datetime | None + publishers: list[DirectoryPublisherEntry] = Field(default_factory=list) + next_cursor: str | None = None + + +# Per-page response cap. Matches MAX_POINTER_BYTES (5 MiB) — a directory +# page is a small envelope; pagination handles bulk responses. +MAX_DIRECTORY_PAGE_BYTES = 5 * 1024 * 1024 + + +async def fetch_agent_authorizations_from_directory( + agent_url: str, + *, + directory_url: str, + since: str | None = None, + timeout: float = 10.0, + client: httpx.AsyncClient | None = None, +) -> AgentAuthorizationsDirectoryResult: + """Query an AAO directory for publishers that authorize ``agent_url``. + + Calls ``GET {directory_url}/v1/agents/{agent_url}/publishers`` per the + AAO inverse-lookup contract (adcp#4823 / #4828) and returns the parsed + response. The directory's answer is *discovery*, not authorization: + callers should still verify each returned ``publisher_domain`` via + :func:`fetch_adagents` before treating an edge as trusted. + + Args: + agent_url: The agent whose publisher authorizations are being + queried. Passed verbatim in the path; the directory echoes + back a canonicalized form on the response. + directory_url: HTTPS base URL of the AAO directory + (e.g. ``"https://aao.example.com"``). The ``/v1/agents/...`` + path is appended; pass the directory's root, not a + request-specific path. + since: Optional opaque cursor or RFC 3339 timestamp from a prior + ``directory_indexed_at`` — passed through as ``?since=...`` + to limit the result to edges that changed since that point. + timeout: Request timeout in seconds. + client: Optional shared ``httpx.AsyncClient`` for connection + pooling. Caller owns the client lifecycle. + + Returns: + :class:`AgentAuthorizationsDirectoryResult`. On 404 from the + directory the function returns a result with ``publishers=[]`` + and ``directory_indexed_at=None`` — directories MUST be allowed + to answer "I do not index this agent" without callers needing + to branch on exception type. + + Raises: + AdagentsValidationError: If ``directory_url`` is malformed, the + response status is non-200/non-404, the body is not valid + JSON, or the body does not match the directory result schema. + AdagentsTimeoutError: If the request times out. + + Notes: + - ``directory_url`` is gated through the same SSRF protection + (HTTPS only, DNS pre-check, private/reserved address ban) as + publisher-side fetches. + - Response bodies are capped at 5 MiB. Bulk responses paginate + via ``next_cursor``; pass that value as ``since`` on the next + call — same wire field, different semantics per the schema. + """ + if not isinstance(agent_url, str) or not agent_url: + raise AdagentsValidationError("agent_url must be a non-empty string") + if not isinstance(directory_url, str) or not directory_url: + raise AdagentsValidationError("directory_url must be a non-empty string") + + base = directory_url.rstrip("/") + if not base.startswith("https://"): + raise AdagentsValidationError(f"directory_url must be an HTTPS URL, got: {directory_url!r}") + _validate_redirect_url(f"{base}/v1/agents/_/publishers") + + request_url = f"{base}/v1/agents/{quote(agent_url, safe='')}/publishers" + if since is not None: + request_url = f"{request_url}?since={quote(since, safe='')}" + + parsed = urlparse(request_url) + await _dns_validate_host( + parsed.hostname or "", parsed.port or (443 if parsed.scheme == "https" else 80) + ) + + headers = {"User-Agent": "AdCP-Client/1.0", "Accept": "application/json"} + + try: + if client is not None: + body, status_code, _ = await _stream_capped( + client, request_url, headers, timeout, MAX_DIRECTORY_PAGE_BYTES + ) + else: + async with httpx.AsyncClient() as new_client: + body, status_code, _ = await _stream_capped( + new_client, request_url, headers, timeout, MAX_DIRECTORY_PAGE_BYTES + ) + except httpx.TimeoutException as e: + raise AdagentsTimeoutError(parsed.netloc, timeout) from e + except httpx.RequestError as e: + raise AdagentsValidationError(f"Failed to fetch agent-publishers directory: {e}") from e + + if status_code == 404: + # Per adcp#4828, a directory that has not indexed this agent + # answers 404. Surface as an empty result so callers don't need + # to special-case the exception path for "no edges" — the + # protocol is intentionally permissive here. + return AgentAuthorizationsDirectoryResult( + agent_url=agent_url, + directory_indexed_at=None, + publishers=[], + next_cursor=None, + ) + + if status_code != 200: + raise AdagentsValidationError(f"Agent-publishers directory returned HTTP {status_code}") + + try: + data = json.loads(body) + except json.JSONDecodeError as e: + raise AdagentsValidationError( + f"Invalid JSON in agent-publishers directory response: {str(e)[:200]}" + ) from e + + try: + return AgentAuthorizationsDirectoryResult.model_validate(data) + except Exception as e: # pydantic.ValidationError + any coercion failure + raise AdagentsValidationError( + f"Agent-publishers directory response failed schema validation: {e}" + ) from e diff --git a/tests/fixtures/public_api_snapshot.json b/tests/fixtures/public_api_snapshot.json index d927297d5..ebeaacd42 100644 --- a/tests/fixtures/public_api_snapshot.json +++ b/tests/fixtures/public_api_snapshot.json @@ -35,6 +35,7 @@ "AdagentsValidationError", "AdagentsValidationReport", "AdvertiserIndustry", + "AgentAuthorizationsDirectoryResult", "AgentCapabilities", "AgentCompliance", "AgentConfig", @@ -116,6 +117,9 @@ "Destination", "DevicePlatform", "DeviceType", + "DirectoryDiscoveryMethod", + "DirectoryEdgeStatus", + "DirectoryPublisherEntry", "DiscoveryMethod", "DomainLookupResult", "Duration", @@ -352,6 +356,7 @@ "fetch_adagents", "fetch_adagents_with_cache", "fetch_agent_authorizations", + "fetch_agent_authorizations_from_directory", "filter_revoked_selectors", "generate_webhook_idempotency_key", "generated", diff --git a/tests/test_adagents.py b/tests/test_adagents.py index 2f5c0df1e..071977f19 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -3461,3 +3461,226 @@ def _stream(method, url, **kwargs): ): with pytest.raises(AdagentsValidationError, match="size cap"): await fetch_adagents("example.com", client=mock_client) + + +# --------------------------------------------------------------------------- +# fetch_agent_authorizations_from_directory — HTTP-wire-level tests +# +# These tests exercise the AAO directory inverse-lookup path with a real +# httpx.MockTransport so the request URL, query string, and response body +# go through the same parser the SDK uses against a live directory. We +# parse the body with the real Pydantic model (no shape inference), and +# cover the 404 → empty path explicitly per the adcp#4828 contract. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestFetchAgentAuthorizationsFromDirectory: + @staticmethod + def _client(handler): + return httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + async def test_happy_path_parses_into_pydantic(self): + """Real wire body round-trips through AgentAuthorizationsDirectoryResult.""" + from adcp.adagents import ( + AgentAuthorizationsDirectoryResult, + DirectoryPublisherEntry, + fetch_agent_authorizations_from_directory, + ) + + captured: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["method"] = request.method + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": "2026-05-20T12:00:00Z", + "publishers": [ + { + "publisher_domain": "nytimes.example", + "discovery_method": "direct", + "properties_authorized": 3, + "properties_total": 5, + "signing_keys_pinned": False, + "status": "authorized", + "last_verified_at": "2026-05-20T11:50:00Z", + }, + { + "publisher_domain": "site1.example", + "discovery_method": "adagents_authoritative", + "manager_domain": "manager.example", + "properties_authorized": 1, + "properties_total": 1, + "status": "authorized", + "last_verified_at": "2026-05-20T11:55:00Z", + }, + ], + "next_cursor": "opaque-cursor-1", + }, + ) + + async with self._client(handler) as client: + result = await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert captured["method"] == "GET" + assert captured["url"] == ( + "https://aao.example.com/v1/agents/" "https%3A%2F%2Fagent.example.com%2F/publishers" + ) + assert isinstance(result, AgentAuthorizationsDirectoryResult) + assert result.agent_url == "https://agent.example.com/" + assert result.next_cursor == "opaque-cursor-1" + assert len(result.publishers) == 2 + assert all(isinstance(p, DirectoryPublisherEntry) for p in result.publishers) + assert result.publishers[0].discovery_method == "direct" + assert result.publishers[1].manager_domain == "manager.example" + assert result.publishers[0].status == "authorized" + + async def test_404_returns_empty_publishers(self): + """404 from the directory is the 'not indexed' answer — return empty.""" + from adcp.adagents import ( + AgentAuthorizationsDirectoryResult, + fetch_agent_authorizations_from_directory, + ) + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(404, text="Not found") + + async with self._client(handler) as client: + result = await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + assert isinstance(result, AgentAuthorizationsDirectoryResult) + assert result.publishers == [] + assert result.directory_indexed_at is None + assert result.next_cursor is None + assert result.agent_url == "https://agent.example.com/" + + async def test_since_cursor_passes_through_as_query_string(self): + """`since` is forwarded verbatim as ?since=… for pagination/incremental sync.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + captured: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["since"] = request.url.params.get("since") or "" + return httpx.Response( + 200, + json={ + "agent_url": "https://agent.example.com/", + "directory_indexed_at": None, + "publishers": [], + }, + ) + + async with self._client(handler) as client: + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com/", + since="opaque-cursor-1", + client=client, + ) + + assert captured["since"] == "opaque-cursor-1" + assert "?since=opaque-cursor-1" in captured["url"] + + async def test_timeout_raises_adagents_timeout_error(self): + """httpx timeouts surface as AdagentsTimeoutError (not generic Exception).""" + from adcp.adagents import fetch_agent_authorizations_from_directory + from adcp.exceptions import AdagentsTimeoutError + + def handler(request: httpx.Request) -> httpx.Response: + raise httpx.ReadTimeout("simulated", request=request) + + async with self._client(handler) as client: + with pytest.raises(AdagentsTimeoutError): + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + async def test_malformed_json_raises_validation_error(self): + """A 200 with non-JSON body is the directory's bug — surface as ValidationError.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + content=b"not json at all", + headers={"content-type": "application/json"}, + ) + + async with self._client(handler) as client: + with pytest.raises(AdagentsValidationError, match="Invalid JSON"): + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + async def test_schema_mismatch_raises_validation_error(self): + """A 200 whose body doesn't match the schema fails Pydantic validation.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + # Missing required `directory_indexed_at`; `publishers` + # has an entry missing required `last_verified_at`. + "agent_url": "https://agent.example.com/", + "publishers": [ + { + "publisher_domain": "site1.example", + "discovery_method": "direct", + "properties_authorized": 0, + "properties_total": 0, + "status": "authorized", + } + ], + }, + ) + + async with self._client(handler) as client: + with pytest.raises(AdagentsValidationError, match="schema validation"): + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + ) + + async def test_non_https_directory_url_rejected(self): + """SSRF gate: http:// is refused before any network I/O.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + with pytest.raises(AdagentsValidationError, match="HTTPS"): + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="http://aao.example.com", + ) + + async def test_non_200_non_404_raises_validation_error(self): + """5xx is the directory's bug — surface as ValidationError, not 'empty'.""" + from adcp.adagents import fetch_agent_authorizations_from_directory + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(503, text="upstream unavailable") + + async with self._client(handler) as client: + with pytest.raises(AdagentsValidationError, match="HTTP 503"): + await fetch_agent_authorizations_from_directory( + "https://agent.example.com/", + directory_url="https://aao.example.com", + client=client, + )