Skip to content

Commit 3a9e5f3

Browse files
author
priyanshu.solanki
committed
extracted utility functions
1 parent 39444fa commit 3a9e5f3

File tree

3 files changed

+89
-64
lines changed

3 files changed

+89
-64
lines changed

apps/sim/executor/orchestrators/loop.ts

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import {
1414
buildSentinelEndId,
1515
buildSentinelStartId,
1616
extractBaseBlockId,
17+
hasValidInput,
1718
resolveArrayInput,
19+
validateMaxCount,
1820
} from '@/executor/utils/subflow-utils'
1921
import type { VariableResolver } from '@/executor/variables/resolver'
2022
import type { SerializedLoop } from '@/serializer/types'
@@ -68,17 +70,21 @@ export class LoopOrchestrator {
6870
scope.loopType = 'for'
6971
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
7072

71-
if (requestedIterations > DEFAULTS.MAX_LOOP_ITERATIONS) {
72-
const errorMessage = `For loop iterations (${requestedIterations}) exceeds maximum allowed (${DEFAULTS.MAX_LOOP_ITERATIONS}). Loop execution blocked.`
73-
logger.error(errorMessage, { loopId, requestedIterations })
74-
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
73+
const iterationError = validateMaxCount(
74+
requestedIterations,
75+
DEFAULTS.MAX_LOOP_ITERATIONS,
76+
'For loop iterations'
77+
)
78+
if (iterationError) {
79+
logger.error(iterationError, { loopId, requestedIterations })
80+
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
7581
iterations: requestedIterations,
7682
})
7783
scope.maxIterations = 0
78-
scope.validationError = errorMessage
84+
scope.validationError = iterationError
7985
scope.condition = buildLoopIndexCondition(0)
8086
ctx.loopExecutions?.set(loopId, scope)
81-
throw new Error(errorMessage)
87+
throw new Error(iterationError)
8288
}
8389

8490
scope.maxIterations = requestedIterations
@@ -89,11 +95,8 @@ export class LoopOrchestrator {
8995
case 'forEach': {
9096
scope.loopType = 'forEach'
9197
const items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
92-
const hasInput =
93-
loopConfig.forEachItems !== undefined &&
94-
loopConfig.forEachItems !== null &&
95-
loopConfig.forEachItems !== ''
96-
if (hasInput && items.length === 0) {
98+
99+
if (hasValidInput(loopConfig.forEachItems) && items.length === 0) {
97100
const errorMessage =
98101
'ForEach loop collection is not a valid array. Loop execution blocked.'
99102
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
@@ -108,20 +111,23 @@ export class LoopOrchestrator {
108111
throw new Error(errorMessage)
109112
}
110113

111-
const originalLength = items.length
112-
if (originalLength > DEFAULTS.MAX_FOREACH_ITEMS) {
113-
const errorMessage = `ForEach loop collection size (${originalLength}) exceeds maximum allowed (${DEFAULTS.MAX_FOREACH_ITEMS}). Loop execution blocked.`
114-
logger.error(errorMessage, { loopId, originalLength })
115-
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
114+
const sizeError = validateMaxCount(
115+
items.length,
116+
DEFAULTS.MAX_FOREACH_ITEMS,
117+
'ForEach loop collection size'
118+
)
119+
if (sizeError) {
120+
logger.error(sizeError, { loopId, collectionSize: items.length })
121+
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
116122
forEachItems: loopConfig.forEachItems,
117-
collectionSize: originalLength,
123+
collectionSize: items.length,
118124
})
119125
scope.items = []
120126
scope.maxIterations = 0
121-
scope.validationError = errorMessage
127+
scope.validationError = sizeError
122128
scope.condition = buildLoopIndexCondition(0)
123129
ctx.loopExecutions?.set(loopId, scope)
124-
throw new Error(errorMessage)
130+
throw new Error(sizeError)
125131
}
126132

127133
scope.items = items
@@ -143,17 +149,21 @@ export class LoopOrchestrator {
143149
} else {
144150
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
145151

146-
if (requestedIterations > DEFAULTS.MAX_LOOP_ITERATIONS) {
147-
const errorMessage = `Do-While loop iterations (${requestedIterations}) exceeds maximum allowed (${DEFAULTS.MAX_LOOP_ITERATIONS}). Loop execution blocked.`
148-
logger.error(errorMessage, { loopId, requestedIterations })
149-
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
152+
const iterationError = validateMaxCount(
153+
requestedIterations,
154+
DEFAULTS.MAX_LOOP_ITERATIONS,
155+
'Do-While loop iterations'
156+
)
157+
if (iterationError) {
158+
logger.error(iterationError, { loopId, requestedIterations })
159+
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
150160
iterations: requestedIterations,
151161
})
152162
scope.maxIterations = 0
153-
scope.validationError = errorMessage
163+
scope.validationError = iterationError
154164
scope.condition = buildLoopIndexCondition(0)
155165
ctx.loopExecutions?.set(loopId, scope)
156-
throw new Error(errorMessage)
166+
throw new Error(iterationError)
157167
}
158168

159169
scope.maxIterations = requestedIterations

apps/sim/executor/orchestrators/parallel.ts

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import {
1111
calculateBranchCount,
1212
extractBaseBlockId,
1313
extractBranchIndex,
14+
hasValidInput,
1415
parseDistributionItems,
1516
resolveArrayInput,
17+
validateMaxCount,
1618
} from '@/executor/utils/subflow-utils'
1719
import type { VariableResolver } from '@/executor/variables/resolver'
1820
import type { SerializedParallel } from '@/serializer/types'
@@ -62,13 +64,9 @@ export class ParallelOrchestrator {
6264

6365
if (parallelConfig?.distribution !== undefined && parallelConfig?.distribution !== null) {
6466
const rawDistribution = parallelConfig.distribution
65-
const hasInput =
66-
(typeof rawDistribution === 'string' && rawDistribution !== '') ||
67-
(Array.isArray(rawDistribution) && rawDistribution.length > 0) ||
68-
(typeof rawDistribution === 'object' && Object.keys(rawDistribution).length > 0)
69-
7067
const inputWasEmptyArray = Array.isArray(rawDistribution) && rawDistribution.length === 0
71-
if (hasInput && !inputWasEmptyArray && (!items || items.length === 0)) {
68+
69+
if (hasValidInput(rawDistribution) && !inputWasEmptyArray && (!items || items.length === 0)) {
7270
const errorMessage =
7371
'Parallel distribution is not a valid array. Parallel execution blocked.'
7472
logger.error(errorMessage, {
@@ -79,47 +77,26 @@ export class ParallelOrchestrator {
7977
this.addParallelErrorLog(ctx, parallelId, errorMessage, {
8078
distribution: parallelConfig.distribution,
8179
})
82-
83-
const scope: ParallelScope = {
84-
parallelId,
85-
totalBranches: 0,
86-
branchOutputs: new Map(),
87-
completedCount: 0,
88-
totalExpectedNodes: 0,
89-
items: [],
90-
validationError: errorMessage,
91-
}
92-
if (!ctx.parallelExecutions) {
93-
ctx.parallelExecutions = new Map()
94-
}
95-
ctx.parallelExecutions.set(parallelId, scope)
80+
this.setErrorScope(ctx, parallelId, errorMessage)
9681
throw new Error(errorMessage)
9782
}
9883
}
84+
9985
const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches
10086

101-
if (actualBranchCount > DEFAULTS.MAX_PARALLEL_BRANCHES) {
102-
const errorMessage = `Parallel branch count (${actualBranchCount}) exceeds maximum allowed (${DEFAULTS.MAX_PARALLEL_BRANCHES}). Parallel execution blocked.`
103-
logger.error(errorMessage, { parallelId, actualBranchCount })
104-
this.addParallelErrorLog(ctx, parallelId, errorMessage, {
87+
const branchError = validateMaxCount(
88+
actualBranchCount,
89+
DEFAULTS.MAX_PARALLEL_BRANCHES,
90+
'Parallel branch count'
91+
)
92+
if (branchError) {
93+
logger.error(branchError, { parallelId, actualBranchCount })
94+
this.addParallelErrorLog(ctx, parallelId, branchError, {
10595
distribution: parallelConfig?.distribution,
10696
branchCount: actualBranchCount,
10797
})
108-
109-
const scope: ParallelScope = {
110-
parallelId,
111-
totalBranches: 0,
112-
branchOutputs: new Map(),
113-
completedCount: 0,
114-
totalExpectedNodes: 0,
115-
items: [],
116-
validationError: errorMessage,
117-
}
118-
if (!ctx.parallelExecutions) {
119-
ctx.parallelExecutions = new Map()
120-
}
121-
ctx.parallelExecutions.set(parallelId, scope)
122-
throw new Error(errorMessage)
98+
this.setErrorScope(ctx, parallelId, branchError)
99+
throw new Error(branchError)
123100
}
124101

125102
const scope: ParallelScope = {
@@ -192,6 +169,22 @@ export class ParallelOrchestrator {
192169
)
193170
}
194171

172+
private setErrorScope(ctx: ExecutionContext, parallelId: string, errorMessage: string): void {
173+
const scope: ParallelScope = {
174+
parallelId,
175+
totalBranches: 0,
176+
branchOutputs: new Map(),
177+
completedCount: 0,
178+
totalExpectedNodes: 0,
179+
items: [],
180+
validationError: errorMessage,
181+
}
182+
if (!ctx.parallelExecutions) {
183+
ctx.parallelExecutions = new Map()
184+
}
185+
ctx.parallelExecutions.set(parallelId, scope)
186+
}
187+
195188
/**
196189
* Dynamically expand the DAG to include additional branch nodes when
197190
* the resolved item count exceeds the pre-built branch count.

apps/sim/executor/utils/subflow-utils.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,28 @@ export function normalizeNodeId(nodeId: string): string {
136136
return nodeId
137137
}
138138

139+
/**
140+
* Checks if a value represents valid input (not empty/null/undefined).
141+
*/
142+
export function hasValidInput(value: any): boolean {
143+
if (value === undefined || value === null) return false
144+
if (typeof value === 'string') return value !== ''
145+
if (Array.isArray(value)) return value.length > 0
146+
if (typeof value === 'object') return Object.keys(value).length > 0
147+
return true
148+
}
149+
150+
/**
151+
* Validates that a count doesn't exceed a maximum limit.
152+
* Returns an error message if validation fails, undefined otherwise.
153+
*/
154+
export function validateMaxCount(count: number, max: number, itemType: string): string | undefined {
155+
if (count > max) {
156+
return `${itemType} (${count}) exceeds maximum allowed (${max}). Execution blocked.`
157+
}
158+
return undefined
159+
}
160+
139161
/**
140162
* Resolves array input at runtime. Handles arrays, objects, references, and JSON strings.
141163
* Used by both loop forEach and parallel distribution resolution.

0 commit comments

Comments
 (0)