fix(streams): buffer v1 streams on read to prevent split chunks#2669
fix(streams): buffer v1 streams on read to prevent split chunks#2669
Conversation
|
WalkthroughThe changes modify how Redis realtime streams handle incoming data chunks by introducing a buffering transform that accumulates partial lines across chunk boundaries, emitting only complete lines when newline delimiters are encountered. A flush handler releases any remaining buffered content when the stream ends. The emitted data shape is updated to include line content alongside existing metadata, while empty lines are skipped and Redis IDs are tracked to mark line boundaries. Two new regression tests validate that JSON lines split across multiple chunks are correctly assembled before emission. Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
references/realtime-streams/src/trigger/streams.tsis excluded by!references/**
📒 Files selected for processing (2)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts(1 hunks)apps/webapp/test/redisRealtimeStreams.test.ts(1 hunks)
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/services/realtime/redisRealtimeStreams.server.tsapps/webapp/test/redisRealtimeStreams.test.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/services/realtime/redisRealtimeStreams.server.tsapps/webapp/test/redisRealtimeStreams.test.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
apps/webapp/app/services/realtime/redisRealtimeStreams.server.tsapps/webapp/test/redisRealtimeStreams.test.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
**/*.test.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Our tests are all vitest
Files:
apps/webapp/test/redisRealtimeStreams.test.ts
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}: Do not import app/env.server.ts into tests, either directly or indirectly
Tests should only import classes/functions from files under apps/webapp/app/**/*.ts
Files:
apps/webapp/test/redisRealtimeStreams.test.ts
**/*.{test,spec}.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.{test,spec}.{ts,tsx,js,jsx}: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks
Files:
apps/webapp/test/redisRealtimeStreams.test.ts
🧬 Code graph analysis (2)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
scripts/analyze_marqs.mjs (1)
lines(27-27)
apps/webapp/test/redisRealtimeStreams.test.ts (2)
internal-packages/testcontainers/src/index.ts (1)
redisTest(167-167)apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
RedisRealtimeStreams(23-469)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
apps/webapp/test/redisRealtimeStreams.test.ts (1)
1421-1522: Appreciate the focused regression coverage.This test perfectly reproduces the mid-line split we saw in production and exercises the full ingest/stream path, so it should keep us honest on future buffering changes.
| // On stream end, emit any leftover buffered text | ||
| if (buffer.trim().length > 0) { | ||
| controller.enqueue({ | ||
| type: "data", | ||
| redisId: lastRedisId, | ||
| data: "", | ||
| line: buffer.trim(), | ||
| }); |
There was a problem hiding this comment.
Don't trim buffered payloads on flush.
Calling buffer.trim() here rewrites the payload we send to clients; any legitimate leading/trailing whitespace (indentation, formatting, plain text logs) on a last line with no newline now gets dropped, which is a regression from the previous behavior where we forwarded the bytes exactly as Redis stored them. Please preserve the original string and only trim for the emptiness check.
- if (buffer.trim().length > 0) {
- controller.enqueue({
- type: "data",
- redisId: lastRedisId,
- data: "",
- line: buffer.trim(),
- });
- }
+ const pendingLine = buffer;
+ if (pendingLine.trim().length > 0) {
+ controller.enqueue({
+ type: "data",
+ redisId: lastRedisId,
+ data: "",
+ line: pendingLine,
+ });
+ }
+ buffer = "";📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // On stream end, emit any leftover buffered text | |
| if (buffer.trim().length > 0) { | |
| controller.enqueue({ | |
| type: "data", | |
| redisId: lastRedisId, | |
| data: "", | |
| line: buffer.trim(), | |
| }); | |
| // On stream end, emit any leftover buffered text | |
| const pendingLine = buffer; | |
| if (pendingLine.trim().length > 0) { | |
| controller.enqueue({ | |
| type: "data", | |
| redisId: lastRedisId, | |
| data: "", | |
| line: pendingLine, | |
| }); | |
| } | |
| buffer = ""; |
🤖 Prompt for AI Agents
In apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts around lines
241 to 248, the code trims the buffered payload before sending it which strips
legitimate leading/trailing whitespace; keep the existing emptiness check using
buffer.trim() but send the original buffer bytes to clients instead of
buffer.trim(). Concretely, leave the if (buffer.trim().length > 0) check as-is,
but set the emitted line data to buffer (unchanged) so we forward the exact
Redis-stored string.
No description provided.