Skip to content

Commit fb6301c

Browse files
authored
Support dynacast. (#690)
* Support dynacast. * get mime from simulcast codec * clean up * comments * logging * use next sample time as it is adjusted for sample duration
1 parent 22b67ed commit fb6301c

File tree

6 files changed

+115
-43
lines changed

6 files changed

+115
-43
lines changed

engine.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,30 +69,31 @@ type RTCEngine struct {
6969
JoinTimeout time.Duration
7070

7171
// callbacks
72-
OnLocalTrackUnpublished func(response *livekit.TrackUnpublishedResponse)
73-
OnTrackRemoteMuted func(request *livekit.MuteTrackRequest)
74-
OnDisconnected func(reason DisconnectionReason)
75-
OnMediaTrack func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver)
76-
OnParticipantUpdate func([]*livekit.ParticipantInfo)
77-
OnSpeakersChanged func([]*livekit.SpeakerInfo)
78-
OnDataReceived func(userPacket *livekit.UserPacket) // Deprecated: Use OnDataPacket instead
79-
OnDataPacket func(identity string, dataPacket DataPacket)
80-
OnConnectionQuality func([]*livekit.ConnectionQualityInfo)
81-
OnRoomUpdate func(room *livekit.Room)
82-
OnRoomMoved func(moved *livekit.RoomMovedResponse)
83-
OnRestarting func()
84-
OnRestarted func(*livekit.JoinResponse)
85-
OnResuming func()
86-
OnResumed func()
87-
OnTranscription func(*livekit.Transcription)
88-
OnSignalClientConnected func(*livekit.JoinResponse)
89-
OnRpcRequest func(callerIdentity, requestId, method, payload string, responseTimeout time.Duration, version uint32)
90-
OnRpcAck func(requestId string)
91-
OnRpcResponse func(requestId string, payload *string, error *RpcError)
92-
OnStreamHeader func(*livekit.DataStream_Header, string)
93-
OnStreamChunk func(*livekit.DataStream_Chunk)
94-
OnStreamTrailer func(*livekit.DataStream_Trailer)
95-
OnLocalTrackSubscribed func(trackSubscribed *livekit.TrackSubscribed)
72+
OnLocalTrackUnpublished func(response *livekit.TrackUnpublishedResponse)
73+
OnTrackRemoteMuted func(request *livekit.MuteTrackRequest)
74+
OnDisconnected func(reason DisconnectionReason)
75+
OnMediaTrack func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver)
76+
OnParticipantUpdate func([]*livekit.ParticipantInfo)
77+
OnSpeakersChanged func([]*livekit.SpeakerInfo)
78+
OnDataReceived func(userPacket *livekit.UserPacket) // Deprecated: Use OnDataPacket instead
79+
OnDataPacket func(identity string, dataPacket DataPacket)
80+
OnConnectionQuality func([]*livekit.ConnectionQualityInfo)
81+
OnRoomUpdate func(room *livekit.Room)
82+
OnRoomMoved func(moved *livekit.RoomMovedResponse)
83+
OnRestarting func()
84+
OnRestarted func(*livekit.JoinResponse)
85+
OnResuming func()
86+
OnResumed func()
87+
OnTranscription func(*livekit.Transcription)
88+
OnSignalClientConnected func(*livekit.JoinResponse)
89+
OnRpcRequest func(callerIdentity, requestId, method, payload string, responseTimeout time.Duration, version uint32)
90+
OnRpcAck func(requestId string)
91+
OnRpcResponse func(requestId string, payload *string, error *RpcError)
92+
OnStreamHeader func(*livekit.DataStream_Header, string)
93+
OnStreamChunk func(*livekit.DataStream_Chunk)
94+
OnStreamTrailer func(*livekit.DataStream_Trailer)
95+
OnLocalTrackSubscribed func(trackSubscribed *livekit.TrackSubscribed)
96+
OnSubscribedQualityUpdate func(subscribedQualityUpdate *livekit.SubscribedQualityUpdate)
9697

9798
onClose []func()
9899
onCloseLock sync.Mutex
@@ -145,6 +146,11 @@ func NewRTCEngine() *RTCEngine {
145146
f(trackSubscribed)
146147
}
147148
}
149+
e.client.OnSubscribedQualityUpdate = func(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) {
150+
if f := e.OnSubscribedQualityUpdate; f != nil {
151+
f(subscribedQualityUpdate)
152+
}
153+
}
148154
e.client.OnClose = func() { e.handleDisconnect(false) }
149155
e.onClose = []func(){}
150156

localtrack.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (s *LocalTrack) WriteSample(sample media.Sample, opts *SampleWriteOptions)
359359
// 4. PrevDroppedPackets -> number of dropped packets before this sample
360360
//
361361
// The goal here is to calculate RTP time stamp of provided sample.
362-
// Priority of what is used (eevn if multiple are provided)
362+
// Priority of what is used (even if multiple are provided)
363363
// 1. PacketTimestamp
364364
// 2. Timestamp
365365
// 3. Duration
@@ -602,6 +602,7 @@ func (s *LocalTrack) writeWorker(provider SampleProvider, onComplete func()) {
602602
}
603603
}
604604

605+
sample.Timestamp = nextSampleTime
605606
if err := s.WriteSample(sample, opts); err != nil {
606607
s.log.Errorw("could not write sample", err)
607608
return

publication.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,31 @@ func (p *trackPublicationBase) Track() Track {
7272
}
7373

7474
func (p *trackPublicationBase) MimeType() string {
75+
// This requires some more work.
76+
// TrackInfo has a top level MimeType which is not set
77+
// on server side till the track is published.
78+
// So, it is not available in the TrackPublishedResponse.
79+
//
80+
// But, if client specified SimulcastCodecs in AddTrackRequest,
81+
// the TrackPublishedResponse will have Codecs populated and
82+
// that will have the MimeType specified in AddTrackRequest.
83+
// Just taking the first one here which has a non-nil MimeType.
84+
// This is okay (for tracks published from here)
85+
// as of 2025-05-12, 1:30 pm Pacific as
86+
// Go SDK does not (yet) support simulcast codec feature.
87+
//
88+
// When simulcast codec feature is added, this struct needs
89+
// to be updated to handle multiple mime types
7590
if info, ok := p.info.Load().(*livekit.TrackInfo); ok {
76-
return info.MimeType
91+
if info.MimeType != "" {
92+
return info.MimeType
93+
}
94+
95+
for _, codec := range info.Codecs {
96+
if codec.MimeType != "" {
97+
return codec.MimeType
98+
}
99+
}
77100
}
78101
return ""
79102
}

remoteparticipant.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,11 @@ func (p *RemoteParticipant) updateInfo(pi *livekit.ParticipantInfo) {
9393
}
9494
}
9595

96-
func (p *RemoteParticipant) addSubscribedMediaTrack(track *webrtc.TrackRemote, trackSID string,
97-
receiver *webrtc.RTPReceiver) {
96+
func (p *RemoteParticipant) addSubscribedMediaTrack(
97+
track *webrtc.TrackRemote,
98+
trackSID string,
99+
receiver *webrtc.RTPReceiver,
100+
) {
98101
pub := p.getPublication(trackSID)
99102
if pub == nil {
100103
// wait for metadata to arrive

room.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ func NewRoom(callback *RoomCallback) *Room {
238238
engine.OnStreamChunk = r.handleStreamChunk
239239
engine.OnStreamTrailer = r.handleStreamTrailer
240240
engine.OnLocalTrackSubscribed = r.handleLocalTrackSubscribed
241+
engine.OnSubscribedQualityUpdate = r.handleSubscribedQualityUpdate
241242

242243
// callbacks engine can use to get data
243244
engine.CbGetLocalParticipantSID = r.getLocalParticipantSID
@@ -843,6 +844,39 @@ func (r *Room) handleLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscrib
843844
r.callback.OnLocalTrackSubscribed(trackPublication, r.LocalParticipant)
844845
}
845846

847+
func (r *Room) handleSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) {
848+
trackPublication := r.LocalParticipant.getLocalPublication(subscribedQualityUpdate.TrackSid)
849+
if trackPublication == nil {
850+
r.log.Debugw("recieved subscribed quality update for unknown track", "trackID", subscribedQualityUpdate.TrackSid)
851+
return
852+
}
853+
854+
r.log.Infow(
855+
"handling subscribed quality update",
856+
"trackID", trackPublication.SID(),
857+
"mime", trackPublication.MimeType(),
858+
"subscribedQualityUpdate", protoLogger.Proto(subscribedQualityUpdate),
859+
)
860+
for _, subscribedCodec := range subscribedQualityUpdate.SubscribedCodecs {
861+
if !strings.HasSuffix(strings.ToLower(trackPublication.MimeType()), subscribedCodec.Codec) {
862+
continue
863+
}
864+
865+
for _, subscribedQuality := range subscribedCodec.Qualities {
866+
track := trackPublication.GetSimulcastTrack(subscribedQuality.Quality)
867+
if track != nil {
868+
track.setMuted(!subscribedQuality.Enabled)
869+
r.log.Infow(
870+
"updating layer enable",
871+
"trackID", trackPublication.SID(),
872+
"quality", subscribedQuality.Quality,
873+
"enabled", subscribedQuality.Enabled,
874+
)
875+
}
876+
}
877+
}
878+
}
879+
846880
func (r *Room) sendSyncState() {
847881
subscriber, ok := r.engine.Subscriber()
848882
if !ok || subscriber.pc.RemoteDescription() == nil {

signalclient.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,22 @@ type SignalClient struct {
4848
pendingResponse *livekit.SignalResponse
4949
readerClosedCh chan struct{}
5050

51-
OnClose func()
52-
OnAnswer func(sd webrtc.SessionDescription)
53-
OnOffer func(sd webrtc.SessionDescription)
54-
OnTrickle func(init webrtc.ICECandidateInit, target livekit.SignalTarget)
55-
OnParticipantUpdate func([]*livekit.ParticipantInfo)
56-
OnLocalTrackPublished func(response *livekit.TrackPublishedResponse)
57-
OnSpeakersChanged func([]*livekit.SpeakerInfo)
58-
OnConnectionQuality func([]*livekit.ConnectionQualityInfo)
59-
OnRoomUpdate func(room *livekit.Room)
60-
OnRoomMoved func(moved *livekit.RoomMovedResponse)
61-
OnTrackRemoteMuted func(request *livekit.MuteTrackRequest)
62-
OnLocalTrackUnpublished func(response *livekit.TrackUnpublishedResponse)
63-
OnTokenRefresh func(refreshToken string)
64-
OnLeave func(*livekit.LeaveRequest)
65-
OnLocalTrackSubscribed func(trackSubscribed *livekit.TrackSubscribed)
51+
OnClose func()
52+
OnAnswer func(sd webrtc.SessionDescription)
53+
OnOffer func(sd webrtc.SessionDescription)
54+
OnTrickle func(init webrtc.ICECandidateInit, target livekit.SignalTarget)
55+
OnParticipantUpdate func([]*livekit.ParticipantInfo)
56+
OnLocalTrackPublished func(response *livekit.TrackPublishedResponse)
57+
OnSpeakersChanged func([]*livekit.SpeakerInfo)
58+
OnConnectionQuality func([]*livekit.ConnectionQualityInfo)
59+
OnRoomUpdate func(room *livekit.Room)
60+
OnRoomMoved func(moved *livekit.RoomMovedResponse)
61+
OnTrackRemoteMuted func(request *livekit.MuteTrackRequest)
62+
OnLocalTrackUnpublished func(response *livekit.TrackUnpublishedResponse)
63+
OnTokenRefresh func(refreshToken string)
64+
OnLeave func(*livekit.LeaveRequest)
65+
OnLocalTrackSubscribed func(trackSubscribed *livekit.TrackSubscribed)
66+
OnSubscribedQualityUpdate func(subscribedQualityUpdate *livekit.SubscribedQualityUpdate)
6667
}
6768

6869
func NewSignalClient() *SignalClient {
@@ -422,6 +423,10 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) {
422423
if c.OnLocalTrackSubscribed != nil {
423424
c.OnLocalTrackSubscribed(msg.TrackSubscribed)
424425
}
426+
case *livekit.SignalResponse_SubscribedQualityUpdate:
427+
if c.OnSubscribedQualityUpdate != nil {
428+
c.OnSubscribedQualityUpdate(msg.SubscribedQualityUpdate)
429+
}
425430
}
426431
}
427432

0 commit comments

Comments
 (0)