1- import { CodeError } from '@libp2p/interface/errors'
2- import { symbol , type Transport , type CreateListenerOptions , type Listener , type Upgrader } from '@libp2p/interface/transport'
3- import { peerIdFromBytes , peerIdFromString } from '@libp2p/peer-id'
4- import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn'
5- import * as mafmt from '@multiformats/mafmt'
6- import { multiaddr } from '@multiformats/multiaddr'
7- import { pbStream } from 'it-protobuf-stream'
8- import { CIRCUIT_PROTO_CODE , ERR_HOP_REQUEST_FAILED , ERR_RELAYED_DIAL , MAX_CONNECTIONS , RELAY_V2_HOP_CODEC , RELAY_V2_STOP_CODEC } from '../constants.js'
9- import { StopMessage , HopMessage , Status } from '../pb/index.js'
10- import { RelayDiscovery , type RelayDiscoveryComponents } from './discovery.js'
11- import { createListener } from './listener.js'
12- import { type RelayStoreInit , ReservationStore } from './reservation-store.js'
13- import type { Libp2pEvents , AbortOptions , ComponentLogger , Logger } from '@libp2p/interface'
14- import type { Connection , Stream } from '@libp2p/interface/connection'
1+ import { type Transport , type Upgrader } from '@libp2p/interface/transport'
2+ import { type RelayDiscoveryComponents } from './discovery.js'
3+ import { type RelayStoreInit } from './reservation-store.js'
4+ import { CircuitRelayTransport } from './transport.js'
5+ import type { Libp2pEvents , ComponentLogger } from '@libp2p/interface'
156import type { ConnectionGater } from '@libp2p/interface/connection-gater'
167import type { ContentRouting } from '@libp2p/interface/content-routing'
178import type { TypedEventTarget } from '@libp2p/interface/events'
189import type { PeerId } from '@libp2p/interface/peer-id'
1910import type { PeerStore } from '@libp2p/interface/peer-store'
2011import type { AddressManager } from '@libp2p/interface-internal/address-manager'
2112import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
22- import type { IncomingStreamData , Registrar } from '@libp2p/interface-internal/registrar'
23- import type { Multiaddr } from '@multiformats/multiaddr'
24-
25- const isValidStop = ( request : StopMessage ) : request is Required < StopMessage > => {
26- if ( request . peer == null ) {
27- return false
28- }
29-
30- try {
31- request . peer . addrs . forEach ( multiaddr )
32- } catch {
33- return false
34- }
35-
36- return true
37- }
13+ import type { Registrar } from '@libp2p/interface-internal/registrar'
3814
3915export interface CircuitRelayTransportComponents extends RelayDiscoveryComponents {
4016 peerId : PeerId
@@ -49,16 +25,6 @@ export interface CircuitRelayTransportComponents extends RelayDiscoveryComponent
4925 logger : ComponentLogger
5026}
5127
52- interface ConnectOptions {
53- stream : Stream
54- connection : Connection
55- destinationPeer : PeerId
56- destinationAddr : Multiaddr
57- relayAddr : Multiaddr
58- ma : Multiaddr
59- disconnectOnFailure : boolean
60- }
61-
6228/**
6329 * RelayConfig configures the circuit v2 relay transport.
6430 */
@@ -96,301 +62,6 @@ export interface CircuitRelayTransportInit extends RelayStoreInit {
9662 reservationCompletionTimeout ?: number
9763}
9864
99- const defaults = {
100- maxInboundStopStreams : MAX_CONNECTIONS ,
101- maxOutboundStopStreams : MAX_CONNECTIONS ,
102- stopTimeout : 30000
103- }
104-
105- class CircuitRelayTransport implements Transport {
106- private readonly discovery ?: RelayDiscovery
107- private readonly registrar : Registrar
108- private readonly peerStore : PeerStore
109- private readonly connectionManager : ConnectionManager
110- private readonly peerId : PeerId
111- private readonly upgrader : Upgrader
112- private readonly addressManager : AddressManager
113- private readonly connectionGater : ConnectionGater
114- private readonly reservationStore : ReservationStore
115- private readonly logger : ComponentLogger
116- private readonly maxInboundStopStreams : number
117- private readonly maxOutboundStopStreams ?: number
118- private readonly stopTimeout : number
119- private started : boolean
120- private readonly log : Logger
121-
122- constructor ( components : CircuitRelayTransportComponents , init : CircuitRelayTransportInit ) {
123- this . log = components . logger . forComponent ( 'libp2p:circuit-relay:transport' )
124- this . registrar = components . registrar
125- this . peerStore = components . peerStore
126- this . connectionManager = components . connectionManager
127- this . logger = components . logger
128- this . peerId = components . peerId
129- this . upgrader = components . upgrader
130- this . addressManager = components . addressManager
131- this . connectionGater = components . connectionGater
132- this . maxInboundStopStreams = init . maxInboundStopStreams ?? defaults . maxInboundStopStreams
133- this . maxOutboundStopStreams = init . maxOutboundStopStreams ?? defaults . maxOutboundStopStreams
134- this . stopTimeout = init . stopTimeout ?? defaults . stopTimeout
135-
136- if ( init . discoverRelays != null && init . discoverRelays > 0 ) {
137- this . discovery = new RelayDiscovery ( components )
138- this . discovery . addEventListener ( 'relay:discover' , ( evt ) => {
139- this . reservationStore . addRelay ( evt . detail , 'discovered' )
140- . catch ( err => {
141- this . log . error ( 'could not add discovered relay %p' , evt . detail , err )
142- } )
143- } )
144- }
145-
146- this . reservationStore = new ReservationStore ( components , init )
147- this . reservationStore . addEventListener ( 'relay:not-enough-relays' , ( ) => {
148- this . discovery ?. discover ( )
149- . catch ( err => {
150- this . log . error ( 'could not discover relays' , err )
151- } )
152- } )
153-
154- this . started = false
155- }
156-
157- isStarted ( ) : boolean {
158- return this . started
159- }
160-
161- async start ( ) : Promise < void > {
162- await this . reservationStore . start ( )
163- await this . discovery ?. start ( )
164-
165- await this . registrar . handle ( RELAY_V2_STOP_CODEC , ( data ) => {
166- void this . onStop ( data ) . catch ( err => {
167- this . log . error ( 'error while handling STOP protocol' , err )
168- data . stream . abort ( err )
169- } )
170- } , {
171- maxInboundStreams : this . maxInboundStopStreams ,
172- maxOutboundStreams : this . maxOutboundStopStreams ,
173- runOnTransientConnection : true
174- } )
175-
176- this . started = true
177- }
178-
179- async stop ( ) : Promise < void > {
180- this . discovery ?. stop ( )
181- await this . reservationStore . stop ( )
182- await this . registrar . unhandle ( RELAY_V2_STOP_CODEC )
183-
184- this . started = false
185- }
186-
187- readonly [ symbol ] = true
188-
189- readonly [ Symbol . toStringTag ] = 'libp2p/circuit-relay-v2'
190-
191- /**
192- * Dial a peer over a relay
193- */
194- async dial ( ma : Multiaddr , options : AbortOptions = { } ) : Promise < Connection > {
195- if ( ma . protoCodes ( ) . filter ( code => code === CIRCUIT_PROTO_CODE ) . length !== 1 ) {
196- const errMsg = 'Invalid circuit relay address'
197- this . log . error ( errMsg , ma )
198- throw new CodeError ( errMsg , ERR_RELAYED_DIAL )
199- }
200-
201- // Check the multiaddr to see if it contains a relay and a destination peer
202- const addrs = ma . toString ( ) . split ( '/p2p-circuit' )
203- const relayAddr = multiaddr ( addrs [ 0 ] )
204- const destinationAddr = multiaddr ( addrs [ addrs . length - 1 ] )
205- const relayId = relayAddr . getPeerId ( )
206- const destinationId = destinationAddr . getPeerId ( )
207-
208- if ( relayId == null || destinationId == null ) {
209- const errMsg = `Circuit relay dial to ${ ma . toString ( ) } failed as address did not have peer ids`
210- this . log . error ( errMsg )
211- throw new CodeError ( errMsg , ERR_RELAYED_DIAL )
212- }
213-
214- const relayPeer = peerIdFromString ( relayId )
215- const destinationPeer = peerIdFromString ( destinationId )
216-
217- let disconnectOnFailure = false
218- const relayConnections = this . connectionManager . getConnections ( relayPeer )
219- let relayConnection = relayConnections [ 0 ]
220-
221- if ( relayConnection == null ) {
222- await this . peerStore . merge ( relayPeer , {
223- multiaddrs : [ relayAddr ]
224- } )
225- relayConnection = await this . connectionManager . openConnection ( relayPeer , options )
226- disconnectOnFailure = true
227- }
228-
229- let stream : Stream | undefined
230-
231- try {
232- stream = await relayConnection . newStream ( RELAY_V2_HOP_CODEC )
233-
234- return await this . connectV2 ( {
235- stream,
236- connection : relayConnection ,
237- destinationPeer,
238- destinationAddr,
239- relayAddr,
240- ma,
241- disconnectOnFailure
242- } )
243- } catch ( err : any ) {
244- this . log . error ( 'circuit relay dial to destination %p via relay %p failed' , destinationPeer , relayPeer , err )
245-
246- if ( stream != null ) {
247- stream . abort ( err )
248- }
249- disconnectOnFailure && await relayConnection . close ( )
250- throw err
251- }
252- }
253-
254- async connectV2 (
255- {
256- stream, connection, destinationPeer,
257- destinationAddr, relayAddr, ma,
258- disconnectOnFailure
259- } : ConnectOptions
260- ) : Promise < Connection > {
261- try {
262- const pbstr = pbStream ( stream )
263- const hopstr = pbstr . pb ( HopMessage )
264- await hopstr . write ( {
265- type : HopMessage . Type . CONNECT ,
266- peer : {
267- id : destinationPeer . toBytes ( ) ,
268- addrs : [ multiaddr ( destinationAddr ) . bytes ]
269- }
270- } )
271-
272- const status = await hopstr . read ( )
273-
274- if ( status . status !== Status . OK ) {
275- throw new CodeError ( `failed to connect via relay with status ${ status ?. status ?. toString ( ) ?? 'undefined' } ` , ERR_HOP_REQUEST_FAILED )
276- }
277-
278- const maConn = streamToMaConnection ( {
279- stream : pbstr . unwrap ( ) ,
280- remoteAddr : ma ,
281- localAddr : relayAddr . encapsulate ( `/p2p-circuit/p2p/${ this . peerId . toString ( ) } ` ) ,
282- logger : this . logger
283- } )
284-
285- this . log ( 'new outbound transient connection %a' , maConn . remoteAddr )
286- return await this . upgrader . upgradeOutbound ( maConn , {
287- transient : true
288- } )
289- } catch ( err : any ) {
290- this . log . error ( `Circuit relay dial to destination ${ destinationPeer . toString ( ) } via relay ${ connection . remotePeer . toString ( ) } failed` , err )
291- disconnectOnFailure && await connection . close ( )
292- throw err
293- }
294- }
295-
296- /**
297- * Create a listener
298- */
299- createListener ( options : CreateListenerOptions ) : Listener {
300- return createListener ( {
301- connectionManager : this . connectionManager ,
302- relayStore : this . reservationStore ,
303- logger : this . logger
304- } )
305- }
306-
307- /**
308- * Filter check for all Multiaddrs that this transport can dial on
309- *
310- * @param {Multiaddr[] } multiaddrs
311- * @returns {Multiaddr[] }
312- */
313- filter ( multiaddrs : Multiaddr [ ] ) : Multiaddr [ ] {
314- multiaddrs = Array . isArray ( multiaddrs ) ? multiaddrs : [ multiaddrs ]
315-
316- return multiaddrs . filter ( ( ma ) => {
317- return mafmt . Circuit . matches ( ma )
318- } )
319- }
320-
321- /**
322- * An incoming STOP request means a remote peer wants to dial us via a relay
323- */
324- async onStop ( { connection, stream } : IncomingStreamData ) : Promise < void > {
325- const signal = AbortSignal . timeout ( this . stopTimeout )
326- const pbstr = pbStream ( stream ) . pb ( StopMessage )
327- const request = await pbstr . read ( {
328- signal
329- } )
330-
331- this . log ( 'new circuit relay v2 stop stream from %p with type %s' , connection . remotePeer , request . type )
332-
333- if ( request ?. type === undefined ) {
334- this . log . error ( 'type was missing from circuit v2 stop protocol request from %s' , connection . remotePeer )
335- await pbstr . write ( { type : StopMessage . Type . STATUS , status : Status . MALFORMED_MESSAGE } , {
336- signal
337- } )
338- await stream . close ( )
339- return
340- }
341-
342- // Validate the STOP request has the required input
343- if ( request . type !== StopMessage . Type . CONNECT ) {
344- this . log . error ( 'invalid stop connect request via peer %p' , connection . remotePeer )
345- await pbstr . write ( { type : StopMessage . Type . STATUS , status : Status . UNEXPECTED_MESSAGE } , {
346- signal
347- } )
348- await stream . close ( )
349- return
350- }
351-
352- if ( ! isValidStop ( request ) ) {
353- this . log . error ( 'invalid stop connect request via peer %p' , connection . remotePeer )
354- await pbstr . write ( { type : StopMessage . Type . STATUS , status : Status . MALFORMED_MESSAGE } , {
355- signal
356- } )
357- await stream . close ( )
358- return
359- }
360-
361- const remotePeerId = peerIdFromBytes ( request . peer . id )
362-
363- if ( ( await this . connectionGater . denyInboundRelayedConnection ?.( connection . remotePeer , remotePeerId ) ) === true ) {
364- this . log . error ( 'connection gater denied inbound relayed connection from %p' , connection . remotePeer )
365- await pbstr . write ( { type : StopMessage . Type . STATUS , status : Status . PERMISSION_DENIED } , {
366- signal
367- } )
368- await stream . close ( )
369- return
370- }
371-
372- this . log . trace ( 'sending success response to %p' , connection . remotePeer )
373- await pbstr . write ( { type : StopMessage . Type . STATUS , status : Status . OK } , {
374- signal
375- } )
376-
377- const remoteAddr = connection . remoteAddr . encapsulate ( `/p2p-circuit/p2p/${ remotePeerId . toString ( ) } ` )
378- const localAddr = this . addressManager . getAddresses ( ) [ 0 ]
379- const maConn = streamToMaConnection ( {
380- stream : pbstr . unwrap ( ) . unwrap ( ) ,
381- remoteAddr,
382- localAddr,
383- logger : this . logger
384- } )
385-
386- this . log ( 'new inbound transient connection %a' , maConn . remoteAddr )
387- await this . upgrader . upgradeInbound ( maConn , {
388- transient : true
389- } )
390- this . log ( '%s connection %a upgraded' , 'inbound' , maConn . remoteAddr )
391- }
392- }
393-
39465export function circuitRelayTransport ( init : CircuitRelayTransportInit = { } ) : ( components : CircuitRelayTransportComponents ) => Transport {
39566 return ( components ) => {
39667 return new CircuitRelayTransport ( components , init )
0 commit comments