Skip to content

Commit 03c485b

Browse files
committed
Waitpoint timeouts and idempotency expiry
1 parent b49f5a1 commit 03c485b

File tree

4 files changed

+290
-27
lines changed

4 files changed

+290
-27
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 101 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
TaskRunInternalError,
2424
TaskRunSuccessfulExecutionResult,
2525
WaitForDurationResult,
26+
WAITPOINT_TIMEOUT_ERROR_CODE,
2627
} from "@trigger.dev/core/v3";
2728
import {
2829
BatchId,
@@ -118,6 +119,13 @@ const workerCatalog = {
118119
}),
119120
visibilityTimeoutMs: 10_000,
120121
},
122+
timeoutWaitpoint: {
123+
schema: z.object({
124+
waitpointId: z.string(),
125+
timeout: z.coerce.date(),
126+
}),
127+
visibilityTimeoutMs: 10_000,
128+
},
121129
};
122130

123131
type EngineWorker = Worker<typeof workerCatalog>;
@@ -199,6 +207,12 @@ export class RunEngine {
199207
runId: payload.runId,
200208
});
201209
},
210+
timeoutWaitpoint: async ({ payload }) => {
211+
await this.#timeoutWaitpoint({
212+
waitpointId: payload.waitpointId,
213+
timeout: payload.timeout,
214+
});
215+
},
202216
},
203217
}).start();
204218

@@ -1735,10 +1749,14 @@ export class RunEngine {
17351749
environmentId,
17361750
projectId,
17371751
idempotencyKey,
1752+
idempotencyKeyExpiresAt,
1753+
timeout,
17381754
}: {
17391755
environmentId: string;
17401756
projectId: string;
17411757
idempotencyKey?: string;
1758+
idempotencyKeyExpiresAt?: Date;
1759+
timeout?: Date;
17421760
}): Promise<Waitpoint> {
17431761
const existingWaitpoint = idempotencyKey
17441762
? await this.prisma.waitpoint.findUnique({
@@ -1752,19 +1770,59 @@ export class RunEngine {
17521770
: undefined;
17531771

17541772
if (existingWaitpoint) {
1755-
return existingWaitpoint;
1773+
if (
1774+
existingWaitpoint.idempotencyKeyExpiresAt &&
1775+
new Date() > existingWaitpoint.idempotencyKeyExpiresAt
1776+
) {
1777+
//the idempotency key has expired
1778+
//remove the waitpoint idempotencyKey
1779+
await this.prisma.waitpoint.update({
1780+
where: {
1781+
id: existingWaitpoint.id,
1782+
},
1783+
data: {
1784+
idempotencyKey: nanoid(24),
1785+
inactiveIdempotencyKey: existingWaitpoint.idempotencyKey,
1786+
},
1787+
});
1788+
1789+
//let it fall through to create a new waitpoint
1790+
} else {
1791+
return existingWaitpoint;
1792+
}
17561793
}
17571794

1758-
return this.prisma.waitpoint.create({
1759-
data: {
1795+
const waitpoint = await this.prisma.waitpoint.upsert({
1796+
where: {
1797+
environmentId_idempotencyKey: {
1798+
environmentId,
1799+
idempotencyKey: idempotencyKey ?? nanoid(24),
1800+
},
1801+
},
1802+
create: {
17601803
...WaitpointId.generate(),
17611804
type: "MANUAL",
17621805
idempotencyKey: idempotencyKey ?? nanoid(24),
1806+
idempotencyKeyExpiresAt,
17631807
userProvidedIdempotencyKey: !!idempotencyKey,
17641808
environmentId,
17651809
projectId,
1810+
completedAfter: timeout,
17661811
},
1812+
update: {},
17671813
});
1814+
1815+
//schedule the timeout
1816+
if (timeout) {
1817+
await this.worker.enqueue({
1818+
id: `timeoutWaitpoint.${waitpoint.id}`,
1819+
job: "timeoutWaitpoint",
1820+
payload: { waitpointId: waitpoint.id, timeout },
1821+
availableAt: timeout,
1822+
});
1823+
}
1824+
1825+
return waitpoint;
17681826
}
17691827

17701828
/** This block a run with a BATCH waitpoint.
@@ -2032,14 +2090,6 @@ export class RunEngine {
20322090
isError: boolean;
20332091
};
20342092
}): Promise<Waitpoint> {
2035-
const waitpoint = await this.prisma.waitpoint.findUnique({
2036-
where: { id },
2037-
});
2038-
2039-
if (!waitpoint) {
2040-
throw new Error(`Waitpoint ${id} not found`);
2041-
}
2042-
20432093
const result = await $transaction(
20442094
this.prisma,
20452095
async (tx) => {
@@ -2051,23 +2101,32 @@ export class RunEngine {
20512101

20522102
if (affectedTaskRuns.length === 0) {
20532103
this.logger.warn(`No TaskRunWaitpoints found for waitpoint`, {
2054-
waitpoint,
2104+
waitpointId: id,
20552105
});
20562106
}
20572107

2058-
// 2. Update the waitpoint to completed
2059-
const updatedWaitpoint = await tx.waitpoint.update({
2060-
where: { id },
2061-
data: {
2062-
status: "COMPLETED",
2063-
completedAt: new Date(),
2064-
output: output?.value,
2065-
outputType: output?.type,
2066-
outputIsError: output?.isError,
2067-
},
2068-
});
2108+
// 2. Update the waitpoint to completed (only if it's pending)
2109+
const waitpoint = await tx.waitpoint
2110+
.update({
2111+
where: { id, status: "PENDING" },
2112+
data: {
2113+
status: "COMPLETED",
2114+
completedAt: new Date(),
2115+
output: output?.value,
2116+
outputType: output?.type,
2117+
outputIsError: output?.isError,
2118+
},
2119+
})
2120+
.catch(async (error) => {
2121+
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2025") {
2122+
return tx.waitpoint.findUnique({
2123+
where: { id },
2124+
});
2125+
}
2126+
throw error;
2127+
});
20692128

2070-
return { updatedWaitpoint, affectedTaskRuns };
2129+
return { waitpoint, affectedTaskRuns };
20712130
},
20722131
(error) => {
20732132
this.logger.error(`Error completing waitpoint ${id}, retrying`, { error });
@@ -2079,6 +2138,10 @@ export class RunEngine {
20792138
throw new Error(`Waitpoint couldn't be updated`);
20802139
}
20812140

2141+
if (!result.waitpoint) {
2142+
throw new Error(`Waitpoint ${id} not found`);
2143+
}
2144+
20822145
//schedule trying to continue the runs
20832146
for (const run of result.affectedTaskRuns) {
20842147
await this.worker.enqueue({
@@ -2100,7 +2163,7 @@ export class RunEngine {
21002163
}
21012164
}
21022165

2103-
return result.updatedWaitpoint;
2166+
return result.waitpoint;
21042167
}
21052168

21062169
async createCheckpoint({
@@ -3491,6 +3554,19 @@ export class RunEngine {
34913554
};
34923555
}
34933556

3557+
async #timeoutWaitpoint({ waitpointId, timeout }: { waitpointId: string; timeout: Date }) {
3558+
await this.completeWaitpoint({
3559+
id: waitpointId,
3560+
output: {
3561+
value: JSON.stringify({
3562+
code: WAITPOINT_TIMEOUT_ERROR_CODE,
3563+
message: `Waitpoint timed out at ${timeout.toISOString()}`,
3564+
}),
3565+
isError: true,
3566+
},
3567+
});
3568+
}
3569+
34943570
async #clearBlockingWaitpoints({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) {
34953571
const prisma = tx ?? this.prisma;
34963572
const deleted = await prisma.taskRunWaitpoint.deleteMany({

internal-packages/run-engine/src/engine/tests/waitpoints.test.ts

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { expect } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "timers/promises";
1111
import { EventBusEventArgs } from "../eventBus.js";
12+
import { isWaitpointOutputTimeout } from "@trigger.dev/core/v3";
1213

1314
describe("RunEngine Waitpoints", () => {
1415
containerTest("waitForDuration", { timeout: 15_000 }, async ({ prisma, redisOptions }) => {
@@ -398,7 +399,7 @@ describe("RunEngine Waitpoints", () => {
398399
);
399400

400401
containerTest(
401-
"Manual waitpoint timeout",
402+
"Manual waitpoint failAfter",
402403
{ timeout: 15_000 },
403404
async ({ prisma, redisOptions }) => {
404405
//create environment
@@ -665,4 +666,155 @@ describe("RunEngine Waitpoints", () => {
665666
}
666667
}
667668
);
669+
670+
containerTest(
671+
"Create a Manual waitpoint and let it timeout",
672+
{ timeout: 15_000 },
673+
async ({ prisma, redisOptions }) => {
674+
//create environment
675+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
676+
677+
const engine = new RunEngine({
678+
prisma,
679+
worker: {
680+
redis: redisOptions,
681+
workers: 1,
682+
tasksPerWorker: 10,
683+
pollIntervalMs: 100,
684+
},
685+
queue: {
686+
redis: redisOptions,
687+
},
688+
runLock: {
689+
redis: redisOptions,
690+
},
691+
machines: {
692+
defaultMachine: "small-1x",
693+
machines: {
694+
"small-1x": {
695+
name: "small-1x" as const,
696+
cpu: 0.5,
697+
memory: 0.5,
698+
centsPerMs: 0.0001,
699+
},
700+
},
701+
baseCostInCents: 0.0001,
702+
},
703+
tracer: trace.getTracer("test", "0.0.0"),
704+
});
705+
706+
try {
707+
const taskIdentifier = "test-task";
708+
709+
//create background worker
710+
await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier);
711+
712+
//trigger the run
713+
const run = await engine.trigger(
714+
{
715+
number: 1,
716+
friendlyId: "run_p1234",
717+
environment: authenticatedEnvironment,
718+
taskIdentifier,
719+
payload: "{}",
720+
payloadType: "application/json",
721+
context: {},
722+
traceContext: {},
723+
traceId: "t12345",
724+
spanId: "s12345",
725+
masterQueue: "main",
726+
queueName: "task/test-task",
727+
isTest: false,
728+
tags: [],
729+
},
730+
prisma
731+
);
732+
733+
//dequeue the run
734+
const dequeued = await engine.dequeueFromMasterQueue({
735+
consumerId: "test_12345",
736+
masterQueue: run.masterQueue,
737+
maxRunCount: 10,
738+
});
739+
740+
//create an attempt
741+
const attemptResult = await engine.startRunAttempt({
742+
runId: dequeued[0].run.id,
743+
snapshotId: dequeued[0].snapshot.id,
744+
});
745+
expect(attemptResult.snapshot.executionStatus).toBe("EXECUTING");
746+
747+
//create a manual waitpoint with timeout
748+
const timeout = new Date(Date.now() + 1_000);
749+
const waitpoint = await engine.createManualWaitpoint({
750+
environmentId: authenticatedEnvironment.id,
751+
projectId: authenticatedEnvironment.projectId,
752+
timeout,
753+
});
754+
expect(waitpoint.status).toBe("PENDING");
755+
expect(waitpoint.completedAfter).toStrictEqual(timeout);
756+
757+
//block the run
758+
await engine.blockRunWithWaitpoint({
759+
runId: run.id,
760+
waitpoints: waitpoint.id,
761+
environmentId: authenticatedEnvironment.id,
762+
projectId: authenticatedEnvironment.projectId,
763+
});
764+
765+
const executionData = await engine.getRunExecutionData({ runId: run.id });
766+
expect(executionData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
767+
768+
//check there is a waitpoint blocking the parent run
769+
const runWaitpointBefore = await prisma.taskRunWaitpoint.findFirst({
770+
where: {
771+
taskRunId: run.id,
772+
},
773+
include: {
774+
waitpoint: true,
775+
},
776+
});
777+
expect(runWaitpointBefore?.waitpointId).toBe(waitpoint.id);
778+
779+
let event: EventBusEventArgs<"workerNotification">[0] | undefined = undefined;
780+
engine.eventBus.on("workerNotification", (result) => {
781+
event = result;
782+
});
783+
784+
await setTimeout(1_250);
785+
786+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
787+
expect(executionData2?.snapshot.executionStatus).toBe("EXECUTING");
788+
789+
assertNonNullable(event);
790+
const notificationEvent = event as EventBusEventArgs<"workerNotification">[0];
791+
expect(notificationEvent.run.id).toBe(run.id);
792+
793+
//check there are no waitpoints blocking the parent run
794+
const runWaitpoint = await prisma.taskRunWaitpoint.findFirst({
795+
where: {
796+
taskRunId: run.id,
797+
},
798+
include: {
799+
waitpoint: true,
800+
},
801+
});
802+
expect(runWaitpoint).toBeNull();
803+
804+
const waitpoint2 = await prisma.waitpoint.findUnique({
805+
where: {
806+
id: waitpoint.id,
807+
},
808+
});
809+
assertNonNullable(waitpoint2);
810+
expect(waitpoint2.status).toBe("COMPLETED");
811+
expect(waitpoint2.outputIsError).toBe(true);
812+
assertNonNullable(waitpoint2.output);
813+
const isTimeout = isWaitpointOutputTimeout(waitpoint2.output);
814+
expect(isTimeout).toBe(true);
815+
} finally {
816+
engine.quit();
817+
}
818+
}
819+
);
668820
});

0 commit comments

Comments
 (0)