Commit a06b7d1

Move retrying logic to a separate function; it was getting very messy
1 parent 0a2c090 commit a06b7d1
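
The extracted helper lives in the new ./retrying module, which is the second changed file and is not shown in this view. Its interface can only be inferred from the call site and from how retryResult is consumed in the diff below; the following is a minimal sketch under those assumptions, with field and type names read off the usage in index.ts rather than taken from the actual retrying.ts:

// Sketch only: shapes are inferred from the call site in index.ts;
// the real retrying.ts added by this commit may differ.
type RetryOutcome =
  | { outcome: "cancel_run"; reason?: string }
  | { outcome: "fail_run"; sanitizedError: unknown }
  | {
      outcome: "retry";
      // Only the "queue" value is checked by the caller (retryResult.method === "queue").
      method: string;
      settings: { timestamp: number; delay: number };
      // Stored on the run as machinePreset when retrying.
      machine?: string;
    };

declare function retryOutcomeFromCompletion(
  prisma: unknown, // Prisma client or transaction
  params: {
    runId: string;
    error: unknown; // completion.error
    retryUsingQueue: boolean; // forceRequeue ?? false at the call site
    retrySettings: unknown; // completion.retry, possibly undefined
    attemptNumber: number | null;
  }
): Promise<RetryOutcome>;

Centralising the decision this way means index.ts only has to branch on the three outcomes (cancel_run, fail_run, retry) instead of repeating the cancellation, retriability, and max-attempt checks inline.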

File tree

2 files changed (+307, -192 lines)


internal-packages/run-engine/src/engine/index.ts

Lines changed: 142 additions & 192 deletions
@@ -71,6 +71,7 @@ import {
   isPendingExecuting,
 } from "./statuses";
 import { HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types";
+import { retryOutcomeFromCompletion } from "./retrying";

 const workerCatalog = {
   finishWaitpoint: {
@@ -2867,228 +2868,177 @@ export class RunEngine {

         const failedAt = new Date();

-        if (
-          completion.error.type === "INTERNAL_ERROR" &&
-          completion.error.code === "TASK_RUN_CANCELLED"
-        ) {
-          // We need to cancel the task run instead of fail it
-          const result = await this.cancelRun({
-            runId,
-            completedAt: failedAt,
-            reason: completion.error.message,
-            finalizeRun: true,
-            tx: prisma,
-          });
-          return {
-            attemptStatus:
-              result.snapshot.executionStatus === "PENDING_CANCEL"
-                ? "RUN_PENDING_CANCEL"
-                : "RUN_FINISHED",
-            ...result,
-          };
-        }
-
-        const error = sanitizeError(completion.error);
-        const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
-
-        const permanentlyFailRun = async (run?: {
-          status: TaskRunStatus;
-          spanId: string;
-          createdAt: Date;
-          completedAt: Date | null;
-          taskEventStore: string;
-        }) => {
-          // Emit an event so we can complete any spans of stalled executions
-          if (forceRequeue && run) {
-            this.eventBus.emit("runAttemptFailed", {
-              time: failedAt,
-              run: {
-                id: runId,
-                status: run.status,
-                spanId: run.spanId,
-                error,
-                attemptNumber: latestSnapshot.attemptNumber ?? 0,
-                createdAt: run.createdAt,
-                completedAt: run.completedAt,
-                taskEventStore: run.taskEventStore,
-              },
-            });
-          }
-
-          return await this.#permanentlyFailRun({
-            runId,
-            snapshotId,
-            failedAt,
-            error,
-            workerId,
-            runnerId,
-          });
-        };
-
-        // Error is not retriable, fail the run
-        if (!retriableError) {
-          return await permanentlyFailRun();
-        }
-
-        // No retry config attached to completion, fail the run
-        if (completion.retry === undefined) {
-          return await permanentlyFailRun();
-        }
-
-        // Run attempts have reached the global maximum, fail the run
-        if (
-          latestSnapshot.attemptNumber !== null &&
-          latestSnapshot.attemptNumber >= MAX_TASK_RUN_ATTEMPTS
-        ) {
-          return await permanentlyFailRun();
-        }
+        const retryResult = await retryOutcomeFromCompletion(prisma, {
+          runId,
+          error: completion.error,
+          retryUsingQueue: forceRequeue ?? false,
+          retrySettings: completion.retry,
+          attemptNumber: latestSnapshot.attemptNumber,
+        });

-        const minimalRun = await prisma.taskRun.findFirst({
-          where: {
-            id: runId,
-          },
-          select: {
-            status: true,
-            spanId: true,
-            maxAttempts: true,
-            runtimeEnvironment: {
-              select: {
-                organizationId: true,
+        // Force requeue means it was crashed so the attempt span needs to be closed
+        if (forceRequeue) {
+          const minimalRun = await prisma.taskRun.findFirst({
+            where: {
+              id: runId,
+            },
+            select: {
+              status: true,
+              spanId: true,
+              maxAttempts: true,
+              runtimeEnvironment: {
+                select: {
+                  organizationId: true,
+                },
               },
+              taskEventStore: true,
+              createdAt: true,
+              completedAt: true,
             },
-            taskEventStore: true,
-            createdAt: true,
-            completedAt: true,
-          },
-        });
-
-        if (!minimalRun) {
-          throw new ServiceValidationError("Run not found", 404);
-        }
-
-        // Run doesn't have any max attempts set which is required for retrying, fail the run
-        if (!minimalRun.maxAttempts) {
-          return await permanentlyFailRun(minimalRun);
-        }
+          });

-        // Run has reached the maximum configured number of attempts, fail the run
-        if (
-          latestSnapshot.attemptNumber !== null &&
-          latestSnapshot.attemptNumber >= minimalRun.maxAttempts
-        ) {
-          return await permanentlyFailRun(minimalRun);
-        }
+          if (!minimalRun) {
+            throw new ServiceValidationError("Run not found", 404);
+          }

-        // This error didn't come from user code, so we need to emit an event to complete any spans
-        if (forceRequeue) {
           this.eventBus.emit("runAttemptFailed", {
             time: failedAt,
             run: {
               id: runId,
               status: minimalRun.status,
               spanId: minimalRun.spanId,
-              error,
+              error: completion.error,
               attemptNumber: latestSnapshot.attemptNumber ?? 0,
-              taskEventStore: minimalRun.taskEventStore,
               createdAt: minimalRun.createdAt,
               completedAt: minimalRun.completedAt,
+              taskEventStore: minimalRun.taskEventStore,
             },
           });
         }

-        const retryAt = new Date(completion.retry.timestamp);
+        switch (retryResult.outcome) {
+          case "cancel_run": {
+            const result = await this.cancelRun({
+              runId,
+              completedAt: failedAt,
+              reason: retryResult.reason,
+              finalizeRun: true,
+              tx: prisma,
+            });
+            return {
+              attemptStatus:
+                result.snapshot.executionStatus === "PENDING_CANCEL"
+                  ? "RUN_PENDING_CANCEL"
+                  : "RUN_FINISHED",
+              ...result,
+            };
+          }
+          case "fail_run": {
+            return await this.#permanentlyFailRun({
+              runId,
+              snapshotId,
+              failedAt,
+              error: retryResult.sanitizedError,
+              workerId,
+              runnerId,
+            });
+          }
+          case "retry": {
+            const retryAt = new Date(retryResult.settings.timestamp);

-        const run = await prisma.taskRun.update({
-          where: {
-            id: runId,
-          },
-          data: {
-            status: "RETRYING_AFTER_FAILURE",
-          },
-          include: {
-            runtimeEnvironment: {
+            const run = await prisma.taskRun.update({
+              where: {
+                id: runId,
+              },
+              data: {
+                status: "RETRYING_AFTER_FAILURE",
+                machinePreset: retryResult.machine,
+              },
               include: {
-                project: true,
-                organization: true,
-                orgMember: true,
+                runtimeEnvironment: {
+                  include: {
+                    project: true,
+                    organization: true,
+                    orgMember: true,
+                  },
+                },
               },
-            },
-          },
-        });
+            });

-        const nextAttemptNumber =
-          latestSnapshot.attemptNumber === null ? 1 : latestSnapshot.attemptNumber + 1;
+            const nextAttemptNumber =
+              latestSnapshot.attemptNumber === null ? 1 : latestSnapshot.attemptNumber + 1;

-        this.eventBus.emit("runRetryScheduled", {
-          time: failedAt,
-          run: {
-            id: run.id,
-            friendlyId: run.friendlyId,
-            attemptNumber: nextAttemptNumber,
-            queue: run.queue,
-            taskIdentifier: run.taskIdentifier,
-            traceContext: run.traceContext as Record<string, string | undefined>,
-            baseCostInCents: run.baseCostInCents,
-            spanId: run.spanId,
-          },
-          organization: {
-            id: run.runtimeEnvironment.organizationId,
-          },
-          environment: run.runtimeEnvironment,
-          retryAt,
-        });
+            this.eventBus.emit("runRetryScheduled", {
+              time: failedAt,
+              run: {
+                id: run.id,
+                friendlyId: run.friendlyId,
+                attemptNumber: nextAttemptNumber,
+                queue: run.queue,
+                taskIdentifier: run.taskIdentifier,
+                traceContext: run.traceContext as Record<string, string | undefined>,
+                baseCostInCents: run.baseCostInCents,
+                spanId: run.spanId,
+              },
+              organization: {
+                id: run.runtimeEnvironment.organizationId,
+              },
+              environment: run.runtimeEnvironment,
+              retryAt,
+            });
+
+            //if it's a long delay and we support checkpointing, put it back in the queue
+            if (
+              forceRequeue ||
+              retryResult.method === "queue" ||
+              (this.options.retryWarmStartThresholdMs !== undefined &&
+                retryResult.settings.delay >= this.options.retryWarmStartThresholdMs)
+            ) {
+              //we nack the message, requeuing it for later
+              const nackResult = await this.#tryNackAndRequeue({
+                run,
+                environment: run.runtimeEnvironment,
+                orgId: run.runtimeEnvironment.organizationId,
+                timestamp: retryAt.getTime(),
+                error: {
+                  type: "INTERNAL_ERROR",
+                  code: "TASK_RUN_DEQUEUED_MAX_RETRIES",
+                  message: `We tried to dequeue the run the maximum number of times but it wouldn't start executing`,
+                },
+                tx: prisma,
+              });

-        //todo anything special for DEV? Ideally not.
+              if (!nackResult.wasRequeued) {
+                return {
+                  attemptStatus: "RUN_FINISHED",
+                  ...nackResult,
+                };
+              } else {
+                return { attemptStatus: "RETRY_QUEUED", ...nackResult };
+              }
+            }

-        //if it's a long delay and we support checkpointing, put it back in the queue
-        if (
-          forceRequeue ||
-          (this.options.retryWarmStartThresholdMs !== undefined &&
-            completion.retry.delay >= this.options.retryWarmStartThresholdMs)
-        ) {
-          //we nack the message, requeuing it for later
-          const nackResult = await this.#tryNackAndRequeue({
-            run,
-            environment: run.runtimeEnvironment,
-            orgId: run.runtimeEnvironment.organizationId,
-            timestamp: retryAt.getTime(),
-            error: {
-              type: "INTERNAL_ERROR",
-              code: "TASK_RUN_DEQUEUED_MAX_RETRIES",
-              message: `We tried to dequeue the run the maximum number of times but it wouldn't start executing`,
-            },
-            tx: prisma,
-          });
+            //it will continue running because the retry delay is short
+            const newSnapshot = await this.#createExecutionSnapshot(prisma, {
+              run,
+              snapshot: {
+                executionStatus: "PENDING_EXECUTING",
+                description: "Attempt failed with a short delay, starting a new attempt",
+              },
+              environmentId: latestSnapshot.environmentId,
+              environmentType: latestSnapshot.environmentType,
+              workerId,
+              runnerId,
+            });
+            //the worker can fetch the latest snapshot and should create a new attempt
+            await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot });

-          if (!nackResult.wasRequeued) {
             return {
-              attemptStatus: "RUN_FINISHED",
-              ...nackResult,
+              attemptStatus: "RETRY_IMMEDIATELY",
+              ...executionResultFromSnapshot(newSnapshot),
             };
-          } else {
-            return { attemptStatus: "RETRY_QUEUED", ...nackResult };
           }
         }
-
-        //it will continue running because the retry delay is short
-        const newSnapshot = await this.#createExecutionSnapshot(prisma, {
-          run,
-          snapshot: {
-            executionStatus: "PENDING_EXECUTING",
-            description: "Attempt failed with a short delay, starting a new attempt",
-          },
-          environmentId: latestSnapshot.environmentId,
-          environmentType: latestSnapshot.environmentType,
-          workerId,
-          runnerId,
-        });
-        //the worker can fetch the latest snapshot and should create a new attempt
-        await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot });
-
-        return {
-          attemptStatus: "RETRY_IMMEDIATELY",
-          ...executionResultFromSnapshot(newSnapshot),
-        };
       });
     });
   }

0 commit comments
