3030 */
3131
3232import { noise } from '@chainsafe/libp2p-noise'
33- import { type Transport , transportSymbol , type CreateListenerOptions , type DialOptions , type Listener , type ComponentLogger , type Logger , type Connection , type MultiaddrConnection , type Stream , type CounterGroup , type Metrics , type PeerId , type StreamMuxerFactory , type StreamMuxerInit , type StreamMuxer } from '@libp2p/interface'
34- import { type Multiaddr , type AbortOptions } from '@multiformats/multiaddr'
33+ import { AbortError , CodeError , transportSymbol } from '@libp2p/interface'
3534import { WebTransport as WebTransportMatcher } from '@multiformats/multiaddr-matcher'
36- import { webtransportBiDiStreamToStream } from './stream.js'
35+ import { raceSignal } from 'race-signal'
36+ import createListener from './listener.js'
37+ import { webtransportMuxer } from './muxer.js'
3738import { inertDuplex } from './utils/inert-duplex.js'
3839import { isSubset } from './utils/is-subset.js'
3940import { parseMultiaddr } from './utils/parse-multiaddr.js'
41+ import WebTransport from './webtransport.js'
42+ import type { Transport , CreateListenerOptions , DialOptions , Listener , ComponentLogger , Logger , Connection , MultiaddrConnection , CounterGroup , Metrics , PeerId } from '@libp2p/interface'
43+ import type { Multiaddr } from '@multiformats/multiaddr'
4044import type { Source } from 'it-stream-types'
4145import type { MultihashDigest } from 'multiformats/hashes/interface'
4246import type { Uint8ArrayList } from 'uint8arraylist'
4347
48+ /**
49+ * PEM format server certificate and private key
50+ */
51+ export interface WebTransportCertificate {
52+ privateKey : string
53+ pem : string
54+ hash : MultihashDigest < number >
55+ secret : string
56+ }
57+
4458interface WebTransportSessionCleanup {
4559 ( metric : string ) : void
4660}
4761
4862export interface WebTransportInit {
4963 maxInboundStreams ?: number
64+ certificates ?: WebTransportCertificate [ ]
5065}
5166
5267export interface WebTransportComponents {
@@ -69,7 +84,9 @@ class WebTransportTransport implements Transport {
6984 this . log = components . logger . forComponent ( 'libp2p:webtransport' )
7085 this . components = components
7186 this . config = {
72- maxInboundStreams : init . maxInboundStreams ?? 1000
87+ ...init ,
88+ maxInboundStreams : init . maxInboundStreams ?? 1000 ,
89+ certificates : init . certificates ?? [ ]
7390 }
7491
7592 if ( components . metrics != null ) {
@@ -87,24 +104,26 @@ class WebTransportTransport implements Transport {
87104 readonly [ transportSymbol ] = true
88105
89106 async dial ( ma : Multiaddr , options : DialOptions ) : Promise < Connection > {
90- options ?. signal ?. throwIfAborted ( )
107+ if ( options ?. signal ?. aborted === true ) {
108+ throw new AbortError ( )
109+ }
91110
92111 this . log ( 'dialing %s' , ma )
93112 const localPeer = this . components . peerId
94113 if ( localPeer === undefined ) {
95- throw new Error ( 'Need a local peerid' )
114+ throw new CodeError ( 'Need a local peerid' , 'ERR_INVALID_PARAMETERS ')
96115 }
97116
98117 options = options ?? { }
99118
100119 const { url, certhashes, remotePeer } = parseMultiaddr ( ma )
101120
102121 if ( remotePeer == null ) {
103- throw new Error ( 'Need a target peerid' )
122+ throw new CodeError ( 'Need a target peerid' , 'ERR_INVALID_PARAMETERS ')
104123 }
105124
106125 if ( certhashes . length === 0 ) {
107- throw new Error ( 'Expected multiaddr to contain certhashes' )
126+ throw new CodeError ( 'Expected multiaddr to contain certhashes' , 'ERR_INVALID_PARAMETERS ')
108127 }
109128
110129 let abortListener : ( ( ) => void ) | undefined
@@ -159,10 +178,12 @@ class WebTransportTransport implements Transport {
159178 once : true
160179 } )
161180
181+ this . log ( 'wait for session to be ready' )
162182 await Promise . race ( [
163183 wt . closed ,
164184 wt . ready
165185 ] )
186+ this . log ( 'session became ready' )
166187
167188 ready = true
168189 this . metrics ?. dialerEvents . increment ( { ready : true } )
@@ -175,15 +196,17 @@ class WebTransportTransport implements Transport {
175196 cleanUpWTSession ( 'remote_close' )
176197 } )
177198
178- if ( ! await this . authenticateWebTransport ( wt , localPeer , remotePeer , certhashes ) ) {
179- throw new Error ( 'Failed to authenticate webtransport' )
199+ authenticated = await raceSignal ( this . authenticateWebTransport ( wt , localPeer , remotePeer , certhashes ) , options . signal )
200+
201+ if ( ! authenticated ) {
202+ throw new CodeError ( 'Failed to authenticate webtransport' , 'ERR_AUTHENTICATION_FAILED' )
180203 }
181204
182205 this . metrics ?. dialerEvents . increment ( { open : true } )
183206
184207 maConn = {
185208 close : async ( ) => {
186- this . log ( 'Closing webtransport' )
209+ this . log ( 'closing webtransport' )
187210 cleanUpWTSession ( 'close' )
188211 } ,
189212 abort : ( err : Error ) => {
@@ -199,9 +222,11 @@ class WebTransportTransport implements Transport {
199222 ...inertDuplex ( )
200223 }
201224
202- authenticated = true
203-
204- return await options . upgrader . upgradeOutbound ( maConn , { skipEncryption : true , muxerFactory : this . webtransportMuxer ( wt ) , skipProtection : true } )
225+ return await options . upgrader . upgradeOutbound ( maConn , {
226+ skipEncryption : true ,
227+ muxerFactory : webtransportMuxer ( wt , wt . incomingBidirectionalStreams . getReader ( ) , this . components . logger , this . config ) ,
228+ skipProtection : true
229+ } )
205230 } catch ( err : any ) {
206231 this . log . error ( 'caught wt session err' , err )
207232
@@ -221,11 +246,14 @@ class WebTransportTransport implements Transport {
221246 }
222247 }
223248
224- async authenticateWebTransport ( wt : InstanceType < typeof WebTransport > , localPeer : PeerId , remotePeer : PeerId , certhashes : Array < MultihashDigest < number > > ) : Promise < boolean > {
249+ async authenticateWebTransport ( wt : WebTransport , localPeer : PeerId , remotePeer : PeerId , certhashes : Array < MultihashDigest < number > > , signal ?: AbortSignal ) : Promise < boolean > {
250+ if ( signal ?. aborted === true ) {
251+ throw new AbortError ( )
252+ }
253+
225254 const stream = await wt . createBidirectionalStream ( )
226255 const writer = stream . writable . getWriter ( )
227256 const reader = stream . readable . getReader ( )
228- await writer . ready
229257
230258 const duplex = {
231259 source : ( async function * ( ) {
@@ -241,13 +269,15 @@ class WebTransportTransport implements Transport {
241269 }
242270 }
243271 } ) ( ) ,
244- sink : async function ( source : Source < Uint8Array | Uint8ArrayList > ) {
272+ sink : async ( source : Source < Uint8Array | Uint8ArrayList > ) => {
245273 for await ( const chunk of source ) {
246- if ( chunk instanceof Uint8Array ) {
247- await writer . write ( chunk )
248- } else {
249- await writer . write ( chunk . subarray ( ) )
250- }
274+ await raceSignal ( writer . ready , signal )
275+
276+ const buf = chunk instanceof Uint8Array ? chunk : chunk . subarray ( )
277+
278+ writer . write ( buf ) . catch ( err => {
279+ this . log . error ( 'could not write chunk during authentication of WebTransport stream' , err )
280+ } )
251281 }
252282 }
253283 }
@@ -273,105 +303,12 @@ class WebTransportTransport implements Transport {
273303 return true
274304 }
275305
276- webtransportMuxer ( wt : WebTransport ) : StreamMuxerFactory {
277- let streamIDCounter = 0
278- const config = this . config
279- const self = this
280- return {
281- protocol : 'webtransport' ,
282- createStreamMuxer : ( init ?: StreamMuxerInit ) : StreamMuxer => {
283- // !TODO handle abort signal when WebTransport supports this.
284-
285- if ( typeof init === 'function' ) {
286- // The api docs say that init may be a function
287- init = { onIncomingStream : init }
288- }
289-
290- const activeStreams : Stream [ ] = [ ] ;
291-
292- ( async function ( ) {
293- //! TODO unclear how to add backpressure here?
294-
295- const reader = wt . incomingBidirectionalStreams . getReader ( )
296- while ( true ) {
297- const { done, value : wtStream } = await reader . read ( )
298-
299- if ( done ) {
300- break
301- }
302-
303- if ( activeStreams . length >= config . maxInboundStreams ) {
304- // We've reached our limit, close this stream.
305- wtStream . writable . close ( ) . catch ( ( err : Error ) => {
306- self . log . error ( `Failed to close inbound stream that crossed our maxInboundStream limit: ${ err . message } ` )
307- } )
308- wtStream . readable . cancel ( ) . catch ( ( err : Error ) => {
309- self . log . error ( `Failed to close inbound stream that crossed our maxInboundStream limit: ${ err . message } ` )
310- } )
311- } else {
312- const stream = await webtransportBiDiStreamToStream (
313- wtStream ,
314- String ( streamIDCounter ++ ) ,
315- 'inbound' ,
316- activeStreams ,
317- init ?. onStreamEnd ,
318- self . components . logger
319- )
320- activeStreams . push ( stream )
321- init ?. onIncomingStream ?.( stream )
322- }
323- }
324- } ) ( ) . catch ( ( ) => {
325- this . log . error ( 'WebTransport failed to receive incoming stream' )
326- } )
327-
328- const muxer : StreamMuxer = {
329- protocol : 'webtransport' ,
330- streams : activeStreams ,
331- newStream : async ( name ?: string ) : Promise < Stream > => {
332- const wtStream = await wt . createBidirectionalStream ( )
333-
334- const stream = await webtransportBiDiStreamToStream (
335- wtStream ,
336- String ( streamIDCounter ++ ) ,
337- init ?. direction ?? 'outbound' ,
338- activeStreams ,
339- init ?. onStreamEnd ,
340- self . components . logger
341- )
342- activeStreams . push ( stream )
343-
344- return stream
345- } ,
346-
347- /**
348- * Close or abort all tracked streams and stop the muxer
349- */
350- close : async ( options ?: AbortOptions ) => {
351- this . log ( 'Closing webtransport muxer' )
352-
353- await Promise . all (
354- activeStreams . map ( async s => s . close ( options ) )
355- )
356- } ,
357- abort : ( err : Error ) => {
358- this . log ( 'Aborting webtransport muxer with err:' , err )
359-
360- for ( const stream of activeStreams ) {
361- stream . abort ( err )
362- }
363- } ,
364- // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
365- ...inertDuplex ( )
366- }
367-
368- return muxer
369- }
370- }
371- }
372-
373306 createListener ( options : CreateListenerOptions ) : Listener {
374- throw new Error ( 'Webtransport servers are not supported in Node or the browser' )
307+ return createListener ( this . components , {
308+ ...options ,
309+ certificates : this . config . certificates ,
310+ maxInboundStreams : this . config . maxInboundStreams
311+ } )
375312 }
376313
377314 /**
0 commit comments