54 changes: 47 additions & 7 deletions src/browser/utils/messages/StreamingMessageAggregator.test.ts
@@ -415,10 +415,10 @@ describe("StreamingMessageAggregator", () => {
expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined();
});

test("latest usage-delta replaces previous for same messageId", () => {
test("accumulates output tokens across steps, keeps latest input", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

-// First step usage
+// First step usage (AI calls a tool)
aggregator.handleUsageDelta({
type: "usage-delta",
workspaceId: "ws-1",
@@ -434,12 +434,52 @@
usage: { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 },
});

-// Should have latest values, not summed
-expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
-inputTokens: 1500,
-outputTokens: 100,
-totalTokens: 1600,
+// Input should be latest (context window grows), output should be summed
+const usage = aggregator.getActiveStreamUsage("msg-1");
+expect(usage?.inputTokens).toBe(1500); // Latest context window
+expect(usage?.outputTokens).toBe(150); // Accumulated: 50 + 100
+expect(usage?.totalTokens).toBe(1600); // Provider's latest value (not recomputed)
+expect(usage?.reasoningTokens).toBe(0); // Accumulated (none in this test)
});

test("accumulates reasoning tokens and includes cachedInputTokens in total", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

// First step: some reasoning, with cache hit
aggregator.handleUsageDelta({
type: "usage-delta",
workspaceId: "ws-1",
messageId: "msg-1",
usage: {
inputTokens: 800, // Anthropic: excludes cached
cachedInputTokens: 200,
outputTokens: 30,
reasoningTokens: 20,
totalTokens: 1050,
},
});

// Second step: more output and reasoning, cache grows
aggregator.handleUsageDelta({
type: "usage-delta",
workspaceId: "ws-1",
messageId: "msg-1",
usage: {
inputTokens: 900, // Context grew
cachedInputTokens: 300, // More cached
outputTokens: 70,
reasoningTokens: 50,
totalTokens: 1320,
},
});

const usage = aggregator.getActiveStreamUsage("msg-1");
expect(usage?.inputTokens).toBe(900); // Latest
expect(usage?.cachedInputTokens).toBe(300); // Latest
expect(usage?.outputTokens).toBe(100); // Accumulated: 30 + 70
expect(usage?.reasoningTokens).toBe(70); // Accumulated: 20 + 50
// totalTokens preserved from provider (not recomputed - avoids OpenAI/Anthropic semantic differences)
expect(usage?.totalTokens).toBe(1320);
});

test("tracks usage independently per messageId", () => {
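
For reference, the merge rule the updated tests assert can be expressed as a small standalone function. This is an illustrative sketch only, not code from the PR: the StepUsage shape and the mergeStepUsage name are hypothetical, and the sample numbers simply mirror the reasoning/cache test above.

interface StepUsage {
  inputTokens?: number;
  cachedInputTokens?: number;
  outputTokens?: number;
  reasoningTokens?: number;
  totalTokens?: number;
}

// Hypothetical helper: how a later step's per-step usage folds into the running usage.
function mergeStepUsage(existing: StepUsage | undefined, next: StepUsage): StepUsage {
  if (!existing) return next;
  return {
    inputTokens: next.inputTokens, // latest context window
    cachedInputTokens: next.cachedInputTokens, // latest cache state
    outputTokens: (existing.outputTokens ?? 0) + (next.outputTokens ?? 0), // per-step, so sum
    reasoningTokens: (existing.reasoningTokens ?? 0) + (next.reasoningTokens ?? 0), // per-step, so sum
    totalTokens: next.totalTokens, // provider's own total, never recomputed
  };
}

// Numbers from the reasoning/cache test above:
const merged = mergeStepUsage(
  { inputTokens: 800, cachedInputTokens: 200, outputTokens: 30, reasoningTokens: 20, totalTokens: 1050 },
  { inputTokens: 900, cachedInputTokens: 300, outputTokens: 70, reasoningTokens: 50, totalTokens: 1320 }
);
// merged => { inputTokens: 900, cachedInputTokens: 300, outputTokens: 100, reasoningTokens: 70, totalTokens: 1320 }
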
27 changes: 25 additions & 2 deletions src/browser/utils/messages/StreamingMessageAggregator.ts
@@ -1002,10 +1002,33 @@ export class StreamingMessageAggregator {
}

/**
-* Handle usage-delta event: update cumulative usage for active stream
+* Handle usage-delta event: accumulate usage for active stream across steps
+*
+* In multi-step mode (tool use), each step emits per-step usage:
+* - inputTokens: Current context window (grows each step) - use latest
+* - cachedInputTokens: Current cache state - use latest
+* - outputTokens: Tokens generated this step only - accumulate
+* - reasoningTokens: Reasoning tokens this step only - accumulate
*/
handleUsageDelta(data: UsageDeltaEvent): void {
-this.activeStreamStepUsage.set(data.messageId, data.usage);
+const existing = this.activeStreamStepUsage.get(data.messageId);
+if (existing) {
+// Accumulate output/reasoning tokens, keep latest input (context grows each step)
+const outputTokens = (existing.outputTokens ?? 0) + (data.usage.outputTokens ?? 0);
+const reasoningTokens = (existing.reasoningTokens ?? 0) + (data.usage.reasoningTokens ?? 0);
+this.activeStreamStepUsage.set(data.messageId, {
+inputTokens: data.usage.inputTokens, // Latest context window
+cachedInputTokens: data.usage.cachedInputTokens, // Latest cache state
+outputTokens,
+reasoningTokens,
+// Preserve provider's totalTokens - don't recompute since provider semantics differ
+// (OpenAI inputTokens includes cached; Anthropic excludes). Cost calculation uses
+// individual components, not totalTokens.
+totalTokens: data.usage.totalTokens,
+});
+} else {
+this.activeStreamStepUsage.set(data.messageId, data.usage);
+}
}

/**
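
To make the totalTokens comment concrete, here is a sketch assuming the provider semantics described in the implementation comment (Anthropic-style inputTokens excludes cache reads, OpenAI-style includes them). The recomputeTotal helper and the openaiStyle payload are hypothetical illustrations, not values from any real API response.

type Usage = { inputTokens: number; cachedInputTokens: number; outputTokens: number; reasoningTokens: number; totalTokens: number };

// Naive recompute that sums every component.
const recomputeTotal = (u: Usage) => u.inputTokens + u.cachedInputTokens + u.outputTokens + u.reasoningTokens;

// Anthropic-style step (numbers from the test above): inputTokens excludes the 200 cached tokens.
const anthropicStyle: Usage = { inputTokens: 800, cachedInputTokens: 200, outputTokens: 30, reasoningTokens: 20, totalTokens: 1050 };
// Hypothetical OpenAI-style step reporting the same work: cached tokens already counted inside inputTokens.
const openaiStyle: Usage = { inputTokens: 1000, cachedInputTokens: 200, outputTokens: 30, reasoningTokens: 20, totalTokens: 1050 };

recomputeTotal(anthropicStyle); // 1050 — happens to match the provider's total
recomputeTotal(openaiStyle);    // 1250 — double-counts the cached tokens

Preserving data.usage.totalTokens verbatim sidesteps this difference; cost calculation reads the individual components instead.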