Skip to content

Commit 3b5079b

Browse files
authored
Maintain and display gap histogram. (#758)
* Maintain and display gap histogram. Also, split out stats into a struct for cleaner separation. * log
1 parent 1eede19 commit 3b5079b

File tree

2 files changed

+103
-26
lines changed

2 files changed

+103
-26
lines changed

pkg/synchronizer/synchronizer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (s *Synchronizer) RemoveTrack(trackID string) {
203203

204204
p.Lock()
205205
if ts := p.tracks[ssrc]; ts != nil {
206-
ts.sync = nil
206+
ts.Close()
207207
}
208208
delete(p.tracks, ssrc)
209209
p.Unlock()

pkg/synchronizer/track.go

Lines changed: 102 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package synchronizer
1616

1717
import (
18+
"fmt"
1819
"io"
1920
"math"
2021
"sync"
@@ -36,6 +37,7 @@ const (
3637
cStartTimeAdjustThreshold = 5 * time.Second
3738

3839
cHighDriftLoggingThreshold = 20 * time.Millisecond
40+
cGapHistogramNumBins = 101
3941
)
4042

4143
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -69,6 +71,7 @@ type TrackSynchronizer struct {
6971
startRTP uint32 // RTP timestamp of first packet
7072
firstTime time.Time // time at which first packet was pushed
7173
lastTS uint32 // previous RTP timestamp
74+
lastSN uint16 // previous RTP sequence number
7275
lastTime time.Time
7376
lastPTS time.Duration // previous presentation timestamp
7477
lastPTSAdjusted time.Duration // previous adjusted presentation timestamp
@@ -92,14 +95,7 @@ type TrackSynchronizer struct {
9295
totalStartTimeAdjustment time.Duration
9396
startTimeAdjustResidual time.Duration
9497

95-
// packet stats
96-
numEmitted uint32
97-
numDroppedOld uint32
98-
numDroppedOutOfOrder uint32
99-
numDroppedEOF uint32
100-
101-
// sender report stats
102-
numSenderReports uint32
98+
stats stats
10399
}
104100

105101
func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer {
@@ -132,7 +128,13 @@ func (t *TrackSynchronizer) OnSenderReport(f func(drift time.Duration)) {
132128
// Initialize should be called as soon as the first packet is received
133129
func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) {
134130
now := mono.Now()
135-
startedAt := t.sync.getOrSetStartedAt(now.UnixNano())
131+
t.Lock()
132+
synchronizer := t.sync
133+
t.Unlock()
134+
if synchronizer == nil {
135+
return
136+
}
137+
startedAt := synchronizer.getOrSetStartedAt(now.UnixNano())
136138

137139
t.Lock()
138140
defer t.Unlock()
@@ -159,6 +161,14 @@ func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) {
159161
)
160162
}
161163

164+
func (t *TrackSynchronizer) Close() {
165+
t.Lock()
166+
defer t.Unlock()
167+
168+
t.sync = nil
169+
t.logger.Infow("closing track synchronizer", "state", t)
170+
}
171+
162172
// GetPTS will adjust PTS offsets if necessary
163173
// Packets are expected to be in order
164174
func (t *TrackSynchronizer) GetPTS(pkt jitter.ExtPacket) (time.Duration, error) {
@@ -188,14 +198,17 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
188198
"pktReceiveTime", pkt.ReceivedAt,
189199
"startDelay", t.firstTime.Sub(pkt.ReceivedAt),
190200
)
201+
} else {
202+
t.updateGapHistogram(pkt.SequenceNumber - t.lastSN)
191203
}
204+
t.lastSN = pkt.SequenceNumber
192205

193206
ts := pkt.Timestamp
194207

195208
// if first packet of a frame was accepted,
196209
// accept all packets of the frame even if they are old
197210
if ts == t.lastTS {
198-
t.numEmitted++
211+
t.stats.numEmitted++
199212
return t.lastPTSAdjusted, nil
200213
}
201214

@@ -204,7 +217,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
204217
// drop all packets of the frame irrespective of whether they are old or not
205218
if ts == t.lastTSOldDropped || t.isPacketTooOld(pkt.ReceivedAt) {
206219
t.lastTSOldDropped = ts
207-
t.numDroppedOld++
220+
t.stats.numDroppedOld++
208221
t.logger.Infow(
209222
"dropping old packet",
210223
"currentTS", ts,
@@ -276,7 +289,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
276289

277290
// if past end time, return EOF
278291
if t.maxPTS > 0 && (adjusted > t.maxPTS) {
279-
t.numDroppedEOF++
292+
t.stats.numDroppedEOF++
280293
return 0, io.EOF
281294
}
282295

@@ -286,7 +299,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
286299
t.lastPTS = pts
287300
t.lastPTSAdjusted = adjusted
288301

289-
t.numEmitted++
302+
t.stats.numEmitted++
290303
return adjusted, nil
291304
}
292305

@@ -303,20 +316,23 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
303316
"pktReceiveTime", pkt.ReceivedAt,
304317
"startDelay", t.firstTime.Sub(pkt.ReceivedAt),
305318
)
319+
} else {
320+
t.updateGapHistogram(pkt.SequenceNumber - t.lastSN)
306321
}
322+
t.lastSN = pkt.SequenceNumber
307323

308324
ts := pkt.Timestamp
309325

310326
// if first packet of a frame was accepted,
311327
// accept all packets of the frame even if they are old
312328
if ts == t.lastTS {
313-
t.numEmitted++
329+
t.stats.numEmitted++
314330
return t.lastPTSAdjusted, nil
315331
}
316332

317333
// packets are expected in order, just a safety net
318334
if t.lastTS != 0 && (ts-t.lastTS) > (1<<31) {
319-
t.numDroppedOutOfOrder++
335+
t.stats.numDroppedOutOfOrder++
320336
t.logger.Infow(
321337
"dropping out-of-order packet",
322338
"currentTS", ts,
@@ -329,7 +345,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
329345
// drop all packets of the frame irrespective of whether they are old or not
330346
if ts == t.lastTSOldDropped || t.isPacketTooOld(pkt.ReceivedAt) {
331347
t.lastTSOldDropped = ts
332-
t.numDroppedOld++
348+
t.stats.numDroppedOld++
333349
t.logger.Infow(
334350
"dropping old packet",
335351
"currentTS", ts,
@@ -411,7 +427,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
411427

412428
// if past end time, return EOF
413429
if t.maxPTS > 0 && (adjusted > t.maxPTS) {
414-
t.numDroppedEOF++
430+
t.stats.numDroppedEOF++
415431
return 0, io.EOF
416432
}
417433

@@ -421,7 +437,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
421437
t.lastPTS = pts
422438
t.lastPTSAdjusted = adjusted
423439

424-
t.numEmitted++
440+
t.stats.numEmitted++
425441
return adjusted, nil
426442
}
427443

@@ -459,7 +475,7 @@ func (t *TrackSynchronizer) onSenderReportWithoutRebase(pkt *rtcp.SenderReport)
459475
return
460476
}
461477

462-
t.numSenderReports++
478+
t.stats.numSenderReports++
463479

464480
var pts time.Duration
465481
if pkt.RTPTime > t.lastTS {
@@ -530,7 +546,7 @@ func (t *TrackSynchronizer) onSenderReportWithRebase(pkt *rtcp.SenderReport) {
530546
return
531547
}
532548

533-
t.numSenderReports++
549+
t.stats.numSenderReports++
534550

535551
var ptsSR time.Duration
536552
if (pkt.RTPTime - t.lastTS) < (1 << 31) {
@@ -719,6 +735,21 @@ func (t *TrackSynchronizer) applyQuantizedStartTimeAdvance(deltaTotal time.Durat
719735
return 0
720736
}
721737

738+
func (t *TrackSynchronizer) updateGapHistogram(gap uint16) {
739+
if gap < 2 {
740+
return
741+
}
742+
743+
missing := gap - 1
744+
if int(missing) > len(t.stats.gapHistogram) {
745+
t.stats.gapHistogram[len(t.stats.gapHistogram)-1]++
746+
} else {
747+
t.stats.gapHistogram[missing-1]++
748+
}
749+
750+
t.stats.largestGap = max(missing, t.stats.largestGap)
751+
}
752+
722753
func (t *TrackSynchronizer) MarshalLogObject(e zapcore.ObjectEncoder) error {
723754
if t == nil {
724755
return nil
@@ -738,16 +769,13 @@ func (t *TrackSynchronizer) MarshalLogObject(e zapcore.ObjectEncoder) error {
738769
e.AddDuration("basePTSOffset", t.basePTSOffset)
739770
e.AddDuration("totalPTSAdjustmentPositive", t.totalPTSAdjustmentPositive)
740771
e.AddDuration("totalPTSAdjustmentNegative", t.totalPTSAdjustmentNegative)
772+
e.AddUint16("lastSN", t.lastSN)
741773
e.AddObject("lastSR", wrappedAugmentedSenderReportLogger{t.lastSR})
742774
e.AddTime("nextPTSAdjustmentAt", t.nextPTSAdjustmentAt)
743775
e.AddObject("propagationDelayEstimator", t.propagationDelayEstimator)
744776
e.AddDuration("totalStartTimeAdjustment", t.totalStartTimeAdjustment)
745777
e.AddDuration("startTimeAdjustResidual", t.startTimeAdjustResidual)
746-
e.AddUint32("numEmitted", t.numEmitted)
747-
e.AddUint32("numDroppedOld", t.numDroppedOld)
748-
e.AddUint32("numDroppedOutOfOrder", t.numDroppedOutOfOrder)
749-
e.AddUint32("numDroppedEOF", t.numDroppedEOF)
750-
e.AddUint32("numSenderReports", t.numSenderReports)
778+
e.AddObject("stats", t.stats)
751779
return nil
752780
}
753781

@@ -807,3 +835,52 @@ func (w wrappedAugmentedSenderReportLogger) MarshalLogObject(e zapcore.ObjectEnc
807835
e.AddTime("receivedAtAdjusted", time.Unix(0, asr.receivedAtAdjusted))
808836
return nil
809837
}
838+
839+
// -----------------------------
840+
841+
type stats struct {
842+
// packet stats
843+
numEmitted uint32
844+
numDroppedOld uint32
845+
numDroppedOutOfOrder uint32
846+
numDroppedEOF uint32
847+
848+
gapHistogram [cGapHistogramNumBins]uint32
849+
largestGap uint16
850+
851+
// sender report stats
852+
numSenderReports uint32
853+
}
854+
855+
func (s stats) MarshalLogObject(e zapcore.ObjectEncoder) error {
856+
e.AddUint32("numEmitted", s.numEmitted)
857+
e.AddUint32("numDroppedOld", s.numDroppedOld)
858+
e.AddUint32("numDroppedOutOfOrder", s.numDroppedOutOfOrder)
859+
e.AddUint32("numDroppedEOF", s.numDroppedEOF)
860+
861+
hasLoss := false
862+
first := true
863+
str := "["
864+
for burst, count := range s.gapHistogram {
865+
if count == 0 {
866+
continue
867+
}
868+
869+
hasLoss = true
870+
871+
if !first {
872+
str += ", "
873+
}
874+
first = false
875+
str += fmt.Sprintf("%d:%d", burst+1, count)
876+
}
877+
str += "]"
878+
if hasLoss {
879+
e.AddString("gapHistogram", str)
880+
}
881+
882+
e.AddUint16("largestGap", s.largestGap)
883+
884+
e.AddUint32("numSenderReports", s.numSenderReports)
885+
return nil
886+
}

0 commit comments

Comments
 (0)