diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 05eadbac75..6c28e9ad60 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -498,37 +498,8 @@ export class StreamManager extends EventEmitter { streamInfo.abortController.abort(); - // CRITICAL: Wait for processing to fully complete before cleanup - // This prevents race conditions where the old stream is still running - // while a new stream starts (e.g., old stream writing to partial.json) - await streamInfo.processingPromise; - - // For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage. - // cumulativeUsage is updated on each finish-step event (before tool execution), - // so it has accurate data even when the stream is interrupted mid-tool-call. - // AI SDK's totalUsage may return zeros or stale data when aborted. - const duration = Date.now() - streamInfo.startTime; - const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0; - const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined; - - // For context window display, use last step's usage (inputTokens = current context size) - const contextUsage = streamInfo.lastStepUsage; - const contextProviderMetadata = streamInfo.lastStepProviderMetadata; - - // Include provider metadata for accurate cost calculation - const providerMetadata = streamInfo.cumulativeProviderMetadata; - - // Emit abort event with usage if available - this.emit("stream-abort", { - type: "stream-abort", - workspaceId: workspaceId as string, - messageId: streamInfo.messageId, - metadata: { usage, contextUsage, duration, providerMetadata, contextProviderMetadata }, - abandonPartial, - }); - - // Clean up immediately - this.workspaceStreams.delete(workspaceId); + // Unlike checkSoftCancelStream, await cleanup (blocking) + await this.cleanupStream(workspaceId, streamInfo, abandonPartial); } catch (error) { log.error("Error during stream cancellation:", error); // Force cleanup even if cancellation fails @@ -575,6 +546,9 @@ export class StreamManager extends EventEmitter { await streamInfo.processingPromise; // For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage. + // cumulativeUsage is updated on each finish-step event (before tool execution), + // so it has accurate data even when the stream is interrupted mid-tool-call. + // AI SDK's totalUsage may return zeros or stale data when aborted. const duration = Date.now() - streamInfo.startTime; const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0; const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined; @@ -1563,8 +1537,7 @@ export class StreamManager extends EventEmitter { /** * Stops an active stream for a workspace - * First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..." - * Second call: Hard aborts the stream immediately + * If soft is true, performs a soft interrupt (cancels at next block boundary) */ async stopStream( workspaceId: string,