@@ -46,11 +46,13 @@ type RTCEngine struct {
4646 subscriber * PCTransport
4747 client * SignalClient
4848
49- dclock sync.RWMutex
50- reliableDC * webrtc.DataChannel
51- lossyDC * webrtc.DataChannel
52- reliableDCSub * webrtc.DataChannel
53- lossyDCSub * webrtc.DataChannel
49+ dclock sync.RWMutex
50+ reliableDC * webrtc.DataChannel
51+ lossyDC * webrtc.DataChannel
52+ reliableDCSub * webrtc.DataChannel
53+ lossyDCSub * webrtc.DataChannel
54+ reliableMsgLock sync.Mutex
55+ reliableMsgSeq uint32
5456
5557 trackPublishedListenersLock sync.Mutex
5658 trackPublishedListeners map [string ]chan * livekit.TrackPublishedResponse
@@ -107,6 +109,7 @@ func NewRTCEngine() *RTCEngine {
107109 client : NewSignalClient (),
108110 trackPublishedListeners : make (map [string ]chan * livekit.TrackPublishedResponse ),
109111 JoinTimeout : 15 * time .Second ,
112+ reliableMsgSeq : 1 ,
110113 }
111114
112115 e .client .OnParticipantUpdate = func (info []* livekit.ParticipantInfo ) {
@@ -284,6 +287,12 @@ func (e *RTCEngine) configure(
284287 subscriberPrimary * bool ) error {
285288
286289 configuration := e .makeRTCConfiguration (iceServers , clientConfig )
290+
291+ // reset reliable message sequence
292+ e .reliableMsgLock .Lock ()
293+ e .reliableMsgSeq = 1
294+ e .reliableMsgLock .Unlock ()
295+
287296 e .pclock .Lock ()
288297 defer e .pclock .Unlock ()
289298
@@ -835,13 +844,7 @@ func (e *RTCEngine) makeRTCConfiguration(iceServers []*livekit.ICEServer, client
835844}
836845
837846func (e * RTCEngine ) publishDataPacket (pck * livekit.DataPacket , kind livekit.DataPacket_Kind ) error {
838- data , err := proto .Marshal (pck )
839- if err != nil {
840- e .log .Errorw ("could not marshal data packet" , err )
841- return err
842- }
843-
844- err = e .ensurePublisherConnected (true )
847+ err := e .ensurePublisherConnected (true )
845848 if err != nil {
846849 e .log .Errorw ("could not ensure publisher connected" , err )
847850 return err
@@ -853,6 +856,20 @@ func (e *RTCEngine) publishDataPacket(pck *livekit.DataPacket, kind livekit.Data
853856 return errors .New ("datachannel not found" )
854857 }
855858
859+ if kind == livekit .DataPacket_RELIABLE {
860+ e .reliableMsgLock .Lock ()
861+ defer e .reliableMsgLock .Unlock ()
862+
863+ pck .Sequence = e .reliableMsgSeq
864+ e .reliableMsgSeq ++
865+ }
866+
867+ data , err := proto .Marshal (pck )
868+ if err != nil {
869+ e .log .Errorw ("could not marshal data packet" , err )
870+ return err
871+ }
872+
856873 dc .Send (data )
857874 return nil
858875}
0 commit comments