Skip to content

Commit ed02e94

Browse files
authored
Adding burst estimator based start gate for initializing track synchronizer (#759)
* Adding burst estimator based start gate for initializing track synchronizer RTP packets arriving at a receiver site might come in a burst because of reconnect sending older packets through a NACK + retransmission. Anchoring track start time during phases of bursts would result in setting wrong start time for the track and breaking inter-track syncrhonization. Adding burst esitmator gate before init which will check between packets RTP and reception time and would allow init only when pace stabilizes (no burst is onging).
1 parent 3b5079b commit ed02e94

File tree

4 files changed

+325
-3
lines changed

4 files changed

+325
-3
lines changed

pkg/synchronizer/start_gate.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package synchronizer
2+
3+
import (
4+
"strings"
5+
"time"
6+
7+
"github.com/livekit/media-sdk/jitter"
8+
"github.com/livekit/protocol/logger"
9+
"github.com/pion/webrtc/v4"
10+
)
11+
12+
// startGate is a lightweight buffer that decides when a track should be
13+
// considered "live". Callers push packets into the gate until it returns a
14+
// slice of packets that can be used to initialize downstream synchronisation.
15+
// The second return value exposes how many packets were discarded while the
16+
// gate was waiting for stability, and the third return value indicates whether
17+
// the gate has finished its job.
18+
type startGate interface {
19+
Push(pkt jitter.ExtPacket) ([]jitter.ExtPacket, int, bool)
20+
}
21+
22+
// burstEstimatorGate implements startGate using the burst-estimation logic. It
23+
// buffers packets until their arrival cadence matches the RTP timestamp spacing
24+
// closely enough to assume we are caught up with the realtime stream. Minimum
25+
// arrival spacing is derived from a fraction of the expected RTP interval
26+
type burstEstimatorGate struct {
27+
clockRate uint32
28+
maxSkewFactor float64
29+
minArrivalFactor float64
30+
scoreTarget int
31+
maxStableDuration time.Duration
32+
flushAfter time.Duration
33+
firstArrival time.Time
34+
logger logger.Logger
35+
36+
score int
37+
lastTS uint32
38+
lastArrival time.Time
39+
hasLast bool
40+
done bool
41+
buffer []jitter.ExtPacket
42+
maxBuffer int
43+
44+
// stats
45+
totalDropped int
46+
}
47+
48+
func newStartGate(clockRate uint32, kind webrtc.RTPCodecType, logger logger.Logger) startGate {
49+
be := &burstEstimatorGate{
50+
clockRate: clockRate,
51+
maxSkewFactor: 0.3,
52+
minArrivalFactor: 0.2,
53+
scoreTarget: 5,
54+
maxBuffer: 1000, // high bitrate key frames can span hundreds of packets
55+
maxStableDuration: time.Second,
56+
flushAfter: 2 * time.Second,
57+
logger: logger,
58+
}
59+
60+
if kind == webrtc.RTPCodecTypeAudio {
61+
be.maxBuffer = 200
62+
}
63+
64+
return be
65+
}
66+
67+
// Push feeds one packet into the gate. Once pacing stabilizes it returns the
68+
// buffered packets that should be used to initialize the track synchronizer,
69+
// along with the number of packets dropped while waiting and a done flag.
70+
func (b *burstEstimatorGate) Push(pkt jitter.ExtPacket) ([]jitter.ExtPacket, int, bool) {
71+
if b.done {
72+
return nil, 0, true
73+
}
74+
75+
elapsed := time.Duration(0)
76+
if !b.firstArrival.IsZero() {
77+
elapsed = time.Since(b.firstArrival)
78+
} else {
79+
b.firstArrival = time.Now()
80+
}
81+
if b.flushAfter > 0 && elapsed > b.flushAfter {
82+
ready := append(b.buffer, pkt)
83+
b.buffer = nil
84+
b.done = true
85+
b.logCompletion("flush_after_timeout")
86+
return ready, 0, true
87+
}
88+
89+
if len(pkt.Payload) == 0 {
90+
return nil, 0, false
91+
}
92+
93+
if !b.hasLast {
94+
b.lastTS = pkt.Timestamp
95+
b.lastArrival = pkt.ReceivedAt
96+
b.hasLast = true
97+
b.buffer = append(b.buffer[:0], pkt)
98+
return nil, 0, false
99+
}
100+
101+
signedTsDelta := int32(pkt.Timestamp - b.lastTS)
102+
arrivalDelta := pkt.ReceivedAt.Sub(b.lastArrival)
103+
104+
if signedTsDelta < 0 {
105+
// ignore out-of-order packets while continuing to wait for stability
106+
return nil, 0, false
107+
}
108+
109+
if signedTsDelta == 0 {
110+
// multiple packets with the same timestamp (e.g. key frame)
111+
b.buffer = append(b.buffer, pkt)
112+
if dropped := b.enforceWindow(); dropped > 0 {
113+
return nil, dropped, false
114+
}
115+
return nil, 0, false
116+
}
117+
118+
b.lastTS = pkt.Timestamp
119+
b.lastArrival = pkt.ReceivedAt
120+
121+
tsDuration := b.timestampToDuration(uint32(signedTsDelta))
122+
123+
minArrival := time.Duration(float64(tsDuration) * b.minArrivalFactor)
124+
125+
if arrivalDelta < minArrival {
126+
dropped := b.restartSequence(pkt)
127+
return nil, dropped, false
128+
}
129+
130+
skew := arrivalDelta - tsDuration
131+
if skew < 0 {
132+
skew = -skew
133+
}
134+
135+
maxSkew := time.Duration(float64(tsDuration) * b.maxSkewFactor)
136+
137+
if skew > maxSkew {
138+
dropped := b.restartSequence(pkt)
139+
return nil, dropped, false
140+
}
141+
142+
b.buffer = append(b.buffer, pkt)
143+
if dropped := b.enforceWindow(); dropped > 0 {
144+
return nil, dropped, false
145+
}
146+
147+
b.score++
148+
149+
reachedScore := b.score >= b.scoreTarget
150+
reachedDuration := b.totalBufferedDuration() >= b.maxStableDuration
151+
if reachedScore || reachedDuration {
152+
b.done = true
153+
ready := b.buffer
154+
b.buffer = nil
155+
reasons := make([]string, 0, 2)
156+
if reachedScore {
157+
reasons = append(reasons, "score_target")
158+
}
159+
if reachedDuration {
160+
reasons = append(reasons, "max_stable_duration")
161+
}
162+
reason := strings.Join(reasons, ",")
163+
b.logCompletion(reason)
164+
return ready, 0, true
165+
}
166+
167+
return nil, 0, false
168+
}
169+
170+
func (b *burstEstimatorGate) timestampToDuration(delta uint32) time.Duration {
171+
if b.clockRate == 0 {
172+
return 0
173+
}
174+
return time.Duration(int64(delta) * int64(time.Second) / int64(b.clockRate))
175+
}
176+
177+
func (b *burstEstimatorGate) enforceWindow() int {
178+
if b.maxBuffer == 0 || len(b.buffer) <= b.maxBuffer {
179+
return 0
180+
}
181+
dropped := len(b.buffer) - b.maxBuffer
182+
b.buffer = b.buffer[dropped:]
183+
b.score = 0
184+
b.totalDropped += dropped
185+
return dropped
186+
}
187+
188+
func (b *burstEstimatorGate) restartSequence(seed jitter.ExtPacket) int {
189+
dropped := len(b.buffer)
190+
b.buffer = b.buffer[:0]
191+
b.score = 0
192+
b.buffer = append(b.buffer, seed)
193+
b.totalDropped += dropped
194+
return dropped
195+
}
196+
197+
func (b *burstEstimatorGate) totalBufferedDuration() time.Duration {
198+
if len(b.buffer) < 2 {
199+
return 0
200+
}
201+
first := b.buffer[0].Timestamp
202+
last := b.buffer[len(b.buffer)-1].Timestamp
203+
return b.timestampToDuration(last - first)
204+
}
205+
206+
func (b *burstEstimatorGate) logCompletion(reason string) {
207+
if b.logger == nil {
208+
return
209+
}
210+
211+
b.logger.Infow(
212+
"start gate sequence completed",
213+
"reason", reason,
214+
"score", b.score,
215+
"elapsed", time.Since(b.firstArrival),
216+
"droppedPackets", b.totalDropped,
217+
)
218+
}

pkg/synchronizer/synchronizer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type SynchronizerConfig struct {
3939
PreJitterBufferReceiveTimeEnabled bool
4040
RTCPSenderReportRebaseEnabled bool
4141
OldPacketThreshold time.Duration
42+
EnableStartGate bool
4243

4344
OnStarted func()
4445
}
@@ -108,6 +109,13 @@ func WithOldPacketThreshold(oldPacketThreshold time.Duration) SynchronizerOption
108109
}
109110
}
110111

112+
// WithStartGate enabled will buffer incoming packets until pacing stabilizes before initializing tracks
113+
func WithStartGate() SynchronizerOption {
114+
return func(config *SynchronizerConfig) {
115+
config.EnableStartGate = true
116+
}
117+
}
118+
111119
// WithOnStarted sets the callback to be called when the synchronizer starts
112120
func WithOnStarted(onStarted func()) SynchronizerOption {
113121
return func(config *SynchronizerConfig) {

pkg/synchronizer/track.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type TrackSynchronizer struct {
5656
track TrackRemote
5757
logger logger.Logger
5858
*rtpConverter
59+
startGate startGate
5960

6061
// config
6162
maxTsDiff time.Duration // maximum acceptable difference between RTP packets
@@ -94,6 +95,7 @@ type TrackSynchronizer struct {
9495
propagationDelayEstimator *OWDEstimator
9596
totalStartTimeAdjustment time.Duration
9697
startTimeAdjustResidual time.Duration
98+
initialized bool
9799

98100
stats stats
99101
}
@@ -115,6 +117,10 @@ func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer
115117
propagationDelayEstimator: NewOWDEstimator(OWDEstimatorParamsDefault),
116118
}
117119

120+
if s.config.EnableStartGate {
121+
t.startGate = newStartGate(track.Codec().ClockRate, track.Kind(), t.logger)
122+
}
123+
118124
return t
119125
}
120126

@@ -125,19 +131,56 @@ func (t *TrackSynchronizer) OnSenderReport(f func(drift time.Duration)) {
125131
t.onSR = f
126132
}
127133

134+
// PrimeForStart buffers incoming packets until pacing stabilizes, initializing the
135+
// track synchronizer automatically once a suitable sequence has been observed.
136+
// It returns the packets that should be forwarded, the number of packets
137+
// dropped while waiting, and a boolean indicating whether the track is ready to
138+
// process samples. After the gate finishes, there is no need to call the API again.
139+
func (t *TrackSynchronizer) PrimeForStart(pkt jitter.ExtPacket) ([]jitter.ExtPacket, int, bool) {
140+
if !t.initialized && len(pkt.Payload) == 0 {
141+
// skip dummy packets until the track is initialized
142+
return nil, 0, false
143+
}
144+
145+
if t.initialized || t.startGate == nil {
146+
if !t.initialized {
147+
t.Initialize(pkt.Packet)
148+
}
149+
return []jitter.ExtPacket{pkt}, 0, true
150+
}
151+
152+
ready, dropped, done := t.startGate.Push(pkt)
153+
if !done {
154+
return nil, dropped, false
155+
}
156+
157+
if len(ready) == 0 {
158+
ready = []jitter.ExtPacket{pkt}
159+
}
160+
161+
if !t.initialized {
162+
t.Initialize(ready[0].Packet)
163+
}
164+
165+
return ready, dropped, true
166+
}
167+
128168
// Initialize should be called as soon as the first packet is received
129169
func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) {
130170
now := mono.Now()
131171
t.Lock()
132172
synchronizer := t.sync
133173
t.Unlock()
134-
if synchronizer == nil {
135-
return
174+
startedAt := now.UnixNano()
175+
if synchronizer != nil {
176+
startedAt = synchronizer.getOrSetStartedAt(now.UnixNano())
136177
}
137-
startedAt := synchronizer.getOrSetStartedAt(now.UnixNano())
138178

139179
t.Lock()
140180
defer t.Unlock()
181+
if t.initialized {
182+
return
183+
}
141184

142185
t.currentPTSOffset = time.Duration(now.UnixNano() - startedAt)
143186
t.desiredPTSOffset = t.currentPTSOffset
@@ -147,6 +190,7 @@ func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) {
147190
t.startRTP = pkt.Timestamp
148191
t.lastPTS = 0
149192
t.lastPTSAdjusted = t.currentPTSOffset
193+
t.initialized = true
150194
t.logger.Infow(
151195
"initialized track synchronizer",
152196
"state", t,

pkg/synchronizer/track_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,58 @@ func TestShouldAdjustPTS_Deadband_AboveAdjusts(t *testing.T) {
205205
require.True(t, ts.shouldAdjustPTS(), "delta > step should allow adjustment")
206206
}
207207

208+
func TestPrimeForStartWithStartGate(t *testing.T) {
209+
clock := uint32(90000)
210+
ts := newTSForTests(t, clock, webrtc.RTPCodecTypeVideo)
211+
ts.startGate = newStartGate(clock, webrtc.RTPCodecTypeVideo, ts.logger)
212+
ts.sync = NewSynchronizerWithOptions()
213+
214+
stepDur := 20 * time.Millisecond
215+
step := ts.rtpConverter.toRTP(stepDur)
216+
baseTS := ts.startRTP
217+
base := time.Now()
218+
219+
for i := 0; i < 5; i++ {
220+
timestamp := baseTS + uint32(i+1)*step
221+
pkt := jitter.ExtPacket{
222+
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: timestamp}, Payload: []byte{0x01}},
223+
ReceivedAt: base.Add(time.Duration(i+1) * stepDur),
224+
}
225+
ready, dropped, done := ts.PrimeForStart(pkt)
226+
require.False(t, done)
227+
require.Zero(t, dropped)
228+
require.Nil(t, ready)
229+
}
230+
231+
finalPkt := jitter.ExtPacket{
232+
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: baseTS + uint32(6*step)}, Payload: []byte{0x02}},
233+
ReceivedAt: base.Add(6 * stepDur),
234+
}
235+
ready, dropped, done := ts.PrimeForStart(finalPkt)
236+
require.True(t, done, "gate should be done after final packet")
237+
require.Zero(t, dropped, "no packets should be dropped")
238+
require.NotEmpty(t, ready, "ready should have at least the final packet")
239+
require.True(t, ts.initialized, "track should be initialized")
240+
}
241+
242+
func TestPrimeForStartWithoutStartGate(t *testing.T) {
243+
clock := uint32(48000)
244+
ts := newTSForTests(t, clock, webrtc.RTPCodecTypeAudio)
245+
ts.startGate = nil
246+
ts.sync = NewSynchronizerWithOptions()
247+
248+
pkt := jitter.ExtPacket{
249+
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: ts.startRTP + 1234}, Payload: []byte{0x01}},
250+
ReceivedAt: time.Now(),
251+
}
252+
253+
ready, dropped, done := ts.PrimeForStart(pkt)
254+
require.True(t, done, "gate should be done after packet")
255+
require.Zero(t, dropped, "no packets should be dropped")
256+
require.Len(t, ready, 1, "ready should have the packet")
257+
require.True(t, ts.initialized, "track should be initialized")
258+
}
259+
208260
func TestShouldAdjustPTS_Deadband_NegativeDelta_Suppresses(t *testing.T) {
209261
clock := uint32(48000)
210262
ts := newTSForTests(t, clock, webrtc.RTPCodecTypeVideo)

0 commit comments

Comments
 (0)