Skip to content

Commit 1477da5

Browse files
authored
apply maxAdjustment to initial track sync (#615)
* apply maxAdjustment to initial track sync * simplify onSenderReport
1 parent 3ccd0ff commit 1477da5

File tree

3 files changed

+60
-49
lines changed

3 files changed

+60
-49
lines changed

pkg/synchronizer/participant.go

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,51 +28,54 @@ type participantSynchronizer struct {
2828
sync.Mutex
2929

3030
ntpStart time.Time
31+
firstReport time.Time
3132
tracks map[uint32]*TrackSynchronizer
3233
senderReports map[uint32]*rtcp.SenderReport
3334
}
3435

36+
func newParticipantSynchronizer() *participantSynchronizer {
37+
return &participantSynchronizer{
38+
tracks: make(map[uint32]*TrackSynchronizer),
39+
senderReports: make(map[uint32]*rtcp.SenderReport),
40+
}
41+
}
42+
3543
func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) {
3644
p.Lock()
3745
defer p.Unlock()
3846

3947
if p.ntpStart.IsZero() {
40-
p.senderReports[pkt.SSRC] = pkt
41-
if len(p.senderReports) == len(p.tracks) {
42-
p.synchronizeTracks()
43-
}
44-
return
45-
}
46-
47-
if t := p.tracks[pkt.SSRC]; t != nil {
48+
p.initialize(pkt)
49+
} else if t := p.tracks[pkt.SSRC]; t != nil {
4850
t.onSenderReport(pkt, p.ntpStart)
4951
}
5052
}
5153

52-
func (p *participantSynchronizer) synchronizeTracks() {
53-
// get estimated ntp start times for all tracks
54-
estimatedStartTimes := make(map[uint32]time.Time)
55-
56-
// we will sync all tracks to the earliest
57-
var earliestStart time.Time
58-
for ssrc, pkt := range p.senderReports {
59-
t := p.tracks[ssrc]
60-
pts := t.getSenderReportPTS(pkt)
61-
ntpStart := mediatransportutil.NtpTime(pkt.NTPTime).Time().Add(-pts)
62-
if earliestStart.IsZero() || ntpStart.Before(earliestStart) {
63-
earliestStart = ntpStart
54+
func (p *participantSynchronizer) initialize(pkt *rtcp.SenderReport) {
55+
if p.firstReport.IsZero() {
56+
p.firstReport = time.Now()
57+
}
58+
59+
p.senderReports[pkt.SSRC] = pkt
60+
if len(p.senderReports) < len(p.tracks) && time.Since(p.firstReport) < 5*time.Second {
61+
return
62+
}
63+
64+
// update ntp start time
65+
for ssrc, report := range p.senderReports {
66+
if t := p.tracks[ssrc]; t != nil {
67+
pts := t.getSenderReportPTS(report)
68+
ntpStart := mediatransportutil.NtpTime(report.NTPTime).Time().Add(-pts)
69+
if p.ntpStart.IsZero() || ntpStart.Before(p.ntpStart) {
70+
p.ntpStart = ntpStart
71+
}
6472
}
65-
estimatedStartTimes[ssrc] = ntpStart
6673
}
67-
p.ntpStart = earliestStart
68-
69-
// update pts delay so all ntp start times will match the earliest
70-
for ssrc, startedAt := range estimatedStartTimes {
71-
t := p.tracks[ssrc]
72-
if diff := startedAt.Sub(earliestStart); diff != 0 {
73-
t.Lock()
74-
t.ptsOffset += diff
75-
t.Unlock()
74+
75+
// call onSenderReport for all tracks
76+
for ssrc, report := range p.senderReports {
77+
if t := p.tracks[ssrc]; t != nil {
78+
t.onSenderReport(report, p.ntpStart)
7679
}
7780
}
7881
}
@@ -83,10 +86,12 @@ func (p *participantSynchronizer) getMaxOffset() time.Duration {
8386
p.Lock()
8487
for _, t := range p.tracks {
8588
t.Lock()
86-
if o := t.ptsOffset; o > maxOffset {
89+
o := t.ptsOffset
90+
t.Unlock()
91+
92+
if o > maxOffset {
8793
maxOffset = o
8894
}
89-
t.Unlock()
9095
}
9196
p.Unlock()
9297

pkg/synchronizer/synchronizer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,7 @@ func (s *Synchronizer) AddTrack(track TrackRemote, identity string) *TrackSynchr
4949
s.Lock()
5050
p := s.psByIdentity[identity]
5151
if p == nil {
52-
p = &participantSynchronizer{
53-
tracks: make(map[uint32]*TrackSynchronizer),
54-
senderReports: make(map[uint32]*rtcp.SenderReport),
55-
}
52+
p = newParticipantSynchronizer()
5653
s.psByIdentity[identity] = p
5754
}
5855
ssrc := uint32(track.SSRC())
@@ -100,6 +97,7 @@ func (s *Synchronizer) getOrSetStartedAt(now int64) int64 {
10097

10198
if s.startedAt == 0 {
10299
s.startedAt = now
100+
103101
if s.onStarted != nil {
104102
s.onStarted()
105103
}

pkg/synchronizer/track.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131

3232
const (
3333
ewmaWeight = 0.9
34-
maxDrift = time.Millisecond * 15
34+
maxAdjustment = time.Millisecond * 15
3535
maxTSDiff = time.Minute
3636
maxSNDropout = 3000 // max sequence number skip
3737
uint32Half int64 = 2147483648
@@ -167,7 +167,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool)
167167
for ts < t.firstTS-uint32Half {
168168
ts += uint32Overflow
169169
}
170-
pts := t.getElapsed(ts) + t.ptsOffset
170+
pts := t.getPTS(ts)
171171
return ts, pts, true
172172
}
173173

@@ -198,7 +198,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool)
198198
}
199199

200200
// sanity check
201-
pts := t.getElapsed(ts) + t.ptsOffset
201+
pts := t.getPTS(ts)
202202
if expected := time.Since(t.startedAt.Add(t.ptsOffset)); pts > expected+maxTSDiff {
203203
// reset RTP timestamps
204204
ts, pts = t.resetRTP(pkt, []any{
@@ -212,8 +212,8 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool)
212212
return ts, pts, true
213213
}
214214

215-
func (t *TrackSynchronizer) getElapsed(ts int64) time.Duration {
216-
return t.rtpConverter.toDuration(ts - t.firstTS)
215+
func (t *TrackSynchronizer) getPTS(ts int64) time.Duration {
216+
return t.rtpConverter.toDuration(ts-t.firstTS) + t.ptsOffset
217217
}
218218

219219
func (t *TrackSynchronizer) resetRTP(pkt *rtp.Packet, fields []any) (int64, time.Duration) {
@@ -334,31 +334,38 @@ func (t *TrackSynchronizer) getSenderReportPTSLocked(pkt *rtcp.SenderReport) tim
334334
ts += uint32Overflow
335335
}
336336

337-
return t.getElapsed(ts) + t.ptsOffset
337+
return t.getPTS(ts)
338338
}
339339

340340
// onSenderReport handles pts adjustments for a track
341341
func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport, ntpStart time.Time) {
342342
t.Lock()
343343
defer t.Unlock()
344344

345-
// we receive every sender report twice
346-
if pkt.RTPTime == t.lastSR {
345+
if pkt.RTPTime == t.lastSR || t.startedAt.IsZero() {
347346
return
348347
}
349348

350349
pts := t.getSenderReportPTSLocked(pkt)
351350
calculatedNTPStart := mediatransportutil.NtpTime(pkt.NTPTime).Time().Add(-pts)
352351
drift := calculatedNTPStart.Sub(ntpStart)
353352

353+
t.adjustOffsetLocked(drift)
354+
t.lastSR = pkt.RTPTime
355+
}
356+
357+
func (t *TrackSynchronizer) adjustOffsetLocked(drift time.Duration) {
358+
if drift == 0 {
359+
return
360+
}
361+
354362
t.stats.updateDrift(drift)
355-
if drift > maxDrift {
356-
drift = maxDrift
357-
} else if drift < -maxDrift {
358-
drift = -maxDrift
363+
if drift > maxAdjustment {
364+
drift = maxAdjustment
365+
} else if drift < -maxAdjustment {
366+
drift = -maxAdjustment
359367
}
360368
t.ptsOffset += drift
361-
t.lastSR = pkt.RTPTime
362369
}
363370

364371
type TrackStats struct {
@@ -371,6 +378,7 @@ func (t *TrackStats) updateDrift(drift time.Duration) {
371378
if drift < 0 {
372379
drift = -drift
373380
}
381+
374382
t.AvgDrift = ewmaWeight*t.AvgDrift + (1-ewmaWeight)*float64(drift)
375383
if drift > t.MaxDrift {
376384
t.MaxDrift = drift

0 commit comments

Comments
 (0)