Skip to content

Commit 89f7d2b

Browse files
improvement(sockets): cleanup debounce logic + add flush mechanism to… (#1152)
* improvement(sockets): cleanup debounce logic + add flush mechanism to not lose ops * fix optimistic update overwritten race condition * fix * fix forever stuck in processing
1 parent 923c052 commit 89f7d2b

File tree

8 files changed

+354
-160
lines changed

8 files changed

+354
-160
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/code.tsx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ export function Code({
163163

164164
// State management - useSubBlockValue with explicit streaming control
165165
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId, false, {
166-
debounceMs: 150,
167-
isStreaming: isAiStreaming, // Use AI streaming state directly
166+
isStreaming: isAiStreaming,
168167
onStreamingEnd: () => {
169168
logger.debug('AI streaming ended, value persisted', { blockId, subBlockId })
170169
},

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/long-input.tsx

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/c
1313
import { useWand } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand'
1414
import type { SubBlockConfig } from '@/blocks/types'
1515
import { useTagSelection } from '@/hooks/use-tag-selection'
16+
import { useOperationQueueStore } from '@/stores/operation-queue/store'
1617

1718
const logger = createLogger('LongInput')
1819

@@ -73,7 +74,6 @@ export function LongInput({
7374

7475
// State management - useSubBlockValue with explicit streaming control
7576
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlockId, false, {
76-
debounceMs: 150,
7777
isStreaming: wandHook?.isStreaming || false, // Use wand streaming state
7878
onStreamingEnd: () => {
7979
logger.debug('Wand streaming ended, value persisted', { blockId, subBlockId })
@@ -379,6 +379,11 @@ export function LongInput({
379379
onScroll={handleScroll}
380380
onWheel={handleWheel}
381381
onKeyDown={handleKeyDown}
382+
onBlur={() => {
383+
try {
384+
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
385+
} catch {}
386+
}}
382387
onFocus={() => {
383388
setShowEnvVars(false)
384389
setShowTags(false)

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/short-input.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { cn } from '@/lib/utils'
99
import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value'
1010
import type { SubBlockConfig } from '@/blocks/types'
1111
import { useTagSelection } from '@/hooks/use-tag-selection'
12+
import { useOperationQueueStore } from '@/stores/operation-queue/store'
1213

1314
const logger = createLogger('ShortInput')
1415

@@ -329,6 +330,9 @@ export function ShortInput({
329330
onBlur={() => {
330331
setIsFocused(false)
331332
setShowEnvVars(false)
333+
try {
334+
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
335+
} catch {}
332336
}}
333337
onDrop={handleDrop}
334338
onDragOver={handleDragOver}

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store'
1111
const logger = createLogger('SubBlockValue')
1212

1313
interface UseSubBlockValueOptions {
14-
debounceMs?: number
15-
isStreaming?: boolean // Explicit streaming state
14+
isStreaming?: boolean
1615
onStreamingEnd?: () => void
1716
}
1817

@@ -130,8 +129,21 @@ export function useSubBlockValue<T = any>(
130129
if (!isEqual(valueRef.current, newValue)) {
131130
valueRef.current = newValue
132131

133-
// Update local store immediately for UI responsiveness
134-
// The collaborative function will also update it, but that's okay for idempotency
132+
// Ensure we're passing the actual value, not a reference that might change
133+
const valueCopy =
134+
newValue === null
135+
? null
136+
: typeof newValue === 'object'
137+
? JSON.parse(JSON.stringify(newValue))
138+
: newValue
139+
140+
// If streaming, hold value locally and do not update global store to avoid render-phase updates
141+
if (isStreaming) {
142+
streamingValueRef.current = valueCopy
143+
return
144+
}
145+
146+
// Update local store immediately for UI responsiveness (non-streaming)
135147
useSubBlockStore.setState((state) => ({
136148
workflowValues: {
137149
...state.workflowValues,
@@ -145,44 +157,26 @@ export function useSubBlockValue<T = any>(
145157
},
146158
}))
147159

148-
// Handle model changes for provider-based blocks - clear API key when provider changes
160+
// Handle model changes for provider-based blocks - clear API key when provider changes (non-streaming)
149161
if (
150162
subBlockId === 'model' &&
151163
isProviderBasedBlock &&
152164
newValue &&
153165
typeof newValue === 'string'
154166
) {
155167
const currentApiKeyValue = useSubBlockStore.getState().getValue(blockId, 'apiKey')
156-
157-
// Only clear if there's currently an API key value
158168
if (currentApiKeyValue && currentApiKeyValue !== '') {
159169
const oldModelValue = storeValue as string
160170
const oldProvider = oldModelValue ? getProviderFromModel(oldModelValue) : null
161171
const newProvider = getProviderFromModel(newValue)
162-
163-
// Clear API key if provider changed
164172
if (oldProvider !== newProvider) {
165-
// Use collaborative function to clear the API key
166173
collaborativeSetSubblockValue(blockId, 'apiKey', '')
167174
}
168175
}
169176
}
170177

171-
// Ensure we're passing the actual value, not a reference that might change
172-
const valueCopy =
173-
newValue === null
174-
? null
175-
: typeof newValue === 'object'
176-
? JSON.parse(JSON.stringify(newValue))
177-
: newValue
178-
179-
// If streaming, just store the value without emitting
180-
if (isStreaming) {
181-
streamingValueRef.current = valueCopy
182-
} else {
183-
// Emit immediately - let the operation queue handle debouncing and deduplication
184-
emitValue(valueCopy)
185-
}
178+
// Emit immediately - let the operation queue handle debouncing and deduplication
179+
emitValue(valueCopy)
186180

187181
if (triggerWorkflowUpdate) {
188182
useWorkflowStore.getState().triggerUpdate()

apps/sim/contexts/socket-context.tsx

Lines changed: 95 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,18 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
334334
)
335335

336336
if (data.workflowId === urlWorkflowId) {
337+
try {
338+
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
339+
const hasPending = useOperationQueueStore
340+
.getState()
341+
.operations.some(
342+
(op: any) => op.workflowId === data.workflowId && op.status !== 'confirmed'
343+
)
344+
if (hasPending) {
345+
logger.info('Skipping copilot rehydration due to pending operations in queue')
346+
return
347+
}
348+
} catch {}
337349
try {
338350
// Fetch fresh workflow state directly from API
339351
const response = await fetch(`/api/workflows/${data.workflowId}`)
@@ -364,27 +376,38 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
364376
)
365377
})
366378

367-
// Update workflow store with fresh state from database
368-
const newWorkflowState = {
369-
blocks: workflowState.blocks || {},
370-
edges: workflowState.edges || [],
371-
loops: workflowState.loops || {},
372-
parallels: workflowState.parallels || {},
373-
lastSaved: workflowState.lastSaved || Date.now(),
374-
isDeployed: workflowState.isDeployed || false,
375-
deployedAt: workflowState.deployedAt,
376-
deploymentStatuses: workflowState.deploymentStatuses || {},
377-
hasActiveSchedule: workflowState.hasActiveSchedule || false,
378-
hasActiveWebhook: workflowState.hasActiveWebhook || false,
379+
// Merge workflow store with server state (do not drop optimistic local state)
380+
const existing = useWorkflowStore.getState()
381+
const mergedBlocks = {
382+
...(existing.blocks || {}),
383+
...(workflowState.blocks || {}),
379384
}
385+
const edgeById = new Map<string, any>()
386+
;(existing.edges || []).forEach((e: any) => edgeById.set(e.id, e))
387+
;(workflowState.edges || []).forEach((e: any) => edgeById.set(e.id, e))
388+
const mergedEdges = Array.from(edgeById.values())
389+
useWorkflowStore.setState({
390+
blocks: mergedBlocks,
391+
edges: mergedEdges,
392+
loops: workflowState.loops || existing.loops || {},
393+
parallels: workflowState.parallels || existing.parallels || {},
394+
lastSaved: workflowState.lastSaved || existing.lastSaved || Date.now(),
395+
isDeployed: workflowState.isDeployed ?? existing.isDeployed ?? false,
396+
deployedAt: workflowState.deployedAt || existing.deployedAt,
397+
deploymentStatuses:
398+
workflowState.deploymentStatuses || existing.deploymentStatuses || {},
399+
hasActiveWebhook:
400+
workflowState.hasActiveWebhook ?? existing.hasActiveWebhook ?? false,
401+
})
380402

381-
useWorkflowStore.setState(newWorkflowState)
382-
383-
// Update subblock store with fresh values
403+
// Merge subblock store values per workflow
384404
useSubBlockStore.setState((state: any) => ({
385405
workflowValues: {
386406
...state.workflowValues,
387-
[data.workflowId]: subblockValues,
407+
[data.workflowId]: {
408+
...(state.workflowValues?.[data.workflowId] || {}),
409+
...subblockValues,
410+
},
388411
},
389412
}))
390413

@@ -461,19 +484,31 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
461484

462485
// Update local stores with the fresh workflow state (same logic as YAML editor)
463486
if (workflowData?.state && workflowData.id === urlWorkflowId) {
487+
try {
488+
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
489+
const hasPending = useOperationQueueStore
490+
.getState()
491+
.operations.some(
492+
(op: any) => op.workflowId === workflowData.id && op.status !== 'confirmed'
493+
)
494+
if (hasPending) {
495+
logger.info(
496+
'Skipping workflow-state rehydration due to pending operations in queue'
497+
)
498+
return
499+
}
500+
} catch {}
464501
logger.info('Updating local stores with fresh workflow state from server')
465502

466503
try {
467-
// Import stores dynamically to avoid import issues
468504
Promise.all([
469505
import('@/stores/workflows/workflow/store'),
470506
import('@/stores/workflows/subblock/store'),
471507
import('@/stores/workflows/registry/store'),
472508
])
473-
.then(([{ useWorkflowStore }, { useSubBlockStore }, { useWorkflowRegistry }]) => {
509+
.then(([{ useWorkflowStore }, { useSubBlockStore }]) => {
474510
const workflowState = workflowData.state
475511

476-
// Extract subblock values from blocks before updating workflow store
477512
const subblockValues: Record<string, Record<string, any>> = {}
478513
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
479514
const blockState = block as any
@@ -483,36 +518,40 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
483518
})
484519
})
485520

486-
// Update workflow store with new state
487-
const newWorkflowState = {
488-
blocks: workflowState.blocks || {},
489-
edges: workflowState.edges || [],
490-
loops: workflowState.loops || {},
491-
parallels: workflowState.parallels || {},
492-
lastSaved: workflowState.lastSaved || Date.now(),
493-
isDeployed: workflowState.isDeployed || false,
494-
deployedAt: workflowState.deployedAt,
495-
deploymentStatuses: workflowState.deploymentStatuses || {},
496-
hasActiveSchedule: workflowState.hasActiveSchedule || false,
497-
hasActiveWebhook: workflowState.hasActiveWebhook || false,
521+
const existing = useWorkflowStore.getState()
522+
const mergedBlocks = {
523+
...(existing.blocks || {}),
524+
...(workflowState.blocks || {}),
498525
}
526+
const edgeById = new Map<string, any>()
527+
;(existing.edges || []).forEach((e: any) => edgeById.set(e.id, e))
528+
;(workflowState.edges || []).forEach((e: any) => edgeById.set(e.id, e))
529+
const mergedEdges = Array.from(edgeById.values())
530+
useWorkflowStore.setState({
531+
blocks: mergedBlocks,
532+
edges: mergedEdges,
533+
loops: workflowState.loops || existing.loops || {},
534+
parallels: workflowState.parallels || existing.parallels || {},
535+
lastSaved: workflowState.lastSaved || existing.lastSaved || Date.now(),
536+
isDeployed: workflowState.isDeployed ?? existing.isDeployed ?? false,
537+
deployedAt: workflowState.deployedAt || existing.deployedAt,
538+
deploymentStatuses:
539+
workflowState.deploymentStatuses || existing.deploymentStatuses || {},
540+
hasActiveWebhook:
541+
workflowState.hasActiveWebhook ?? existing.hasActiveWebhook ?? false,
542+
})
499543

500-
useWorkflowStore.setState(newWorkflowState)
501-
502-
// Update subblock store with fresh values
503544
useSubBlockStore.setState((state: any) => ({
504545
workflowValues: {
505546
...state.workflowValues,
506-
[workflowData.id]: subblockValues,
547+
[workflowData.id]: {
548+
...(state.workflowValues?.[workflowData.id] || {}),
549+
...subblockValues,
550+
},
507551
},
508552
}))
509553

510-
// Note: Auto layout is not triggered here because:
511-
// 1. For copilot edits: positions are already optimized by the backend
512-
// 2. For other syncs: the existing positions should be preserved
513-
// This prevents ID conflicts and unnecessary position updates
514-
515-
logger.info('Successfully updated local stores with fresh workflow state')
554+
logger.info('Merged fresh workflow state with local state')
516555
})
517556
.catch((error) => {
518557
logger.error('Failed to import stores for workflow state update:', error)
@@ -558,6 +597,16 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
558597
`URL workflow changed from ${currentWorkflowId} to ${urlWorkflowId}, switching rooms`
559598
)
560599

600+
try {
601+
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
602+
// Flush debounced updates for the old workflow before switching rooms
603+
if (currentWorkflowId) {
604+
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
605+
} else {
606+
useOperationQueueStore.getState().flushAllDebounced()
607+
}
608+
} catch {}
609+
561610
// Leave current workflow first if we're in one
562611
if (currentWorkflowId) {
563612
logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${urlWorkflowId}`)
@@ -615,6 +664,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
615664
const leaveWorkflow = useCallback(() => {
616665
if (socket && currentWorkflowId) {
617666
logger.info(`Leaving workflow: ${currentWorkflowId}`)
667+
try {
668+
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
669+
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
670+
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
671+
} catch {}
618672
socket.emit('leave-workflow')
619673
setCurrentWorkflowId(null)
620674
setPresenceUsers([])

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,6 @@ export function useCollaborativeWorkflow() {
492492
(operation: string, target: string, payload: any, localAction: () => void) => {
493493
if (isApplyingRemoteChange.current) return
494494

495-
// Skip socket operations when in diff mode
496495
if (isShowingDiff) {
497496
logger.debug('Skipping debounced socket operation in diff mode:', operation)
498497
return
@@ -673,32 +672,6 @@ export function useCollaborativeWorkflow() {
673672
(id: string, name: string) => {
674673
executeQueuedOperation('update-name', 'block', { id, name }, () => {
675674
workflowStore.updateBlockName(id, name)
676-
677-
// Handle pending subblock updates
678-
const globalWindow = window as any
679-
const pendingUpdates = globalWindow.__pendingSubblockUpdates
680-
if (pendingUpdates && Array.isArray(pendingUpdates)) {
681-
// Queue each subblock update individually
682-
for (const update of pendingUpdates) {
683-
const { blockId, subBlockId, newValue } = update
684-
const operationId = crypto.randomUUID()
685-
686-
addToQueue({
687-
id: operationId,
688-
operation: {
689-
operation: 'subblock-update',
690-
target: 'subblock',
691-
payload: { blockId, subblockId: subBlockId, value: newValue },
692-
},
693-
workflowId: activeWorkflowId || '',
694-
userId: session?.user?.id || 'unknown',
695-
})
696-
697-
subBlockStore.setValue(blockId, subBlockId, newValue)
698-
}
699-
// Clear the pending updates
700-
globalWindow.__pendingSubblockUpdates = undefined
701-
}
702675
})
703676
},
704677
[

apps/sim/stores/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ function handleBeforeUnload(event: BeforeUnloadEvent): void {
9393
}
9494
}
9595

96-
// Note: Socket.IO handles real-time sync automatically
96+
try {
97+
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
98+
useOperationQueueStore.getState().flushAllDebounced()
99+
} catch {}
97100

98101
// Standard beforeunload pattern
99102
event.preventDefault()

0 commit comments

Comments
 (0)