Skip to content

Comments

fix(streams): buffer v1 streams on read to prevent split chunks#2669

Merged
ericallam merged 1 commit intomainfrom
ea-branch-96
Nov 11, 2025
Merged

fix(streams): buffer v1 streams on read to prevent split chunks#2669
ericallam merged 1 commit intomainfrom
ea-branch-96

Conversation

@ericallam
Copy link
Member

No description provided.

@changeset-bot
Copy link

changeset-bot bot commented Nov 11, 2025

⚠️ No Changeset found

Latest commit: 1289a31

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 11, 2025

Walkthrough

The changes modify how Redis realtime streams handle incoming data chunks by introducing a buffering transform that accumulates partial lines across chunk boundaries, emitting only complete lines when newline delimiters are encountered. A flush handler releases any remaining buffered content when the stream ends. The emitted data shape is updated to include line content alongside existing metadata, while empty lines are skipped and Redis IDs are tracked to mark line boundaries. Two new regression tests validate that JSON lines split across multiple chunks are correctly assembled before emission.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Duplicate test cases: The two identical regression tests in the test file appear to be duplicates—investigate whether this is intentional or an accidental duplication that should be consolidated.
  • Buffering state management: Review the buffer accumulation logic, lastRedisId tracking, and boundary conditions to ensure partial lines are correctly reassembled without data loss or corruption.
  • Flush behavior: Verify that the flush handler correctly emits final buffered content with the appropriate Redis ID when the stream terminates.
  • Empty line handling: Confirm that the skip-empty-lines logic doesn't inadvertently discard valid data or break downstream consumers expecting consistent output.
  • Ping chunk propagation: Ensure ping chunks bypass the buffering logic and flow through unchanged.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is entirely missing; no content was provided by the author to explain changes, testing, or changelog information. Add a complete PR description following the repository template, including issue reference, testing steps, changelog entry, and checklist items.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The PR title clearly describes the main change: buffering v1 streams to prevent split chunks, which aligns with the code modifications.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch ea-branch-96

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d0ad38d and 1289a31.

⛔ Files ignored due to path filters (1)
  • references/realtime-streams/src/trigger/streams.ts is excluded by !references/**
📒 Files selected for processing (2)
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1 hunks)
  • apps/webapp/test/redisRealtimeStreams.test.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json

Files:

  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly

Files:

  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
apps/webapp/app/**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead

Files:

  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
**/*.test.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Our tests are all vitest

Files:

  • apps/webapp/test/redisRealtimeStreams.test.ts
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}: Do not import app/env.server.ts into tests, either directly or indirectly
Tests should only import classes/functions from files under apps/webapp/app/**/*.ts

Files:

  • apps/webapp/test/redisRealtimeStreams.test.ts
**/*.{test,spec}.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.{test,spec}.{ts,tsx,js,jsx}: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks

Files:

  • apps/webapp/test/redisRealtimeStreams.test.ts
🧬 Code graph analysis (2)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
scripts/analyze_marqs.mjs (1)
  • lines (27-27)
apps/webapp/test/redisRealtimeStreams.test.ts (2)
internal-packages/testcontainers/src/index.ts (1)
  • redisTest (167-167)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
  • RedisRealtimeStreams (23-469)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
apps/webapp/test/redisRealtimeStreams.test.ts (1)

1421-1522: Appreciate the focused regression coverage.

This test perfectly reproduces the mid-line split we saw in production and exercises the full ingest/stream path, so it should keep us honest on future buffering changes.

Comment on lines +241 to +248
// On stream end, emit any leftover buffered text
if (buffer.trim().length > 0) {
controller.enqueue({
type: "data",
redisId: lastRedisId,
data: "",
line: buffer.trim(),
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't trim buffered payloads on flush.

Calling buffer.trim() here rewrites the payload we send to clients; any legitimate leading/trailing whitespace (indentation, formatting, plain text logs) on a last line with no newline now gets dropped, which is a regression from the previous behavior where we forwarded the bytes exactly as Redis stored them. Please preserve the original string and only trim for the emptiness check.

-              if (buffer.trim().length > 0) {
-                controller.enqueue({
-                  type: "data",
-                  redisId: lastRedisId,
-                  data: "",
-                  line: buffer.trim(),
-                });
-              }
+              const pendingLine = buffer;
+              if (pendingLine.trim().length > 0) {
+                controller.enqueue({
+                  type: "data",
+                  redisId: lastRedisId,
+                  data: "",
+                  line: pendingLine,
+                });
+              }
+              buffer = "";
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// On stream end, emit any leftover buffered text
if (buffer.trim().length > 0) {
controller.enqueue({
type: "data",
redisId: lastRedisId,
data: "",
line: buffer.trim(),
});
// On stream end, emit any leftover buffered text
const pendingLine = buffer;
if (pendingLine.trim().length > 0) {
controller.enqueue({
type: "data",
redisId: lastRedisId,
data: "",
line: pendingLine,
});
}
buffer = "";
🤖 Prompt for AI Agents
In apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts around lines
241 to 248, the code trims the buffered payload before sending it which strips
legitimate leading/trailing whitespace; keep the existing emptiness check using
buffer.trim() but send the original buffer bytes to clients instead of
buffer.trim(). Concretely, leave the if (buffer.trim().length > 0) check as-is,
but set the emitted line data to buffer (unchanged) so we forward the exact
Redis-stored string.

@ericallam ericallam merged commit 668559e into main Nov 11, 2025
31 checks passed
@ericallam ericallam deleted the ea-branch-96 branch November 11, 2025 21:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants