Commit 5a4480e

feat(engine): Improve execution stalls troubleshooting, align dev and prod behavior, add heartbeats.yield utility
1 parent 5db583b · commit 5a4480e

File tree

21 files changed: +446 −109 lines changed


apps/webapp/app/env.server.ts

Lines changed: 2 additions & 2 deletions
@@ -519,8 +519,8 @@ const EnvironmentSchema = z
   RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
   RUN_ENGINE_TIMEOUT_PENDING_EXECUTING: z.coerce.number().int().default(60_000),
   RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().default(60_000),
-  RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000),
-  RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
+  RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(600_000), // 10 minutes
+  RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(600_000), // 10 minutes
   RUN_ENGINE_TIMEOUT_SUSPENDED: z.coerce
     .number()
     .int()
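
For context on the changed defaults: these settings are parsed with zod's `z.coerce`, so string values from `process.env` are coerced to numbers and the default only applies when the variable is unset. A minimal sketch of that behavior (illustrative, not part of this commit):

```ts
import { z } from "zod";

// Same chain as the RUN_ENGINE_TIMEOUT_EXECUTING setting above.
const TimeoutSchema = z.coerce.number().int().default(600_000);

console.log(TimeoutSchema.parse(undefined)); // 600000 – env var unset, the 10-minute default applies
console.log(TimeoutSchema.parse("60000")); // 60000 – string from process.env coerced to a number
```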

docs/troubleshooting.mdx

Lines changed: 32 additions & 0 deletions
@@ -181,6 +181,38 @@ View the [rate limits](/limits) page for more information.
 
 This can happen in different situations, for example when using plain strings as idempotency keys. Support for `Crypto` without a special flag was added in Node `v19.0.0`. You will have to upgrade Node - we recommend even-numbered major releases, e.g. `v20` or `v22`. Alternatively, you can switch from plain strings to the `idempotencyKeys.create` SDK function. [Read the guide](/idempotency).
 
+### Task run stalled executing
+
+If you see a `TASK_RUN_STALLED_EXECUTING` error, it means that we didn't receive a heartbeat from your task before the stall timeout. We automatically heartbeat runs every 30 seconds, and the heartbeat timeout is 10 minutes.
+
+<Note>
+
+If this was a dev run, then most likely the `trigger.dev dev` CLI was stopped, and it wasn't an issue with your code.
+
+</Note>
+
+These errors can happen when code inside your task is blocking the event loop for too long. The most likely cause would be an accidental infinite loop. It could also be a CPU-heavy operation that's blocking the event loop, like nested loops with very large arrays. We recommend reading the [Don't Block the Event Loop](https://nodejs.org/en/learn/asynchronous-work/dont-block-the-event-loop) guide from Node.js for common patterns that can cause this.
+
+If you are doing a continuous CPU-heavy task, then we recommend you try using our `heartbeats.yield` function to automatically yield to the event loop periodically:
+
+```ts
+import { heartbeats } from "@trigger.dev/sdk";
+
+// code inside your task
+for (const row of bigDataset) {
+  await heartbeats.yield(); // safe to call every iteration, we will only actually yield when we need to
+  process(row); // this is a synchronous operation
+}
+```
+
+<Note>
+
+You could also offload the CPU-heavy work to a Node.js worker thread, but this is more complex to set up currently. We are planning on adding support for this in the future.
+
+</Note>
+
+If the above doesn't work, then we recommend you try increasing the machine size of your task. See our [machines guide](/machines) for more information.
+
 ## Framework specific issues
 
 ### NestJS swallows all errors/exceptions
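
The `<Note>` in the added docs above mentions offloading CPU-heavy work to a Node.js worker thread as an alternative to `heartbeats.yield`. A minimal sketch of that approach using Node's built-in `worker_threads` module (the worker file name and payload shape are illustrative assumptions, not Trigger.dev APIs):

```ts
import { Worker } from "node:worker_threads";

// Runs CPU-heavy work off the main thread so the task's main thread stays free to heartbeat.
// Assumes "./heavy-worker.js" reads `workerData` and replies with parentPort.postMessage(result).
function runHeavyWorkOffThread(rows: unknown[]): Promise<unknown> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(new URL("./heavy-worker.js", import.meta.url), {
      workerData: rows,
    });
    worker.once("message", resolve); // result posted back from the worker
    worker.once("error", reject);
    worker.once("exit", (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
}
```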

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 14 additions & 4 deletions
@@ -1,8 +1,11 @@
 import { assertExhaustive } from "@trigger.dev/core";
 import { TaskRunError } from "@trigger.dev/core/v3";
-import { TaskRunStatus } from "@trigger.dev/database";
+import { RuntimeEnvironmentType, TaskRunStatus } from "@trigger.dev/database";
 
-export function runStatusFromError(error: TaskRunError): TaskRunStatus {
+export function runStatusFromError(
+  error: TaskRunError,
+  environmentType: RuntimeEnvironmentType
+): TaskRunStatus {
   if (error.type !== "INTERNAL_ERROR") {
     return "COMPLETED_WITH_ERRORS";
   }
@@ -21,6 +24,15 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
       return "CANCELED";
     case "MAX_DURATION_EXCEEDED":
       return "TIMED_OUT";
+    case "TASK_RUN_STALLED_EXECUTING":
+    case "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS": {
+      if (environmentType === "DEVELOPMENT") {
+        return "CANCELED";
+      }
+
+      return "COMPLETED_WITH_ERRORS";
+    }
+
     case "TASK_PROCESS_OOM_KILLED":
     case "TASK_PROCESS_MAYBE_OOM_KILLED":
     case "TASK_PROCESS_SIGSEGV":
@@ -40,8 +52,6 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
     case "TASK_DEQUEUED_INVALID_STATE":
     case "TASK_DEQUEUED_QUEUE_NOT_FOUND":
     case "TASK_RUN_DEQUEUED_MAX_RETRIES":
-    case "TASK_RUN_STALLED_EXECUTING":
-    case "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS":
     case "TASK_HAS_N0_EXECUTION_SNAPSHOT":
     case "GRACEFUL_EXIT_TIMEOUT":
     case "POD_EVICTED":

internal-packages/run-engine/src/engine/index.ts

Lines changed: 24 additions & 24 deletions
@@ -8,6 +8,7 @@ import {
   CreateCheckpointResult,
   DequeuedMessage,
   ExecutionResult,
+  formatDurationMilliseconds,
   RunExecutionData,
   StartRunAttemptResult,
   TaskRunContext,
@@ -212,6 +213,8 @@ export class RunEngine {
     });
 
     if (!options.worker.disabled) {
+      console.log("✅ Starting run engine worker");
+
       this.worker.start();
     }
 
@@ -1190,23 +1193,6 @@
       snapshot: latestSnapshot,
     });
 
-    // For dev, we just cancel runs that are stuck
-    if (latestSnapshot.environmentType === "DEVELOPMENT") {
-      this.logger.log("RunEngine.#handleStalledSnapshot() cancelling DEV run", {
-        runId,
-        snapshot: latestSnapshot,
-      });
-
-      await this.cancelRun({
-        runId: latestSnapshot.runId,
-        finalizeRun: true,
-        reason:
-          "Run was disconnected, check you're running the CLI dev command and your network connection is healthy.",
-        tx,
-      });
-      return;
-    }
-
     switch (latestSnapshot.executionStatus) {
       case "RUN_CREATED": {
         throw new NotImplementedError("There shouldn't be a heartbeat for RUN_CREATED");
@@ -1265,7 +1251,19 @@
       case "EXECUTING_WITH_WAITPOINTS": {
         const retryDelay = 250;
 
-        //todo call attemptFailed and force requeuing
+        const timeoutDuration =
+          latestSnapshot.executionStatus === "EXECUTING"
+            ? formatDurationMilliseconds(this.heartbeatTimeouts.EXECUTING)
+            : formatDurationMilliseconds(this.heartbeatTimeouts.EXECUTING_WITH_WAITPOINTS);
+
+        // Dev runs don't retry, because the vast majority of the time these snapshots stall because
+        // they have quit the CLI
+        const shouldRetry = latestSnapshot.environmentType !== "DEVELOPMENT";
+        const errorMessage =
+          latestSnapshot.environmentType === "DEVELOPMENT"
+            ? `Run timed out after ${timeoutDuration} due to missing heartbeats (sent every 30s). Check if your \`trigger.dev dev\` CLI is still running, or if CPU-heavy work is blocking the main thread.`
+            : `Run timed out after ${timeoutDuration} due to missing heartbeats (sent every 30s). This typically happens when CPU-heavy work blocks the main thread.`;
+
         await this.runAttemptSystem.attemptFailed({
           runId,
           snapshotId: latestSnapshot.id,
@@ -1278,13 +1276,15 @@
               latestSnapshot.executionStatus === "EXECUTING"
                 ? "TASK_RUN_STALLED_EXECUTING"
                 : "TASK_RUN_STALLED_EXECUTING_WITH_WAITPOINTS",
-            message: `Run stalled while executing. This can happen when the run becomes unresponsive, for example because the CPU is overloaded.`,
-          },
-          retry: {
-            //250ms in the future
-            timestamp: Date.now() + retryDelay,
-            delay: retryDelay,
+            message: errorMessage,
           },
+          retry: shouldRetry
+            ? {
+                //250ms in the future
+                timestamp: Date.now() + retryDelay,
+                delay: retryDelay,
+              }
+            : undefined,
         },
         forceRequeue: true,
         tx: prisma,

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 14 additions & 5 deletions
@@ -51,6 +51,7 @@ import { RunEngineOptions } from "../types.js";
 import { BatchSystem } from "./batchSystem.js";
 import { DelayedRunSystem } from "./delayedRunSystem.js";
 import {
+  EnhancedExecutionSnapshot,
   executionResultFromSnapshot,
   ExecutionSnapshotSystem,
   getLatestExecutionSnapshot,
@@ -690,6 +691,10 @@
         throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400);
       }
 
+      if (latestSnapshot.executionStatus === "FINISHED") {
+        throw new ServiceValidationError("Run is already finished", 400);
+      }
+
       span.setAttribute("completionStatus", completion.ok);
 
       const completedAt = new Date();
@@ -843,6 +848,10 @@
         throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400);
       }
 
+      if (latestSnapshot.executionStatus === "FINISHED") {
+        throw new ServiceValidationError("Run is already finished", 400);
+      }
+
       span.setAttribute("completionStatus", completion.ok);
 
       //remove waitpoints blocking the run
@@ -923,7 +932,7 @@
       case "fail_run": {
         return await this.#permanentlyFailRun({
           runId,
-          snapshotId,
+          latestSnapshot,
           failedAt,
           error: retryResult.sanitizedError,
           workerId,
@@ -1443,14 +1452,14 @@
 
   async #permanentlyFailRun({
     runId,
-    snapshotId,
+    latestSnapshot,
     failedAt,
     error,
     workerId,
     runnerId,
   }: {
    runId: string;
-    snapshotId?: string;
+    latestSnapshot: EnhancedExecutionSnapshot;
    failedAt: Date;
    error: TaskRunError;
    workerId?: string;
@@ -1459,7 +1468,7 @@
    const prisma = this.$.prisma;
 
    return startSpan(this.$.tracer, "permanentlyFailRun", async (span) => {
-      const status = runStatusFromError(error);
+      const status = runStatusFromError(error, latestSnapshot.environmentType);
 
      //run permanently failed
      const run = await prisma.taskRun.update({
@@ -1512,7 +1521,7 @@
            executionStatus: "FINISHED",
            description: "Run failed",
          },
-          previousSnapshotId: snapshotId,
+          previousSnapshotId: latestSnapshot.id,
          environmentId: run.runtimeEnvironment.id,
          environmentType: run.runtimeEnvironment.type,
          projectId: run.runtimeEnvironment.project.id,
