diff --git a/src/common/utils/ai/providerOptions.test.ts b/src/common/utils/ai/providerOptions.test.ts index cab76f1a73..e692e36c0f 100644 --- a/src/common/utils/ai/providerOptions.test.ts +++ b/src/common/utils/ai/providerOptions.test.ts @@ -5,7 +5,11 @@ import type { OpenAIResponsesProviderOptions } from "@ai-sdk/openai"; import { createMuxMessage } from "@/common/types/message"; import { describe, test, expect, mock } from "bun:test"; -import { buildProviderOptions } from "./providerOptions"; +import { + buildProviderOptions, + buildRequestHeaders, + ANTHROPIC_1M_CONTEXT_HEADER, +} from "./providerOptions"; // Mock the log module to avoid console noise void mock.module("@/node/services/log", () => ({ @@ -228,3 +232,53 @@ describe("buildProviderOptions - OpenAI", () => { }); }); }); + +describe("buildRequestHeaders", () => { + test("should return anthropic-beta header for Opus 4.6 with use1MContext", () => { + const result = buildRequestHeaders("anthropic:claude-opus-4-6", { + anthropic: { use1MContext: true }, + }); + expect(result).toEqual({ "anthropic-beta": ANTHROPIC_1M_CONTEXT_HEADER }); + }); + + test("should return anthropic-beta header for gateway-routed Anthropic model", () => { + const result = buildRequestHeaders("mux-gateway:anthropic/claude-opus-4-6", { + anthropic: { use1MContext: true }, + }); + expect(result).toEqual({ "anthropic-beta": ANTHROPIC_1M_CONTEXT_HEADER }); + }); + + test("should return undefined for non-Anthropic model", () => { + const result = buildRequestHeaders("openai:gpt-5.2", { + anthropic: { use1MContext: true }, + }); + expect(result).toBeUndefined(); + }); + + test("should return undefined when use1MContext is false", () => { + const result = buildRequestHeaders("anthropic:claude-opus-4-6", { + anthropic: { use1MContext: false }, + }); + expect(result).toBeUndefined(); + }); + + test("should return undefined when no muxProviderOptions provided", () => { + const result = buildRequestHeaders("anthropic:claude-opus-4-6"); + expect(result).toBeUndefined(); + }); + + test("should return undefined for unsupported model even with use1MContext", () => { + // claude-opus-4-1 doesn't support 1M context + const result = buildRequestHeaders("anthropic:claude-opus-4-1", { + anthropic: { use1MContext: true }, + }); + expect(result).toBeUndefined(); + }); + + test("should return header when model is in use1MContextModels list", () => { + const result = buildRequestHeaders("anthropic:claude-opus-4-6", { + anthropic: { use1MContextModels: ["anthropic:claude-opus-4-6"] }, + }); + expect(result).toEqual({ "anthropic-beta": ANTHROPIC_1M_CONTEXT_HEADER }); + }); +}); diff --git a/src/common/utils/ai/providerOptions.ts b/src/common/utils/ai/providerOptions.ts index 3425673b86..805e2628e1 100644 --- a/src/common/utils/ai/providerOptions.ts +++ b/src/common/utils/ai/providerOptions.ts @@ -1,7 +1,9 @@ /** - * Provider options builder for AI SDK + * Provider-specific request configuration for AI SDK * - * Converts unified thinking levels to provider-specific options + * Builds both `providerOptions` (thinking, reasoning) and per-request HTTP + * `headers` (e.g. Anthropic 1M context beta) for streamText(). Both builders + * share the same gateway-normalization logic and provider branching. 
  */
 
 import type { AnthropicProviderOptions } from "@ai-sdk/anthropic";
@@ -19,7 +21,7 @@ import {
 } from "@/common/types/thinking";
 import { log } from "@/node/services/log";
 import type { MuxMessage } from "@/common/types/message";
-import { normalizeGatewayModel } from "./models";
+import { normalizeGatewayModel, supports1MContext } from "./models";
 
 /**
  * OpenRouter reasoning options
@@ -340,3 +342,36 @@ export function buildProviderOptions(
   log.debug("buildProviderOptions: Unsupported provider", provider);
   return {};
 }
+
+// ---------------------------------------------------------------------------
+// Per-request HTTP headers
+// ---------------------------------------------------------------------------
+
+/** Header value for Anthropic 1M context beta */
+export const ANTHROPIC_1M_CONTEXT_HEADER = "context-1m-2025-08-07";
+
+/**
+ * Build per-request HTTP headers for provider-specific features.
+ *
+ * These flow through streamText({ headers }) to the provider SDK, which merges
+ * them with provider-creation-time headers via combineHeaders(). This is the
+ * single injection site for features like the Anthropic 1M context beta header,
+ * regardless of whether the model is direct or gateway-routed.
+ */
+export function buildRequestHeaders(
+  modelString: string,
+  muxProviderOptions?: MuxProviderOptions
+): Record<string, string> | undefined {
+  const normalized = normalizeGatewayModel(modelString);
+  const [provider] = normalized.split(":", 2);
+
+  if (provider !== "anthropic") return undefined;
+
+  const is1MEnabled =
+    ((muxProviderOptions?.anthropic?.use1MContextModels?.includes(normalized) ?? false) ||
+      muxProviderOptions?.anthropic?.use1MContext === true) &&
+    supports1MContext(normalized);
+
+  if (!is1MEnabled) return undefined;
+  return { "anthropic-beta": ANTHROPIC_1M_CONTEXT_HEADER };
+}
diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts
index 5d6bc34a15..026b899313 100644
--- a/src/node/services/aiService.ts
+++ b/src/node/services/aiService.ts
@@ -47,7 +47,7 @@ import type { WorkspaceMCPOverrides } from "@/common/types/mcp";
 import type { MCPServerManager, MCPWorkspaceStats } from "@/node/services/mcpServerManager";
 import { WorkspaceMcpOverridesService } from "./workspaceMcpOverridesService";
 import type { TaskService } from "@/node/services/taskService";
-import { buildProviderOptions } from "@/common/utils/ai/providerOptions";
+import { buildProviderOptions, buildRequestHeaders } from "@/common/utils/ai/providerOptions";
 import { sliceMessagesFromLatestCompactionBoundary } from "@/common/utils/messages/compactionBoundary";
 import { THINKING_LEVEL_OFF, type ThinkingLevel } from "@/common/types/thinking";
 
@@ -847,6 +847,11 @@ export class AIService extends EventEmitter {
       truncationMode
     );
 
+    // Build per-request HTTP headers (e.g., anthropic-beta for 1M context).
+    // This is the single injection site for provider-specific headers, handling
+    // both direct and gateway-routed models identically.
+ const requestHeaders = buildRequestHeaders(modelString, effectiveMuxProviderOptions); + // Debug dump: Log the complete LLM request when MUX_DEBUG_LLM_REQUEST is set if (process.env.MUX_DEBUG_LLM_REQUEST === "1") { log.info( @@ -951,7 +956,8 @@ export class AIService extends EventEmitter { streamToken, // Pass the pre-generated stream token hasQueuedMessage, metadata.name, - effectiveThinkingLevel + effectiveThinkingLevel, + requestHeaders ); if (!streamResult.success) { diff --git a/src/node/services/providerModelFactory.ts b/src/node/services/providerModelFactory.ts index ad11f5e154..11e07f8ad4 100644 --- a/src/node/services/providerModelFactory.ts +++ b/src/node/services/providerModelFactory.ts @@ -23,7 +23,7 @@ import { isProviderDisabledInConfig } from "@/common/utils/providers/isProviderD import type { PolicyService } from "@/node/services/policyService"; import type { ProviderService } from "@/node/services/providerService"; import type { CodexOauthService } from "@/node/services/codexOauthService"; -import { normalizeGatewayModel, supports1MContext } from "@/common/utils/ai/models"; +import { normalizeGatewayModel } from "@/common/utils/ai/models"; import { MUX_APP_ATTRIBUTION_TITLE, MUX_APP_ATTRIBUTION_URL } from "@/constants/appAttribution"; import { resolveProviderCredentials } from "@/node/utils/providerRequirements"; import { @@ -228,8 +228,9 @@ export function normalizeAnthropicBaseURL(baseURL: string): string { return `${trimmed}/v1`; } -/** Header value for Anthropic 1M context beta */ -export const ANTHROPIC_1M_CONTEXT_HEADER = "context-1m-2025-08-07"; +// Canonical definition lives in providerOptions; import for local use + re-export for backward compat. +import { ANTHROPIC_1M_CONTEXT_HEADER } from "@/common/utils/ai/providerOptions"; +export { ANTHROPIC_1M_CONTEXT_HEADER }; /** * Build headers for Anthropic provider, optionally including the 1M context beta header. @@ -485,14 +486,9 @@ export class ProviderModelFactory { ? { ...configWithApiKey, baseURL: normalizeAnthropicBaseURL(effectiveBaseURL) } : configWithApiKey; - // Add 1M context beta header if requested and model supports it. - // Check both per-model list (use1MContextModels) and legacy global flag (use1MContext). - const fullModelId = `anthropic:${modelId}`; - const is1MEnabled = - ((muxProviderOptions?.anthropic?.use1MContextModels?.includes(fullModelId) ?? false) || - muxProviderOptions?.anthropic?.use1MContext === true) && - supports1MContext(fullModelId); - const headers = buildAnthropicHeaders(normalizedConfig.headers, is1MEnabled); + // 1M context beta header is injected per-request via buildRequestHeaders() → + // streamText({ headers }), not at provider creation time. This avoids duplicating + // header logic across direct and gateway handlers. // Lazy-load Anthropic provider to reduce startup time const { createAnthropic } = await PROVIDER_REGISTRY.anthropic(); @@ -503,7 +499,6 @@ export class ProviderModelFactory { const fetchWithCacheControl = wrapFetchWithAnthropicCacheControl(baseFetch); const provider = createAnthropic({ ...normalizedConfig, - headers, fetch: fetchWithCacheControl, }); return Ok(provider(modelId)); @@ -1024,6 +1019,9 @@ export class ProviderModelFactory { // Use configured baseURL or fall back to default gateway URL const gatewayBaseURL = providerConfig.baseURL ?? "https://gateway.mux.coder.com/api/v1/ai-gateway/v1/ai"; + + // 1M context beta header is injected per-request via buildRequestHeaders() → + // streamText({ headers }), not at provider creation time. 
     const gateway = createGateway({
       apiKey: couponCode,
       baseURL: gatewayBaseURL,
diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts
index 4cc260008d..8399c44145 100644
--- a/src/node/services/streamManager.ts
+++ b/src/node/services/streamManager.ts
@@ -97,6 +97,8 @@ interface StreamRequestConfig {
   tools?: Record;
   toolChoice?: StreamToolChoice;
   providerOptions?: Record;
+  /** Per-request HTTP headers (e.g., anthropic-beta for 1M context). */
+  headers?: Record<string, string>;
   maxOutputTokens?: number;
   hasQueuedMessage?: () => boolean;
 }
@@ -929,7 +931,8 @@ export class StreamManager extends EventEmitter {
     providerOptions?: Record,
     maxOutputTokens?: number,
     toolPolicy?: ToolPolicy,
-    hasQueuedMessage?: () => boolean
+    hasQueuedMessage?: () => boolean,
+    headers?: Record<string, string>
   ): StreamRequestConfig {
     // Determine toolChoice based on toolPolicy.
     //
@@ -1002,6 +1005,7 @@
       tools: finalTools,
       toolChoice,
       providerOptions: finalProviderOptions,
+      headers,
       maxOutputTokens: effectiveMaxOutputTokens,
       hasQueuedMessage,
     };
@@ -1049,6 +1053,7 @@
       stopWhen: this.createStopWhenCondition(request),
       // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-assignment
       providerOptions: request.providerOptions as any, // Pass provider-specific options (thinking/reasoning config)
+      headers: request.headers, // Per-request HTTP headers (e.g., anthropic-beta for 1M context)
       maxOutputTokens: request.maxOutputTokens,
     });
   }
@@ -1075,7 +1080,8 @@
     toolPolicy?: ToolPolicy,
     hasQueuedMessage?: () => boolean,
     workspaceName?: string,
-    thinkingLevel?: string
+    thinkingLevel?: string,
+    headers?: Record<string, string>
   ): WorkspaceStreamInfo {
     // abortController is created and linked to the caller-provided abortSignal in startStream().
 
@@ -1089,7 +1095,8 @@
       providerOptions,
       maxOutputTokens,
       toolPolicy,
-      hasQueuedMessage
+      hasQueuedMessage,
+      headers
     );
 
     // Start streaming - this can throw immediately if API key is missing
@@ -2432,7 +2439,8 @@
     providedStreamToken?: StreamToken,
     hasQueuedMessage?: () => boolean,
     workspaceName?: string,
-    thinkingLevel?: string
+    thinkingLevel?: string,
+    headers?: Record<string, string>
   ): Promise> {
     const typedWorkspaceId = workspaceId as WorkspaceId;
 
@@ -2507,7 +2515,8 @@
       toolPolicy,
       hasQueuedMessage,
       workspaceName,
-      thinkingLevel
+      thinkingLevel,
+      headers
     );
 
     // Guard against a narrow race:
diff --git a/tests/ipc/compaction1MRetry.integration.test.ts b/tests/ipc/compaction1MRetry.integration.test.ts
new file mode 100644
index 0000000000..11a0252cf7
--- /dev/null
+++ b/tests/ipc/compaction1MRetry.integration.test.ts
@@ -0,0 +1,138 @@
+/**
+ * Integration test: Compaction 1M context retry.
+ *
+ * Validates that when a /compact request exceeds the default context limit (200k),
+ * the backend automatically retries with 1M context enabled for models that support it.
+ *
+ * Pre-seeds ~250k tokens of conversation history, then issues a compaction request
+ * with Opus 4.6 (default 200k limit, supports 1M). If the 1M retry fires correctly,
+ * the compaction should succeed rather than returning context_exceeded.
+ */ + +import { setupWorkspace, shouldRunIntegrationTests, validateApiKeys } from "./setup"; +import { createStreamCollector, resolveOrpcClient } from "./helpers"; +import { HistoryService } from "../../src/node/services/historyService"; +import { createMuxMessage } from "../../src/common/types/message"; +import { KNOWN_MODELS } from "../../src/common/constants/knownModels"; + +// Skip all tests if TEST_INTEGRATION is not set +const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip; + +if (shouldRunIntegrationTests()) { + validateApiKeys(["ANTHROPIC_API_KEY"]); +} + +// ~1 token ≈ 4 chars in English text. To exceed 200k tokens we need ~800k chars. +// Use ~260k tokens of padding to comfortably exceed the 200k default context. +const TOKENS_PER_CHAR = 0.25; // conservative estimate +const TARGET_TOKENS = 260_000; +const CHARS_NEEDED = Math.ceil(TARGET_TOKENS / TOKENS_PER_CHAR); + +/** Build a filler message that is roughly `charCount` characters long. */ +function buildFillerText(charCount: number): string { + // Use varied text to avoid aggressive tokenizer compression + const base = + "The quick brown fox jumps over the lazy dog. " + + "Pack my box with five dozen liquor jugs. " + + "How vexingly quick daft zebras jump. " + + "Sphinx of black quartz, judge my vow. "; + const repeats = Math.ceil(charCount / base.length); + return base.repeat(repeats).slice(0, charCount); +} + +describeIntegration("compaction 1M context retry", () => { + // Compaction with 1M retry can take a while — summarizing 250k+ tokens of content + const TEST_TIMEOUT_MS = 120_000; + + test( + "should auto-retry compaction with 1M context when exceeding 200k default limit", + async () => { + const { env, workspaceId, cleanup } = await setupWorkspace("anthropic"); + try { + const historyService = new HistoryService(env.config); + + // Seed conversation history that exceeds 200k tokens. + // Split across multiple user/assistant pairs to be realistic. + const pairsNeeded = 10; + const charsPerMessage = Math.ceil(CHARS_NEEDED / pairsNeeded); + + for (let i = 0; i < pairsNeeded; i++) { + const userMsg = createMuxMessage( + `filler-user-${i}`, + "user", + buildFillerText(charsPerMessage), + {} + ); + const assistantMsg = createMuxMessage( + `filler-asst-${i}`, + "assistant", + buildFillerText(charsPerMessage), + {} + ); + const r1 = await historyService.appendToHistory(workspaceId, userMsg); + expect(r1.success).toBe(true); + const r2 = await historyService.appendToHistory(workspaceId, assistantMsg); + expect(r2.success).toBe(true); + } + + // Set up stream collector + const collector = createStreamCollector(env.orpc, workspaceId); + collector.start(); + + const opusModel = `anthropic:${KNOWN_MODELS.OPUS.providerModelId}`; + + // Send compaction request — use the same pattern as production /compact. + // Crucially, do NOT enable 1M context in providerOptions; the retry should add it. + const client = resolveOrpcClient(env); + const sendResult = await client.workspace.sendMessage({ + workspaceId, + message: + "Please provide a detailed summary of this conversation. 
" + + "Capture all key decisions, context, and open questions.", + options: { + model: opusModel, + thinkingLevel: "off", + agentId: "compact", + // No providerOptions.anthropic.use1MContext here — the retry should inject it + toolPolicy: [{ regex_match: ".*", action: "disable" }], + muxMetadata: { + type: "compaction-request", + rawCommand: "/compact", + parsed: {}, + }, + }, + }); + + expect(sendResult.success).toBe(true); + + // Wait for either stream-end (success) or stream-error (failure). + // With 1M retry working, we expect stream-end. + const terminalEvent = await Promise.race([ + collector.waitForEvent("stream-end", TEST_TIMEOUT_MS), + collector.waitForEvent("stream-error", TEST_TIMEOUT_MS), + ]); + + expect(terminalEvent).toBeDefined(); + + if (terminalEvent?.type === "stream-error") { + // If we got a stream-error, the 1M retry didn't work. + // Log diagnostic info for debugging. + const errorType = "errorType" in terminalEvent ? terminalEvent.errorType : "unknown"; + const errorMsg = "error" in terminalEvent ? terminalEvent.error : "unknown"; + throw new Error( + `Compaction failed (expected 1M retry to succeed): ` + + `errorType=${errorType}, error=${errorMsg}` + ); + } + + // Verify we got a successful compaction (stream-end) + expect(terminalEvent?.type).toBe("stream-end"); + + collector.stop(); + } finally { + await cleanup(); + } + }, + TEST_TIMEOUT_MS + 10_000 + ); +});