Skip to content

Commit 4e85392

Browse files
🤖 refactor: deduplicate stream cleanup code in streamManager (#892)
Deduplicate cleanup logic in `streamManager.ts`. `cancelStreamSafely` and `cleanupStream` had identical cleanup logic (wait for processing, calculate usage, emit stream-abort, delete from map). Now `cancelStreamSafely` delegates to `cleanupStream`, matching how `checkSoftCancelStream` already works. ~26 lines removed, no behavior change. --- _Generated with `mux`_
1 parent 4eef087 commit 4e85392

File tree

1 file changed

+6
-33
lines changed

1 file changed

+6
-33
lines changed

src/node/services/streamManager.ts

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -498,37 +498,8 @@ export class StreamManager extends EventEmitter {
498498

499499
streamInfo.abortController.abort();
500500

501-
// CRITICAL: Wait for processing to fully complete before cleanup
502-
// This prevents race conditions where the old stream is still running
503-
// while a new stream starts (e.g., old stream writing to partial.json)
504-
await streamInfo.processingPromise;
505-
506-
// For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage.
507-
// cumulativeUsage is updated on each finish-step event (before tool execution),
508-
// so it has accurate data even when the stream is interrupted mid-tool-call.
509-
// AI SDK's totalUsage may return zeros or stale data when aborted.
510-
const duration = Date.now() - streamInfo.startTime;
511-
const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0;
512-
const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined;
513-
514-
// For context window display, use last step's usage (inputTokens = current context size)
515-
const contextUsage = streamInfo.lastStepUsage;
516-
const contextProviderMetadata = streamInfo.lastStepProviderMetadata;
517-
518-
// Include provider metadata for accurate cost calculation
519-
const providerMetadata = streamInfo.cumulativeProviderMetadata;
520-
521-
// Emit abort event with usage if available
522-
this.emit("stream-abort", {
523-
type: "stream-abort",
524-
workspaceId: workspaceId as string,
525-
messageId: streamInfo.messageId,
526-
metadata: { usage, contextUsage, duration, providerMetadata, contextProviderMetadata },
527-
abandonPartial,
528-
});
529-
530-
// Clean up immediately
531-
this.workspaceStreams.delete(workspaceId);
501+
// Unlike checkSoftCancelStream, await cleanup (blocking)
502+
await this.cleanupStream(workspaceId, streamInfo, abandonPartial);
532503
} catch (error) {
533504
log.error("Error during stream cancellation:", error);
534505
// Force cleanup even if cancellation fails
@@ -575,6 +546,9 @@ export class StreamManager extends EventEmitter {
575546
await streamInfo.processingPromise;
576547

577548
// For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage.
549+
// cumulativeUsage is updated on each finish-step event (before tool execution),
550+
// so it has accurate data even when the stream is interrupted mid-tool-call.
551+
// AI SDK's totalUsage may return zeros or stale data when aborted.
578552
const duration = Date.now() - streamInfo.startTime;
579553
const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0;
580554
const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined;
@@ -1563,8 +1537,7 @@ export class StreamManager extends EventEmitter {
15631537

15641538
/**
15651539
* Stops an active stream for a workspace
1566-
* First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..."
1567-
* Second call: Hard aborts the stream immediately
1540+
* If soft is true, performs a soft interrupt (cancels at next block boundary)
15681541
*/
15691542
async stopStream(
15701543
workspaceId: string,

0 commit comments

Comments
 (0)