44
55/* eslint-disable require-jsdoc */
66/* global Promise, Map, WebTransport, WebTransportBidirectionalStream,
7- Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor */
7+ Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor,
8+ proto */
89
910'use strict' ;
1011
@@ -44,6 +45,10 @@ export class QuicConnection extends EventDispatcher {
4445 this . _transportId = this . _token . transportId ;
4546 this . _initReceiveStreamReader ( ) ;
4647 this . _worker = new Worker ( workerDir + '/media-worker.js' , { type : 'module' } ) ;
48+ // Key is SSRC, value is a MediaStreamTrackGenerator writer.
49+ this . _mstGeneratorWriters = new Map ( ) ;
50+ this . _initRtpModule ( ) ;
51+ this . _initDatagramReader ( ) ;
4752 }
4853
4954 /**
@@ -77,6 +82,10 @@ export class QuicConnection extends EventDispatcher {
7782 await this . _authenticate ( this . _tokenString ) ;
7883 }
7984
85+ async _initRtpModule ( ) {
86+ this . _worker . postMessage ( [ 'init-rtp' ] ) ;
87+ }
88+
8089 async _initReceiveStreamReader ( ) {
8190 const receiveStreamReader =
8291 this . _quicTransport . incomingBidirectionalStreams . getReader ( ) ;
@@ -173,6 +182,20 @@ export class QuicConnection extends EventDispatcher {
173182 }
174183 }
175184
185+ async _initDatagramReader ( ) {
186+ const datagramReader = this . _quicTransport . datagrams . readable . getReader ( ) ;
187+ console . log ( 'Get datagram reader.' ) ;
188+ let receivingDone = false ;
189+ while ( ! receivingDone ) {
190+ const { value : datagram , done : readingDatagramsDone } =
191+ await datagramReader . read ( ) ;
192+ this . _worker . postMessage ( [ 'rtp-packet' , datagram ] ) ;
193+ if ( readingDatagramsDone ) {
194+ receivingDone = true ;
195+ }
196+ }
197+ }
198+
176199 _createSubscription ( id , receiveStream ) {
177200 // TODO: Incomplete subscription.
178201 const subscription = new Subscription ( id , ( ) => {
@@ -207,6 +230,94 @@ export class QuicConnection extends EventDispatcher {
207230 return quicStream ;
208231 }
209232
233+ async bindFeedbackReader ( stream , publicationId ) {
234+ // The receiver side of a publication stream starts with a UUID of
235+ // publication ID, then each feedback message has a 4 bytes header indicates
236+ // its length, and followed by protobuf encoded body.
237+ const feedbackChunkReader = stream . readable . getReader ( ) ;
238+ let feedbackChunksDone = false ;
239+ let publicationIdOffset = 0 ;
240+ const headerSize = 4 ;
241+ const header = new Uint8Array ( headerSize ) ;
242+ let headerOffset = 0 ;
243+ let bodySize = 0 ;
244+ let bodyOffset = 0 ;
245+ let bodyBytes ;
246+ while ( ! feedbackChunksDone ) {
247+ let valueOffset = 0 ;
248+ const { value, done} = await feedbackChunkReader . read ( ) ;
249+ Logger . debug ( value ) ;
250+ while ( valueOffset < value . byteLength ) {
251+ if ( publicationIdOffset < uuidByteLength ) {
252+ // TODO: Check publication ID matches. For now, we just skip this ID.
253+ const readLength =
254+ Math . min ( uuidByteLength - publicationIdOffset , value . byteLength ) ;
255+ valueOffset += readLength ;
256+ publicationIdOffset += readLength ;
257+ }
258+ if ( headerOffset < headerSize ) {
259+ // Read header.
260+ const copyLength = Math . min (
261+ headerSize - headerOffset , value . byteLength - valueOffset ) ;
262+ if ( copyLength === 0 ) {
263+ continue ;
264+ }
265+ header . set (
266+ value . subarray ( valueOffset , valueOffset + copyLength ) ,
267+ headerOffset ) ;
268+ headerOffset += copyLength ;
269+ valueOffset += copyLength ;
270+ if ( headerOffset < headerSize ) {
271+ continue ;
272+ }
273+ bodySize = 0 ;
274+ bodyOffset = 0 ;
275+ for ( let i = 0 ; i < headerSize ; i ++ ) {
276+ bodySize += ( header [ i ] << ( ( headerSize - 1 - i ) * 8 ) ) ;
277+ }
278+ bodyBytes = new Uint8Array ( bodySize ) ;
279+ Logger . debug ( 'Body size ' + bodySize ) ;
280+ }
281+ if ( bodyOffset < bodySize ) {
282+ const copyLength =
283+ Math . min ( bodySize - bodyOffset , value . byteLength - valueOffset ) ;
284+ if ( copyLength === 0 ) {
285+ continue ;
286+ }
287+ Logger . debug ( 'Bytes for body: ' + copyLength ) ;
288+ bodyBytes . set (
289+ value . subarray ( valueOffset , valueOffset + copyLength ) ,
290+ bodyOffset ) ;
291+ bodyOffset += copyLength ;
292+ valueOffset += copyLength ;
293+ if ( valueOffset < bodySize ) {
294+ continue ;
295+ }
296+ // Decode body.
297+ const feedback = proto . owt . protobuf . Feedback . deserializeBinary ( bodyBytes ) ;
298+ this . handleFeedback ( feedback , publicationId ) ;
299+ }
300+ }
301+ if ( done ) {
302+ feedbackChunksDone = true ;
303+ break ;
304+ }
305+ }
306+ }
307+
308+ async handleFeedback ( feedback , publicationId ) {
309+ Logger . debug (
310+ 'Key frame request type: ' +
311+ proto . owt . protobuf . Feedback . Type . KEY_FRAME_REQUEST ) ;
312+ if ( feedback . getType ( ) ===
313+ proto . owt . protobuf . Feedback . Type . KEY_FRAME_REQUEST ) {
314+ this . _worker . postMessage (
315+ [ 'rtcp-feedback' , [ 'key-frame-request' , publicationId ] ] ) ;
316+ } else {
317+ Logger . warning ( 'Unrecognized feedback type ' + feedback . getType ( ) ) ;
318+ }
319+ }
320+
210321 async publish ( stream , options ) {
211322 // TODO: Avoid a stream to be published twice. The first 16 bit data send to
212323 // server must be it's publication ID.
@@ -225,6 +336,7 @@ export class QuicConnection extends EventDispatcher {
225336 for ( const track of stream . stream . getTracks ( ) ) {
226337 const quicStream =
227338 await this . _quicTransport . createBidirectionalStream ( ) ;
339+ this . bindFeedbackReader ( quicStream , publicationId ) ;
228340 this . _quicMediaStreamTracks . set ( track . id , quicStream ) ;
229341 quicStreams . push ( quicStream ) ;
230342 }
@@ -262,6 +374,7 @@ export class QuicConnection extends EventDispatcher {
262374 [
263375 'media-sender' ,
264376 [
377+ publicationId ,
265378 track . id ,
266379 track . kind ,
267380 processor . readable ,
@@ -317,12 +430,12 @@ export class QuicConnection extends EventDispatcher {
317430 if ( typeof options !== 'object' ) {
318431 return Promise . reject ( new TypeError ( 'Options should be an object.' ) ) ;
319432 }
320- // if (options.audio === undefined) {
321- // options.audio = !!stream.settings.audio;
322- // }
323- // if (options.video === undefined) {
324- // options.video = !!stream.settings.video;
325- // }
433+ if ( options . audio === undefined ) {
434+ options . audio = ! ! stream . settings . audio ;
435+ }
436+ if ( options . video === undefined ) {
437+ options . video = ! ! stream . settings . video ;
438+ }
326439 let mediaOptions ;
327440 let dataOptions ;
328441 if ( options . audio || options . video ) {
@@ -375,19 +488,40 @@ export class QuicConnection extends EventDispatcher {
375488 } )
376489 . then ( ( data ) => {
377490 this . _subscribeOptions . set ( data . id , options ) ;
378- Logger . debug ( 'Subscribe info is set.' ) ;
379- if ( this . _quicDataStreams . has ( data . id ) ) {
380- // QUIC stream created before signaling returns.
381- // TODO: Update subscription to accept list of QUIC streams.
382- const subscription = this . _createSubscription (
383- data . id , this . _quicDataStreams . get ( data . id ) [ 0 ] ) ;
384- resolve ( subscription ) ;
491+ if ( dataOptions ) {
492+ // A WebTransport stream is associated with a subscription for
493+ // data.
494+ if ( this . _quicDataStreams . has ( data . id ) ) {
495+ // QUIC stream created before signaling returns.
496+ // TODO: Update subscription to accept list of QUIC streams.
497+ const subscription = this . _createSubscription (
498+ data . id , this . _quicDataStreams . get ( data . id ) [ 0 ] ) ;
499+ resolve ( subscription ) ;
500+ } else {
501+ this . _quicDataStreams . set ( data . id , null ) ;
502+ // QUIC stream is not created yet, resolve promise after getting
503+ // QUIC stream.
504+ this . _subscribePromises . set (
505+ data . id , { resolve : resolve , reject : reject } ) ;
506+ }
385507 } else {
386- this . _quicDataStreams . set ( data . id , null ) ;
387- // QUIC stream is not created yet, resolve promise after getting
388- // QUIC stream.
389- this . _subscribePromises . set (
390- data . id , { resolve : resolve , reject : reject } ) ;
508+ // A MediaStream is associated with a subscription for media.
509+ // Media packets are received over WebTransport datagram.
510+ const ms = new Module . MediaSession ( ) ;
511+ console . log ( 'Created media session.' ) ;
512+ const generators = [ ] ;
513+ for ( const track of mediaOptions ) {
514+ const generator =
515+ new MediaStreamTrackGenerator ( { kind : track . type } ) ;
516+ generators . push ( generator ) ;
517+ // TODO: Update key with the correct SSRC.
518+ this . _mstGeneratorWriters . set (
519+ 0 , generator . writable . getWriter ( ) ) ;
520+ }
521+ const mediaStream = new MediaStream ( generators ) ;
522+ const subscription =
523+ this . _createSubscription ( data . id , mediaStream ) ;
524+ resolve ( subscription ) ;
391525 }
392526 if ( this . _subscriptionInfoReady . has ( data . id ) ) {
393527 this . _subscriptionInfoReady . get ( data . id ) ( ) ;
0 commit comments