diff --git a/.changeset/fix-tenant-server-per-request-factory.md b/.changeset/fix-tenant-server-per-request-factory.md new file mode 100644 index 0000000000..f7169914c0 --- /dev/null +++ b/.changeset/fix-tenant-server-per-request-factory.md @@ -0,0 +1,4 @@ +--- +--- + +Fix "Already connected to a transport" under concurrent POST requests to the training-agent tenant router by creating a fresh MCP Server per request instead of reusing the registry singleton. Adds `createServer(tenantId)` factory to `RegistryHolder` and a regression test for concurrent back-to-back requests. diff --git a/server/src/training-agent/tenants/registry.ts b/server/src/training-agent/tenants/registry.ts index ce1377570b..43b1b07c75 100644 --- a/server/src/training-agent/tenants/registry.ts +++ b/server/src/training-agent/tenants/registry.ts @@ -26,6 +26,7 @@ import type { Request } from 'express'; import { createTenantRegistry, + createAdcpServerFromPlatform, createPostgresTaskRegistry, createInMemoryTaskRegistry, InMemoryStateStore, @@ -229,81 +230,123 @@ export interface RegistryHolder { * the same promise. */ get(): Promise; + + /** + * Create a fresh MCP server for a single HTTP request. The MCP SDK + * requires one Server instance per connection — calling connect() on + * a shared server while it is already connected throws + * "Already connected to a transport." This factory creates a new + * Protocol instance each time so concurrent requests to the same + * tenant each get an independent server, matching the SDK's stateless + * mode expectation (sessionIdGenerator: undefined). + * + * Returns null if the tenant is not registered (shouldn't happen when + * called after a successful resolveByRequest, but defended for safety). + */ + createServer(tenantId: string): ReturnType | null; } export function createRegistryHolder(): RegistryHolder { let registry: TenantRegistry | null = null; let pendingInit: Promise | null = null; + // Per-tenant server factories. Populated during init and used by + // createServer() to produce a fresh MCP Server per HTTP request. + const serverFactories = new Map ReturnType>(); + + async function ensureInit(): Promise { + if (registry) return registry; + if (pendingInit) return pendingInit; + const promise = (async () => { + const t0 = Date.now(); + logger.info('Tenant registry init starting'); + const hostBase = buildHostBaseUrl(); + const reg = createTenantRegistry({ + defaultServerOptions: buildDefaultServerOptions(), + jwksValidator: noopJwksValidator, + autoValidate: true, + }); + const tCreate = Date.now(); + const configs = [ + { id: 'signals', cfg: buildSignalsTenantConfig(hostBase) }, + { id: 'sales', cfg: buildSalesTenantConfig(hostBase) }, + { id: 'governance', cfg: buildGovernanceTenantConfig(hostBase) }, + { id: 'creative', cfg: buildCreativeTenantConfig(hostBase) }, + { id: 'creative-builder', cfg: buildCreativeBuilderTenantConfig(hostBase) }, + { id: 'brand', cfg: buildBrandTenantConfig(hostBase) }, + ] as const; + const tConfigs = Date.now(); + + // Build per-tenant server factories before registration. Each factory + // merges the shared default options with the tenant-specific platform + // and serverOptions so createServer() can spin up a fresh Protocol + // instance per HTTP request without re-registering. + const defaultOpts = buildDefaultServerOptions(); + for (const { cfg } of configs) { + const opts: CreateAdcpServerFromPlatformOptions = { + ...defaultOpts, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + platform: cfg.config.platform as any, + ...(cfg.config.serverOptions ?? {}), + }; + // Key by cfg.tenantId (the value passed to reg.register) so the factory + // map and the registry use the same source of truth. + serverFactories.set(cfg.tenantId, () => createAdcpServerFromPlatform(opts)); + } + + // awaitFirstValidation:true blocks until the no-op validator + // promotes the tenant to 'healthy'. Without it the first request + // would race the background validation and see 'pending' (refused + // traffic) for the first ~10ms. + await Promise.all( + configs.map(async ({ id, cfg }) => { + const start = Date.now(); + try { + await reg.register(cfg.tenantId, cfg.config, { awaitFirstValidation: true }); + logger.info({ tenantId: id, elapsedMs: Date.now() - start }, 'Tenant registered'); + } catch (err) { + logger.error( + { + err, + errMessage: err instanceof Error ? err.message : String(err), + errStack: err instanceof Error ? err.stack : undefined, + tenantId: id, + elapsedMs: Date.now() - start, + }, + 'Tenant register failed', + ); + throw err; + } + }), + ); + logger.info( + { + hostBase, + createMs: tCreate - t0, + configBuildMs: tConfigs - tCreate, + registerMs: Date.now() - tConfigs, + totalMs: Date.now() - t0, + tenants: configs.map(c => c.id), + }, + 'Tenant registry initialized', + ); + registry = reg; + return reg; + })(); + // Reset pendingInit on rejection so a transient init failure (e.g., + // DNS hiccup during the no-op validator's first probe) doesn't + // poison every subsequent request with the same rejected promise + // until machine restart. + promise.catch(() => { pendingInit = null; }); + pendingInit = promise; + return promise; + } return { - async get(): Promise { - if (registry) return registry; - if (pendingInit) return pendingInit; - const promise = (async () => { - const t0 = Date.now(); - logger.info('Tenant registry init starting'); - const hostBase = buildHostBaseUrl(); - const reg = createTenantRegistry({ - defaultServerOptions: buildDefaultServerOptions(), - jwksValidator: noopJwksValidator, - autoValidate: true, - }); - const tCreate = Date.now(); - const configs = [ - { id: 'signals', cfg: buildSignalsTenantConfig(hostBase) }, - { id: 'sales', cfg: buildSalesTenantConfig(hostBase) }, - { id: 'governance', cfg: buildGovernanceTenantConfig(hostBase) }, - { id: 'creative', cfg: buildCreativeTenantConfig(hostBase) }, - { id: 'creative-builder', cfg: buildCreativeBuilderTenantConfig(hostBase) }, - { id: 'brand', cfg: buildBrandTenantConfig(hostBase) }, - ] as const; - const tConfigs = Date.now(); - // awaitFirstValidation:true blocks until the no-op validator - // promotes the tenant to 'healthy'. Without it the first request - // would race the background validation and see 'pending' (refused - // traffic) for the first ~10ms. - await Promise.all( - configs.map(async ({ id, cfg }) => { - const start = Date.now(); - try { - await reg.register(cfg.tenantId, cfg.config, { awaitFirstValidation: true }); - logger.info({ tenantId: id, elapsedMs: Date.now() - start }, 'Tenant registered'); - } catch (err) { - logger.error( - { - err, - errMessage: err instanceof Error ? err.message : String(err), - errStack: err instanceof Error ? err.stack : undefined, - tenantId: id, - elapsedMs: Date.now() - start, - }, - 'Tenant register failed', - ); - throw err; - } - }), - ); - logger.info( - { - hostBase, - createMs: tCreate - t0, - configBuildMs: tConfigs - tCreate, - registerMs: Date.now() - tConfigs, - totalMs: Date.now() - t0, - tenants: configs.map(c => c.id), - }, - 'Tenant registry initialized', - ); - registry = reg; - return reg; - })(); - // Reset pendingInit on rejection so a transient init failure (e.g., - // DNS hiccup during the no-op validator's first probe) doesn't - // poison every subsequent request with the same rejected promise - // until machine restart. - promise.catch(() => { pendingInit = null; }); - pendingInit = promise; - return promise; + get: ensureInit, + + createServer(tenantId: string): ReturnType | null { + const factory = serverFactories.get(tenantId); + return factory ? factory() : null; }, }; } diff --git a/server/src/training-agent/tenants/router.ts b/server/src/training-agent/tenants/router.ts index 2449e03e8e..8e2eb77466 100644 --- a/server/src/training-agent/tenants/router.ts +++ b/server/src/training-agent/tenants/router.ts @@ -131,13 +131,29 @@ function tenantMcpHandler(holder: RegistryHolder, tenantId: string) { } } + // Create a fresh server per request. The MCP SDK requires a separate + // Protocol instance per connection — sharing a singleton server across + // concurrent POSTs triggers "Already connected to a transport." + // sessionIdGenerator:undefined (stateless mode) means each POST is + // self-contained; both server and transport are discarded after the + // response, matching standalone.ts and the SDK's own stateless examples. + const requestServer = holder.createServer(resolved.tenantId); + if (!requestServer) { + res.status(503).json({ + jsonrpc: '2.0', + id: null, + error: { code: -32000, message: 'Tenant server factory unavailable; retry shortly' }, + }); + return; + } + const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: undefined, enableJsonResponse: true, }); try { - await resolved.server.connect(transport); + await requestServer.connect(transport); logger.debug({ tenantId: resolved.tenantId, method: req.body?.method }, 'tenant MCP request'); await runWithSessionContext(async () => { await transport.handleRequest(req, res, req.body); @@ -153,9 +169,7 @@ function tenantMcpHandler(holder: RegistryHolder, tenantId: string) { }); } } finally { - // Close server connection after handling — tenant servers are - // per-request transient, matching the v5 pattern. - await resolved.server.close().catch(() => {}); + await requestServer.close().catch(() => {}); } }; } diff --git a/server/src/training-agent/tenants/tenant-smoke.test.ts b/server/src/training-agent/tenants/tenant-smoke.test.ts index c331dd84e9..60c501008a 100644 --- a/server/src/training-agent/tenants/tenant-smoke.test.ts +++ b/server/src/training-agent/tenants/tenant-smoke.test.ts @@ -66,6 +66,45 @@ describe('tenant routing smoke', () => { } }, 15000); + it('handles concurrent back-to-back POSTs to the same tenant without 500s', async () => { + const { baseUrl, close } = await bootServer(); + try { + const url = `${baseUrl}/signals/mcp`; + // Warm up: get through registry init + await fetch(url, { + method: 'POST', + headers: { + 'content-type': 'application/json', + accept: 'application/json, text/event-stream', + authorization: 'Bearer test-token', + }, + body: JSON.stringify({ + jsonrpc: '2.0', id: 0, method: 'initialize', + params: { protocolVersion: '2025-03-26', clientInfo: { name: 'x', version: '1' }, capabilities: {} }, + }), + }); + // Fire 5 concurrent tools/list calls to the same tenant endpoint. + // Before the fix, at least one of these would hit "Already connected + // to a transport" and return -32603. + const results = await Promise.all( + Array.from({ length: 5 }, (_, i) => + fetch(url, { + method: 'POST', + headers: { + 'content-type': 'application/json', + accept: 'application/json, text/event-stream', + authorization: 'Bearer test-token', + }, + body: JSON.stringify({ jsonrpc: '2.0', id: i + 1, method: 'tools/list', params: {} }), + }).then(r => r.status), + ), + ); + expect(results).toEqual([200, 200, 200, 200, 200]); + } finally { + await close(); + } + }, 20000); + it('dispatches /signals/mcp tools/list and returns only signals-tenant tools', async () => { const { baseUrl, close } = await bootServer(); try {