From 8293ff6ab433793f0fc55cdea37c923faeeba024 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Tue, 28 Oct 2025 17:58:49 +0100 Subject: [PATCH 1/3] Logging rx and tx sample/frame rate --- pkg/sip/inbound.go | 2 + pkg/sip/media.go | 140 ++++++++++++++++++++++++++++++++++++------ pkg/sip/media_port.go | 14 ++++- pkg/sip/outbound.go | 1 + pkg/sip/room.go | 5 +- 5 files changed, 142 insertions(+), 20 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 800a6dda..b173aea6 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -896,6 +896,8 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt c.media.HandleDTMF(c.handleDTMF) } + c.stats.startRateLogger(c.ctx, c.log, defaultRateLoggerInterval, RoomSampleRate) + // Must be set earlier to send the pin prompts. if w := c.lkRoom.SwapOutput(c.media.GetAudioWriter()); w != nil { _ = w.Close() diff --git a/pkg/sip/media.go b/pkg/sip/media.go index 90aedd77..0d24cd7c 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -15,10 +15,12 @@ package sip import ( + "context" "encoding/json" "fmt" "strconv" "sync/atomic" + "time" "github.com/pion/interceptor" prtp "github.com/pion/rtp" @@ -27,15 +29,22 @@ import ( "github.com/livekit/media-sdk/rtp" + "github.com/livekit/protocol/logger" "github.com/livekit/sip/pkg/stats" ) +const ( + defaultRateLoggerInterval = 10 * time.Second +) + var _ json.Marshaler = (*Stats)(nil) type Stats struct { Port PortStats Room RoomStats Closed atomic.Bool + + rateLoggerStarted atomic.Bool } type StatsSnapshot struct { @@ -57,6 +66,11 @@ type PortStatsSnapshot struct { AudioPackets uint64 `json:"audio_packets"` AudioBytes uint64 `json:"audio_bytes"` + AudioInFrames uint64 `json:"audio_in_frames"` + AudioInSamples uint64 `json:"audio_in_samples"` + AudioOutFrames uint64 `json:"audio_out_frames"` + AudioOutSamples uint64 `json:"audio_out_samples"` + DTMFPackets uint64 `json:"dtmf_packets"` DTMFBytes uint64 `json:"dtmf_bytes"` @@ -67,6 +81,9 @@ type RoomStatsSnapshot struct { InputPackets uint64 `json:"input_packets"` InputBytes uint64 `json:"input_bytes"` + PublishedFrames uint64 `json:"published_frames"` + PublishedSamples uint64 `json:"published_samples"` + MixerSamples uint64 `json:"mixer_samples"` MixerFrames uint64 `json:"mixer_frames"` @@ -102,26 +119,32 @@ func (s *Stats) Load() StatsSnapshot { m := &r.Mixer return StatsSnapshot{ Port: PortStatsSnapshot{ - Streams: p.Streams.Load(), - Packets: p.Packets.Load(), - IgnoredPackets: p.IgnoredPackets.Load(), - InputPackets: p.InputPackets.Load(), - MuxPackets: p.MuxPackets.Load(), - MuxBytes: p.MuxBytes.Load(), - AudioPackets: p.AudioPackets.Load(), - AudioBytes: p.AudioBytes.Load(), - DTMFPackets: p.DTMFPackets.Load(), - DTMFBytes: p.DTMFBytes.Load(), - Closed: p.Closed.Load(), + Streams: p.Streams.Load(), + Packets: p.Packets.Load(), + IgnoredPackets: p.IgnoredPackets.Load(), + InputPackets: p.InputPackets.Load(), + MuxPackets: p.MuxPackets.Load(), + MuxBytes: p.MuxBytes.Load(), + AudioPackets: p.AudioPackets.Load(), + AudioBytes: p.AudioBytes.Load(), + AudioInFrames: p.AudioInFrames.Load(), + AudioInSamples: p.AudioInSamples.Load(), + AudioOutFrames: p.AudioOutFrames.Load(), + AudioOutSamples: p.AudioOutSamples.Load(), + DTMFPackets: p.DTMFPackets.Load(), + DTMFBytes: p.DTMFBytes.Load(), + Closed: p.Closed.Load(), }, Room: RoomStatsSnapshot{ - InputPackets: r.InputPackets.Load(), - InputBytes: r.InputBytes.Load(), - MixerSamples: r.MixerSamples.Load(), - MixerFrames: r.MixerFrames.Load(), - OutputSamples: r.OutputSamples.Load(), - OutputFrames: r.OutputFrames.Load(), - Closed: r.Closed.Load(), + InputPackets: r.InputPackets.Load(), + InputBytes: r.InputBytes.Load(), + PublishedFrames: r.PublishedFrames.Load(), + PublishedSamples: r.PublishedSamples.Load(), + MixerSamples: r.MixerSamples.Load(), + MixerFrames: r.MixerFrames.Load(), + OutputSamples: r.OutputSamples.Load(), + OutputFrames: r.OutputFrames.Load(), + Closed: r.Closed.Load(), }, Mixer: MixerStatsSnapshot{ Tracks: m.Tracks.Load(), @@ -146,6 +169,87 @@ func (s *Stats) MarshalJSON() ([]byte, error) { return json.Marshal(s.Load()) } +func (s *Stats) startRateLogger(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) { + if interval <= 0 { + interval = defaultRateLoggerInterval + } + if expectedSampleRate <= 0 { + expectedSampleRate = RoomSampleRate + } + if !s.rateLoggerStarted.CompareAndSwap(false, true) { + return + } + go s.rateLoggerLoop(ctx, log, interval, expectedSampleRate) +} + +func (s *Stats) rateLoggerLoop(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + last := s.Load() + lastTime := time.Now() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + now := time.Now() + cur := s.Load() + dt := now.Sub(lastTime).Seconds() + if dt <= 0 { + last = cur + lastTime = now + continue + } + + rxSamples := cur.Port.AudioInSamples - last.Port.AudioInSamples + txSamples := cur.Port.AudioOutSamples - last.Port.AudioOutSamples + pubSamples := cur.Room.PublishedSamples - last.Room.PublishedSamples + + if rxSamples == 0 && txSamples == 0 && pubSamples == 0 { + last = cur + lastTime = now + if cur.Closed { + return + } + continue + } + + rxRate := float64(rxSamples) / dt + txRate := float64(txSamples) / dt + pubRate := float64(pubSamples) / dt + + log.Infow("media sample rate", + "sip_rx_pcm_hz", rxRate, + "sip_rx_ppm", ratePPM(rxRate, expectedSampleRate), + "sip_tx_pcm_hz", txRate, + "sip_tx_ppm", ratePPM(txRate, expectedSampleRate), + "lk_publish_pcm_hz", pubRate, + "lk_publish_ppm", ratePPM(pubRate, expectedSampleRate), + "expected_pcm_hz", expectedSampleRate, + "interval", now.Sub(lastTime), + "sip_rx_samples", rxSamples, + "sip_tx_samples", txSamples, + "lk_publish_samples", pubSamples, + ) + + last = cur + lastTime = now + if cur.Closed { + return + } + } + } +} + +func ratePPM(rate float64, expected int) float64 { + if expected <= 0 { + return 0 + } + return (rate - float64(expected)) / float64(expected) * 1_000_000 +} + const ( channels = 1 RoomSampleRate = 48000 diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 0d623d3b..8d44b058 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -56,6 +56,11 @@ type PortStats struct { AudioPackets atomic.Uint64 AudioBytes atomic.Uint64 + AudioInFrames atomic.Uint64 + AudioInSamples atomic.Uint64 + AudioOutFrames atomic.Uint64 + AudioOutSamples atomic.Uint64 + DTMFPackets atomic.Uint64 DTMFBytes atomic.Uint64 @@ -562,6 +567,9 @@ func (p *MediaPort) setupOutput() error { // Encoding pipeline (LK PCM -> SIP RTP) audioOut := p.conf.Audio.Codec.EncodeRTP(p.audioOutRTP) + if p.stats != nil { + audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples) + } if p.conf.Audio.DTMFType != 0 { p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate) @@ -585,7 +593,11 @@ func (p *MediaPort) setupOutput() error { func (p *MediaPort) setupInput() { // Decoding pipeline (SIP RTP -> LK PCM) - audioHandler := p.conf.Audio.Codec.DecodeRTP(p.audioIn, p.conf.Audio.Type) + var audioWriter msdk.PCM16Writer = p.audioIn + if p.stats != nil { + audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples) + } + audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type) p.audioInHandler = audioHandler mux := rtp.NewMux(nil) diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 7acb6579..c9c0d2c6 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -575,6 +575,7 @@ func (c *outboundCall) sipSignal(ctx context.Context) error { if err = c.media.SetConfig(mc); err != nil { return err } + c.stats.startRateLogger(context.Background(), c.log, defaultRateLoggerInterval, RoomSampleRate) c.c.cmu.Lock() c.c.byRemote[c.cc.Tag()] = c diff --git a/pkg/sip/room.go b/pkg/sip/room.go index 8e003828..e52c6bcc 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -41,6 +41,9 @@ type RoomStats struct { InputPackets atomic.Uint64 InputBytes atomic.Uint64 + PublishedFrames atomic.Uint64 + PublishedSamples atomic.Uint64 + MixerFrames atomic.Uint64 MixerSamples atomic.Uint64 @@ -424,7 +427,7 @@ func (r *Room) NewParticipantTrack(sampleRate int) (msdk.WriteCloser[msdk.PCM16S if err != nil { return nil, err } - return pw, nil + return newMediaWriterCount(pw, &r.stats.PublishedFrames, &r.stats.PublishedSamples), nil } func (r *Room) SendData(data lksdk.DataPacket, opts ...lksdk.DataPublishOption) error { From ab5f29e9c7827c0d4cffd9deced8064626c2f28d Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Mon, 24 Nov 2025 14:52:35 +0100 Subject: [PATCH 2/3] passing logger correclty --- pkg/sip/inbound.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 91f41f65..527bca1b 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -1014,7 +1014,7 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit c.media.HandleDTMF(c.handleDTMF) } - c.stats.startRateLogger(c.ctx, c.log, defaultRateLoggerInterval, RoomSampleRate) + c.stats.startRateLogger(c.ctx, c.log(), defaultRateLoggerInterval, RoomSampleRate) // Must be set earlier to send the pin prompts. if w := c.lkRoom.SwapOutput(c.media.GetAudioWriter()); w != nil { From 8a87ee4757c05aab1ce8583cb2620d48834601a3 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Mon, 24 Nov 2025 22:34:47 +0200 Subject: [PATCH 3/3] Integrate media rx/tx stats into existing call statistics. --- pkg/sip/inbound.go | 5 +- pkg/sip/media.go | 173 ++++++------------------------------------ pkg/sip/media_port.go | 84 ++++++++++++++++++++ pkg/sip/outbound.go | 4 +- pkg/sip/room.go | 66 ++++++++++++++++ 5 files changed, 177 insertions(+), 155 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 527bca1b..828953c4 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -945,6 +945,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip case <-ticker.C: c.log().Debugw("sending keep-alive") c.state.ForceFlush(ctx) + c.stats.Update() c.printStats(c.log()) case <-ctx.Done(): c.closeWithHangup() @@ -1014,8 +1015,6 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit c.media.HandleDTMF(c.handleDTMF) } - c.stats.startRateLogger(c.ctx, c.log(), defaultRateLoggerInterval, RoomSampleRate) - // Must be set earlier to send the pin prompts. if w := c.lkRoom.SwapOutput(c.media.GetAudioWriter()); w != nil { _ = w.Close() @@ -1152,7 +1151,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD } func (c *inboundCall) printStats(log logger.Logger) { - log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes())) + c.stats.Log(log, c.callStart) } // close should only be called from handleInvite. diff --git a/pkg/sip/media.go b/pkg/sip/media.go index 8a889d09..741ff1cc 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -15,7 +15,6 @@ package sip import ( - "context" "encoding/json" "fmt" "strconv" @@ -33,18 +32,12 @@ import ( "github.com/livekit/sip/pkg/stats" ) -const ( - defaultRateLoggerInterval = 10 * time.Second -) - var _ json.Marshaler = (*Stats)(nil) type Stats struct { Port PortStats Room RoomStats Closed atomic.Bool - - rateLoggerStarted atomic.Bool } type StatsSnapshot struct { @@ -54,46 +47,6 @@ type StatsSnapshot struct { Closed bool `json:"closed"` } -type PortStatsSnapshot struct { - Streams uint64 `json:"streams"` - Packets uint64 `json:"packets"` - IgnoredPackets uint64 `json:"packets_ignored"` - InputPackets uint64 `json:"packets_input"` - - MuxPackets uint64 `json:"mux_packets"` - MuxBytes uint64 `json:"mux_bytes"` - - AudioPackets uint64 `json:"audio_packets"` - AudioBytes uint64 `json:"audio_bytes"` - - AudioInFrames uint64 `json:"audio_in_frames"` - AudioInSamples uint64 `json:"audio_in_samples"` - AudioOutFrames uint64 `json:"audio_out_frames"` - AudioOutSamples uint64 `json:"audio_out_samples"` - - DTMFPackets uint64 `json:"dtmf_packets"` - DTMFBytes uint64 `json:"dtmf_bytes"` - - Closed bool `json:"closed"` -} - -type RoomStatsSnapshot struct { - InputPackets uint64 `json:"input_packets"` - InputBytes uint64 `json:"input_bytes"` - DTMFPackets uint64 `json:"dtmf_packets"` - - PublishedFrames uint64 `json:"published_frames"` - PublishedSamples uint64 `json:"published_samples"` - - MixerSamples uint64 `json:"mixer_samples"` - MixerFrames uint64 `json:"mixer_frames"` - - OutputSamples uint64 `json:"output_samples"` - OutputFrames uint64 `json:"output_frames"` - - Closed bool `json:"closed"` -} - type MixerStatsSnapshot struct { Tracks int64 `json:"tracks"` TracksTotal uint64 `json:"tracks_total"` @@ -114,40 +67,21 @@ type MixerStatsSnapshot struct { OutputFrames uint64 `json:"output_frames"` } +func (s *Stats) Update() { + if s == nil { + return + } + s.Port.Update() + s.Room.Update() +} + func (s *Stats) Load() StatsSnapshot { p := &s.Port r := &s.Room m := &r.Mixer return StatsSnapshot{ - Port: PortStatsSnapshot{ - Streams: p.Streams.Load(), - Packets: p.Packets.Load(), - IgnoredPackets: p.IgnoredPackets.Load(), - InputPackets: p.InputPackets.Load(), - MuxPackets: p.MuxPackets.Load(), - MuxBytes: p.MuxBytes.Load(), - AudioPackets: p.AudioPackets.Load(), - AudioBytes: p.AudioBytes.Load(), - AudioInFrames: p.AudioInFrames.Load(), - AudioInSamples: p.AudioInSamples.Load(), - AudioOutFrames: p.AudioOutFrames.Load(), - AudioOutSamples: p.AudioOutSamples.Load(), - DTMFPackets: p.DTMFPackets.Load(), - DTMFBytes: p.DTMFBytes.Load(), - Closed: p.Closed.Load(), - }, - Room: RoomStatsSnapshot{ - InputPackets: r.InputPackets.Load(), - InputBytes: r.InputBytes.Load(), - DTMFPackets: r.DTMFPackets.Load(), - PublishedFrames: r.PublishedFrames.Load(), - PublishedSamples: r.PublishedSamples.Load(), - MixerSamples: r.MixerSamples.Load(), - MixerFrames: r.MixerFrames.Load(), - OutputSamples: r.OutputSamples.Load(), - OutputFrames: r.OutputFrames.Load(), - Closed: r.Closed.Load(), - }, + Port: p.Load(), + Room: r.Load(), Mixer: MixerStatsSnapshot{ Tracks: m.Tracks.Load(), TracksTotal: m.TracksTotal.Load(), @@ -167,82 +101,21 @@ func (s *Stats) Load() StatsSnapshot { } } -func (s *Stats) MarshalJSON() ([]byte, error) { - return json.Marshal(s.Load()) -} - -func (s *Stats) startRateLogger(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) { - if interval <= 0 { - interval = defaultRateLoggerInterval - } - if expectedSampleRate <= 0 { - expectedSampleRate = RoomSampleRate - } - if !s.rateLoggerStarted.CompareAndSwap(false, true) { - return - } - go s.rateLoggerLoop(ctx, log, interval, expectedSampleRate) +func (s *Stats) Log(log logger.Logger, callStart time.Time) { + const expectedSampleRate = RoomSampleRate + st := s.Load() + log.Infow("call statistics", + "stats", st, + "durMin", int(time.Since(callStart).Minutes()), + "sip_rx_ppm", ratePPM(st.Port.AudioRX, expectedSampleRate), + "sip_tx_ppm", ratePPM(st.Port.AudioTX, expectedSampleRate), + "lk_publish_ppm", ratePPM(st.Room.PublishTX, expectedSampleRate), + "expected_pcm_hz", expectedSampleRate, + ) } -func (s *Stats) rateLoggerLoop(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - last := s.Load() - lastTime := time.Now() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - now := time.Now() - cur := s.Load() - dt := now.Sub(lastTime).Seconds() - if dt <= 0 { - last = cur - lastTime = now - continue - } - - rxSamples := cur.Port.AudioInSamples - last.Port.AudioInSamples - txSamples := cur.Port.AudioOutSamples - last.Port.AudioOutSamples - pubSamples := cur.Room.PublishedSamples - last.Room.PublishedSamples - - if rxSamples == 0 && txSamples == 0 && pubSamples == 0 { - last = cur - lastTime = now - if cur.Closed { - return - } - continue - } - - rxRate := float64(rxSamples) / dt - txRate := float64(txSamples) / dt - pubRate := float64(pubSamples) / dt - - log.Infow("media sample rate", - "sip_rx_pcm_hz", rxRate, - "sip_rx_ppm", ratePPM(rxRate, expectedSampleRate), - "sip_tx_pcm_hz", txRate, - "sip_tx_ppm", ratePPM(txRate, expectedSampleRate), - "lk_publish_pcm_hz", pubRate, - "lk_publish_ppm", ratePPM(pubRate, expectedSampleRate), - "expected_pcm_hz", expectedSampleRate, - "interval", now.Sub(lastTime), - "sip_rx_samples", rxSamples, - "sip_tx_samples", txSamples, - "lk_publish_samples", pubSamples, - ) - - last = cur - lastTime = now - if cur.Closed { - return - } - } - } +func (s *Stats) MarshalJSON() ([]byte, error) { + return json.Marshal(s.Load()) } func ratePPM(rate float64, expected int) float64 { diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 75dba291..aea93cc6 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -18,6 +18,7 @@ import ( "context" "errors" "io" + "math" "net" "net/netip" "strings" @@ -45,6 +46,32 @@ const ( defaultMediaTimeoutInitial = 30 * time.Second ) +type PortStatsSnapshot struct { + Streams uint64 `json:"streams"` + Packets uint64 `json:"packets"` + IgnoredPackets uint64 `json:"packets_ignored"` + InputPackets uint64 `json:"packets_input"` + + MuxPackets uint64 `json:"mux_packets"` + MuxBytes uint64 `json:"mux_bytes"` + + AudioPackets uint64 `json:"audio_packets"` + AudioBytes uint64 `json:"audio_bytes"` + + AudioInFrames uint64 `json:"audio_in_frames"` + AudioInSamples uint64 `json:"audio_in_samples"` + AudioOutFrames uint64 `json:"audio_out_frames"` + AudioOutSamples uint64 `json:"audio_out_samples"` + + AudioRX float64 `json:"audio_rx"` + AudioTX float64 `json:"audio_tx"` + + DTMFPackets uint64 `json:"dtmf_packets"` + DTMFBytes uint64 `json:"dtmf_bytes"` + + Closed bool `json:"closed"` +} + type PortStats struct { Streams atomic.Uint64 Packets atomic.Uint64 @@ -62,10 +89,67 @@ type PortStats struct { AudioOutFrames atomic.Uint64 AudioOutSamples atomic.Uint64 + AudioRX atomic.Uint64 // based on AudioInSamples + AudioTX atomic.Uint64 // based on AudioOutSamples + DTMFPackets atomic.Uint64 DTMFBytes atomic.Uint64 Closed atomic.Bool + + mu sync.Mutex + last struct { + Time time.Time + AudioInSamples uint64 + AudioOutSamples uint64 + } +} + +func (s *PortStats) Load() PortStatsSnapshot { + return PortStatsSnapshot{ + Streams: s.Streams.Load(), + Packets: s.Packets.Load(), + IgnoredPackets: s.IgnoredPackets.Load(), + InputPackets: s.InputPackets.Load(), + MuxPackets: s.MuxPackets.Load(), + MuxBytes: s.MuxBytes.Load(), + AudioPackets: s.AudioPackets.Load(), + AudioBytes: s.AudioBytes.Load(), + AudioInFrames: s.AudioInFrames.Load(), + AudioInSamples: s.AudioInSamples.Load(), + AudioOutFrames: s.AudioOutFrames.Load(), + AudioOutSamples: s.AudioOutSamples.Load(), + AudioRX: math.Float64frombits(s.AudioRX.Load()), + AudioTX: math.Float64frombits(s.AudioTX.Load()), + DTMFPackets: s.DTMFPackets.Load(), + DTMFBytes: s.DTMFBytes.Load(), + Closed: s.Closed.Load(), + } +} + +func (s *PortStats) Update() { + s.mu.Lock() + defer s.mu.Unlock() + t := time.Now() + dt := t.Sub(s.last.Time).Seconds() + + curAudioInSamples := s.AudioInSamples.Load() + curAudioOutSamples := s.AudioOutSamples.Load() + + if dt > 0 { + rxSamples := curAudioInSamples - s.last.AudioInSamples + txSamples := curAudioOutSamples - s.last.AudioOutSamples + + rxRate := float64(rxSamples) / dt + txRate := float64(txSamples) / dt + + s.AudioRX.Store(math.Float64bits(rxRate)) + s.AudioTX.Store(math.Float64bits(txRate)) + } + + s.last.Time = t + s.last.AudioInSamples = curAudioInSamples + s.last.AudioOutSamples = curAudioOutSamples } type UDPConn interface { diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 62cf98f4..2fe84ba6 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -230,6 +230,7 @@ func (c *outboundCall) waitClose(ctx context.Context, tid traceid.ID) error { case <-ticker.C: c.log.Debugw("sending keep-alive") c.state.ForceFlush(ctx) + c.stats.Update() c.printStats() case <-c.Disconnected(): c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED) @@ -284,7 +285,7 @@ func (c *outboundCall) closeWithTimeout() { } func (c *outboundCall) printStats() { - c.log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes())) + c.stats.Log(c.log, c.callStart) } func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) { @@ -596,7 +597,6 @@ func (c *outboundCall) sipSignal(ctx context.Context, tid traceid.ID) error { if err = c.media.SetConfig(mc); err != nil { return err } - c.stats.startRateLogger(context.Background(), c.log, defaultRateLoggerInterval, RoomSampleRate) c.c.cmu.Lock() c.c.byRemote[c.cc.Tag()] = c diff --git a/pkg/sip/room.go b/pkg/sip/room.go index 0a1a1593..3c0cc013 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -18,7 +18,10 @@ import ( "context" "errors" "io" + "math" + "sync" "sync/atomic" + "time" "github.com/frostbyte73/core" "github.com/pion/webrtc/v4" @@ -38,6 +41,25 @@ import ( "github.com/livekit/sip/pkg/media/opus" ) +type RoomStatsSnapshot struct { + InputPackets uint64 `json:"input_packets"` + InputBytes uint64 `json:"input_bytes"` + DTMFPackets uint64 `json:"dtmf_packets"` + + PublishedFrames uint64 `json:"published_frames"` + PublishedSamples uint64 `json:"published_samples"` + + PublishTX float64 `json:"publish_tx"` + + MixerSamples uint64 `json:"mixer_samples"` + MixerFrames uint64 `json:"mixer_frames"` + + OutputSamples uint64 `json:"output_samples"` + OutputFrames uint64 `json:"output_frames"` + + Closed bool `json:"closed"` +} + type RoomStats struct { InputPackets atomic.Uint64 InputBytes atomic.Uint64 @@ -46,6 +68,8 @@ type RoomStats struct { PublishedFrames atomic.Uint64 PublishedSamples atomic.Uint64 + PublishTX atomic.Uint64 + MixerFrames atomic.Uint64 MixerSamples atomic.Uint64 @@ -55,6 +79,48 @@ type RoomStats struct { OutputSamples atomic.Uint64 Closed atomic.Bool + + mu sync.Mutex + last struct { + Time time.Time + PublishedSamples uint64 + } +} + +func (s *RoomStats) Load() RoomStatsSnapshot { + return RoomStatsSnapshot{ + InputPackets: s.InputPackets.Load(), + InputBytes: s.InputBytes.Load(), + DTMFPackets: s.DTMFPackets.Load(), + PublishedFrames: s.PublishedFrames.Load(), + PublishedSamples: s.PublishedSamples.Load(), + PublishTX: math.Float64frombits(s.PublishTX.Load()), + MixerSamples: s.MixerSamples.Load(), + MixerFrames: s.MixerFrames.Load(), + OutputSamples: s.OutputSamples.Load(), + OutputFrames: s.OutputFrames.Load(), + Closed: s.Closed.Load(), + } +} + +func (s *RoomStats) Update() { + s.mu.Lock() + defer s.mu.Unlock() + t := time.Now() + dt := t.Sub(s.last.Time).Seconds() + + curPublishedSamples := s.PublishedSamples.Load() + + if dt > 0 { + txSamples := curPublishedSamples - s.last.PublishedSamples + + txRate := float64(txSamples) / dt + + s.PublishTX.Store(math.Float64bits(txRate)) + } + + s.last.Time = t + s.last.PublishedSamples = curPublishedSamples } type ParticipantInfo struct {