Skip to content

Commit 997c183

Browse files
authored
Merge branch 'main' into docs/improve-env-var-guide
2 parents 577a4bf + 892bed8 commit 997c183

File tree

35 files changed

+824
-159
lines changed

35 files changed

+824
-159
lines changed

.changeset/tiny-carrots-rest.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
---
2+
"@trigger.dev/sdk": minor
3+
---
4+
5+
Prevent uncaught errors in the `onSuccess`, `onComplete`, and `onFailure` lifecycle hooks from failing attempts/runs.
6+
7+
Deprecated the `onStart` lifecycle hook (which only fires before the `run` function on the first attempt). Replaced with `onStartAttempt` that fires before the run function on every attempt:
8+
9+
```ts
10+
export const taskWithOnStartAttempt = task({
11+
id: "task-with-on-start-attempt",
12+
onStartAttempt: async ({ payload, ctx }) => {
13+
//...
14+
},
15+
run: async (payload: any, { ctx }) => {
16+
//...
17+
},
18+
});
19+
20+
// Default a global lifecycle hook using tasks
21+
tasks.onStartAttempt(({ ctx, payload, task }) => {
22+
console.log(
23+
`Run ${ctx.run.id} started on task ${task} attempt ${ctx.run.attempt.number}`,
24+
ctx.run
25+
);
26+
});
27+
```
28+
29+
If you want to execute code before just the first attempt, you can use the `onStartAttempt` function and check `ctx.run.attempt.number === 1`:
30+
31+
```ts /trigger/on-start-attempt.ts
32+
export const taskWithOnStartAttempt = task({
33+
id: "task-with-on-start-attempt",
34+
onStartAttempt: async ({ payload, ctx }) => {
35+
if (ctx.run.attempt.number === 1) {
36+
console.log("Run started on attempt 1", ctx.run);
37+
}
38+
},
39+
});
40+
```
41+

apps/webapp/app/components/runs/v3/RunIcon.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
9898
return <RunFunctionIcon className={cn(className, "text-text-dimmed")} />;
9999
case "task-hook-init":
100100
case "task-hook-onStart":
101+
case "task-hook-onStartAttempt":
101102
case "task-hook-onSuccess":
102103
case "task-hook-onWait":
103104
case "task-hook-onResume":

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,7 @@ const EnvironmentSchema = z
12361236
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
12371237
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
12381238
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
1239+
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
12391240
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
12401241
})
12411242
.and(GithubAppEnvSchema)

apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { authenticateApiRequest } from "~/services/apiAuth.server";
99
import { logger } from "~/services/logger.server";
1010
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
1111
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
12+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
1213

1314
const ParamsSchema = z.object({
1415
taskId: z.string(),
@@ -40,6 +41,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
4041
"trigger-version": triggerVersion,
4142
"x-trigger-span-parent-as-link": spanParentAsLink,
4243
"x-trigger-worker": isFromWorker,
44+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
4345
traceparent,
4446
tracestate,
4547
} = headers.data;
@@ -100,6 +102,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
100102
triggerVersion: triggerVersion ?? undefined,
101103
traceContext,
102104
spanParentAsLink: spanParentAsLink === 1,
105+
realtimeStreamsVersion: determineRealtimeStreamsVersion(realtimeStreamsVersion ?? undefined),
103106
});
104107

105108
if (!result) {

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { prisma } from "~/db.server";
1111
import { env } from "~/env.server";
1212
import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server";
1313
import { logger } from "~/services/logger.server";
14+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
1415
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1516
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
1617
import {
@@ -126,7 +127,9 @@ const { action, loader } = createActionApiRoute(
126127
traceContext,
127128
spanParentAsLink: spanParentAsLink === 1,
128129
oneTimeUseToken,
129-
realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
130+
realtimeStreamsVersion: determineRealtimeStreamsVersion(
131+
realtimeStreamsVersion ?? undefined
132+
),
130133
},
131134
engineVersion ?? undefined
132135
);

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
} from "~/v3/services/batchTriggerV3.server";
1717
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1818
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
19+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
1920

2021
const { action, loader } = createActionApiRoute(
2122
{
@@ -69,6 +70,7 @@ const { action, loader } = createActionApiRoute(
6970
"x-trigger-client": triggerClient,
7071
"x-trigger-engine-version": engineVersion,
7172
"batch-processing-strategy": batchProcessingStrategy,
73+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
7274
traceparent,
7375
tracestate,
7476
} = headers;
@@ -107,6 +109,9 @@ const { action, loader } = createActionApiRoute(
107109
traceContext,
108110
spanParentAsLink: spanParentAsLink === 1,
109111
oneTimeUseToken,
112+
realtimeStreamsVersion: determineRealtimeStreamsVersion(
113+
realtimeStreamsVersion ?? undefined
114+
),
110115
});
111116

112117
const $responseHeaders = await responseHeaders(

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { ServiceValidationError } from "~/v3/services/baseService.server";
1818
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
1919
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
2020
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
21+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
2122

2223
const { action, loader } = createActionApiRoute(
2324
{
@@ -59,6 +60,7 @@ const { action, loader } = createActionApiRoute(
5960
"x-trigger-engine-version": engineVersion,
6061
"batch-processing-strategy": batchProcessingStrategy,
6162
"x-trigger-request-idempotency-key": requestIdempotencyKey,
63+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
6264
traceparent,
6365
tracestate,
6466
} = headers;
@@ -119,6 +121,9 @@ const { action, loader } = createActionApiRoute(
119121
traceContext,
120122
spanParentAsLink: spanParentAsLink === 1,
121123
oneTimeUseToken,
124+
realtimeStreamsVersion: determineRealtimeStreamsVersion(
125+
realtimeStreamsVersion ?? undefined
126+
),
122127
});
123128

124129
const $responseHeaders = await responseHeaders(

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,79 @@
1+
import { type ActionFunctionArgs } from "@remix-run/server-runtime";
12
import { z } from "zod";
23
import { $replica } from "~/db.server";
34
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
45
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
6+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
57

68
const ParamsSchema = z.object({
79
runId: z.string(),
810
streamId: z.string(),
911
});
1012

13+
// Plain action for backwards compatibility with older clients that don't send auth headers
14+
export async function action({ request, params }: ActionFunctionArgs) {
15+
const parsedParams = ParamsSchema.safeParse(params);
16+
17+
if (!parsedParams.success) {
18+
return new Response("Invalid parameters", { status: 400 });
19+
}
20+
21+
const { runId, streamId } = parsedParams.data;
22+
23+
// Look up the run without environment scoping for backwards compatibility
24+
const run = await $replica.taskRun.findFirst({
25+
where: {
26+
friendlyId: runId,
27+
},
28+
select: {
29+
id: true,
30+
friendlyId: true,
31+
runtimeEnvironment: {
32+
include: {
33+
project: true,
34+
organization: true,
35+
orgMember: true,
36+
},
37+
},
38+
},
39+
});
40+
41+
if (!run) {
42+
return new Response("Run not found", { status: 404 });
43+
}
44+
45+
// Extract client ID from header, default to "default" if not provided
46+
const clientId = request.headers.get("X-Client-Id") || "default";
47+
const streamVersion = request.headers.get("X-Stream-Version") || "v1";
48+
49+
if (!request.body) {
50+
return new Response("No body provided", { status: 400 });
51+
}
52+
53+
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
54+
let resumeFromChunkNumber: number | undefined = undefined;
55+
if (resumeFromChunk) {
56+
const parsed = parseInt(resumeFromChunk, 10);
57+
if (isNaN(parsed) || parsed < 0) {
58+
return new Response(`Invalid X-Resume-From-Chunk header value: ${resumeFromChunk}`, {
59+
status: 400,
60+
});
61+
}
62+
resumeFromChunkNumber = parsed;
63+
}
64+
65+
// The runtimeEnvironment from the run is already in the correct shape for AuthenticatedEnvironment
66+
const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion);
67+
68+
return realtimeStream.ingestData(
69+
request.body,
70+
run.friendlyId,
71+
streamId,
72+
clientId,
73+
resumeFromChunkNumber
74+
);
75+
}
76+
1177
export const loader = createLoaderApiRoute(
1278
{
1379
params: ParamsSchema,

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export type BatchTriggerTaskServiceOptions = {
4747
traceContext?: Record<string, string | undefined | Record<string, string | undefined>>;
4848
spanParentAsLink?: boolean;
4949
oneTimeUseToken?: string;
50+
realtimeStreamsVersion?: "v1" | "v2";
5051
};
5152

5253
/**
@@ -708,6 +709,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
708709
batchIndex: currentIndex,
709710
skipChecks: true, // Skip entitlement and queue checks since we already validated at batch/chunk level
710711
planType, // Pass planType from batch-level entitlement check
712+
realtimeStreamsVersion: options?.realtimeStreamsVersion,
711713
},
712714
"V2"
713715
);

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,22 @@ export function getRealtimeStreamInstance(
6060
}
6161
}
6262

63+
export function determineRealtimeStreamsVersion(streamVersion?: string): "v1" | "v2" {
64+
if (!streamVersion) {
65+
return env.REALTIME_STREAMS_DEFAULT_VERSION;
66+
}
67+
68+
if (
69+
streamVersion === "v2" &&
70+
env.REALTIME_STREAMS_S2_BASIN &&
71+
env.REALTIME_STREAMS_S2_ACCESS_TOKEN
72+
) {
73+
return "v2";
74+
}
75+
76+
return "v1";
77+
}
78+
6379
const s2RealtimeStreamsCache = singleton(
6480
"s2RealtimeStreamsCache",
6581
initializeS2RealtimeStreamsCache

0 commit comments

Comments
 (0)