From 00dc5c32f080e5beecbbc4ab06539370fb46c01e Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 4 Dec 2025 14:18:04 +1100 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20refactor:=20deduplicate=20stream?= =?UTF-8?q?=20cleanup=20code=20in=20streamManager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cancelStreamSafely and cleanupStream had identical cleanup logic. Now cancelStreamSafely delegates to cleanupStream, matching how checkSoftCancelStream already works. ~26 lines removed, no behavior change. --- src/node/services/streamManager.ts | 39 +++++------------------------- 1 file changed, 6 insertions(+), 33 deletions(-) diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 05eadbac75..6c28e9ad60 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -498,37 +498,8 @@ export class StreamManager extends EventEmitter { streamInfo.abortController.abort(); - // CRITICAL: Wait for processing to fully complete before cleanup - // This prevents race conditions where the old stream is still running - // while a new stream starts (e.g., old stream writing to partial.json) - await streamInfo.processingPromise; - - // For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage. - // cumulativeUsage is updated on each finish-step event (before tool execution), - // so it has accurate data even when the stream is interrupted mid-tool-call. - // AI SDK's totalUsage may return zeros or stale data when aborted. - const duration = Date.now() - streamInfo.startTime; - const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0; - const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined; - - // For context window display, use last step's usage (inputTokens = current context size) - const contextUsage = streamInfo.lastStepUsage; - const contextProviderMetadata = streamInfo.lastStepProviderMetadata; - - // Include provider metadata for accurate cost calculation - const providerMetadata = streamInfo.cumulativeProviderMetadata; - - // Emit abort event with usage if available - this.emit("stream-abort", { - type: "stream-abort", - workspaceId: workspaceId as string, - messageId: streamInfo.messageId, - metadata: { usage, contextUsage, duration, providerMetadata, contextProviderMetadata }, - abandonPartial, - }); - - // Clean up immediately - this.workspaceStreams.delete(workspaceId); + // Unlike checkSoftCancelStream, await cleanup (blocking) + await this.cleanupStream(workspaceId, streamInfo, abandonPartial); } catch (error) { log.error("Error during stream cancellation:", error); // Force cleanup even if cancellation fails @@ -575,6 +546,9 @@ export class StreamManager extends EventEmitter { await streamInfo.processingPromise; // For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage. + // cumulativeUsage is updated on each finish-step event (before tool execution), + // so it has accurate data even when the stream is interrupted mid-tool-call. + // AI SDK's totalUsage may return zeros or stale data when aborted. const duration = Date.now() - streamInfo.startTime; const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0; const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined; @@ -1563,8 +1537,7 @@ export class StreamManager extends EventEmitter { /** * Stops an active stream for a workspace - * First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..." - * Second call: Hard aborts the stream immediately + * If soft is true, performs a soft interrupt (cancels at next block boundary) */ async stopStream( workspaceId: string,