Skip to content

Commit 27c248a

Browse files
fix(sockets): delete block case (#703)
1 parent 19ca9c7 commit 27c248a

File tree

3 files changed

+97
-8
lines changed

3 files changed

+97
-8
lines changed

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,14 @@ export function useCollaborativeWorkflow() {
4444
const lastPositionTimestamps = useRef<Map<string, number>>(new Map())
4545

4646
// Operation queue
47-
const { queue, hasOperationError, addToQueue, confirmOperation, failOperation } =
48-
useOperationQueue()
47+
const {
48+
queue,
49+
hasOperationError,
50+
addToQueue,
51+
confirmOperation,
52+
failOperation,
53+
cancelOperationsForBlock,
54+
} = useOperationQueue()
4955

5056
// Clear position timestamps when switching workflows
5157
// Note: Workflow joining is now handled automatically by socket connect event based on URL
@@ -332,7 +338,7 @@ export function useCollaborativeWorkflow() {
332338
const { operationId, error, retryable } = data
333339
logger.warn('Operation failed', { operationId, error, retryable })
334340

335-
failOperation(operationId)
341+
failOperation(operationId, retryable)
336342
}
337343

338344
// Register event handlers
@@ -534,9 +540,11 @@ export function useCollaborativeWorkflow() {
534540

535541
const collaborativeRemoveBlock = useCallback(
536542
(id: string) => {
543+
cancelOperationsForBlock(id)
544+
537545
executeQueuedOperation('remove', 'block', { id }, () => workflowStore.removeBlock(id))
538546
},
539-
[executeQueuedOperation, workflowStore]
547+
[executeQueuedOperation, workflowStore, cancelOperationsForBlock]
540548
)
541549

542550
const collaborativeUpdateBlockPosition = useCallback(

apps/sim/socket-server/handlers/subblocks.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,16 @@ export function setupSubblocksHandlers(
125125
serverTimestamp: Date.now(),
126126
})
127127
}
128-
}
129128

130-
logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
129+
logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
130+
} else if (operationId) {
131+
// Block was deleted - notify client that operation completed (but didn't update anything)
132+
socket.emit('operation-failed', {
133+
operationId,
134+
error: 'Block no longer exists',
135+
retryable: false, // No point retrying for deleted blocks
136+
})
137+
}
131138
} catch (error) {
132139
logger.error('Error handling subblock update:', error)
133140

apps/sim/stores/operation-queue/store.ts

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ interface OperationQueueState {
2424

2525
addToQueue: (operation: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>) => void
2626
confirmOperation: (operationId: string) => void
27-
failOperation: (operationId: string) => void
27+
failOperation: (operationId: string, retryable?: boolean) => void
2828
handleOperationTimeout: (operationId: string) => void
2929
processNextOperation: () => void
30+
cancelOperationsForBlock: (blockId: string) => void
3031

3132
triggerOfflineMode: () => void
3233
clearError: () => void
@@ -211,7 +212,7 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
211212
get().processNextOperation()
212213
},
213214

214-
failOperation: (operationId: string) => {
215+
failOperation: (operationId: string, retryable = true) => {
215216
const state = get()
216217
const operation = state.operations.find((op) => op.id === operationId)
217218
if (!operation) {
@@ -239,6 +240,18 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
239240
}
240241
}
241242

243+
if (!retryable) {
244+
logger.debug('Operation marked as non-retryable, removing from queue', { operationId })
245+
246+
set((state) => ({
247+
operations: state.operations.filter((op) => op.id !== operationId),
248+
isProcessing: false,
249+
}))
250+
251+
get().processNextOperation()
252+
return
253+
}
254+
242255
if (operation.retryCount < 3) {
243256
const newRetryCount = operation.retryCount + 1
244257
const delay = 2 ** newRetryCount * 1000 // 2s, 4s, 8s
@@ -338,6 +351,66 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
338351
operationTimeouts.set(nextOperation.id, timeoutId)
339352
},
340353

354+
cancelOperationsForBlock: (blockId: string) => {
355+
logger.debug('Canceling all operations for block', { blockId })
356+
357+
// Cancel all debounce timeouts for this block's subblocks
358+
const keysToDelete: string[] = []
359+
for (const [key, timeout] of subblockDebounceTimeouts.entries()) {
360+
if (key.startsWith(`${blockId}-`)) {
361+
clearTimeout(timeout)
362+
keysToDelete.push(key)
363+
}
364+
}
365+
keysToDelete.forEach((key) => subblockDebounceTimeouts.delete(key))
366+
367+
// Find and cancel operation timeouts for operations related to this block
368+
const state = get()
369+
const operationsToCancel = state.operations.filter(
370+
(op) =>
371+
(op.operation.target === 'block' && op.operation.payload?.id === blockId) ||
372+
(op.operation.target === 'subblock' && op.operation.payload?.blockId === blockId)
373+
)
374+
375+
// Cancel timeouts for these operations
376+
operationsToCancel.forEach((op) => {
377+
const operationTimeout = operationTimeouts.get(op.id)
378+
if (operationTimeout) {
379+
clearTimeout(operationTimeout)
380+
operationTimeouts.delete(op.id)
381+
}
382+
383+
const retryTimeout = retryTimeouts.get(op.id)
384+
if (retryTimeout) {
385+
clearTimeout(retryTimeout)
386+
retryTimeouts.delete(op.id)
387+
}
388+
})
389+
390+
// Remove all operations for this block (both pending and processing)
391+
const newOperations = state.operations.filter(
392+
(op) =>
393+
!(
394+
(op.operation.target === 'block' && op.operation.payload?.id === blockId) ||
395+
(op.operation.target === 'subblock' && op.operation.payload?.blockId === blockId)
396+
)
397+
)
398+
399+
set({
400+
operations: newOperations,
401+
isProcessing: false, // Reset processing state in case we removed the current operation
402+
})
403+
404+
logger.debug('Cancelled operations for block', {
405+
blockId,
406+
cancelledDebounceTimeouts: keysToDelete.length,
407+
cancelledOperations: operationsToCancel.length,
408+
})
409+
410+
// Process next operation if there are any remaining
411+
get().processNextOperation()
412+
},
413+
341414
triggerOfflineMode: () => {
342415
logger.error('Operation failed after retries - triggering offline mode')
343416

@@ -369,6 +442,7 @@ export function useOperationQueue() {
369442
confirmOperation: store.confirmOperation,
370443
failOperation: store.failOperation,
371444
processNextOperation: store.processNextOperation,
445+
cancelOperationsForBlock: store.cancelOperationsForBlock,
372446
triggerOfflineMode: store.triggerOfflineMode,
373447
clearError: store.clearError,
374448
}

0 commit comments

Comments
 (0)