Skip to content

Commit 02aea0f

Browse files
committed
Simplify state in both stream managers
1 parent 86d4d4f commit 02aea0f

File tree

3 files changed

+67
-81
lines changed

3 files changed

+67
-81
lines changed

src/services/streamManager.ts

Lines changed: 10 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,6 @@ export class StreamManager extends EventEmitter {
300300
historySequence,
301301
} as StreamStartEvent);
302302

303-
let currentTextBuffer = "";
304-
305303
// Use fullStream to capture all events including tool calls
306304
const toolCalls = new Map<
307305
string,
@@ -323,31 +321,18 @@ export class StreamManager extends EventEmitter {
323321

324322
switch (part.type) {
325323
case "text-delta":
326-
currentTextBuffer += part.text;
327324
this.emit("stream-delta", {
328325
type: "stream-delta",
329326
workspaceId: workspaceId as string,
330327
messageId: streamInfo.messageId,
331328
delta: part.text,
332329
} as StreamDeltaEvent);
333330

334-
// Update parts array for partial message
335-
if (
336-
streamInfo.parts.length > 0 &&
337-
streamInfo.parts[streamInfo.parts.length - 1].type === "text"
338-
) {
339-
// Update existing text part
340-
streamInfo.parts[streamInfo.parts.length - 1] = {
341-
type: "text",
342-
text: currentTextBuffer,
343-
};
344-
} else {
345-
// Add new text part
346-
streamInfo.parts.push({
347-
type: "text",
348-
text: currentTextBuffer,
349-
});
350-
}
331+
// Append each delta as a new part (merging happens at display time)
332+
streamInfo.parts.push({
333+
type: "text",
334+
text: part.text,
335+
});
351336

352337
// Schedule partial write (throttled, fire-and-forget to not block stream)
353338
void this.schedulePartialWrite(workspaceId, streamInfo);
@@ -356,21 +341,11 @@ export class StreamManager extends EventEmitter {
356341
case "reasoning-delta": {
357342
const delta = (part as ReasoningDeltaPart).text ?? "";
358343

359-
// Check if last part is reasoning (consistent with text-delta handling)
360-
const lastPart = streamInfo.parts[streamInfo.parts.length - 1];
361-
if (lastPart?.type === "reasoning") {
362-
// Update existing reasoning part
363-
streamInfo.parts[streamInfo.parts.length - 1] = {
364-
type: "reasoning",
365-
text: lastPart.text + delta,
366-
};
367-
} else {
368-
// Push new reasoning part
369-
streamInfo.parts.push({
370-
type: "reasoning",
371-
text: delta,
372-
});
373-
}
344+
// Append each delta as a new part (merging happens at display time)
345+
streamInfo.parts.push({
346+
type: "reasoning",
347+
text: delta,
348+
});
374349

375350
this.emit("reasoning-delta", {
376351
type: "reasoning-delta",
@@ -393,10 +368,6 @@ export class StreamManager extends EventEmitter {
393368
}
394369

395370
case "tool-call": {
396-
// Reset text buffer to separate text segments (text is already in parts from text-delta handler)
397-
// We don't push here because text-delta already maintains parts array
398-
currentTextBuffer = "";
399-
400371
// Tool call started - store in map for later lookup
401372
toolCalls.set(part.toolCallId, {
402373
toolCallId: part.toolCallId,

src/utils/StreamingMessageAggregator.test.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ describe("StreamingMessageAggregator", () => {
171171
expect(messages).toHaveLength(1);
172172

173173
const message = messages[0];
174-
// Should have 3 parts: text, tool, text
175-
expect(message.parts).toHaveLength(3);
174+
// Should have 4 parts: text, tool, text, text (deltas not merged during streaming)
175+
expect(message.parts).toHaveLength(4);
176176

177177
// First text part (before tool)
178178
expect(message.parts[0].type).toBe("text");
@@ -186,10 +186,12 @@ describe("StreamingMessageAggregator", () => {
186186
expect(toolPart.toolName).toBe("searchFiles");
187187
expect(toolPart.state).toBe("output-available");
188188

189-
// Second text part (after tool)
189+
// Second and third text parts (after tool) - separate deltas not yet merged
190190
expect(message.parts[2].type).toBe("text");
191-
if (message.parts[2].type === "text") {
192-
expect(message.parts[2].text).toBe("I found the following results: file1.ts and file2.ts");
191+
expect(message.parts[3].type).toBe("text");
192+
if (message.parts[2].type === "text" && message.parts[3].type === "text") {
193+
expect(message.parts[2].text).toBe("I found the following results: ");
194+
expect(message.parts[3].text).toBe("file1.ts and file2.ts");
193195
}
194196

195197
// Test DisplayedMessages split
@@ -348,9 +350,18 @@ describe("StreamingMessageAggregator", () => {
348350
const messages = aggregator.getAllMessages();
349351
expect(messages).toHaveLength(1);
350352

353+
// Raw parts are separate deltas (2 parts: "Hello, " and "world!")
354+
expect(messages[0].parts).toHaveLength(2);
351355
const firstPart = messages[0].parts[0];
352356
if (firstPart.type === "text") {
353-
expect(firstPart.text).toBe("Hello, world!");
357+
expect(firstPart.text).toBe("Hello, ");
358+
}
359+
360+
// DisplayedMessages should merge them
361+
const displayedMessages = aggregator.getDisplayedMessages();
362+
expect(displayedMessages).toHaveLength(1);
363+
if (displayedMessages[0].type === "assistant") {
364+
expect(displayedMessages[0].content).toBe("Hello, world!");
354365
}
355366
});
356367

src/utils/StreamingMessageAggregator.ts

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,13 @@ export class StreamingMessageAggregator {
111111

112112
this.activeStreams.set(context.streamingId, context);
113113

114-
// Create initial streaming message
114+
// Create initial streaming message with empty parts (deltas will append)
115115
const streamingMessage = createCmuxMessage(data.messageId, "assistant", "", {
116116
historySequence: data.historySequence,
117117
streamingId: context.streamingId,
118118
timestamp: Date.now(),
119119
model: data.model,
120120
});
121-
// Start with empty text part (streaming status inferred from activeStreams)
122-
streamingMessage.parts[0] = { type: "text", text: "" };
123121

124122
this.messages.set(data.messageId, streamingMessage);
125123
this.incrementDisplayVersion();
@@ -129,21 +127,11 @@ export class StreamingMessageAggregator {
129127
const message = this.messages.get(data.messageId);
130128
if (!message) return;
131129

132-
// Check if last part is text (consistent with text handling in streamManager)
133-
const lastPart = message.parts[message.parts.length - 1];
134-
if (lastPart?.type === "text") {
135-
// Update existing text part
136-
message.parts[message.parts.length - 1] = {
137-
type: "text",
138-
text: lastPart.text + data.delta,
139-
};
140-
} else {
141-
// Push new text part
142-
message.parts.push({
143-
type: "text",
144-
text: data.delta,
145-
});
146-
}
130+
// Append each delta as a new part (merging happens at display time)
131+
message.parts.push({
132+
type: "text",
133+
text: data.delta,
134+
});
147135
this.incrementDisplayVersion();
148136
}
149137

@@ -279,21 +267,11 @@ export class StreamingMessageAggregator {
279267
const message = this.messages.get(data.messageId);
280268
if (!message) return;
281269

282-
// Check if last part is reasoning (consistent with text handling)
283-
const lastPart = message.parts[message.parts.length - 1];
284-
if (lastPart?.type === "reasoning") {
285-
// Update existing reasoning part
286-
message.parts[message.parts.length - 1] = {
287-
type: "reasoning",
288-
text: lastPart.text + data.delta,
289-
};
290-
} else {
291-
// Push new reasoning part
292-
message.parts.push({
293-
type: "reasoning",
294-
text: data.delta,
295-
});
296-
}
270+
// Append each delta as a new part (merging happens at display time)
271+
message.parts.push({
272+
type: "reasoning",
273+
text: data.delta,
274+
});
297275
this.incrementDisplayVersion();
298276
}
299277

@@ -376,11 +354,37 @@ export class StreamingMessageAggregator {
376354
// Check if this message has an active stream (for inferring streaming status)
377355
const hasActiveStream = this.getActiveStreams().some((s) => s.messageId === message.id);
378356

357+
// Merge adjacent parts of same type (text with text, reasoning with reasoning)
358+
// This is where all merging happens - streaming just appends raw deltas
359+
const mergedParts: typeof message.parts = [];
360+
for (let i = 0; i < message.parts.length; i++) {
361+
const part = message.parts[i];
362+
const lastMerged = mergedParts[mergedParts.length - 1];
363+
364+
// Try to merge with last part if same type
365+
if (lastMerged?.type === "text" && part.type === "text") {
366+
// Merge text parts
367+
mergedParts[mergedParts.length - 1] = {
368+
type: "text",
369+
text: lastMerged.text + part.text,
370+
};
371+
} else if (lastMerged?.type === "reasoning" && part.type === "reasoning") {
372+
// Merge reasoning parts
373+
mergedParts[mergedParts.length - 1] = {
374+
type: "reasoning",
375+
text: lastMerged.text + part.text,
376+
};
377+
} else {
378+
// Different type or tool part - add new part
379+
mergedParts.push(part);
380+
}
381+
}
382+
379383
// Find the last part that will produce a DisplayedMessage
380384
// (reasoning, text parts with content, OR tool parts)
381385
let lastPartIndex = -1;
382-
for (let i = message.parts.length - 1; i >= 0; i--) {
383-
const part = message.parts[i];
386+
for (let i = mergedParts.length - 1; i >= 0; i--) {
387+
const part = mergedParts[i];
384388
if (
385389
part.type === "reasoning" ||
386390
(part.type === "text" && part.text) ||
@@ -391,7 +395,7 @@ export class StreamingMessageAggregator {
391395
}
392396
}
393397

394-
message.parts.forEach((part, partIndex) => {
398+
mergedParts.forEach((part, partIndex) => {
395399
const isLastPart = partIndex === lastPartIndex;
396400
// Part is streaming if: active stream exists AND this is the last part
397401
const isStreaming = hasActiveStream && isLastPart;

0 commit comments

Comments
 (0)