From 93274c5a64a5b64b663506ced4cfbda215276d42 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 27 Oct 2025 19:43:34 +0100 Subject: [PATCH] stream: readable read one buffer at a time Instead of wasting cycles concatenating buffers, just return each one by one. Old behavior can be achieved by using `readable.read(readable.readableLength)` instead of `readable.read()`. PR: https://github.com/nodejs/node/pull/60441 --- lib/internal/streams/readable.js | 5 +++ .../test-crypto-cipheriv-decipheriv.js | 4 +-- test/parallel/test-runner-run.mjs | 34 +++++++++---------- test/parallel/test-stream-compose.js | 2 +- test/parallel/test-stream-push-strings.js | 2 +- .../test-stream-readable-emittedReadable.js | 2 +- .../test-stream-readable-infinite-read.js | 14 ++------ .../test-stream-readable-needReadable.js | 2 +- .../test-stream-readable-readable-one.js | 20 +++++++++++ test/parallel/test-stream-typedarray.js | 14 ++++++-- test/parallel/test-stream-uint8array.js | 10 ++++-- test/parallel/test-stream2-transform.js | 4 ++- test/parallel/test-webstreams-pipeline.js | 4 +-- test/parallel/test-worker-heap-snapshot.js | 3 +- .../test-worker-stdio-from-preload-module.js | 2 +- .../test-zlib-flush-write-sync-interleaved.js | 4 +-- 16 files changed, 80 insertions(+), 46 deletions(-) create mode 100644 test/parallel/test-stream-readable-readable-one.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 97fe9dc6f60c2f..6de42563dab15e 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -636,9 +636,14 @@ function howMuchToRead(n, state) { if ((state[kState] & kObjectMode) !== 0) return 1; if (NumberIsNaN(n)) { + // Fast path for buffers. + if ((state[kState] & kDecoder) === 0 && state.length) + return state.buffer[state.bufferIndex].length; + // Only flow one buffer at a time. if ((state[kState] & kFlowing) !== 0 && state.length) return state.buffer[state.bufferIndex].length; + return state.length; } if (n <= state.length) diff --git a/test/parallel/test-crypto-cipheriv-decipheriv.js b/test/parallel/test-crypto-cipheriv-decipheriv.js index 6742722f9e9091..3f0ea5b31ba189 100644 --- a/test/parallel/test-crypto-cipheriv-decipheriv.js +++ b/test/parallel/test-crypto-cipheriv-decipheriv.js @@ -31,11 +31,11 @@ function testCipher1(key, iv) { // quite small, so there's no harm. const cStream = crypto.createCipheriv('des-ede3-cbc', key, iv); cStream.end(plaintext); - ciph = cStream.read(); + ciph = cStream.read(cStream.readableLength); const dStream = crypto.createDecipheriv('des-ede3-cbc', key, iv); dStream.end(ciph); - txt = dStream.read().toString('utf8'); + txt = dStream.read(dStream.readableLength).toString('utf8'); assert.strictEqual(txt, plaintext, `streaming cipher with key ${key} and iv ${iv}`); diff --git a/test/parallel/test-runner-run.mjs b/test/parallel/test-runner-run.mjs index 7ddd8c1dcd83e5..a0811c19c6a31a 100644 --- a/test/parallel/test-runner-run.mjs +++ b/test/parallel/test-runner-run.mjs @@ -3,6 +3,7 @@ import * as fixtures from '../common/fixtures.mjs'; import { join } from 'node:path'; import { describe, it, run } from 'node:test'; import { dot, spec, tap } from 'node:test/reporters'; +import consumers from 'node:stream/consumers'; import assert from 'node:assert'; import util from 'node:util'; @@ -111,34 +112,31 @@ describe('require(\'node:test\').run', { concurrency: true }, () => { describe('should be piped with spec reporter', () => { it('new spec', async () => { const specReporter = new spec(); - const result = await run({ + const result = await consumers.text(run({ files: [join(testFixtures, 'default-behavior/test/random.cjs')] - }).compose(specReporter).toArray(); - const stringResults = result.map((bfr) => bfr.toString()); - assert.match(stringResults[0], /this should pass/); - assert.match(stringResults[1], /tests 1/); - assert.match(stringResults[1], /pass 1/); + }).compose(specReporter)); + assert.match(result, /this should pass/); + assert.match(result, /tests 1/); + assert.match(result, /pass 1/); }); it('spec()', async () => { const specReporter = spec(); - const result = await run({ + const result = await consumers.text(run({ files: [join(testFixtures, 'default-behavior/test/random.cjs')] - }).compose(specReporter).toArray(); - const stringResults = result.map((bfr) => bfr.toString()); - assert.match(stringResults[0], /this should pass/); - assert.match(stringResults[1], /tests 1/); - assert.match(stringResults[1], /pass 1/); + }).compose(specReporter)); + assert.match(result, /this should pass/); + assert.match(result, /tests 1/); + assert.match(result, /pass 1/); }); it('spec', async () => { - const result = await run({ + const result = await consumers.text(run({ files: [join(testFixtures, 'default-behavior/test/random.cjs')] - }).compose(spec).toArray(); - const stringResults = result.map((bfr) => bfr.toString()); - assert.match(stringResults[0], /this should pass/); - assert.match(stringResults[1], /tests 1/); - assert.match(stringResults[1], /pass 1/); + }).compose(spec)); + assert.match(result, /this should pass/); + assert.match(result, /tests 1/); + assert.match(result, /pass 1/); }); }); diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index 36581d6d858276..57b6e3659352ab 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -490,7 +490,7 @@ const assert = require('assert'); newStream.end(); - assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]); + assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve Rogers'), Buffer.from('On your left')]); })().then(common.mustCall()); } diff --git a/test/parallel/test-stream-push-strings.js b/test/parallel/test-stream-push-strings.js index d582c8add005a5..5fece74a115cf3 100644 --- a/test/parallel/test-stream-push-strings.js +++ b/test/parallel/test-stream-push-strings.js @@ -59,7 +59,7 @@ ms.on('readable', function() { results.push(String(chunk)); }); -const expect = [ 'first chunksecond to last chunk', 'last chunk' ]; +const expect = [ 'first chunk', 'second to last chunk', 'last chunk' ]; process.on('exit', function() { assert.strictEqual(ms._chunks, -1); assert.deepStrictEqual(results, expect); diff --git a/test/parallel/test-stream-readable-emittedReadable.js b/test/parallel/test-stream-readable-emittedReadable.js index ba613f9e9ff19d..ffaf1d5b943334 100644 --- a/test/parallel/test-stream-readable-emittedReadable.js +++ b/test/parallel/test-stream-readable-emittedReadable.js @@ -10,7 +10,7 @@ const readable = new Readable({ // Initialized to false. assert.strictEqual(readable._readableState.emittedReadable, false); -const expected = [Buffer.from('foobar'), Buffer.from('quo'), null]; +const expected = [Buffer.from('foo'), Buffer.from('bar'), Buffer.from('quo'), null]; readable.on('readable', common.mustCall(() => { // emittedReadable should be true when the readable event is emitted assert.strictEqual(readable._readableState.emittedReadable, true); diff --git a/test/parallel/test-stream-readable-infinite-read.js b/test/parallel/test-stream-readable-infinite-read.js index df88d78b74c36f..8360df52ed7d0f 100644 --- a/test/parallel/test-stream-readable-infinite-read.js +++ b/test/parallel/test-stream-readable-infinite-read.js @@ -10,24 +10,16 @@ const readable = new Readable({ highWaterMark: 16 * 1024, read: common.mustCall(function() { this.push(buf); - }, 31) + }, 12) }); let i = 0; readable.on('readable', common.mustCall(function() { if (i++ === 10) { - // We will just terminate now. - process.removeAllListeners('readable'); + readable.removeAllListeners('readable'); return; } - const data = readable.read(); - // TODO(mcollina): there is something odd in the highWaterMark logic - // investigate. - if (i === 1) { - assert.strictEqual(data.length, 8192 * 2); - } else { - assert.strictEqual(data.length, 8192 * 3); - } + assert.strictEqual(readable.read().length, 8192); }, 11)); diff --git a/test/parallel/test-stream-readable-needReadable.js b/test/parallel/test-stream-readable-needReadable.js index c4bc90bb19d3e2..3f26db791c7bb3 100644 --- a/test/parallel/test-stream-readable-needReadable.js +++ b/test/parallel/test-stream-readable-needReadable.js @@ -32,7 +32,7 @@ const asyncReadable = new Readable({ }); asyncReadable.on('readable', common.mustCall(() => { - if (asyncReadable.read() !== null) { + if (asyncReadable.read(asyncReadable.readableLength) !== null) { // After each read(), the buffer is empty. // If the stream doesn't end now, // then we need to notify the reader on future changes. diff --git a/test/parallel/test-stream-readable-readable-one.js b/test/parallel/test-stream-readable-readable-one.js new file mode 100644 index 00000000000000..c15df5777cab6e --- /dev/null +++ b/test/parallel/test-stream-readable-readable-one.js @@ -0,0 +1,20 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Readable } = require('stream'); + +// Read one buffer at a time and don't waste cycles allocating +// and copying into a new larger buffer. +{ + const r = new Readable({ + read() {} + }); + const buffers = [Buffer.allocUnsafe(5), Buffer.allocUnsafe(10)]; + for (const buf of buffers) { + r.push(buf); + } + for (const buf of buffers) { + assert.strictEqual(r.read(), buf); + } +} diff --git a/test/parallel/test-stream-typedarray.js b/test/parallel/test-stream-typedarray.js index ae5846da09dbbe..5ef2733a91bdbb 100644 --- a/test/parallel/test-stream-typedarray.js +++ b/test/parallel/test-stream-typedarray.js @@ -83,9 +83,19 @@ const views = common.getArrayBufferViews(buffer); readable.push(views[2]); readable.unshift(views[0]); - const buf = readable.read(); + let buf; + + buf = readable.read(); + assert(buf instanceof Buffer); + assert.deepStrictEqual([...buf], [...views[0]]); + + buf = readable.read(); + assert(buf instanceof Buffer); + assert.deepStrictEqual([...buf], [...views[1]]); + + buf = readable.read(); assert(buf instanceof Buffer); - assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]); + assert.deepStrictEqual([...buf], [...views[2]]); } { diff --git a/test/parallel/test-stream-uint8array.js b/test/parallel/test-stream-uint8array.js index f1de4c873fd3a8..a34f6e1e5a48d8 100644 --- a/test/parallel/test-stream-uint8array.js +++ b/test/parallel/test-stream-uint8array.js @@ -80,9 +80,15 @@ const GHI = new Uint8Array([0x47, 0x48, 0x49]); readable.push(DEF); readable.unshift(ABC); - const buf = readable.read(); + let buf; + + buf = readable.read(); + assert(buf instanceof Buffer); + assert.deepStrictEqual([...buf], [...ABC]); + + buf = readable.read(); assert(buf instanceof Buffer); - assert.deepStrictEqual([...buf], [...ABC, ...DEF]); + assert.deepStrictEqual([...buf], [...DEF]); } { diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index f222f1c03b48b5..457001fd886932 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -282,7 +282,9 @@ const { PassThrough, Transform } = require('stream'); pt.write(Buffer.from('ef'), common.mustCall(function() { pt.end(); })); - assert.strictEqual(pt.read().toString(), 'abcdef'); + assert.strictEqual(pt.read().toString(), 'abc'); + assert.strictEqual(pt.read().toString(), 'd'); + assert.strictEqual(pt.read().toString(), 'ef'); assert.strictEqual(pt.read(), null); }); }); diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index 79188d88bf54be..a4bf579f5f11d7 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -126,7 +126,7 @@ const http = require('http'); }); pipeline(r, ws, common.mustSucceed(() => { - assert.deepStrictEqual(values, ['helloworld']); + assert.deepStrictEqual(values, ['hello', 'world']); })); r.push('hello'); @@ -181,7 +181,7 @@ const http = require('http'); }); pipeline(rs, t, ws, common.mustSucceed(() => { - assert.deepStrictEqual(values, ['HELLOWORLD']); + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); })); c.enqueue('hello'); diff --git a/test/parallel/test-worker-heap-snapshot.js b/test/parallel/test-worker-heap-snapshot.js index 5358f2effca508..20ff3d540f9827 100644 --- a/test/parallel/test-worker-heap-snapshot.js +++ b/test/parallel/test-worker-heap-snapshot.js @@ -10,7 +10,8 @@ const { once } = require('events'); const worker = new Worker('setInterval(() => {}, 1000);', { eval: true }); await once(worker, 'online'); const stream = await worker.getHeapSnapshot(); - assert.ok(JSON.parse(stream.read())); + stream.read(0); // Trigger the stream to start flowing + assert.ok(JSON.parse(stream.read(stream.readableLength))); await worker.terminate(); })().then(common.mustCall()); diff --git a/test/parallel/test-worker-stdio-from-preload-module.js b/test/parallel/test-worker-stdio-from-preload-module.js index e4178c58d46b21..20a72c04adb4c1 100644 --- a/test/parallel/test-worker-stdio-from-preload-module.js +++ b/test/parallel/test-worker-stdio-from-preload-module.js @@ -15,6 +15,6 @@ for (let i = 0; i < 10; i++) { stdout: true }); w.on('exit', common.mustCall(() => { - assert.strictEqual(w.stdout.read().toString(), 'A\nB\n'); + assert.strictEqual(w.stdout.read(w.stdout.readableLength).toString(), 'A\nB\n'); })); } diff --git a/test/parallel/test-zlib-flush-write-sync-interleaved.js b/test/parallel/test-zlib-flush-write-sync-interleaved.js index f8387f40069b5f..87ca9fe1e9a24b 100644 --- a/test/parallel/test-zlib-flush-write-sync-interleaved.js +++ b/test/parallel/test-zlib-flush-write-sync-interleaved.js @@ -19,7 +19,7 @@ for (const chunk of ['abc', 'def', 'ghi']) { compress.write(chunk, common.mustCall(() => events.push({ written: chunk }))); compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => { events.push('flushed'); - const chunk = compress.read(); + const chunk = compress.read(compress.readableLength); if (chunk !== null) compressedChunks.push(chunk); })); @@ -36,7 +36,7 @@ function writeToDecompress() { const chunk = compressedChunks.shift(); if (chunk === undefined) return decompress.end(); decompress.write(chunk, common.mustCall(() => { - events.push({ read: decompress.read() }); + events.push({ read: decompress.read(decompress.readableLength) }); writeToDecompress(); })); }