Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,79 @@
import { type ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";

const ParamsSchema = z.object({
runId: z.string(),
streamId: z.string(),
});

// Plain action for backwards compatibility with older clients that don't send auth headers
export async function action({ request, params }: ActionFunctionArgs) {
const parsedParams = ParamsSchema.safeParse(params);

if (!parsedParams.success) {
return new Response("Invalid parameters", { status: 400 });
}

const { runId, streamId } = parsedParams.data;

// Look up the run without environment scoping for backwards compatibility
const run = await $replica.taskRun.findFirst({
where: {
friendlyId: runId,
},
select: {
id: true,
friendlyId: true,
runtimeEnvironment: {
include: {
project: true,
organization: true,
orgMember: true,
},
},
},
});

if (!run) {
return new Response("Run not found", { status: 404 });
}

// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
const streamVersion = request.headers.get("X-Stream-Version") || "v1";

if (!request.body) {
return new Response("No body provided", { status: 400 });
}

const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
let resumeFromChunkNumber: number | undefined = undefined;
if (resumeFromChunk) {
const parsed = parseInt(resumeFromChunk, 10);
if (isNaN(parsed) || parsed < 0) {
return new Response(`Invalid X-Resume-From-Chunk header value: ${resumeFromChunk}`, {
status: 400,
});
}
resumeFromChunkNumber = parsed;
}

// The runtimeEnvironment from the run is already in the correct shape for AuthenticatedEnvironment
const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion);

return realtimeStream.ingestData(
request.body,
run.friendlyId,
streamId,
clientId,
resumeFromChunkNumber
);
}

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
Expand Down