Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ dependencies = [
# time, and pinned snapshot semantics are what the spec requires for
# cross-implementation conformance.
"tldextract>=5.1.0",
# IDNA-2008 (UTS#46) host canonicalization — used by
# ``adcp.signing`` for JWKS URI host pinning, IP-pinned transport,
# revocation issuer canonicalization, and the key_origins
# consistency check. The stdlib ``encodings.idna`` module is
# IDNA-2003, which maps Eszett (``ß``) to ``ss`` and collapses
# final-sigma — mappings that IDNA-2008 (UTS#46 with transitional
# processing off) does not perform. A counterparty hosting
# ``straße.de`` would canonicalize under IDNA-2003 to ``strasse.de``
# and silently match a different registrable domain than the spec
# mandates.
# ``idna.encode(host, uts46=True)`` is the canonical Python
# IDNA-2008 entry point. Floor at 3.7 because idna 3.0–3.6 have
# CVE-2024-3651 — a quadratic-complexity DoS in ``idna.encode()``,
# which this package calls on attacker-influenceable hosts (JWKS
# URI authority, redirect targets, revocation-issuer canonicalization,
# key_origins consistency check). Upper bound at <4 because IDNA
# semantics across a major version are not guaranteed.
"idna>=3.7,<4",
]

[project.scripts]
Expand Down
94 changes: 91 additions & 3 deletions src/adcp/signing/agent_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from collections.abc import Callable
from contextlib import AbstractAsyncContextManager
from dataclasses import dataclass
from typing import Any, Literal
from typing import Any, ClassVar, Literal

import httpx
from pydantic import BaseModel, ConfigDict, Field
Expand All @@ -57,6 +57,7 @@
from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport
from adcp.signing.jwks import (
SSRFValidationError,
StaticJwksResolver,
async_default_jwks_fetcher,
)

Expand Down Expand Up @@ -149,6 +150,20 @@ class AgentResolution(BaseModel):
description="Full JWK set fetched from ``jwks_uri`` (RFC 7517 ``{keys: [...]}``)"
)
fetched_at: float = Field(description="Resolution wall-clock time (Unix epoch seconds)")
key_origins: dict[str, str] | None = Field(
default=None,
description=(
"Verbatim ``identity.key_origins`` map from the capabilities "
"response — purpose → declared origin (e.g. "
"``{'request_signing': 'https://keys.brand.com'}``). The "
"verifier consults this to enforce the spec's key-origin "
"consistency check (resolved jwks_uri host MUST equal the "
"declared origin for the purpose under verification). "
"``None`` when the operator advertises no key_origins map; "
"the verifier raises ``request_signature_key_origin_missing`` "
"for any signed-traffic purpose without a corresponding entry."
),
)
trace: list[TraceEntry] = Field(default_factory=list)


Expand Down Expand Up @@ -314,6 +329,34 @@ def _extract_brand_json_url(capabilities: dict[str, Any]) -> str:
return brand_json_url


def _extract_key_origins(capabilities: dict[str, Any]) -> dict[str, str] | None:
"""Pluck ``identity.key_origins`` from the capabilities body.

Returns ``None`` when the operator advertises no key_origins map (a
common posture for unsigned-traffic-only deployments — the verifier
layer treats absence as "no per-purpose origin pin to check" and
only raises ``request_signature_key_origin_missing`` when a signed
purpose is actually exercised). Filters values to strings — a
malformed entry is skipped rather than poisoning the whole map.

Forward-compat with operators on 3.0 schemas: the map travels under
``additionalProperties: true`` and the SDK reads it as a plain dict
rather than via the typed Pydantic surface (which won't carry the
field until 3.1 lands).
"""
identity = capabilities.get("identity")
if not isinstance(identity, dict):
return None
raw = identity.get("key_origins")
if not isinstance(raw, dict) or not raw:
return None
out: dict[str, str] = {}
for purpose, origin in raw.items():
if isinstance(purpose, str) and isinstance(origin, str) and origin:
out[purpose] = origin
return out or None


# ---- Public API ----


Expand Down Expand Up @@ -387,6 +430,7 @@ async def async_resolve_agent(
raise

brand_json_url = _extract_brand_json_url(capabilities.body)
key_origins = _extract_key_origins(capabilities.body)

# --- Hop 2: brand.json ---
bj_start = time.monotonic()
Expand Down Expand Up @@ -480,6 +524,7 @@ async def async_resolve_agent(
jwks_uri=jwks_uri,
jwks=jwks,
fetched_at=fetched_at,
key_origins=key_origins,
trace=trace,
)

Expand Down Expand Up @@ -526,6 +571,8 @@ async def verify_from_agent_url(
revocation_checker: Any = None,
revocation_list: Any = None,
allow_private_destinations: bool = False,
signing_purpose: str = "request_signing",
posture: str | None = None,
) -> Any:
"""Single-call factory: resolve ``agent_url`` and verify the
request signature against the resolved JWKS.
Expand Down Expand Up @@ -575,7 +622,6 @@ async def verify_from_agent_url(
REQUEST_SIGNATURE_JWKS_UNTRUSTED,
SignatureVerificationError,
)
from adcp.signing.jwks import StaticJwksResolver
from adcp.signing.middleware import verify_starlette_request
from adcp.signing.verifier import VerifierCapability, VerifyOptions

Expand Down Expand Up @@ -605,19 +651,61 @@ async def verify_from_agent_url(
message=f"agent-url resolution failed ({exc.code}): {exc.message}",
) from exc

# Mark the resolver with ``jwks_source = "brand_json"`` so the
# verifier's ``_maybe_check_key_origin`` step engages the spec's
# key-origin consistency check (the JWKS WAS sourced via the brand.json
# walk in ``async_resolve_agent``; the check applies). Without this
# marker the verifier would treat a bare ``StaticJwksResolver`` as a
# publisher-pin-equivalent and skip the check — defeating the
# production helper's defense against the shared-tenancy spoof.
options = VerifyOptions(
now=now if now is not None else _time.time(),
capability=capability if capability is not None else VerifierCapability(supported=True),
operation=operation,
jwks_resolver=StaticJwksResolver(resolution.jwks),
jwks_resolver=_BrandJsonStaticJwksResolver(resolution.jwks, jwks_uri=resolution.jwks_uri),
replay_store=replay_store,
revocation_checker=revocation_checker,
revocation_list=revocation_list,
agent_url=resolution.agent_entry.get("url"),
expected_key_origins=resolution.key_origins,
signing_purpose=signing_purpose,
posture=posture,
)
return await verify_starlette_request(request, options=options)


class _BrandJsonStaticJwksResolver(StaticJwksResolver):
    """Static JWKS resolver tagged as brand.json-sourced.

    Used by :func:`verify_from_agent_url`, which wraps the keyset that
    :func:`async_resolve_agent` obtained through the brand.json walk.
    It adds two things to a plain :class:`StaticJwksResolver`:

    * ``jwks_source = "brand_json"`` — the discriminant the verifier's
      key-origin consistency step looks for. A resolver without the
      attribute is treated as publisher-pin-equivalent and the check is
      skipped, which is not what a brand.json-resolved keyset should
      get.
    * ``jwks_uri`` — the URI the keyset was resolved from. The verifier
      reads it via ``getattr(resolver, "jwks_uri", None)`` to obtain
      the host it compares against the declared origin. A static
      keyset has no URI of its own, so it is stored on the instance;
      without it every legitimate signer would mismatch with an empty
      actual origin.

    Kept module-private because the discriminant is an internal
    detail. Adopters wiring their own resolvers set ``jwks_source`` and
    ``jwks_uri`` on those resolvers directly.
    """

    # Class-level marker: every instance is brand.json-sourced.
    jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json"

    def __init__(self, jwks: dict[str, Any], *, jwks_uri: str) -> None:
        """Wrap ``jwks`` and remember the ``jwks_uri`` it was fetched from."""
        super().__init__(jwks)
        self.jwks_uri: str = jwks_uri


# ---- helpers ----


Expand Down
19 changes: 18 additions & 1 deletion src/adcp/signing/brand_jwks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from collections.abc import Callable
from contextlib import AbstractAsyncContextManager
from dataclasses import dataclass
from typing import Any, Literal
from typing import Any, ClassVar, Literal
from urllib.parse import urlsplit, urlunsplit

import httpx
Expand Down Expand Up @@ -330,8 +330,25 @@ class BrandJsonJwksResolver:
cascade: first ask the inner :class:`AsyncCachingJwksResolver`
(which will refetch its own URL if cooldown has elapsed); if
still unknown, refresh brand.json in case ``jwks_uri`` rotated.

The :attr:`jwks_source` class attribute is the discriminant the
request-signing verifier consults to decide whether
:func:`adcp.signing.check_key_origin_consistency` applies for
this resolver. Per ADCP #3690 §step 7, the
``identity.key_origins`` consistency check is mandatory only when
the JWKS source for the (agent, purpose, role) tuple was the
operator brand.json — and skipped for publisher-pinned tuples
(where the JWKS origin is the publisher's domain by design).
A resolver that always sources via brand.json declares
``jwks_source = "brand_json"`` so the verifier engages the check;
a publisher-pin resolver either omits the attribute or declares
``"publisher_pin"`` so the verifier skips it.
"""

#: Discriminant for the verifier-side key_origin consistency
#: check (see class docstring).
jwks_source: ClassVar[Literal["brand_json", "publisher_pin"]] = "brand_json"

def __init__(
self,
brand_json_url: str,
Expand Down
11 changes: 9 additions & 2 deletions src/adcp/signing/ip_pinned_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import httpcore
import httpx
import idna

# Private but documented-as-the-default-backend implementations. The
# underscore prefix is a stability hazard; the contract test in
Expand Down Expand Up @@ -102,13 +103,19 @@ def _normalize_pin_host(host: str) -> str:
Lowercases, strips a single trailing dot, and IDNA-encodes so
Unicode hostnames compare equal to the punycode form httpx
passes to httpcore.

IDNA-2008 (UTS#46) via the PyPI ``idna`` package — the
package-wide canonicalization convention, matching the JWKS
fetcher's ``resolve_and_validate_host`` so a pin set on
``straße.de`` collapses to the same A-label httpx will pass to
httpcore at connect time.
"""
host = host.lower()
if host.endswith("."):
host = host[:-1]
try:
return host.encode("idna").decode("ascii")
except (UnicodeError, UnicodeEncodeError):
return idna.encode(host, uts46=True).decode("ascii")
except (idna.IDNAError, UnicodeError, UnicodeEncodeError):
# Caller already stored the normalized form; fall through
# with the lowercased input so the comparison just fails
# cleanly instead of raising inside connect_tcp.
Expand Down
13 changes: 11 additions & 2 deletions src/adcp/signing/jwks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from urllib.parse import urlsplit

import httpx
import idna

from adcp.signing.errors import (
REQUEST_SIGNATURE_JWKS_UNAVAILABLE,
Expand Down Expand Up @@ -197,9 +198,17 @@ def resolve_and_validate_host(
# raw Unicode; httpx encodes it. A mismatch here breaks the
# hostname-match in the backend override and silently reopens
# the TOCTOU for IDN hosts.
#
# IDNA-2008 (UTS#46, transitional_processing=False) via the PyPI
# ``idna`` package — stdlib ``encodings.idna`` is IDNA-2003 and
# mismaps Eszett (``ß`` → ``ss``) and final-sigma. The
# package-wide IDNA convention is IDNA-2008; all four callsites
# (here, ``ip_pinned_transport``, ``revocation_fetcher``,
# ``key_origins``) share this encoding so canonicalization
# results compare byte-equal across the verifier pipeline.
try:
host = host.encode("idna").decode("ascii").lower()
except (UnicodeError, UnicodeEncodeError) as exc:
host = idna.encode(host, uts46=True).decode("ascii").lower()
except (idna.IDNAError, UnicodeError, UnicodeEncodeError) as exc:
raise SSRFValidationError(f"URI host {host!r} is not IDNA-valid: {exc}") from exc
port = parts.port if parts.port is not None else (443 if parts.scheme == "https" else 80)
if allowed_ports is not None and port not in allowed_ports:
Expand Down
24 changes: 13 additions & 11 deletions src/adcp/signing/key_origins.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
from typing import Literal
from urllib.parse import urlsplit

import idna

from adcp.signing.errors import (
REQUEST_SIGNATURE_KEY_ORIGIN_MISMATCH,
REQUEST_SIGNATURE_KEY_ORIGIN_MISSING,
Expand Down Expand Up @@ -160,15 +162,15 @@ def _origin_host(value: str) -> str | None:
"""Return the host portion of a URL or bare origin, canonicalized
for byte-equality comparison.

Canonicalization mirrors the existing codebase pattern
(``jwks.py:201``, ``ip_pinned_transport.py:110``,
``revocation_fetcher.py:380``): ASCII-lowercase, then
``host.encode("idna").decode("ascii")`` to convert IDN U-labels to
their A-label (Punycode) form. The spec asks for IDNA-2008 strictly
while stdlib ``encodings.idna`` is IDNA-2003; the divergence is
rare in practice and matching the package's existing convention
keeps the canonicalization story coherent. A future IDNA-2008
migration would update all four callsites together.
Canonicalization mirrors the package-wide IDNA-2008 (UTS#46)
convention used by ``jwks.py``, ``ip_pinned_transport.py``, and
``revocation_fetcher.py``: ASCII-lowercase, then
``idna.encode(host, uts46=True).decode("ascii")`` to convert IDN
U-labels to their A-label (Punycode) form. IDNA-2008 (vs the
stdlib's IDNA-2003) preserves Eszett (``ß``) and final-sigma per
spec rather than mapping them away, which is the canonicalization
the request-signing spec mandates for cross-implementation
byte-equality.

**Bare-host and URL forms are normalized symmetrically.** A bare
host like ``"keys.brand.com"`` is processed through the same
Expand Down Expand Up @@ -198,8 +200,8 @@ def _origin_host(value: str) -> str | None:
if not host:
return None
try:
return host.encode("idna").decode("ascii").lower()
except (UnicodeError, UnicodeEncodeError):
return idna.encode(host, uts46=True).decode("ascii").lower()
except (idna.IDNAError, UnicodeError, UnicodeEncodeError):
return None


Expand Down
12 changes: 8 additions & 4 deletions src/adcp/signing/revocation_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from typing import Any, Protocol

import httpx
import idna

from adcp.signing.jwks import (
DEFAULT_JWKS_TIMEOUT_SECONDS,
Expand Down Expand Up @@ -377,11 +378,14 @@ def _normalize_issuer(issuer: str) -> str:
raise ValueError(f"issuer has no host: {issuer!r}")

# IDNA-encode the host to collapse unicode homoglyphs to ASCII
# punycode. ``host.encode("idna")`` raises on characters outside the
# IDNA allowlist — which is the failure mode we want.
# punycode. ``idna.encode(host, uts46=True)`` raises on characters
# outside the IDNA-2008 allowlist — which is the failure mode we
# want. IDNA-2008 (UTS#46) matches the package-wide convention so
# an issuer canonicalized here compares byte-equal to the
# ``jwks_uri`` host the verifier pins via ``resolve_and_validate_host``.
try:
host_ascii = parts.hostname.encode("idna").decode("ascii").lower()
except (UnicodeError, UnicodeEncodeError) as exc:
host_ascii = idna.encode(parts.hostname, uts46=True).decode("ascii").lower()
except (idna.IDNAError, UnicodeError, UnicodeEncodeError) as exc:
raise ValueError(f"issuer host {parts.hostname!r} is not IDNA-valid: {exc}") from exc

netloc = f"{host_ascii}:{parts.port}" if parts.port else host_ascii
Expand Down
Loading
Loading