diff --git a/pyproject.toml b/pyproject.toml index efd1df09a..4d173628c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,6 +95,23 @@ dependencies = [ # time, and pinned snapshot semantics are what the spec requires for # cross-implementation conformance. "tldextract>=5.1.0", + # IDNA-2008 (UTS#46) host canonicalization — used by + # ``adcp.signing`` for JWKS URI host pinning, IP-pinned transport, + # revocation issuer canonicalization, and the key_origins + # consistency check. The stdlib ``encodings.idna`` module is + # IDNA-2003, which maps Eszett (``ß``) to ``ss`` and collapses + # final-sigma — both reject paths for IDNA-2008 (UTS#46 transitional + # off). A counterparty hosting ``straße.de`` would canonicalize + # under IDNA-2003 to ``strasse.de`` and silently match a + # different registrable domain than the spec mandates. + # ``idna.encode(host, uts46=True)`` is the canonical Python + # IDNA-2008 entry point. Floor at 3.7 because idna 3.0–3.6 have + # CVE-2024-3651 — a quadratic-complexity DoS in ``idna.encode()``, + # which this package calls on attacker-influenceable hosts (JWKS + # URI authority, redirect targets, revocation-issuer canonicalization, + # key_origins consistency check). Upper bound at <4 because IDNA + # semantics across a major are not contracted. + "idna>=3.7,<4", ] [project.scripts] diff --git a/src/adcp/signing/agent_resolver.py b/src/adcp/signing/agent_resolver.py index 03fa77d4d..bc041ef6f 100644 --- a/src/adcp/signing/agent_resolver.py +++ b/src/adcp/signing/agent_resolver.py @@ -44,7 +44,7 @@ from collections.abc import Callable from contextlib import AbstractAsyncContextManager from dataclasses import dataclass -from typing import Any, Literal +from typing import Any, ClassVar, Literal import httpx from pydantic import BaseModel, ConfigDict, Field @@ -57,6 +57,7 @@ from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport from adcp.signing.jwks import ( SSRFValidationError, + StaticJwksResolver, async_default_jwks_fetcher, ) @@ -149,6 +150,20 @@ class AgentResolution(BaseModel): description="Full JWK set fetched from ``jwks_uri`` (RFC 7517 ``{keys: [...]}``)" ) fetched_at: float = Field(description="Resolution wall-clock time (Unix epoch seconds)") + key_origins: dict[str, str] | None = Field( + default=None, + description=( + "Verbatim ``identity.key_origins`` map from the capabilities " + "response — purpose → declared origin (e.g. " + "``{'request_signing': 'https://keys.brand.com'}``). The " + "verifier consults this to enforce the spec's key-origin " + "consistency check (resolved jwks_uri host MUST equal the " + "declared origin for the purpose under verification). " + "``None`` when the operator advertises no key_origins map; " + "the verifier raises ``request_signature_key_origin_missing`` " + "for any signed-traffic purpose without a corresponding entry." + ), + ) trace: list[TraceEntry] = Field(default_factory=list) @@ -314,6 +329,34 @@ def _extract_brand_json_url(capabilities: dict[str, Any]) -> str: return brand_json_url +def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None: + """Pluck ``identity.key_origins`` from the capabilities body. + + Returns ``None`` when the operator advertises no key_origins map (a + common posture for unsigned-traffic-only deployments — the verifier + layer treats absence as "no per-purpose origin pin to check" and + only raises ``request_signature_key_origin_missing`` when a signed + purpose is actually exercised). Filters values to strings — a + malformed entry is skipped rather than poisoning the whole map. + + Forward-compat with operators on 3.0 schemas: the map travels under + ``additionalProperties: true`` and the SDK reads it as a plain dict + rather than via the typed Pydantic surface (which won't carry the + field until 3.1 lands). + """ + identity = capabilities.get("identity") + if not isinstance(identity, dict): + return None + raw = identity.get("key_origins") + if not isinstance(raw, dict) or not raw: + return None + out: dict[str, str] = {} + for purpose, origin in raw.items(): + if isinstance(purpose, str) and isinstance(origin, str) and origin: + out[purpose] = origin + return out or None + + # ---- Public API ---- @@ -387,6 +430,7 @@ async def async_resolve_agent( raise brand_json_url = _extract_brand_json_url(capabilities.body) + key_origins = _extract_key_origins(capabilities.body) # --- Hop 2: brand.json --- bj_start = time.monotonic() @@ -480,6 +524,7 @@ async def async_resolve_agent( jwks_uri=jwks_uri, jwks=jwks, fetched_at=fetched_at, + key_origins=key_origins, trace=trace, ) @@ -526,6 +571,8 @@ async def verify_from_agent_url( revocation_checker: Any = None, revocation_list: Any = None, allow_private_destinations: bool = False, + signing_purpose: str = "request_signing", + posture: str | None = None, ) -> Any: """Single-call factory: resolve ``agent_url`` and verify the request signature against the resolved JWKS. @@ -575,7 +622,6 @@ async def verify_from_agent_url( REQUEST_SIGNATURE_JWKS_UNTRUSTED, SignatureVerificationError, ) - from adcp.signing.jwks import StaticJwksResolver from adcp.signing.middleware import verify_starlette_request from adcp.signing.verifier import VerifierCapability, VerifyOptions @@ -605,19 +651,61 @@ async def verify_from_agent_url( message=f"agent-url resolution failed ({exc.code}): {exc.message}", ) from exc + # Mark the resolver with ``jwks_source = "brand_json"`` so the + # verifier's ``_maybe_check_key_origin`` step engages the spec's + # key-origin consistency check (the JWKS WAS sourced via the brand.json + # walk in ``async_resolve_agent``; the check applies). Without this + # marker the verifier would treat a bare ``StaticJwksResolver`` as a + # publisher-pin-equivalent and skip the check — defeating the + # production helper's defense against the shared-tenancy spoof. options = VerifyOptions( now=now if now is not None else _time.time(), capability=capability if capability is not None else VerifierCapability(supported=True), operation=operation, - jwks_resolver=StaticJwksResolver(resolution.jwks), + jwks_resolver=_BrandJsonStaticJwksResolver(resolution.jwks, jwks_uri=resolution.jwks_uri), replay_store=replay_store, revocation_checker=revocation_checker, revocation_list=revocation_list, agent_url=resolution.agent_entry.get("url"), + expected_key_origins=resolution.key_origins, + signing_purpose=signing_purpose, + posture=posture, ) return await verify_starlette_request(request, options=options) +class _BrandJsonStaticJwksResolver(StaticJwksResolver): + """A :class:`StaticJwksResolver` carrying the ``"brand_json"`` + source discriminant AND the resolved ``jwks_uri``. + + The brand.json walk in :func:`async_resolve_agent` resolved this + JWKS — that's exactly the source the spec's key-origin consistency + check (ADCP #3690 step 7) defends. The verifier's + ``_maybe_check_key_origin`` step skips when ``jwks_source`` is + absent (treating absence as publisher-pin-equivalent); marking the + static resolver here engages the check on every signed request + routed through :func:`verify_from_agent_url`. + + The verifier reads ``getattr(resolver, "jwks_uri", None)`` to look + up the resolved host for the consistency comparison. + :class:`StaticJwksResolver` does not carry a ``jwks_uri`` (it's a + static keyset), so this subclass stores the brand.json-resolved + URI on the instance. Without it the check would mismatch every + legitimate signer with ``actual_origin=""``. + + Defined inside the module rather than as a public type because the + discriminant is internal — adopters wiring custom resolvers set + their own ``jwks_source = "brand_json"`` class attribute and + ``jwks_uri`` instance attribute directly. + """ + + jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json" + + def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None: + super().__init__(jwks) + self.jwks_uri = jwks_uri + + # ---- helpers ---- diff --git a/src/adcp/signing/brand_jwks.py b/src/adcp/signing/brand_jwks.py index 4af8bb0ea..ed34e0569 100644 --- a/src/adcp/signing/brand_jwks.py +++ b/src/adcp/signing/brand_jwks.py @@ -41,7 +41,7 @@ from collections.abc import Callable from contextlib import AbstractAsyncContextManager from dataclasses import dataclass -from typing import Any, Literal +from typing import Any, ClassVar, Literal from urllib.parse import urlsplit, urlunsplit import httpx @@ -330,8 +330,25 @@ class BrandJsonJwksResolver: cascade: first ask the inner :class:`AsyncCachingJwksResolver` (which will refetch its own URL if cooldown has elapsed); if still unknown, refresh brand.json in case ``jwks_uri`` rotated. + + The :attr:`jwks_source` class attribute is the discriminant the + request-signing verifier consults to decide whether + :func:`adcp.signing.check_key_origin_consistency` applies for + this resolver. Per ADCP #3690 §step 7, the + ``identity.key_origins`` consistency check is mandatory only when + the JWKS source for the (agent, purpose, role) tuple was the + operator brand.json — and skipped for publisher-pinned tuples + (where the JWKS origin is the publisher's domain by design). + A resolver that always sources via brand.json declares + ``jwks_source = "brand_json"`` so the verifier engages the check; + a publisher-pin resolver either omits the attribute or declares + ``"publisher_pin"`` so the verifier skips it. """ + #: Discriminant for the verifier-side key_origin consistency + #: check (see class docstring). + jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json" + def __init__( self, brand_json_url: str, diff --git a/src/adcp/signing/ip_pinned_transport.py b/src/adcp/signing/ip_pinned_transport.py index 92e3b8308..fe2a968aa 100644 --- a/src/adcp/signing/ip_pinned_transport.py +++ b/src/adcp/signing/ip_pinned_transport.py @@ -63,6 +63,7 @@ import httpcore import httpx +import idna # Private but documented-as-the-default-backend implementations. The # underscore prefix is a stability hazard; the contract test in @@ -102,13 +103,19 @@ def _normalize_pin_host(host: str) -> str: Lowercases, strips a single trailing dot, and IDNA-encodes so Unicode hostnames compare equal to the punycode form httpx passes to httpcore. + + IDNA-2008 (UTS#46) via the PyPI ``idna`` package — the + package-wide canonicalization convention, matching the JWKS + fetcher's ``resolve_and_validate_host`` so a pin set on + ``straße.de`` collapses to the same A-label httpx will pass to + httpcore at connect time. """ host = host.lower() if host.endswith("."): host = host[:-1] try: - return host.encode("idna").decode("ascii") - except (UnicodeError, UnicodeEncodeError): + return idna.encode(host, uts46=True).decode("ascii") + except (idna.IDNAError, UnicodeError, UnicodeEncodeError): # Caller already stored the normalized form; fall through # with the lowercased input so the comparison just fails # cleanly instead of raising inside connect_tcp. diff --git a/src/adcp/signing/jwks.py b/src/adcp/signing/jwks.py index d4fa75b4c..cb4952dc5 100644 --- a/src/adcp/signing/jwks.py +++ b/src/adcp/signing/jwks.py @@ -33,6 +33,7 @@ from urllib.parse import urlsplit import httpx +import idna from adcp.signing.errors import ( REQUEST_SIGNATURE_JWKS_UNAVAILABLE, @@ -197,9 +198,17 @@ def resolve_and_validate_host( # raw Unicode; httpx encodes it. A mismatch here breaks the # hostname-match in the backend override and silently reopens # the TOCTOU for IDN hosts. + # + # IDNA-2008 (UTS#46, transitional_processing=False) via the PyPI + # ``idna`` package — stdlib ``encodings.idna`` is IDNA-2003 and + # mismaps Eszett (``ß`` → ``ss``) and final-sigma. The + # package-wide IDNA convention is IDNA-2008; all four callsites + # (here, ``ip_pinned_transport``, ``revocation_fetcher``, + # ``key_origins``) share this encoding so canonicalization + # results compare byte-equal across the verifier pipeline. try: - host = host.encode("idna").decode("ascii").lower() - except (UnicodeError, UnicodeEncodeError) as exc: + host = idna.encode(host, uts46=True).decode("ascii").lower() + except (idna.IDNAError, UnicodeError, UnicodeEncodeError) as exc: raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80) if allowed_ports is not None and port not in allowed_ports: diff --git a/src/adcp/signing/key_origins.py b/src/adcp/signing/key_origins.py index 1a819dce9..5b240453c 100644 --- a/src/adcp/signing/key_origins.py +++ b/src/adcp/signing/key_origins.py @@ -39,6 +39,8 @@ from typing import Literal from urllib.parse import urlsplit +import idna + from adcp.signing.errors import ( REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH, REQUEST_SIGNATURE_KEY_ORIGIN_MISSING, @@ -160,15 +162,15 @@ def _origin_host(value: str) -> str | None: """Return the host portion of a URL or bare origin, canonicalized for byte-equality comparison. - Canonicalization mirrors the existing codebase pattern - (``jwks.py:201``, ``ip_pinned_transport.py:110``, - ``revocation_fetcher.py:380``): ASCII-lowercase, then - ``host.encode("idna").decode("ascii")`` to convert IDN U-labels to - their A-label (Punycode) form. The spec asks for IDNA-2008 strictly - while stdlib ``encodings.idna`` is IDNA-2003; the divergence is - rare in practice and matching the package's existing convention - keeps the canonicalization story coherent. A future IDNA-2008 - migration would update all four callsites together. + Canonicalization mirrors the package-wide IDNA-2008 (UTS#46) + convention used by ``jwks.py``, ``ip_pinned_transport.py``, and + ``revocation_fetcher.py``: ASCII-lowercase, then + ``idna.encode(host, uts46=True).decode("ascii")`` to convert IDN + U-labels to their A-label (Punycode) form. IDNA-2008 (vs the + stdlib's IDNA-2003) preserves Eszett (``ß``) and final-sigma per + spec rather than mapping them away, which is the canonicalization + the request-signing spec mandates for cross-implementation + byte-equality. **Bare-host and URL forms are normalized symmetrically.** A bare host like ``"keys.brand.com"`` is processed through the same @@ -198,8 +200,8 @@ def _origin_host(value: str) -> str | None: if not host: return None try: - return host.encode("idna").decode("ascii").lower() - except (UnicodeError, UnicodeEncodeError): + return idna.encode(host, uts46=True).decode("ascii").lower() + except (idna.IDNAError, UnicodeError, UnicodeEncodeError): return None diff --git a/src/adcp/signing/revocation_fetcher.py b/src/adcp/signing/revocation_fetcher.py index c902a04d6..b883f3ed5 100644 --- a/src/adcp/signing/revocation_fetcher.py +++ b/src/adcp/signing/revocation_fetcher.py @@ -42,6 +42,7 @@ from typing import Any, Protocol import httpx +import idna from adcp.signing.jwks import ( DEFAULT_JWKS_TIMEOUT_SECONDS, @@ -377,11 +378,14 @@ def _normalize_issuer(issuer: str) -> str: raise ValueError(f"issuer has no host: {issuer!r}") # IDNA-encode the host to collapse unicode homoglyphs to ASCII - # punycode. ``host.encode("idna")`` raises on characters outside the - # IDNA allowlist — which is the failure mode we want. + # punycode. ``idna.encode(host, uts46=True)`` raises on characters + # outside the IDNA-2008 allowlist — which is the failure mode we + # want. IDNA-2008 (UTS#46) matches the package-wide convention so + # an issuer canonicalized here compares byte-equal to the + # ``jwks_uri`` host the verifier pins via ``resolve_and_validate_host``. try: - host_ascii = parts.hostname.encode("idna").decode("ascii").lower() - except (UnicodeError, UnicodeEncodeError) as exc: + host_ascii = idna.encode(parts.hostname, uts46=True).decode("ascii").lower() + except (idna.IDNAError, UnicodeError, UnicodeEncodeError) as exc: raise ValueError(f"issuer host {parts.hostname!r} is not IDNA-valid: {exc}") from exc netloc = f"{host_ascii}:{parts.port}" if parts.port else host_ascii diff --git a/src/adcp/signing/verifier.py b/src/adcp/signing/verifier.py index ac6b7160a..575c7a76f 100644 --- a/src/adcp/signing/verifier.py +++ b/src/adcp/signing/verifier.py @@ -55,6 +55,7 @@ SignatureVerificationError, ) from adcp.signing.jwks import JwksResolver +from adcp.signing.key_origins import check_key_origin_consistency from adcp.signing.replay import InMemoryReplayStore, ReplayStore from adcp.signing.revocation import RevocationChecker, RevocationList @@ -143,6 +144,25 @@ class VerifyOptions: expected_adcp_use: str = ADCP_USE_REQUEST allowed_algs: frozenset[str] = ALLOWED_ALGS agent_url: str | None = None + #: ADCP #3690 step 7 — the signing peer's declared + #: ``identity.key_origins`` map from its + #: ``get_adcp_capabilities`` response, keyed by signing purpose + #: (``request_signing``, ``webhook_signing``, ...). When provided + #: AND the JWKS resolver reports ``jwks_source == "brand_json"``, + #: the verifier checks that the resolved ``jwks_uri`` host + #: matches the declared origin for ``signing_purpose``. ``None`` + #: (default) skips the check — adopters who don't yet plumb + #: capabilities through to the verifier see no behavior change. + expected_key_origins: Mapping[str, str] | None = None + #: Purpose key used to look up ``expected_key_origins`` and to + #: render error messages. Default ``"request_signing"`` matches + #: the request-signing verifier's role; webhook callers pass + #: ``"webhook_signing"`` via their own wrapper. + signing_purpose: str = "request_signing" + #: Optional posture context (e.g. ``"required"``, ``"supported"``) + #: attached to ``_key_origin_missing`` rejections for adopter + #: diagnostics. + posture: str | None = None def verify_request_signature( @@ -242,6 +262,22 @@ def verify_request_signature( alg = str(parsed.params["alg"]) _check_key_purpose(jwk, alg, expected_adcp_use=options.expected_adcp_use) + # ADCP #3690 step 7: ``identity.key_origins`` consistency check. + # Mandatory ONLY when the JWKS source for this (agent, purpose, + # role) tuple was the operator brand.json. Publisher-pinned + # tuples skip the check (the JWKS origin is the publisher's + # domain by design). The resolver advertises which branch + # applies via its ``jwks_source`` attribute — duck-typed so the + # ``JwksResolver`` Protocol stays backwards-compatible with + # adopter resolvers that predate this attribute (those default + # to "publisher_pin" semantics, i.e. skip the check). + _maybe_check_key_origin( + resolver=options.jwks_resolver, + expected_key_origins=options.expected_key_origins, + signing_purpose=options.signing_purpose, + posture=options.posture, + ) + if options.revocation_list is not None: as_of = datetime.fromtimestamp(options.now, tz=timezone.utc) if options.revocation_list.is_stale(as_of): @@ -468,6 +504,54 @@ def _check_components( ) +def _maybe_check_key_origin( + *, + resolver: JwksResolver, + expected_key_origins: Mapping[str, str] | None, + signing_purpose: str, + posture: str | None, +) -> None: + """Run the ADCP #3690 step 7 ``identity.key_origins`` check when + the resolver sourced its keys from brand.json. + + Resolver contract (duck-typed): + + * ``jwks_source``: one of ``"brand_json"`` / ``"publisher_pin"`` / + absent. Only ``"brand_json"`` engages the check; ``"publisher_pin"`` + and absence skip it. Absence is treated as "skip" so legacy + :class:`JwksResolver` implementations that predate this attribute + keep working without behavior change. + * ``jwks_uri``: the resolved JWKS URI whose host is canonicalized + and compared against the declared origin. Required when the + resolver advertises ``jwks_source == "brand_json"``; a brand-json + resolver that doesn't expose ``jwks_uri`` is misconfigured — + we fail closed via the mismatch path (``actual_origin`` becomes + ``None``). + + Returns silently when: + + * ``expected_key_origins`` is ``None`` (adopter hasn't plumbed + capabilities through), OR + * the resolver isn't brand-json-sourced. + """ + if expected_key_origins is None: + return + source = getattr(resolver, "jwks_source", None) + if source != "brand_json": + return + jwks_uri = getattr(resolver, "jwks_uri", None) + # ``jwks_uri`` may be ``None`` if the brand-json resolver hasn't + # populated it yet (cold cache + failed refresh). The consistency + # check fails closed on a missing actual host — same posture as + # ``_origin_host`` returning ``None``. + check_key_origin_consistency( + jwks_uri=jwks_uri or "", + key_origins=expected_key_origins, + purpose=signing_purpose, + posture=posture, + ) + + def _check_key_purpose(jwk: Mapping[str, Any], alg: str, *, expected_adcp_use: str) -> None: if jwk.get("use") != "sig": raise SignatureVerificationError( diff --git a/tests/conformance/signing/test_verifier_key_origins.py b/tests/conformance/signing/test_verifier_key_origins.py new file mode 100644 index 000000000..2a00eebd0 --- /dev/null +++ b/tests/conformance/signing/test_verifier_key_origins.py @@ -0,0 +1,359 @@ +"""Verifier integration of the ``identity.key_origins`` consistency check. + +Behavior under test (matches ADCP request-signing spec #3690 step 7): + +* Brand-json-sourced resolver + matching declaration → success. +* Brand-json-sourced resolver + mismatched declaration → raises + ``request_signature_key_origin_mismatch``. +* Brand-json-sourced resolver + missing declaration → raises + ``request_signature_key_origin_missing``. +* Publisher-pin-sourced resolver (and resolvers without a + ``jwks_source`` attribute) skip the check entirely — a mismatched + declaration does NOT raise. +* Pre-existing verifier codes (``key_unknown``, etc.) still surface + unchanged when the prior step rejects. + +The check is wired only when ``VerifyOptions.expected_key_origins`` is +supplied; adopters who don't yet plumb capabilities through the +verifier see no behavior change. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, ClassVar, Literal + +import pytest + +from adcp.signing import ( + REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH, + REQUEST_SIGNATURE_KEY_ORIGIN_MISSING, + REQUEST_SIGNATURE_KEY_UNKNOWN, + SignatureVerificationError, + StaticJwksResolver, + VerifierCapability, + VerifyOptions, + sign_request, + verify_request_signature, +) +from adcp.signing.crypto import private_key_from_jwk + +VECTORS_DIR = Path(__file__).parent.parent / "vectors" / "request-signing" +KEYS = json.loads((VECTORS_DIR / "keys.json").read_text())["keys"] +ED25519_KEY = next(k for k in KEYS if k["kid"] == "test-ed25519-2026") + + +class _BrandJsonStaticResolver: + """Test double: a JWKS resolver that advertises brand-json sourcing. + + Combines :class:`StaticJwksResolver`'s in-memory keys with the + ``jwks_source``/``jwks_uri`` discriminant the verifier reads to + decide whether to engage the key-origin consistency check. Lets + these tests exercise the verifier-integration seam without + standing up a real brand.json walk. + """ + + jwks_source: ClassVar[Literal["brand_json"]] = "brand_json" + + def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None: + self._inner = StaticJwksResolver(jwks) + self.jwks_uri = jwks_uri + + def __call__(self, keyid: str) -> dict[str, Any] | None: + return self._inner(keyid) + + +class _PublisherPinStaticResolver: + """Test double: a JWKS resolver that advertises publisher-pin sourcing. + + The publisher-pin source skips the consistency check per spec — + the JWKS origin is the publisher's domain by design, not the + operator's. + """ + + jwks_source: ClassVar[Literal["publisher_pin"]] = "publisher_pin" + + def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None: + self._inner = StaticJwksResolver(jwks) + self.jwks_uri = jwks_uri + + def __call__(self, keyid: str) -> dict[str, Any] | None: + return self._inner(keyid) + + +def _sign_basic( + *, + method: str = "POST", + url: str = "https://seller.example.com/adcp/create_media_buy", + body: bytes = b"{}", + created: int = 1776520800, +) -> tuple[dict[str, str], bytes]: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + headers = {"Content-Type": "application/json"} + signed = sign_request( + method=method, + url=url, + headers=headers, + body=body, + private_key=private_key, + key_id="test-ed25519-2026", + alg="ed25519", + created=created, + ) + return {**headers, **signed.as_dict()}, body + + +def _options_with( + resolver: Any, + *, + expected_key_origins: dict[str, str] | None = None, + signing_purpose: str = "request_signing", + posture: str | None = None, +) -> VerifyOptions: + return VerifyOptions( + now=1776520800.0, + capability=VerifierCapability(covers_content_digest="either"), + operation="create_media_buy", + jwks_resolver=resolver, + expected_key_origins=expected_key_origins, + signing_purpose=signing_purpose, + posture=posture, + ) + + +# ----- brand-json source: check engages ----- + + +def test_brand_json_source_matching_origin_passes() -> None: + """Resolver advertises brand-json source AND declaration matches + resolved jwks_uri host → verifier passes (no raise from the + key-origin check).""" + headers, body = _sign_basic() + resolver = _BrandJsonStaticResolver( + {"keys": [ED25519_KEY]}, + jwks_uri="https://keys.brand.example/.well-known/jwks.json", + ) + options = _options_with( + resolver, + expected_key_origins={"request_signing": "https://keys.brand.example"}, + ) + # Verification succeeds — both the signature and the key-origin + # consistency check pass. + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + + +def test_brand_json_source_mismatched_origin_raises_mismatch_code() -> None: + """Resolver advertises brand-json source AND declaration disagrees + with resolved jwks_uri host → verifier raises + ``request_signature_key_origin_mismatch``.""" + headers, body = _sign_basic() + resolver = _BrandJsonStaticResolver( + {"keys": [ED25519_KEY]}, + # JWKS resolved at attacker-controlled host. + jwks_uri="https://attacker.example.org/.well-known/jwks.json", + ) + options = _options_with( + resolver, + # Capabilities declare the legitimate host. + expected_key_origins={"request_signing": "https://keys.brand.example"}, + ) + with pytest.raises(SignatureVerificationError) as exc_info: + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + assert exc_info.value.code == REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH + detail = exc_info.value.detail + assert detail is not None + assert detail["purpose"] == "request_signing" + assert detail["actual_origin"] == "attacker.example.org" + assert detail["expected_origin"] == "keys.brand.example" + + +def test_brand_json_source_missing_declaration_raises_missing_code() -> None: + """Resolver advertises brand-json source AND + ``identity.key_origins`` carries no entry for the purpose → + verifier raises ``request_signature_key_origin_missing``.""" + headers, body = _sign_basic() + resolver = _BrandJsonStaticResolver( + {"keys": [ED25519_KEY]}, + jwks_uri="https://keys.brand.example/.well-known/jwks.json", + ) + options = _options_with( + resolver, + # Capabilities map doesn't carry ``request_signing``. + expected_key_origins={"webhook_signing": "https://keys.brand.example"}, + posture="required", + ) + with pytest.raises(SignatureVerificationError) as exc_info: + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + assert exc_info.value.code == REQUEST_SIGNATURE_KEY_ORIGIN_MISSING + detail = exc_info.value.detail + assert detail is not None + assert detail["purpose"] == "request_signing" + assert detail["posture"] == "required" + + +# ----- publisher-pin source: check skips ----- + + +def test_publisher_pin_source_mismatched_origin_does_not_raise() -> None: + """Resolver advertises publisher-pin source → verifier skips the + key-origin check entirely. A mismatched declaration is NOT + grounds for rejection because publisher-pinned JWKS hosts are + expected to differ from the operator origin per spec. + """ + headers, body = _sign_basic() + resolver = _PublisherPinStaticResolver( + {"keys": [ED25519_KEY]}, + jwks_uri="https://publisher.example.com/.well-known/jwks.json", + ) + options = _options_with( + resolver, + # Operator-side declaration deliberately disagrees — the check + # must skip and not raise. + expected_key_origins={"request_signing": "https://keys.brand.example"}, + ) + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + + +def test_publisher_pin_source_missing_declaration_does_not_raise() -> None: + """Publisher-pin source + absent declaration → no raise. The + skip is unconditional regardless of declaration presence.""" + headers, body = _sign_basic() + resolver = _PublisherPinStaticResolver( + {"keys": [ED25519_KEY]}, + jwks_uri="https://publisher.example.com/.well-known/jwks.json", + ) + options = _options_with( + resolver, + expected_key_origins={"webhook_signing": "https://keys.brand.example"}, + ) + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + + +# ----- legacy resolvers (no jwks_source attribute) ----- + + +def test_resolver_without_source_attribute_skips_check() -> None: + """Adopter-supplied :class:`JwksResolver` that predates the + ``jwks_source`` attribute → verifier skips the check (treats + absence as ``publisher_pin``-equivalent). This preserves + backwards compatibility with the Protocol — adopters who haven't + rebuilt their resolvers against the new contract keep working. + """ + headers, body = _sign_basic() + # Plain StaticJwksResolver carries no ``jwks_source`` attribute. + resolver = StaticJwksResolver({"keys": [ED25519_KEY]}) + options = _options_with( + resolver, + # Declaration is provided but resolver doesn't advertise + # brand-json sourcing — check skips. + expected_key_origins={"request_signing": "https://keys.brand.example"}, + ) + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + + +# ----- check does not fire without expected_key_origins ----- + + +def test_brand_json_source_skips_check_when_no_expected_origins() -> None: + """``expected_key_origins=None`` (default) → check skips even on a + brand-json-sourced resolver. Adopters who haven't yet plumbed + capabilities through the verifier see no behavior change. + """ + headers, body = _sign_basic() + resolver = _BrandJsonStaticResolver( + # Even with a mismatched jwks_uri, the check skips when the + # capabilities-derived expected_key_origins is None. + {"keys": [ED25519_KEY]}, + jwks_uri="https://different.example/.well-known/jwks.json", + ) + options = _options_with(resolver, expected_key_origins=None) + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + + +# ----- earlier failure codes still surface ----- + + +def test_key_unknown_still_surfaces_before_origin_check() -> None: + """A keyid the resolver can't resolve raises ``key_unknown`` — + the verifier short-circuits before the key-origin check, so the + pre-existing code is preserved. Important: the key-origin wiring + must NOT change the order of operations in the checklist; the + spec mandates ``key_unknown`` is the rejection for a missing + JWK, even when a key-origin declaration is configured. + """ + headers, body = _sign_basic() + # Brand-json-sourced resolver with EMPTY key set — every kid + # returns None. + resolver = _BrandJsonStaticResolver( + {"keys": []}, + jwks_uri="https://keys.brand.example/.well-known/jwks.json", + ) + options = _options_with( + resolver, + expected_key_origins={"request_signing": "https://keys.brand.example"}, + ) + with pytest.raises(SignatureVerificationError) as exc_info: + verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=options, + ) + assert exc_info.value.code == REQUEST_SIGNATURE_KEY_UNKNOWN + + +# ----- BrandJsonJwksResolver advertises the discriminant ----- + + +def test_brand_json_jwks_resolver_advertises_source() -> None: + """The production :class:`BrandJsonJwksResolver` advertises + ``jwks_source = "brand_json"`` at the class level — instances + surface it without needing a refresh cycle to populate it. + """ + from adcp.signing.brand_jwks import BrandJsonJwksResolver + + assert BrandJsonJwksResolver.jwks_source == "brand_json" diff --git a/tests/test_key_origins.py b/tests/test_key_origins.py index 3cfabe357..86814aed5 100644 --- a/tests/test_key_origins.py +++ b/tests/test_key_origins.py @@ -269,8 +269,8 @@ def test_consistency_bare_host_with_userinfo_rejects_or_normalizes() -> None: def test_consistency_idn_a_label_equals_u_label() -> None: # IDN U-label (``münchen.example``) and A-label (Punycode # ``xn--mnchen-3ya.example``) refer to the same host. Canonicalization - # to A-label via ``host.encode("idna")`` must make them compare equal - # regardless of which form each side uses. + # to A-label via the PyPI ``idna`` package must make them compare + # equal regardless of which form each side uses. check_key_origin_consistency( jwks_uri="https://xn--mnchen-3ya.example/.well-known/jwks.json", key_origins={"request_signing": "münchen.example"}, @@ -283,6 +283,39 @@ def test_consistency_idn_a_label_equals_u_label() -> None: ) +def test_consistency_eszett_preserves_as_punycode_idna_2008() -> None: + # IDNA-2008 (UTS#46 transitional off) regression: ``ß`` (U+00DF + # LATIN SMALL LETTER SHARP S) preserves as the Punycode + # ``xn--strae-oqa.de``. The stdlib's IDNA-2003 path (``encodings.idna``) + # instead lower-folded ``ß`` to ``ss``, canonicalizing + # ``straße.de`` to ``strasse.de`` — a different registrable domain + # that an attacker controlling either side could exploit to either + # silently match a non-owned origin or deny a legitimate one. + # + # If this test fails after a future canonicalization refactor, the + # IDNA-2008 binding has regressed. + check_key_origin_consistency( + jwks_uri="https://straße.de/.well-known/jwks.json", + key_origins={"request_signing": "xn--strae-oqa.de"}, + purpose="request_signing", + ) + check_key_origin_consistency( + jwks_uri="https://xn--strae-oqa.de/.well-known/jwks.json", + key_origins={"request_signing": "straße.de"}, + purpose="request_signing", + ) + # The IDNA-2003 mismapped form (``strasse.de``) must NOT match — + # otherwise the regression check silently passes if both sides + # converge on the same wrong A-label. + with pytest.raises(SignatureVerificationError) as exc_info: + check_key_origin_consistency( + jwks_uri="https://straße.de/.well-known/jwks.json", + key_origins={"request_signing": "strasse.de"}, + purpose="request_signing", + ) + assert exc_info.value.code == REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH + + def test_consistency_unparseable_declared_origin_fails_closed() -> None: # Symmetric to ``test_consistency_raises_mismatch_on_invalid_jwks_uri`` # but with the unparseable string on the *declared* side. A future diff --git a/tests/test_verify_from_agent_url.py b/tests/test_verify_from_agent_url.py index 715244e27..3ee7fe400 100644 --- a/tests/test_verify_from_agent_url.py +++ b/tests/test_verify_from_agent_url.py @@ -224,3 +224,278 @@ async def fake_verify_starlette(request, *, options): # type: ignore[no-untyped ) assert exc.value.code == "request_signature_required" assert exc.value.step == 0 + + +# ---- key_origins consistency wiring ---- + + +def _resolved_with_origins(key_origins: dict[str, str] | None) -> AgentResolution: + """Variant of ``_RESOLVED_AGENT`` carrying a specific ``key_origins`` map. + + Used to pin the verifier-side wiring: ``verify_from_agent_url`` must + surface ``identity.key_origins`` from the capabilities response into + ``VerifyOptions.expected_key_origins`` so the verifier's + ``_maybe_check_key_origin`` step actually engages. Without this the + SDK's recommended buyer entrypoint silently skips the spec's + shared-tenancy-spoof defense. + """ + return AgentResolution( + agent_url="https://buyer.example.com/mcp", + brand_json_url="https://example.com/.well-known/brand.json", + agent_entry={ + "type": "sales", + "url": "https://buyer.example.com/mcp", + "jwks_uri": "https://example.com/.well-known/jwks.json", + }, + jwks_uri="https://example.com/.well-known/jwks.json", + jwks={"keys": []}, + fetched_at=0.0, + key_origins=key_origins, + trace=[], + ) + + +@pytest.mark.asyncio +async def test_factory_threads_key_origins_into_verify_options( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When the resolution carries an ``identity.key_origins`` map, + ``VerifyOptions.expected_key_origins`` is set to it. Pins the + production wiring against the bypass the original review flagged + (Argus on PR #789): without this the SDK's recommended helper + builds ``VerifyOptions`` with ``expected_key_origins=None`` and the + spec's consistency check silently no-ops in production.""" + origins = {"request_signing": "https://example.com"} + seen: dict[str, Any] = {} + + async def fake_resolve(*args, **kwargs): + return _resolved_with_origins(origins) + + async def fake_verify_starlette(request, *, options): # type: ignore[no-untyped-def] + seen["options"] = options + return "ok" + + monkeypatch.setattr(agent_resolver, "async_resolve_agent", fake_resolve) + monkeypatch.setattr("adcp.signing.middleware.verify_starlette_request", fake_verify_starlette) + + await verify_from_agent_url( + _FakeStarletteRequest(), + "https://buyer.example.com/mcp", + agent_type="sales", + operation="get_products", + ) + assert seen["options"].expected_key_origins == origins + # Default ``signing_purpose`` is ``"request_signing"`` — the most + # common buyer path. Webhook callers override. + assert seen["options"].signing_purpose == "request_signing" + + +@pytest.mark.asyncio +async def test_factory_passes_signing_purpose_through( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The new ``signing_purpose`` kwarg propagates to ``VerifyOptions`` + so webhook-signing callers can route to ``identity.key_origins.webhook_signing`` + instead of ``request_signing``. Default is request-side; webhook + paths must opt in.""" + seen: dict[str, Any] = {} + + async def fake_resolve(*args, **kwargs): + return _resolved_with_origins({"webhook_signing": "https://hooks.example.com"}) + + async def fake_verify_starlette(request, *, options): # type: ignore[no-untyped-def] + seen["options"] = options + return "ok" + + monkeypatch.setattr(agent_resolver, "async_resolve_agent", fake_resolve) + monkeypatch.setattr("adcp.signing.middleware.verify_starlette_request", fake_verify_starlette) + + await verify_from_agent_url( + _FakeStarletteRequest(), + "https://buyer.example.com/mcp", + agent_type="sales", + operation="webhook", + signing_purpose="webhook_signing", + posture="supported", + ) + assert seen["options"].signing_purpose == "webhook_signing" + assert seen["options"].posture == "supported" + + +@pytest.mark.asyncio +async def test_factory_resolver_carries_brand_json_source_marker( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The JWKS resolver passed to the verifier MUST carry + ``jwks_source = "brand_json"`` so the verifier's + ``_maybe_check_key_origin`` step engages. A bare + :class:`StaticJwksResolver` (which is what the pre-fix code used) + would skip the check because ``jwks_source`` is absent — exactly + the bypass Argus flagged on the original PR #789.""" + seen: dict[str, Any] = {} + + async def fake_resolve(*args, **kwargs): + return _resolved_with_origins({"request_signing": "https://example.com"}) + + async def fake_verify_starlette(request, *, options): # type: ignore[no-untyped-def] + seen["options"] = options + return "ok" + + monkeypatch.setattr(agent_resolver, "async_resolve_agent", fake_resolve) + monkeypatch.setattr("adcp.signing.middleware.verify_starlette_request", fake_verify_starlette) + + await verify_from_agent_url( + _FakeStarletteRequest(), + "https://buyer.example.com/mcp", + agent_type="sales", + operation="get_products", + ) + resolver = seen["options"].jwks_resolver + # The discriminant is on the class, not the instance — the verifier + # reads it via ``type(resolver).jwks_source`` so subclassing carries + # the marker without per-instance state. + assert getattr(type(resolver), "jwks_source", None) == "brand_json" + + +@pytest.mark.asyncio +async def test_factory_passes_none_origins_when_capabilities_omit_them( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Operators on legacy 3.0 deployments don't advertise + ``identity.key_origins``. The resolution carries ``None`` and the + factory propagates it; the verifier-side check skips on + ``expected_key_origins is None`` per the ``_maybe_check_key_origin`` + contract. This preserves back-compat — the new defense engages + only when the operator opts in by publishing the origins map.""" + seen: dict[str, Any] = {} + + async def fake_resolve(*args, **kwargs): + return _resolved_with_origins(None) + + async def fake_verify_starlette(request, *, options): # type: ignore[no-untyped-def] + seen["options"] = options + return "ok" + + monkeypatch.setattr(agent_resolver, "async_resolve_agent", fake_resolve) + monkeypatch.setattr("adcp.signing.middleware.verify_starlette_request", fake_verify_starlette) + + await verify_from_agent_url( + _FakeStarletteRequest(), + "https://buyer.example.com/mcp", + agent_type="sales", + operation="get_products", + ) + assert seen["options"].expected_key_origins is None + + +# ---- Integration: production resolver drives the real verifier path ---- +# +# The mocked tests above pin the wiring; this test pins the END-TO-END: +# the production ``_BrandJsonStaticJwksResolver`` class drives the real +# ``_maybe_check_key_origin`` step (no mocking of ``verify_starlette_request`` +# or the verifier internals). Catches the class of bug Argus flagged on +# the first pass: the subclass carried the ``jwks_source`` marker but +# not the ``jwks_uri`` attribute, so the verifier's +# ``getattr(resolver, "jwks_uri", None) → None`` produced an +# ``actual_origin=""`` mismatch on every legitimate signer. + + +def test_brand_json_static_resolver_carries_jwks_uri_attribute() -> None: + """``_BrandJsonStaticJwksResolver`` MUST expose ``jwks_uri`` on the + instance — the verifier reads it via ``getattr(resolver, "jwks_uri", + None)`` and compares the host to the declared origin. Argus's first + review caught this gap; the subclass needs the URI even though + ``StaticJwksResolver.__init__`` doesn't accept it.""" + from adcp.signing.agent_resolver import _BrandJsonStaticJwksResolver + + resolver = _BrandJsonStaticJwksResolver( + {"keys": []}, + jwks_uri="https://keys.brand.example/.well-known/jwks.json", + ) + assert resolver.jwks_uri == "https://keys.brand.example/.well-known/jwks.json" + assert type(resolver).jwks_source == "brand_json" + + +def test_maybe_check_key_origin_accepts_brand_json_resolver_with_matching_origin() -> None: + """Drives the production ``_BrandJsonStaticJwksResolver`` against + the real ``_maybe_check_key_origin`` step. Without the subclass's + ``__init__`` storing ``jwks_uri``, this check would fire mismatch + on every legitimate signer (the verifier would compare + ``actual_origin=""`` against the declared origin and reject). + The success path here is the regression test for that bypass.""" + from adcp.signing.agent_resolver import _BrandJsonStaticJwksResolver + from adcp.signing.verifier import _maybe_check_key_origin + + resolver = _BrandJsonStaticJwksResolver( + {"keys": []}, + jwks_uri="https://keys.brand.example/.well-known/jwks.json", + ) + + # No raise — host matches the declared origin. + _maybe_check_key_origin( + resolver=resolver, + expected_key_origins={"request_signing": "https://keys.brand.example"}, + signing_purpose="request_signing", + posture=None, + ) + + +def test_maybe_check_key_origin_rejects_mismatched_origin_on_brand_json_resolver() -> None: + """Negative side of the integration check: same production resolver + class, but the declared origin points elsewhere → step 7 raises + ``request_signature_key_origin_mismatch`` with the host pair in the + detail map. Pins the rejection direction (so a future refactor + can't accidentally flip the comparison).""" + from adcp.signing.agent_resolver import _BrandJsonStaticJwksResolver + from adcp.signing.errors import REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH + from adcp.signing.verifier import _maybe_check_key_origin + + resolver = _BrandJsonStaticJwksResolver( + {"keys": []}, + jwks_uri="https://keys.brand.example/.well-known/jwks.json", + ) + + with pytest.raises(SignatureVerificationError) as exc_info: + _maybe_check_key_origin( + resolver=resolver, + expected_key_origins={"request_signing": "https://attacker.example"}, + signing_purpose="request_signing", + posture=None, + ) + assert exc_info.value.code == REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH + assert exc_info.value.detail is not None + assert exc_info.value.detail["actual_origin"] == "keys.brand.example" + assert exc_info.value.detail["expected_origin"] == "attacker.example" + + +@pytest.mark.asyncio +async def test_factory_construction_threads_jwks_uri_into_resolver( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``verify_from_agent_url`` MUST construct the wrapper with + ``jwks_uri=resolution.jwks_uri`` so the verifier's consistency + check has a real host to compare. The bug Argus traced on the + first review pass was exactly this construction site missing the + keyword.""" + seen: dict[str, Any] = {} + + async def fake_resolve(*args, **kwargs): + return _resolved_with_origins({"request_signing": "https://example.com"}) + + async def fake_verify_starlette(request, *, options): # type: ignore[no-untyped-def] + seen["options"] = options + return "ok" + + monkeypatch.setattr(agent_resolver, "async_resolve_agent", fake_resolve) + monkeypatch.setattr("adcp.signing.middleware.verify_starlette_request", fake_verify_starlette) + + await verify_from_agent_url( + _FakeStarletteRequest(), + "https://buyer.example.com/mcp", + agent_type="sales", + operation="get_products", + ) + resolver = seen["options"].jwks_resolver + # Production resolver MUST carry the URI as an instance attribute, + # not just the source-marker class attribute. + assert resolver.jwks_uri == "https://example.com/.well-known/jwks.json"