Skip to content

Commit 5e677d6

Browse files
authored
stream: do not pass readable.compose() output via Readable.from()
PR-URL: #60907 Fixes: #55203 Reviewed-By: Raz Luvaton <rluvaton@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent dc8215b commit 5e677d6

File tree

4 files changed

+68
-40
lines changed

4 files changed

+68
-40
lines changed

doc/api/stream.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2027,7 +2027,7 @@ changes:
20272027
description: Marking the API stable.
20282028
-->
20292029

2030-
* `stream` {Stream|Iterable|AsyncIterable|Function}
2030+
* `stream` {Writable|Duplex|WritableStream|TransformStream|Function}
20312031
* `options` {Object}
20322032
* `signal` {AbortSignal} allows destroying the stream if the signal is
20332033
aborted.
@@ -2046,13 +2046,18 @@ async function* splitToWords(source) {
20462046
}
20472047
}
20482048

2049-
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
2049+
const wordsStream = Readable.from(['text passed through', 'composed stream']).compose(splitToWords);
20502050
const words = await wordsStream.toArray();
20512051

2052-
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
2052+
console.log(words); // prints ['text', 'passed', 'through', 'composed', 'stream']
20532053
```
20542054

2055-
See [`stream.compose`][] for more information.
2055+
`readable.compose(s)` is equivalent to `stream.compose(readable, s)`.
2056+
2057+
This method also allows for an {AbortSignal} to be provided, which will destroy
2058+
the composed stream when aborted.
2059+
2060+
See [`stream.compose(...streams)`][] for more information.
20562061

20572062
##### `readable.iterator([options])`
20582063

@@ -3050,7 +3055,8 @@ await finished(compose(s1, s2, s3));
30503055
console.log(res); // prints 'HELLOWORLD'
30513056
```
30523057

3053-
See [`readable.compose(stream)`][] for `stream.compose` as operator.
3058+
For convenience, the [`readable.compose(stream)`][] method is available on
3059+
{Readable} and {Duplex} streams as a wrapper for this function.
30543060

30553061
### `stream.isErrored(stream)`
30563062

@@ -4998,7 +5004,7 @@ contain multi-byte characters.
49985004
[`readable.setEncoding()`]: #readablesetencodingencoding
49995005
[`stream.Readable.from()`]: #streamreadablefromiterable-options
50005006
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
5001-
[`stream.compose`]: #streamcomposestreams
5007+
[`stream.compose(...streams)`]: #streamcomposestreams
50025008
[`stream.cork()`]: #writablecork
50035009
[`stream.duplexPair()`]: #streamduplexpairoptions
50045010
[`stream.finished()`]: #streamfinishedstream-options-callback

lib/internal/streams/operators.js

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ const { AbortController, AbortSignal } = require('internal/abort_controller');
1818
const {
1919
AbortError,
2020
codes: {
21-
ERR_INVALID_ARG_VALUE,
2221
ERR_MISSING_ARGS,
2322
ERR_OUT_OF_RANGE,
2423
},
@@ -31,40 +30,10 @@ const {
3130
} = require('internal/validators');
3231
const { kWeakHandler, kResistStopPropagation } = require('internal/event_target');
3332
const { finished } = require('internal/streams/end-of-stream');
34-
const staticCompose = require('internal/streams/compose');
35-
const {
36-
addAbortSignalNoValidate,
37-
} = require('internal/streams/add-abort-signal');
38-
const { isWritable, isNodeStream } = require('internal/streams/utils');
3933

4034
const kEmpty = Symbol('kEmpty');
4135
const kEof = Symbol('kEof');
4236

43-
function compose(stream, options) {
44-
if (options != null) {
45-
validateObject(options, 'options');
46-
}
47-
if (options?.signal != null) {
48-
validateAbortSignal(options.signal, 'options.signal');
49-
}
50-
51-
if (isNodeStream(stream) && !isWritable(stream)) {
52-
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
53-
}
54-
55-
const composedStream = staticCompose(this, stream);
56-
57-
if (options?.signal) {
58-
// Not validating as we already validated before
59-
addAbortSignalNoValidate(
60-
options.signal,
61-
composedStream,
62-
);
63-
}
64-
65-
return composedStream;
66-
}
67-
6837
function map(fn, options) {
6938
validateFunction(fn, 'fn');
7039
if (options != null) {
@@ -408,7 +377,6 @@ module.exports.streamReturningOperators = {
408377
flatMap,
409378
map,
410379
take,
411-
compose,
412380
};
413381

414382
module.exports.promiseReturningOperators = {

lib/internal/streams/readable.js

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const { Buffer } = require('buffer');
4848

4949
const {
5050
addAbortSignal,
51+
addAbortSignalNoValidate,
5152
} = require('internal/streams/add-abort-signal');
5253
const eos = require('internal/streams/end-of-stream');
5354

@@ -86,7 +87,10 @@ const {
8687
ERR_UNKNOWN_ENCODING,
8788
},
8889
} = require('internal/errors');
89-
const { validateObject } = require('internal/validators');
90+
const {
91+
validateAbortSignal,
92+
validateObject,
93+
} = require('internal/validators');
9094

9195
const FastBuffer = Buffer[SymbolSpecies];
9296

@@ -1409,6 +1413,30 @@ async function* createAsyncIterator(stream, options) {
14091413
}
14101414
}
14111415

1416+
let composeImpl;
1417+
1418+
Readable.prototype.compose = function compose(stream, options) {
1419+
if (options != null) {
1420+
validateObject(options, 'options');
1421+
}
1422+
if (options?.signal != null) {
1423+
validateAbortSignal(options.signal, 'options.signal');
1424+
}
1425+
1426+
composeImpl ??= require('internal/streams/compose');
1427+
const composedStream = composeImpl(this, stream);
1428+
1429+
if (options?.signal) {
1430+
// Not validating as we already validated before
1431+
addAbortSignalNoValidate(
1432+
options.signal,
1433+
composedStream,
1434+
);
1435+
}
1436+
1437+
return composedStream;
1438+
};
1439+
14121440
// Making it explicit these properties are not enumerable
14131441
// because otherwise some prototype manipulation in
14141442
// userland will fail.

test/parallel/test-stream-compose-operator.js renamed to test/parallel/test-stream-readable-compose.js

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
const common = require('../common');
44
const {
5-
Readable, Transform,
5+
PassThrough,
6+
Readable,
7+
Transform,
68
} = require('stream');
79
const assert = require('assert');
810

@@ -19,6 +21,8 @@ const assert = require('assert');
1921
}
2022
}
2123
});
24+
assert.strictEqual(stream.readable, true);
25+
assert.strictEqual(stream.writable, false);
2226
const result = ['ab', 'cd'];
2327
(async () => {
2428
for await (const item of stream) {
@@ -35,6 +39,8 @@ const assert = require('assert');
3539
callback(null, chunk);
3640
}, 4)
3741
}));
42+
assert.strictEqual(stream.readable, true);
43+
assert.strictEqual(stream.writable, false);
3844
const result = ['a', 'b', 'c', 'd'];
3945
(async () => {
4046
for await (const item of stream) {
@@ -43,6 +49,26 @@ const assert = require('assert');
4349
})().then(common.mustCall());
4450
}
4551

52+
{
53+
// With Duplex stream as `this`, ensuring writes to the composed stream
54+
// are passed to the head of the pipeline
55+
const pt = new PassThrough({ objectMode: true });
56+
const composed = pt.compose(async function *(stream) {
57+
for await (const chunk of stream) {
58+
yield chunk * 2;
59+
}
60+
});
61+
assert.strictEqual(composed.readable, true);
62+
assert.strictEqual(composed.writable, true);
63+
pt.on('data', common.mustCall((chunk) => {
64+
assert.strictEqual(chunk, 123);
65+
}));
66+
composed.on('data', common.mustCall((chunk) => {
67+
assert.strictEqual(chunk, 246);
68+
}));
69+
composed.end(123);
70+
}
71+
4672
{
4773
// Throwing an error during `compose` (before waiting for data)
4874
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

0 commit comments

Comments
 (0)