Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 112 additions & 2 deletions pkg/media/pcmlocaltrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ type PCMLocalTrack struct {

closed atomic.Bool
muted atomic.Bool

logger protoLogger.Logger
logState pcmLocalTrackLogState
}

type pcmLocalTrackLogState struct {
at time.Time
totalWritten uint64
totalProcessed uint64
prevWritten uint64
prevProcessed uint64
}

// NewPCMLocalTrack creates a wrapper around a webrtc.TrackLocalStaticSample that accepts PCM16 samples via the WriteSample method,
Expand Down Expand Up @@ -116,6 +127,10 @@ func NewPCMLocalTrack(
sourceChannels: sourceChannels,
chunkBuffer: new(deque.Deque[media.PCM16Sample]),
samplesPerFrame: (sourceSampleRate * sourceChannels * int(defaultPCMFrameDuration/time.Nanosecond)) / 1e9,
logger: logger,
logState: pcmLocalTrackLogState{
at: time.Now(),
},
}

t.cond = sync.NewCond(&t.mu)
Expand Down Expand Up @@ -165,6 +180,8 @@ func (t *PCMLocalTrack) WriteSample(chunk media.PCM16Sample) error {
return errors.New("track is closed")
}

now := time.Now()
var logSnapshot *pcmLocalTrackLogSnapshot
if t.muted.Load() || len(chunk) == 0 {
return nil
}
Expand All @@ -175,7 +192,12 @@ func (t *PCMLocalTrack) WriteSample(chunk media.PCM16Sample) error {
t.mu.Lock()
t.chunkBuffer.PushBack(chunkCopy)
t.cond.Broadcast()
t.logState.totalWritten += uint64(len(chunk))
logSnapshot = t.collectLogSnapshotLocked(now)
t.mu.Unlock()
if logSnapshot != nil {
t.emitLogSnapshot(logSnapshot)
}
return nil
}

Expand All @@ -188,13 +210,24 @@ func (t *PCMLocalTrack) processSamples() {
break
}

var frame media.PCM16Sample
var logSnapshot *pcmLocalTrackLogSnapshot

t.mu.Lock()
frame := t.getFrameFromChunkBuffer()
frame = t.getFrameFromChunkBuffer()
if frame != nil {
t.resampledPCMWriter.WriteSample(frame)
t.logState.totalProcessed += uint64(len(frame))
logSnapshot = t.collectLogSnapshotLocked(time.Now())
}
t.mu.Unlock()

if frame != nil {
t.resampledPCMWriter.WriteSample(frame)
if logSnapshot != nil {
t.emitLogSnapshot(logSnapshot)
}
}

<-ticker.C
}

Expand Down Expand Up @@ -249,3 +282,80 @@ func (t *PCMLocalTrack) Close() {
t.mu.Unlock()
}
}

type pcmLocalTrackLogSnapshot struct {
interval time.Duration
queueSamples int
totalWritten uint64
totalProcessed uint64
deltaWritten uint64
deltaProcessed uint64
}

func (t *PCMLocalTrack) collectLogSnapshotLocked(now time.Time) *pcmLocalTrackLogSnapshot {
if t.logState.at.IsZero() {
t.logState.at = now
return nil
}

const logInterval = 5 * time.Second
if now.Sub(t.logState.at) < logInterval {
return nil
}

interval := now.Sub(t.logState.at)
snapshot := &pcmLocalTrackLogSnapshot{
interval: interval,
queueSamples: t.getNumSamplesInChunkBuffer(),
totalWritten: t.logState.totalWritten,
totalProcessed: t.logState.totalProcessed,
deltaWritten: t.logState.totalWritten - t.logState.prevWritten,
deltaProcessed: t.logState.totalProcessed - t.logState.prevProcessed,
}

t.logState.at = now
t.logState.prevWritten = t.logState.totalWritten
t.logState.prevProcessed = t.logState.totalProcessed

return snapshot
}

func (t *PCMLocalTrack) emitLogSnapshot(snapshot *pcmLocalTrackLogSnapshot) {
if snapshot == nil || snapshot.interval <= 0 {
return
}

elapsed := snapshot.interval.Seconds()
if elapsed == 0 {
return
}

chanCount := float64(t.sourceChannels)
if chanCount == 0 {
chanCount = 1
}

ingressHz := float64(snapshot.deltaWritten) / chanCount / elapsed

processedSamples := float64(snapshot.deltaProcessed)
if t.sourceSampleRate != 0 && t.sourceSampleRate != DefaultOpusSampleRate {
processedSamples *= float64(DefaultOpusSampleRate) / float64(t.sourceSampleRate)
}
egressHz := processedSamples / chanCount / elapsed

queueSeconds := 0.0
if t.sourceSampleRate != 0 {
queueSeconds = float64(snapshot.queueSamples) / (chanCount * float64(t.sourceSampleRate))
}

t.logger.Infow("pcm local track stats",
"interval_s", elapsed,
"ingress_hz", ingressHz,
"egress_hz", egressHz,
"queue_samples", snapshot.queueSamples,
"queue_s", queueSeconds,
"total_written", snapshot.totalWritten,
"total_processed", snapshot.totalProcessed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably worth logging sourceSampleRate also to get full picture.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"source_sample_rate", t.sourceSampleRate,
)
}
Loading