Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 6 additions & 33 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -498,37 +498,8 @@ export class StreamManager extends EventEmitter {

streamInfo.abortController.abort();

// CRITICAL: Wait for processing to fully complete before cleanup
// This prevents race conditions where the old stream is still running
// while a new stream starts (e.g., old stream writing to partial.json)
await streamInfo.processingPromise;

// For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage.
// cumulativeUsage is updated on each finish-step event (before tool execution),
// so it has accurate data even when the stream is interrupted mid-tool-call.
// AI SDK's totalUsage may return zeros or stale data when aborted.
const duration = Date.now() - streamInfo.startTime;
const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0;
const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined;

// For context window display, use last step's usage (inputTokens = current context size)
const contextUsage = streamInfo.lastStepUsage;
const contextProviderMetadata = streamInfo.lastStepProviderMetadata;

// Include provider metadata for accurate cost calculation
const providerMetadata = streamInfo.cumulativeProviderMetadata;

// Emit abort event with usage if available
this.emit("stream-abort", {
type: "stream-abort",
workspaceId: workspaceId as string,
messageId: streamInfo.messageId,
metadata: { usage, contextUsage, duration, providerMetadata, contextProviderMetadata },
abandonPartial,
});

// Clean up immediately
this.workspaceStreams.delete(workspaceId);
// Unlike checkSoftCancelStream, await cleanup (blocking)
await this.cleanupStream(workspaceId, streamInfo, abandonPartial);
} catch (error) {
log.error("Error during stream cancellation:", error);
// Force cleanup even if cancellation fails
Expand Down Expand Up @@ -575,6 +546,9 @@ export class StreamManager extends EventEmitter {
await streamInfo.processingPromise;

// For aborts, use our tracked cumulativeUsage directly instead of AI SDK's totalUsage.
// cumulativeUsage is updated on each finish-step event (before tool execution),
// so it has accurate data even when the stream is interrupted mid-tool-call.
// AI SDK's totalUsage may return zeros or stale data when aborted.
const duration = Date.now() - streamInfo.startTime;
const hasCumulativeUsage = (streamInfo.cumulativeUsage.totalTokens ?? 0) > 0;
const usage = hasCumulativeUsage ? streamInfo.cumulativeUsage : undefined;
Expand Down Expand Up @@ -1563,8 +1537,7 @@ export class StreamManager extends EventEmitter {

/**
* Stops an active stream for a workspace
* First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..."
* Second call: Hard aborts the stream immediately
* If soft is true, performs a soft interrupt (cancels at next block boundary)
*/
async stopStream(
workspaceId: string,
Expand Down