Skip to content

Commit 0ef5be9

Browse files
committed
stream: enhance internal pipe handling with additional error management and tests
1 parent 7fdc21f commit 0ef5be9

File tree

2 files changed

+108
-42
lines changed

2 files changed

+108
-42
lines changed

lib/internal/webstreams/readablestream.js

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const {
3131

3232
const {
3333
AbortError,
34+
ErrnoException,
3435
codes: {
3536
ERR_ILLEGAL_CONSTRUCTOR,
3637
ERR_INVALID_ARG_TYPE,
@@ -49,6 +50,13 @@ const {
4950
StreamPipe,
5051
} = internalBinding('stream_pipe');
5152

53+
const {
54+
kReadBytesOrError,
55+
streamBaseState,
56+
} = internalBinding('stream_wrap');
57+
58+
const { UV_EOF } = internalBinding('uv');
59+
5260
const {
5361
isArrayBufferView,
5462
isDataView,
@@ -130,6 +138,7 @@ const {
130138
isWritableStreamDefaultWriter,
131139

132140
writableStreamAbort,
141+
writableStreamClose,
133142
writableStreamCloseQueuedOrInFlight,
134143
writableStreamDefaultWriterCloseWithErrorPropagation,
135144
writableStreamDefaultWriterRelease,
@@ -1383,18 +1392,67 @@ function readableStreamPipeTo(
13831392
!preventClose &&
13841393
!preventAbort &&
13851394
!preventCancel) {
1386-
// Use native piping
1395+
// Native C++ StreamPipe path for internal-to-internal piping.
1396+
// Ref: https://github.com/nodejs/performance/issues/134
13871397
const promise = PromiseWithResolvers();
13881398

13891399
source[kState].disturbed = true;
13901400

1401+
let pipeError = null;
1402+
let isComplete = false;
1403+
const originalSourceOnread = sourceStreamBase.onread;
1404+
1405+
sourceStreamBase.onread = (arrayBuffer) => {
1406+
const nread = streamBaseState[kReadBytesOrError];
1407+
if (nread < 0 && nread !== UV_EOF) {
1408+
pipeError = new ErrnoException(nread, 'read');
1409+
}
1410+
if (originalSourceOnread) {
1411+
return originalSourceOnread(arrayBuffer);
1412+
}
1413+
};
1414+
1415+
function finalize(error) {
1416+
if (isComplete) return;
1417+
isComplete = true;
1418+
sourceStreamBase.onread = originalSourceOnread;
1419+
1420+
if (error) {
1421+
if (source[kState].state === 'readable') {
1422+
readableStreamError(source, error);
1423+
}
1424+
if (dest[kState].state === 'writable') {
1425+
writableStreamAbort(dest, error);
1426+
}
1427+
promise.reject(error);
1428+
} else {
1429+
if (source[kState].state === 'readable') {
1430+
readableStreamClose(source);
1431+
}
1432+
if (dest[kState].state === 'writable' &&
1433+
!writableStreamCloseQueuedOrInFlight(dest)) {
1434+
PromisePrototypeThen(
1435+
writableStreamClose(dest),
1436+
() => promise.resolve(),
1437+
(err) => promise.reject(err),
1438+
);
1439+
} else {
1440+
promise.resolve();
1441+
}
1442+
}
1443+
}
1444+
13911445
try {
13921446
const pipe = new StreamPipe(sourceStreamBase, destStreamBase);
13931447
pipe.onunpipe = () => {
1394-
promise.resolve();
1448+
if (pipeError) {
1449+
finalize(pipeError);
1450+
}
13951451
};
1452+
pipe.oncomplete = () => finalize(pipeError);
13961453
pipe.start();
13971454
} catch (error) {
1455+
sourceStreamBase.onread = originalSourceOnread;
13981456
promise.reject(error);
13991457
}
14001458

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,25 @@
11
// Flags: --expose-internals --no-warnings
22
'use strict';
33

4-
// Tests for the internal StreamBase pipe optimization infrastructure
5-
// described in nodejs/performance#134
6-
//
7-
// Note(mertcanaltin): Full fast-path testing requires real StreamBase implementations
8-
// (like HTTP/2 streams or TCP sockets), not JSStream mocks.
9-
// These tests verify the marker attachment and fallback behavior.
4+
// Tests for internal StreamBase pipe optimization (nodejs/performance#134)
105

116
const common = require('../common');
12-
137
const assert = require('assert');
14-
15-
const {
16-
internalBinding,
17-
} = require('internal/test/binding');
8+
const { internalBinding } = require('internal/test/binding');
189

1910
const {
2011
newWritableStreamFromStreamBase,
2112
newReadableStreamFromStreamBase,
2213
} = require('internal/webstreams/adapters');
2314

24-
const {
25-
kStreamBase,
26-
} = require('internal/webstreams/util');
27-
28-
const {
29-
JSStream,
30-
} = internalBinding('js_stream');
15+
const { kStreamBase } = require('internal/webstreams/util');
16+
const { JSStream } = internalBinding('js_stream');
3117

3218
// kStreamBase marker is attached to ReadableStream
3319
{
3420
const stream = new JSStream();
3521
const readable = newReadableStreamFromStreamBase(stream);
36-
3722
assert.strictEqual(readable[kStreamBase], stream);
38-
39-
// Cleanup
4023
stream.emitEOF();
4124
}
4225

@@ -47,10 +30,7 @@ const {
4730
stream.onshutdown = (req) => req.oncomplete();
4831

4932
const writable = newWritableStreamFromStreamBase(stream);
50-
5133
assert.strictEqual(writable[kStreamBase], stream);
52-
53-
// Cleanup
5434
writable.close();
5535
}
5636

@@ -65,18 +45,15 @@ const {
6545
},
6646
});
6747

68-
const ws = new WritableStream({
69-
write() {},
70-
});
48+
const ws = new WritableStream({ write() {} });
7149

7250
assert.strictEqual(rs[kStreamBase], undefined);
7351
assert.strictEqual(ws[kStreamBase], undefined);
7452

75-
// Pipe should still work (standard path)
7653
rs.pipeTo(ws).then(common.mustCall());
7754
}
7855

79-
// Mixed streams (one internal, one JS) use standard path
56+
// Mixed streams use standard JS path
8057
{
8158
const stream = new JSStream();
8259
stream.onshutdown = (req) => req.oncomplete();
@@ -85,17 +62,13 @@ const {
8562
const { WritableStream } = require('stream/web');
8663
const chunks = [];
8764
const ws = new WritableStream({
88-
write(chunk) {
89-
chunks.push(chunk);
90-
},
65+
write(chunk) { chunks.push(chunk); },
9166
});
9267

93-
// Readable has kStreamBase, ws does not - should use standard path
9468
assert.ok(readable[kStreamBase]);
9569
assert.strictEqual(ws[kStreamBase], undefined);
9670

9771
const pipePromise = readable.pipeTo(ws);
98-
9972
stream.readBuffer(Buffer.from('hello'));
10073
stream.emitEOF();
10174

@@ -104,12 +77,47 @@ const {
10477
}));
10578
}
10679

107-
// Verify kStreamBase is the correct symbol from util
80+
// Verify kStreamBase symbol identity
10881
{
109-
const {
110-
kStreamBase: kStreamBase2,
111-
} = require('internal/webstreams/util');
112-
113-
// Should be the same symbol
82+
const { kStreamBase: kStreamBase2 } = require('internal/webstreams/util');
11483
assert.strictEqual(kStreamBase, kStreamBase2);
11584
}
85+
86+
// FileHandle.readableWebStream() uses async reads, not StreamBase
87+
{
88+
const fs = require('fs/promises');
89+
const path = require('path');
90+
const os = require('os');
91+
92+
async function testFileStreamPipe() {
93+
const tmpDir = os.tmpdir();
94+
const testFile = path.join(tmpDir, `test-webstream-pipe-${process.pid}.txt`);
95+
const testData = 'Hello, WebStreams pipe!';
96+
97+
await fs.writeFile(testFile, testData);
98+
99+
try {
100+
const fileHandle = await fs.open(testFile, 'r');
101+
const readable = fileHandle.readableWebStream();
102+
103+
assert.strictEqual(readable[kStreamBase], undefined);
104+
105+
const chunks = [];
106+
const writable = new (require('stream/web').WritableStream)({
107+
write(chunk) { chunks.push(chunk); },
108+
});
109+
110+
await readable.pipeTo(writable);
111+
await fileHandle.close();
112+
113+
const result = Buffer.concat(chunks.map((c) =>
114+
(c instanceof Uint8Array ? Buffer.from(c) : Buffer.from(c)),
115+
)).toString();
116+
assert.strictEqual(result, testData);
117+
} finally {
118+
await fs.unlink(testFile).catch(() => {});
119+
}
120+
}
121+
122+
testFileStreamPipe().then(common.mustCall());
123+
}

0 commit comments

Comments
 (0)