From 720b8acf9f64936dcbadcb1b596d42c3dbf886fb Mon Sep 17 00:00:00 2001 From: sangilish <56685007+sangilish@users.noreply.github.com> Date: Thu, 28 May 2026 08:36:14 -0700 Subject: [PATCH 1/4] fix(testing): seed storyboard root context --- .changeset/storyboard-root-context.md | 7 ++ src/lib/testing/storyboard/runner.ts | 10 ++- src/lib/testing/storyboard/types.ts | 5 ++ test/lib/storyboard-root-context.test.js | 104 +++++++++++++++++++++++ 4 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 .changeset/storyboard-root-context.md create mode 100644 test/lib/storyboard-root-context.test.js diff --git a/.changeset/storyboard-root-context.md b/.changeset/storyboard-root-context.md new file mode 100644 index 000000000..3fcdd5788 --- /dev/null +++ b/.changeset/storyboard-root-context.md @@ -0,0 +1,7 @@ +--- +"@adcp/sdk": patch +--- + +fix(testing): seed storyboard root context before applying caller overrides. + +Refs #2099. The storyboard runner now uses top-level storyboard `context` defaults for full-run, multi-pass seeding, and single-step execution paths, while preserving `options.context` override behavior. diff --git a/src/lib/testing/storyboard/runner.ts b/src/lib/testing/storyboard/runner.ts index 56d3ccd7b..4e0620c48 100644 --- a/src/lib/testing/storyboard/runner.ts +++ b/src/lib/testing/storyboard/runner.ts @@ -1969,7 +1969,8 @@ async function executeStoryboardPass( } } - let context: StoryboardContext = { ...options.context }; + let context: StoryboardContext = { ...storyboard.context, ...options.context }; + if (storyboard.context) forwardAliasCache(storyboard.context, context); if (options.context) forwardAliasCache(options.context, context); const contributions = new Set(); // First phase/step that contributed each flag. Branch-set post-pass reads @@ -3015,7 +3016,9 @@ async function runMultiPass( // phase to its `phaseResults`, so the aggregated top-level counts reflect // a single seeding pass across the whole run. const preSeedClients = agentUrls.map(url => getOrCreateClientResolution(url, options).client); - const preSeedContext: StoryboardContext = { ...options.context }; + const preSeedContext: StoryboardContext = { ...storyboard.context, ...options.context }; + if (storyboard.context) forwardAliasCache(storyboard.context, preSeedContext); + if (options.context) forwardAliasCache(options.context, preSeedContext); const preSeededResult = await runControllerSeeding(preSeedClients[0]!, storyboard, options, preSeedContext); const passes: StoryboardPassResult[] = []; @@ -3249,7 +3252,8 @@ export async function runStoryboardStep( profile = options._profile; } - const context: StoryboardContext = { ...options.context }; + const context: StoryboardContext = { ...storyboard.context, ...options.context }; + if (storyboard.context) forwardAliasCache(storyboard.context, context); if (options.context) forwardAliasCache(options.context, context); // `_webhookReceiver` is a test-only injection point; production callers diff --git a/src/lib/testing/storyboard/types.ts b/src/lib/testing/storyboard/types.ts index 1e9552b04..918443ba0 100644 --- a/src/lib/testing/storyboard/types.ts +++ b/src/lib/testing/storyboard/types.ts @@ -179,6 +179,11 @@ export interface Storyboard { * field(s) into `params.fixture` verbatim. */ fixtures?: StoryboardFixtures; + /** + * Initial context values declared by the storyboard YAML. Runner callers may + * override these defaults with `StoryboardRunOptions.context`. + */ + context?: StoryboardContext; phases: StoryboardPhase[]; /** * Cross-step assertions that apply to this storyboard. Every assertion diff --git a/test/lib/storyboard-root-context.test.js b/test/lib/storyboard-root-context.test.js new file mode 100644 index 000000000..6f7d88404 --- /dev/null +++ b/test/lib/storyboard-root-context.test.js @@ -0,0 +1,104 @@ +const { describe, it } = require('node:test'); +const assert = require('node:assert'); +const http = require('http'); + +const { runStoryboard, runStoryboardStep } = require('../../dist/lib/testing/storyboard/runner.js'); + +function buildStoryboard(context = { seed_value: 'root-default' }) { + return { + id: 'root_context_sb', + version: '1.0.0', + title: 'Root context', + category: 'compliance', + summary: '', + narrative: '', + agent: { interaction_model: '*', capabilities: [] }, + caller: { role: 'buyer_agent' }, + context, + phases: [ + { + id: 'p', + title: 'root context phase', + steps: [ + { + id: 's1', + title: 'uses root context', + task: 'get_products', + auth: 'none', + sample_request: { brief: '$context.seed_value' }, + validations: [{ check: 'http_status_in', allowed_values: [401], description: '' }], + }, + ], + }, + ], + }; +} + +const runnerOptions = { + protocol: 'mcp', + allow_http: true, + agentTools: ['get_products'], + _profile: { name: 'Test', tools: ['get_products'] }, + _client: { getAgentInfo: async () => ({ name: 'Test', tools: [{ name: 'get_products' }] }) }, +}; + +async function startCaptureAgent() { + const seen = []; + const server = http.createServer(async (req, res) => { + const chunks = []; + for await (const c of req) chunks.push(c); + const rpc = JSON.parse(Buffer.concat(chunks).toString('utf8')); + if (rpc.params?.name) { + seen.push({ name: rpc.params.name, args: rpc.params.arguments }); + } + res.writeHead(401, { 'content-type': 'application/json', 'www-authenticate': 'Bearer realm="x"' }); + res.end('{}'); + }); + await new Promise(resolve => server.listen(0, resolve)); + return { server, seen, url: `http://127.0.0.1:${server.address().port}/mcp` }; +} + +function closeServer(server) { + return new Promise(resolve => server.close(resolve)); +} + +describe('runStoryboard: root context seeding', () => { + it('injects storyboard root context into full-run sample requests', async () => { + const { server, seen, url } = await startCaptureAgent(); + try { + await runStoryboard(url, buildStoryboard(), runnerOptions); + + assert.strictEqual(seen.length, 1); + assert.strictEqual(seen[0].args.brief, 'root-default'); + } finally { + await closeServer(server); + } + }); + + it('lets caller-supplied options.context override storyboard root defaults', async () => { + const { server, seen, url } = await startCaptureAgent(); + try { + await runStoryboard(url, buildStoryboard(), { + ...runnerOptions, + context: { seed_value: 'caller-override' }, + }); + + assert.strictEqual(seen.length, 1); + assert.strictEqual(seen[0].args.brief, 'caller-override'); + } finally { + await closeServer(server); + } + }); + + it('injects storyboard root context into single-step sample requests', async () => { + const { server, seen, url } = await startCaptureAgent(); + try { + await runStoryboardStep(url, buildStoryboard(), 's1', runnerOptions); + + assert.strictEqual(seen.length, 1); + assert.strictEqual(seen[0].args.brief, 'root-default'); + } finally { + await closeServer(server); + } + }); +}); From f04a134fa7622f3d2f79f330aea3e11ed2625051 Mon Sep 17 00:00:00 2001 From: sangilish <56685007+sangilish@users.noreply.github.com> Date: Thu, 28 May 2026 18:21:47 -0700 Subject: [PATCH 2/4] fix(wholesale-feed-sync): harden lifecycle recovery --- .changeset/wholesale-feed-sync-lifecycle.md | 5 + src/lib/wholesale-feed-sync/sync.ts | 259 ++++++++++++++------ test/lib/wholesale-feed-sync.test.js | 164 +++++++++++++ 3 files changed, 351 insertions(+), 77 deletions(-) create mode 100644 .changeset/wholesale-feed-sync-lifecycle.md diff --git a/.changeset/wholesale-feed-sync-lifecycle.md b/.changeset/wholesale-feed-sync-lifecycle.md new file mode 100644 index 000000000..5afed48af --- /dev/null +++ b/.changeset/wholesale-feed-sync-lifecycle.md @@ -0,0 +1,5 @@ +--- +'@adcp/sdk': patch +--- + +Harden WholesaleFeedSync lifecycle recovery by cancelling stale in-flight bootstraps after stop, committing feed indexes and version tokens atomically after successful bootstrap, and bounding version-mismatch repair retries. diff --git a/src/lib/wholesale-feed-sync/sync.ts b/src/lib/wholesale-feed-sync/sync.ts index aceec6015..45ce6e2d6 100644 --- a/src/lib/wholesale-feed-sync/sync.ts +++ b/src/lib/wholesale-feed-sync/sync.ts @@ -18,10 +18,23 @@ type Product = V31Beta.Product; // Signal export in the generated bundle). Extract the element type so // the index map and search helpers stay strongly-typed. type Signal = NonNullable[number]; +type FeedMetadata = { + wholesaleFeedVersion: string | undefined; + pricingVersion: string | undefined; + cacheScope: 'public' | 'account'; +}; +type BootstrapFeedResult = { + cancelled: boolean; + unchanged: boolean; + items: Map; + metadata: FeedMetadata; +}; const DEFAULT_PROBE_INTERVAL_MS = 600_000; const DEFAULT_CAPABILITY_REFRESH_INTERVAL_MS = 86_400_000; const DEFAULT_BOOTSTRAP_PAGE_LIMIT = 100; +const VERSION_MISMATCH_RECOVERY_ATTEMPTS = 3; +const VERSION_MISMATCH_RECOVERY_BACKOFF_MS = 5; /** * In-memory mirror of an AdCP agent's wholesale product and signal feeds. @@ -85,6 +98,7 @@ export class WholesaleFeedSync extends EventEmitter { private probeTimer: ReturnType | null = null; private capabilityTimer: ReturnType | null = null; + private lifecycleEpoch = 0; /** * Read-only view of the in-memory product index. The `mode` reflects the @@ -189,23 +203,25 @@ export class WholesaleFeedSync extends EventEmitter { private async startInner(): Promise { this.stop(); - await this.resolveMode(); - await this.bootstrap(); + const epoch = this.lifecycleEpoch; + if (!(await this.resolveMode(epoch))) return; + if (!(await this.bootstrap({ epoch }))) return; if (this._mode === 'auto-poll') { - this.scheduleProbe(); + this.scheduleProbe(epoch); } if (this.capabilityRefreshIntervalMs > 0) { - this.scheduleCapabilityRefresh(); + this.scheduleCapabilityRefresh(epoch); } } /** Stop all background activity. Preserves in-memory state and version tokens. */ stop(): void { + this.lifecycleEpoch++; if (this.probeTimer) clearTimeout(this.probeTimer); if (this.capabilityTimer) clearTimeout(this.capabilityTimer); this.probeTimer = null; this.capabilityTimer = null; - if (this._state === 'syncing') this.setState('idle'); + if (this._state === 'syncing' || this._state === 'bootstrapping') this.setState('idle'); } /** @@ -233,8 +249,9 @@ export class WholesaleFeedSync extends EventEmitter { * fetched wholesale feed. */ async refresh(): Promise { + const epoch = this.lifecycleEpoch; this.emit('resyncing', { reason: 'manual' }); - await this.bootstrap({ emitDiffs: true }); + await this.bootstrap({ emitDiffs: true, epoch }); } /** @@ -244,6 +261,7 @@ export class WholesaleFeedSync extends EventEmitter { * reads instead of applying a suspect delta. */ async applyWebhook(webhook: V31Beta.WholesaleFeedWebhook): Promise { + const epoch = this.lifecycleEpoch; const event = webhook.event; if (!event || webhook.notification_type !== event.event_type) { throw new Error('WholesaleFeedSync: wholesale feed webhook notification_type does not match event.event_type.'); @@ -274,9 +292,10 @@ export class WholesaleFeedSync extends EventEmitter { const dedupeKey = this.webhookDedupeKey(webhook); const eventDedupeKey = this.webhookEventDedupeKey(webhook); if (await this.hasProcessedWebhook(dedupeKey, eventDedupeKey)) return; + if (!this.isLifecycleCurrent(epoch)) return; if (this.lastWebhookEventId && compareUuidV7(event.event_id, this.lastWebhookEventId) <= 0) { - await this.recoverFromVersionMismatch(); + if (!(await this.recoverFromVersionMismatch(event, epoch))) return; await this.markWebhookProcessed(dedupeKey, eventDedupeKey); return; } @@ -294,7 +313,7 @@ export class WholesaleFeedSync extends EventEmitter { currentVersion && webhook.previous_wholesale_feed_version !== currentVersion ) { - await this.recoverFromVersionMismatch(); + if (!(await this.recoverFromVersionMismatch(event, epoch))) return; await this.markWebhookProcessed(dedupeKey, eventDedupeKey); return; } @@ -302,7 +321,7 @@ export class WholesaleFeedSync extends EventEmitter { if (event.event_type === 'wholesale_feed.bulk_change') { this.emit('wholesale_feed.bulk_change', { event }); try { - await this.recoverFromBulkChange(event); + if (!(await this.recoverFromBulkChange(event, epoch))) return; } catch (err) { await this.markWebhookProcessed(dedupeKey, eventDedupeKey); this.rememberLastWebhookEventId(event.event_id); @@ -348,8 +367,9 @@ export class WholesaleFeedSync extends EventEmitter { // ====== Private: capability resolution ====== - private async resolveMode(): Promise { + private async resolveMode(epoch = this.lifecycleEpoch): Promise { const caps = await this.client.getAdcpCapabilities({}); + if (!this.isLifecycleCurrent(epoch)) return false; // TaskResult union — only `success` carries a typed `data` field. // Other task-arms (`deferred`, `input-required`, `error`) leave us // without enough info to pick a mode confidently. Spec says we MAY @@ -396,13 +416,16 @@ export class WholesaleFeedSync extends EventEmitter { this.signals._mode = mode; this.signals.queryable = wholesaleSignals; this.emit('mode_resolved', { mode, capabilities: resolved }); + return true; } // ====== Private: bootstrap (wholesale enumeration) ====== private async bootstrap( - options: { emitDiffs?: boolean; entities?: 'products' | 'signals' | 'all' } = {} - ): Promise { + options: { emitDiffs?: boolean; entities?: 'products' | 'signals' | 'all'; epoch?: number } = {} + ): Promise { + const epoch = options.epoch ?? this.lifecycleEpoch; + if (!this.isLifecycleCurrent(epoch)) return false; this.setState('bootstrapping'); try { // Build into local maps and atomically swap on success. The previous @@ -414,29 +437,34 @@ export class WholesaleFeedSync extends EventEmitter { // mutated on a successful, fresh fetch. const previousProducts = new Map(this.productIndex); const previousSignals = new Map(this.signalIndex); - const incomingProducts = new Map(); - const incomingSignals = new Map(); - let productsUnchanged = false; - let signalsUnchanged = false; + let productResult: BootstrapFeedResult | undefined; + let signalResult: BootstrapFeedResult | undefined; const entities = options.entities ?? 'all'; const refreshProducts = entities !== 'signals'; const refreshSignals = entities !== 'products' && this.signals.queryable; if (refreshProducts) { - productsUnchanged = await this.bootstrapProducts(incomingProducts); + productResult = await this.bootstrapProducts(epoch); + if (productResult.cancelled) return false; } if (refreshSignals) { - signalsUnchanged = await this.bootstrapSignals(incomingSignals); + signalResult = await this.bootstrapSignals(epoch); + if (signalResult.cancelled) return false; } - if (refreshProducts && productsUnchanged) { - // Seller confirmed our cached version is current. Keep the - // existing index intact; don't swap with the empty incoming. - } else if (refreshProducts) { - this.productIndex = incomingProducts; + if (!this.isLifecycleCurrent(epoch)) return false; + + if (refreshProducts && productResult) { + this.commitProductMetadata(productResult.metadata); + if (!productResult.unchanged) { + this.productIndex = productResult.items; + } } - if (refreshSignals && !signalsUnchanged) { - this.signalIndex = incomingSignals; + if (refreshSignals && signalResult) { + this.commitSignalMetadata(signalResult.metadata); + if (!signalResult.unchanged) { + this.signalIndex = signalResult.items; + } } if (options.emitDiffs) { @@ -450,7 +478,9 @@ export class WholesaleFeedSync extends EventEmitter { signalCount: this.signalIndex.size, mode: this._mode, }); + return true; } catch (err) { + if (!this.isLifecycleCurrent(epoch)) return false; this.setState('error'); const error = err instanceof Error ? err : new Error(String(err)); this.errorHandler?.(error); @@ -460,8 +490,10 @@ export class WholesaleFeedSync extends EventEmitter { } /** Returns `true` when the seller short-circuited with `unchanged: true`. */ - private async bootstrapProducts(into: Map): Promise { + private async bootstrapProducts(epoch: number): Promise> { let cursor: string | undefined; + const into = new Map(); + let metadata = this.currentProductMetadata(); do { const params: Record = { buying_mode: 'wholesale', @@ -471,80 +503,73 @@ export class WholesaleFeedSync extends EventEmitter { // Conditional fetch on the page-0 call when we already have a cached // version. Per the spec, the seller short-circuits with // `unchanged: true` and no payload — caller keeps the previous index. - if (!cursor && this.productWholesaleFeedVersion && this._capabilities.wholesaleFeedVersioning) { - params.if_wholesale_feed_version = this.productWholesaleFeedVersion; - if (this.productPricingVersion) params.if_pricing_version = this.productPricingVersion; + if (!cursor && metadata.wholesaleFeedVersion && this._capabilities.wholesaleFeedVersioning) { + params.if_wholesale_feed_version = metadata.wholesaleFeedVersion; + if (metadata.pricingVersion) params.if_pricing_version = metadata.pricingVersion; } const result = (await this.client.getProducts(params as never)) as { data?: V31Beta.GetProductsResponse; }; + if (!this.isLifecycleCurrent(epoch)) return { cancelled: true, unchanged: false, items: into, metadata }; const body = result.data; - if (!body) return false; + if (!body) return { cancelled: false, unchanged: false, items: into, metadata }; if (body.unchanged) { // Echo any newer pricing_version / cache_scope the seller // returned alongside the unchanged signal, then tell the caller // to keep the existing index. - if (typeof body.wholesale_feed_version === 'string') { - this.productWholesaleFeedVersion = body.wholesale_feed_version; - } - if (typeof body.pricing_version === 'string') this.productPricingVersion = body.pricing_version; - if (body.cache_scope === 'public' || body.cache_scope === 'account') this.productCacheScope = body.cache_scope; - return true; + metadata = mergeFeedMetadata(metadata, body); + return { cancelled: false, unchanged: true, items: into, metadata }; } const products = Array.isArray(body.products) ? body.products : []; for (const product of products) { const id = (product as { product_id?: string }).product_id; if (typeof id === 'string') into.set(id, product as Product); } - if (typeof body.wholesale_feed_version === 'string') - this.productWholesaleFeedVersion = body.wholesale_feed_version; - if (typeof body.pricing_version === 'string') this.productPricingVersion = body.pricing_version; - if (body.cache_scope === 'public' || body.cache_scope === 'account') this.productCacheScope = body.cache_scope; + metadata = mergeFeedMetadata(metadata, body); cursor = body.pagination?.has_more ? body.pagination?.cursor : undefined; } while (cursor); - return false; + return { cancelled: false, unchanged: false, items: into, metadata }; } - private async bootstrapSignals(into: Map): Promise { + private async bootstrapSignals(epoch: number): Promise> { let cursor: string | undefined; + const into = new Map(); + let metadata = this.currentSignalMetadata(); do { const params: Record = { discovery_mode: 'wholesale', pagination: { max_results: DEFAULT_BOOTSTRAP_PAGE_LIMIT, ...(cursor && { cursor }) }, ...(this.account && { account: this.account }), }; - if (!cursor && this.signalWholesaleFeedVersion && this._capabilities.wholesaleFeedVersioning) { - params.if_wholesale_feed_version = this.signalWholesaleFeedVersion; - if (this.signalPricingVersion) params.if_pricing_version = this.signalPricingVersion; + if (!cursor && metadata.wholesaleFeedVersion && this._capabilities.wholesaleFeedVersioning) { + params.if_wholesale_feed_version = metadata.wholesaleFeedVersion; + if (metadata.pricingVersion) params.if_pricing_version = metadata.pricingVersion; } const result = (await this.client.getSignals(params as never)) as { data?: V31Beta.GetSignalsResponse; }; + if (!this.isLifecycleCurrent(epoch)) return { cancelled: true, unchanged: false, items: into, metadata }; const body = result.data; - if (!body) return false; + if (!body) return { cancelled: false, unchanged: false, items: into, metadata }; if (body.unchanged) { - if (typeof body.wholesale_feed_version === 'string') { - this.signalWholesaleFeedVersion = body.wholesale_feed_version; - } - if (typeof body.pricing_version === 'string') this.signalPricingVersion = body.pricing_version; - if (body.cache_scope === 'public' || body.cache_scope === 'account') this.signalCacheScope = body.cache_scope; - return true; + metadata = mergeFeedMetadata(metadata, body); + return { cancelled: false, unchanged: true, items: into, metadata }; } const signals = Array.isArray(body.signals) ? body.signals : []; for (const signal of signals) { const id = (signal as { signal_agent_segment_id?: string }).signal_agent_segment_id; if (typeof id === 'string') into.set(id, signal as Signal); } - if (typeof body.wholesale_feed_version === 'string') - this.signalWholesaleFeedVersion = body.wholesale_feed_version; - if (typeof body.pricing_version === 'string') this.signalPricingVersion = body.pricing_version; - if (body.cache_scope === 'public' || body.cache_scope === 'account') this.signalCacheScope = body.cache_scope; + metadata = mergeFeedMetadata(metadata, body); cursor = body.pagination?.has_more ? body.pagination?.cursor : undefined; } while (cursor); - return false; + return { cancelled: false, unchanged: false, items: into, metadata }; } - private async recoverFromBulkChange(event: V31Beta.WholesaleFeedEvent): Promise { + private async recoverFromBulkChange( + event: V31Beta.WholesaleFeedEvent, + epoch = this.lifecycleEpoch + ): Promise { this.emit('resyncing', { reason: 'bulk_change' }); const affected = this.bulkChangeAffectedEntityType(event); if (affected === 'signal' && !this.signals.queryable) { @@ -553,60 +578,82 @@ export class WholesaleFeedSync extends EventEmitter { ); } const entities = affected === 'product' ? 'products' : 'signals'; - await this.bootstrap({ emitDiffs: true, entities }); + return this.bootstrap({ emitDiffs: true, entities, epoch }); } - private async recoverFromVersionMismatch(): Promise { + private async recoverFromVersionMismatch( + event: V31Beta.WholesaleFeedEvent, + epoch = this.lifecycleEpoch + ): Promise { this.emit('resyncing', { reason: 'version_mismatch' }); - await this.bootstrap({ emitDiffs: true }); + const feed = this.feedLabelForEvent(event); + const beforeVersion = this.currentWholesaleFeedVersionForEvent(event); + for (let attempt = 1; attempt <= VERSION_MISMATCH_RECOVERY_ATTEMPTS; attempt++) { + const recovered = await this.bootstrap({ emitDiffs: true, epoch }); + if (!recovered) return false; + const afterVersion = this.currentWholesaleFeedVersionForEvent(event); + if (afterVersion !== beforeVersion) return true; + if (attempt < VERSION_MISMATCH_RECOVERY_ATTEMPTS) { + await sleep(VERSION_MISMATCH_RECOVERY_BACKOFF_MS * attempt); + if (!this.isLifecycleCurrent(epoch)) return false; + } + } + throw new Error( + `WholesaleFeedSync: version mismatch recovery did not advance ${feed} wholesale_feed_version after ${VERSION_MISMATCH_RECOVERY_ATTEMPTS} attempts.` + ); } // ====== Private: auto-poll mode version probe ====== - private scheduleProbe(): void { - this.probeTimer = setTimeout(() => this.probeLoop(), this.probeIntervalMs); + private scheduleProbe(epoch = this.lifecycleEpoch): void { + if (!this.isLifecycleCurrent(epoch)) return; + this.probeTimer = setTimeout(() => this.probeLoop(epoch), this.probeIntervalMs); } - private async probeLoop(): Promise { + private async probeLoop(epoch: number): Promise { + if (!this.isLifecycleCurrent(epoch)) return; try { - await this.probeVersion(); + await this.probeVersion(epoch); } catch (err) { + if (!this.isLifecycleCurrent(epoch)) return; const error = err instanceof Error ? err : new Error(String(err)); this.emit('error', { error }); this.errorHandler?.(error); } - if (this._state === 'syncing' && this._mode === 'auto-poll') { - this.scheduleProbe(); + if (this.isLifecycleCurrent(epoch) && this._state === 'syncing' && this._mode === 'auto-poll') { + this.scheduleProbe(epoch); } } - private async probeVersion(): Promise { - await this.bootstrap({ emitDiffs: true }); + private async probeVersion(epoch: number): Promise { + await this.bootstrap({ emitDiffs: true, epoch }); } // ====== Private: capability refresh ====== - private scheduleCapabilityRefresh(): void { - this.capabilityTimer = setTimeout(() => this.capabilityRefreshLoop(), this.capabilityRefreshIntervalMs); + private scheduleCapabilityRefresh(epoch = this.lifecycleEpoch): void { + if (!this.isLifecycleCurrent(epoch)) return; + this.capabilityTimer = setTimeout(() => this.capabilityRefreshLoop(epoch), this.capabilityRefreshIntervalMs); } - private async capabilityRefreshLoop(): Promise { + private async capabilityRefreshLoop(epoch: number): Promise { + if (!this.isLifecycleCurrent(epoch)) return; try { const previousMode = this._mode; - await this.resolveMode(); + if (!(await this.resolveMode(epoch))) return; if (this._mode !== previousMode) { // Capability upgrade or downgrade — re-establish background sync. - this.stop(); - await this.start(); + await this.startInner(); return; } } catch (err) { + if (!this.isLifecycleCurrent(epoch)) return; const error = err instanceof Error ? err : new Error(String(err)); this.emit('error', { error }); this.errorHandler?.(error); } - if (this._state === 'syncing' && this.capabilityRefreshIntervalMs > 0) { - this.scheduleCapabilityRefresh(); + if (this.isLifecycleCurrent(epoch) && this._state === 'syncing' && this.capabilityRefreshIntervalMs > 0) { + this.scheduleCapabilityRefresh(epoch); } } @@ -627,6 +674,40 @@ export class WholesaleFeedSync extends EventEmitter { ); } + private feedLabelForEvent(event: V31Beta.WholesaleFeedEvent): 'product' | 'signal' { + if (event.event_type.startsWith('product.')) return 'product'; + if (event.event_type.startsWith('signal.')) return 'signal'; + return this.bulkChangeAffectedEntityType(event); + } + + private currentProductMetadata(): FeedMetadata { + return { + wholesaleFeedVersion: this.productWholesaleFeedVersion, + pricingVersion: this.productPricingVersion, + cacheScope: this.productCacheScope, + }; + } + + private currentSignalMetadata(): FeedMetadata { + return { + wholesaleFeedVersion: this.signalWholesaleFeedVersion, + pricingVersion: this.signalPricingVersion, + cacheScope: this.signalCacheScope, + }; + } + + private commitProductMetadata(metadata: FeedMetadata): void { + this.productWholesaleFeedVersion = metadata.wholesaleFeedVersion; + this.productPricingVersion = metadata.pricingVersion; + this.productCacheScope = metadata.cacheScope; + } + + private commitSignalMetadata(metadata: FeedMetadata): void { + this.signalWholesaleFeedVersion = metadata.wholesaleFeedVersion; + this.signalPricingVersion = metadata.pricingVersion; + this.signalCacheScope = metadata.cacheScope; + } + private rememberWebhookVersion(webhook: V31Beta.WholesaleFeedWebhook): void { const event = webhook.event; if (event.event_type.startsWith('product.')) { @@ -987,6 +1068,10 @@ export class WholesaleFeedSync extends EventEmitter { this._state = next; this.emit('stateChange', { from, to: next }); } + + private isLifecycleCurrent(epoch: number): boolean { + return epoch === this.lifecycleEpoch; + } } // ====== Diff helpers ====== @@ -1013,3 +1098,23 @@ function compareUuidV7(a: string, b: string): number { const normalizedB = b.toLowerCase(); return normalizedA === normalizedB ? 0 : normalizedA > normalizedB ? 1 : -1; } + +function mergeFeedMetadata( + current: FeedMetadata, + body: { + wholesale_feed_version?: unknown; + pricing_version?: unknown; + cache_scope?: unknown; + } +): FeedMetadata { + return { + wholesaleFeedVersion: + typeof body.wholesale_feed_version === 'string' ? body.wholesale_feed_version : current.wholesaleFeedVersion, + pricingVersion: typeof body.pricing_version === 'string' ? body.pricing_version : current.pricingVersion, + cacheScope: body.cache_scope === 'public' || body.cache_scope === 'account' ? body.cache_scope : current.cacheScope, + }; +} + +async function sleep(ms: number): Promise { + await new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/test/lib/wholesale-feed-sync.test.js b/test/lib/wholesale-feed-sync.test.js index 6076eb8fc..e18402837 100644 --- a/test/lib/wholesale-feed-sync.test.js +++ b/test/lib/wholesale-feed-sync.test.js @@ -107,11 +107,28 @@ function makeEvent(event_type, entity_type, entity_id, payload) { }; } +function deferred() { + let resolve; + const promise = new Promise(r => { + resolve = r; + }); + return { promise, resolve }; +} + +async function waitFor(predicate, message) { + for (let i = 0; i < 100; i++) { + if (predicate()) return; + await new Promise(resolve => setTimeout(resolve, 5)); + } + assert.fail(message); +} + function makeStubClient(opts = {}) { const calls = { capabilities: 0, getProducts: [], getSignals: [] }; const client = { async getAdcpCapabilities() { calls.capabilities++; + if (typeof opts.capabilities === 'function') return makeCapabilitiesResult(opts.capabilities(calls.capabilities)); return makeCapabilitiesResult(opts.capabilities ?? {}); }, async getProducts(params) { @@ -202,6 +219,116 @@ describe('WholesaleFeedSync beta 3 wholesale feed flow', () => { sync.stop(); }); + test('stop cancels an in-flight bootstrap before it commits mirror state', async () => { + const gate = deferred(); + const { client, calls } = makeStubClient({ + capabilities: { + wholesale_feed_versioning: { supported: true }, + }, + getProducts: async () => { + await gate.promise; + return makeProductsResult([makeProduct('p_after_stop')], { wholesale_feed_version: 'products-v1' }); + }, + }); + const sync = new WholesaleFeedSync({ client }); + + const startPromise = sync.start(); + await waitFor(() => calls.getProducts.length === 1, 'expected product bootstrap to begin'); + sync.stop(); + gate.resolve(); + await startPromise; + + assert.strictEqual(sync.state, 'idle'); + assert.strictEqual(sync.products.count, 0); + sync.stop(); + }); + + test('stop during a mixed bootstrap preserves the previous indexes and version tokens', async () => { + let phase = 'initial'; + const signalGate = deferred(); + const { client, calls } = makeStubClient({ + capabilities: { + wholesale_feed_versioning: { supported: true }, + signals: { discovery_modes: ['wholesale'] }, + }, + getProducts: params => { + if (phase === 'initial') { + return makeProductsResult([makeProduct('p1')], { wholesale_feed_version: 'products-v1' }); + } + if (phase === 'blocked') { + assert.strictEqual(params.if_wholesale_feed_version, 'products-v1'); + return makeProductsResult([makeProduct('p2')], { wholesale_feed_version: 'products-v2' }); + } + assert.strictEqual(params.if_wholesale_feed_version, 'products-v1'); + return makeUnchangedResult({ wholesale_feed_version: 'products-v1' }); + }, + getSignals: async params => { + if (phase === 'initial') { + return makeSignalsResult([makeSignal('s1')], { wholesale_feed_version: 'signals-v1' }); + } + if (phase === 'blocked') { + assert.strictEqual(params.if_wholesale_feed_version, 'signals-v1'); + await signalGate.promise; + return makeSignalsResult([makeSignal('s2')], { wholesale_feed_version: 'signals-v2' }); + } + assert.strictEqual(params.if_wholesale_feed_version, 'signals-v1'); + return makeUnchangedResult({ wholesale_feed_version: 'signals-v1' }); + }, + }); + const sync = new WholesaleFeedSync({ client }); + + await sync.start(); + phase = 'blocked'; + const refreshPromise = sync.refresh(); + await waitFor(() => calls.getSignals.length === 2, 'expected signal bootstrap to block during refresh'); + sync.stop(); + signalGate.resolve(); + await refreshPromise; + + assert.strictEqual(sync.state, 'idle'); + assert.strictEqual(sync.products.get('p1').name, 'Product p1'); + assert.strictEqual(sync.products.get('p2'), undefined); + assert.strictEqual(sync.signals.get('s1').name, 'Signal s1'); + assert.strictEqual(sync.signals.get('s2'), undefined); + + phase = 'verify'; + await sync.refresh(); + assert.strictEqual(calls.getProducts.at(-1).if_wholesale_feed_version, 'products-v1'); + assert.strictEqual(calls.getSignals.at(-1).if_wholesale_feed_version, 'signals-v1'); + sync.stop(); + }); + + test('capability refresh restarts through the internal lifecycle on mode changes', async () => { + const { client, calls } = makeStubClient({ + capabilities: callNumber => + callNumber === 1 + ? {} + : { + wholesale_feed_versioning: { supported: true }, + }, + getProducts: () => + makeProductsResult([makeProduct(`p${calls.getProducts.length}`)], { + wholesale_feed_version: `products-v${calls.getProducts.length}`, + }), + }); + const sync = new WholesaleFeedSync({ + client, + capabilityRefreshIntervalMs: 5, + probeIntervalMs: 60_000, + }); + + await sync.start(); + assert.strictEqual(sync.mode, 'manual'); + await waitFor( + () => sync.mode === 'auto-poll' && calls.getProducts.length >= 2, + 'expected capability refresh to restart in auto-poll mode' + ); + + assert.strictEqual(sync.mode, 'auto-poll'); + assert.ok(sync.products.count >= 1); + sync.stop(); + }); + test('applyWebhook applies product and signal deltas and emits typed events', async () => { const { client } = makeStubClient({ capabilities: { @@ -457,6 +584,43 @@ describe('WholesaleFeedSync beta 3 wholesale feed flow', () => { sync.stop(); }); + test('version mismatch recovery fails after bounded retries when the feed version does not advance', async () => { + const { client, calls } = makeStubClient({ + capabilities: { + wholesale_feed_versioning: { supported: true }, + wholesale_feed_webhooks: { supported: true, event_types: ['product.updated'] }, + }, + getProducts: (_params, callNumber) => { + if (callNumber === 1) return makeProductsResult([makeProduct('p1')], { wholesale_feed_version: 'v5' }); + return makeUnchangedResult({ wholesale_feed_version: 'v5' }); + }, + }); + const sync = new WholesaleFeedSync({ client, account }); + const reasons = []; + sync.on('resyncing', ({ reason }) => reasons.push(reason)); + + await sync.start(); + await assert.rejects( + () => + sync.applyWebhook( + makeWebhook( + makeEvent('product.updated', 'product', 'p1', { + product_id: 'p1', + product: makeProduct('p1', { name: 'Stale Webhook Product' }), + applies_to: { scope: 'public' }, + }), + { version: 'v6', previous: 'v4' } + ) + ), + /version mismatch recovery did not advance product wholesale_feed_version after 3 attempts/ + ); + + assert.deepStrictEqual(reasons, ['version_mismatch']); + assert.strictEqual(calls.getProducts.length, 4); + assert.strictEqual(sync.products.get('p1').name, 'Product p1'); + sync.stop(); + }); + test('rejects malformed webhook envelopes before mutating the mirror', async () => { const { client } = makeStubClient({ getProducts: () => makeProductsResult([makeProduct('p1')], { wholesale_feed_version: 'v1' }), From 9bf6253ea4a83d459c035e5feff3c5f75f7e15b9 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 30 May 2026 13:31:01 -0400 Subject: [PATCH 3/4] ci: stabilize workflow linting --- scripts/lint-workflows.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/scripts/lint-workflows.js b/scripts/lint-workflows.js index 5bc718bb7..d086259a3 100644 --- a/scripts/lint-workflows.js +++ b/scripts/lint-workflows.js @@ -16,12 +16,15 @@ async function main() { throw new Error(`No workflow files found in ${path.relative(process.cwd(), workflowDir)}`); } - const lint = await createLinter(); let issueCount = 0; for (const file of workflowFiles) { const relativePath = path.relative(process.cwd(), file); const input = await readFile(file, 'utf8'); + // actionlint's JS/WASM linter can throw `RuntimeError: unreachable` when + // the same instance is reused across multiple workflow files. A fresh + // instance per file keeps diagnostics stable while preserving full linting. + const lint = await createLinter(); const results = lint(input, relativePath); for (const result of results) { From 5d19f0f379977f805f42348bbed4701ad83d5654 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 30 May 2026 13:32:38 -0400 Subject: [PATCH 4/4] ci: add empty changeset for Argus workflow --- .changeset/argus-contributor-ci.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .changeset/argus-contributor-ci.md diff --git a/.changeset/argus-contributor-ci.md b/.changeset/argus-contributor-ci.md new file mode 100644 index 000000000..48ba9b460 --- /dev/null +++ b/.changeset/argus-contributor-ci.md @@ -0,0 +1,4 @@ +--- +--- + +Empty changeset - CI workflow and workflow-lint maintenance only, no package release needed.