|
1 | 1 | import { CodeError } from '@libp2p/interface/errors' |
2 | 2 | import { lpStream } from 'it-length-prefixed-stream' |
| 3 | +import * as varint from 'uint8-varint' |
| 4 | +import { Uint8ArrayList } from 'uint8arraylist' |
3 | 5 | import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' |
4 | 6 | import { MAX_PROTOCOL_LENGTH } from './constants.js' |
5 | 7 | import * as multistream from './multistream.js' |
@@ -53,6 +55,7 @@ import type { Duplex } from 'it-stream-types' |
53 | 55 | export async function select <Stream extends Duplex<any, any, any>> (stream: Stream, protocols: string | string[], options: MultistreamSelectInit): Promise<ProtocolStream<Stream>> { |
54 | 56 | protocols = Array.isArray(protocols) ? [...protocols] : [protocols] |
55 | 57 | const lp = lpStream(stream, { |
| 58 | + ...options, |
56 | 59 | maxDataLength: MAX_PROTOCOL_LENGTH |
57 | 60 | }) |
58 | 61 | const protocol = protocols.shift() |
@@ -109,29 +112,58 @@ export async function select <Stream extends Duplex<any, any, any>> (stream: Str |
109 | 112 | export function lazySelect <Stream extends Duplex<any, any, any>> (stream: Stream, protocol: string, options: MultistreamSelectInit): ProtocolStream<Stream> { |
110 | 113 | const originalSink = stream.sink.bind(stream) |
111 | 114 | const originalSource = stream.source |
| 115 | + let selected = false |
112 | 116 |
|
113 | 117 | const lp = lpStream({ |
114 | 118 | sink: originalSink, |
115 | 119 | source: originalSource |
116 | 120 | }, { |
| 121 | + ...options, |
117 | 122 | maxDataLength: MAX_PROTOCOL_LENGTH |
118 | 123 | }) |
119 | 124 |
|
120 | 125 | stream.sink = async source => { |
121 | | - options?.log.trace('lazy: write ["%s", "%s"]', PROTOCOL_ID, protocol) |
122 | | - |
123 | | - await lp.writeV([ |
124 | | - uint8ArrayFromString(`${PROTOCOL_ID}\n`), |
125 | | - uint8ArrayFromString(`${protocol}\n`) |
126 | | - ]) |
127 | | - |
128 | | - options?.log.trace('lazy: writing rest of "%s" stream', protocol) |
129 | | - await lp.unwrap().sink(source) |
| 126 | + const { sink } = lp.unwrap() |
| 127 | + |
| 128 | + await sink(async function * () { |
| 129 | + for await (const buf of source) { |
| 130 | + // if writing before selecting, send selection with first data chunk |
| 131 | + if (!selected) { |
| 132 | + selected = true |
| 133 | + options?.log.trace('lazy: write ["%s", "%s", data] in sink', PROTOCOL_ID, protocol) |
| 134 | + |
| 135 | + const protocolString = `${protocol}\n` |
| 136 | + |
| 137 | + // send protocols in first chunk of data written to transport |
| 138 | + yield new Uint8ArrayList( |
| 139 | + Uint8Array.from([19]), // length of PROTOCOL_ID plus newline |
| 140 | + uint8ArrayFromString(`${PROTOCOL_ID}\n`), |
| 141 | + varint.encode(protocolString.length), |
| 142 | + uint8ArrayFromString(protocolString), |
| 143 | + buf |
| 144 | + ).subarray() |
| 145 | + |
| 146 | + options?.log.trace('lazy: wrote ["%s", "%s", data] in sink', PROTOCOL_ID, protocol) |
| 147 | + } else { |
| 148 | + yield buf |
| 149 | + } |
| 150 | + } |
| 151 | + }()) |
130 | 152 | } |
131 | 153 |
|
132 | 154 | stream.source = (async function * () { |
133 | | - options?.log.trace('lazy: reading multistream select header') |
| 155 | + // if reading before selecting, send selection before first data chunk |
| 156 | + if (!selected) { |
| 157 | + selected = true |
| 158 | + options?.log.trace('lazy: write ["%s", "%s", data] in source', PROTOCOL_ID, protocol) |
| 159 | + await lp.writeV([ |
| 160 | + uint8ArrayFromString(`${PROTOCOL_ID}\n`), |
| 161 | + uint8ArrayFromString(`${protocol}\n`) |
| 162 | + ]) |
| 163 | + options?.log.trace('lazy: wrote ["%s", "%s", data] in source', PROTOCOL_ID, protocol) |
| 164 | + } |
134 | 165 |
|
| 166 | + options?.log.trace('lazy: reading multistream select header') |
135 | 167 | let response = await multistream.readString(lp, options) |
136 | 168 | options?.log.trace('lazy: read multistream select header "%s"', response) |
137 | 169 |
|
|
0 commit comments