Skip to content

Commit a815bc2

Browse files
committed
Wait for token is working
1 parent c8a56b1 commit a815bc2

File tree

11 files changed

+285
-19
lines changed

11 files changed

+285
-19
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import {
3+
CompleteWaitpointTokenRequestBody,
4+
CompleteWaitpointTokenResponseBody,
5+
conditionallyExportPacket,
6+
CreateWaitpointTokenResponseBody,
7+
stringifyIO,
8+
WaitForWaitpointTokenRequestBody,
9+
WaitForWaitpointTokenResponseBody,
10+
} from "@trigger.dev/core/v3";
11+
import { RunId, WaitpointId } from "@trigger.dev/core/v3/apps";
12+
import { z } from "zod";
13+
import { $replica } from "~/db.server";
14+
import { logger } from "~/services/logger.server";
15+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
16+
import { parseDelay } from "~/utils/delays";
17+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
18+
import { engine } from "~/v3/runEngine.server";
19+
20+
const { action } = createActionApiRoute(
21+
{
22+
params: z.object({
23+
runFriendlyId: z.string(),
24+
waitpointFriendlyId: z.string(),
25+
}),
26+
body: WaitForWaitpointTokenRequestBody,
27+
maxContentLength: 1024 * 10, // 10KB
28+
method: "POST",
29+
},
30+
async ({ authentication, body, params }) => {
31+
// Resume tokens are actually just waitpoints
32+
const waitpointId = WaitpointId.toId(params.waitpointFriendlyId);
33+
const runId = RunId.toId(params.runFriendlyId);
34+
35+
const timeout = await parseDelay(body.timeout);
36+
37+
try {
38+
//check permissions
39+
const waitpoint = await $replica.waitpoint.findFirst({
40+
where: {
41+
id: waitpointId,
42+
environmentId: authentication.environment.id,
43+
},
44+
});
45+
46+
if (!waitpoint) {
47+
throw json({ error: "Waitpoint not found" }, { status: 404 });
48+
}
49+
50+
const result = await engine.blockRunWithWaitpoint({
51+
runId,
52+
waitpoints: [waitpointId],
53+
environmentId: authentication.environment.id,
54+
projectId: authentication.environment.project.id,
55+
failAfter: timeout,
56+
});
57+
58+
return json<WaitForWaitpointTokenResponseBody>(
59+
{
60+
success: true,
61+
},
62+
{ status: 200 }
63+
);
64+
} catch (error) {
65+
logger.error("Failed to wait for waitpoint", { runId, waitpointId, error });
66+
throw json({ error: "Failed to wait for waitpoint token" }, { status: 500 });
67+
}
68+
}
69+
);
70+
71+
export { action };

internal-packages/run-engine/src/engine/index.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2067,8 +2067,8 @@ export class RunEngine {
20672067
id: `continueRunIfUnblocked:${runId}`,
20682068
job: "continueRunIfUnblocked",
20692069
payload: { runId: runId },
2070-
//100ms in the future
2071-
availableAt: new Date(Date.now() + 100),
2070+
//in the near future
2071+
availableAt: new Date(Date.now() + 50),
20722072
});
20732073
}
20742074

@@ -2099,13 +2099,15 @@ export class RunEngine {
20992099
});
21002100

21012101
if (affectedTaskRuns.length === 0) {
2102-
this.logger.warn(`No TaskRunWaitpoints found for waitpoint`, {
2102+
this.logger.warn(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
21032103
waitpointId: id,
21042104
});
21052105
}
21062106

21072107
// 2. Update the waitpoint to completed (only if it's pending)
2108-
const waitpoint = await tx.waitpoint
2108+
let waitpoint: Waitpoint | null = null;
2109+
try {
2110+
waitpoint = await tx.waitpoint
21092111
.update({
21102112
where: { id, status: "PENDING" },
21112113
data: {
@@ -2116,19 +2118,21 @@ export class RunEngine {
21162118
outputIsError: output?.isError,
21172119
},
21182120
})
2119-
.catch(async (error) => {
2120-
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2025") {
2121-
return tx.waitpoint.findUnique({
2122-
where: { id },
2123-
});
2124-
}
2121+
} catch(error) {
2122+
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2025") {
2123+
waitpoint = await tx.waitpoint.findFirst({
2124+
where: { id },
2125+
});
2126+
} else {
2127+
this.logger.log('completeWaitpoint: error updating waitpoint:', {error});
21252128
throw error;
2126-
});
2129+
}
2130+
};
21272131

21282132
return { waitpoint, affectedTaskRuns };
21292133
},
21302134
(error) => {
2131-
this.logger.error(`Error completing waitpoint ${id}, retrying`, { error });
2135+
this.logger.error(`completeWaitpoint: Error completing waitpoint ${id}, retrying`, { error });
21322136
throw error;
21332137
}
21342138
);

packages/core/src/v3/apiClient/index.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import {
3232
UpdateMetadataRequestBody,
3333
UpdateMetadataResponseBody,
3434
UpdateScheduleOptions,
35+
WaitForWaitpointTokenRequestBody,
36+
WaitForWaitpointTokenResponseBody,
3537
} from "../schemas/index.js";
3638
import { taskContext } from "../task-context-api.js";
3739
import { AnyRunTypes, TriggerJwtOptions } from "../types/tasks.js";
@@ -669,6 +671,24 @@ export class ApiClient {
669671
);
670672
}
671673

674+
waitForWaitpointToken(
675+
runFriendlyId: string,
676+
waitpointFriendlyId: string,
677+
options?: WaitForWaitpointTokenRequestBody,
678+
requestOptions?: ZodFetchOptions
679+
) {
680+
return zodfetch(
681+
WaitForWaitpointTokenResponseBody,
682+
`${this.baseUrl}/api/v1/runs/${runFriendlyId}/waitpoints/tokens/${waitpointFriendlyId}/wait`,
683+
{
684+
method: "POST",
685+
headers: this.#getHeaders(false),
686+
body: JSON.stringify(options ?? {}),
687+
},
688+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
689+
);
690+
}
691+
672692
subscribeToRun<TRunTypes extends AnyRunTypes>(
673693
runId: string,
674694
options?: {

packages/core/src/v3/runtime/index.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import {
44
BatchTaskRunExecutionResult,
55
TaskRunContext,
66
TaskRunExecutionResult,
7+
WaitForWaitpointTokenRequestBody,
8+
WaitpointTokenResult,
79
} from "../schemas/index.js";
810
import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js";
911
import { type RuntimeManager } from "./manager.js";
@@ -41,6 +43,15 @@ export class RuntimeAPI {
4143
return usage.pauseAsync(() => this.#getRuntimeManager().waitForTask(params));
4244
}
4345

46+
public waitForToken(
47+
waitpointFriendlyId: string,
48+
options?: WaitForWaitpointTokenRequestBody
49+
): Promise<WaitpointTokenResult> {
50+
return usage.pauseAsync(() =>
51+
this.#getRuntimeManager().waitForToken(waitpointFriendlyId, options)
52+
);
53+
}
54+
4455
public waitForBatch(params: {
4556
id: string;
4657
runCount: number;

packages/core/src/v3/runtime/managedRuntimeManager.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import {
66
TaskRunExecutionResult,
77
TaskRunFailedExecutionResult,
88
TaskRunSuccessfulExecutionResult,
9+
WaitForWaitpointTokenRequestBody,
10+
WaitpointTokenResult,
911
} from "../schemas/index.js";
1012
import { ExecutorToWorkerProcessConnection } from "../zodIpc.js";
1113
import { RuntimeManager } from "./manager.js";
@@ -92,6 +94,23 @@ export class ManagedRuntimeManager implements RuntimeManager {
9294
};
9395
}
9496

97+
async waitForToken(
98+
waitpointFriendlyId: string,
99+
options?: WaitForWaitpointTokenRequestBody
100+
): Promise<WaitpointTokenResult> {
101+
const promise = new Promise<CompletedWaitpoint>((resolve) => {
102+
this.resolversByWaitId.set(waitpointFriendlyId, resolve);
103+
});
104+
105+
const waitpoint = await promise;
106+
107+
return {
108+
ok: !waitpoint.outputIsError,
109+
output: waitpoint.output,
110+
outputType: waitpoint.outputType,
111+
};
112+
}
113+
95114
associateWaitWithWaitpoint(waitId: string, waitpointId: string) {
96115
this.resolversByWaitpoint.set(waitpointId, waitId);
97116
}
@@ -115,6 +134,8 @@ export class ManagedRuntimeManager implements RuntimeManager {
115134
//no waitpoint resolves associated with batch completions
116135
//a batch completion isn't when all the runs from a batch are completed
117136
return;
137+
} else if (waitpoint.type === "MANUAL") {
138+
waitId = waitpoint.friendlyId;
118139
} else {
119140
waitId = this.resolversByWaitpoint.get(waitpoint.id);
120141
}

packages/core/src/v3/runtime/manager.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import {
22
BatchTaskRunExecutionResult,
33
TaskRunContext,
44
TaskRunExecutionResult,
5+
WaitForWaitpointTokenRequestBody,
6+
WaitpointTokenResult,
57
} from "../schemas/index.js";
68

79
export interface RuntimeManager {
@@ -14,4 +16,8 @@ export interface RuntimeManager {
1416
runCount: number;
1517
ctx: TaskRunContext;
1618
}): Promise<BatchTaskRunExecutionResult>;
19+
waitForToken(
20+
waitpointFriendlyId: string,
21+
options?: WaitForWaitpointTokenRequestBody
22+
): Promise<WaitpointTokenResult>;
1723
}

packages/core/src/v3/runtime/noopRuntimeManager.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import {
33
TaskRunContext,
44
TaskRunErrorCodes,
55
TaskRunExecutionResult,
6+
WaitForWaitpointTokenRequestBody,
7+
WaitpointTokenResult,
68
} from "../schemas/index.js";
79
import { RuntimeManager } from "./manager.js";
810

@@ -40,4 +42,14 @@ export class NoopRuntimeManager implements RuntimeManager {
4042
items: [],
4143
});
4244
}
45+
46+
waitForToken(
47+
waitpointFriendlyId: string,
48+
options?: WaitForWaitpointTokenRequestBody
49+
): Promise<WaitpointTokenResult> {
50+
return Promise.resolve({
51+
ok: true,
52+
outputType: "application/json",
53+
});
54+
}
4355
}

packages/core/src/v3/schemas/api.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,22 @@ export const CompleteWaitpointTokenResponseBody = z.object({
928928
});
929929
export type CompleteWaitpointTokenResponseBody = z.infer<typeof CompleteWaitpointTokenResponseBody>;
930930

931+
export const WaitForWaitpointTokenRequestBody = z.object({
932+
/**
933+
* The maximum amount of time to wait for the token to be completed.
934+
* If this is exceeded, the waitpoint will timeout and will return `ok` of `false`.
935+
*
936+
* You can pass a `Date` object, or a string in this format: "30s", "1m", "2h", "3d", "4w".
937+
*/
938+
timeout: TimePeriod.optional(),
939+
});
940+
export type WaitForWaitpointTokenRequestBody = z.infer<typeof WaitForWaitpointTokenRequestBody>;
941+
942+
export const WaitForWaitpointTokenResponseBody = z.object({
943+
success: z.boolean(),
944+
});
945+
export type WaitForWaitpointTokenResponseBody = z.infer<typeof WaitForWaitpointTokenResponseBody>;
946+
931947
export const WAITPOINT_TIMEOUT_ERROR_CODE = "TRIGGER_WAITPOINT_TIMEOUT";
932948

933949
export function isWaitpointOutputTimeout(output: string): boolean {

packages/core/src/v3/schemas/common.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,23 @@ export const BatchTaskRunExecutionResult = z.object({
385385

386386
export type BatchTaskRunExecutionResult = z.infer<typeof BatchTaskRunExecutionResult>;
387387

388+
export const WaitpointTokenResult = z.object({
389+
ok: z.boolean(),
390+
output: z.string().optional(),
391+
outputType: z.string().optional(),
392+
});
393+
export type WaitpointTokenResult = z.infer<typeof WaitpointTokenResult>;
394+
395+
export type WaitpointTokenTypedResult<T> =
396+
| {
397+
ok: true;
398+
output: T;
399+
}
400+
| {
401+
ok: false;
402+
error: string;
403+
};
404+
388405
export const SerializedError = z.object({
389406
message: z.string(),
390407
name: z.string().optional(),

0 commit comments

Comments
 (0)