Skip to content

Commit d708c54

Browse files
authored
Adding an option to configure max acceptable RTP packet PTS time difference before adjusting them (#716)
* Reducing max acceptable ts diff * Adding abiliy to configure max acceptable time difference
1 parent 6ac93dd commit d708c54

File tree

2 files changed

+63
-7
lines changed

2 files changed

+63
-7
lines changed

pkg/synchronizer/synchronizer.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,78 @@ import (
2121
"github.com/pion/rtcp"
2222
)
2323

24+
const (
25+
DefaultMaxTsDiff = time.Minute
26+
)
27+
28+
type SynchronizerOption func(*SynchronizerConfig)
29+
30+
// SynchronizerConfig holds configuration for the Synchronizer
31+
type SynchronizerConfig struct {
32+
MaxTsDiff time.Duration
33+
OnStarted func()
34+
}
35+
36+
// WithMaxTsDiff sets the maximum acceptable difference between RTP packets
37+
// In case exceeded the synchronizer will adjust the PTS offset to keep the audio and video in sync
38+
func WithMaxTsDiff(maxTsDiff time.Duration) SynchronizerOption {
39+
return func(config *SynchronizerConfig) {
40+
config.MaxTsDiff = maxTsDiff
41+
}
42+
}
43+
44+
// WithOnStarted sets the callback to be called when the synchronizer starts
45+
func WithOnStarted(onStarted func()) SynchronizerOption {
46+
return func(config *SynchronizerConfig) {
47+
config.OnStarted = onStarted
48+
}
49+
}
50+
2451
// a single Synchronizer is shared between all audio and video writers
2552
type Synchronizer struct {
2653
sync.RWMutex
2754

2855
startedAt int64
2956
onStarted func()
3057
endedAt int64
58+
config SynchronizerConfig
3159

3260
psByIdentity map[string]*participantSynchronizer
3361
psBySSRC map[uint32]*participantSynchronizer
3462
ssrcByID map[string]uint32
3563
}
3664

3765
func NewSynchronizer(onStarted func()) *Synchronizer {
66+
config := SynchronizerConfig{
67+
MaxTsDiff: DefaultMaxTsDiff,
68+
OnStarted: onStarted,
69+
}
70+
71+
return &Synchronizer{
72+
onStarted: config.OnStarted,
73+
psByIdentity: make(map[string]*participantSynchronizer),
74+
psBySSRC: make(map[uint32]*participantSynchronizer),
75+
ssrcByID: make(map[string]uint32),
76+
config: config,
77+
}
78+
}
79+
80+
func NewSynchronizerWithOptions(opts ...SynchronizerOption) *Synchronizer {
81+
config := SynchronizerConfig{
82+
MaxTsDiff: DefaultMaxTsDiff,
83+
OnStarted: nil,
84+
}
85+
86+
for _, opt := range opts {
87+
opt(&config)
88+
}
89+
3890
return &Synchronizer{
39-
onStarted: onStarted,
91+
onStarted: config.OnStarted,
4092
psByIdentity: make(map[string]*participantSynchronizer),
4193
psBySSRC: make(map[uint32]*participantSynchronizer),
4294
ssrcByID: make(map[string]uint32),
95+
config: config,
4396
}
4497
}
4598

pkg/synchronizer/track.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929

3030
const (
3131
maxAdjustment = time.Millisecond * 5
32-
maxTSDiff = time.Minute
3332
)
3433

3534
type TrackRemote interface {
@@ -46,6 +45,9 @@ type TrackSynchronizer struct {
4645
logger logger.Logger
4746
*rtpConverter
4847

48+
// config
49+
maxTsDiff time.Duration // maximum acceptable difference between RTP packets
50+
4951
// timing info
5052
startTime time.Time // time first packet was pushed
5153
startRTP uint32 // RTP timestamp of PTS 0
@@ -69,6 +71,7 @@ func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer
6971
track: track,
7072
logger: logger.GetLogger().WithValues("trackID", track.ID(), "codec", track.Codec().MimeType),
7173
rtpConverter: newRTPConverter(int64(track.Codec().ClockRate)),
74+
maxTsDiff: s.config.MaxTsDiff,
7275
}
7376

7477
return t
@@ -115,7 +118,7 @@ func (t *TrackSynchronizer) GetPTS(pkt *rtp.Packet) (time.Duration, error) {
115118

116119
pts := t.lastPTS + t.toDuration(ts-t.lastTS)
117120
estimatedPTS := time.Since(t.startTime)
118-
if pts < t.lastPTS || !acceptable(pts-estimatedPTS) {
121+
if pts < t.lastPTS || !t.acceptable(pts-estimatedPTS) {
119122
pts = estimatedPTS
120123
t.startRTP = ts - t.toRTP(pts)
121124
}
@@ -155,7 +158,7 @@ func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport) {
155158
} else {
156159
pts = t.lastPTS - t.toDuration(t.lastTS-pkt.RTPTime)
157160
}
158-
if !acceptable(pts - time.Since(t.startTime)) {
161+
if !t.acceptable(pts - time.Since(t.startTime)) {
159162
return
160163
}
161164

@@ -164,16 +167,16 @@ func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport) {
164167
t.onSR(offset - t.desiredPTSOffset)
165168
}
166169

167-
if !acceptable(offset) {
170+
if !t.acceptable(offset) {
168171
return
169172
}
170173

171174
t.desiredPTSOffset = offset
172175
t.lastSR = pkt.RTPTime
173176
}
174177

175-
func acceptable(d time.Duration) bool {
176-
return d > -maxTSDiff && d < maxTSDiff
178+
func (t *TrackSynchronizer) acceptable(d time.Duration) bool {
179+
return d > -t.maxTsDiff && d < t.maxTsDiff
177180
}
178181

179182
type rtpConverter struct {

0 commit comments

Comments
 (0)