Skip to content

Commit 841cb63

Browse files
fix(edge-validation): race condition on collaborative add (#2980)
1 parent c7db48e commit 841cb63

File tree

4 files changed

+79
-39
lines changed

4 files changed

+79
-39
lines changed

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { useUndoRedoStore } from '@/stores/undo-redo'
2424
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
2525
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
2626
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
27-
import { filterNewEdges, mergeSubblockState } from '@/stores/workflows/utils'
27+
import { filterNewEdges, filterValidEdges, mergeSubblockState } from '@/stores/workflows/utils'
2828
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
2929
import type { BlockState, Loop, Parallel, Position } from '@/stores/workflows/workflow/types'
3030

@@ -226,9 +226,12 @@ export function useCollaborativeWorkflow() {
226226
case EDGES_OPERATIONS.BATCH_ADD_EDGES: {
227227
const { edges } = payload
228228
if (Array.isArray(edges) && edges.length > 0) {
229-
const newEdges = filterNewEdges(edges, useWorkflowStore.getState().edges)
229+
const blocks = useWorkflowStore.getState().blocks
230+
const currentEdges = useWorkflowStore.getState().edges
231+
const validEdges = filterValidEdges(edges, blocks)
232+
const newEdges = filterNewEdges(validEdges, currentEdges)
230233
if (newEdges.length > 0) {
231-
useWorkflowStore.getState().batchAddEdges(newEdges)
234+
useWorkflowStore.getState().batchAddEdges(newEdges, { skipValidation: true })
232235
}
233236
}
234237
break
@@ -1004,7 +1007,11 @@ export function useCollaborativeWorkflow() {
10041007

10051008
if (edges.length === 0) return false
10061009

1007-
const newEdges = filterNewEdges(edges, useWorkflowStore.getState().edges)
1010+
// Filter out invalid edges (e.g., edges targeting trigger blocks) and duplicates
1011+
const blocks = useWorkflowStore.getState().blocks
1012+
const currentEdges = useWorkflowStore.getState().edges
1013+
const validEdges = filterValidEdges(edges, blocks)
1014+
const newEdges = filterNewEdges(validEdges, currentEdges)
10081015
if (newEdges.length === 0) return false
10091016

10101017
const operationId = crypto.randomUUID()
@@ -1020,7 +1027,7 @@ export function useCollaborativeWorkflow() {
10201027
userId: session?.user?.id || 'unknown',
10211028
})
10221029

1023-
useWorkflowStore.getState().batchAddEdges(newEdges)
1030+
useWorkflowStore.getState().batchAddEdges(newEdges, { skipValidation: true })
10241031

10251032
if (!options?.skipUndoRedo) {
10261033
newEdges.forEach((edge) => undoRedo.recordAddEdge(edge.id))
@@ -1484,9 +1491,23 @@ export function useCollaborativeWorkflow() {
14841491

14851492
if (blocks.length === 0) return false
14861493

1494+
// Filter out invalid edges (e.g., edges targeting trigger blocks)
1495+
// Combine existing blocks with new blocks for validation
1496+
const existingBlocks = useWorkflowStore.getState().blocks
1497+
const newBlocksMap = blocks.reduce(
1498+
(acc, block) => {
1499+
acc[block.id] = block
1500+
return acc
1501+
},
1502+
{} as Record<string, BlockState>
1503+
)
1504+
const allBlocks = { ...existingBlocks, ...newBlocksMap }
1505+
const validEdges = filterValidEdges(edges, allBlocks)
1506+
14871507
logger.info('Batch adding blocks collaboratively', {
14881508
blockCount: blocks.length,
1489-
edgeCount: edges.length,
1509+
edgeCount: validEdges.length,
1510+
filteredEdges: edges.length - validEdges.length,
14901511
})
14911512

14921513
const operationId = crypto.randomUUID()
@@ -1496,16 +1517,18 @@ export function useCollaborativeWorkflow() {
14961517
operation: {
14971518
operation: BLOCKS_OPERATIONS.BATCH_ADD_BLOCKS,
14981519
target: OPERATION_TARGETS.BLOCKS,
1499-
payload: { blocks, edges, loops, parallels, subBlockValues },
1520+
payload: { blocks, edges: validEdges, loops, parallels, subBlockValues },
15001521
},
15011522
workflowId: activeWorkflowId || '',
15021523
userId: session?.user?.id || 'unknown',
15031524
})
15041525

1505-
useWorkflowStore.getState().batchAddBlocks(blocks, edges, subBlockValues)
1526+
useWorkflowStore.getState().batchAddBlocks(blocks, validEdges, subBlockValues, {
1527+
skipEdgeValidation: true,
1528+
})
15061529

15071530
if (!options?.skipUndoRedo) {
1508-
undoRedo.recordBatchAddBlocks(blocks, edges, subBlockValues)
1531+
undoRedo.recordBatchAddBlocks(blocks, validEdges, subBlockValues)
15091532
}
15101533

15111534
return true

apps/sim/stores/workflows/utils.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import type { Edge } from 'reactflow'
22
import { v4 as uuidv4 } from 'uuid'
33
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
44
import { mergeSubBlockValues, mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
5+
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
56
import { getBlock } from '@/blocks'
6-
import { normalizeName } from '@/executor/constants'
7+
import { isAnnotationOnlyBlock, normalizeName } from '@/executor/constants'
78
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
89
import type {
910
BlockState,
@@ -17,6 +18,32 @@ import { TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
1718

1819
const WEBHOOK_SUBBLOCK_FIELDS = ['webhookId', 'triggerPath']
1920

21+
/**
22+
* Checks if an edge is valid (source and target exist, not annotation-only, target is not a trigger)
23+
*/
24+
function isValidEdge(
25+
edge: Edge,
26+
blocks: Record<string, { type: string; triggerMode?: boolean }>
27+
): boolean {
28+
const sourceBlock = blocks[edge.source]
29+
const targetBlock = blocks[edge.target]
30+
if (!sourceBlock || !targetBlock) return false
31+
if (isAnnotationOnlyBlock(sourceBlock.type)) return false
32+
if (isAnnotationOnlyBlock(targetBlock.type)) return false
33+
if (TriggerUtils.isTriggerBlock(targetBlock)) return false
34+
return true
35+
}
36+
37+
/**
38+
* Filters edges to only include valid ones (target exists and is not a trigger block)
39+
*/
40+
export function filterValidEdges(
41+
edges: Edge[],
42+
blocks: Record<string, { type: string; triggerMode?: boolean }>
43+
): Edge[] {
44+
return edges.filter((edge) => isValidEdge(edge, blocks))
45+
}
46+
2047
export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
2148
return edgesToAdd.filter((edge) => {
2249
if (edge.source === edge.target) return false

apps/sim/stores/workflows/workflow/store.ts

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@ import { create } from 'zustand'
44
import { devtools } from 'zustand/middleware'
55
import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants'
66
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
7-
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
87
import { getBlock } from '@/blocks'
98
import type { SubBlockConfig } from '@/blocks/types'
10-
import { isAnnotationOnlyBlock, normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
9+
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
1110
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
1211
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
13-
import { filterNewEdges, getUniqueBlockName, mergeSubblockState } from '@/stores/workflows/utils'
12+
import {
13+
filterNewEdges,
14+
filterValidEdges,
15+
getUniqueBlockName,
16+
mergeSubblockState,
17+
} from '@/stores/workflows/utils'
1418
import type {
1519
Position,
1620
SubBlockState,
@@ -91,26 +95,6 @@ function resolveInitialSubblockValue(config: SubBlockConfig): unknown {
9195
return null
9296
}
9397

94-
function isValidEdge(
95-
edge: Edge,
96-
blocks: Record<string, { type: string; triggerMode?: boolean }>
97-
): boolean {
98-
const sourceBlock = blocks[edge.source]
99-
const targetBlock = blocks[edge.target]
100-
if (!sourceBlock || !targetBlock) return false
101-
if (isAnnotationOnlyBlock(sourceBlock.type)) return false
102-
if (isAnnotationOnlyBlock(targetBlock.type)) return false
103-
if (TriggerUtils.isTriggerBlock(targetBlock)) return false
104-
return true
105-
}
106-
107-
function filterValidEdges(
108-
edges: Edge[],
109-
blocks: Record<string, { type: string; triggerMode?: boolean }>
110-
): Edge[] {
111-
return edges.filter((edge) => isValidEdge(edge, blocks))
112-
}
113-
11498
const initialState = {
11599
blocks: {},
116100
edges: [],
@@ -356,7 +340,8 @@ export const useWorkflowStore = create<WorkflowStore>()(
356340
data?: Record<string, any>
357341
}>,
358342
edges?: Edge[],
359-
subBlockValues?: Record<string, Record<string, unknown>>
343+
subBlockValues?: Record<string, Record<string, unknown>>,
344+
options?: { skipEdgeValidation?: boolean }
360345
) => {
361346
const currentBlocks = get().blocks
362347
const currentEdges = get().edges
@@ -381,7 +366,10 @@ export const useWorkflowStore = create<WorkflowStore>()(
381366
}
382367

383368
if (edges && edges.length > 0) {
384-
const validEdges = filterValidEdges(edges, newBlocks)
369+
// Skip validation if already validated by caller (e.g., collaborative layer)
370+
const validEdges = options?.skipEdgeValidation
371+
? edges
372+
: filterValidEdges(edges, newBlocks)
385373
const existingEdgeIds = new Set(currentEdges.map((e) => e.id))
386374
for (const edge of validEdges) {
387375
if (!existingEdgeIds.has(edge.id)) {
@@ -516,11 +504,12 @@ export const useWorkflowStore = create<WorkflowStore>()(
516504
get().updateLastSaved()
517505
},
518506

519-
batchAddEdges: (edges: Edge[]) => {
507+
batchAddEdges: (edges: Edge[], options?: { skipValidation?: boolean }) => {
520508
const blocks = get().blocks
521509
const currentEdges = get().edges
522510

523-
const validEdges = filterValidEdges(edges, blocks)
511+
// Skip validation if already validated by caller (e.g., collaborative layer)
512+
const validEdges = options?.skipValidation ? edges : filterValidEdges(edges, blocks)
524513
const filtered = filterNewEdges(validEdges, currentEdges)
525514
const newEdges = [...currentEdges]
526515

apps/sim/stores/workflows/workflow/types.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,12 +203,13 @@ export interface WorkflowActions {
203203
batchAddBlocks: (
204204
blocks: BlockState[],
205205
edges?: Edge[],
206-
subBlockValues?: Record<string, Record<string, unknown>>
206+
subBlockValues?: Record<string, Record<string, unknown>>,
207+
options?: { skipEdgeValidation?: boolean }
207208
) => void
208209
batchRemoveBlocks: (ids: string[]) => void
209210
batchToggleEnabled: (ids: string[]) => void
210211
batchToggleHandles: (ids: string[]) => void
211-
batchAddEdges: (edges: Edge[]) => void
212+
batchAddEdges: (edges: Edge[], options?: { skipValidation?: boolean }) => void
212213
batchRemoveEdges: (ids: string[]) => void
213214
clear: () => Partial<WorkflowState>
214215
updateLastSaved: () => void

0 commit comments

Comments
 (0)