Skip to content

fix(claude): correct token usage for Qwen streaming responses#2956

Closed
zilianpn wants to merge 1 commit into
router-for-me:devfrom
zilianpn:fix/qwen-claude-stream-usage
Closed

fix(claude): correct token usage for Qwen streaming responses#2956
zilianpn wants to merge 1 commit into
router-for-me:devfrom
zilianpn:fix/qwen-claude-stream-usage

Conversation

@zilianpn
Copy link
Copy Markdown
Contributor

@zilianpn zilianpn commented Apr 22, 2026

修复 Qwen 3.6 Plus 流式响应 token 统计 bug

问题

当 Qwen 3.6 Plus 配置为 Anthropic 协议端点(通过 claude-api-key)时,流式响应的 usage 统计数据出现严重错误:

  • input_tokens 永远显示为 6(实际应为 81923)
  • output_tokensreasoning_tokens 统计不正确
  • 导致用户看到的 token 消耗数据与实际相差巨大

根因分析

Qwen 的 SSE 流式响应与标准 Anthropic 协议存在两处偏差:

  1. usage 嵌套位置不同message_start 事件中,usage 嵌套在 message.usage 下(而非标准协议的顶层 usage
  2. usage 语义不同message_delta 事件中的 usage 是增量值(delta),而标准 Anthropic 的 message_delta usage 是累积值(cumulative)

具体对比:

事件 标准 Anthropic Qwen
message_start message.usageinput_tokens: 81923 message.usageinput_tokens: 81923, output_tokens: 0
message_delta 顶层 usage 含累积值(如 output_tokens: 217 顶层 usage 含增量值(如 input_tokens: 6, output_tokens: 217

原有代码存在两个 bug:

  1. ParseClaudeStreamUsage 只查询顶层 usage,找不到 message_start 中的 message.usage,导致 81923 个 input tokens 被完全忽略
  2. sync.Once first-wins 机制使得 message_delta 中先到的增量值(input=6)被锁定,后续即使解析到 message_start 的大值也不会更新

修复方案

两处改动,UsageReporter 结构体零修改。

修改 1:ParseClaudeStreamUsage 增加 message.usage 回退

文件: internal/runtime/executor/helps/usage_helpers.go

当顶层 usage 不存在时,回退查询 message.usage

usageNode := gjson.GetBytes(payload, "usage")
if !usageNode.Exists() {
    // 部分厂商(如 Qwen)在 message_start 事件中将 usage
    // 嵌套在 message.usage 下,而非顶层 usage 字段。
    usageNode = gjson.GetBytes(payload, "message.usage")
}

修改 2:claude_executor.go 流式循环本地 max 累加

文件: internal/runtime/executor/claude_executor.go

在两个流式扫描循环(direct forward + translation)中:

  • 用局部变量 var totalUsage usage.Detail 替代循环内的即时 reporter.Publish
  • 每次 parse 到 usage 后,通过 mergeDetail 对各字段取 max 更新 totalUsage
  • 循环结束后统一调用一次 reporter.Publish(ctx, totalUsage)

新增 mergeDetail 辅助函数:

func mergeDetail(a, b usage.Detail) usage.Detail {
    res := usage.Detail{
        InputTokens:     max(a.InputTokens, b.InputTokens),
        OutputTokens:    max(a.OutputTokens, b.OutputTokens),
        ReasoningTokens: max(a.ReasoningTokens, b.ReasoningTokens),
        CachedTokens:    max(a.CachedTokens, b.CachedTokens),
    }
    res.TotalTokens = res.InputTokens + res.OutputTokens + res.ReasoningTokens
    return res
}

为什么用 max 而不是累加?

  • 标准 Anthropic 的 message_delta 是累积值,每次事件包含截至当前的总量,取 max 等价于取最后一个事件的值
  • Qwen 的 message_delta 是增量值,数值很小,但 message_start 中的 input 值很大,取 max 能正确拿到两者的最大值
  • 对两种场景都安全,不会重复计数

为什么 TotalTokens 用求和而不是 max?

  • TotalTokens 不是独立字段,它是各分项之和。如果各分项取 max 后,TotalTokens 也需要重新计算才能保持一致。

行为验证

场景 修复前 修复后
标准 Anthropic 流式响应 message_delta 的累积值(正确) max 取到最终 message_delta 值(结果相同,无影响)
Qwen 流式响应 input_tokens=6(错误) max(81923, 6) = 81923(正确)
非流式请求 不变 不变
UsageReporter 结构体 不变 不变
其他 executor(Gemini/OpenAI/Codex/Kimi) 不变 不受影响,各自有独立的解析函数

变更文件

  • internal/runtime/executor/helps/usage_helpers.goParseClaudeStreamUsage 加回退(+5 行)
  • internal/runtime/executor/claude_executor.go — 流式循环累加 + mergeDetail 函数(+27 行)
image

@zilianpn zilianpn force-pushed the fix/qwen-claude-stream-usage branch from e6966ac to ebca956 Compare April 22, 2026 07:39
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request modifies the ClaudeExecutor to accumulate usage details throughout a stream and publish the total at the end, rather than publishing partial updates. It also adds a fallback in ParseClaudeStreamUsage to support providers like Qwen that nest usage data under message.usage. A logic error was identified in the mergeDetail function where TotalTokens was being calculated using max instead of being summed from the merged components, which could lead to incorrect totals when usage is split across different stream events.

Comment thread internal/runtime/executor/claude_executor.go
@zilianpn zilianpn force-pushed the fix/qwen-claude-stream-usage branch 2 times, most recently from baa059d to f498067 Compare April 22, 2026 14:20
Qwen's SSE stream deviates from standard Anthropic protocol:
- message_start nests usage under message.usage (not top-level)
- message_delta usage contains delta values, not cumulative counts

Fix: add message.usage fallback in ParseClaudeStreamUsage and use
max-based accumulation in the executor stream loops to correctly
merge input tokens from message_start with output tokens from
message_delta. TotalTokens is recalculated as the sum of component
fields rather than taking max.

Also apply the same accumulation fix to openai_compat_executor
(used by Qwen openai-compatibility configs) to avoid per-event
publish capturing partial usage values.
@zilianpn zilianpn force-pushed the fix/qwen-claude-stream-usage branch from f498067 to e3da4ac Compare April 22, 2026 14:35
Copy link
Copy Markdown
Collaborator

@luispater luispater left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary

This PR improves streaming usage accounting for Qwen when using the Claude executor by:

  • Parsing usage nested under message.usage (e.g., message_start events).
  • Accumulating usage over the stream and publishing once at the end to avoid UsageReporter’s sync.Once “first publish wins” behavior.
    It also applies the same “publish at end” accumulation to OpenAICompatExecutor.

Blocking

  1. mergeDetail() recomputes TotalTokens as Input + Output + Reasoning.

    • For OpenAI-compatible usage, reasoning_tokens is usually a breakdown of completion_tokens, so adding it to TotalTokens can inflate totals vs upstream total_tokens and vs existing non-stream parsing (ParseOpenAIUsage preserves upstream totals).
    • Suggestion: preserve TotalTokens (e.g., max(a.TotalTokens, b.TotalTokens)) when present; only fall back to Input + Output when totals are missing/zero, and avoid adding reasoning unless you have evidence it is not included in OutputTokens.
  2. The PR description states Qwen message_delta.usage is delta, but the merge strategy uses max(...) (which assumes monotonic/cumulative values).

    • Please confirm behavior with a captured stream sample. If usage is truly delta across multiple events, we likely need summation for at least OutputTokens (and possibly other fields).

Non-blocking

  • Consider publishing accumulated usage only on successful scan completion (scanner.Err() == nil) so PublishFailure() can win on stream read errors.
  • Add unit tests for ParseClaudeStreamUsage (usage vs message.usage) and for the merge/publish-at-end behavior.

Test plan

  • Not run (review-only; no checkout per instructions). Recommend running go test ./... and a targeted streaming repro.

@zilianpn zilianpn closed this Apr 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants