Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,7 @@ const EnvironmentSchema = z
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
})
.and(GithubAppEnvSchema)
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";

const ParamsSchema = z.object({
taskId: z.string(),
Expand Down Expand Up @@ -40,6 +41,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
"x-trigger-worker": isFromWorker,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
traceparent,
tracestate,
} = headers.data;
Expand Down Expand Up @@ -100,6 +102,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
realtimeStreamsVersion: determineRealtimeStreamsVersion(realtimeStreamsVersion ?? undefined),
});

if (!result) {
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import {
Expand Down Expand Up @@ -126,7 +127,9 @@ const { action, loader } = createActionApiRoute(
traceContext,
spanParentAsLink: spanParentAsLink === 1,
oneTimeUseToken,
realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
},
engineVersion ?? undefined
);
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";

const { action, loader } = createActionApiRoute(
{
Expand Down Expand Up @@ -69,6 +70,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-client": triggerClient,
"x-trigger-engine-version": engineVersion,
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -107,6 +109,9 @@ const { action, loader } = createActionApiRoute(
traceContext,
spanParentAsLink: spanParentAsLink === 1,
oneTimeUseToken,
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
});

const $responseHeaders = await responseHeaders(
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/routes/api.v2.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { ServiceValidationError } from "~/v3/services/baseService.server";
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";

const { action, loader } = createActionApiRoute(
{
Expand Down Expand Up @@ -59,6 +60,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -119,6 +121,9 @@ const { action, loader } = createActionApiRoute(
traceContext,
spanParentAsLink: spanParentAsLink === 1,
oneTimeUseToken,
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
});

const $responseHeaders = await responseHeaders(
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export type BatchTriggerTaskServiceOptions = {
traceContext?: Record<string, string | undefined | Record<string, string | undefined>>;
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
};

/**
Expand Down Expand Up @@ -708,6 +709,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
batchIndex: currentIndex,
skipChecks: true, // Skip entitlement and queue checks since we already validated at batch/chunk level
planType, // Pass planType from batch-level entitlement check
realtimeStreamsVersion: options?.realtimeStreamsVersion,
},
"V2"
);
Expand Down
16 changes: 16 additions & 0 deletions apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ export function getRealtimeStreamInstance(
}
}

export function determineRealtimeStreamsVersion(streamVersion?: string): "v1" | "v2" {
if (!streamVersion) {
return env.REALTIME_STREAMS_DEFAULT_VERSION;
}

if (
streamVersion === "v2" &&
env.REALTIME_STREAMS_S2_BASIN &&
env.REALTIME_STREAMS_S2_ACCESS_TOKEN
) {
return "v2";
}

return "v1";
}

const s2RealtimeStreamsCache = singleton(
"s2RealtimeStreamsCache",
initializeS2RealtimeStreamsCache
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export type BatchTriggerTaskServiceOptions = {
traceContext?: Record<string, string | undefined>;
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
};

type RunItemData = {
Expand Down Expand Up @@ -851,6 +852,7 @@ export class BatchTriggerV3Service extends BaseService {
batchId: batch.friendlyId,
skipChecks: true,
runFriendlyId: task.runId,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
}
);

Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/replayTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { BaseService } from "./baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
import { type RunOptionsData } from "../testTask";
import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";

type OverrideOptions = {
environmentId?: string;
Expand Down Expand Up @@ -118,7 +119,9 @@ export class ReplayTaskRunService extends BaseService {
traceContext: {
traceparent: `00-${existingTaskRun.traceId}-${existingTaskRun.spanId}-01`,
},
realtimeStreamsVersion: existingTaskRun.realtimeStreamsVersion,
realtimeStreamsVersion: determineRealtimeStreamsVersion(
existingTaskRun.realtimeStreamsVersion
),
}
);

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export type TriggerTaskServiceOptions = {
overrideCreatedAt?: Date;
replayedFromTaskRunFriendlyId?: string;
planType?: string;
realtimeStreamsVersion?: string;
realtimeStreamsVersion?: "v1" | "v2";
};

export class OutOfEntitlementError extends Error {
Expand Down
14 changes: 9 additions & 5 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1213,13 +1213,17 @@ export class ApiClient {

headers[API_VERSION_HEADER_NAME] = API_VERSION;

const streamFlag = this.futureFlags.v2RealtimeStreams ?? true;

if (
this.futureFlags.v2RealtimeStreams ||
getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "1" ||
getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "true" ||
getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "1" ||
getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "true"
streamFlag === false ||
getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "0" ||
getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "false" ||
getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "0" ||
getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "false"
) {
headers["x-trigger-realtime-streams-version"] = "v1";
} else {
headers["x-trigger-realtime-streams-version"] = "v2";
}

Expand Down
21 changes: 5 additions & 16 deletions references/realtime-streams/src/app/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ export async function triggerStreamTask(
redirectPath?: string,
useDurableStreams?: boolean
) {
const config = useDurableStreams
const config = !useDurableStreams
? {
future: {
v2RealtimeStreams: true,
v2RealtimeStreams: false,
},
}
: undefined;
Expand Down Expand Up @@ -45,20 +45,9 @@ export async function triggerStreamTask(

export async function triggerAIChatTask(messages: UIMessage[]) {
// Trigger the AI chat task
const handle = await tasks.trigger<typeof aiChatTask>(
"ai-chat",
{
messages,
},
{},
{
clientConfig: {
future: {
v2RealtimeStreams: true,
},
},
}
);
const handle = await tasks.trigger<typeof aiChatTask>("ai-chat", {
messages,
});

console.log("Triggered AI chat run:", handle.id);

Expand Down