Skip to content

Commit 8c09e0d

Browse files
committed
fix(agent-runtime): fix duplicate text in subagent streaming view
1 parent 268a9f9 commit 8c09e0d

File tree

5 files changed

+205
-90
lines changed

5 files changed

+205
-90
lines changed

packages/agent-runtime/src/__tests__/subagent-streaming.test.ts

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { handleSpawnAgents } from '../tools/handlers/tool/spawn-agents'
2121
import type { AgentTemplate } from '../templates/types'
2222
import type { SendSubagentChunk } from '../tools/handlers/tool/spawn-agents'
2323
import type { CodebuffToolCall } from '@codebuff/common/tools/list'
24+
import type { PrintModeEvent } from '@codebuff/common/types/print-mode'
2425
import type { ParamsExcluding } from '@codebuff/common/types/function-params'
2526
import type { Mock } from 'bun:test'
2627

@@ -118,6 +119,7 @@ describe('Subagent Streaming', () => {
118119
beforeEach(() => {
119120
mockSendSubagentChunk.mockClear()
120121
mockLoopAgentSteps.mockClear()
122+
mockWriteToClient.mockClear()
121123
})
122124

123125
afterAll(() => {
@@ -171,7 +173,6 @@ describe('Subagent Streaming', () => {
171173
2,
172174
expect.objectContaining({ type: 'subagent_finish' }),
173175
)
174-
return
175176
})
176177

177178
it('should include correct agentId and agentType in streaming messages', async () => {
@@ -228,4 +229,75 @@ describe('Subagent Streaming', () => {
228229
expect(firstCall.userInputId).toBe('test-input')
229230
expect(secondCall.userInputId).toBe('test-input')
230231
})
232+
233+
it('streams subagent text chunks without duplication', async () => {
234+
mockLoopAgentSteps.mockImplementationOnce(async (options) => {
235+
options.onResponseChunk?.('first chunk')
236+
options.onResponseChunk?.({ type: 'text', text: 'second chunk' })
237+
options.onResponseChunk?.({ type: 'text', text: '' })
238+
239+
return {
240+
agentState: {
241+
...options.agentState,
242+
messageHistory: [assistantMessage('Final subagent response')],
243+
},
244+
output: {
245+
type: 'lastMessage',
246+
value: [assistantMessage('Final subagent response')],
247+
},
248+
}
249+
})
250+
251+
const sessionState = getInitialSessionState(mockFileContext)
252+
const agentState = sessionState.mainAgentState
253+
254+
const parentTemplate = {
255+
id: 'base',
256+
spawnableAgents: ['thinker'],
257+
} as unknown as AgentTemplate
258+
259+
const toolCall: CodebuffToolCall<'spawn_agents'> = {
260+
toolName: 'spawn_agents' as const,
261+
toolCallId: 'test-tool-call-id-3',
262+
input: {
263+
agents: [
264+
{
265+
agent_type: 'thinker',
266+
prompt: 'Check for duplicates',
267+
},
268+
],
269+
},
270+
}
271+
272+
await handleSpawnAgents({
273+
...handleSpawnAgentsBaseParams,
274+
agentState,
275+
agentTemplate: parentTemplate,
276+
localAgentTemplates: {
277+
[mockAgentTemplate.id]: mockAgentTemplate,
278+
},
279+
toolCall,
280+
})
281+
282+
const streamedChunks = (
283+
mockSendSubagentChunk.mock.calls as Array<
284+
[Parameters<SendSubagentChunk>[0]]
285+
>
286+
).map(([call]) => call.chunk)
287+
288+
expect(streamedChunks).toEqual(['first chunk', 'second chunk'])
289+
290+
const subagentLifecycleEvents = (
291+
mockWriteToClient.mock.calls as Array<[string | PrintModeEvent]>
292+
)
293+
.map(([event]) => event)
294+
.filter(
295+
(event) =>
296+
typeof event === 'object' &&
297+
(event?.type === 'subagent_start' ||
298+
event?.type === 'subagent_finish'),
299+
)
300+
301+
expect(subagentLifecycleEvents).toHaveLength(2)
302+
})
231303
})

packages/agent-runtime/src/__tests__/tool-stream-parser.test.ts

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,6 @@ describe('processStreamWithTags', () => {
5151
}
5252

5353
const result: string[] = []
54-
const responseChunks: any[] = []
55-
56-
function onResponseChunk(chunk: any) {
57-
responseChunks.push(chunk)
58-
}
5954

6055
function defaultProcessor(toolName: string) {
6156
return {
@@ -70,7 +65,6 @@ describe('processStreamWithTags', () => {
7065
processors,
7166
defaultProcessor,
7267
onError,
73-
onResponseChunk,
7468
})) {
7569
if (chunk.type === 'text') {
7670
result.push(chunk.text)
@@ -117,11 +111,6 @@ describe('processStreamWithTags', () => {
117111
}
118112

119113
const result: string[] = []
120-
const responseChunks: any[] = []
121-
122-
function onResponseChunk(chunk: any) {
123-
responseChunks.push(chunk)
124-
}
125114

126115
function defaultProcessor(toolName: string) {
127116
return {
@@ -136,7 +125,6 @@ describe('processStreamWithTags', () => {
136125
processors,
137126
defaultProcessor,
138127
onError,
139-
onResponseChunk,
140128
})) {
141129
if (chunk.type === 'text') {
142130
result.push(chunk.text)
@@ -193,11 +181,6 @@ describe('processStreamWithTags', () => {
193181
}
194182

195183
const result: string[] = []
196-
const responseChunks: any[] = []
197-
198-
function onResponseChunk(chunk: any) {
199-
responseChunks.push(chunk)
200-
}
201184

202185
function defaultProcessor(toolName: string) {
203186
return {
@@ -212,7 +195,6 @@ describe('processStreamWithTags', () => {
212195
processors,
213196
defaultProcessor,
214197
onError,
215-
onResponseChunk,
216198
})) {
217199
if (chunk.type === 'text') {
218200
result.push(chunk.text)
@@ -267,12 +249,6 @@ describe('processStreamWithTags', () => {
267249
events.push({ name, error, type: 'error' })
268250
}
269251

270-
const responseChunks: any[] = []
271-
272-
function onResponseChunk(chunk: any) {
273-
responseChunks.push(chunk)
274-
}
275-
276252
function defaultProcessor(toolName: string) {
277253
// For unknown tools, still return a processor but track the error
278254
events.push({
@@ -292,7 +268,6 @@ describe('processStreamWithTags', () => {
292268
processors,
293269
defaultProcessor,
294270
onError,
295-
onResponseChunk,
296271
})) {
297272
// consume stream
298273
}
@@ -341,11 +316,6 @@ describe('processStreamWithTags', () => {
341316
}
342317

343318
const result: string[] = []
344-
const responseChunks: any[] = []
345-
346-
function onResponseChunk(chunk: any) {
347-
responseChunks.push(chunk)
348-
}
349319

350320
function defaultProcessor(toolName: string) {
351321
return {
@@ -360,7 +330,6 @@ describe('processStreamWithTags', () => {
360330
processors,
361331
defaultProcessor,
362332
onError,
363-
onResponseChunk,
364333
})) {
365334
if (chunk.type === 'text') {
366335
result.push(chunk.text)
@@ -413,11 +382,6 @@ describe('processStreamWithTags', () => {
413382
}
414383

415384
const result: string[] = []
416-
const responseChunks: any[] = []
417-
418-
function onResponseChunk(chunk: any) {
419-
responseChunks.push(chunk)
420-
}
421385

422386
function defaultProcessor(toolName: string) {
423387
return {
@@ -432,7 +396,6 @@ describe('processStreamWithTags', () => {
432396
processors,
433397
defaultProcessor,
434398
onError,
435-
onResponseChunk,
436399
})) {
437400
if (chunk.type === 'text') {
438401
result.push(chunk.text)
@@ -466,11 +429,6 @@ describe('processStreamWithTags', () => {
466429
}
467430

468431
const result: string[] = []
469-
const responseChunks: any[] = []
470-
471-
function onResponseChunk(chunk: any) {
472-
responseChunks.push(chunk)
473-
}
474432

475433
function defaultProcessor(toolName: string) {
476434
return {
@@ -485,7 +443,6 @@ describe('processStreamWithTags', () => {
485443
processors,
486444
defaultProcessor,
487445
onError,
488-
onResponseChunk,
489446
})) {
490447
if (chunk.type === 'text') {
491448
result.push(chunk.text)
@@ -512,11 +469,6 @@ describe('processStreamWithTags', () => {
512469
}
513470

514471
const result: string[] = []
515-
const responseChunks: any[] = []
516-
517-
function onResponseChunk(chunk: any) {
518-
responseChunks.push(chunk)
519-
}
520472

521473
function defaultProcessor(toolName: string) {
522474
return {
@@ -531,7 +483,6 @@ describe('processStreamWithTags', () => {
531483
processors,
532484
defaultProcessor,
533485
onError,
534-
onResponseChunk,
535486
})) {
536487
if (chunk.type === 'text') {
537488
result.push(chunk.text)

packages/agent-runtime/src/tool-stream-parser.ts

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,6 @@ import type { Model } from '@codebuff/common/old-constants'
44
import type { TrackEventFn } from '@codebuff/common/types/contracts/analytics'
55
import type { StreamChunk } from '@codebuff/common/types/contracts/llm'
66
import type { Logger } from '@codebuff/common/types/contracts/logger'
7-
import type {
8-
PrintModeError,
9-
PrintModeText,
10-
} from '@codebuff/common/types/print-mode'
11-
127
export async function* processStreamWithTools(params: {
138
stream: AsyncGenerator<StreamChunk, string | null>
149
processors: Record<
@@ -23,7 +18,6 @@ export async function* processStreamWithTools(params: {
2318
onTagEnd: (tagName: string, params: Record<string, any>) => void
2419
}
2520
onError: (tagName: string, errorMessage: string) => void
26-
onResponseChunk: (chunk: PrintModeText | PrintModeError) => void
2721
logger: Logger
2822
loggerOptions?: {
2923
userId?: string
@@ -37,13 +31,11 @@ export async function* processStreamWithTools(params: {
3731
processors,
3832
defaultProcessor,
3933
onError,
40-
onResponseChunk,
4134
logger,
4235
loggerOptions,
4336
trackEvent,
4437
} = params
4538
let streamCompleted = false
46-
let buffer = ''
4739
let autocompleted = false
4840

4941
function processToolCallObject(params: {
@@ -73,31 +65,14 @@ export async function* processStreamWithTools(params: {
7365
processor.onTagEnd(toolName, input)
7466
}
7567

76-
function flush() {
77-
if (buffer) {
78-
onResponseChunk({
79-
type: 'text',
80-
text: buffer,
81-
})
82-
}
83-
buffer = ''
84-
}
85-
8668
function* processChunk(
8769
chunk: StreamChunk | undefined,
8870
): Generator<StreamChunk> {
8971
if (chunk === undefined) {
90-
flush()
9172
streamCompleted = true
9273
return
9374
}
9475

95-
if (chunk.type === 'text') {
96-
buffer += chunk.text
97-
} else {
98-
flush()
99-
}
100-
10176
if (chunk.type === 'tool-call') {
10277
processToolCallObject(chunk)
10378
}

packages/agent-runtime/src/tools/stream-parser.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -234,21 +234,6 @@ export async function processStream(
234234
model: agentTemplate.model,
235235
agentName: agentTemplate.id,
236236
},
237-
onResponseChunk: (chunk) => {
238-
if (chunk.type === 'text') {
239-
if (chunk.text) {
240-
assistantMessages.push(assistantMessage(chunk.text))
241-
}
242-
} else if (chunk.type === 'error') {
243-
// do nothing
244-
} else {
245-
chunk satisfies never
246-
throw new Error(
247-
`Internal error: unhandled chunk type: ${(chunk as any).type}`,
248-
)
249-
}
250-
return onResponseChunk(chunk)
251-
},
252237
})
253238

254239
let messageId: string | null = null
@@ -273,6 +258,7 @@ export async function processStream(
273258
} else if (chunk.type === 'text') {
274259
onResponseChunk(chunk.text)
275260
fullResponseChunks.push(chunk.text)
261+
assistantMessages.push(assistantMessage(chunk.text))
276262
} else if (chunk.type === 'error') {
277263
onResponseChunk(chunk)
278264

0 commit comments

Comments
 (0)