Skip to content

Commit 77ee017

Browse files
author
priyanshu.solanki
committed
fixed logs for parallel and loop execution flow
1 parent 474762d commit 77ee017

File tree

6 files changed

+211
-12
lines changed

6 files changed

+211
-12
lines changed

apps/sim/executor/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ export const DEFAULTS = {
128128
BLOCK_TITLE: 'Untitled Block',
129129
WORKFLOW_NAME: 'Workflow',
130130
MAX_LOOP_ITERATIONS: 1000,
131+
MAX_FOREACH_ITEMS: 1000,
132+
MAX_PARALLEL_BRANCHES: 20,
131133
MAX_WORKFLOW_DEPTH: 10,
132134
EXECUTION_TIME: 0,
133135
TOKENS: {

apps/sim/executor/execution/executor.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ export class DAGExecutor {
6363

6464
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
6565
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
66+
loopOrchestrator.setContextExtensions(this.contextExtensions)
6667
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
6768
parallelOrchestrator.setResolver(resolver)
69+
parallelOrchestrator.setContextExtensions(this.contextExtensions)
6870
const allHandlers = createBlockHandlers()
6971
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
7072
const edgeManager = new EdgeManager(dag)

apps/sim/executor/execution/state.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export interface LoopScope {
1414
condition?: string
1515
loopType?: 'for' | 'forEach' | 'while' | 'doWhile'
1616
skipFirstConditionCheck?: boolean
17+
/** Error message if loop validation failed (e.g., exceeded max iterations) */
18+
validationError?: string
1719
}
1820

1921
export interface ParallelScope {
@@ -23,6 +25,8 @@ export interface ParallelScope {
2325
completedCount: number
2426
totalExpectedNodes: number
2527
items?: any[]
28+
/** Error message if parallel validation failed (e.g., exceeded max branches) */
29+
validationError?: string
2630
}
2731

2832
export class ExecutionState implements BlockStateController {

apps/sim/executor/orchestrators/loop.ts

Lines changed: 104 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
55
import type { DAG } from '@/executor/dag/builder'
66
import type { EdgeManager } from '@/executor/execution/edge-manager'
77
import type { LoopScope } from '@/executor/execution/state'
8-
import type { BlockStateController } from '@/executor/execution/types'
9-
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
8+
import type { BlockStateController, ContextExtensions } from '@/executor/execution/types'
9+
import type { BlockLog, ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
1010
import type { LoopConfigWithNodes } from '@/executor/types/loop'
1111
import { replaceValidReferences } from '@/executor/utils/reference-validation'
1212
import {
@@ -32,13 +32,18 @@ export interface LoopContinuationResult {
3232

3333
export class LoopOrchestrator {
3434
private edgeManager: EdgeManager | null = null
35+
private contextExtensions: ContextExtensions | null = null
3536

3637
constructor(
3738
private dag: DAG,
3839
private state: BlockStateController,
3940
private resolver: VariableResolver
4041
) {}
4142

43+
setContextExtensions(contextExtensions: ContextExtensions): void {
44+
this.contextExtensions = contextExtensions
45+
}
46+
4247
setEdgeManager(edgeManager: EdgeManager): void {
4348
this.edgeManager = edgeManager
4449
}
@@ -58,18 +63,54 @@ export class LoopOrchestrator {
5863
const loopType = loopConfig.loopType
5964

6065
switch (loopType) {
61-
case 'for':
66+
case 'for': {
6267
scope.loopType = 'for'
63-
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
68+
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
69+
70+
if (requestedIterations > DEFAULTS.MAX_LOOP_ITERATIONS) {
71+
const errorMessage = `For loop iterations (${requestedIterations}) exceeds maximum allowed (${DEFAULTS.MAX_LOOP_ITERATIONS}). Loop execution blocked.`
72+
logger.error(errorMessage, { loopId, requestedIterations })
73+
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
74+
// Set to 0 iterations to prevent loop from running
75+
scope.maxIterations = 0
76+
scope.validationError = errorMessage
77+
} else {
78+
scope.maxIterations = requestedIterations
79+
}
80+
6481
scope.condition = buildLoopIndexCondition(scope.maxIterations)
6582
break
83+
}
6684

6785
case 'forEach': {
6886
scope.loopType = 'forEach'
87+
if (!Array.isArray(loopConfig.forEachItems)) {
88+
const errorMessage =
89+
'ForEach loop collection is not a valid array. Loop execution blocked.'
90+
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
91+
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
92+
scope.items = []
93+
scope.maxIterations = 0
94+
scope.validationError = errorMessage
95+
scope.condition = buildLoopIndexCondition(0)
96+
break
97+
}
6998
const items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
70-
scope.items = items
71-
scope.maxIterations = items.length
72-
scope.item = items[0]
99+
const originalLength = items.length
100+
101+
if (originalLength > DEFAULTS.MAX_FOREACH_ITEMS) {
102+
const errorMessage = `ForEach loop collection size (${originalLength}) exceeds maximum allowed (${DEFAULTS.MAX_FOREACH_ITEMS}). Loop execution blocked.`
103+
logger.error(errorMessage, { loopId, originalLength })
104+
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
105+
scope.items = []
106+
scope.maxIterations = 0
107+
scope.validationError = errorMessage
108+
} else {
109+
scope.items = items
110+
scope.maxIterations = items.length
111+
scope.item = items[0]
112+
}
113+
73114
scope.condition = buildLoopIndexCondition(scope.maxIterations)
74115
break
75116
}
@@ -79,15 +120,28 @@ export class LoopOrchestrator {
79120
scope.condition = loopConfig.whileCondition
80121
break
81122

82-
case 'doWhile':
123+
case 'doWhile': {
83124
scope.loopType = 'doWhile'
84125
if (loopConfig.doWhileCondition) {
85126
scope.condition = loopConfig.doWhileCondition
86127
} else {
87-
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
128+
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
129+
130+
if (requestedIterations > DEFAULTS.MAX_LOOP_ITERATIONS) {
131+
const errorMessage = `Do-While loop iterations (${requestedIterations}) exceeds maximum allowed (${DEFAULTS.MAX_LOOP_ITERATIONS}). Loop execution blocked.`
132+
logger.error(errorMessage, { loopId, requestedIterations })
133+
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
134+
// Set to 0 iterations to prevent loop from running
135+
scope.maxIterations = 0
136+
scope.validationError = errorMessage
137+
} else {
138+
scope.maxIterations = requestedIterations
139+
}
140+
88141
scope.condition = buildLoopIndexCondition(scope.maxIterations)
89142
}
90143
break
144+
}
91145

92146
default:
93147
throw new Error(`Unknown loop type: ${loopType}`)
@@ -100,6 +154,47 @@ export class LoopOrchestrator {
100154
return scope
101155
}
102156

157+
/**
158+
* Adds an error log entry for loop validation errors.
159+
* These errors appear in the block console on the logs dashboard.
160+
*/
161+
private addLoopErrorLog(
162+
ctx: ExecutionContext,
163+
loopId: string,
164+
loopType: string,
165+
errorMessage: string
166+
): void {
167+
const now = new Date().toISOString()
168+
169+
// Get the actual loop block name from the workflow
170+
const loopBlock = ctx.workflow?.blocks?.find((b) => b.id === loopId)
171+
const blockName = loopBlock?.metadata?.name || `Loop`
172+
173+
const blockLog: BlockLog = {
174+
blockId: loopId,
175+
blockName,
176+
blockType: 'loop',
177+
startedAt: now,
178+
endedAt: now,
179+
durationMs: 0,
180+
success: false,
181+
error: errorMessage,
182+
input: {},
183+
output: { error: errorMessage },
184+
loopId,
185+
}
186+
ctx.blockLogs.push(blockLog)
187+
188+
// Emit the error through onBlockComplete callback so it appears in the UI console
189+
if (this.contextExtensions?.onBlockComplete) {
190+
this.contextExtensions.onBlockComplete(loopId, blockName, 'loop', {
191+
input: {},
192+
output: { error: errorMessage },
193+
executionTime: 0,
194+
})
195+
}
196+
}
197+
103198
storeLoopNodeOutput(
104199
ctx: ExecutionContext,
105200
loopId: string,

apps/sim/executor/orchestrators/parallel.ts

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { createLogger } from '@/lib/logs/console/logger'
2+
import { DEFAULTS } from '@/executor/constants'
23
import type { DAG, DAGNode } from '@/executor/dag/builder'
34
import type { ParallelScope } from '@/executor/execution/state'
4-
import type { BlockStateWriter } from '@/executor/execution/types'
5-
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
5+
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
6+
import type { BlockLog, ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
67
import type { ParallelConfigWithNodes } from '@/executor/types/parallel'
78
import {
89
buildBranchNodeId,
@@ -32,6 +33,7 @@ export interface ParallelAggregationResult {
3233

3334
export class ParallelOrchestrator {
3435
private resolver: VariableResolver | null = null
36+
private contextExtensions: ContextExtensions | null = null
3537

3638
constructor(
3739
private dag: DAG,
@@ -42,18 +44,70 @@ export class ParallelOrchestrator {
4244
this.resolver = resolver
4345
}
4446

47+
setContextExtensions(contextExtensions: ContextExtensions): void {
48+
this.contextExtensions = contextExtensions
49+
}
50+
4551
initializeParallelScope(
4652
ctx: ExecutionContext,
4753
parallelId: string,
4854
totalBranches: number,
4955
terminalNodesCount = 1
5056
): ParallelScope {
5157
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
58+
59+
// Validate distribution items if parallel uses distribution
60+
if (parallelConfig?.distribution !== undefined && parallelConfig?.distribution !== null) {
61+
if (!Array.isArray(parallelConfig.distribution)) {
62+
const errorMessage =
63+
'Parallel distribution is not a valid array. Parallel execution blocked.'
64+
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
65+
this.addParallelErrorLog(ctx, parallelId, errorMessage)
66+
67+
const scope: ParallelScope = {
68+
parallelId,
69+
totalBranches: 0,
70+
branchOutputs: new Map(),
71+
completedCount: 0,
72+
totalExpectedNodes: 0,
73+
items: [],
74+
validationError: errorMessage,
75+
}
76+
if (!ctx.parallelExecutions) {
77+
ctx.parallelExecutions = new Map()
78+
}
79+
ctx.parallelExecutions.set(parallelId, scope)
80+
return scope
81+
}
82+
}
83+
5284
const items = parallelConfig ? this.resolveDistributionItems(ctx, parallelConfig) : undefined
5385

5486
// If we have more items than pre-built branches, expand the DAG
5587
const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches
5688

89+
// Validate branch count doesn't exceed maximum
90+
if (actualBranchCount > DEFAULTS.MAX_PARALLEL_BRANCHES) {
91+
const errorMessage = `Parallel branch count (${actualBranchCount}) exceeds maximum allowed (${DEFAULTS.MAX_PARALLEL_BRANCHES}). Parallel execution blocked.`
92+
logger.error(errorMessage, { parallelId, actualBranchCount })
93+
this.addParallelErrorLog(ctx, parallelId, errorMessage)
94+
95+
const scope: ParallelScope = {
96+
parallelId,
97+
totalBranches: 0,
98+
branchOutputs: new Map(),
99+
completedCount: 0,
100+
totalExpectedNodes: 0,
101+
items: [],
102+
validationError: errorMessage,
103+
}
104+
if (!ctx.parallelExecutions) {
105+
ctx.parallelExecutions = new Map()
106+
}
107+
ctx.parallelExecutions.set(parallelId, scope)
108+
return scope
109+
}
110+
57111
const scope: ParallelScope = {
58112
parallelId,
59113
totalBranches: actualBranchCount,
@@ -108,6 +162,46 @@ export class ParallelOrchestrator {
108162
return scope
109163
}
110164

165+
/**
166+
* Adds an error log entry for parallel validation errors.
167+
* These errors appear in the block console on the logs dashboard.
168+
*/
169+
private addParallelErrorLog(
170+
ctx: ExecutionContext,
171+
parallelId: string,
172+
errorMessage: string
173+
): void {
174+
const now = new Date().toISOString()
175+
176+
// Get the actual parallel block name from the workflow
177+
const parallelBlock = ctx.workflow?.blocks?.find((b) => b.id === parallelId)
178+
const blockName = parallelBlock?.metadata?.name || 'Parallel'
179+
180+
const blockLog: BlockLog = {
181+
blockId: parallelId,
182+
blockName,
183+
blockType: 'parallel',
184+
startedAt: now,
185+
endedAt: now,
186+
durationMs: 0,
187+
success: false,
188+
error: errorMessage,
189+
input: {},
190+
output: { error: errorMessage },
191+
parallelId,
192+
}
193+
ctx.blockLogs.push(blockLog)
194+
195+
// Emit the error through onBlockComplete callback so it appears in the UI console
196+
if (this.contextExtensions?.onBlockComplete) {
197+
this.contextExtensions.onBlockComplete(parallelId, blockName, 'parallel', {
198+
input: {},
199+
output: { error: errorMessage },
200+
executionTime: 0,
201+
})
202+
}
203+
}
204+
111205
/**
112206
* Dynamically expand the DAG to include additional branch nodes when
113207
* the resolved item count exceeds the pre-built branch count.

apps/sim/lib/logs/execution/trace-spans/trace-spans.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,10 @@ function groupIterationBlocks(spans: TraceSpan[]): TraceSpan[] {
471471
}
472472
})
473473

474+
// Include loop/parallel spans that have errors (e.g., validation errors that blocked execution)
475+
// These won't have iteration children, so they should appear directly in results
474476
const nonIterationContainerSpans = normalSpans.filter(
475-
(span) => span.type !== 'parallel' && span.type !== 'loop'
477+
(span) => (span.type !== 'parallel' && span.type !== 'loop') || span.status === 'error'
476478
)
477479

478480
if (iterationSpans.length > 0) {

0 commit comments

Comments
 (0)