diff --git a/.changeset/manager-revalidation-queue.md b/.changeset/manager-revalidation-queue.md new file mode 100644 index 0000000000..05f49eb793 --- /dev/null +++ b/.changeset/manager-revalidation-queue.md @@ -0,0 +1,31 @@ +--- +--- + +feat(crawler): queue-backed re-validation when a manager rotates adagents.json + +Closes #4200 item 2. When a manager domain (e.g. Raptive) updates its +`/.well-known/adagents.json`, every publisher delegating via ads.txt +`MANAGERDOMAIN` needs to be re-validated so its `authorized_agents` view +stays in sync. Inline fan-out from the cache write would saturate +crawler concurrency at managed-network scale; this PR adds a persistent +queue and a bounded worker tick. + +- Migration 471: `manager_revalidation_queue` table mirroring the shape + of `catalog_crawl_queue` (idempotent insert, `next_attempt_after` for + backoff, partial index on `(next_attempt_after, enqueued_at)`). +- `cacheAdagentsManifest` reads the previously-cached body before the + upsert and compares the contributory subset (`authorized_agents`, + `properties`) via stable canonicalization. Only actual content drift + triggers the fan-out; `$schema` / `last_updated` noise is ignored. +- New crawler tick `processManagerRevalidationQueue` drains up to 50 + rows per 5-minute interval at concurrency 10, calling + `crawlSingleDomain` for each. Success deletes the row; failure + advances exponential backoff (1h / 6h / 1d / 3d) and stores the last + error truncated to 500 chars. +- The reverse-index lookup uses the partial index on + `publishers.manager_domain` added in #4204, so a Raptive-scale + rotation enumerates 6K delegating publishers via an index-only scan. + +Tests: integration coverage for queue idempotency, due-row filtering, +oldest-first ordering, success deletion, and exponential backoff. Unit +coverage for the change-detection helper. 
diff --git a/server/src/crawler.ts b/server/src/crawler.ts index c7fe98f1cf..908df0af62 100644 --- a/server/src/crawler.ts +++ b/server/src/crawler.ts @@ -20,6 +20,36 @@ import { insertTypeReclassification } from "./db/type-reclassification-log-db.js const log = createLogger('crawler'); +/** + * Compare a freshly-fetched adagents.json against the previously-cached + * body for the same domain. Returns true when the contributory fields + * differ — `authorized_agents` and `properties` — so manager fan-out + * is gated on actual change rather than firing on every routine + * 60-minute crawl. Top-level keys outside that subset (`$schema`, + * `last_updated`, comments) are intentionally ignored. Arrays compare + * positionally; nested object keys are sorted so two semantically + * identical manifests with different key insertion order match. + */ +export function manifestContentChanged( + previous: AdagentsManifest | null, + next: AdagentsManifest, +): boolean { + if (!previous) return true; + const subset = (m: AdagentsManifest) => ({ + authorized_agents: Array.isArray(m.authorized_agents) ? m.authorized_agents : [], + properties: Array.isArray(m.properties) ? m.properties : [], + }); + return stableStringify(subset(previous)) !== stableStringify(subset(next)); +} + +function stableStringify(value: unknown): string { + if (value === null || typeof value !== 'object') return JSON.stringify(value) ?? 
'null'; + if (Array.isArray(value)) return '[' + value.map(stableStringify).join(',') + ']'; + const obj = value as Record<string, unknown>; + const keys = Object.keys(obj).sort(); + return '{' + keys.map(k => JSON.stringify(k) + ':' + stableStringify(obj[k])).join(',') + '}'; +} + export class CrawlerService { private crawler: PropertyCrawler; private crawling: boolean = false; @@ -820,6 +850,13 @@ export class CrawlerService { meta?: { statusCode?: number; responseBytes?: number; resolvedUrl?: string; discoveryMethod?: string; managerDomain?: string }, ): Promise<void> { try { + // Read the existing cached body before the upsert so we can + // compute whether content actually changed. Used to gate + // manager → publishers fan-out: re-validation only fans out when + // the manager's authorized_agents or properties shape moved, not + // on every routine 60-minute crawl. + const previous = await this.publisherDb.getCachedAdagentsJson(domain); + await this.publisherDb.upsertAdagentsCache({ domain, manifest, @@ -829,6 +866,32 @@ export class CrawlerService { discoveryMethod: meta?.discoveryMethod, managerDomain: meta?.managerDomain, }); + + // Manager fan-out: when the just-written manifest belongs to a + // domain that other publishers delegate to via ads.txt + // MANAGERDOMAIN, queue those publishers for re-validation. The + // worker (processManagerRevalidationQueue) drains the queue at a + // bounded rate so a Raptive-scale rotation doesn't saturate + // crawler concurrency. Intentionally outside upsertAdagentsCache's + // transaction: if the enqueue fails the cache write has already + // committed, but the next routine 60-min crawl re-detects drift + // and re-enqueues, so silent fan-out loss self-heals. 
+ if (manifestContentChanged(previous, manifest)) { + try { + const enqueued = await this.publisherDb.enqueueManagerRevalidation(domain); + if (enqueued > 0) { + log.info( + { managerDomain: domain, enqueued }, + 'Manager adagents.json changed; enqueued delegating publishers for re-validation', + ); + } + } catch (err) { + log.warn( + { domain, err: err instanceof Error ? err.message : err }, + 'Failed to enqueue manager revalidation fan-out', + ); + } + } } catch (err) { log.warn({ domain, err: err instanceof Error ? err.message : err }, 'Publisher cache write failed'); } @@ -1281,6 +1344,93 @@ export class CrawlerService { } } + // ── Manager Re-validation Queue ─────────────────────────────── + // + // When a manager domain rotates its adagents.json, every publisher + // delegating via ads.txt MANAGERDOMAIN needs to be re-validated so + // their authorized_agents view stays in sync. Inline fan-out from + // cacheAdagentsManifest() would saturate crawler concurrency at + // managed-network scale (Raptive ≈ 6K publishers), so we persist + // the work in manager_revalidation_queue (migration 471) and drain + // it at a bounded rate per tick here. 
+ + private managerRevalidationIntervalId: NodeJS.Timeout | null = null; + private managerRevalidationProcessing = false; + + startPeriodicManagerRevalidation(intervalMinutes: number = 5) { + this.managerRevalidationIntervalId = setInterval(() => { + this.processManagerRevalidationQueue().catch((err) => { + log.error({ err }, 'Manager revalidation tick failed'); + }); + }, intervalMinutes * 60 * 1000); + + log.info({ intervalMinutes }, 'Periodic manager revalidation queue started'); + } + + async processManagerRevalidationQueue(): Promise<{ processed: number; succeeded: number; failed: number }> { + if (this.managerRevalidationProcessing) { + log.debug('Manager revalidation already in progress, skipping tick'); + return { processed: 0, succeeded: 0, failed: 0 }; + } + + this.managerRevalidationProcessing = true; + // Bounded per-tick batch — caps concurrency budget regardless of + // queue depth. At a 5-minute tick and BATCH_SIZE=50, a 6K-publisher + // manager rotation propagates within ~10 hours, comfortably ahead + // of the 60-minute organic re-crawl cadence for any single row. + const BATCH_SIZE = 50; + const CONCURRENCY = 10; + + try { + const rows = await this.publisherDb.dequeueRevalidationBatch(BATCH_SIZE); + if (rows.length === 0) { + return { processed: 0, succeeded: 0, failed: 0 }; + } + + log.info({ count: rows.length }, 'Manager revalidation batch'); + + const results = await this.processWithConcurrency( + rows, + CONCURRENCY, + async (row) => { + try { + // Full single-domain crawl re-runs adagents validation + // (which will hit the managerdomain fallback again) and + // refreshes the publisher's catalog projection. + await this.crawlSingleDomain(row.publisher_domain); + return { row, ok: true as const }; + } catch (err) { + return { + row, + ok: false as const, + error: err instanceof Error ? 
err.message : String(err), + }; + } + }, + ); + + let succeeded = 0; + let failed = 0; + for (const result of results) { + if (result.ok) { + await this.publisherDb.markRevalidationSucceeded(result.row.publisher_domain); + succeeded++; + } else { + await this.publisherDb.markRevalidationFailed(result.row.publisher_domain, result.error); + failed++; + } + } + + log.info( + { processed: rows.length, succeeded, failed }, + 'Manager revalidation batch complete', + ); + return { processed: rows.length, succeeded, failed }; + } finally { + this.managerRevalidationProcessing = false; + } + } + private async crawlSingleDomainForCatalog(domain: string): Promise { const validation = await this.adAgentsManager.validateDomain(domain); if (!validation.valid || !validation.raw_data?.authorized_agents) { diff --git a/server/src/db/migrations/471_manager_revalidation_queue.sql b/server/src/db/migrations/471_manager_revalidation_queue.sql new file mode 100644 index 0000000000..2e2eb2c14c --- /dev/null +++ b/server/src/db/migrations/471_manager_revalidation_queue.sql @@ -0,0 +1,38 @@ +-- Queue for fanning out re-validation when a manager domain rotates its +-- adagents.json. Item 2 of #4200, follow-up to #4173 / #4204. +-- +-- Why a queue: at managed-network scale a single manager (e.g. Raptive) +-- can have thousands of delegating publishers. Inline fan-out from a +-- crawlSingleDomain() that detects change would saturate the crawler's +-- concurrency budget. Persisting the work and draining it at a bounded +-- rate per tick keeps the crawler stable while still propagating manager +-- updates within a small number of crawl cycles. +-- +-- Mirrors the shape of catalog_crawl_queue (migration 367): +-- - identifier as primary key, idempotent insert +-- - next_attempt_after for backoff windowing +-- - worker pulls WHERE next_attempt_after <= NOW() ORDER BY ... 
LIMIT N + +CREATE TABLE manager_revalidation_queue ( + publisher_domain TEXT PRIMARY KEY, + manager_domain TEXT NOT NULL, + enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + next_attempt_after TIMESTAMPTZ NOT NULL DEFAULT NOW(), + attempts INTEGER NOT NULL DEFAULT 0, + last_attempted_at TIMESTAMPTZ, + last_error TEXT +); + +-- Worker query: oldest-first pull of due rows. Composite index keeps the +-- scan cheap even when the queue carries lots of rows in deep backoff. +CREATE INDEX idx_manager_revalidation_queue_due + ON manager_revalidation_queue (next_attempt_after, enqueued_at); + +-- Per-manager scan: lets ops see how many publishers are still pending +-- for a given manager and supports the optional /api/registry/managers +-- /:domain/recrawl endpoint planned in #4200 item 5. +CREATE INDEX idx_manager_revalidation_queue_manager + ON manager_revalidation_queue (manager_domain); + +COMMENT ON TABLE manager_revalidation_queue IS + 'Pending re-validations triggered by a manager rotating its adagents.json. Drained by the crawler at a bounded rate per tick. Rows are deleted on successful re-validation; failures advance next_attempt_after with exponential backoff.'; diff --git a/server/src/db/publisher-db.ts b/server/src/db/publisher-db.ts index c57c6ab449..a8159301d7 100644 --- a/server/src/db/publisher-db.ts +++ b/server/src/db/publisher-db.ts @@ -359,6 +359,129 @@ export class PublisherDatabase { } } + /** + * Read the cached adagents.json body for a domain. Used by the crawler + * to decide whether a re-fetch produced an actual content change before + * fanning out manager re-validation. Returns null when the domain has + * never been successfully crawled. 
+ */ + async getCachedAdagentsJson(domain: string): Promise<AdagentsManifest | null> { + const client = await getClient(); + try { + const r = await client.query<{ adagents_json: AdagentsManifest | null }>( + `SELECT adagents_json FROM publishers WHERE domain = $1 LIMIT 1`, + [domain.toLowerCase()], + ); + return r.rows[0]?.adagents_json ?? null; + } finally { + client.release(); + } + } + + /** + * Insert one queue row per publisher delegating to this manager via + * ads.txt MANAGERDOMAIN. Idempotent: if a publisher is already queued + * (e.g. previous fan-out hasn't drained yet), the row is reset to + * "due now" with attempts=0 so the upstream manager change supersedes + * any in-flight backoff. Returns the number of rows touched (inserts + * + ON CONFLICT updates) — a delegating publisher with a stale row + * still counts since the supersede is the load-bearing semantic. + * + * The SELECT scans the partial index added in migration 470 + * (idx_publishers_manager_domain) so the lookup stays cheap even at + * managed-network scale. + */ + async enqueueManagerRevalidation(managerDomain: string): Promise<number> { + const client = await getClient(); + try { + const r = await client.query( + `INSERT INTO manager_revalidation_queue + (publisher_domain, manager_domain, enqueued_at, next_attempt_after, attempts, last_attempted_at, last_error) + SELECT domain, $1, NOW(), NOW(), 0, NULL, NULL + FROM publishers + WHERE manager_domain = $1 + ON CONFLICT (publisher_domain) DO UPDATE SET + manager_domain = EXCLUDED.manager_domain, + enqueued_at = NOW(), + next_attempt_after = NOW(), + attempts = 0, + last_attempted_at = NULL, + last_error = NULL`, + [managerDomain.toLowerCase()], + ); + return r.rowCount ?? 0; + } finally { + client.release(); + } + } + + /** + * Pull a bounded batch of due rows from the queue. The caller (crawler + * worker tick) must call markRevalidationSucceeded / Failed for each + * returned row to advance the queue. 
+ */ + async dequeueRevalidationBatch( + limit: number, + ): Promise<Array<{ publisher_domain: string; manager_domain: string; attempts: number }>> { + const client = await getClient(); + try { + const r = await client.query<{ + publisher_domain: string; + manager_domain: string; + attempts: number; + }>( + `SELECT publisher_domain, manager_domain, attempts + FROM manager_revalidation_queue + WHERE next_attempt_after <= NOW() + ORDER BY enqueued_at ASC + LIMIT $1`, + [limit], + ); + return r.rows; + } finally { + client.release(); + } + } + + async markRevalidationSucceeded(publisherDomain: string): Promise<void> { + const client = await getClient(); + try { + await client.query( + `DELETE FROM manager_revalidation_queue WHERE publisher_domain = $1`, + [publisherDomain.toLowerCase()], + ); + } finally { + client.release(); + } + } + + /** + * Advance the queue row with exponential backoff. Schedule mirrors the + * catalog_crawl_queue cadence (1h / 6h / 24h / 72h) so a manager whose + * file is briefly unparseable doesn't clog the queue forever. + */ + async markRevalidationFailed(publisherDomain: string, err: string): Promise<void> { + const client = await getClient(); + try { + await client.query( + `UPDATE manager_revalidation_queue + SET attempts = attempts + 1, + last_attempted_at = NOW(), + last_error = LEFT($2, 500), + next_attempt_after = NOW() + CASE + WHEN attempts < 1 THEN INTERVAL '1 hour' + WHEN attempts < 2 THEN INTERVAL '6 hours' + WHEN attempts < 3 THEN INTERVAL '1 day' + ELSE INTERVAL '3 days' + END + WHERE publisher_domain = $1`, + [publisherDomain.toLowerCase(), err], + ); + } finally { + client.release(); + } + } + /** * Project a single adagents.json property into catalog_properties + * catalog_identifiers. Catalog rows are tagged diff --git a/server/src/http.ts b/server/src/http.ts index 129ac760e3..f2338dc195 100644 --- a/server/src/http.ts +++ b/server/src/http.ts @@ -9037,6 +9037,10 @@ ${p.category ? 
`${p.category}\n` : ''}${publishedUrl}< // Crawl catalog domains for adagents.json (demand-driven queue) this.crawler.startPeriodicCatalogCrawl(30); // Process queue every 30 minutes + // Drain manager_revalidation_queue (#4200 item 2) — fan-out + // re-validation when a manager rotates its adagents.json. + this.crawler.startPeriodicManagerRevalidation(5); // 5-minute tick + // Register and start all scheduled jobs registerAllJobs(); diff --git a/server/tests/integration/manager-revalidation-queue.test.ts b/server/tests/integration/manager-revalidation-queue.test.ts new file mode 100644 index 0000000000..8c99c20fcc --- /dev/null +++ b/server/tests/integration/manager-revalidation-queue.test.ts @@ -0,0 +1,225 @@ +/** + * Integration tests for the manager revalidation queue (#4200 item 2). + * + * When a manager rotates its adagents.json, every publisher delegating + * via ads.txt MANAGERDOMAIN needs to be re-validated. The queue is the + * fan-out primitive; the crawler worker drains it at a bounded rate. + * + * Shape mirrors catalog_crawl_queue (migration 367) — DB-backed, idempotent + * insert, exponential backoff on failure, deletion on success. 
+ */ +import { describe, it, expect, beforeAll, beforeEach, afterAll } from 'vitest'; +import { initializeDatabase, closeDatabase, query } from '../../src/db/client.js'; +import { runMigrations } from '../../src/db/migrate.js'; +import { PublisherDatabase } from '../../src/db/publisher-db.js'; +import type { Pool } from 'pg'; + +const MANAGER_DOMAIN = 'manager-revalidation-test.example.com'; +const PUB_A = 'pub-a.manager-revalidation-test.example.com'; +const PUB_B = 'pub-b.manager-revalidation-test.example.com'; +const PUB_C = 'pub-c.manager-revalidation-test.example.com'; + +describe('Manager revalidation queue', () => { + let pool: Pool; + let publisherDb: PublisherDatabase; + + beforeAll(async () => { + pool = initializeDatabase({ + connectionString: process.env.DATABASE_URL || 'postgresql://adcp:localdev@localhost:5432/adcp_test', + }); + await runMigrations(); + publisherDb = new PublisherDatabase(); + }); + + async function clearFixtures() { + await pool.query( + `DELETE FROM manager_revalidation_queue WHERE publisher_domain = ANY($1::text[])`, + [[PUB_A, PUB_B, PUB_C]], + ); + await pool.query( + `DELETE FROM publishers WHERE domain = ANY($1::text[])`, + [[PUB_A, PUB_B, PUB_C, MANAGER_DOMAIN]], + ); + } + + beforeEach(async () => { + await clearFixtures(); + }); + + afterAll(async () => { + await clearFixtures(); + await closeDatabase(); + }); + + async function seedDelegatingPublisher(domain: string, manager: string): Promise<void> { + await pool.query( + `INSERT INTO publishers (domain, source_type, manager_domain, discovery_method, last_validated) + VALUES ($1, 'adagents_json', $2, 'ads_txt_managerdomain', NOW())`, + [domain, manager], + ); + } + + describe('enqueueManagerRevalidation', () => { + it('inserts one queue row per delegating publisher', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await seedDelegatingPublisher(PUB_B, MANAGER_DOMAIN); + + const enqueued = await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + 
expect(enqueued).toBe(2); + + const rows = await pool.query( + `SELECT publisher_domain, manager_domain, attempts, last_error + FROM manager_revalidation_queue + WHERE manager_domain = $1 + ORDER BY publisher_domain ASC`, + [MANAGER_DOMAIN], + ); + expect(rows.rows).toHaveLength(2); + expect(rows.rows[0].publisher_domain).toBe(PUB_A); + expect(rows.rows[0].manager_domain).toBe(MANAGER_DOMAIN); + expect(rows.rows[0].attempts).toBe(0); + expect(rows.rows[0].last_error).toBeNull(); + expect(rows.rows[1].publisher_domain).toBe(PUB_B); + }); + + it('is idempotent and resets attempts on re-enqueue', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + + // Simulate a prior failure that put the row into backoff. + await publisherDb.markRevalidationFailed(PUB_A, 'first try failed'); + const afterFailure = await pool.query( + `SELECT attempts, last_error, next_attempt_after + FROM manager_revalidation_queue WHERE publisher_domain = $1`, + [PUB_A], + ); + expect(afterFailure.rows[0].attempts).toBe(1); + expect(afterFailure.rows[0].last_error).toBe('first try failed'); + expect(new Date(afterFailure.rows[0].next_attempt_after).getTime()) + .toBeGreaterThan(Date.now()); + + // Manager rotates again — re-enqueue should reset attempts and + // mark the row as due now, superseding the in-flight backoff. 
+ const second = await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + expect(second).toBe(1); + + const afterRequeue = await pool.query( + `SELECT attempts, last_error, next_attempt_after + FROM manager_revalidation_queue WHERE publisher_domain = $1`, + [PUB_A], + ); + expect(afterRequeue.rows[0].attempts).toBe(0); + expect(afterRequeue.rows[0].last_error).toBeNull(); + expect(new Date(afterRequeue.rows[0].next_attempt_after).getTime()) + .toBeLessThanOrEqual(Date.now() + 1000); + }); + + it('returns 0 and inserts nothing when no publishers delegate to the manager', async () => { + const enqueued = await publisherDb.enqueueManagerRevalidation('unknown-manager.example.com'); + expect(enqueued).toBe(0); + }); + }); + + describe('dequeueRevalidationBatch', () => { + it('returns only rows that are due (next_attempt_after <= NOW)', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await seedDelegatingPublisher(PUB_B, MANAGER_DOMAIN); + await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + + // Push PUB_B into the future. 
+ await pool.query( + `UPDATE manager_revalidation_queue + SET next_attempt_after = NOW() + INTERVAL '1 hour' + WHERE publisher_domain = $1`, + [PUB_B], + ); + + const batch = await publisherDb.dequeueRevalidationBatch(50); + expect(batch.map(r => r.publisher_domain)).toEqual([PUB_A]); + }); + + it('respects the limit', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await seedDelegatingPublisher(PUB_B, MANAGER_DOMAIN); + await seedDelegatingPublisher(PUB_C, MANAGER_DOMAIN); + await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + + const batch = await publisherDb.dequeueRevalidationBatch(2); + expect(batch).toHaveLength(2); + }); + + it('orders oldest-first by enqueued_at', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await seedDelegatingPublisher(PUB_B, MANAGER_DOMAIN); + await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + + // Backdate PUB_B so it appears older. + await pool.query( + `UPDATE manager_revalidation_queue + SET enqueued_at = NOW() - INTERVAL '1 hour' + WHERE publisher_domain = $1`, + [PUB_B], + ); + + const batch = await publisherDb.dequeueRevalidationBatch(50); + expect(batch.map(r => r.publisher_domain)).toEqual([PUB_B, PUB_A]); + }); + }); + + describe('markRevalidationSucceeded / Failed', () => { + it('deletes the row on success', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + + await publisherDb.markRevalidationSucceeded(PUB_A); + + const rows = await pool.query( + `SELECT 1 FROM manager_revalidation_queue WHERE publisher_domain = $1`, + [PUB_A], + ); + expect(rows.rows).toHaveLength(0); + }); + + it('advances backoff geometrically on failure', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + + const startMs = Date.now(); + await publisherDb.markRevalidationFailed(PUB_A, 'boom'); + const r1 = await 
pool.query( + `SELECT next_attempt_after, attempts FROM manager_revalidation_queue WHERE publisher_domain = $1`, + [PUB_A], + ); + expect(r1.rows[0].attempts).toBe(1); + // First backoff is ~1 hour (3,600,000ms). Allow generous slop. + const delay1Ms = new Date(r1.rows[0].next_attempt_after).getTime() - startMs; + expect(delay1Ms).toBeGreaterThan(50 * 60 * 1000); + expect(delay1Ms).toBeLessThan(70 * 60 * 1000); + + await publisherDb.markRevalidationFailed(PUB_A, 'boom2'); + const r2 = await pool.query( + `SELECT next_attempt_after, attempts FROM manager_revalidation_queue WHERE publisher_domain = $1`, + [PUB_A], + ); + expect(r2.rows[0].attempts).toBe(2); + // Second backoff is ~6 hours. + const delay2Ms = new Date(r2.rows[0].next_attempt_after).getTime() - startMs; + expect(delay2Ms).toBeGreaterThan(5 * 60 * 60 * 1000); + expect(delay2Ms).toBeLessThan(7 * 60 * 60 * 1000); + }); + + it('truncates last_error to 500 chars', async () => { + await seedDelegatingPublisher(PUB_A, MANAGER_DOMAIN); + await publisherDb.enqueueManagerRevalidation(MANAGER_DOMAIN); + + const longErr = 'x'.repeat(2000); + await publisherDb.markRevalidationFailed(PUB_A, longErr); + + const r = await pool.query( + `SELECT last_error FROM manager_revalidation_queue WHERE publisher_domain = $1`, + [PUB_A], + ); + expect(r.rows[0].last_error).toHaveLength(500); + }); + }); +}); diff --git a/server/tests/unit/manifest-content-changed.test.ts b/server/tests/unit/manifest-content-changed.test.ts new file mode 100644 index 0000000000..fc7d2f0545 --- /dev/null +++ b/server/tests/unit/manifest-content-changed.test.ts @@ -0,0 +1,83 @@ +/** + * Unit tests for manifestContentChanged — gates manager → publishers + * fan-out so re-validation only fires on actual content drift, not on + * every routine 60-minute crawl. (#4200 item 2.) 
+ */ +import { describe, it, expect } from 'vitest'; +import { manifestContentChanged } from '../../src/crawler.js'; +import type { AdagentsManifest } from '../../src/db/publisher-db.js'; + +const baseManifest: AdagentsManifest = { + authorized_agents: [ + { url: 'https://agent.example', authorized_for: 'All inventory' }, + ], + properties: [ + { property_id: 'site', property_type: 'website', name: 'Site' }, + ], +}; + +describe('manifestContentChanged', () => { + it('returns true when previous is null (first crawl)', () => { + expect(manifestContentChanged(null, baseManifest)).toBe(true); + }); + + it('returns false for identical manifests', () => { + expect(manifestContentChanged(baseManifest, baseManifest)).toBe(false); + }); + + it('returns false when only $schema or last_updated differ', () => { + const next: AdagentsManifest = { + ...baseManifest, + $schema: 'https://adcontextprotocol.org/schemas/v2/adagents.json', + last_updated: '2026-05-07T12:00:00Z', + }; + expect(manifestContentChanged(baseManifest, next)).toBe(false); + }); + + it('returns true when authorized_agents changes', () => { + const next: AdagentsManifest = { + ...baseManifest, + authorized_agents: [ + { url: 'https://agent.example', authorized_for: 'All inventory' }, + { url: 'https://other-agent.example', authorized_for: 'Display' }, + ], + }; + expect(manifestContentChanged(baseManifest, next)).toBe(true); + }); + + it('returns true when properties changes', () => { + const next: AdagentsManifest = { + ...baseManifest, + properties: [ + { property_id: 'site', property_type: 'website', name: 'Renamed' }, + ], + }; + expect(manifestContentChanged(baseManifest, next)).toBe(true); + }); + + it('returns true when authorized_agents reorders (order semantically distinct)', () => { + // Order matters — a publisher reshuffling priority is meaningful + // signal for downstream consumers, not noise to swallow. 
+ const reordered: AdagentsManifest = { + authorized_agents: [ + { url: 'https://b.example', authorized_for: 'B' }, + { url: 'https://a.example', authorized_for: 'A' }, + ], + properties: [], + }; + const original: AdagentsManifest = { + authorized_agents: [ + { url: 'https://a.example', authorized_for: 'A' }, + { url: 'https://b.example', authorized_for: 'B' }, + ], + properties: [], + }; + expect(manifestContentChanged(original, reordered)).toBe(true); + }); + + it('treats missing authorized_agents/properties as empty arrays', () => { + const previous: AdagentsManifest = { authorized_agents: [], properties: [] }; + const next: AdagentsManifest = {}; + expect(manifestContentChanged(previous, next)).toBe(false); + }); +});