Skip to content

Commit b98c08d

Browse files
committed
🤖 fix: accumulate output tokens across steps in cost tracking
In multi-step mode (tool use), the AI SDK emits per-step usage where:

- inputTokens: current context window (grows each step)
- outputTokens: tokens generated in that step only

The aggregator was replacing usage on each step, losing earlier output tokens. Now we accumulate output/reasoning tokens while keeping the latest input tokens (which correctly represent the growing context).

This fixes the issue where costs appeared to increment by only ~2¢ at a time instead of reflecting cumulative output costs.

_Generated with mux_
1 parent 7d668fe commit b98c08d

File tree

2 files changed

+70
-9
lines changed

2 files changed

+70
-9
lines changed

src/browser/utils/messages/StreamingMessageAggregator.test.ts

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -415,10 +415,10 @@ describe("StreamingMessageAggregator", () => {
415415
expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined();
416416
});
417417

418-
test("latest usage-delta replaces previous for same messageId", () => {
418+
test("accumulates output tokens across steps, keeps latest input", () => {
419419
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
420420

421-
// First step usage
421+
// First step usage (AI calls a tool)
422422
aggregator.handleUsageDelta({
423423
type: "usage-delta",
424424
workspaceId: "ws-1",
@@ -434,12 +434,52 @@ describe("StreamingMessageAggregator", () => {
434434
usage: { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 },
435435
});
436436

437-
// Should have latest values, not summed
438-
expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
439-
inputTokens: 1500,
440-
outputTokens: 100,
441-
totalTokens: 1600,
437+
// Input should be latest (context window grows), output should be summed
438+
const usage = aggregator.getActiveStreamUsage("msg-1");
439+
expect(usage?.inputTokens).toBe(1500); // Latest context window
440+
expect(usage?.outputTokens).toBe(150); // Accumulated: 50 + 100
441+
expect(usage?.totalTokens).toBe(1650); // Latest input + accumulated output
442+
expect(usage?.reasoningTokens).toBe(0); // Accumulated (none in this test)
443+
});
444+
445+
test("accumulates reasoning tokens and includes cachedInputTokens in total", () => {
446+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
447+
448+
// First step: some reasoning, with cache hit
449+
aggregator.handleUsageDelta({
450+
type: "usage-delta",
451+
workspaceId: "ws-1",
452+
messageId: "msg-1",
453+
usage: {
454+
inputTokens: 800, // Anthropic: excludes cached
455+
cachedInputTokens: 200,
456+
outputTokens: 30,
457+
reasoningTokens: 20,
458+
totalTokens: 1050,
459+
},
442460
});
461+
462+
// Second step: more output and reasoning, cache grows
463+
aggregator.handleUsageDelta({
464+
type: "usage-delta",
465+
workspaceId: "ws-1",
466+
messageId: "msg-1",
467+
usage: {
468+
inputTokens: 900, // Context grew
469+
cachedInputTokens: 300, // More cached
470+
outputTokens: 70,
471+
reasoningTokens: 50,
472+
totalTokens: 1320,
473+
},
474+
});
475+
476+
const usage = aggregator.getActiveStreamUsage("msg-1");
477+
expect(usage?.inputTokens).toBe(900); // Latest
478+
expect(usage?.cachedInputTokens).toBe(300); // Latest
479+
expect(usage?.outputTokens).toBe(100); // Accumulated: 30 + 70
480+
expect(usage?.reasoningTokens).toBe(70); // Accumulated: 20 + 50
481+
// totalTokens = latest input (900) + latest cached (300) + accumulated output (100)
482+
expect(usage?.totalTokens).toBe(1300);
443483
});
444484

445485
test("tracks usage independently per messageId", () => {

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,10 +1002,31 @@ export class StreamingMessageAggregator {
10021002
}
10031003

10041004
/**
1005-
* Handle usage-delta event: update cumulative usage for active stream
1005+
* Handle usage-delta event: accumulate usage for active stream across steps
1006+
*
1007+
* In multi-step mode (tool use), each step emits per-step usage:
1008+
* - inputTokens: Current context window (grows each step) - use latest
1009+
* - cachedInputTokens: Current cache state - use latest
1010+
* - outputTokens: Tokens generated this step only - accumulate
1011+
* - reasoningTokens: Reasoning tokens this step only - accumulate
10061012
*/
10071013
handleUsageDelta(data: UsageDeltaEvent): void {
1008-
this.activeStreamStepUsage.set(data.messageId, data.usage);
1014+
const existing = this.activeStreamStepUsage.get(data.messageId);
1015+
if (existing) {
1016+
// Accumulate output/reasoning tokens, keep latest input (context grows each step)
1017+
const outputTokens = (existing.outputTokens ?? 0) + (data.usage.outputTokens ?? 0);
1018+
const reasoningTokens = (existing.reasoningTokens ?? 0) + (data.usage.reasoningTokens ?? 0);
1019+
this.activeStreamStepUsage.set(data.messageId, {
1020+
inputTokens: data.usage.inputTokens, // Latest context window
1021+
cachedInputTokens: data.usage.cachedInputTokens, // Latest cache state
1022+
outputTokens,
1023+
reasoningTokens,
1024+
totalTokens:
1025+
(data.usage.inputTokens ?? 0) + (data.usage.cachedInputTokens ?? 0) + outputTokens,
1026+
});
1027+
} else {
1028+
this.activeStreamStepUsage.set(data.messageId, data.usage);
1029+
}
10091030
}
10101031

10111032
/**

0 commit comments

Comments
 (0)