@@ -117,14 +117,12 @@ export class DefaultUpgrader implements Upgrader {
117117 private readonly muxers : Map < string , StreamMuxerFactory >
118118 private readonly inboundUpgradeTimeout : number
119119 private readonly events : TypedEventTarget < Libp2pEvents >
120- private readonly logger : ComponentLogger
121120 private readonly log : Logger
122121
123122 constructor ( components : DefaultUpgraderComponents , init : UpgraderInit ) {
124123 this . components = components
125124 this . connectionEncryption = new Map ( )
126125 this . log = components . logger . forComponent ( 'libp2p:upgrader' )
127- this . logger = components . logger
128126
129127 init . connectionEncryption . forEach ( encrypter => {
130128 this . connectionEncryption . set ( encrypter . protocol , encrypter )
@@ -415,6 +413,21 @@ export class DefaultUpgrader implements Upgrader {
415413 muxedStream . sink = stream . sink
416414 muxedStream . protocol = protocol
417415
416+ // allow closing the write end of a not-yet-negotiated stream
417+ if ( stream . closeWrite != null ) {
418+ muxedStream . closeWrite = stream . closeWrite
419+ }
420+
421+ // allow closing the read end of a not-yet-negotiated stream
422+ if ( stream . closeRead != null ) {
423+ muxedStream . closeRead = stream . closeRead
424+ }
425+
426+ // make sure we don't try to negotiate a stream we are closing
427+ if ( stream . close != null ) {
428+ muxedStream . close = stream . close
429+ }
430+
418431 // If a protocol stream has been successfully negotiated and is to be passed to the application,
419432 // the peerstore should ensure that the peer is registered with that protocol
420433 await this . components . peerStore . merge ( remotePeer , {
@@ -426,7 +439,7 @@ export class DefaultUpgrader implements Upgrader {
426439 this . _onStream ( { connection, stream : muxedStream , protocol } )
427440 } )
428441 . catch ( async err => {
429- this . log . error ( 'error handling incoming stream id %d ' , muxedStream . id , err . message , err . code , err . stack )
442+ this . log . error ( 'error handling incoming stream id %s ' , muxedStream . id , err . message , err . code , err . stack )
430443
431444 if ( muxedStream . timeline . close == null ) {
432445 await muxedStream . close ( )
@@ -440,13 +453,13 @@ export class DefaultUpgrader implements Upgrader {
440453 throw new CodeError ( 'Stream is not multiplexed' , codes . ERR_MUXER_UNAVAILABLE )
441454 }
442455
443- connection . log ( 'starting new stream for protocols [%s] ' , protocols )
456+ connection . log ( 'starting new stream for protocols %s ' , protocols )
444457 const muxedStream = await muxer . newStream ( )
445- connection . log . trace ( 'starting new stream %s for protocols [%s] ' , muxedStream . id , protocols )
458+ connection . log . trace ( 'started new stream %s for protocols %s ' , muxedStream . id , protocols )
446459
447460 try {
448461 if ( options . signal == null ) {
449- this . log ( 'No abort signal was passed while trying to negotiate protocols [%s] falling back to default timeout' , protocols )
462+ this . log ( 'No abort signal was passed while trying to negotiate protocols %s falling back to default timeout' , protocols )
450463
451464 const signal = AbortSignal . timeout ( DEFAULT_PROTOCOL_SELECT_TIMEOUT )
452465 setMaxListeners ( Infinity , signal )
@@ -457,13 +470,18 @@ export class DefaultUpgrader implements Upgrader {
457470 }
458471 }
459472
460- const { stream, protocol } = await mss . select ( muxedStream , protocols , {
473+ muxedStream . log . trace ( 'selecting protocol from protocols %s' , protocols )
474+
475+ const {
476+ stream,
477+ protocol
478+ } = await mss . select ( muxedStream , protocols , {
461479 ...options ,
462480 log : muxedStream . log ,
463- yieldBytes : false
481+ yieldBytes : true
464482 } )
465483
466- connection . log ( 'negotiated protocol stream %s with id %s ' , protocol , muxedStream . id )
484+ muxedStream . log ( 'selected protocol %s ' , protocol )
467485
468486 const outgoingLimit = findOutgoingStreamLimit ( protocol , this . components . registrar , options )
469487 const streamCount = countStreams ( protocol , 'outbound' , connection )
@@ -487,6 +505,21 @@ export class DefaultUpgrader implements Upgrader {
487505 muxedStream . sink = stream . sink
488506 muxedStream . protocol = protocol
489507
508+ // allow closing the write end of a not-yet-negotiated stream
509+ if ( stream . closeWrite != null ) {
510+ muxedStream . closeWrite = stream . closeWrite
511+ }
512+
513+ // allow closing the read end of a not-yet-negotiated stream
514+ if ( stream . closeRead != null ) {
515+ muxedStream . closeRead = stream . closeRead
516+ }
517+
518+ // make sure we don't try to negotiate a stream we are closing
519+ if ( stream . close != null ) {
520+ muxedStream . close = stream . close
521+ }
522+
490523 this . components . metrics ?. trackProtocolStream ( muxedStream , connection )
491524
492525 return muxedStream
@@ -637,16 +670,23 @@ export class DefaultUpgrader implements Upgrader {
637670 this . log ( 'selecting outbound crypto protocol' , protocols )
638671
639672 try {
640- const { stream, protocol } = await mss . select ( connection , protocols , {
641- log : this . logger . forComponent ( 'libp2p:mss:select' )
673+ connection . log . trace ( 'selecting encrypter from %s' , protocols )
674+
675+ const {
676+ stream,
677+ protocol
678+ } = await mss . select ( connection , protocols , {
679+ log : connection . log ,
680+ yieldBytes : true
642681 } )
682+
643683 const encrypter = this . connectionEncryption . get ( protocol )
644684
645685 if ( encrypter == null ) {
646686 throw new Error ( `no crypto module found for ${ protocol } ` )
647687 }
648688
649- this . log ( 'encrypting outbound connection to %p' , remotePeerId )
689+ connection . log ( 'encrypting outbound connection to %p using %p' , remotePeerId )
650690
651691 return {
652692 ...await encrypter . secureOutbound ( this . components . peerId , stream , remotePeerId ) ,
@@ -665,15 +705,22 @@ export class DefaultUpgrader implements Upgrader {
665705 const protocols = Array . from ( muxers . keys ( ) )
666706 this . log ( 'outbound selecting muxer %s' , protocols )
667707 try {
668- const { stream, protocol } = await mss . select ( connection , protocols , {
669- log : this . logger . forComponent ( 'libp2p:mss:select' )
708+ connection . log . trace ( 'selecting stream muxer from %s' , protocols )
709+
710+ const {
711+ stream,
712+ protocol
713+ } = await mss . select ( connection , protocols , {
714+ log : connection . log ,
715+ yieldBytes : true
670716 } )
671- this . log ( '%s selected as muxer protocol' , protocol )
717+
718+ connection . log ( 'selected %s as muxer protocol' , protocol )
672719 const muxerFactory = muxers . get ( protocol )
673720
674721 return { stream, muxerFactory }
675722 } catch ( err : any ) {
676- this . log . error ( 'error multiplexing outbound stream' , err )
723+ connection . log . error ( 'error multiplexing outbound stream' , err )
677724 throw new CodeError ( String ( err ) , codes . ERR_MUXER_UNAVAILABLE )
678725 }
679726 }
0 commit comments