@@ -98,12 +98,10 @@ export function useCollaborativeWorkflow() {
9898 handleSocketReconnection ,
9999 ] )
100100
101- // Handle incoming workflow operations from other users
102101 useEffect ( ( ) => {
103102 const handleWorkflowOperation = ( data : any ) => {
104103 const { operation, target, payload, userId } = data
105104
106- // Don't apply our own operations
107105 if ( isApplyingRemoteChange . current ) return
108106
109107 logger . info ( `Received ${ operation } on ${ target } from user ${ userId } ` )
@@ -115,8 +113,6 @@ export function useCollaborativeWorkflow() {
115113 if ( target === 'block' ) {
116114 switch ( operation ) {
117115 case 'add' :
118- // Use normal addBlock - the collaborative system now sends complete data
119- // and the validation schema preserves outputs and subBlocks
120116 workflowStore . addBlock (
121117 payload . id ,
122118 payload . type ,
@@ -126,17 +122,13 @@ export function useCollaborativeWorkflow() {
126122 payload . parentId ,
127123 payload . extent
128124 )
129- // Handle auto-connect edge if present
130125 if ( payload . autoConnectEdge ) {
131126 workflowStore . addEdge ( payload . autoConnectEdge )
132127 }
133128 break
134129 case 'update-position' : {
135- // Apply position update only if it's newer than the last applied timestamp
136- // This prevents jagged movement from out-of-order position updates
137130 const blockId = payload . id
138131
139- // Server should always provide timestamp - if missing, skip ordering check
140132 if ( ! data . timestamp ) {
141133 logger . warn ( 'Position update missing timestamp, applying without ordering check' , {
142134 blockId,
@@ -180,20 +172,16 @@ export function useCollaborativeWorkflow() {
180172 workflowStore . setBlockWide ( payload . id , payload . isWide )
181173 break
182174 case 'update-advanced-mode' :
183- // Note: toggleBlockAdvancedMode doesn't take a parameter, it just toggles
184- // For now, we'll use the existing toggle method
185175 workflowStore . toggleBlockAdvancedMode ( payload . id )
186176 break
187177 case 'toggle-handles' : {
188- // Apply the handles toggle - we need to set the specific value to ensure consistency
189178 const currentBlock = workflowStore . blocks [ payload . id ]
190179 if ( currentBlock && currentBlock . horizontalHandles !== payload . horizontalHandles ) {
191180 workflowStore . toggleBlockHandles ( payload . id )
192181 }
193182 break
194183 }
195184 case 'duplicate' :
196- // Apply the duplicate operation by adding the new block
197185 workflowStore . addBlock (
198186 payload . id ,
199187 payload . type ,
@@ -374,7 +362,6 @@ export function useCollaborativeWorkflow() {
374362 const { operationId, error, retryable } = data
375363 logger . warn ( 'Operation failed' , { operationId, error, retryable } )
376364
377- // Create a retry function that re-emits the operation using the correct channel
378365 const retryFunction = ( operation : any ) => {
379366 const { operation : op , target, payload } = operation . operation
380367
@@ -421,7 +408,6 @@ export function useCollaborativeWorkflow() {
421408 queue ,
422409 ] )
423410
424- // Helper function to execute queued operations
425411 const executeQueuedOperation = useCallback (
426412 ( operation : string , target : string , payload : any , localAction : ( ) => void ) => {
427413 console . log ( '🎯 executeQueuedOperation called' , {
@@ -430,16 +416,12 @@ export function useCollaborativeWorkflow() {
430416 isApplyingRemoteChange : isApplyingRemoteChange . current ,
431417 } )
432418
433- // Skip if applying remote changes
434419 if ( isApplyingRemoteChange . current ) {
435- console . log ( '❌ Skipping - applying remote change' )
436420 return
437421 }
438422
439- // Generate operation ID
440423 const operationId = crypto . randomUUID ( )
441424
442- // Add to queue
443425 addToQueue ( {
444426 id : operationId ,
445427 operation : {
@@ -451,34 +433,24 @@ export function useCollaborativeWorkflow() {
451433 userId : session ?. user ?. id || 'unknown' ,
452434 } )
453435
454- // Apply locally
455436 localAction ( )
456437
457- // Emit to server with operation ID
458438 emitWorkflowOperation ( operation , target , payload , operationId )
459439 } ,
460440 [ addToQueue , emitWorkflowOperation , session ?. user ?. id ]
461441 )
462442
463- // Special helper for debounced operations (position updates)
464- // These are high-frequency, low-importance operations that don't need queue tracking
465443 const executeQueuedDebouncedOperation = useCallback (
466444 ( operation : string , target : string , payload : any , localAction : ( ) => void ) => {
467- // Skip if applying remote changes
468445 if ( isApplyingRemoteChange . current ) return
469446
470- // Apply locally first (immediate UI feedback)
471447 localAction ( )
472448
473- // For debounced operations, don't use queue tracking
474- // The debouncing in socket context handles reliability
475- // No operation ID needed since we're not tracking these
476449 emitWorkflowOperation ( operation , target , payload )
477450 } ,
478451 [ emitWorkflowOperation ]
479452 )
480453
481- // Collaborative workflow operations
482454 const collaborativeAddBlock = useCallback (
483455 (
484456 id : string ,
@@ -490,7 +462,6 @@ export function useCollaborativeWorkflow() {
490462 extent ?: 'parent' ,
491463 autoConnectEdge ?: Edge
492464 ) => {
493- // Create complete block data upfront using the same logic as the store
494465 const blockConfig = getBlock ( type )
495466
496467 // Handle loop/parallel blocks that don't use BlockConfig
@@ -513,13 +484,11 @@ export function useCollaborativeWorkflow() {
513484 autoConnectEdge, // Include edge data for atomic operation
514485 }
515486
516- // Apply locally first
517487 workflowStore . addBlock ( id , type , name , position , data , parentId , extent )
518488 if ( autoConnectEdge ) {
519489 workflowStore . addEdge ( autoConnectEdge )
520490 }
521491
522- // Then broadcast to other clients with complete block data
523492 if ( ! isApplyingRemoteChange . current ) {
524493 emitWorkflowOperation ( 'add' , 'block' , completeBlockData )
525494 }
@@ -545,7 +514,6 @@ export function useCollaborativeWorkflow() {
545514 } )
546515 }
547516
548- // Generate outputs using the same logic as the store
549517 const outputs = resolveOutputType ( blockConfig . outputs )
550518
551519 const completeBlockData = {
@@ -669,11 +637,9 @@ export function useCollaborativeWorkflow() {
669637
670638 const collaborativeToggleBlockAdvancedMode = useCallback (
671639 ( id : string ) => {
672- // Get the current state before toggling
673640 const currentBlock = workflowStore . blocks [ id ]
674641 if ( ! currentBlock ) return
675642
676- // Calculate the new advancedMode value
677643 const newAdvancedMode = ! currentBlock . advancedMode
678644
679645 executeQueuedOperation (
@@ -688,11 +654,9 @@ export function useCollaborativeWorkflow() {
688654
689655 const collaborativeToggleBlockHandles = useCallback (
690656 ( id : string ) => {
691- // Get the current state before toggling
692657 const currentBlock = workflowStore . blocks [ id ]
693658 if ( ! currentBlock ) return
694659
695- // Calculate the new horizontalHandles value
696660 const newHorizontalHandles = ! currentBlock . horizontalHandles
697661
698662 executeQueuedOperation (
@@ -717,7 +681,6 @@ export function useCollaborativeWorkflow() {
717681 y : sourceBlock . position . y + 20 ,
718682 }
719683
720- // Generate new name with numbering
721684 const match = sourceBlock . name . match ( / ( .* ?) ( \d + ) ? $ / )
722685 const newName = match ?. [ 2 ]
723686 ? `${ match [ 1 ] } ${ Number . parseInt ( match [ 2 ] ) + 1 } `
@@ -741,7 +704,6 @@ export function useCollaborativeWorkflow() {
741704 height : sourceBlock . height || 0 ,
742705 }
743706
744- // Apply locally first using addBlock to ensure consistent IDs
745707 workflowStore . addBlock (
746708 newId ,
747709 sourceBlock . type ,
@@ -752,7 +714,6 @@ export function useCollaborativeWorkflow() {
752714 sourceBlock . data ?. extent
753715 )
754716
755- // Copy subblock values to the new block
756717 const activeWorkflowId = useWorkflowRegistry . getState ( ) . activeWorkflowId
757718 if ( activeWorkflowId ) {
758719 const subBlockValues =
@@ -769,7 +730,6 @@ export function useCollaborativeWorkflow() {
769730 }
770731
771732 executeQueuedOperation ( 'duplicate' , 'block' , duplicatedBlockData , ( ) => {
772- // Apply locally - add the duplicated block
773733 workflowStore . addBlock (
774734 newId ,
775735 sourceBlock . type ,
@@ -778,10 +738,8 @@ export function useCollaborativeWorkflow() {
778738 sourceBlock . data ? JSON . parse ( JSON . stringify ( sourceBlock . data ) ) : { }
779739 )
780740
781- // Copy subblock values to the new block
782741 const subBlockValues = subBlockStore . workflowValues [ activeWorkflowId || '' ] ?. [ sourceId ]
783742 if ( subBlockValues && activeWorkflowId ) {
784- // Copy each subblock value individually
785743 Object . entries ( subBlockValues ) . forEach ( ( [ subblockId , value ] ) => {
786744 subBlockStore . setValue ( newId , subblockId , value )
787745 } )
@@ -809,10 +767,8 @@ export function useCollaborativeWorkflow() {
809767
810768 const collaborativeSetSubblockValue = useCallback (
811769 ( blockId : string , subblockId : string , value : any ) => {
812- // Skip if applying remote changes
813770 if ( isApplyingRemoteChange . current ) return
814771
815- // Check workflow state
816772 if ( ! currentWorkflowId || activeWorkflowId !== currentWorkflowId ) {
817773 logger . debug ( 'Skipping subblock update - not in active workflow' , {
818774 currentWorkflowId,
@@ -823,17 +779,37 @@ export function useCollaborativeWorkflow() {
823779 return
824780 }
825781
826- // Apply locally first
782+ // Generate operation ID for queue tracking
783+ const operationId = crypto . randomUUID ( )
784+
785+ // Add to queue for retry mechanism
786+ addToQueue ( {
787+ id : operationId ,
788+ operation : {
789+ operation : 'subblock-update' ,
790+ target : 'subblock' ,
791+ payload : { blockId, subblockId, value } ,
792+ } ,
793+ workflowId : activeWorkflowId || '' ,
794+ userId : session ?. user ?. id || 'unknown' ,
795+ } )
796+
797+ // Apply locally first (immediate UI feedback)
827798 subBlockStore . setValue ( blockId , subblockId , value )
828799
829- // Emit to server (subblock updates have their own handler with built-in retry)
830- // No need for operation queue - the subblock handler already has confirmation/failure logic
831- emitSubblockUpdate ( blockId , subblockId , value )
800+ // Emit to server with operation ID for tracking
801+ emitSubblockUpdate ( blockId , subblockId , value , operationId )
832802 } ,
833- [ subBlockStore , emitSubblockUpdate , currentWorkflowId , activeWorkflowId ]
803+ [
804+ subBlockStore ,
805+ emitSubblockUpdate ,
806+ currentWorkflowId ,
807+ activeWorkflowId ,
808+ addToQueue ,
809+ session ?. user ?. id ,
810+ ]
834811 )
835812
836- // Collaborative loop/parallel configuration updates
837813 const collaborativeUpdateLoopCount = useCallback (
838814 ( loopId : string , count : number ) => {
839815 // Get current state BEFORE making changes
@@ -866,16 +842,13 @@ export function useCollaborativeWorkflow() {
866842
867843 const collaborativeUpdateLoopType = useCallback (
868844 ( loopId : string , loopType : 'for' | 'forEach' ) => {
869- // Get current state BEFORE making changes
870845 const currentBlock = workflowStore . blocks [ loopId ]
871846 if ( ! currentBlock || currentBlock . type !== 'loop' ) return
872847
873- // Find child nodes before state changes
874848 const childNodes = Object . values ( workflowStore . blocks )
875849 . filter ( ( b ) => b . data ?. parentId === loopId )
876850 . map ( ( b ) => b . id )
877851
878- // Get current values to preserve them
879852 const currentIterations = currentBlock . data ?. count || 5
880853 const currentCollection = currentBlock . data ?. collection || ''
881854
@@ -896,16 +869,13 @@ export function useCollaborativeWorkflow() {
896869
897870 const collaborativeUpdateLoopCollection = useCallback (
898871 ( loopId : string , collection : string ) => {
899- // Get current state BEFORE making changes
900872 const currentBlock = workflowStore . blocks [ loopId ]
901873 if ( ! currentBlock || currentBlock . type !== 'loop' ) return
902874
903- // Find child nodes before state changes
904875 const childNodes = Object . values ( workflowStore . blocks )
905876 . filter ( ( b ) => b . data ?. parentId === loopId )
906877 . map ( ( b ) => b . id )
907878
908- // Get current values to preserve them
909879 const currentIterations = currentBlock . data ?. count || 5
910880 const currentLoopType = currentBlock . data ?. loopType || 'for'
911881
@@ -926,16 +896,13 @@ export function useCollaborativeWorkflow() {
926896
927897 const collaborativeUpdateParallelCount = useCallback (
928898 ( parallelId : string , count : number ) => {
929- // Get current state BEFORE making changes
930899 const currentBlock = workflowStore . blocks [ parallelId ]
931900 if ( ! currentBlock || currentBlock . type !== 'parallel' ) return
932901
933- // Find child nodes before state changes
934902 const childNodes = Object . values ( workflowStore . blocks )
935903 . filter ( ( b ) => b . data ?. parentId === parallelId )
936904 . map ( ( b ) => b . id )
937905
938- // Get current values to preserve them
939906 const currentDistribution = currentBlock . data ?. collection || ''
940907 const currentParallelType = currentBlock . data ?. parallelType || 'collection'
941908
@@ -959,16 +926,13 @@ export function useCollaborativeWorkflow() {
959926
960927 const collaborativeUpdateParallelCollection = useCallback (
961928 ( parallelId : string , collection : string ) => {
962- // Get current state BEFORE making changes
963929 const currentBlock = workflowStore . blocks [ parallelId ]
964930 if ( ! currentBlock || currentBlock . type !== 'parallel' ) return
965931
966- // Find child nodes before state changes
967932 const childNodes = Object . values ( workflowStore . blocks )
968933 . filter ( ( b ) => b . data ?. parentId === parallelId )
969934 . map ( ( b ) => b . id )
970935
971- // Get current values to preserve them
972936 const currentCount = currentBlock . data ?. count || 5
973937 const currentParallelType = currentBlock . data ?. parallelType || 'collection'
974938
@@ -992,23 +956,18 @@ export function useCollaborativeWorkflow() {
992956
993957 const collaborativeUpdateParallelType = useCallback (
994958 ( parallelId : string , parallelType : 'count' | 'collection' ) => {
995- // Get current state BEFORE making changes
996959 const currentBlock = workflowStore . blocks [ parallelId ]
997960 if ( ! currentBlock || currentBlock . type !== 'parallel' ) return
998961
999- // Find child nodes before state changes
1000962 const childNodes = Object . values ( workflowStore . blocks )
1001963 . filter ( ( b ) => b . data ?. parentId === parallelId )
1002964 . map ( ( b ) => b . id )
1003965
1004- // Calculate new values based on type change
1005966 let newCount = currentBlock . data ?. count || 5
1006967 let newDistribution = currentBlock . data ?. collection || ''
1007968
1008- // Reset values based on type (same logic as the UI)
1009969 if ( parallelType === 'count' ) {
1010970 newDistribution = ''
1011- // Keep existing count
1012971 } else {
1013972 newCount = 1
1014973 newDistribution = newDistribution || ''
@@ -1027,7 +986,6 @@ export function useCollaborativeWorkflow() {
1027986 'subflow' ,
1028987 { id : parallelId , type : 'parallel' , config } ,
1029988 ( ) => {
1030- // Apply all changes locally
1031989 workflowStore . updateParallelType ( parallelId , parallelType )
1032990 workflowStore . updateParallelCount ( parallelId , newCount )
1033991 workflowStore . updateParallelCollection ( parallelId , newDistribution )
0 commit comments