Skip to content

Commit b4844c2

Browse files
committed
refactor(adagents): address review follow-ups on #752
- Dedupe collected entries by publisher_domain before fan-out so a hostile directory can't amplify 1 publisher into a 20-concurrent burst against one host. (Medium-severity security finding.) - Pagination loop guards: detect repeated next_cursor and cap at MAX_DIRECTORY_PAGES=1000 pages; raise AdagentsValidationError on either violation. - Emit a one-shot logging warning when the entire sample comes back without property_ids[] -- adopters notice they're running in count-only mode rather than full set-diff. - Document the falsy property_id filter behavior in a code comment. - Cite the spec wire-format precedent for include= repeated-key form. Bot review: #752
1 parent 1cdbfc6 commit b4844c2

2 files changed

Lines changed: 156 additions & 0 deletions

File tree

src/adcp/adagents.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import asyncio
1212
import ipaddress
1313
import json
14+
import logging
1415
import re
1516
import socket
1617
from dataclasses import dataclass, field
@@ -25,6 +26,8 @@
2526
from adcp.types.base import AdCPBaseModel
2627
from adcp.validation import ValidationError, validate_adagents
2728

29+
logger = logging.getLogger(__name__)
30+
2831
DiscoveryMethod = Literal["direct", "authoritative_location", "ads_txt_managerdomain"]
2932

3033

@@ -1836,6 +1839,11 @@ class AgentAuthorizationsDirectoryResult(AdCPBaseModel):
18361839
# page is a small envelope; pagination handles bulk responses.
18371840
MAX_DIRECTORY_PAGE_BYTES = 5 * 1024 * 1024
18381841

1842+
# Hard cap on directory pagination iterations. A misbehaving directory that
1843+
# never returns an empty next_cursor would otherwise hang the sweep
1844+
# indefinitely; this cap fail-closes the loop.
1845+
MAX_DIRECTORY_PAGES = 1000
1846+
18391847

18401848
async def fetch_agent_authorizations_from_directory(
18411849
agent_url: str,
@@ -1914,6 +1922,9 @@ async def fetch_agent_authorizations_from_directory(
19141922
if since is not None:
19151923
query_pairs.append(("since", since))
19161924
if include:
1925+
# Repeated-key form per docs/aao/directory-api.mdx (style: form,
1926+
# explode: true). Comma-joined NOT accepted by spec-conformant
1927+
# directories.
19171928
for value in include:
19181929
query_pairs.append(("include", value))
19191930
if query_pairs:
@@ -2054,6 +2065,8 @@ async def detect_publisher_properties_divergence(
20542065
try:
20552066
collected: list[DirectoryPublisherEntry] = []
20562067
cursor: str | None = None
2068+
seen_cursors: set[str] = set()
2069+
page_count = 0
20572070
while True:
20582071
page = await fetch_agent_authorizations_from_directory(
20592072
agent_url,
@@ -2063,13 +2076,51 @@ async def detect_publisher_properties_divergence(
20632076
timeout=timeout,
20642077
client=http,
20652078
)
2079+
page_count += 1
20662080
collected.extend(page.publishers)
20672081
if sample_size is not None and len(collected) >= sample_size:
20682082
collected = collected[:sample_size]
20692083
break
20702084
cursor = page.next_cursor
20712085
if not cursor:
20722086
break
2087+
if cursor in seen_cursors:
2088+
raise AdagentsValidationError(
2089+
f"Directory page cursor {cursor!r} repeated — refusing to loop forever."
2090+
)
2091+
seen_cursors.add(cursor)
2092+
if page_count >= MAX_DIRECTORY_PAGES:
2093+
raise AdagentsValidationError(
2094+
f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep."
2095+
)
2096+
2097+
# Dedupe by publisher_domain before fan-out: a hostile directory
2098+
# returning N rows for the same publisher would otherwise amplify
2099+
# into N concurrent fetches against a single victim host. First
2100+
# occurrence wins (deterministic) — conflicting property_ids /
2101+
# properties_authorized across duplicates are dropped here; the
2102+
# directory's behavior is itself a divergence signal for ops.
2103+
seen_domains: set[str] = set()
2104+
deduped: list[DirectoryPublisherEntry] = []
2105+
for entry in collected:
2106+
if entry.publisher_domain in seen_domains:
2107+
continue
2108+
seen_domains.add(entry.publisher_domain)
2109+
deduped.append(entry)
2110+
collected = deduped
2111+
2112+
# Emit a one-shot warning when the entire sample comes back without
2113+
# property_ids[]. In count-only mode, same-count substitutions are
2114+
# undetectable — adopters should pin include=["properties"] support
2115+
# on directories that offer it.
2116+
if collected and all(e.property_ids is None for e in collected):
2117+
logger.warning(
2118+
"AAO directory %s did not return property_ids[] on any publisher "
2119+
"entry — falling back to count-only divergence detection. Same-count "
2120+
"substitutions are undetectable in this mode. Upgrade the directory "
2121+
"or pin include=['properties'] support.",
2122+
directory_url,
2123+
)
20732124

20742125
sem = asyncio.Semaphore(max_concurrency)
20752126

@@ -2080,6 +2131,11 @@ async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None:
20802131
entry.publisher_domain, timeout=timeout, client=http
20812132
)
20822133
federated_props = get_properties_by_agent(data, agent_url)
2134+
# Falsy/empty property_id is silently dropped: upstream
2135+
# schema requires a non-empty string, so an empty value
2136+
# is a structural violation that belongs in
2137+
# validate_adagents, not a divergence signal. Federated
2138+
# properties with valid IDs only.
20832139
federated_ids = {
20842140
str(p.get("property_id")) for p in federated_props if p.get("property_id")
20852141
}

tests/test_adagents.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4106,3 +4106,103 @@ async def releaser():
41064106

41074107
assert peak <= 4
41084108
assert peak >= 1 # sanity: probes actually ran
4109+
4110+
async def test_divergence_dedupes_collected_by_publisher_domain(self, monkeypatch):
4111+
"""Hostile directory: 5 rows all publisher_domain=victim → 1 fetch."""
4112+
from adcp import adagents as adagents_mod
4113+
from adcp.adagents import detect_publisher_properties_divergence
4114+
4115+
# 5 entries all pointing at the same victim host. A naive
4116+
# implementation would fan out 5 concurrent fetches against
4117+
# victim.example; the dedupe path must collapse to a single probe.
4118+
publishers = [
4119+
self._entry("victim.example", properties_authorized=1, property_ids=["p1"])
4120+
for _ in range(5)
4121+
]
4122+
handler = self._directory_handler(publishers)
4123+
4124+
call_count = 0
4125+
4126+
async def fake_fetch_adagents(domain, timeout=10.0, client=None):
4127+
nonlocal call_count
4128+
call_count += 1
4129+
return {}
4130+
4131+
def fake_get_properties_by_agent(data, agent_url):
4132+
return [{"property_id": "p1"}]
4133+
4134+
monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents)
4135+
monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent)
4136+
4137+
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
4138+
await detect_publisher_properties_divergence(
4139+
"https://agent.example.com/",
4140+
directory_url="https://aao.example.com",
4141+
client=client,
4142+
)
4143+
4144+
assert call_count == 1
4145+
4146+
async def test_divergence_aborts_on_repeated_cursor(self, monkeypatch):
4147+
"""Misbehaving directory returns the same next_cursor forever → raise."""
4148+
from adcp.adagents import detect_publisher_properties_divergence
4149+
4150+
# Each response includes a next_cursor that never advances. The
4151+
# page-walk loop must detect the repeat and raise rather than
4152+
# spin until OOM.
4153+
def handler(request: httpx.Request) -> httpx.Response:
4154+
return httpx.Response(
4155+
200,
4156+
json={
4157+
"agent_url": "https://agent.example.com/",
4158+
"directory_indexed_at": "2026-05-20T12:00:00Z",
4159+
"publishers": [],
4160+
"next_cursor": "stuck",
4161+
},
4162+
)
4163+
4164+
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
4165+
with pytest.raises(AdagentsValidationError, match="cursor 'stuck' repeated"):
4166+
await detect_publisher_properties_divergence(
4167+
"https://agent.example.com/",
4168+
directory_url="https://aao.example.com",
4169+
sample_size=None, # full sweep, so we actually walk pages
4170+
client=client,
4171+
)
4172+
4173+
async def test_divergence_warns_on_count_only_mode(self, monkeypatch, caplog):
4174+
"""No entry has property_ids → one-shot warning fires."""
4175+
import logging as _logging
4176+
4177+
from adcp import adagents as adagents_mod
4178+
from adcp.adagents import detect_publisher_properties_divergence
4179+
4180+
# No property_ids on any row → directory is in count-only mode.
4181+
publishers = [
4182+
self._entry("a.example", properties_authorized=1),
4183+
self._entry("b.example", properties_authorized=2),
4184+
]
4185+
handler = self._directory_handler(publishers)
4186+
4187+
async def fake_fetch_adagents(domain, timeout=10.0, client=None):
4188+
return {}
4189+
4190+
def fake_get_properties_by_agent(data, agent_url):
4191+
return [{"property_id": "p1"}]
4192+
4193+
monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents)
4194+
monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent)
4195+
4196+
with caplog.at_level(_logging.WARNING, logger="adcp.adagents"):
4197+
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
4198+
await detect_publisher_properties_divergence(
4199+
"https://agent.example.com/",
4200+
directory_url="https://aao.example.com",
4201+
client=client,
4202+
)
4203+
4204+
count_only_warnings = [
4205+
r for r in caplog.records if "count-only divergence detection" in r.getMessage()
4206+
]
4207+
assert len(count_only_warnings) == 1
4208+
assert "https://aao.example.com" in count_only_warnings[0].getMessage()

0 commit comments

Comments
 (0)