Skip to content

Commit 5a28945

Browse files
committed
🤖 fix: avoid mixed server/client clocks in timing stats
- Track backend timestamps separately from renderer clock time
- Translate backend timestamps into renderer time before computing live durations
- Prefer backend-provided duration on stream-end; remove Date.now()-startTime override
- Compute streamingMs from duration/ttft/toolExecution deltas to avoid mixed clocks

Change-Id: Ia451bdeee13ad5797a3ab10ec1af0944b6a05eb1
Signed-off-by: Thomas Kosiewski <tk@coder.com>
1 parent 74335af commit 5a28945

File tree

1 file changed

+118
-41
lines changed

1 file changed

+118
-41
lines changed

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 118 additions & 41 deletions
Original file line number · Diff line number · Diff line change
@@ -32,6 +32,7 @@ import { isDynamicToolPart } from "@/common/types/toolParts";
3232
import { z } from "zod";
3333
import { createDeltaStorage, type DeltaRecordStorage } from "./StreamingTPSCalculator";
3434
import { computeRecencyTimestamp } from "./recency";
35+
import { assert } from "@/common/utils/assert";
3536
import { getStatusStateKey, getSessionTimingKey } from "@/common/constants/storage";
3637

3738
// Maximum number of messages to display in the DOM for performance
@@ -63,17 +64,29 @@ type AgentStatus = z.infer<typeof AgentStatusSchema>;
6364
const MAX_DISPLAYED_MESSAGES = 128;
6465

6566
interface StreamingContext {
66-
startTime: number;
67+
/** Backend timestamp when stream started (Date.now()) */
68+
serverStartTime: number;
69+
/**
70+
* Offset to translate backend timestamps into the renderer clock.
71+
* Computed as: `Date.now() - lastServerTimestamp`.
72+
*/
73+
clockOffsetMs: number;
74+
/** Most recent backend timestamp observed for this stream */
75+
lastServerTimestamp: number;
76+
6777
isComplete: boolean;
6878
isCompacting: boolean;
6979
model: string;
70-
/** Timestamp of first content token (text or reasoning delta) */
71-
firstTokenTime: number | null;
80+
81+
/** Timestamp of first content token (text or reasoning delta) - backend Date.now() */
82+
serverFirstTokenTime: number | null;
83+
7284
/** Accumulated tool execution time in ms */
7385
toolExecutionMs: number;
74-
/** Map of tool call start times for in-progress tool calls */
86+
/** Map of tool call start times for in-progress tool calls (backend timestamps) */
7587
pendingToolStarts: Map<string, number>;
76-
/** Mode (plan/exec) - captured from message metadata when stream ends */
88+
89+
/** Mode (plan/exec) */
7790
mode?: "plan" | "exec";
7891
}
7992

@@ -341,6 +354,26 @@ export class StreamingMessageAggregator {
341354
}
342355
}
343356

357+
private updateStreamClock(context: StreamingContext, serverTimestamp: number): void {
358+
assert(context, "updateStreamClock requires context");
359+
assert(typeof serverTimestamp === "number", "updateStreamClock requires serverTimestamp");
360+
361+
// Only update if this timestamp is >= the most recent one we've seen.
362+
// During stream replay, older historical parts may be re-emitted out of order.
363+
if (serverTimestamp < context.lastServerTimestamp) {
364+
return;
365+
}
366+
367+
context.lastServerTimestamp = serverTimestamp;
368+
context.clockOffsetMs = Date.now() - serverTimestamp;
369+
}
370+
371+
private translateServerTime(context: StreamingContext, serverTimestamp: number): number {
372+
assert(context, "translateServerTime requires context");
373+
assert(typeof serverTimestamp === "number", "translateServerTime requires serverTimestamp");
374+
375+
return serverTimestamp + context.clockOffsetMs;
376+
}
344377
private invalidateCache(): void {
345378
this.cachedAllMessages = null;
346379
this.cachedDisplayedMessages = null;
@@ -447,31 +480,53 @@ export class StreamingMessageAggregator {
447480
const context = this.activeStreams.get(messageId);
448481
if (context) {
449482
const endTime = Date.now();
450-
const duration = endTime - context.startTime;
483+
const message = this.messages.get(messageId);
451484

452-
const ttft =
453-
context.firstTokenTime !== null ? context.firstTokenTime - context.startTime : null;
485+
// Prefer backend-provided duration (computed in the same clock domain as tool/delta timestamps).
486+
// Fall back to renderer-based timing translated into the renderer clock.
487+
const durationMsFromMetadata = message?.metadata?.duration;
488+
const fallbackStartTime = this.translateServerTime(context, context.serverStartTime);
489+
const fallbackDurationMs = Math.max(0, endTime - fallbackStartTime);
490+
const durationMs =
491+
typeof durationMsFromMetadata === "number" && Number.isFinite(durationMsFromMetadata)
492+
? durationMsFromMetadata
493+
: fallbackDurationMs;
494+
495+
const ttftMs =
496+
context.serverFirstTokenTime !== null
497+
? Math.max(0, context.serverFirstTokenTime - context.serverStartTime)
498+
: null;
454499

455500
// Get output tokens from cumulative usage (if available)
456501
const cumulativeUsage = this.activeStreamUsage.get(messageId)?.cumulative.usage;
457502
const outputTokens = cumulativeUsage?.outputTokens ?? 0;
458503
const reasoningTokens = cumulativeUsage?.reasoningTokens ?? 0;
459504

460-
// Calculate streaming duration: time from first token to end, EXCLUDING tool execution
461-
// This is what determines actual model output speed - we don't count time waiting for tools
462-
const rawStreamingMs = context.firstTokenTime !== null ? endTime - context.firstTokenTime : 0;
463-
const streamingMs = Math.max(0, rawStreamingMs - context.toolExecutionMs);
505+
// Account for in-progress tool calls (can happen on abort/error)
506+
let totalToolExecutionMs = context.toolExecutionMs;
507+
if (context.pendingToolStarts.size > 0) {
508+
const serverEndTime = context.serverStartTime + durationMs;
509+
for (const toolStartTime of context.pendingToolStarts.values()) {
510+
const toolMs = serverEndTime - toolStartTime;
511+
if (toolMs > 0) {
512+
totalToolExecutionMs += toolMs;
513+
}
514+
}
515+
}
516+
517+
// Streaming duration excludes TTFT and tool execution - used for avg tok/s
518+
const streamingMs = Math.max(0, durationMs - (ttftMs ?? 0) - totalToolExecutionMs);
464519

465-
// Get mode from the message metadata (set by backend at stream-end)
466-
const message = this.messages.get(messageId);
467-
const mode = message?.metadata?.mode as "plan" | "exec" | undefined;
520+
const mode = (message?.metadata?.mode ?? context.mode) as "plan" | "exec" | undefined;
468521

469-
// Store last completed stream stats (including tokens and mode)
522+
// Store last completed stream stats (include durations anchored in the renderer clock)
523+
const startTime = endTime - durationMs;
524+
const firstTokenTime = ttftMs !== null ? startTime + ttftMs : null;
470525
this.lastCompletedStreamStats = {
471-
startTime: context.startTime,
526+
startTime,
472527
endTime,
473-
firstTokenTime: context.firstTokenTime,
474-
toolExecutionMs: context.toolExecutionMs,
528+
firstTokenTime,
529+
toolExecutionMs: totalToolExecutionMs,
475530
model: context.model,
476531
outputTokens: outputTokens + reasoningTokens, // Combined for speed calc
477532
reasoningTokens,
@@ -494,14 +549,14 @@ export class StreamingMessageAggregator {
494549
totalReasoningTokens: 0,
495550
totalStreamingMs: 0,
496551
};
497-
modelStats.totalDurationMs += duration;
498-
modelStats.totalToolExecutionMs += context.toolExecutionMs;
552+
modelStats.totalDurationMs += durationMs;
553+
modelStats.totalToolExecutionMs += totalToolExecutionMs;
499554
modelStats.responseCount += 1;
500555
modelStats.totalOutputTokens += outputTokens + reasoningTokens; // Combined for speed calc
501556
modelStats.totalReasoningTokens += reasoningTokens;
502557
modelStats.totalStreamingMs += streamingMs;
503-
if (ttft !== null) {
504-
modelStats.totalTtftMs += ttft;
558+
if (ttftMs !== null) {
559+
modelStats.totalTtftMs += ttftMs;
505560
modelStats.ttftCount += 1;
506561
}
507562
this.sessionTimingStats[statsKey] = modelStats;
@@ -652,16 +707,24 @@ export class StreamingMessageAggregator {
652707
if (entries.length === 0) return null;
653708
const [messageId, context] = entries[0];
654709

710+
const now = Date.now();
711+
712+
const startTime = this.translateServerTime(context, context.serverStartTime);
713+
const firstTokenTime =
714+
context.serverFirstTokenTime !== null
715+
? this.translateServerTime(context, context.serverFirstTokenTime)
716+
: null;
717+
655718
// Include time from currently-executing tools (not just completed ones)
656719
let totalToolMs = context.toolExecutionMs;
657-
const now = Date.now();
658-
for (const startTime of context.pendingToolStarts.values()) {
659-
totalToolMs += now - startTime;
720+
for (const toolStartServerTime of context.pendingToolStarts.values()) {
721+
const toolStartTime = this.translateServerTime(context, toolStartServerTime);
722+
totalToolMs += Math.max(0, now - toolStartTime);
660723
}
661724

662725
return {
663-
startTime: context.startTime,
664-
firstTokenTime: context.firstTokenTime,
726+
startTime,
727+
firstTokenTime,
665728
toolExecutionMs: totalToolMs,
666729
model: context.model,
667730
liveTokenCount: this.getStreamingTokenCount(messageId),
@@ -876,12 +939,15 @@ export class StreamingMessageAggregator {
876939
const lastUserMsg = [...messages].reverse().find((m) => m.role === "user");
877940
const isCompacting = lastUserMsg?.metadata?.muxMetadata?.type === "compaction-request";
878941

942+
const now = Date.now();
879943
const context: StreamingContext = {
880-
startTime: data.startTime,
944+
serverStartTime: data.startTime,
945+
clockOffsetMs: now - data.startTime,
946+
lastServerTimestamp: data.startTime,
881947
isComplete: false,
882948
isCompacting,
883949
model: data.model,
884-
firstTokenTime: null,
950+
serverFirstTokenTime: null,
885951
toolExecutionMs: 0,
886952
pendingToolStarts: new Map(),
887953
mode: data.mode,
@@ -906,11 +972,13 @@ export class StreamingMessageAggregator {
906972
const message = this.messages.get(data.messageId);
907973
if (!message) return;
908974

909-
// Track first token time (only for non-empty deltas)
910-
if (data.delta.length > 0) {
911-
const context = this.activeStreams.get(data.messageId);
912-
if (context?.firstTokenTime === null) {
913-
context.firstTokenTime = data.timestamp;
975+
const context = this.activeStreams.get(data.messageId);
976+
if (context) {
977+
this.updateStreamClock(context, data.timestamp);
978+
979+
// Track first token time (only for non-empty deltas)
980+
if (data.delta.length > 0 && context.serverFirstTokenTime === null) {
981+
context.serverFirstTokenTime = data.timestamp;
914982
}
915983
}
916984

@@ -939,8 +1007,12 @@ export class StreamingMessageAggregator {
9391007
const updatedMetadata: MuxMetadata = {
9401008
...message.metadata,
9411009
...data.metadata,
942-
duration: Date.now() - activeStream.startTime,
9431010
};
1011+
1012+
const durationMs = data.metadata.duration;
1013+
if (typeof durationMs === "number" && Number.isFinite(durationMs)) {
1014+
this.updateStreamClock(activeStream, activeStream.serverStartTime + durationMs);
1015+
}
9441016
message.metadata = updatedMetadata;
9451017

9461018
// Update tool parts with their results if provided
@@ -1080,6 +1152,7 @@ export class StreamingMessageAggregator {
10801152
// Track tool start time for execution duration calculation
10811153
const context = this.activeStreams.get(data.messageId);
10821154
if (context) {
1155+
this.updateStreamClock(context, data.timestamp);
10831156
context.pendingToolStarts.set(data.toolCallId, data.timestamp);
10841157
}
10851158

@@ -1159,6 +1232,8 @@ export class StreamingMessageAggregator {
11591232
// Track tool execution duration
11601233
const context = this.activeStreams.get(data.messageId);
11611234
if (context) {
1235+
this.updateStreamClock(context, data.timestamp);
1236+
11621237
const startTime = context.pendingToolStarts.get(data.toolCallId);
11631238
if (startTime !== undefined) {
11641239
context.toolExecutionMs += data.timestamp - startTime;
@@ -1191,11 +1266,13 @@ export class StreamingMessageAggregator {
11911266
const message = this.messages.get(data.messageId);
11921267
if (!message) return;
11931268

1194-
// Track first token time (reasoning also counts as first token)
1195-
if (data.delta.length > 0) {
1196-
const context = this.activeStreams.get(data.messageId);
1197-
if (context?.firstTokenTime === null) {
1198-
context.firstTokenTime = data.timestamp;
1269+
const context = this.activeStreams.get(data.messageId);
1270+
if (context) {
1271+
this.updateStreamClock(context, data.timestamp);
1272+
1273+
// Track first token time (reasoning also counts as first token)
1274+
if (data.delta.length > 0 && context.serverFirstTokenTime === null) {
1275+
context.serverFirstTokenTime = data.timestamp;
11991276
}
12001277
}
12011278

0 commit comments

Comments
 (0)