Merged
31 changes: 31 additions & 0 deletions .changeset/manager-revalidation-queue.md
@@ -0,0 +1,31 @@
---
---

feat(crawler): queue-backed re-validation when a manager rotates adagents.json

Closes #4200 item 2. When a manager domain (e.g. Raptive) updates its
`/.well-known/adagents.json`, every publisher delegating via ads.txt
`MANAGERDOMAIN` needs to be re-validated so its `authorized_agents` view
stays in sync. Inline fan-out from the cache write would saturate
crawler concurrency at managed-network scale; this PR adds a persistent
queue and a bounded worker tick.

- Migration 471: `manager_revalidation_queue` table mirroring the shape
  of `catalog_crawl_queue` (idempotent insert, `next_attempt_after` for
  backoff, composite index on `(next_attempt_after, enqueued_at)`).
- `cacheAdagentsManifest` reads the previously-cached body before the
upsert and compares the contributory subset (`authorized_agents`,
`properties`) via stable canonicalization. Only actual content drift
triggers the fan-out; `$schema` / `last_updated` noise is ignored.
- New crawler tick `processManagerRevalidationQueue` drains up to 50
rows per 5-minute interval at concurrency 10, calling
`crawlSingleDomain` for each. Success deletes the row; failure
advances exponential backoff (1h / 6h / 1d / 3d) and stores the last
error truncated to 500 chars.
- The reverse-index lookup uses the partial index on
`publishers.manager_domain` added in #4204, so a Raptive-scale
rotation enumerates 6K delegating publishers via an index-only scan.

Tests: integration coverage for queue idempotency, due-row filtering,
oldest-first ordering, success deletion, and exponential backoff. Unit
coverage for the change-detection helper.
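The change-detection bullet above can be sketched end to end. This is an illustrative stand-in, not the PR's code: `Manifest`, `contentChanged`, and the sample manifests are hypothetical names for the PR's `AdagentsManifest` type and the `manifestContentChanged` helper in `server/src/crawler.ts`.

```typescript
// Illustrative sketch of the gated fan-out check (names are stand-ins).
type Manifest = Record<string, unknown>;

// Deterministic serialization: object keys sorted recursively, arrays
// compared positionally, so key insertion order never causes a diff.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value) ?? "null";
  }
  if (Array.isArray(value)) {
    return "[" + value.map(stableStringify).join(",") + "]";
  }
  const obj = value as Record<string, unknown>;
  return (
    "{" +
    Object.keys(obj)
      .sort()
      .map((k) => JSON.stringify(k) + ":" + stableStringify(obj[k]))
      .join(",") +
    "}"
  );
}

// Only the contributory subset participates, so $schema / last_updated
// churn on a routine crawl never triggers manager fan-out.
function contentChanged(previous: Manifest | null, next: Manifest): boolean {
  if (!previous) return true; // first successful crawl counts as a change
  const subset = (m: Manifest) => ({
    authorized_agents: Array.isArray(m.authorized_agents) ? m.authorized_agents : [],
    properties: Array.isArray(m.properties) ? m.properties : [],
  });
  return stableStringify(subset(previous)) !== stableStringify(subset(next));
}
```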
150 changes: 150 additions & 0 deletions server/src/crawler.ts
@@ -20,6 +20,36 @@ import { insertTypeReclassification } from "./db/type-reclassification-log-db.js

const log = createLogger('crawler');

/**
* Compare a freshly-fetched adagents.json against the previously-cached
* body for the same domain. Returns true when the contributory fields
* differ — `authorized_agents` and `properties` — so manager fan-out
* is gated on actual change rather than firing on every routine
* 60-minute crawl. Top-level keys outside that subset (`$schema`,
* `last_updated`, comments) are intentionally ignored. Arrays compare
* positionally; nested object keys are sorted so two semantically
* identical manifests with different key insertion order match.
*/
export function manifestContentChanged(
previous: AdagentsManifest | null,
next: AdagentsManifest,
): boolean {
if (!previous) return true;
const subset = (m: AdagentsManifest) => ({
authorized_agents: Array.isArray(m.authorized_agents) ? m.authorized_agents : [],
properties: Array.isArray(m.properties) ? m.properties : [],
});
return stableStringify(subset(previous)) !== stableStringify(subset(next));
}

function stableStringify(value: unknown): string {
if (value === null || typeof value !== 'object') return JSON.stringify(value) ?? 'null';
if (Array.isArray(value)) return '[' + value.map(stableStringify).join(',') + ']';
const obj = value as Record<string, unknown>;
const keys = Object.keys(obj).sort();
return '{' + keys.map(k => JSON.stringify(k) + ':' + stableStringify(obj[k])).join(',') + '}';
}

export class CrawlerService {
private crawler: PropertyCrawler;
private crawling: boolean = false;
@@ -820,6 +850,13 @@ export class CrawlerService {
meta?: { statusCode?: number; responseBytes?: number; resolvedUrl?: string; discoveryMethod?: string; managerDomain?: string },
): Promise<void> {
try {
// Read the existing cached body before the upsert so we can
// compute whether content actually changed. Used to gate
// manager → publishers fan-out: re-validation only fans out when
// the manager's authorized_agents or properties shape moved, not
// on every routine 60-minute crawl.
const previous = await this.publisherDb.getCachedAdagentsJson(domain);

await this.publisherDb.upsertAdagentsCache({
domain,
manifest,
@@ -829,6 +866,32 @@
discoveryMethod: meta?.discoveryMethod,
managerDomain: meta?.managerDomain,
});

// Manager fan-out: when the just-written manifest belongs to a
// domain that other publishers delegate to via ads.txt
// MANAGERDOMAIN, queue those publishers for re-validation. The
// worker (processManagerRevalidationQueue) drains the queue at a
// bounded rate so a Raptive-scale rotation doesn't saturate
// crawler concurrency. Intentionally outside upsertAdagentsCache's
// transaction: if the enqueue fails the cache write has already
// committed, but the next routine 60-min crawl re-detects drift
// and re-enqueues, so silent fan-out loss self-heals.
if (manifestContentChanged(previous, manifest)) {
try {
const enqueued = await this.publisherDb.enqueueManagerRevalidation(domain);
if (enqueued > 0) {
log.info(
{ managerDomain: domain, enqueued },
'Manager adagents.json changed; enqueued delegating publishers for re-validation',
);
}
} catch (err) {
log.warn(
{ domain, err: err instanceof Error ? err.message : err },
'Failed to enqueue manager revalidation fan-out',
);
}
}
} catch (err) {
log.warn({ domain, err: err instanceof Error ? err.message : err }, 'Publisher cache write failed');
}
@@ -1281,6 +1344,93 @@ export class CrawlerService {
}
}

// ── Manager Re-validation Queue ───────────────────────────────
//
// When a manager domain rotates its adagents.json, every publisher
// delegating via ads.txt MANAGERDOMAIN needs to be re-validated so
// its authorized_agents view stays in sync. Inline fan-out from
// cacheAdagentsManifest() would saturate crawler concurrency at
// managed-network scale (Raptive ≈ 6K publishers), so we persist
// the work in manager_revalidation_queue (migration 471) and drain
// it at a bounded rate per tick here.

private managerRevalidationIntervalId: NodeJS.Timeout | null = null;
private managerRevalidationProcessing = false;

startPeriodicManagerRevalidation(intervalMinutes: number = 5) {
this.managerRevalidationIntervalId = setInterval(() => {
this.processManagerRevalidationQueue().catch((err) => {
log.error({ err }, 'Manager revalidation tick failed');
});
}, intervalMinutes * 60 * 1000);

log.info({ intervalMinutes }, 'Periodic manager revalidation queue started');
}

async processManagerRevalidationQueue(): Promise<{ processed: number; succeeded: number; failed: number }> {
if (this.managerRevalidationProcessing) {
log.debug('Manager revalidation already in progress, skipping tick');
return { processed: 0, succeeded: 0, failed: 0 };
}

this.managerRevalidationProcessing = true;
// Bounded per-tick batch — caps concurrency budget regardless of
// queue depth. At a 5-minute tick and BATCH_SIZE=50, a 6K-publisher
// manager rotation propagates within ~10 hours, comfortably ahead
// of the 60-minute organic re-crawl cadence for any single row.
const BATCH_SIZE = 50;
const CONCURRENCY = 10;

try {
const rows = await this.publisherDb.dequeueRevalidationBatch(BATCH_SIZE);
if (rows.length === 0) {
return { processed: 0, succeeded: 0, failed: 0 };
}

log.info({ count: rows.length }, 'Manager revalidation batch');

const results = await this.processWithConcurrency(
rows,
CONCURRENCY,
async (row) => {
try {
// Full single-domain crawl re-runs adagents validation
// (which will hit the managerdomain fallback again) and
// refreshes the publisher's catalog projection.
await this.crawlSingleDomain(row.publisher_domain);
return { row, ok: true as const };
} catch (err) {
return {
row,
ok: false as const,
error: err instanceof Error ? err.message : String(err),
};
}
},
);

let succeeded = 0;
let failed = 0;
for (const result of results) {
if (result.ok) {
await this.publisherDb.markRevalidationSucceeded(result.row.publisher_domain);
succeeded++;
} else {
await this.publisherDb.markRevalidationFailed(result.row.publisher_domain, result.error);
failed++;
}
}

log.info(
{ processed: rows.length, succeeded, failed },
'Manager revalidation batch complete',
);
return { processed: rows.length, succeeded, failed };
} finally {
this.managerRevalidationProcessing = false;
}
}
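The batch runner above calls `this.processWithConcurrency(rows, CONCURRENCY, fn)`, which this diff doesn't show. A minimal bounded-concurrency mapper with the same shape might look like the following; it is an illustrative sketch, not the PR's implementation.

```typescript
// Map items through an async fn with at most `limit` calls in flight.
async function processWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let nextIndex = 0;
  // Each worker claims the next unprocessed index until the input is
  // exhausted; claiming is synchronous, so indices are never duplicated.
  const workers = Array.from(
    { length: Math.min(limit, items.length) },
    async () => {
      while (nextIndex < items.length) {
        const i = nextIndex++;
        results[i] = await fn(items[i]);
      }
    },
  );
  await Promise.all(workers);
  return results;
}
```

Results come back in input order even though completion order varies, which matches how the batch runner pairs each result with its queue row.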

private async crawlSingleDomainForCatalog(domain: string): Promise<void> {
const validation = await this.adAgentsManager.validateDomain(domain);
if (!validation.valid || !validation.raw_data?.authorized_agents) {
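The ~10-hour propagation figure in the BATCH_SIZE comment checks out as simple arithmetic, using the figures from the PR description (6K delegating publishers, 50 rows per 5-minute tick):

```typescript
// Ticks needed to drain a queue of a given depth, times the tick interval.
function drainHours(queueDepth: number, batchSize: number, tickMinutes: number): number {
  const ticks = Math.ceil(queueDepth / batchSize);
  return (ticks * tickMinutes) / 60;
}
// A 6,000-publisher rotation at 50 rows per 5-minute tick:
// 120 ticks x 5 min = 600 min = 10 hours.
```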
38 changes: 38 additions & 0 deletions server/src/db/migrations/471_manager_revalidation_queue.sql
@@ -0,0 +1,38 @@
-- Queue for fanning out re-validation when a manager domain rotates its
-- adagents.json. Item 2 of #4200, follow-up to #4173 / #4204.
--
-- Why a queue: at managed-network scale a single manager (e.g. Raptive)
-- can have thousands of delegating publishers. Inline fan-out from a
-- crawlSingleDomain() that detects change would saturate the crawler's
-- concurrency budget. Persisting the work and draining it at a bounded
-- rate per tick keeps the crawler stable while still propagating manager
-- updates within a small number of crawl cycles.
--
-- Mirrors the shape of catalog_crawl_queue (migration 367):
-- - identifier as primary key, idempotent insert
-- - next_attempt_after for backoff windowing
-- - worker pulls WHERE next_attempt_after <= NOW() ORDER BY ... LIMIT N

CREATE TABLE manager_revalidation_queue (
publisher_domain TEXT PRIMARY KEY,
manager_domain TEXT NOT NULL,
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
next_attempt_after TIMESTAMPTZ NOT NULL DEFAULT NOW(),
attempts INTEGER NOT NULL DEFAULT 0,
last_attempted_at TIMESTAMPTZ,
last_error TEXT
);

-- Worker query: oldest-first pull of due rows. The composite index keeps
-- the scan cheap even when the queue carries lots of rows in deep backoff.
CREATE INDEX idx_manager_revalidation_queue_due
ON manager_revalidation_queue (next_attempt_after, enqueued_at);

-- Per-manager scan: lets ops see how many publishers are still pending
-- for a given manager and supports the optional /api/registry/managers
-- /:domain/recrawl endpoint planned in #4200 item 5.
CREATE INDEX idx_manager_revalidation_queue_manager
ON manager_revalidation_queue (manager_domain);

COMMENT ON TABLE manager_revalidation_queue IS
'Pending re-validations triggered by a manager rotating its adagents.json. Drained by the crawler at a bounded rate per tick. Rows are deleted on successful re-validation; failures advance next_attempt_after with exponential backoff.';
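The enqueue/pull semantics the migration comments describe can be modeled in a few lines. This in-memory sketch uses illustrative names and plain-number timestamps, not the PR's code; it mirrors the idempotent `ON CONFLICT` reset and the due-row, oldest-first worker query.

```typescript
// In-memory model of manager_revalidation_queue semantics.
type Row = {
  publisherDomain: string;
  enqueuedAt: number;
  nextAttemptAfter: number;
  attempts: number;
};
const queue = new Map<string, Row>();

// INSERT ... ON CONFLICT (publisher_domain) DO UPDATE: re-enqueueing a
// publisher that is already queued resets it to due-now with
// attempts = 0, so a fresh manager change supersedes in-flight backoff.
function enqueue(publisherDomain: string, now: number): void {
  queue.set(publisherDomain, {
    publisherDomain,
    enqueuedAt: now,
    nextAttemptAfter: now,
    attempts: 0,
  });
}

// SELECT ... WHERE next_attempt_after <= NOW()
// ORDER BY enqueued_at ASC LIMIT $1
function dequeueBatch(now: number, limit: number): Row[] {
  return Array.from(queue.values())
    .filter((r) => r.nextAttemptAfter <= now)
    .sort((a, b) => a.enqueuedAt - b.enqueuedAt)
    .slice(0, limit);
}
```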
123 changes: 123 additions & 0 deletions server/src/db/publisher-db.ts
@@ -359,6 +359,129 @@ export class PublisherDatabase {
}
}

/**
* Read the cached adagents.json body for a domain. Used by the crawler
* to decide whether a re-fetch produced an actual content change before
* fanning out manager re-validation. Returns null when the domain has
* never been successfully crawled.
*/
async getCachedAdagentsJson(domain: string): Promise<AdagentsManifest | null> {
const client = await getClient();
try {
const r = await client.query<{ adagents_json: AdagentsManifest | null }>(
`SELECT adagents_json FROM publishers WHERE domain = $1 LIMIT 1`,
[domain.toLowerCase()],
);
return r.rows[0]?.adagents_json ?? null;
} finally {
client.release();
}
}

/**
* Insert one queue row per publisher delegating to this manager via
* ads.txt MANAGERDOMAIN. Idempotent: if a publisher is already queued
* (e.g. previous fan-out hasn't drained yet), the row is reset to
* "due now" with attempts=0 so the upstream manager change supersedes
* any in-flight backoff. Returns the number of rows touched (inserts
* + ON CONFLICT updates) — a delegating publisher with a stale row
* still counts since the supersede is the load-bearing semantic.
*
* The SELECT scans the partial index added in migration 470
* (idx_publishers_manager_domain) so the lookup stays cheap even at
* managed-network scale.
*/
async enqueueManagerRevalidation(managerDomain: string): Promise<number> {
const client = await getClient();
try {
const r = await client.query(
`INSERT INTO manager_revalidation_queue
(publisher_domain, manager_domain, enqueued_at, next_attempt_after, attempts, last_attempted_at, last_error)
SELECT domain, $1, NOW(), NOW(), 0, NULL, NULL
FROM publishers
WHERE manager_domain = $1
ON CONFLICT (publisher_domain) DO UPDATE SET
manager_domain = EXCLUDED.manager_domain,
enqueued_at = NOW(),
next_attempt_after = NOW(),
attempts = 0,
last_attempted_at = NULL,
last_error = NULL`,
[managerDomain.toLowerCase()],
);
return r.rowCount ?? 0;
} finally {
client.release();
}
}

/**
* Pull a bounded batch of due rows from the queue. The caller (crawler
* worker tick) must call markRevalidationSucceeded / Failed for each
* returned row to advance the queue.
*/
async dequeueRevalidationBatch(
limit: number,
): Promise<Array<{ publisher_domain: string; manager_domain: string; attempts: number }>> {
const client = await getClient();
try {
const r = await client.query<{
publisher_domain: string;
manager_domain: string;
attempts: number;
}>(
`SELECT publisher_domain, manager_domain, attempts
FROM manager_revalidation_queue
WHERE next_attempt_after <= NOW()
ORDER BY enqueued_at ASC
LIMIT $1`,
[limit],
);
return r.rows;
} finally {
client.release();
}
}

async markRevalidationSucceeded(publisherDomain: string): Promise<void> {
const client = await getClient();
try {
await client.query(
`DELETE FROM manager_revalidation_queue WHERE publisher_domain = $1`,
[publisherDomain.toLowerCase()],
);
} finally {
client.release();
}
}

/**
* Advance the queue row with exponential backoff. Schedule mirrors the
* catalog_crawl_queue cadence (1h / 6h / 24h / 72h) so a manager whose
* file is briefly unparseable doesn't clog the queue forever.
*/
async markRevalidationFailed(publisherDomain: string, err: string): Promise<void> {
const client = await getClient();
try {
await client.query(
`UPDATE manager_revalidation_queue
SET attempts = attempts + 1,
last_attempted_at = NOW(),
last_error = LEFT($2, 500),
next_attempt_after = NOW() + CASE
WHEN attempts < 1 THEN INTERVAL '1 hour'
WHEN attempts < 2 THEN INTERVAL '6 hours'
WHEN attempts < 3 THEN INTERVAL '1 day'
ELSE INTERVAL '3 days'
END
WHERE publisher_domain = $1`,
[publisherDomain.toLowerCase(), err],
);
} finally {
client.release();
}
}

/**
* Project a single adagents.json property into catalog_properties +
* catalog_identifiers. Catalog rows are tagged
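The CASE ladder in `markRevalidationFailed` reads the pre-UPDATE `attempts` value, since Postgres evaluates column references in `SET` against the old row. As a pure function the schedule looks like this; `backoffMs` is an illustrative name, not part of the PR.

```typescript
// Backoff schedule from markRevalidationFailed's CASE expression.
// `attemptsBeforeFailure` is the row's attempts value before the
// UPDATE increments it, matching how Postgres evaluates the CASE.
const HOUR_MS = 60 * 60 * 1000;

function backoffMs(attemptsBeforeFailure: number): number {
  if (attemptsBeforeFailure < 1) return 1 * HOUR_MS;  // 1st failure: 1 hour
  if (attemptsBeforeFailure < 2) return 6 * HOUR_MS;  // 2nd failure: 6 hours
  if (attemptsBeforeFailure < 3) return 24 * HOUR_MS; // 3rd failure: 1 day
  return 72 * HOUR_MS;                                // 4th onward: 3 days
}
```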
4 changes: 4 additions & 0 deletions server/src/http.ts
@@ -9037,6 +9037,10 @@ ${p.category ? `<category>${p.category}</category>\n` : ''}<url>${publishedUrl}<
// Crawl catalog domains for adagents.json (demand-driven queue)
this.crawler.startPeriodicCatalogCrawl(30); // Process queue every 30 minutes

// Drain manager_revalidation_queue (#4200 item 2) — fan-out
// re-validation when a manager rotates its adagents.json.
this.crawler.startPeriodicManagerRevalidation(5); // 5-minute tick

// Register and start all scheduled jobs
registerAllJobs();
