Skip to content

Commit 0642c9c

Browse files
committed
More changes to blocking to support continuing after idempotent completed runs
1 parent 3d4068a commit 0642c9c

File tree

4 files changed

+119
-22
lines changed

4 files changed

+119
-22
lines changed

apps/webapp/app/v3/services/triggerTaskV2.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
9090
} else {
9191
//We're using `andWait` so we need to block the parent run with a waitpoint
9292
if (
93-
existingRun.associatedWaitpoint?.status === "PENDING" &&
93+
existingRun.associatedWaitpoint &&
9494
body.options?.resumeParentOnCompletion &&
9595
body.options?.parentRunId
9696
) {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1593,6 +1593,29 @@ export class RunEngine {
15931593
return await this.runLock.lock([runId], 5000, async (signal) => {
15941594
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);
15951595

1596+
//block the run with the waitpoints, returning how many waitpoints are pending
1597+
const insert = await prisma.$queryRaw<{ pending_count: number }[]>`
1598+
WITH inserted AS (
1599+
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt")
1600+
SELECT
1601+
gen_random_uuid(),
1602+
${runId},
1603+
w.id,
1604+
${projectId},
1605+
NOW(),
1606+
NOW()
1607+
FROM "Waitpoint" w
1608+
WHERE w.id IN (${Prisma.join(waitpointIds)})
1609+
ON CONFLICT DO NOTHING
1610+
RETURNING "waitpointId"
1611+
)
1612+
SELECT COUNT(*) as pending_count
1613+
FROM inserted i
1614+
JOIN "Waitpoint" w ON w.id = i."waitpointId"
1615+
WHERE w.status = 'PENDING';`;
1616+
1617+
const pendingCount = insert.at(0)?.pending_count ?? 0;
1618+
15961619
let newStatus: TaskRunExecutionStatus = "BLOCKED_BY_WAITPOINTS";
15971620
if (
15981621
snapshot.executionStatus === "EXECUTING" ||
@@ -1601,25 +1624,6 @@ export class RunEngine {
16011624
newStatus = "EXECUTING_WITH_WAITPOINTS";
16021625
}
16031626

1604-
const insertedBlockers = await prisma.$executeRaw`
1605-
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt")
1606-
SELECT
1607-
gen_random_uuid(),
1608-
${runId},
1609-
w.id,
1610-
${projectId},
1611-
NOW(),
1612-
NOW()
1613-
FROM "Waitpoint" w
1614-
WHERE w.id IN (${Prisma.join(waitpointIds)})
1615-
AND w.status = 'PENDING'
1616-
ON CONFLICT DO NOTHING;`;
1617-
1618-
//if no runs were blocked we don't want to do anything more
1619-
if (insertedBlockers === 0) {
1620-
return snapshot;
1621-
}
1622-
16231627
//if the state has changed, create a new snapshot
16241628
if (newStatus !== snapshot.executionStatus) {
16251629
snapshot = await this.#createExecutionSnapshot(prisma, {
@@ -1646,6 +1650,19 @@ export class RunEngine {
16461650
}
16471651
}
16481652

1653+
//no pending waitpoint, schedule unblocking the run
1654+
//debounce if we're rapidly adding waitpoints
1655+
if (pendingCount === 0) {
1656+
await this.worker.enqueue({
1657+
//this will debounce the call
1658+
id: `continueRunIfUnblocked:${runId}`,
1659+
job: "continueRunIfUnblocked",
1660+
payload: { runId: runId },
1661+
//100ms in the future
1662+
availableAt: new Date(Date.now() + 100),
1663+
});
1664+
}
1665+
16491666
return snapshot;
16501667
});
16511668
}

references/hello-world/src/trigger/example.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,11 @@ export const batchParentTask = task({
6060
export const childTask = task({
6161
id: "child",
6262
run: async (
63-
{ message, failureChance = 0.3 }: { message?: string; failureChance?: number },
63+
{
64+
message,
65+
failureChance = 0.3,
66+
duration = 3_000,
67+
}: { message?: string; failureChance?: number; duration?: number },
6468
{ ctx }
6569
) => {
6670
logger.info("Hello, world from the child", { message, failureChance });
@@ -69,7 +73,7 @@ export const childTask = task({
6973
throw new Error("Random error at start");
7074
}
7175

72-
await setTimeout(3_000);
76+
await setTimeout(duration);
7377

7478
if (Math.random() < failureChance) {
7579
throw new Error("Random error at end");
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { batch, idempotencyKeys, logger, task, timeout, usage, wait } from "@trigger.dev/sdk/v3";
2+
import { setTimeout } from "timers/promises";
3+
import { childTask } from "./example.js";
4+
5+
export const idempotency = task({
6+
id: "idempotency",
7+
run: async (payload: any, { ctx }) => {
8+
logger.log("Hello, world from the parent", { payload });
9+
10+
const child1Key = await idempotencyKeys.create("a", { scope: "global" });
11+
12+
const child1 = await childTask.triggerAndWait(
13+
{ message: "Hello, world!", duration: 10_000 },
14+
{ idempotencyKey: child1Key, idempotencyKeyTTL: "60s" }
15+
);
16+
logger.log("Child 1", { child1 });
17+
18+
ctx.attempt.id;
19+
20+
const child2 = await childTask.triggerAndWait(
21+
{ message: "Hello, world!", duration: 10_000 },
22+
{ idempotencyKey: child1Key, idempotencyKeyTTL: "60s" }
23+
);
24+
logger.log("Child 2", { child2 });
25+
26+
// const results = await childTask.batchTriggerAndWait([
27+
// {
28+
// payload: { message: "Hello, world!" },
29+
// //@ts-ignore
30+
// options: { idempotencyKey: "1", idempotencyKeyTTL: "60s" },
31+
// },
32+
// {
33+
// payload: { message: "Hello, world 2!" },
34+
// //@ts-ignore
35+
// options: { idempotencyKey: "2", idempotencyKeyTTL: "60s" },
36+
// },
37+
// ]);
38+
// logger.log("Results", { results });
39+
40+
// const results2 = await batch.triggerAndWait<typeof childTask>([
41+
// {
42+
// id: "child",
43+
// payload: { message: "Hello, world !" },
44+
// //@ts-ignore
45+
// options: { idempotencyKey: "1", idempotencyKeyTTL: "60s" },
46+
// },
47+
// {
48+
// id: "child",
49+
// payload: { message: "Hello, world 2!" },
50+
// //@ts-ignore
51+
// options: { idempotencyKey: "2", idempotencyKeyTTL: "60s" },
52+
// },
53+
// ]);
54+
// logger.log("Results 2", { results2 });
55+
56+
// const results3 = await batch.triggerByTask([
57+
// {
58+
// task: childTask,
59+
// payload: { message: "Hello, world !" },
60+
// options: { idempotencyKey: "1", idempotencyKeyTTL: "60s" },
61+
// },
62+
// {
63+
// task: childTask,
64+
// payload: { message: "Hello, world 2!" },
65+
// options: { idempotencyKey: "2", idempotencyKeyTTL: "60s" },
66+
// },
67+
// ]);
68+
// logger.log("Results 3", { results3 });
69+
70+
// const results4 = await batch.triggerByTaskAndWait([
71+
// { task: childTask, payload: { message: "Hello, world !" } },
72+
// { task: childTask, payload: { message: "Hello, world 2!" } },
73+
// ]);
74+
// logger.log("Results 4", { results4 });
75+
},
76+
});

0 commit comments

Comments
 (0)