@@ -92,6 +92,8 @@ const reliableDataChannel = '_reliable';
9292const minReconnectWait = 2 * 1000 ;
9393const leaveReconnect = 'leave-reconnect' ;
9494const reliabeReceiveStateTTL = 30_000 ;
95+ const lossyDataChannelBufferThresholdMin = 8 * 1024 ;
96+ const lossyDataChannelBufferThresholdMax = 256 * 1024 ;
9597
9698enum PCState {
9799 New ,
@@ -203,6 +205,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
203205
204206 private reliableReceivedState : TTLMap < string , number > = new TTLMap ( reliabeReceiveStateTTL ) ;
205207
208+ private lossyDataStatCurrentBytes : number = 0 ;
209+
210+ private lossyDataStatByterate : number = 0 ;
211+
212+ private lossyDataStatInterval : ReturnType < typeof setInterval > | undefined ;
213+
214+ private lossyDataDropCount : number = 0 ;
215+
206216 private midToTrackId : { [ key : string ] : string } = { } ;
207217
208218 /** used to indicate whether the browser is currently waiting to reconnect */
@@ -312,6 +322,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
312322 this . removeAllListeners ( ) ;
313323 this . deregisterOnLineListener ( ) ;
314324 this . clearPendingReconnect ( ) ;
325+ this . cleanupLossyDataStats ( ) ;
315326 await this . cleanupPeerConnections ( ) ;
316327 await this . cleanupClient ( ) ;
317328 } finally {
@@ -347,6 +358,16 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
347358 this . reliableReceivedState . clear ( ) ;
348359 }
349360
361+ cleanupLossyDataStats ( ) {
362+ this . lossyDataStatByterate = 0 ;
363+ this . lossyDataStatCurrentBytes = 0 ;
364+ if ( this . lossyDataStatInterval ) {
365+ clearInterval ( this . lossyDataStatInterval ) ;
366+ this . lossyDataStatInterval = undefined ;
367+ }
368+ this . lossyDataDropCount = 0 ;
369+ }
370+
350371 async cleanupClient ( ) {
351372 await this . client . close ( ) ;
352373 this . client . resetCallbacks ( ) ;
@@ -713,6 +734,22 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
713734 // handle buffer amount low events
714735 this . lossyDC . onbufferedamountlow = this . handleBufferedAmountLow ;
715736 this . reliableDC . onbufferedamountlow = this . handleBufferedAmountLow ;
737+
738+ this . cleanupLossyDataStats ( ) ;
739+ this . lossyDataStatInterval = setInterval ( ( ) => {
740+ this . lossyDataStatByterate = this . lossyDataStatCurrentBytes ;
741+ this . lossyDataStatCurrentBytes = 0 ;
742+
743+ const dc = this . dataChannelForKind ( DataPacket_Kind . LOSSY ) ;
744+ if ( dc ) {
745+ // control buffered latency to ~100ms
746+ const threshold = this . lossyDataStatByterate / 10 ;
747+ dc . bufferedAmountLowThreshold = Math . min (
748+ Math . max ( threshold , lossyDataChannelBufferThresholdMin ) ,
749+ lossyDataChannelBufferThresholdMax ,
750+ ) ;
751+ }
752+ } , 1000 ) ;
716753 }
717754
718755 private handleDataChannel = async ( { channel } : RTCDataChannelEvent ) => {
@@ -1282,12 +1319,24 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
12821319
12831320 const msg = packet . toBinary ( ) ;
12841321
1285- await this . waitForBufferStatusLow ( kind ) ;
1286-
12871322 const dc = this . dataChannelForKind ( kind ) ;
12881323 if ( dc ) {
12891324 if ( kind === DataPacket_Kind . RELIABLE ) {
1325+ await this . waitForBufferStatusLow ( kind ) ;
12901326 this . reliableMessageBuffer . push ( { data : msg , sequence : packet . sequence } ) ;
1327+ } else {
1328+ // lossy channel, drop messages to reduce latency
1329+ if ( ! this . isBufferStatusLow ( kind ) ) {
1330+ this . lossyDataDropCount += 1 ;
1331+ if ( this . lossyDataDropCount % 100 === 0 ) {
1332+ this . log . warn (
1333+ `dropping lossy data channel messages, total dropped: ${ this . lossyDataDropCount } ` ,
1334+ this . logContext ,
1335+ ) ;
1336+ }
1337+ return ;
1338+ }
1339+ this . lossyDataStatCurrentBytes += msg . byteLength ;
12911340 }
12921341
12931342 if ( this . attemptingReconnect ) {
0 commit comments