From 10138cfc0ab351d82502c0c5f51bd34ad2debf1a Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Tue, 28 Oct 2025 22:59:49 +0100 Subject: [PATCH 1/3] Logging rx and tx rate for pcmlocaltrack --- pkg/media/pcmlocaltrack.go | 104 +++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/pkg/media/pcmlocaltrack.go b/pkg/media/pcmlocaltrack.go index c5082702..644b3677 100644 --- a/pkg/media/pcmlocaltrack.go +++ b/pkg/media/pcmlocaltrack.go @@ -52,6 +52,17 @@ type PCMLocalTrack struct { closed atomic.Bool muted atomic.Bool + + logger protoLogger.Logger + logState pcmLocalTrackLogState +} + +type pcmLocalTrackLogState struct { + at time.Time + totalWritten uint64 + totalProcessed uint64 + prevWritten uint64 + prevProcessed uint64 } // NewPCMLocalTrack creates a wrapper around a webrtc.TrackLocalStaticSample that accepts PCM16 samples via the WriteSample method, @@ -116,6 +127,10 @@ func NewPCMLocalTrack( sourceChannels: sourceChannels, chunkBuffer: new(deque.Deque[media.PCM16Sample]), samplesPerFrame: (sourceSampleRate * sourceChannels * int(defaultPCMFrameDuration/time.Nanosecond)) / 1e9, + logger: logger, + logState: pcmLocalTrackLogState{ + at: time.Now(), + }, } t.cond = sync.NewCond(&t.mu) @@ -165,6 +180,8 @@ func (t *PCMLocalTrack) WriteSample(chunk media.PCM16Sample) error { return errors.New("track is closed") } + now := time.Now() + var logSnapshot *pcmLocalTrackLogSnapshot if t.muted.Load() || len(chunk) == 0 { return nil } @@ -175,7 +192,12 @@ func (t *PCMLocalTrack) WriteSample(chunk media.PCM16Sample) error { t.mu.Lock() t.chunkBuffer.PushBack(chunkCopy) t.cond.Broadcast() + t.logState.totalWritten += uint64(len(chunk)) + logSnapshot = t.collectLogSnapshotLocked(now) t.mu.Unlock() + if logSnapshot != nil { + t.emitLogSnapshot(logSnapshot) + } return nil } @@ -188,12 +210,18 @@ func (t *PCMLocalTrack) processSamples() { break } + var logSnapshot *pcmLocalTrackLogSnapshot t.mu.Lock() frame := t.getFrameFromChunkBuffer() if frame != nil { t.resampledPCMWriter.WriteSample(frame) + t.logState.totalProcessed += uint64(len(frame)) + logSnapshot = t.collectLogSnapshotLocked(time.Now()) } t.mu.Unlock() + if logSnapshot != nil { + t.emitLogSnapshot(logSnapshot) + } <-ticker.C } @@ -249,3 +277,79 @@ func (t *PCMLocalTrack) Close() { t.mu.Unlock() } } + +type pcmLocalTrackLogSnapshot struct { + interval time.Duration + queueSamples int + totalWritten uint64 + totalProcessed uint64 + deltaWritten uint64 + deltaProcessed uint64 +} + +func (t *PCMLocalTrack) collectLogSnapshotLocked(now time.Time) *pcmLocalTrackLogSnapshot { + if t.logState.at.IsZero() { + t.logState.at = now + return nil + } + + const logInterval = 5 * time.Second + if now.Sub(t.logState.at) < logInterval { + return nil + } + + interval := now.Sub(t.logState.at) + snapshot := &pcmLocalTrackLogSnapshot{ + interval: interval, + queueSamples: t.getNumSamplesInChunkBuffer(), + totalWritten: t.logState.totalWritten, + totalProcessed: t.logState.totalProcessed, + deltaWritten: t.logState.totalWritten - t.logState.prevWritten, + deltaProcessed: t.logState.totalProcessed - t.logState.prevProcessed, + } + + t.logState.at = now + t.logState.prevWritten = t.logState.totalWritten + t.logState.prevProcessed = t.logState.totalProcessed + + return snapshot +} + +func (t *PCMLocalTrack) emitLogSnapshot(snapshot *pcmLocalTrackLogSnapshot) { + if snapshot == nil || snapshot.interval <= 0 { + return + } + + elapsed := snapshot.interval.Seconds() + if elapsed == 0 { + return + } + + chanCount := float64(t.sourceChannels) + if chanCount == 0 { + chanCount = 1 + } + + ingressHz := float64(snapshot.deltaWritten) / chanCount / elapsed + + processedSamples := float64(snapshot.deltaProcessed) + if t.sourceSampleRate != 0 && t.sourceSampleRate != DefaultOpusSampleRate { + processedSamples *= float64(DefaultOpusSampleRate) / float64(t.sourceSampleRate) + } + egressHz := processedSamples / chanCount / elapsed + + queueSeconds := 0.0 + if t.sourceSampleRate != 0 { + queueSeconds = float64(snapshot.queueSamples) / (chanCount * float64(t.sourceSampleRate)) + } + + t.logger.Infow("pcm local track stats", + "interval_s", elapsed, + "ingress_hz", ingressHz, + "egress_hz", egressHz, + "queue_samples", snapshot.queueSamples, + "queue_s", queueSeconds, + "total_written", snapshot.totalWritten, + "total_processed", snapshot.totalProcessed, + ) +} From ec6c26797bc5ab47e62c860c82549632cfcbaee3 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Wed, 29 Oct 2025 10:44:21 +0100 Subject: [PATCH 2/3] adding source sample rate --- pkg/media/pcmlocaltrack.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/media/pcmlocaltrack.go b/pkg/media/pcmlocaltrack.go index 644b3677..fc20f2e8 100644 --- a/pkg/media/pcmlocaltrack.go +++ b/pkg/media/pcmlocaltrack.go @@ -351,5 +351,6 @@ func (t *PCMLocalTrack) emitLogSnapshot(snapshot *pcmLocalTrackLogSnapshot) { "queue_s", queueSeconds, "total_written", snapshot.totalWritten, "total_processed", snapshot.totalProcessed, + "source_sample_rate", t.sourceSampleRate, ) } From 22d61b9b247edd39db3752768dea16f3efcf56b3 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Fri, 31 Oct 2025 16:28:04 +0100 Subject: [PATCH 3/3] Move WriteSample API call out of the critical secion --- pkg/media/pcmlocaltrack.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/media/pcmlocaltrack.go b/pkg/media/pcmlocaltrack.go index fc20f2e8..527e992e 100644 --- a/pkg/media/pcmlocaltrack.go +++ b/pkg/media/pcmlocaltrack.go @@ -210,17 +210,22 @@ func (t *PCMLocalTrack) processSamples() { break } + var frame media.PCM16Sample var logSnapshot *pcmLocalTrackLogSnapshot + t.mu.Lock() - frame := t.getFrameFromChunkBuffer() + frame = t.getFrameFromChunkBuffer() if frame != nil { - t.resampledPCMWriter.WriteSample(frame) t.logState.totalProcessed += uint64(len(frame)) logSnapshot = t.collectLogSnapshotLocked(time.Now()) } t.mu.Unlock() - if logSnapshot != nil { - t.emitLogSnapshot(logSnapshot) + + if frame != nil { + t.resampledPCMWriter.WriteSample(frame) + if logSnapshot != nil { + t.emitLogSnapshot(logSnapshot) + } } <-ticker.C