Skip to content

Commit 6ff08cc

Browse files
authored
Initialize should be called with the packet reception time (#762)
Adding private function that does almost the same thing as the existing initialze, with the diff that it takes what it used to be now from the packet reception time. Main reason is that initialize might be delayed (until stable sequence of packets arrive), if time.Now is used on the first packet which was gated for a while - it will induce an offset. Using packet reception time might have as a result that some other track initialized the syncrhonizer main start point to a time t1 that comes after a time of reception of this track first packet. This will in turn make base offset negative - and potentially first few PTSs returned by GetPTS. For now these negative PTS could be ignored at the caller site before pushing a packet to the gst pipeline. Ideally - these are not ignored but a new offset is generated for the given source with a base time that would reflect the negative offset and pushed to the gst appsource.
1 parent 69048ff commit 6ff08cc

File tree

2 files changed

+78
-8
lines changed

2 files changed

+78
-8
lines changed

pkg/synchronizer/track.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (t *TrackSynchronizer) OnSenderReport(f func(drift time.Duration)) {
139139
func (t *TrackSynchronizer) PrimeForStart(pkt jitter.ExtPacket) ([]jitter.ExtPacket, int, bool) {
140140
if t.initialized || t.startGate == nil {
141141
if !t.initialized {
142-
t.Initialize(pkt.Packet)
142+
t.initialize(pkt)
143143
}
144144
return []jitter.ExtPacket{pkt}, 0, true
145145
}
@@ -154,21 +154,34 @@ func (t *TrackSynchronizer) PrimeForStart(pkt jitter.ExtPacket) ([]jitter.ExtPac
154154
}
155155

156156
if !t.initialized {
157-
t.Initialize(ready[0].Packet)
157+
t.initialize(ready[0])
158158
}
159159

160160
return ready, dropped, true
161161
}
162162

163163
// Initialize should be called as soon as the first packet is received
164164
func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) {
165-
now := mono.Now()
165+
t.initialize(jitter.ExtPacket{
166+
Packet: pkt,
167+
ReceivedAt: mono.Now(),
168+
})
169+
}
170+
171+
func (t *TrackSynchronizer) initialize(extPkt jitter.ExtPacket) {
172+
receivedAt := extPkt.ReceivedAt
173+
if receivedAt.IsZero() {
174+
receivedAt = mono.Now()
175+
}
176+
166177
t.Lock()
167178
synchronizer := t.sync
168179
t.Unlock()
169-
startedAt := now.UnixNano()
180+
181+
startedAt := receivedAt.UnixNano()
182+
firstStartedAt := startedAt
170183
if synchronizer != nil {
171-
startedAt = synchronizer.getOrSetStartedAt(now.UnixNano())
184+
firstStartedAt = synchronizer.getOrSetStartedAt(startedAt)
172185
}
173186

174187
t.Lock()
@@ -177,12 +190,12 @@ func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) {
177190
return
178191
}
179192

180-
t.currentPTSOffset = time.Duration(now.UnixNano() - startedAt)
193+
t.currentPTSOffset = time.Duration(startedAt - firstStartedAt)
181194
t.desiredPTSOffset = t.currentPTSOffset
182195
t.basePTSOffset = t.desiredPTSOffset
183196

184-
t.startTime = now
185-
t.startRTP = pkt.Timestamp
197+
t.startTime = receivedAt
198+
t.startRTP = extPkt.Packet.Timestamp
186199
t.lastPTS = 0
187200
t.lastPTSAdjusted = t.currentPTSOffset
188201
t.initialized = true

pkg/synchronizer/track_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,63 @@ func TestGetPTSWithoutRebase_Increasing(t *testing.T) {
122122
require.Greater(t, adj2, adj1)
123123
}
124124

125+
func TestGetPTSWithoutRebase_NegativeAdjustedPTS(t *testing.T) {
126+
sync := NewSynchronizerWithOptions()
127+
base := time.Now()
128+
futureStart := base.Add(500 * time.Millisecond)
129+
sync.getOrSetStartedAt(futureStart.UnixNano())
130+
131+
track := fakeTrack{id: "neg", rate: 90000, kind: webrtc.RTPCodecTypeVideo}
132+
ts := newTrackSynchronizer(sync, track)
133+
ts.logger = logger.NewTestLogger(t)
134+
135+
firstReceivedAt := base
136+
firstPacket := jitter.ExtPacket{
137+
Packet: &rtp.Packet{
138+
Header: rtp.Header{Timestamp: 90000, SequenceNumber: 1},
139+
Payload: []byte{0x01},
140+
},
141+
ReceivedAt: firstReceivedAt,
142+
}
143+
144+
ts.initialize(firstPacket)
145+
require.Less(t, ts.currentPTSOffset, time.Duration(0), "expected negative PTS offset when synchronizer start is later")
146+
147+
stepTS := ts.rtpConverter.toRTP(10 * time.Millisecond)
148+
secondPacket := jitter.ExtPacket{
149+
Packet: &rtp.Packet{
150+
Header: rtp.Header{Timestamp: firstPacket.Packet.Timestamp + stepTS, SequenceNumber: 2},
151+
Payload: []byte{0x02},
152+
},
153+
ReceivedAt: firstReceivedAt.Add(10 * time.Millisecond),
154+
}
155+
156+
adjusted, err := ts.GetPTS(secondPacket)
157+
require.NoError(t, err)
158+
require.Less(t, adjusted, time.Duration(0), "expected negative adjusted PTS")
159+
}
160+
161+
func TestInitializeUsesBufferedPacketTiming(t *testing.T) {
162+
sync := NewSynchronizerWithOptions()
163+
syncStart := mono.Now().Add(-250 * time.Millisecond)
164+
sync.getOrSetStartedAt(syncStart.UnixNano())
165+
166+
track := fakeTrack{id: "init", rate: 90000, kind: webrtc.RTPCodecTypeVideo}
167+
ts := newTrackSynchronizer(sync, track)
168+
ts.logger = logger.NewTestLogger(t)
169+
170+
receivedAt := time.Now().Add(-50 * time.Millisecond)
171+
packet := &rtp.Packet{Header: rtp.Header{Timestamp: 42}}
172+
extPkt := jitter.ExtPacket{Packet: packet, ReceivedAt: receivedAt}
173+
174+
ts.initialize(extPkt)
175+
176+
require.True(t, ts.initialized, "track should initialize")
177+
require.Equal(t, receivedAt, ts.startTime, "startTime should match buffered arrival time")
178+
require.Equal(t, packet.Timestamp, ts.startRTP, "startRTP should come from packet timestamp")
179+
require.InDelta(t, receivedAt.Sub(syncStart), ts.currentPTSOffset, float64(5*time.Millisecond))
180+
}
181+
125182
func TestGetPTSWithRebase_PropelsForward(t *testing.T) {
126183
clock := uint32(48000)
127184
ts := newTSForTests(t, clock, webrtc.RTPCodecTypeAudio)

0 commit comments

Comments
 (0)