Skip to content

Commit a795477

Browse files
committed
refactor(adagents): address review follow-ups on #752
- Dedupe collected entries by publisher_domain before fan-out so a hostile directory can't amplify 1 publisher into a 20-concurrent burst against one host. (Medium-severity security finding.) - Pagination loop guards: detect repeated next_cursor and cap at MAX_DIRECTORY_PAGES=1000 pages; raise AdagentsValidationError on either violation. - Emit a one-shot logging warning when the entire sample comes back without property_ids[] -- adopters notice they're running in count-only mode rather than full set-diff. - Document the falsy property_id filter behavior in a code comment. - Cite the spec wire-format precedent for include= repeated-key form. Bot review: #752
1 parent 2a55dca commit a795477

2 files changed

Lines changed: 156 additions & 0 deletions

File tree

src/adcp/adagents.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import asyncio
1212
import ipaddress
1313
import json
14+
import logging
1415
import re
1516
import socket
1617
from dataclasses import dataclass, field
@@ -25,6 +26,8 @@
2526
from adcp.types.base import AdCPBaseModel
2627
from adcp.validation import ValidationError, validate_adagents
2728

29+
logger = logging.getLogger(__name__)
30+
2831
DiscoveryMethod = Literal["direct", "authoritative_location", "ads_txt_managerdomain"]
2932

3033

@@ -1991,6 +1994,11 @@ class AgentAuthorizationsDirectoryResult(AdCPBaseModel):
19911994
# page is a small envelope; pagination handles bulk responses.
19921995
MAX_DIRECTORY_PAGE_BYTES = 5 * 1024 * 1024
19931996

1997+
# Hard cap on directory pagination iterations. A misbehaving directory that
1998+
# never returns an empty next_cursor would otherwise hang the sweep
1999+
# indefinitely; this cap fail-closes the loop.
2000+
MAX_DIRECTORY_PAGES = 1000
2001+
19942002

19952003
async def fetch_agent_authorizations_from_directory(
19962004
agent_url: str,
@@ -2069,6 +2077,9 @@ async def fetch_agent_authorizations_from_directory(
20692077
if since is not None:
20702078
query_pairs.append(("since", since))
20712079
if include:
2080+
# Repeated-key form per docs/aao/directory-api.mdx (style: form,
2081+
# explode: true). Comma-joined NOT accepted by spec-conformant
2082+
# directories.
20722083
for value in include:
20732084
query_pairs.append(("include", value))
20742085
if query_pairs:
@@ -2209,6 +2220,8 @@ async def detect_publisher_properties_divergence(
22092220
try:
22102221
collected: list[DirectoryPublisherEntry] = []
22112222
cursor: str | None = None
2223+
seen_cursors: set[str] = set()
2224+
page_count = 0
22122225
while True:
22132226
page = await fetch_agent_authorizations_from_directory(
22142227
agent_url,
@@ -2218,13 +2231,51 @@ async def detect_publisher_properties_divergence(
22182231
timeout=timeout,
22192232
client=http,
22202233
)
2234+
page_count += 1
22212235
collected.extend(page.publishers)
22222236
if sample_size is not None and len(collected) >= sample_size:
22232237
collected = collected[:sample_size]
22242238
break
22252239
cursor = page.next_cursor
22262240
if not cursor:
22272241
break
2242+
if cursor in seen_cursors:
2243+
raise AdagentsValidationError(
2244+
f"Directory page cursor {cursor!r} repeated — refusing to loop forever."
2245+
)
2246+
seen_cursors.add(cursor)
2247+
if page_count >= MAX_DIRECTORY_PAGES:
2248+
raise AdagentsValidationError(
2249+
f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep."
2250+
)
2251+
2252+
# Dedupe by publisher_domain before fan-out: a hostile directory
2253+
# returning N rows for the same publisher would otherwise amplify
2254+
# into N concurrent fetches against a single victim host. First
2255+
# occurrence wins (deterministic) — conflicting property_ids /
2256+
# properties_authorized across duplicates are dropped here; the
2257+
# directory's behavior is itself a divergence signal for ops.
2258+
seen_domains: set[str] = set()
2259+
deduped: list[DirectoryPublisherEntry] = []
2260+
for entry in collected:
2261+
if entry.publisher_domain in seen_domains:
2262+
continue
2263+
seen_domains.add(entry.publisher_domain)
2264+
deduped.append(entry)
2265+
collected = deduped
2266+
2267+
# Emit a one-shot warning when the entire sample comes back without
2268+
# property_ids[]. In count-only mode, same-count substitutions are
2269+
# undetectable — adopters should pin include=["properties"] support
2270+
# on directories that offer it.
2271+
if collected and all(e.property_ids is None for e in collected):
2272+
logger.warning(
2273+
"AAO directory %s did not return property_ids[] on any publisher "
2274+
"entry — falling back to count-only divergence detection. Same-count "
2275+
"substitutions are undetectable in this mode. Upgrade the directory "
2276+
"or pin include=['properties'] support.",
2277+
directory_url,
2278+
)
22282279

22292280
sem = asyncio.Semaphore(max_concurrency)
22302281

@@ -2235,6 +2286,11 @@ async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None:
22352286
entry.publisher_domain, timeout=timeout, client=http
22362287
)
22372288
federated_props = get_properties_by_agent(data, agent_url)
2289+
# Falsy/empty property_id is silently dropped: upstream
2290+
# schema requires a non-empty string, so an empty value
2291+
# is a structural violation that belongs in
2292+
# validate_adagents, not a divergence signal. Federated
2293+
# properties with valid IDs only.
22382294
federated_ids = {
22392295
str(p.get("property_id")) for p in federated_props if p.get("property_id")
22402296
}

tests/test_adagents.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4571,3 +4571,103 @@ async def releaser():
45714571

45724572
assert peak <= 4
45734573
assert peak >= 1 # sanity: probes actually ran
4574+
4575+
async def test_divergence_dedupes_collected_by_publisher_domain(self, monkeypatch):
4576+
"""Hostile directory: 5 rows all publisher_domain=victim → 1 fetch."""
4577+
from adcp import adagents as adagents_mod
4578+
from adcp.adagents import detect_publisher_properties_divergence
4579+
4580+
# 5 entries all pointing at the same victim host. A naive
4581+
# implementation would fan out 5 concurrent fetches against
4582+
# victim.example; the dedupe path must collapse to a single probe.
4583+
publishers = [
4584+
self._entry("victim.example", properties_authorized=1, property_ids=["p1"])
4585+
for _ in range(5)
4586+
]
4587+
handler = self._directory_handler(publishers)
4588+
4589+
call_count = 0
4590+
4591+
async def fake_fetch_adagents(domain, timeout=10.0, client=None):
4592+
nonlocal call_count
4593+
call_count += 1
4594+
return {}
4595+
4596+
def fake_get_properties_by_agent(data, agent_url):
4597+
return [{"property_id": "p1"}]
4598+
4599+
monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents)
4600+
monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent)
4601+
4602+
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
4603+
await detect_publisher_properties_divergence(
4604+
"https://agent.example.com/",
4605+
directory_url="https://aao.example.com",
4606+
client=client,
4607+
)
4608+
4609+
assert call_count == 1
4610+
4611+
async def test_divergence_aborts_on_repeated_cursor(self, monkeypatch):
4612+
"""Misbehaving directory returns the same next_cursor forever → raise."""
4613+
from adcp.adagents import detect_publisher_properties_divergence
4614+
4615+
# Each response includes a next_cursor that never advances. The
4616+
# page-walk loop must detect the repeat and raise rather than
4617+
# spin until OOM.
4618+
def handler(request: httpx.Request) -> httpx.Response:
4619+
return httpx.Response(
4620+
200,
4621+
json={
4622+
"agent_url": "https://agent.example.com/",
4623+
"directory_indexed_at": "2026-05-20T12:00:00Z",
4624+
"publishers": [],
4625+
"next_cursor": "stuck",
4626+
},
4627+
)
4628+
4629+
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
4630+
with pytest.raises(AdagentsValidationError, match="cursor 'stuck' repeated"):
4631+
await detect_publisher_properties_divergence(
4632+
"https://agent.example.com/",
4633+
directory_url="https://aao.example.com",
4634+
sample_size=None, # full sweep, so we actually walk pages
4635+
client=client,
4636+
)
4637+
4638+
async def test_divergence_warns_on_count_only_mode(self, monkeypatch, caplog):
4639+
"""No entry has property_ids → one-shot warning fires."""
4640+
import logging as _logging
4641+
4642+
from adcp import adagents as adagents_mod
4643+
from adcp.adagents import detect_publisher_properties_divergence
4644+
4645+
# No property_ids on any row → directory is in count-only mode.
4646+
publishers = [
4647+
self._entry("a.example", properties_authorized=1),
4648+
self._entry("b.example", properties_authorized=2),
4649+
]
4650+
handler = self._directory_handler(publishers)
4651+
4652+
async def fake_fetch_adagents(domain, timeout=10.0, client=None):
4653+
return {}
4654+
4655+
def fake_get_properties_by_agent(data, agent_url):
4656+
return [{"property_id": "p1"}]
4657+
4658+
monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents)
4659+
monkeypatch.setattr(adagents_mod, "get_properties_by_agent", fake_get_properties_by_agent)
4660+
4661+
with caplog.at_level(_logging.WARNING, logger="adcp.adagents"):
4662+
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
4663+
await detect_publisher_properties_divergence(
4664+
"https://agent.example.com/",
4665+
directory_url="https://aao.example.com",
4666+
client=client,
4667+
)
4668+
4669+
count_only_warnings = [
4670+
r for r in caplog.records if "count-only divergence detection" in r.getMessage()
4671+
]
4672+
assert len(count_only_warnings) == 1
4673+
assert "https://aao.example.com" in count_only_warnings[0].getMessage()

0 commit comments

Comments
 (0)