Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changeset/fix-tenant-server-per-request-factory.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
---

Fix "Already connected to a transport" under concurrent POST requests to the training-agent tenant router by creating a fresh MCP Server per request instead of reusing the registry singleton. Adds `createServer(tenantId)` factory to `RegistryHolder` and a regression test for concurrent back-to-back requests.
179 changes: 111 additions & 68 deletions server/src/training-agent/tenants/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import type { Request } from 'express';
import {
createTenantRegistry,
createAdcpServerFromPlatform,
createPostgresTaskRegistry,
createInMemoryTaskRegistry,
InMemoryStateStore,
Expand Down Expand Up @@ -229,81 +230,123 @@
* the same promise.
*/
get(): Promise<TenantRegistry>;

/**
* Create a fresh MCP server for a single HTTP request. The MCP SDK
* requires one Server instance per connection — calling connect() on
* a shared server while it is already connected throws
* "Already connected to a transport." This factory creates a new
* Protocol instance each time so concurrent requests to the same
* tenant each get an independent server, matching the SDK's stateless
* mode expectation (sessionIdGenerator: undefined).
*
* Returns null if the tenant is not registered (shouldn't happen when
* called after a successful resolveByRequest, but defended for safety).
*/
createServer(tenantId: string): ReturnType<typeof createAdcpServerFromPlatform> | null;
}

export function createRegistryHolder(): RegistryHolder {
let registry: TenantRegistry | null = null;
let pendingInit: Promise<TenantRegistry> | null = null;
// Per-tenant server factories. Populated during init and used by
// createServer() to produce a fresh MCP Server per HTTP request.
const serverFactories = new Map<string, () => ReturnType<typeof createAdcpServerFromPlatform>>();

async function ensureInit(): Promise<TenantRegistry> {
if (registry) return registry;
if (pendingInit) return pendingInit;
const promise = (async () => {
const t0 = Date.now();
logger.info('Tenant registry init starting');
const hostBase = buildHostBaseUrl();
const reg = createTenantRegistry({
defaultServerOptions: buildDefaultServerOptions(),
jwksValidator: noopJwksValidator,
autoValidate: true,
});
const tCreate = Date.now();
const configs = [
{ id: 'signals', cfg: buildSignalsTenantConfig(hostBase) },
{ id: 'sales', cfg: buildSalesTenantConfig(hostBase) },
{ id: 'governance', cfg: buildGovernanceTenantConfig(hostBase) },
{ id: 'creative', cfg: buildCreativeTenantConfig(hostBase) },
{ id: 'creative-builder', cfg: buildCreativeBuilderTenantConfig(hostBase) },
{ id: 'brand', cfg: buildBrandTenantConfig(hostBase) },
] as const;
const tConfigs = Date.now();

// Build per-tenant server factories before registration. Each factory
// merges the shared default options with the tenant-specific platform
// and serverOptions so createServer() can spin up a fresh Protocol
// instance per HTTP request without re-registering.
const defaultOpts = buildDefaultServerOptions();
for (const { cfg } of configs) {
const opts: CreateAdcpServerFromPlatformOptions = {
...defaultOpts,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
platform: cfg.config.platform as any,

Check failure on line 288 in server/src/training-agent/tenants/registry.ts

View workflow job for this annotation

GitHub Actions / TypeScript Build

Object literal may only specify known properties, and 'platform' does not exist in type 'CreateAdcpServerFromPlatformOptions'.
...(cfg.config.serverOptions ?? {}),
};
// Key by cfg.tenantId (the value passed to reg.register) so the factory
// map and the registry use the same source of truth.
serverFactories.set(cfg.tenantId, () => createAdcpServerFromPlatform(opts));

Check failure on line 293 in server/src/training-agent/tenants/registry.ts

View workflow job for this annotation

GitHub Actions / TypeScript Build

Expected 2 arguments, but got 1.
}

// awaitFirstValidation:true blocks until the no-op validator
// promotes the tenant to 'healthy'. Without it the first request
// would race the background validation and see 'pending' (refused
// traffic) for the first ~10ms.
await Promise.all(
configs.map(async ({ id, cfg }) => {
const start = Date.now();
try {
await reg.register(cfg.tenantId, cfg.config, { awaitFirstValidation: true });
logger.info({ tenantId: id, elapsedMs: Date.now() - start }, 'Tenant registered');
} catch (err) {
logger.error(
{
err,
errMessage: err instanceof Error ? err.message : String(err),
errStack: err instanceof Error ? err.stack : undefined,
tenantId: id,
elapsedMs: Date.now() - start,
},
'Tenant register failed',
);
throw err;
}
}),
);
logger.info(
{
hostBase,
createMs: tCreate - t0,
configBuildMs: tConfigs - tCreate,
registerMs: Date.now() - tConfigs,
totalMs: Date.now() - t0,
tenants: configs.map(c => c.id),
},
'Tenant registry initialized',
);
registry = reg;
return reg;
})();
// Reset pendingInit on rejection so a transient init failure (e.g.,
// DNS hiccup during the no-op validator's first probe) doesn't
// poison every subsequent request with the same rejected promise
// until machine restart.
promise.catch(() => { pendingInit = null; });
pendingInit = promise;
return promise;
}

return {
async get(): Promise<TenantRegistry> {
if (registry) return registry;
if (pendingInit) return pendingInit;
const promise = (async () => {
const t0 = Date.now();
logger.info('Tenant registry init starting');
const hostBase = buildHostBaseUrl();
const reg = createTenantRegistry({
defaultServerOptions: buildDefaultServerOptions(),
jwksValidator: noopJwksValidator,
autoValidate: true,
});
const tCreate = Date.now();
const configs = [
{ id: 'signals', cfg: buildSignalsTenantConfig(hostBase) },
{ id: 'sales', cfg: buildSalesTenantConfig(hostBase) },
{ id: 'governance', cfg: buildGovernanceTenantConfig(hostBase) },
{ id: 'creative', cfg: buildCreativeTenantConfig(hostBase) },
{ id: 'creative-builder', cfg: buildCreativeBuilderTenantConfig(hostBase) },
{ id: 'brand', cfg: buildBrandTenantConfig(hostBase) },
] as const;
const tConfigs = Date.now();
// awaitFirstValidation:true blocks until the no-op validator
// promotes the tenant to 'healthy'. Without it the first request
// would race the background validation and see 'pending' (refused
// traffic) for the first ~10ms.
await Promise.all(
configs.map(async ({ id, cfg }) => {
const start = Date.now();
try {
await reg.register(cfg.tenantId, cfg.config, { awaitFirstValidation: true });
logger.info({ tenantId: id, elapsedMs: Date.now() - start }, 'Tenant registered');
} catch (err) {
logger.error(
{
err,
errMessage: err instanceof Error ? err.message : String(err),
errStack: err instanceof Error ? err.stack : undefined,
tenantId: id,
elapsedMs: Date.now() - start,
},
'Tenant register failed',
);
throw err;
}
}),
);
logger.info(
{
hostBase,
createMs: tCreate - t0,
configBuildMs: tConfigs - tCreate,
registerMs: Date.now() - tConfigs,
totalMs: Date.now() - t0,
tenants: configs.map(c => c.id),
},
'Tenant registry initialized',
);
registry = reg;
return reg;
})();
// Reset pendingInit on rejection so a transient init failure (e.g.,
// DNS hiccup during the no-op validator's first probe) doesn't
// poison every subsequent request with the same rejected promise
// until machine restart.
promise.catch(() => { pendingInit = null; });
pendingInit = promise;
return promise;
get: ensureInit,

createServer(tenantId: string): ReturnType<typeof createAdcpServerFromPlatform> | null {
const factory = serverFactories.get(tenantId);
return factory ? factory() : null;
},
};
}
22 changes: 18 additions & 4 deletions server/src/training-agent/tenants/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,29 @@ function tenantMcpHandler(holder: RegistryHolder, tenantId: string) {
}
}

// Create a fresh server per request. The MCP SDK requires a separate
// Protocol instance per connection — sharing a singleton server across
// concurrent POSTs triggers "Already connected to a transport."
// sessionIdGenerator:undefined (stateless mode) means each POST is
// self-contained; both server and transport are discarded after the
// response, matching standalone.ts and the SDK's own stateless examples.
const requestServer = holder.createServer(resolved.tenantId);
if (!requestServer) {
res.status(503).json({
jsonrpc: '2.0',
id: null,
error: { code: -32000, message: 'Tenant server factory unavailable; retry shortly' },
});
return;
}

const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
enableJsonResponse: true,
});

try {
await resolved.server.connect(transport);
await requestServer.connect(transport);
logger.debug({ tenantId: resolved.tenantId, method: req.body?.method }, 'tenant MCP request');
await runWithSessionContext(async () => {
await transport.handleRequest(req, res, req.body);
Expand All @@ -153,9 +169,7 @@ function tenantMcpHandler(holder: RegistryHolder, tenantId: string) {
});
}
} finally {
// Close server connection after handling — tenant servers are
// per-request transient, matching the v5 pattern.
await resolved.server.close().catch(() => {});
await requestServer.close().catch(() => {});
}
};
}
Expand Down
39 changes: 39 additions & 0 deletions server/src/training-agent/tenants/tenant-smoke.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,45 @@ describe('tenant routing smoke', () => {
}
}, 15000);

it('handles concurrent back-to-back POSTs to the same tenant without 500s', async () => {
const { baseUrl, close } = await bootServer();
try {
const url = `${baseUrl}/signals/mcp`;
// Warm up: get through registry init
await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
accept: 'application/json, text/event-stream',
authorization: 'Bearer test-token',
},
body: JSON.stringify({
jsonrpc: '2.0', id: 0, method: 'initialize',
params: { protocolVersion: '2025-03-26', clientInfo: { name: 'x', version: '1' }, capabilities: {} },
}),
});
// Fire 5 concurrent tools/list calls to the same tenant endpoint.
// Before the fix, at least one of these would hit "Already connected
// to a transport" and return -32603.
const results = await Promise.all(
Array.from({ length: 5 }, (_, i) =>
fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
accept: 'application/json, text/event-stream',
authorization: 'Bearer test-token',
},
body: JSON.stringify({ jsonrpc: '2.0', id: i + 1, method: 'tools/list', params: {} }),
}).then(r => r.status),
),
);
expect(results).toEqual([200, 200, 200, 200, 200]);
} finally {
await close();
}
}, 20000);

it('dispatches /signals/mcp tools/list and returns only signals-tenant tools', async () => {
const { baseUrl, close } = await bootServer();
try {
Expand Down
Loading