Skip to content

Commit af54a02

Browse files
authored
Add Stats to jitter buffer, avoid divide by 0 (#613)
* add PacketsDropped to jitter buffer, avoid divide by 0 * more stats * add lock around PopSamples
1 parent 86eee81 commit af54a02

File tree

2 files changed

+77
-17
lines changed

2 files changed

+77
-17
lines changed

egressclient.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ import (
1818
"context"
1919
"net/http"
2020

21+
"github.com/twitchtv/twirp"
22+
2123
"github.com/livekit/protocol/livekit"
2224
"github.com/livekit/protocol/utils/xtwirp"
23-
"github.com/twitchtv/twirp"
2425
)
2526

2627
type EgressClient struct {

pkg/jitter/buffer.go

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ type Buffer struct {
2929
maxLate uint32
3030
clockRate uint32
3131
onPacketDropped func()
32-
packetsDropped int
33-
packetsTotal int
32+
stats *BufferStats
3433
logger logger.Logger
3534

3635
mu sync.Mutex
@@ -45,6 +44,14 @@ type Buffer struct {
4544
minTS uint32
4645
}
4746

47+
type BufferStats struct {
48+
PacketsPushed uint64
49+
PaddingPushed uint64
50+
PacketsDropped uint64
51+
PacketsPopped uint64
52+
SamplesPopped uint64
53+
}
54+
4855
type packet struct {
4956
prev, next *packet
5057
start, end bool
@@ -57,6 +64,7 @@ func NewBuffer(depacketizer rtp.Depacketizer, clockRate uint32, maxLatency time.
5764
depacketizer: depacketizer,
5865
maxLate: uint32(float64(maxLatency) / float64(time.Second) * float64(clockRate)),
5966
clockRate: clockRate,
67+
stats: &BufferStats{},
6068
logger: logger.LogRLogger(logr.Discard()),
6169
}
6270
for _, opt := range opts {
@@ -78,7 +86,11 @@ func (b *Buffer) Push(pkt *rtp.Packet) {
7886
b.mu.Lock()
7987
defer b.mu.Unlock()
8088

81-
b.packetsTotal++
89+
b.stats.PacketsPushed++
90+
if pkt.Padding {
91+
b.stats.PaddingPushed++
92+
}
93+
8294
var start, end, padding bool
8395
if len(pkt.Payload) == 0 {
8496
// drop padding packets from the beginning of the stream
@@ -121,7 +133,7 @@ func (b *Buffer) Push(pkt *rtp.Packet) {
121133
} else if beforePrev && !outsidePrevRange {
122134
// drop if packet comes before previously pushed packet
123135
if !p.padding {
124-
b.packetsDropped++
136+
b.stats.PacketsDropped++
125137
if b.onPacketDropped != nil {
126138
b.onPacketDropped()
127139
}
@@ -229,18 +241,38 @@ func (b *Buffer) Pop(force bool) []*rtp.Packet {
229241
}
230242

231243
func (b *Buffer) PopSamples(force bool) [][]*rtp.Packet {
244+
b.mu.Lock()
245+
defer b.mu.Unlock()
246+
232247
if force {
233248
return b.forcePopSamples()
234249
} else {
235250
return b.popSamples()
236251
}
237252
}
238253

254+
func (b *Buffer) Stats() *BufferStats {
255+
b.mu.Lock()
256+
defer b.mu.Unlock()
257+
258+
return &BufferStats{
259+
PacketsPushed: b.stats.PacketsPushed,
260+
PacketsDropped: b.stats.PacketsDropped,
261+
PaddingPushed: b.stats.PaddingPushed,
262+
PacketsPopped: b.stats.PacketsPopped,
263+
SamplesPopped: b.stats.SamplesPopped,
264+
}
265+
}
266+
239267
func (b *Buffer) PacketLoss() float64 {
240268
b.mu.Lock()
241269
defer b.mu.Unlock()
242270

243-
return float64(b.packetsDropped) / float64(b.packetsTotal)
271+
if b.stats.PacketsPushed == 0 {
272+
return 0
273+
}
274+
275+
return float64(b.stats.PacketsDropped) / float64(b.stats.PacketsPushed)
244276
}
245277

246278
func (b *Buffer) forcePop() []*rtp.Packet {
@@ -249,7 +281,13 @@ func (b *Buffer) forcePop() []*rtp.Packet {
249281
var next *packet
250282
for c := b.head; c != nil; c = next {
251283
next = c.next
252-
packets = append(packets, c.packet)
284+
if !c.padding {
285+
packets = append(packets, c.packet)
286+
b.stats.PacketsPopped++
287+
}
288+
if c.end {
289+
b.stats.SamplesPopped++
290+
}
253291
b.free(c)
254292
}
255293

@@ -265,15 +303,24 @@ func (b *Buffer) forcePopSamples() [][]*rtp.Packet {
265303
var next *packet
266304
for c := b.head; c != nil; c = next {
267305
next = c.next
306+
268307
if c.start && len(sample) > 0 {
308+
b.stats.SamplesPopped++
269309
packets = append(packets, sample)
270310
sample = make([]*rtp.Packet, 0)
271311
}
272-
sample = append(sample, c.packet)
312+
313+
if !c.padding {
314+
sample = append(sample, c.packet)
315+
b.stats.PacketsPopped++
316+
}
317+
273318
if c.end {
319+
b.stats.SamplesPopped++
274320
packets = append(packets, sample)
275321
sample = make([]*rtp.Packet, 0)
276322
}
323+
277324
b.free(c)
278325
}
279326

@@ -301,16 +348,22 @@ func (b *Buffer) pop() []*rtp.Packet {
301348
var next *packet
302349
for c := b.head; ; c = next {
303350
next = c.next
304-
if !c.padding {
305-
packets = append(packets, c.packet)
306-
}
307351
if next != nil {
308352
if outsideRange(next.packet.SequenceNumber, c.packet.SequenceNumber) {
309353
// adjust minTS to account for sequence number reset
310354
b.minTS += next.packet.Timestamp - c.packet.Timestamp - b.maxSampleSize
311355
}
312356
next.prev = nil
313357
}
358+
359+
if !c.padding {
360+
packets = append(packets, c.packet)
361+
b.stats.PacketsPopped++
362+
}
363+
if c.end {
364+
b.stats.SamplesPopped++
365+
}
366+
314367
if c == end {
315368
b.prevSN = c.packet.SequenceNumber
316369
b.head = next
@@ -320,6 +373,7 @@ func (b *Buffer) pop() []*rtp.Packet {
320373
b.free(c)
321374
return packets
322375
}
376+
323377
b.free(c)
324378
}
325379
}
@@ -344,20 +398,24 @@ func (b *Buffer) popSamples() [][]*rtp.Packet {
344398
var next *packet
345399
for c := b.head; ; c = next {
346400
next = c.next
347-
if !c.padding {
348-
sample = append(sample, c.packet)
349-
}
350401
if next != nil {
351402
if outsideRange(next.packet.SequenceNumber, c.packet.SequenceNumber) {
352403
// adjust minTS to account for sequence number reset
353404
b.minTS += next.packet.Timestamp - c.packet.Timestamp - b.maxSampleSize
354405
}
355406
next.prev = nil
356407
}
408+
409+
if !c.padding {
410+
sample = append(sample, c.packet)
411+
b.stats.PacketsPopped++
412+
}
357413
if c.end {
414+
b.stats.SamplesPopped++
358415
packets = append(packets, sample)
359416
sample = make([]*rtp.Packet, 0)
360417
}
418+
361419
if c == end {
362420
b.prevSN = c.packet.SequenceNumber
363421
b.head = next
@@ -367,6 +425,7 @@ func (b *Buffer) popSamples() [][]*rtp.Packet {
367425
b.free(c)
368426
return packets
369427
}
428+
370429
b.free(c)
371430
}
372431
}
@@ -408,13 +467,13 @@ func (b *Buffer) drop() {
408467
// lost packets will now be too old even if we receive them
409468
// on sequence number reset, skip callback because we don't know whether we lost any
410469
if !b.head.reset {
411-
b.packetsDropped++
412470
dropped = true
471+
b.stats.PacketsDropped++
413472
}
414473

415474
for b.head != nil && !b.head.start && before32(b.head.packet.Timestamp-b.maxSampleSize, b.minTS) {
416475
dropped = true
417-
b.packetsDropped++
476+
b.stats.PacketsDropped++
418477
b.prevSN = b.head.packet.SequenceNumber - 1
419478
b.dropHead()
420479
}
@@ -435,7 +494,7 @@ func (b *Buffer) drop() {
435494
count := 0
436495
ts := c.packet.Timestamp
437496
for {
438-
b.packetsDropped++
497+
b.stats.PacketsDropped++
439498
count++
440499
b.dropHead()
441500
c = b.head

0 commit comments

Comments
 (0)