fix(claude): correct token usage for Qwen streaming responses #2956
zilianpn wants to merge 1 commit into
Conversation
zilianpn force-pushed from e6966ac to ebca956
Code Review
This pull request modifies the ClaudeExecutor to accumulate usage details throughout a stream and publish the total at the end, rather than publishing partial updates. It also adds a fallback in ParseClaudeStreamUsage to support providers like Qwen that nest usage data under message.usage. A logic error was identified in the mergeDetail function where TotalTokens was being calculated using max instead of being summed from the merged components, which could lead to incorrect totals when usage is split across different stream events.
zilianpn force-pushed from baa059d to f498067
Qwen's SSE stream deviates from the standard Anthropic protocol:
- message_start nests usage under message.usage (not top-level)
- message_delta usage contains delta values, not cumulative counts

Fix: add a message.usage fallback in ParseClaudeStreamUsage and use max-based accumulation in the executor stream loops to correctly merge input tokens from message_start with output tokens from message_delta. TotalTokens is recalculated as the sum of the component fields rather than taking the max.

Also apply the same accumulation fix to openai_compat_executor (used by Qwen openai-compatibility configs) to avoid per-event publishes capturing partial usage values.
zilianpn force-pushed from f498067 to e3da4ac
luispater left a comment
Summary
This PR improves streaming usage accounting for Qwen when using the Claude executor by:
- Parsing usage nested under `message.usage` (e.g., `message_start` events).
- Accumulating usage over the stream and publishing once at the end to avoid `UsageReporter`'s `sync.Once` "first publish wins" behavior.

It also applies the same "publish at end" accumulation to `OpenAICompatExecutor`.
Blocking

- `mergeDetail()` recomputes `TotalTokens` as `Input + Output + Reasoning`.
  - For OpenAI-compatible usage, `reasoning_tokens` is usually a breakdown of `completion_tokens`, so adding it to `TotalTokens` can inflate totals vs upstream `total_tokens` and vs existing non-stream parsing (`ParseOpenAIUsage` preserves upstream totals).
  - Suggestion: preserve `TotalTokens` (e.g., `max(a.TotalTokens, b.TotalTokens)`) when present; only fall back to `Input + Output` when totals are missing/zero, and avoid adding reasoning unless you have evidence it is not included in `OutputTokens`.
- The PR description states Qwen `message_delta.usage` is delta, but the merge strategy uses `max(...)` (which assumes monotonic/cumulative values).
  - Please confirm behavior with a captured stream sample. If usage is truly delta across multiple events, we likely need summation for at least `OutputTokens` (and possibly other fields).
Non-blocking

- Consider publishing accumulated usage only on successful scan completion (`scanner.Err() == nil`) so `PublishFailure()` can win on stream read errors.
- Add unit tests for `ParseClaudeStreamUsage` (`usage` vs `message.usage`) and for the merge/publish-at-end behavior.
Test plan

- Not run (review-only; no checkout per instructions). Recommend running `go test ./...` and a targeted streaming repro.
Fix token usage accounting for Qwen 3.6 Plus streaming responses

Problem

When Qwen 3.6 Plus is configured as an Anthropic-protocol endpoint (via `claude-api-key`), the usage statistics for streaming responses are badly wrong:
- `input_tokens` is always reported as 6 (the actual value should be 81923)
- `output_tokens` and `reasoning_tokens` are miscounted

Root cause
Qwen's SSE stream deviates from the standard Anthropic protocol in two places:
- In `message_start` events, usage is nested under `message.usage` (rather than the standard protocol's top-level `usage`)
- The usage in `message_delta` events carries delta values, whereas standard Anthropic `message_delta` usage is cumulative

Concrete comparison:

| Event | Standard Anthropic | Qwen |
| --- | --- | --- |
| `message_start` | `usage` contains `input_tokens: 81923` | `message.usage` contains `input_tokens: 81923`, `output_tokens: 0` |
| `message_delta` | `usage` contains cumulative values (e.g. `output_tokens: 217`) | `usage` contains delta values (e.g. `input_tokens: 6`, `output_tokens: 217`) |

The original code had two bugs:
1. `ParseClaudeStreamUsage` only looked at the top-level `usage`, so it never found the `message.usage` inside `message_start`, and the 81923 input tokens were dropped entirely.
2. The `sync.Once` first-wins mechanism locked in the delta value that arrived first from `message_delta` (input=6); even when the large value from `message_start` was parsed later, it was never applied.

Fix
Two changes; the `UsageReporter` struct needs zero modification.

Change 1: add a `message.usage` fallback to `ParseClaudeStreamUsage`

File: `internal/runtime/executor/helps/usage_helpers.go`
When the top-level `usage` is absent, fall back to querying `message.usage`.
Change 2: local max accumulation in the streaming loops of `claude_executor.go`

File: `internal/runtime/executor/claude_executor.go`

In both streaming scan loops (direct forward + translation):
- a local `var totalUsage usage.Detail` replaces the per-event `reporter.Publish` inside the loop
- `mergeDetail` updates `totalUsage` by taking the per-field max
- a single `reporter.Publish(ctx, totalUsage)` runs after the loop

A new `mergeDetail` helper function is added.
Why max rather than summation?
- Standard Anthropic: `message_delta` usage is cumulative; each event carries the running totals, so taking the max is equivalent to taking the last event's value.
- Qwen: `message_delta` usage is a small delta, while the input count in `message_start` is large; taking the max correctly keeps the larger of the two.

Why is `TotalTokens` summed rather than maxed?

`TotalTokens` is not an independent field; it is the sum of the component fields. Once each component is merged by max, `TotalTokens` must be recomputed to stay consistent.

Behavior verification
| Scenario | Before the fix | After the fix |
| --- | --- | --- |
| Standard Anthropic stream | publishes `message_delta`'s cumulative value (correct) | last `message_delta` value via max (same result, no impact) |
| Qwen stream | `input_tokens=6` (wrong) | `max(81923, 6) = 81923` (correct) |

No changes to the `UsageReporter` struct.

Changed files
- `internal/runtime/executor/helps/usage_helpers.go` — `ParseClaudeStreamUsage` fallback (+5 lines)
- `internal/runtime/executor/claude_executor.go` — streaming-loop accumulation + `mergeDetail` function (+27 lines)