From 8fdba723ef55995312d5baa3955dbf48ceee070d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 13 Nov 2025 13:38:30 +0000 Subject: [PATCH] feat(streams): make v2 streams the default when using 4.1.0+ if they are supported --- apps/webapp/app/env.server.ts | 1 + .../app/routes/api.v1.tasks.$taskId.batch.ts | 3 +++ .../routes/api.v1.tasks.$taskId.trigger.ts | 5 ++++- apps/webapp/app/routes/api.v1.tasks.batch.ts | 5 +++++ apps/webapp/app/routes/api.v2.tasks.batch.ts | 5 +++++ .../runEngine/services/batchTrigger.server.ts | 2 ++ .../realtime/v1StreamsGlobal.server.ts | 16 ++++++++++++++ .../app/v3/services/batchTriggerV3.server.ts | 2 ++ .../app/v3/services/replayTaskRun.server.ts | 5 ++++- .../app/v3/services/triggerTask.server.ts | 2 +- packages/core/src/v3/apiClient/index.ts | 14 ++++++++----- .../realtime-streams/src/app/actions.ts | 21 +++++-------------- 12 files changed, 57 insertions(+), 24 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 89b93f1012..207a2bcfab 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1236,6 +1236,7 @@ const EnvironmentSchema = z REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100), REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10), REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60), + REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"), WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000), }) .and(GithubAppEnvSchema) diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts index ec1968705f..2e8c5e9749 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts @@ -9,6 +9,7 @@ import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; +import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; const ParamsSchema = z.object({ taskId: z.string(), @@ -40,6 +41,7 @@ export async function action({ request, params }: ActionFunctionArgs) { "trigger-version": triggerVersion, "x-trigger-span-parent-as-link": spanParentAsLink, "x-trigger-worker": isFromWorker, + "x-trigger-realtime-streams-version": realtimeStreamsVersion, traceparent, tracestate, } = headers.data; @@ -100,6 +102,7 @@ export async function action({ request, params }: ActionFunctionArgs) { triggerVersion: triggerVersion ?? undefined, traceContext, spanParentAsLink: spanParentAsLink === 1, + realtimeStreamsVersion: determineRealtimeStreamsVersion(realtimeStreamsVersion ?? undefined), }); if (!result) { diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 4037daf693..2582e9df16 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -11,6 +11,7 @@ import { prisma } from "~/db.server"; import { env } from "~/env.server"; import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { @@ -126,7 +127,9 @@ const { action, loader } = createActionApiRoute( traceContext, spanParentAsLink: spanParentAsLink === 1, oneTimeUseToken, - realtimeStreamsVersion: realtimeStreamsVersion ?? undefined, + realtimeStreamsVersion: determineRealtimeStreamsVersion( + realtimeStreamsVersion ?? undefined + ), }, engineVersion ?? undefined ); diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index d26c55f1cb..d21d277c47 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -16,6 +16,7 @@ import { } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; +import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; const { action, loader } = createActionApiRoute( { @@ -69,6 +70,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-client": triggerClient, "x-trigger-engine-version": engineVersion, "batch-processing-strategy": batchProcessingStrategy, + "x-trigger-realtime-streams-version": realtimeStreamsVersion, traceparent, tracestate, } = headers; @@ -107,6 +109,9 @@ const { action, loader } = createActionApiRoute( traceContext, spanParentAsLink: spanParentAsLink === 1, oneTimeUseToken, + realtimeStreamsVersion: determineRealtimeStreamsVersion( + realtimeStreamsVersion ?? undefined + ), }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index 93715bbdd5..252439b7bf 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -18,6 +18,7 @@ import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; +import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; const { action, loader } = createActionApiRoute( { @@ -59,6 +60,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-engine-version": engineVersion, "batch-processing-strategy": batchProcessingStrategy, "x-trigger-request-idempotency-key": requestIdempotencyKey, + "x-trigger-realtime-streams-version": realtimeStreamsVersion, traceparent, tracestate, } = headers; @@ -119,6 +121,9 @@ const { action, loader } = createActionApiRoute( traceContext, spanParentAsLink: spanParentAsLink === 1, oneTimeUseToken, + realtimeStreamsVersion: determineRealtimeStreamsVersion( + realtimeStreamsVersion ?? undefined + ), }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 21893948d4..bd796f3062 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -47,6 +47,7 @@ export type BatchTriggerTaskServiceOptions = { traceContext?: Record>; spanParentAsLink?: boolean; oneTimeUseToken?: string; + realtimeStreamsVersion?: "v1" | "v2"; }; /** @@ -708,6 +709,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { batchIndex: currentIndex, skipChecks: true, // Skip entitlement and queue checks since we already validated at batch/chunk level planType, // Pass planType from batch-level entitlement check + realtimeStreamsVersion: options?.realtimeStreamsVersion, }, "V2" ); diff --git a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts index 34792cba03..6c008b107e 100644 --- a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts +++ b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts @@ -60,6 +60,22 @@ export function getRealtimeStreamInstance( } } +export function determineRealtimeStreamsVersion(streamVersion?: string): "v1" | "v2" { + if (!streamVersion) { + return env.REALTIME_STREAMS_DEFAULT_VERSION; + } + + if ( + streamVersion === "v2" && + env.REALTIME_STREAMS_S2_BASIN && + env.REALTIME_STREAMS_S2_ACCESS_TOKEN + ) { + return "v2"; + } + + return "v1"; +} + const s2RealtimeStreamsCache = singleton( "s2RealtimeStreamsCache", initializeS2RealtimeStreamsCache diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 150c2a6f00..eb0850484f 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -56,6 +56,7 @@ export type BatchTriggerTaskServiceOptions = { traceContext?: Record; spanParentAsLink?: boolean; oneTimeUseToken?: string; + realtimeStreamsVersion?: "v1" | "v2"; }; type RunItemData = { @@ -851,6 +852,7 @@ export class BatchTriggerV3Service extends BaseService { batchId: batch.friendlyId, skipChecks: true, runFriendlyId: task.runId, + realtimeStreamsVersion: options?.realtimeStreamsVersion, } ); diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 17a2f3721a..2345d209dd 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -11,6 +11,7 @@ import { BaseService } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; import { type RunOptionsData } from "../testTask"; import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization"; +import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server"; type OverrideOptions = { environmentId?: string; @@ -118,7 +119,9 @@ export class ReplayTaskRunService extends BaseService { traceContext: { traceparent: `00-${existingTaskRun.traceId}-${existingTaskRun.spanId}-01`, }, - realtimeStreamsVersion: existingTaskRun.realtimeStreamsVersion, + realtimeStreamsVersion: determineRealtimeStreamsVersion( + existingTaskRun.realtimeStreamsVersion + ), } ); diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 36dc721d23..f68b23832b 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -33,7 +33,7 @@ export type TriggerTaskServiceOptions = { overrideCreatedAt?: Date; replayedFromTaskRunFriendlyId?: string; planType?: string; - realtimeStreamsVersion?: string; + realtimeStreamsVersion?: "v1" | "v2"; }; export class OutOfEntitlementError extends Error { diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index fe3f513d98..b88de7680f 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -1213,13 +1213,17 @@ export class ApiClient { headers[API_VERSION_HEADER_NAME] = API_VERSION; + const streamFlag = this.futureFlags.v2RealtimeStreams ?? true; + if ( - this.futureFlags.v2RealtimeStreams || - getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "1" || - getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "true" || - getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "1" || - getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "true" + streamFlag === false || + getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "0" || + getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "false" || + getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "0" || + getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "false" ) { + headers["x-trigger-realtime-streams-version"] = "v1"; + } else { headers["x-trigger-realtime-streams-version"] = "v2"; } diff --git a/references/realtime-streams/src/app/actions.ts b/references/realtime-streams/src/app/actions.ts index ae2794abb5..2c18d11e6c 100644 --- a/references/realtime-streams/src/app/actions.ts +++ b/references/realtime-streams/src/app/actions.ts @@ -11,10 +11,10 @@ export async function triggerStreamTask( redirectPath?: string, useDurableStreams?: boolean ) { - const config = useDurableStreams + const config = !useDurableStreams ? { future: { - v2RealtimeStreams: true, + v2RealtimeStreams: false, }, } : undefined; @@ -45,20 +45,9 @@ export async function triggerStreamTask( export async function triggerAIChatTask(messages: UIMessage[]) { // Trigger the AI chat task - const handle = await tasks.trigger( - "ai-chat", - { - messages, - }, - {}, - { - clientConfig: { - future: { - v2RealtimeStreams: true, - }, - }, - } - ); + const handle = await tasks.trigger("ai-chat", { + messages, + }); console.log("Triggered AI chat run:", handle.id);