@@ -73,6 +73,7 @@ const {
7373 validateBoolean,
7474 validateFunction,
7575 validateObject,
76+ validateOneOf,
7677} = require ( 'internal/validators' ) ;
7778
7879const {
@@ -417,7 +418,8 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
417418 * @typedef {import('./queuingstrategies').QueuingStrategy } QueuingStrategy
418419 * @param {Readable } streamReadable
419420 * @param {{
420- * strategy : QueuingStrategy
421+ * strategy? : QueuingStrategy
422+ * type? : 'bytes',
421423 * }} [options]
422424 * @returns {ReadableStream }
423425 */
@@ -432,6 +434,12 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
432434 'stream.Readable' ,
433435 streamReadable ) ;
434436 }
437+ validateObject ( options , 'options' ) ;
438+ if ( options . type !== undefined ) {
439+ validateOneOf ( options . type , 'options.type' , [ 'bytes' , undefined ] ) ;
440+ }
441+
442+ const isBYOB = options . type === 'bytes' ;
435443
436444 if ( isDestroyed ( streamReadable ) || ! isReadable ( streamReadable ) ) {
437445 const readable = new ReadableStream ( ) ;
@@ -443,6 +451,9 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
443451 const highWaterMark = streamReadable . readableHighWaterMark ;
444452
445453 const evaluateStrategyOrFallback = ( strategy ) => {
454+ // If the stream is BYOB, we only use highWaterMark
455+ if ( isBYOB )
456+ return { highWaterMark } ;
446457 // If there is a strategy available, use it
447458 if ( strategy )
448459 return strategy ;
@@ -491,7 +502,19 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
491502 streamReadable . on ( 'data' , onData ) ;
492503
493504 return new ReadableStream ( {
494- start ( c ) { controller = c ; } ,
505+ type : isBYOB ? 'bytes' : undefined ,
506+ start ( c ) {
507+ controller = c ;
508+ if ( isBYOB ) {
509+ streamReadable . once ( 'end' , ( ) => {
510+ // close the controller
511+ controller . close ( ) ;
512+ // And unlock the last BYOB read request
513+ controller . byobRequest ?. respond ( 0 ) ;
514+ wasCanceled = true ;
515+ } ) ;
516+ }
517+ } ,
495518
496519 pull ( ) { streamReadable . resume ( ) ; } ,
497520
@@ -601,9 +624,10 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj
601624
602625/**
603626 * @param {Duplex } duplex
627+ * @param {{ type?: 'bytes' } } [options]
604628 * @returns {ReadableWritablePair }
605629 */
606- function newReadableWritablePairFromDuplex ( duplex ) {
630+ function newReadableWritablePairFromDuplex ( duplex , options = kEmptyObject ) {
607631 // Not using the internal/streams/utils isWritableNodeStream and
608632 // isReadableNodeStream utilities here because they will return false
609633 // if the duplex was created with writable or readable options set to
@@ -615,9 +639,11 @@ function newReadableWritablePairFromDuplex(duplex) {
615639 throw new ERR_INVALID_ARG_TYPE ( 'duplex' , 'stream.Duplex' , duplex ) ;
616640 }
617641
642+ validateObject ( options , 'options' ) ;
643+
618644 if ( isDestroyed ( duplex ) ) {
619645 const writable = new WritableStream ( ) ;
620- const readable = new ReadableStream ( ) ;
646+ const readable = new ReadableStream ( { type : options . type } ) ;
621647 writable . close ( ) ;
622648 readable . cancel ( ) ;
623649 return { readable, writable } ;
@@ -633,8 +659,8 @@ function newReadableWritablePairFromDuplex(duplex) {
633659
634660 const readable =
635661 isReadable ( duplex ) ?
636- newReadableStreamFromStreamReadable ( duplex ) :
637- new ReadableStream ( ) ;
662+ newReadableStreamFromStreamReadable ( duplex , options ) :
663+ new ReadableStream ( { type : options . type } ) ;
638664
639665 if ( ! isReadable ( duplex ) )
640666 readable . cancel ( ) ;
0 commit comments