@@ -248,7 +248,23 @@ func (e *RTCEngine) JoinContext(
248248 e .token .Store (token )
249249 e .connParams = connectParams
250250
251- err := e .signalTransport .Join (ctx , url , token , * connectParams )
251+ var (
252+ publisherOffer webrtc.SessionDescription
253+ err error
254+ )
255+ if e .signallingVersion == signalling .SignallingVersionV2 {
256+ e .pclock .Lock ()
257+ e .createPublisherPCLocked (webrtc.Configuration {}, false )
258+
259+ publisherOffer , err = e .publisher .GetOffer ()
260+ if err != nil {
261+ e .pclock .Unlock ()
262+ return false , err
263+ }
264+ e .pclock .Unlock ()
265+ }
266+
267+ err = e .signalTransport .Join (ctx , url , token , * connectParams , publisherOffer )
252268 if err != nil {
253269 if verr := e .validate (ctx , url , token , connectParams , "" ); verr != nil {
254270 return false , verr
@@ -335,6 +351,7 @@ func (e *RTCEngine) configure(
335351 clientConfig * livekit.ClientConfiguration ,
336352 subscriberPrimary * bool ,
337353) error {
354+ e .log .Debugw ("Using ICE servers" , "servers" , iceServers )
338355 configuration := e .makeRTCConfiguration (iceServers , clientConfig )
339356
340357 // reset reliable message sequence
@@ -345,16 +362,30 @@ func (e *RTCEngine) configure(
345362 e .pclock .Lock ()
346363 defer e .pclock .Unlock ()
347364
348- // remove previous transport
365+ if subscriberPrimary != nil {
366+ e .subscriberPrimary = * subscriberPrimary
367+ }
368+
349369 if e .publisher != nil {
350- e .publisher .Close ()
351- e .publisher = nil
370+ setConfiguration (e .publisher , configuration )
371+ } else {
372+ if err := e .createPublisherPCLocked (configuration , ! e .subscriberPrimary ); err != nil {
373+ return err
374+ }
352375 }
376+
353377 if e .subscriber != nil {
354- e .subscriber .Close ()
355- e .subscriber = nil
378+ setConfiguration (e .subscriber , configuration )
379+ } else {
380+ if err := e .createSubscriberPCLocked (configuration , e .subscriberPrimary ); err != nil {
381+ return err
382+ }
356383 }
357384
385+ return nil
386+ }
387+
388+ func (e * RTCEngine ) createPublisherPCLocked (configuration webrtc.Configuration , isPrimary bool ) error {
358389 var err error
359390 if e .publisher , err = NewPCTransport (PCTransportParams {
360391 Configuration : configuration ,
@@ -366,20 +397,7 @@ func (e *RTCEngine) configure(
366397 }); err != nil {
367398 return err
368399 }
369- if e .subscriber , err = NewPCTransport (PCTransportParams {
370- Configuration : configuration ,
371- RetransmitBufferSize : e .connParams .RetransmitBufferSize ,
372- }); err != nil {
373- return err
374- }
375400 e .publisher .SetLogger (e .log )
376- e .subscriber .SetLogger (e .log )
377- e .log .Debugw ("Using ICE servers" , "servers" , iceServers )
378-
379- if subscriberPrimary != nil {
380- e .subscriberPrimary = * subscriberPrimary
381- }
382- e .subscriber .OnRemoteDescriptionSettled (e .createSubscriberPCAnswerAndSend )
383401
384402 e .publisher .pc .OnICECandidate (func (candidate * webrtc.ICECandidate ) {
385403 if candidate == nil {
@@ -401,61 +419,8 @@ func (e *RTCEngine) configure(
401419 }
402420 })
403421
404- e .subscriber .pc .OnICECandidate (func (candidate * webrtc.ICECandidate ) {
405- if candidate == nil {
406- // done
407- return
408- }
409- init := candidate .ToJSON ()
410- e .log .Debugw (
411- "local ICE candidate" ,
412- "target" , livekit .SignalTarget_SUBSCRIBER ,
413- "candidate" , init .Candidate ,
414- )
415- if err := e .signalTransport .SendMessage (
416- e .signalling .SignalICECandidate (
417- protosignalling .ToProtoTrickle (init , livekit .SignalTarget_SUBSCRIBER , false ),
418- ),
419- ); err != nil {
420- e .log .Errorw ("could not send ICE candidates for subscriber" , err )
421- }
422- })
423-
424- primaryTransport := e .publisher
425- if e .subscriberPrimary {
426- primaryTransport = e .subscriber
427- }
428- primaryTransport .pc .OnICEConnectionStateChange (func (state webrtc.ICEConnectionState ) {
429- switch state {
430- case webrtc .ICEConnectionStateConnected :
431- var fields []interface {}
432- if pair , err := primaryTransport .GetSelectedCandidatePair (); err == nil {
433- fields = append (fields , "iceCandidatePair" , pair )
434- }
435- e .log .Debugw ("ICE connected" , fields ... )
436- case webrtc .ICEConnectionStateDisconnected :
437- e .log .Debugw ("ICE disconnected" )
438- case webrtc .ICEConnectionStateFailed :
439- e .log .Debugw ("ICE failed" )
440- e .handleDisconnect (false )
441- }
442- })
443-
444- e .subscriber .pc .OnTrack (func (remote * webrtc.TrackRemote , receiver * webrtc.RTPReceiver ) {
445- e .engineHandler .OnMediaTrack (remote , receiver )
446- })
447-
448- e .subscriber .pc .OnDataChannel (func (c * webrtc.DataChannel ) {
449- e .dclock .Lock ()
450- defer e .dclock .Unlock ()
451- if c .Label () == reliableDataChannelName {
452- e .reliableDCSub = c
453- } else if c .Label () == lossyDataChannelName {
454- e .lossyDCSub = c
455- } else {
456- return
457- }
458- c .OnMessage (e .handleDataPacket )
422+ e .publisher .pc .OnICEConnectionStateChange (func (state webrtc.ICEConnectionState ) {
423+ e .handleICEConnectionStateChange (e .publisher , livekit .SignalTarget_PUBLISHER , isPrimary , state )
459424 })
460425
461426 e .publisher .OnOffer = func (offer webrtc.SessionDescription ) {
@@ -473,7 +438,7 @@ func (e *RTCEngine) configure(
473438 falseVal := false
474439 maxRetries := uint16 (1 )
475440 e .dclock .Lock ()
476- e .lossyDC , err = e .publisher .PeerConnection () .CreateDataChannel (lossyDataChannelName , & webrtc.DataChannelInit {
441+ e .lossyDC , err = e .publisher .pc .CreateDataChannel (lossyDataChannelName , & webrtc.DataChannelInit {
477442 Ordered : & falseVal ,
478443 MaxRetransmits : & maxRetries ,
479444 })
@@ -483,7 +448,7 @@ func (e *RTCEngine) configure(
483448 }
484449 e .lossyDC .OnMessage (e .handleDataPacket )
485450
486- e .reliableDC , err = e .publisher .PeerConnection () .CreateDataChannel (reliableDataChannelName , & webrtc.DataChannelInit {
451+ e .reliableDC , err = e .publisher .pc .CreateDataChannel (reliableDataChannelName , & webrtc.DataChannelInit {
487452 Ordered : & trueVal ,
488453 })
489454 if err != nil {
@@ -492,10 +457,11 @@ func (e *RTCEngine) configure(
492457 }
493458 e .reliableDC .OnMessage (e .handleDataPacket )
494459
495- // SIGNALLING-V2-TODO: instantiating this rely on signal transport strategy rather than signalling version
460+ // SIGNALLING-V2-TODO: may need a separate peer connection
461+ // SIGNALLING-V2-TODO: instantiating this should rely on signal transport strategy rather than signalling version
496462 // SIGNALLING-V2-TODO: for signalling v2 instantiate publisher PC before connect and then do just SetConfiguration in OnConnectResponse
497463 if e .signallingVersion == signalling .SignallingVersionV2 {
498- e .signallingDC , err = e .publisher .PeerConnection () .CreateDataChannel (signallingDataChannelName , & webrtc.DataChannelInit {
464+ e .signallingDC , err = e .publisher .pc .CreateDataChannel (signallingDataChannelName , & webrtc.DataChannelInit {
499465 Ordered : & trueVal ,
500466 })
501467 if err != nil {
@@ -520,6 +486,100 @@ func (e *RTCEngine) configure(
520486 return nil
521487}
522488
489+ func (e * RTCEngine ) createSubscriberPCLocked (configuration webrtc.Configuration , isPrimary bool ) error {
490+ var err error
491+ if e .subscriber , err = NewPCTransport (PCTransportParams {
492+ Configuration : configuration ,
493+ RetransmitBufferSize : e .connParams .RetransmitBufferSize ,
494+ }); err != nil {
495+ return err
496+ }
497+ e .subscriber .SetLogger (e .log )
498+
499+ e .subscriber .OnRemoteDescriptionSettled (e .createSubscriberPCAnswerAndSend )
500+
501+ e .subscriber .pc .OnICECandidate (func (candidate * webrtc.ICECandidate ) {
502+ if candidate == nil {
503+ // done
504+ return
505+ }
506+ init := candidate .ToJSON ()
507+ e .log .Debugw (
508+ "local ICE candidate" ,
509+ "target" , livekit .SignalTarget_SUBSCRIBER ,
510+ "candidate" , init .Candidate ,
511+ )
512+ if err := e .signalTransport .SendMessage (
513+ e .signalling .SignalICECandidate (
514+ protosignalling .ToProtoTrickle (init , livekit .SignalTarget_SUBSCRIBER , false ),
515+ ),
516+ ); err != nil {
517+ e .log .Errorw ("could not send ICE candidates for subscriber" , err )
518+ }
519+ })
520+
521+ e .subscriber .pc .OnICEConnectionStateChange (func (state webrtc.ICEConnectionState ) {
522+ e .handleICEConnectionStateChange (e .subscriber , livekit .SignalTarget_SUBSCRIBER , isPrimary , state )
523+ })
524+
525+ e .subscriber .pc .OnTrack (func (remote * webrtc.TrackRemote , receiver * webrtc.RTPReceiver ) {
526+ e .engineHandler .OnMediaTrack (remote , receiver )
527+ })
528+
529+ e .subscriber .pc .OnDataChannel (func (c * webrtc.DataChannel ) {
530+ e .dclock .Lock ()
531+ defer e .dclock .Unlock ()
532+ if c .Label () == reliableDataChannelName {
533+ e .reliableDCSub = c
534+ } else if c .Label () == lossyDataChannelName {
535+ e .lossyDCSub = c
536+ } else {
537+ return
538+ }
539+ c .OnMessage (e .handleDataPacket )
540+ })
541+
542+ return nil
543+ }
544+
545+ func (e * RTCEngine ) handleICEConnectionStateChange (
546+ transport * PCTransport ,
547+ signalTarget livekit.SignalTarget ,
548+ isPrimary bool ,
549+ state webrtc.ICEConnectionState ,
550+ ) {
551+ switch state {
552+ case webrtc .ICEConnectionStateConnected :
553+ var fields []interface {}
554+ if pair , err := transport .GetSelectedCandidatePair (); err == nil {
555+ fields = append (fields , "transport" , signalTarget , "iceCandidatePair" , pair )
556+ }
557+ e .log .Debugw ("ICE connected" , fields ... )
558+ case webrtc .ICEConnectionStateDisconnected :
559+ e .log .Debugw ("ICE disconnected" , "transport" , signalTarget )
560+ case webrtc .ICEConnectionStateFailed :
561+ e .log .Debugw ("ICE failed" , "transport" , signalTarget )
562+ if isPrimary {
563+ e .handleDisconnect (false )
564+ }
565+ }
566+ }
567+
568+ func (e * RTCEngine ) closePeerConnections () {
569+ e .pclock .Lock ()
570+ defer e .pclock .Unlock ()
571+
572+ if e .publisher != nil {
573+ e .publisher .Close ()
574+ e .publisher = nil
575+ }
576+
577+ if e .subscriber != nil {
578+ e .subscriber .Close ()
579+ e .subscriber = nil
580+ }
581+ }
582+
523583func (e * RTCEngine ) GetDataChannel (kind livekit.DataPacket_Kind ) * webrtc.DataChannel {
524584 e .dclock .RLock ()
525585 defer e .dclock .RUnlock ()
@@ -787,6 +847,8 @@ func (e *RTCEngine) restartConnection() error {
787847 }
788848 e .signalTransport .Close ()
789849
850+ e .closePeerConnections ()
851+
790852 _ , err := e .JoinContext (context .TODO (), e .url , e .token .Load (), e .connParams )
791853 return err
792854}
@@ -1388,3 +1450,11 @@ func (e *RTCEngine) OnConnectResponse(res *livekit.ConnectResponse) error {
13881450 }
13891451 return nil
13901452}
1453+
1454+ // ------------------------------------
1455+
1456+ func setConfiguration (pcTransport * PCTransport , configuration webrtc.Configuration ) {
1457+ if pcTransport != nil {
1458+ pcTransport .SetConfiguration (configuration )
1459+ }
1460+ }
0 commit comments