Merged
58 changes: 45 additions & 13 deletions apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
@@ -204,20 +204,52 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
         },
       })
       .pipeThrough(
-        // Transform 1: Split data content by newlines, preserving metadata
-        new TransformStream<StreamChunk, StreamChunk & { line?: string }>({
-          transform(chunk, controller) {
-            if (chunk.type === "ping") {
-              controller.enqueue(chunk);
-            } else if (chunk.type === "data" || chunk.type === "legacy-data") {
-              // Split data by newlines, emit separate chunks with same metadata
-              const lines = chunk.data.split("\n").filter((line) => line.trim().length > 0);
-              for (const line of lines) {
-                controller.enqueue({ ...chunk, line });
-              }
-            }
-          },
-        })
+        // Transform 1: Buffer partial lines across Redis entries
+        (() => {
+          let buffer = "";
+          let lastRedisId = "0";
+
+          return new TransformStream<StreamChunk, StreamChunk & { line: string }>({
+            transform(chunk, controller) {
+              if (chunk.type === "ping") {
+                controller.enqueue(chunk as any);
+              } else if (chunk.type === "data" || chunk.type === "legacy-data") {
+                // Buffer partial lines: accumulate until we see newlines
+                buffer += chunk.data;
+
+                // Split on newlines
+                const lines = buffer.split("\n");
+
+                // The last element might be incomplete, hold it back in buffer
+                buffer = lines.pop() || "";
+
+                // Emit complete lines with the Redis ID of the chunk that completed them
+                for (const line of lines) {
+                  if (line.trim().length > 0) {
+                    controller.enqueue({
+                      ...chunk,
+                      line,
+                    });
+                  }
+                }
+
+                // Update last Redis ID for next iteration
+                lastRedisId = chunk.redisId;
+              }
+            },
+            flush(controller) {
+              // On stream end, emit any leftover buffered text
+              if (buffer.trim().length > 0) {
+                controller.enqueue({
+                  type: "data",
+                  redisId: lastRedisId,
+                  data: "",
+                  line: buffer.trim(),
+                });
Contributor comment on lines +241 to +248:
⚠️ Potential issue | 🟠 Major

Don't trim buffered payloads on flush.

Calling buffer.trim() here rewrites the payload we send to clients; any legitimate leading/trailing whitespace (indentation, formatting, plain text logs) on a last line with no newline now gets dropped, which is a regression from the previous behavior where we forwarded the bytes exactly as Redis stored them. Please preserve the original string and only trim for the emptiness check.

-              if (buffer.trim().length > 0) {
-                controller.enqueue({
-                  type: "data",
-                  redisId: lastRedisId,
-                  data: "",
-                  line: buffer.trim(),
-                });
-              }
+              const pendingLine = buffer;
+              if (pendingLine.trim().length > 0) {
+                controller.enqueue({
+                  type: "data",
+                  redisId: lastRedisId,
+                  data: "",
+                  line: pendingLine,
+                });
+              }
+              buffer = "";
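
To make the regression concrete, here is a tiny standalone illustration in TypeScript; the values are hypothetical, not from the PR:

// A streamed last line with meaningful indentation and no trailing newline.
const pending = "    return value;";

// Trimming only for the emptiness check keeps the payload intact:
console.log(pending.trim().length > 0); // true
console.log(JSON.stringify(pending));   // "    return value;"

// Emitting the trimmed copy instead would drop the indentation:
console.log(JSON.stringify(pending.trim())); // "return value;"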

+              }
+            },
+          });
+        })()
       )
       .pipeThrough(
         // Transform 2: Format as SSE
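For reference, here is the buffering pattern from Transform 1 in isolation — a minimal sketch that assumes plain string chunks rather than the repo's StreamChunk type (the createLineSplitter name is hypothetical), and that emits the untrimmed remainder on flush, per the review note above:

// Minimal line-splitting TransformStream: buffers partial lines across chunks.
function createLineSplitter(): TransformStream<string, string> {
  let buffer = "";
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      buffer += chunk;
      const lines = buffer.split("\n");
      // The last element may be a partial line; hold it back for the next chunk.
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        if (line.trim().length > 0) controller.enqueue(line);
      }
    },
    flush(controller) {
      // Emit any leftover text unmodified; trim only for the emptiness check.
      if (buffer.trim().length > 0) controller.enqueue(buffer);
    },
  });
}

Feeding this the chunks '{"a":1' and '}\n' would yield a single complete line '{"a":1}', which is the behavior the test below verifies against the real implementation.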
104 changes: 104 additions & 0 deletions apps/webapp/test/redisRealtimeStreams.test.ts
@@ -1417,4 +1417,108 @@ describe("RedisRealtimeStreams", () => {
await redis.quit();
}
);

redisTest(
"Should handle chunks split mid-line (regression test)",
{ timeout: 30_000 },
async ({ redisOptions }) => {
const redis = new Redis(redisOptions);
const redisRealtimeStreams = new RedisRealtimeStreams({
redis: redisOptions,
});

const runId = "run_split_test";
const streamId = "test-split-stream";

// Simulate what happens in production: a JSON line split across multiple network chunks
// This reproduces the issue where we see partial chunks like:
// - "{\"timestamp\":"
// - "1762880245493,\"chunkIndex\":780,\"data\":\"Chunk 781/1000\"}"
const fullLine = JSON.stringify({
timestamp: 1762880245493,
chunkIndex: 780,
data: "Chunk 781/1000",
});

// Split the line at an arbitrary position (in the middle of the JSON)
const splitPoint = 16; // Splits mid-number: '{"timestamp":176' + '2880245493,...'
const chunk1 = fullLine.substring(0, splitPoint);
const chunk2 = fullLine.substring(splitPoint);

// Create a ReadableStream that sends split chunks
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode(chunk1));
controller.enqueue(encoder.encode(chunk2 + "\n")); // Add newline at end
controller.close();
},
});

// Ingest the split data
await redisRealtimeStreams.ingestData(stream, runId, streamId, "client1");

// Now consume the stream and verify we get the complete line, not split chunks
const abortController = new AbortController();
const response = await redisRealtimeStreams.streamResponse(
new Request("http://localhost/test"),
runId,
streamId,
abortController.signal
);

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let receivedData = "";

// Read all chunks from the response
const readTimeout = setTimeout(() => {
abortController.abort();
}, 5000);

try {
while (true) {
const { done, value } = await reader.read();
if (done) break;

receivedData += decoder.decode(value, { stream: true });

// Once we have data, we can stop
if (receivedData.includes("data: ")) {
break;
}
}
} finally {
clearTimeout(readTimeout);
abortController.abort();
reader.releaseLock();
}

// Parse the SSE data
const lines = receivedData.split("\n").filter((line) => line.startsWith("data: "));

// We should receive exactly ONE complete line, not two partial lines
expect(lines.length).toBe(1);

// Extract the data (remove "data: " prefix)
const dataLine = lines[0].substring(6);

// Verify it's the complete, valid JSON
expect(dataLine).toBe(fullLine);

// Verify it parses correctly as JSON
const parsed = JSON.parse(dataLine) as {
timestamp: number;
chunkIndex: number;
data: string;
};
expect(parsed.timestamp).toBe(1762880245493);
expect(parsed.chunkIndex).toBe(780);
expect(parsed.data).toBe("Chunk 781/1000");

// Cleanup
await redis.del(`stream:${runId}:${streamId}`);
await redis.quit();
}
);
});
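
As an aside, the test extracts payloads by filtering the raw SSE text for `data: ` lines; a minimal sketch of that parsing step under the same assumption of one complete line per frame (parseSseData is a hypothetical helper, not part of the PR):

// Extract payloads from raw SSE text, assuming one "data: " line per event.
function parseSseData(raw: string): string[] {
  return raw
    .split("\n")
    .filter((line) => line.startsWith("data: "))
    .map((line) => line.substring("data: ".length));
}

// parseSseData('data: {"a":1}\n\n') → ['{"a":1}']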
1 change: 0 additions & 1 deletion references/realtime-streams/src/trigger/streams.ts
@@ -822,7 +822,6 @@ const streamsStressTesterTask = task({

     switch (payload.streamsVersion) {
       case "v1": {
-        assert.ok(chunks.length < 2000, "Expected less than 2000 chunks");
         break;
       }
       case "v2": {