diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts index 44d7858596..822c10a810 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts @@ -1,13 +1,79 @@ +import { type ActionFunctionArgs } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; const ParamsSchema = z.object({ runId: z.string(), streamId: z.string(), }); +// Plain action for backwards compatibility with older clients that don't send auth headers +export async function action({ request, params }: ActionFunctionArgs) { + const parsedParams = ParamsSchema.safeParse(params); + + if (!parsedParams.success) { + return new Response("Invalid parameters", { status: 400 }); + } + + const { runId, streamId } = parsedParams.data; + + // Look up the run without environment scoping for backwards compatibility + const run = await $replica.taskRun.findFirst({ + where: { + friendlyId: runId, + }, + select: { + id: true, + friendlyId: true, + runtimeEnvironment: { + include: { + project: true, + organization: true, + orgMember: true, + }, + }, + }, + }); + + if (!run) { + return new Response("Run not found", { status: 404 }); + } + + // Extract client ID from header, default to "default" if not provided + const clientId = request.headers.get("X-Client-Id") || "default"; + const streamVersion = request.headers.get("X-Stream-Version") || "v1"; + + if (!request.body) { + return new Response("No body provided", { status: 400 }); + } + + const resumeFromChunk = request.headers.get("X-Resume-From-Chunk"); + let resumeFromChunkNumber: number | undefined = undefined; + if (resumeFromChunk) { + const parsed = parseInt(resumeFromChunk, 10); + if (isNaN(parsed) || parsed < 0) { + return new Response(`Invalid X-Resume-From-Chunk header value: ${resumeFromChunk}`, { + status: 400, + }); + } + resumeFromChunkNumber = parsed; + } + + // The runtimeEnvironment from the run is already in the correct shape for AuthenticatedEnvironment + const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion); + + return realtimeStream.ingestData( + request.body, + run.friendlyId, + streamId, + clientId, + resumeFromChunkNumber + ); +} + export const loader = createLoaderApiRoute( { params: ParamsSchema,