Skip to content

Commit cb30e65

Browse files
authored
🤖 fix: compaction stream validation and replay blocking (#1247)
## Summary

Fix several issues causing compaction (`/compact`) and workspace refresh to fail:

1. **Stream replay blocking `caught-up`**: `replayStream()` iterated a live `parts` array that continued growing during an active stream. This could delay the `caught-up` event indefinitely, leaving the UI stuck on "Loading workspace…". Fixed by snapshotting `parts` before iteration.
2. **ORPC validation failures for stream mode**: Compaction used `mode: "compact"` in `stream-start` events, but the schema only allowed `plan | exec`. This caused `EVENT_ITERATOR_VALIDATION_FAILED` errors, triggering resubscribe loops. Fixed by allowing any string for mode.
3. **Malformed init-output during replay**: Persisted init state could contain malformed entries that failed schema validation when replayed. Added defensive checks to skip invalid entries.
4. **Compaction tool policy bug**: `toolPolicy: []` was intended to disable tools but actually allowed all tools (empty policy = allow all). Fixed by using an explicit disable-all regex.

## Changes

- `src/node/services/streamManager.ts`: Snapshot parts array before replay iteration
- `src/common/orpc/schemas/stream.ts`: Loosen mode to `z.string().optional()`
- `src/common/orpc/schemas/workspaceStats.ts`: Loosen ModeSchema to string
- `src/node/services/sessionTimingService.ts`: Update mode type to `string | undefined`
- `src/node/services/initStateManager.ts`: Skip malformed init-output entries during replay
- `src/browser/utils/messages/compactionOptions.ts`: Use explicit disable-all tool policy
- `src/browser/utils/messages/StreamingMessageAggregator.ts`: Remove unnecessary type assertion

## Testing

- Added unit test verifying `replayStream()` snapshots parts and terminates promptly
- Manual testing of `/compact` with workspace refresh

---

_Generated with `mux` • Model: `anthropic:claude-opus-4-5` • Thinking: `high`_
1 parent 43fa874 commit cb30e65

File tree

13 files changed

+127
-29
lines changed

13 files changed

+127
-29
lines changed

.storybook/mocks/orpc.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,22 @@ export function createMockORPCClient(options: MockORPCClientOptions = {}): APICl
287287
data: { success: true, output: "", exitCode: 0, wall_duration_ms: 0 },
288288
};
289289
},
290-
onChat: async function* (input: { workspaceId: string }) {
290+
onChat: async function* (
291+
input: { workspaceId: string },
292+
options?: { signal?: AbortSignal }
293+
) {
291294
if (!onChat) {
295+
// Default mock behavior: subscriptions should remain open.
296+
// If this ends, WorkspaceStore will retry and reset state, which flakes stories.
292297
yield { type: "caught-up" } as WorkspaceChatMessage;
298+
299+
await new Promise<void>((resolve) => {
300+
if (options?.signal?.aborted) {
301+
resolve();
302+
return;
303+
}
304+
options?.signal?.addEventListener("abort", () => resolve(), { once: true });
305+
});
293306
return;
294307
}
295308

src/browser/components/RightSidebar/StatsTab.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ const VIEW_MODE_OPTIONS: Array<ToggleOption<ViewMode>> = [
3737
interface ModelBreakdownEntry {
3838
key: string;
3939
model: string;
40-
mode?: "plan" | "exec";
40+
mode?: string;
4141
totalDurationMs: number;
4242
totalToolExecutionMs: number;
4343
totalStreamingMs: number;

src/browser/components/RightSidebar/statsTabCalculations.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export interface ModelBreakdownEntry {
2828
tokensPerSec: number | null;
2929
avgTokensPerMsg: number | null;
3030
avgReasoningPerMsg: number | null;
31-
mode?: "plan" | "exec";
31+
mode?: string;
3232
}
3333

3434
export interface ModelBreakdownData {
@@ -139,7 +139,7 @@ function getModelDisplayName(model: string): string {
139139
const MODE_SUFFIX_PLAN = ":plan" as const;
140140
const MODE_SUFFIX_EXEC = ":exec" as const;
141141

142-
function parseStatsKey(key: string): { model: string; mode?: "plan" | "exec" } {
142+
function parseStatsKey(key: string): { model: string; mode?: string } {
143143
if (key.endsWith(MODE_SUFFIX_PLAN)) {
144144
return { model: key.slice(0, -MODE_SUFFIX_PLAN.length), mode: "plan" };
145145
}
@@ -220,7 +220,7 @@ export function computeModelBreakdownData(params: {
220220
ttftCount: number;
221221
liveTPS: number | null;
222222
liveTokenCount: number;
223-
mode?: "plan" | "exec";
223+
mode?: string;
224224
}
225225

226226
const breakdown: Record<string, BreakdownEntry> = {};
@@ -287,7 +287,7 @@ export function computeModelBreakdownData(params: {
287287
const toModelBreakdownEntry = (
288288
model: string,
289289
stats: BreakdownEntry,
290-
mode?: "plan" | "exec"
290+
mode?: string
291291
): ModelBreakdownEntry => {
292292
const modelTime = Math.max(0, stats.totalDuration - stats.toolExecutionMs);
293293
const avgTtft = stats.ttftCount > 0 ? stats.ttftSum / stats.ttftCount : null;

src/browser/stores/WorkspaceStore.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ export interface StreamTimingStats {
8484
/** Live tokens-per-second during streaming - only available for active streams */
8585
liveTPS?: number;
8686
/** Mode (plan/exec) in which this stream occurred */
87-
mode?: "plan" | "exec";
87+
mode?: string;
8888
}
8989

9090
/** Per-model timing statistics */
@@ -104,7 +104,7 @@ export interface ModelTimingStats {
104104
/** Total reasoning/thinking tokens generated by this model */
105105
totalReasoningTokens: number;
106106
/** Mode extracted from composite key (undefined for old data without mode) */
107-
mode?: "plan" | "exec";
107+
mode?: string;
108108
}
109109

110110
/**

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ interface StreamingContext {
7070
pendingToolStarts: Map<string, number>;
7171

7272
/** Mode (plan/exec) */
73-
mode?: "plan" | "exec";
73+
mode?: string;
7474
}
7575

7676
/**
@@ -222,7 +222,7 @@ export class StreamingMessageAggregator {
222222
outputTokens: number;
223223
reasoningTokens: number;
224224
streamingMs: number; // Time from first token to end (for accurate tok/s)
225-
mode?: "plan" | "exec"; // Mode in which this response occurred
225+
mode?: string; // Mode in which this response occurred
226226
} | null = null;
227227

228228
// Session-level timing stats: model -> stats (totals computed on-the-fly)
@@ -468,7 +468,7 @@ export class StreamingMessageAggregator {
468468
// Streaming duration excludes TTFT and tool execution - used for avg tok/s
469469
const streamingMs = Math.max(0, durationMs - (ttftMs ?? 0) - totalToolExecutionMs);
470470

471-
const mode = (message?.metadata?.mode ?? context.mode) as "plan" | "exec" | undefined;
471+
const mode = message?.metadata?.mode ?? context.mode;
472472

473473
// Store last completed stream stats (include durations anchored in the renderer clock)
474474
const startTime = endTime - durationMs;
@@ -648,7 +648,7 @@ export class StreamingMessageAggregator {
648648
/** Live tokens-per-second (trailing window) */
649649
liveTPS: number;
650650
/** Mode (plan/exec) for this stream */
651-
mode?: "plan" | "exec";
651+
mode?: string;
652652
} | null {
653653
// Get the first (and typically only) active stream
654654
const entries = Array.from(this.activeStreams.entries());
@@ -695,7 +695,7 @@ export class StreamingMessageAggregator {
695695
outputTokens: number;
696696
reasoningTokens: number;
697697
streamingMs: number;
698-
mode?: "plan" | "exec";
698+
mode?: string;
699699
} | null {
700700
return this.lastCompletedStreamStats;
701701
}
@@ -728,7 +728,7 @@ export class StreamingMessageAggregator {
728728
totalOutputTokens: number;
729729
totalReasoningTokens: number;
730730
/** Mode extracted from composite key, undefined for old data */
731-
mode?: "plan" | "exec";
731+
mode?: string;
732732
}
733733
>;
734734
} | null {
@@ -755,15 +755,15 @@ export class StreamingMessageAggregator {
755755
responseCount: number;
756756
totalOutputTokens: number;
757757
totalReasoningTokens: number;
758-
mode?: "plan" | "exec";
758+
mode?: string;
759759
}
760760
> = {};
761761

762762
for (const [key, stats] of modelEntries) {
763763
// Parse composite key: "model" or "model:mode"
764764
// Model names can contain colons (e.g., "mux-gateway:provider/model")
765765
// so we look for ":plan" or ":exec" suffix specifically
766-
let mode: "plan" | "exec" | undefined;
766+
let mode: string | undefined;
767767
if (key.endsWith(":plan")) {
768768
mode = "plan";
769769
} else if (key.endsWith(":exec")) {

src/browser/utils/messages/compactionOptions.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ describe("applyCompactionOverrides", () => {
6262
const result = applyCompactionOverrides(baseOptions, compactData);
6363

6464
expect(result.mode).toBe("compact");
65-
expect(result.toolPolicy).toEqual([]);
65+
expect(result.toolPolicy).toEqual([{ regex_match: ".*", action: "disable" }]);
6666
});
6767

6868
it("disables all tools even when base options has tool policy", () => {
@@ -74,7 +74,7 @@ describe("applyCompactionOverrides", () => {
7474
const result = applyCompactionOverrides(baseWithTools, compactData);
7575

7676
expect(result.mode).toBe("compact");
77-
expect(result.toolPolicy).toEqual([]); // Tools always disabled for compaction
77+
expect(result.toolPolicy).toEqual([{ regex_match: ".*", action: "disable" }]); // Tools always disabled for compaction
7878
});
7979

8080
it("applies all overrides together", () => {

src/browser/utils/messages/compactionOptions.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export function applyCompactionOverrides(
3535
thinkingLevel: baseOptions.thinkingLevel,
3636
maxOutputTokens: compactData.maxOutputTokens,
3737
mode: "compact" as const,
38-
toolPolicy: [], // Disable all tools during compaction
38+
// Disable all tools during compaction - regex .* matches all tool names
39+
toolPolicy: [{ regex_match: ".*", action: "disable" }],
3940
};
4041
}

src/common/orpc/schemas/stream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ export const StreamStartEventSchema = z.object({
4343
startTime: z.number().meta({
4444
description: "Backend timestamp when stream started (Date.now())",
4545
}),
46-
mode: z.enum(["plan", "exec"]).optional().meta({
47-
description: "Agent mode (plan/exec) for this stream",
46+
mode: z.string().optional().meta({
47+
description: "Agent mode for this stream",
4848
}),
4949
});
5050

src/common/orpc/schemas/workspaceStats.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { z } from "zod";
22

3-
const ModeSchema = z.enum(["plan", "exec"]);
3+
// Mode is a string to support any mode value (plan, exec, compact, etc.)
4+
const ModeSchema = z.string();
45

56
export const TimingAnomalySchema = z.enum([
67
"negative_duration",

src/node/services/initStateManager.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,14 @@ export class InitStateManager extends EventEmitter {
100100
});
101101

102102
// Emit init-output for each accumulated line with original timestamps
103-
for (const timedLine of state.lines) {
103+
// Defensive: state.lines could be undefined from old persisted data
104+
const lines = state.lines ?? [];
105+
for (const timedLine of lines) {
106+
// Skip malformed entries (missing required fields)
107+
if (typeof timedLine.line !== "string" || typeof timedLine.timestamp !== "number") {
108+
log.warn(`[InitStateManager] Skipping malformed init-output:`, timedLine);
109+
continue;
110+
}
104111
events.push({
105112
type: "init-output",
106113
workspaceId,

0 commit comments

Comments
 (0)