From cf4777b8bb856160316adf6b7015cfda66bf4505 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 27 Oct 2025 18:02:49 +0100 Subject: [PATCH] stream: add Readable.readv A faster alternative to Readable.read for certain use cases. e.g. fs.writevSync(fd, readable.readv()) --- lib/internal/streams/readable.js | 96 +++++++++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index d4096a30994f44..2b02ac86000171 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -641,8 +641,16 @@ function howMuchToRead(n, state) { return (state[kState] & kEnded) !== 0 ? state.length : 0; } +Readable.prototype.readv = function readv () { + return _read.call(this, true); +}; + +Readable.prototype.read = function read () { + return _read.call(this, false); +} + // You can override either this method, or the async _read(n) below. -Readable.prototype.read = function(n) { +function _read (n, returnArr) { debug('read', n); // Same as parseInt(undefined, 10), however V8 7.3 performance regressed // in this scenario, so we are doing it manually. @@ -748,7 +756,7 @@ Readable.prototype.read = function(n) { let ret; if (n > 0) - ret = fromList(n, state); + ret = returnArr ? arrFromList(n, state) : fromList(n, state); else ret = null; @@ -777,7 +785,13 @@ Readable.prototype.read = function(n) { if (ret !== null && (state[kState] & (kErrorEmitted | kCloseEmitted)) === 0) { state[kState] |= kDataEmitted; - this.emit('data', ret); + if (returnArr) { + for (let i = 0; i < ret.length; ++i) { + this.emit('data', ret[i]); + } + } else { + this.emit('data', ret); + } } return ret; @@ -1682,6 +1696,82 @@ function fromList(n, state) { return ret; } +function arrFromList(n, state) { + // nothing buffered. + if (state.length === 0) + return null; + + let idx = state.bufferIndex; + let ret; + + const buf = state.buffer; + const len = buf.length; + + if ((state[kState] & kObjectMode) !== 0 || !n || n >= state.length) { + ret = buf.slice(idx); + idx += ret.length; + } else if (n < buf[idx].length) { + // `slice` is the same for buffers and strings. + ret = [buf[idx].slice(0, n)]; + buf[idx] = buf[idx].slice(n); + } else if (n === buf[idx].length) { + // First chunk is a perfect match. + ret = [buf[idx]]; + buf[idx++] = null; + } else if ((state[kState] & kDecoder) !== 0) { + ret = []; + while (idx < len) { + const str = buf[idx]; + if (n > str.length) { + ret.push(str); + n -= str.length; + buf[idx++] = null; + } else { + if (n === buf.length) { + ret.push(str); + buf[idx++] = null; + } else { + ret.push(str.slice(0, n)); + buf[idx] = str.slice(n); + } + break; + } + } + } else { + ret = []; + const retLen = n; + while (idx < len) { + const data = buf[idx]; + if (n > data.length) { + ret.push(data); + n -= data.length; + buf[idx++] = null; + } else { + if (n === data.length) { + ret.push(data); + buf[idx++] = null; + } else { + ret.push(new FastBuffer(data.buffer, data.byteOffset, n)); + buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n); + } + break; + } + } + } + + if (idx === len) { + state.buffer.length = 0; + state.bufferIndex = 0; + } else if (idx > 1024) { + state.buffer.splice(0, idx); + state.bufferIndex = 0; + } else { + state.bufferIndex = idx; + } + + return ret; +} + function endReadable(stream) { const state = stream._readableState;