From 1289a3150d3791a3527aeb6b7823d8ea415177e3 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Tue, 11 Nov 2025 18:50:25 +0000
Subject: [PATCH] fix(streams): buffer v1 streams on read to prevent split chunks

---
 .../realtime/redisRealtimeStreams.server.ts   |  58 +++++++---
 apps/webapp/test/redisRealtimeStreams.test.ts | 104 ++++++++++++++++++
 .../realtime-streams/src/trigger/streams.ts   |   1 -
 3 files changed, 149 insertions(+), 14 deletions(-)

diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
index 9db3809b52..b5c8c57322 100644
--- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
+++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
@@ -204,20 +204,52 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
         },
       })
       .pipeThrough(
-        // Transform 1: Split data content by newlines, preserving metadata
-        new TransformStream({
-          transform(chunk, controller) {
-            if (chunk.type === "ping") {
-              controller.enqueue(chunk);
-            } else if (chunk.type === "data" || chunk.type === "legacy-data") {
-              // Split data by newlines, emit separate chunks with same metadata
-              const lines = chunk.data.split("\n").filter((line) => line.trim().length > 0);
-              for (const line of lines) {
-                controller.enqueue({ ...chunk, line });
+        // Transform 1: Buffer partial lines across Redis entries
+        (() => {
+          let buffer = "";
+          let lastRedisId = "0";
+
+          return new TransformStream({
+            transform(chunk, controller) {
+              if (chunk.type === "ping") {
+                controller.enqueue(chunk as any);
+              } else if (chunk.type === "data" || chunk.type === "legacy-data") {
+                // Buffer partial lines: accumulate until we see newlines
+                buffer += chunk.data;
+
+                // Split on newlines
+                const lines = buffer.split("\n");
+
+                // The last element might be incomplete, hold it back in buffer
+                buffer = lines.pop() || "";
+
+                // Emit complete lines with the Redis ID of the chunk that completed them
+                for (const line of lines) {
+                  if (line.trim().length > 0) {
+                    controller.enqueue({
+                      ...chunk,
+                      line,
+                    });
+                  }
+                }
+
+                // Update last Redis ID for next iteration
+                lastRedisId = chunk.redisId;
               }
-            }
-          },
-        })
+            },
+            flush(controller) {
+              // On stream end, emit any leftover buffered text
+              if (buffer.trim().length > 0) {
+                controller.enqueue({
+                  type: "data",
+                  redisId: lastRedisId,
+                  data: "",
+                  line: buffer.trim(),
+                });
+              }
+            },
+          });
+        })()
       )
       .pipeThrough(
         // Transform 2: Format as SSE
diff --git a/apps/webapp/test/redisRealtimeStreams.test.ts b/apps/webapp/test/redisRealtimeStreams.test.ts
index e441e4ace6..0511754393 100644
--- a/apps/webapp/test/redisRealtimeStreams.test.ts
+++ b/apps/webapp/test/redisRealtimeStreams.test.ts
@@ -1417,4 +1417,108 @@ describe("RedisRealtimeStreams", () => {
       await redis.quit();
     }
   );
+
+  redisTest(
+    "Should handle chunks split mid-line (regression test)",
+    { timeout: 30_000 },
+    async ({ redisOptions }) => {
+      const redis = new Redis(redisOptions);
+      const redisRealtimeStreams = new RedisRealtimeStreams({
+        redis: redisOptions,
+      });
+
+      const runId = "run_split_test";
+      const streamId = "test-split-stream";
+
+      // Simulate what happens in production: a JSON line split across multiple network chunks
+      // This reproduces the issue where we see partial chunks like:
+      // - "{\"timestamp\":"
+      // - "1762880245493,\"chunkIndex\":780,\"data\":\"Chunk 781/1000\"}"
+      const fullLine = JSON.stringify({
+        timestamp: 1762880245493,
+        chunkIndex: 780,
+        data: "Chunk 781/1000",
+      });
+
+      // Split the line at an arbitrary position (in the middle of the JSON)
+      const splitPoint = 16; // Splits a few characters after '{"timestamp":'
+      const chunk1 = fullLine.substring(0, splitPoint);
+      const chunk2 = fullLine.substring(splitPoint);
+
+      // Create a ReadableStream that sends split chunks
+      const encoder = new TextEncoder();
+      const stream = new ReadableStream({
+        start(controller) {
+          controller.enqueue(encoder.encode(chunk1));
+          controller.enqueue(encoder.encode(chunk2 + "\n")); // Add newline at end
+          controller.close();
+        },
+      });
+
+      // Ingest the split data
+      await redisRealtimeStreams.ingestData(stream, runId, streamId, "client1");
+
+      // Now consume the stream and verify we get the complete line, not split chunks
+      const abortController = new AbortController();
+      const response = await redisRealtimeStreams.streamResponse(
+        new Request("http://localhost/test"),
+        runId,
+        streamId,
+        abortController.signal
+      );
+
+      const reader = response.body!.getReader();
+      const decoder = new TextDecoder();
+      let receivedData = "";
+
+      // Read all chunks from the response
+      const readTimeout = setTimeout(() => {
+        abortController.abort();
+      }, 5000);
+
+      try {
+        while (true) {
+          const { done, value } = await reader.read();
+          if (done) break;
+
+          receivedData += decoder.decode(value, { stream: true });
+
+          // Once we have data, we can stop
+          if (receivedData.includes("data: ")) {
+            break;
+          }
+        }
+      } finally {
+        clearTimeout(readTimeout);
+        abortController.abort();
+        reader.releaseLock();
+      }
+
+      // Parse the SSE data
+      const lines = receivedData.split("\n").filter((line) => line.startsWith("data: "));
+
+      // We should receive exactly ONE complete line, not two partial lines
+      expect(lines.length).toBe(1);
+
+      // Extract the data (remove "data: " prefix)
+      const dataLine = lines[0].substring(6);
+
+      // Verify it's the complete, valid JSON
+      expect(dataLine).toBe(fullLine);
+
+      // Verify it parses correctly as JSON
+      const parsed = JSON.parse(dataLine) as {
+        timestamp: number;
+        chunkIndex: number;
+        data: string;
+      };
+      expect(parsed.timestamp).toBe(1762880245493);
+      expect(parsed.chunkIndex).toBe(780);
+      expect(parsed.data).toBe("Chunk 781/1000");
+
+      // Cleanup
+      await redis.del(`stream:${runId}:${streamId}`);
+      await redis.quit();
+    }
+  );
 });
diff --git a/references/realtime-streams/src/trigger/streams.ts b/references/realtime-streams/src/trigger/streams.ts
index cfa64618ce..388ff960aa 100644
--- a/references/realtime-streams/src/trigger/streams.ts
+++ b/references/realtime-streams/src/trigger/streams.ts
@@ -822,7 +822,6 @@ const streamsStressTesterTask = task({
 
   switch (payload.streamsVersion) {
     case "v1": {
-      assert.ok(chunks.length < 2000, "Expected less than 2000 chunks");
       break;
     }
     case "v2": {
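
Reviewer note (not part of the patch): the fix applies the standard line-buffering
pattern for chunked text streams: only the segment after the last newline is held
back, and flush() drains it at end-of-stream. Below is a minimal, self-contained
sketch of the same technique, assuming Node 18+ Web Streams; the lineSplitter name
and the sample input are illustrative only and not part of this codebase.

    // Buffer partial lines across chunks; emit only complete lines.
    function lineSplitter(): TransformStream<string, string> {
      let buffer = "";
      return new TransformStream<string, string>({
        transform(chunk, controller) {
          buffer += chunk;
          const lines = buffer.split("\n");
          // The last segment may be a partial line: hold it back.
          buffer = lines.pop() ?? "";
          for (const line of lines) {
            if (line.trim().length > 0) controller.enqueue(line);
          }
        },
        flush(controller) {
          // Without this, a final line lacking a trailing newline would be dropped.
          if (buffer.trim().length > 0) controller.enqueue(buffer.trim());
        },
      });
    }

    // Usage: a JSON line split mid-value is reassembled into one line.
    const input = new ReadableStream<string>({
      start(c) {
        c.enqueue('{"timestamp":176'); // partial line...
        c.enqueue('2880245493}\n');    // ...completed by the next chunk
        c.close();
      },
    });
    // The first read() yields the single complete line: {"timestamp":1762880245493}
    const reader = input.pipeThrough(lineSplitter()).getReader();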