From 935f0710d829e6703e88a9fe32b12b5da7fbbba2 Mon Sep 17 00:00:00 2001
From: ethan
Date: Mon, 1 Dec 2025 15:48:18 +1100
Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20fix:=20accumulate=20output=20tok?=
 =?UTF-8?q?ens=20across=20steps=20in=20cost=20tracking?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In multi-step mode (tool use), the AI SDK emits per-step usage where:
- inputTokens: Current context window (grows each step)
- outputTokens: Tokens generated in that step only

The aggregator was replacing usage on each step, losing earlier output
tokens. Now we accumulate output/reasoning tokens while keeping the latest
input tokens (which correctly represent the growing context).

This fixes the issue where costs appeared to increment by only ~2¢ at a
time instead of reflecting cumulative output costs.

_Generated with mux_
---
 .../StreamingMessageAggregator.test.ts        | 54 ++++++++++++++++---
 .../messages/StreamingMessageAggregator.ts    | 27 +++++++++-
 2 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/src/browser/utils/messages/StreamingMessageAggregator.test.ts b/src/browser/utils/messages/StreamingMessageAggregator.test.ts
index cac12d623f..6ef25d8868 100644
--- a/src/browser/utils/messages/StreamingMessageAggregator.test.ts
+++ b/src/browser/utils/messages/StreamingMessageAggregator.test.ts
@@ -415,10 +415,10 @@ describe("StreamingMessageAggregator", () => {
     expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined();
   });
 
-  test("latest usage-delta replaces previous for same messageId", () => {
+  test("accumulates output tokens across steps, keeps latest input", () => {
     const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
 
-    // First step usage
+    // First step usage (AI calls a tool)
     aggregator.handleUsageDelta({
       type: "usage-delta",
       workspaceId: "ws-1",
@@ -434,12 +434,52 @@ describe("StreamingMessageAggregator", () => {
       usage: { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 },
     });
 
-    // Should have latest values, not summed
-    expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
-      inputTokens: 1500,
-      outputTokens: 100,
-      totalTokens: 1600,
+    // Input should be latest (context window grows), output should be summed
+    const usage = aggregator.getActiveStreamUsage("msg-1");
+    expect(usage?.inputTokens).toBe(1500); // Latest context window
+    expect(usage?.outputTokens).toBe(150); // Accumulated: 50 + 100
+    expect(usage?.totalTokens).toBe(1600); // Provider's latest value (not recomputed)
+    expect(usage?.reasoningTokens).toBe(0); // Accumulated (none in this test)
+  });
+
+  test("accumulates reasoning tokens and includes cachedInputTokens in total", () => {
+    const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
+
+    // First step: some reasoning, with cache hit
+    aggregator.handleUsageDelta({
+      type: "usage-delta",
+      workspaceId: "ws-1",
+      messageId: "msg-1",
+      usage: {
+        inputTokens: 800, // Anthropic: excludes cached
+        cachedInputTokens: 200,
+        outputTokens: 30,
+        reasoningTokens: 20,
+        totalTokens: 1050,
+      },
     });
+
+    // Second step: more output and reasoning, cache grows
+    aggregator.handleUsageDelta({
+      type: "usage-delta",
+      workspaceId: "ws-1",
+      messageId: "msg-1",
+      usage: {
+        inputTokens: 900, // Context grew
+        cachedInputTokens: 300, // More cached
+        outputTokens: 70,
+        reasoningTokens: 50,
+        totalTokens: 1320,
+      },
+    });
+
+    const usage = aggregator.getActiveStreamUsage("msg-1");
+    expect(usage?.inputTokens).toBe(900); // Latest
+    expect(usage?.cachedInputTokens).toBe(300); // Latest
+    expect(usage?.outputTokens).toBe(100); // Accumulated: 30 + 70
+    expect(usage?.reasoningTokens).toBe(70); // Accumulated: 20 + 50
+    // totalTokens preserved from provider (not recomputed - avoids OpenAI/Anthropic semantic differences)
+    expect(usage?.totalTokens).toBe(1320);
   });
 
   test("tracks usage independently per messageId", () => {
diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts
index 7e5a472699..2c8b0b7f3d 100644
--- a/src/browser/utils/messages/StreamingMessageAggregator.ts
+++ b/src/browser/utils/messages/StreamingMessageAggregator.ts
@@ -1002,10 +1002,33 @@ export class StreamingMessageAggregator {
   }
 
   /**
-   * Handle usage-delta event: update cumulative usage for active stream
+   * Handle usage-delta event: accumulate usage for active stream across steps
+   *
+   * In multi-step mode (tool use), each step emits per-step usage:
+   * - inputTokens: Current context window (grows each step) - use latest
+   * - cachedInputTokens: Current cache state - use latest
+   * - outputTokens: Tokens generated this step only - accumulate
+   * - reasoningTokens: Reasoning tokens this step only - accumulate
    */
   handleUsageDelta(data: UsageDeltaEvent): void {
-    this.activeStreamStepUsage.set(data.messageId, data.usage);
+    const existing = this.activeStreamStepUsage.get(data.messageId);
+    if (existing) {
+      // Accumulate output/reasoning tokens, keep latest input (context grows each step)
+      const outputTokens = (existing.outputTokens ?? 0) + (data.usage.outputTokens ?? 0);
+      const reasoningTokens = (existing.reasoningTokens ?? 0) + (data.usage.reasoningTokens ?? 0);
+      this.activeStreamStepUsage.set(data.messageId, {
+        inputTokens: data.usage.inputTokens, // Latest context window
+        cachedInputTokens: data.usage.cachedInputTokens, // Latest cache state
+        outputTokens,
+        reasoningTokens,
+        // Preserve provider's totalTokens - don't recompute since provider semantics differ
+        // (OpenAI inputTokens includes cached; Anthropic excludes). Cost calculation uses
+        // individual components, not totalTokens.
+        totalTokens: data.usage.totalTokens,
+      });
+    } else {
+      this.activeStreamStepUsage.set(data.messageId, data.usage);
+    }
   }
 
   /**
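
To illustrate the effect on cost display, here is a minimal standalone sketch
of why replacing per-step usage understated costs. It is not part of the
patch: the StepUsage shape, the accumulateUsage/estimateCostUsd helpers, and
the per-million-token prices are invented for this example; the real usage
types and pricing live in the project's cost-calculation code.

// Illustration only: local types and hypothetical prices, not the project's
// real UsageDeltaEvent type or cost calculator.
interface StepUsage {
  inputTokens?: number;
  cachedInputTokens?: number;
  outputTokens?: number;
  reasoningTokens?: number;
  totalTokens?: number;
}

// Fold per-step deltas the same way the aggregator now does: latest
// input/cache state, summed output/reasoning, provider's latest total.
function accumulateUsage(steps: StepUsage[]): StepUsage {
  return steps.reduce<StepUsage>(
    (acc, step) => ({
      inputTokens: step.inputTokens,
      cachedInputTokens: step.cachedInputTokens,
      outputTokens: (acc.outputTokens ?? 0) + (step.outputTokens ?? 0),
      reasoningTokens: (acc.reasoningTokens ?? 0) + (step.reasoningTokens ?? 0),
      totalTokens: step.totalTokens,
    }),
    {}
  );
}

// Hypothetical prices in USD per 1M tokens, purely for the example.
const INPUT_PRICE_PER_MTOK = 3;
const OUTPUT_PRICE_PER_MTOK = 15;

function estimateCostUsd(usage: StepUsage): number {
  const input = (usage.inputTokens ?? 0) * INPUT_PRICE_PER_MTOK;
  const output = (usage.outputTokens ?? 0) * OUTPUT_PRICE_PER_MTOK;
  return (input + output) / 1_000_000;
}

// Three tool-use steps. Replacing usage each step would price only the final
// step's 200 output tokens; accumulating prices all 900.
const steps: StepUsage[] = [
  { inputTokens: 1000, outputTokens: 400, totalTokens: 1400 },
  { inputTokens: 1500, outputTokens: 300, totalTokens: 1800 },
  { inputTokens: 2000, outputTokens: 200, totalTokens: 2200 },
];

console.log(estimateCostUsd(accumulateUsage(steps)).toFixed(4)); // 0.0195
console.log(estimateCostUsd(steps[2]).toFixed(4)); // 0.0090 (what the old replace-only behaviour reported)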