Skip to content

Commit 4cdfa42

Browse files
authored
Use packet receive time to calculate PTS. (#755)
The pipeline clock is anchored to when the first track of the synchronizer group is innitialised. And all packets should have PTS with reference to that. That clock is set to when the first packet of any track in the synchronizer group is read off the wire (non-padding packet). So, (packet_receive_time - pipeline_start_time) should be used to get the correct PTS. A burst of packets is an interesting case. - smooth flow of packets - 3 seconds of gap - a burst of three seconds packets In this case, the following will happen - old packet drop threshold will kick in and drop some of those packets. - estimated PTS will be higher by upto (3 seconds - old_packet_treshold) for the burst. But, they should still be acceptable and pushed with proper PTS, i. e. PTS calculated using RTP timestamp diff should be correct. Have to ensure that different thresholds do not render the pipeline drop tracks.
1 parent 378746a commit 4cdfa42

File tree

2 files changed

+25
-22
lines changed

2 files changed

+25
-22
lines changed

pkg/synchronizer/track.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ type TrackSynchronizer struct {
6565
oldPacketThreshold time.Duration
6666

6767
// timing info
68-
initTime time.Time
69-
startTime time.Time // time first packet was pushed
70-
startRTP uint32 // RTP timestamp of PTS 0
68+
startTime time.Time // time at initialization --> this should be when first packet is received
69+
startRTP uint32 // RTP timestamp of first packet
70+
firstTime time.Time // time at which first packet was pushed
7171
lastTS uint32 // previous RTP timestamp
7272
lastTime time.Time
7373
lastPTS time.Duration // previous presentation timestamp
@@ -141,7 +141,7 @@ func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) {
141141
t.desiredPTSOffset = t.currentPTSOffset
142142
t.basePTSOffset = t.desiredPTSOffset
143143

144-
t.initTime = now
144+
t.startTime = now
145145
t.startRTP = pkt.Timestamp
146146
t.lastPTS = 0
147147
t.lastPTSAdjusted = t.currentPTSOffset
@@ -174,17 +174,19 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
174174
defer t.Unlock()
175175

176176
now := mono.Now()
177-
if t.startTime.IsZero() {
178-
if t.preJitterBufferReceiveTimeEnabled {
179-
t.startTime = pkt.ReceivedAt
180-
} else {
181-
t.startTime = now
182-
}
177+
var pktReceiveTime time.Time
178+
if t.preJitterBufferReceiveTimeEnabled {
179+
pktReceiveTime = pkt.ReceivedAt
180+
} else {
181+
pktReceiveTime = now
182+
}
183+
if t.firstTime.IsZero() {
184+
t.firstTime = now
183185
t.logger.Infow(
184186
"starting track synchronizer",
185187
"state", t,
186188
"pktReceiveTime", pkt.ReceivedAt,
187-
"startDelay", t.startTime.Sub(pkt.ReceivedAt),
189+
"startDelay", t.firstTime.Sub(pkt.ReceivedAt),
188190
)
189191
}
190192

@@ -215,7 +217,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
215217
}
216218
}
217219

218-
estimatedPTS := now.Sub(t.startTime)
220+
estimatedPTS := pktReceiveTime.Sub(t.startTime)
219221

220222
var pts time.Duration
221223
if t.lastPTS == 0 {
@@ -292,11 +294,14 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
292294
t.Lock()
293295
defer t.Unlock()
294296

295-
if t.startTime.IsZero() {
296-
t.startTime = pkt.ReceivedAt
297+
now := mono.Now()
298+
if t.firstTime.IsZero() {
299+
t.firstTime = now
297300
t.logger.Infow(
298301
"starting track synchronizer",
299302
"state", t,
303+
"pktReceiveTime", pkt.ReceivedAt,
304+
"startDelay", t.firstTime.Sub(pkt.ReceivedAt),
300305
)
301306
}
302307

@@ -329,15 +334,14 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
329334
"dropping old packet",
330335
"currentTS", ts,
331336
"receivedAt", pkt.ReceivedAt,
332-
"now", mono.Now(),
333-
"age", mono.Now().Sub(pkt.ReceivedAt),
337+
"now", now,
338+
"age", now.Sub(pkt.ReceivedAt),
334339
"state", t,
335340
)
336341
return 0, ErrPacketTooOld
337342
}
338343

339-
now := mono.Now()
340-
estimatedPTS := max(time.Nanosecond, now.Sub(t.startTime))
344+
estimatedPTS := pkt.ReceivedAt.Sub(t.startTime)
341345

342346
var pts time.Duration
343347
if t.lastPTS == 0 {
@@ -716,9 +720,9 @@ func (t *TrackSynchronizer) MarshalLogObject(e zapcore.ObjectEncoder) error {
716720
return nil
717721
}
718722

719-
e.AddTime("initTime", t.initTime)
720723
e.AddTime("startTime", t.startTime)
721724
e.AddUint32("startRTP", t.startRTP)
725+
e.AddTime("firstTime", t.firstTime)
722726
e.AddUint32("lastTS", t.lastTS)
723727
e.AddTime("lastTime", t.lastTime)
724728
e.AddDuration("lastPTS", t.lastPTS)

pkg/synchronizer/track_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ func newTSForTests(tc *testing.T, clockRate uint32, kind webrtc.RTPCodecType) *T
5353
}
5454
// set a stable startTime well in the past to make time.Since(startTime) > 0
5555
t.startTime = time.Now().Add(-150 * time.Millisecond)
56-
t.initTime = t.startTime
5756
// pick an arbitrary RTP base
5857
t.startRTP = 1000
5958
return t
@@ -144,7 +143,7 @@ func TestGetPTSWithRebase_PropelsForward(t *testing.T) {
144143
ts1 := ts.startRTP + rtp500ms
145144
adj1, err := ts.getPTSWithRebase(jitter.ExtPacket{
146145
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: ts1}},
147-
ReceivedAt: time.Time{}, // not used for estimatedPTS here
146+
ReceivedAt: mono.Now(),
148147
})
149148
require.NoError(t, err)
150149
require.InDelta(t, 500*time.Millisecond, adj1, float64(20*time.Millisecond))
@@ -159,7 +158,7 @@ func TestGetPTSWithRebase_PropelsForward(t *testing.T) {
159158

160159
adj2, err := ts.getPTSWithRebase(jitter.ExtPacket{
161160
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: ts2}},
162-
ReceivedAt: time.Time{},
161+
ReceivedAt: mono.Now(),
163162
})
164163
require.NoError(t, err)
165164
require.Equal(t, want, adj2)

0 commit comments

Comments
 (0)