diff --git a/.changeset/fix-discovered-properties-source-reconcile.md b/.changeset/fix-discovered-properties-source-reconcile.md new file mode 100644 index 0000000000..7254163ce8 --- /dev/null +++ b/.changeset/fix-discovered-properties-source-reconcile.md @@ -0,0 +1,4 @@ +--- +--- + +Add `source` column to `discovered_properties` to enable property-removal reconciliation in the hosted-property sync. Previously, the sync could only do additive upserts because it could not distinguish its own rows from crawler-written rows. Now, rows written by the hosted sync carry `source='aao_hosted'` and are reconciled (deleted when removed from the manifest) inside a domain-scoped advisory-lock transaction, preventing concurrent sync races. diff --git a/.changeset/hosted-property-fed-index-sync.md b/.changeset/hosted-property-fed-index-sync.md index df24af289d..9bcf67aaa6 100644 --- a/.changeset/hosted-property-fed-index-sync.md +++ b/.changeset/hosted-property-fed-index-sync.md @@ -34,10 +34,10 @@ Reconciliation semantics: - `discovered_publishers` row uses a stable AAO sentinel value (`aao://hosted`) for `discovered_by_agent` so re-syncs collapse to one row regardless of agent ordering. -- `discovered_properties` is additive only — the table has no source - column, so we cannot safely distinguish hosted-written rows from - crawler-written rows. Removed properties persist until manually cleared. - Tracked as a follow-up. +- `discovered_properties` now has full reconcile support (see PR #4111). + Properties removed from the hosted manifest are deleted on re-sync. + The `source` column added in migration 467 enables source-aware + conflict handling; the reconcile runs under a domain-scoped advisory lock. Also: rate-limit the per-agent rollup on `/api/registry/publisher` to 50 agents per request to bound fan-out on an unauthenticated endpoint. diff --git a/server/src/db/federated-index-db.ts b/server/src/db/federated-index-db.ts index f1d1a924ae..bb573da4cf 100644 --- a/server/src/db/federated-index-db.ts +++ b/server/src/db/federated-index-db.ts @@ -76,6 +76,7 @@ export interface DiscoveredProperty { name: string; identifiers: PropertyIdentifier[]; tags?: string[]; + source?: 'crawler' | 'aao_hosted'; // Write-path discriminator (migration 467) discovered_at?: Date; last_validated?: Date; expires_at?: Date; @@ -548,14 +549,15 @@ export class FederatedIndexDatabase { async upsertProperty(property: DiscoveredProperty): Promise { const result = await query( `INSERT INTO discovered_properties ( - property_id, publisher_domain, property_type, name, identifiers, tags, expires_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7) + property_id, publisher_domain, property_type, name, identifiers, tags, expires_at, source + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (publisher_domain, name, property_type) DO UPDATE SET property_id = COALESCE(EXCLUDED.property_id, discovered_properties.property_id), identifiers = EXCLUDED.identifiers, tags = EXCLUDED.tags, last_validated = NOW(), - expires_at = EXCLUDED.expires_at + expires_at = EXCLUDED.expires_at, + source = CASE WHEN discovered_properties.source = 'crawler' THEN 'crawler' ELSE EXCLUDED.source END RETURNING *`, [ property.property_id || null, @@ -565,6 +567,7 @@ export class FederatedIndexDatabase { JSON.stringify(property.identifiers), property.tags || [], property.expires_at || null, + property.source || 'crawler', ] ); return this.deserializeProperty(result.rows[0]); diff --git a/server/src/db/migrations/467_discovered_properties_source.sql b/server/src/db/migrations/467_discovered_properties_source.sql new file mode 100644 index 0000000000..00bd26059e --- /dev/null +++ b/server/src/db/migrations/467_discovered_properties_source.sql @@ -0,0 +1,16 @@ +-- Migration: 467_discovered_properties_source.sql +-- Purpose: Add source column to discovered_properties to distinguish crawler-written +-- rows from hosted-sync-written rows, enabling full property reconciliation on re-sync. +-- +-- Prior to this migration, discovered_properties had no write-path discriminator, so +-- hosted-property-sync.ts could only do additive upserts — it could not safely delete +-- rows it no longer owns without risking crawler-written rows. This column fixes that: +-- only rows with source='aao_hosted' are owned (and reconciled) by the sync job. + +ALTER TABLE discovered_properties + ADD COLUMN source TEXT NOT NULL DEFAULT 'crawler' + CHECK (source IN ('crawler', 'aao_hosted')); + +-- Index for reconcile queries: delete WHERE publisher_domain=$1 AND source='aao_hosted' +CREATE INDEX idx_properties_by_publisher_source + ON discovered_properties(publisher_domain, source); diff --git a/server/src/services/hosted-property-sync.ts b/server/src/services/hosted-property-sync.ts index fb1862c5d8..96d8bd35ad 100644 --- a/server/src/services/hosted-property-sync.ts +++ b/server/src/services/hosted-property-sync.ts @@ -23,11 +23,14 @@ * this publisher_domain. We own that source label exclusively; * crawler-written `adagents_json` rows for the same domain are left * untouched (they represent verified origin facts). - * - Properties: additive only. `discovered_properties` has no source - * column, so we cannot safely distinguish hosted-written rows from - * crawler-written rows. Removed properties persist until manually - * cleared. Tracked as a follow-up — adding a `source` column to - * `discovered_properties` would let us reconcile here too. + * - Properties: full reconcile. This sync is the authoritative source + * for the publisher_domain's property list — any row not in the current + * manifest is deleted, regardless of `source`. Source is still written + * as `'aao_hosted'` on new rows; on conflict with a crawler row, the + * crawler's source label is preserved (origin-verified > hosted-only) + * but the sync still owns the reconcile (the publisher's manifest is + * the truth for which properties exist). Runs inside a domain-scoped + * advisory-lock transaction to prevent concurrent-sync interleave races. * - Publisher row: keyed by stable sentinel (`AAO_HOSTED_SENTINEL`) so * re-syncs collapse to the same row regardless of which agent is first * in the manifest. @@ -39,7 +42,7 @@ */ import type { HostedProperty } from '../types.js'; import { FederatedIndexDatabase } from '../db/federated-index-db.js'; -import { query } from '../db/client.js'; +import { query, getClient } from '../db/client.js'; import { createLogger } from '../logger.js'; const logger = createLogger('hosted-property-sync'); @@ -49,6 +52,7 @@ export const AAO_HOSTED_SENTINEL = 'aao://hosted'; export interface HostedSyncResult { properties_synced: number; + properties_removed: number; agents_synced: number; authorizations_reconciled: number; authorizations_removed: number; @@ -87,6 +91,7 @@ export async function syncHostedPropertyToFederatedIndex( ): Promise { const result: HostedSyncResult = { properties_synced: 0, + properties_removed: 0, agents_synced: 0, authorizations_reconciled: 0, authorizations_removed: 0, @@ -99,26 +104,93 @@ export async function syncHostedPropertyToFederatedIndex( const agents = readAgents(adagents); const properties = readProperties(adagents); - // Properties: additive upsert (see file-level comment for why removal - // is not yet supported). - for (const p of properties) { - if (typeof p.name !== 'string' || !p.name) continue; - const propType = typeof p.type === 'string' && p.type ? p.type : 'website'; + // Properties: upsert + full reconcile under a domain-scoped advisory lock. + // The lock prevents two concurrent syncs for the same domain from interleaving + // their upserts with the trailing DELETE, which would cause the second sync's + // delete to remove rows the first sync just wrote (and vice versa). + { + const client = await getClient(); try { - await fedDb.upsertProperty({ - property_id: typeof p.property_id === 'string' ? p.property_id : undefined, - publisher_domain: domain, - property_type: propType, - name: p.name, - identifiers: Array.isArray(p.identifiers) - ? (p.identifiers as Array<{ type: string; value: string }>) - : [], - tags: Array.isArray(p.tags) ? (p.tags as string[]) : [], - }); - result.properties_synced++; + await client.query('BEGIN'); + await client.query(`SET LOCAL lock_timeout = '5000ms'`); + await client.query(`SET LOCAL statement_timeout = '30000ms'`); + await client.query('SELECT pg_advisory_xact_lock(hashtext($1))', [`dp:${domain}`]); + + const propertyNames: string[] = []; + const propertyTypes: string[] = []; + let synced = 0; + for (const p of properties) { + if (typeof p.name !== 'string' || !p.name) continue; + const propType = typeof p.type === 'string' && p.type ? p.type : 'website'; + propertyNames.push(p.name); + propertyTypes.push(propType); + await client.query( + `INSERT INTO discovered_properties ( + property_id, publisher_domain, property_type, name, identifiers, tags, source + ) VALUES ($1, $2, $3, $4, $5, $6, 'aao_hosted') + ON CONFLICT (publisher_domain, name, property_type) DO UPDATE SET + property_id = COALESCE(EXCLUDED.property_id, discovered_properties.property_id), + -- Preserve crawler-attested identifiers/tags: they represent origin-verified + -- facts and take precedence over hosted-manifest values. + identifiers = CASE WHEN discovered_properties.source = 'crawler' + THEN discovered_properties.identifiers + ELSE EXCLUDED.identifiers END, + tags = CASE WHEN discovered_properties.source = 'crawler' + THEN discovered_properties.tags + ELSE EXCLUDED.tags END, + last_validated = NOW(), + source = CASE WHEN discovered_properties.source = 'crawler' + THEN 'crawler' + ELSE 'aao_hosted' END`, + [ + typeof p.property_id === 'string' ? p.property_id : null, + domain, + propType, + p.name, + JSON.stringify(Array.isArray(p.identifiers) ? p.identifiers : []), + Array.isArray(p.tags) ? p.tags : [], + ] + ); + synced++; // after await: counts only confirmed writes + } + + // Reconcile: the hosted manifest is authoritative for this publisher's + // property list. Delete any row — regardless of source — whose + // (name, property_type) is not in the current manifest. This covers + // both aao_hosted rows (we wrote them) and crawler rows that were later + // removed from the manifest (the publisher's intent takes precedence). + // Keyed on (name, property_type) — not name alone — so a property + // reclassified to a different type is correctly removed. + const deleteResult = propertyNames.length > 0 + ? await client.query( + `DELETE FROM discovered_properties + WHERE publisher_domain = $1 + AND NOT EXISTS ( + SELECT 1 FROM unnest($2::text[], $3::text[]) AS m(mname, mtype) + WHERE m.mname = discovered_properties.name + AND m.mtype = discovered_properties.property_type + )`, + [domain, propertyNames, propertyTypes], + ) + : await client.query( + // Empty manifest: publisher has declared zero properties. Delete all + // rows for the domain — the hosted manifest is authoritative and the + // publisher's intent (empty list) takes precedence over any + // crawler-attested rows that may exist. + `DELETE FROM discovered_properties WHERE publisher_domain = $1`, + [domain], + ); + + await client.query('COMMIT'); + // Only update counters after commit so partial-rollback doesn't skew them. + result.properties_synced = synced; + result.properties_removed = deleteResult.rowCount ?? 0; } catch (err) { + try { await client.query('ROLLBACK'); } catch { /* ignore rollback failures */ } result.errors++; - logger.warn({ err, domain, name: p.name }, 'Failed to upsert hosted property row'); + logger.warn({ err, domain }, 'Failed to sync/reconcile hosted property rows'); + } finally { + client.release(); } } diff --git a/server/tests/integration/hosted-property-fed-index-sync.test.ts b/server/tests/integration/hosted-property-fed-index-sync.test.ts index 04649ebdbf..87e357dbe5 100644 --- a/server/tests/integration/hosted-property-fed-index-sync.test.ts +++ b/server/tests/integration/hosted-property-fed-index-sync.test.ts @@ -221,6 +221,42 @@ describe('Hosted property → federated index sync', () => { expect(res.body.authorized_agents[0].url).toBe(AGENT_X); }); + it('reconciles properties on re-sync: properties removed from the manifest are deleted', async () => { + const initial = await propertyDb.createHostedProperty({ + publisher_domain: PUB, + adagents_json: { + authorized_agents: [{ url: AGENT_X }], + properties: [ + { type: 'website', name: PUB, identifiers: [{ type: 'domain', value: PUB }] }, + { type: 'mobile_app', name: `${PUB} app` }, + ], + }, + is_public: true, + source_type: 'community', + }); + await syncHostedPropertyToFederatedIndex(initial); + + let res = await request(app).get(`/api/registry/publisher?domain=${encodeURIComponent(PUB)}`); + expect(res.body.properties).toHaveLength(2); + + // Re-sync with only one property — the removed one should be deleted. + await pool.query( + `UPDATE hosted_properties SET adagents_json = $2 WHERE publisher_domain = $1`, + [PUB, JSON.stringify({ + authorized_agents: [{ url: AGENT_X }], + properties: [{ type: 'website', name: PUB, identifiers: [{ type: 'domain', value: PUB }] }], + })], + ); + const updated = await propertyDb.getHostedPropertyByDomain(PUB); + if (!updated) throw new Error('expected hosted property to exist'); + const result = await syncHostedPropertyToFederatedIndex(updated); + expect(result.properties_removed).toBe(1); + + res = await request(app).get(`/api/registry/publisher?domain=${encodeURIComponent(PUB)}`); + expect(res.body.properties).toHaveLength(1); + expect(res.body.properties[0].name).toBe(PUB); + }); + it('skips entries without required fields without throwing', async () => { const hosted = await propertyDb.createHostedProperty({ publisher_domain: PUB,