@@ -15,7 +15,6 @@ import type {
1515 StreamErrorMessage ,
1616 SendMessageOptions ,
1717 ImagePart ,
18- DeleteMessage ,
1918} from "@/common/types/ipc" ;
2019import type { SendMessageError } from "@/common/types/errors" ;
2120import { createUnknownSendMessageError } from "@/node/services/utils/sendMessageError" ;
@@ -25,9 +24,7 @@ import { enforceThinkingPolicy } from "@/browser/utils/thinking/policy";
2524import { createRuntime } from "@/node/runtime/runtimeFactory" ;
2625import { MessageQueue } from "./messageQueue" ;
2726import type { StreamEndEvent , StreamAbortEvent } from "@/common/types/stream" ;
28- import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator" ;
29- import type { LanguageModelV2Usage } from "@ai-sdk/provider" ;
30- import { accUsageHistory } from "@/common/utils/tokens/displayUsage" ;
27+ import { CompactionHandler } from "./compactionHandler" ;
3128
3229export interface AgentSessionChatEvent {
3330 workspaceId : string ;
@@ -62,7 +59,7 @@ export class AgentSession {
6259 [ ] ;
6360 private disposed = false ;
6461 private readonly messageQueue = new MessageQueue ( ) ;
65- private readonly processedCompactionRequestIds = new Set < string > ( ) ;
62+ private readonly compactionHandler : CompactionHandler ;
6663
6764 constructor ( options : AgentSessionOptions ) {
6865 assert ( options , "AgentSession requires options" ) ;
@@ -80,6 +77,12 @@ export class AgentSession {
8077 this . aiService = aiService ;
8178 this . initStateManager = initStateManager ;
8279
80+ this . compactionHandler = new CompactionHandler ( {
81+ workspaceId : this . workspaceId ,
82+ historyService : this . historyService ,
83+ emitter : this . emitter ,
84+ } ) ;
85+
8386 this . attachAiListeners ( ) ;
8487 this . attachInitListeners ( ) ;
8588 }
@@ -435,7 +438,7 @@ export class AgentSession {
435438 forward ( "reasoning-end" , ( payload ) => this . emitChatEvent ( payload ) ) ;
436439
437440 forward ( "stream-end" , async ( payload ) => {
438- const handled = await this . handleCompactionCompletion ( payload as StreamEndEvent ) ;
441+ const handled = await this . compactionHandler . handleCompletion ( payload as StreamEndEvent ) ;
439442 if ( ! handled ) {
440443 this . emitChatEvent ( payload ) ;
441444 }
@@ -444,7 +447,7 @@ export class AgentSession {
444447 } ) ;
445448
446449 forward ( "stream-abort" , async ( payload ) => {
447- const handled = await this . handleCompactionAbort ( payload as StreamAbortEvent ) ;
450+ const handled = await this . compactionHandler . handleAbort ( payload as StreamAbortEvent ) ;
448451 if ( ! handled ) {
449452 this . emitChatEvent ( payload ) ;
450453 }
@@ -569,196 +572,4 @@ export class AgentSession {
569572 private assertNotDisposed ( operation : string ) : void {
570573 assert ( ! this . disposed , `AgentSession.${ operation } called after dispose` ) ;
571574 }
572-
573- /**
574- * Handle compaction stream abort (Ctrl+C cancel or Ctrl+A accept early)
575- *
576- * Two flows:
577- * - Ctrl+C: abandonPartial=true → skip compaction
578- * - Ctrl+A: abandonPartial=false/undefined → perform compaction with [truncated]
579- */
580- private async handleCompactionAbort ( event : StreamAbortEvent ) : Promise < boolean > {
581- // Check if the last user message is a compaction-request
582- const historyResult = await this . historyService . getHistory ( this . workspaceId ) ;
583- if ( ! historyResult . success ) {
584- return false ;
585- }
586-
587- const messages = historyResult . data ;
588- const lastUserMsg = [ ...messages ] . reverse ( ) . find ( ( m ) => m . role === "user" ) ;
589- const isCompaction = lastUserMsg ?. metadata ?. muxMetadata ?. type === "compaction-request" ;
590-
591- if ( ! isCompaction || ! lastUserMsg ) {
592- return false ;
593- }
594-
595- // Ctrl+C flow: abandonPartial=true means user cancelled, skip compaction
596- if ( event . abandonPartial === true ) {
597- return false ;
598- }
599-
600- // Ctrl+A flow: Accept early with [truncated] sentinel
601- // Get the truncated message from historyResult.data
602- const lastMessage = messages [ messages . length - 1 ] ;
603- if ( ! lastMessage || lastMessage . role !== "assistant" ) {
604- console . warn ( "[AgentSession] Compaction aborted but last message is not assistant" ) ;
605- return false ;
606- }
607-
608- const partialSummary = lastMessage . parts
609- . filter ( ( part ) : part is { type : "text" ; text : string } => part . type === "text" )
610- . map ( ( part ) => part . text )
611- . join ( "" ) ;
612-
613- // Append [truncated] sentinel
614- const truncatedSummary = partialSummary . trim ( ) + "\n\n[truncated]" ;
615-
616- // Perform compaction with truncated summary
617- const result = await this . performCompaction ( truncatedSummary , {
618- model : lastMessage . metadata ?. model ?? "unknown" ,
619- usage : event . metadata ?. usage ,
620- duration : event . metadata ?. duration ,
621- providerMetadata : lastMessage . metadata ?. providerMetadata ,
622- systemMessageTokens : lastMessage . metadata ?. systemMessageTokens ,
623- } ) ;
624- if ( ! result . success ) {
625- console . error ( "[AgentSession] Early compaction failed:" , result . error ) ;
626- return false ;
627- }
628-
629- this . emitChatEvent ( event ) ;
630- return true ;
631- }
632-
633- /**
634- * Handle compaction stream completion
635- *
636- * Detects when a compaction stream finishes, extracts the summary,
637- * and performs history replacement atomically.
638- */
639- private async handleCompactionCompletion ( event : StreamEndEvent ) : Promise < boolean > {
640- // Check if the last user message is a compaction-request
641- const historyResult = await this . historyService . getHistory ( this . workspaceId ) ;
642- if ( ! historyResult . success ) {
643- return false ;
644- }
645-
646- const messages = historyResult . data ;
647- const lastUserMsg = [ ...messages ] . reverse ( ) . find ( ( m ) => m . role === "user" ) ;
648- const isCompaction = lastUserMsg ?. metadata ?. muxMetadata ?. type === "compaction-request" ;
649-
650- if ( ! isCompaction || ! lastUserMsg ) {
651- return false ;
652- }
653-
654- // Dedupe: If we've already processed this compaction-request, skip
655- if ( this . processedCompactionRequestIds . has ( lastUserMsg . id ) ) {
656- return true ;
657- }
658-
659- const summary = event . parts
660- . filter ( ( part ) : part is { type : "text" ; text : string } => part . type === "text" )
661- . map ( ( part ) => part . text )
662- . join ( "" ) ;
663-
664- // Mark as processed before performing compaction
665- this . processedCompactionRequestIds . add ( lastUserMsg . id ) ;
666-
667- const result = await this . performCompaction ( summary , event . metadata ) ;
668- if ( ! result . success ) {
669- console . error ( "[AgentSession] Compaction failed:" , result . error ) ;
670- return false ;
671- }
672-
673- // Emit stream-end to frontend so UI knows compaction is complete
674- this . emitCompactionStreamEnd ( event ) ;
675- return true ;
676- }
677-
678- /**
679- * Perform history compaction by replacing all messages with a summary
680- *
681- * Steps:
682- * 1. Calculate cumulative usage from all messages (for historicalUsage field)
683- * 2. Clear entire history and get deleted sequence numbers
684- * 3. Append summary message with metadata
685- * 4. Emit delete event for old messages
686- * 5. Emit summary message to frontend
687- */
688- private async performCompaction (
689- summary : string ,
690- metadata : {
691- model : string ;
692- usage ?: LanguageModelV2Usage ;
693- duration ?: number ;
694- providerMetadata ?: Record < string , unknown > ;
695- systemMessageTokens ?: number ;
696- }
697- ) : Promise < Result < void , string > > {
698- // Get all messages to calculate cumulative usage
699- const historyResult = await this . historyService . getHistory ( this . workspaceId ) ;
700- if ( ! historyResult . success ) {
701- return Err ( `Failed to get history for usage calculation: ${ historyResult . error } ` ) ;
702- }
703-
704- const usageHistory = accUsageHistory ( historyResult . data , undefined ) ;
705-
706- const historicalUsage = usageHistory . length > 0 ? sumUsageHistory ( usageHistory ) : undefined ;
707-
708- // Clear entire history and get deleted sequences
709- const clearResult = await this . historyService . clearHistory ( this . workspaceId ) ;
710- if ( ! clearResult . success ) {
711- return Err ( `Failed to clear history: ${ clearResult . error } ` ) ;
712- }
713- const deletedSequences = clearResult . data ;
714-
715- // Create summary message with metadata
716- const summaryMessage = createMuxMessage (
717- `summary-${ Date . now ( ) } -${ Math . random ( ) . toString ( 36 ) . substring ( 2 , 11 ) } ` ,
718- "assistant" ,
719- summary ,
720- {
721- timestamp : Date . now ( ) ,
722- compacted : true ,
723- model : metadata . model ,
724- usage : metadata . usage ,
725- historicalUsage,
726- providerMetadata : metadata . providerMetadata ,
727- duration : metadata . duration ,
728- systemMessageTokens : metadata . systemMessageTokens ,
729- muxMetadata : { type : "normal" } ,
730- }
731- ) ;
732-
733- // Append summary to history
734- const appendResult = await this . historyService . appendToHistory (
735- this . workspaceId ,
736- summaryMessage
737- ) ;
738- if ( ! appendResult . success ) {
739- return Err ( `Failed to append summary: ${ appendResult . error } ` ) ;
740- }
741-
742- // Emit delete event for old messages
743- if ( deletedSequences . length > 0 ) {
744- const deleteMessage : DeleteMessage = {
745- type : "delete" ,
746- historySequences : deletedSequences ,
747- } ;
748- this . emitChatEvent ( deleteMessage ) ;
749- }
750-
751- // Emit summary message to frontend
752- this . emitChatEvent ( summaryMessage ) ;
753-
754- return Ok ( undefined ) ;
755- }
756-
757- /**
758- * Emit stream-end event after compaction completes
759- * This notifies the frontend that the stream is done
760- */
761- private emitCompactionStreamEnd ( event : StreamEndEvent ) : void {
762- this . emitChatEvent ( event ) ;
763- }
764575}
0 commit comments