diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index abaf8ffa..828953c4 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -945,6 +945,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip case <-ticker.C: c.log().Debugw("sending keep-alive") c.state.ForceFlush(ctx) + c.stats.Update() c.printStats(c.log()) case <-ctx.Done(): c.closeWithHangup() @@ -1150,7 +1151,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD } func (c *inboundCall) printStats(log logger.Logger) { - log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes())) + c.stats.Log(log, c.callStart) } // close should only be called from handleInvite. diff --git a/pkg/sip/media.go b/pkg/sip/media.go index 5ceea564..741ff1cc 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "sync/atomic" + "time" "github.com/pion/interceptor" prtp "github.com/pion/rtp" @@ -27,6 +28,7 @@ import ( "github.com/livekit/media-sdk/rtp" + "github.com/livekit/protocol/logger" "github.com/livekit/sip/pkg/stats" ) @@ -45,38 +47,6 @@ type StatsSnapshot struct { Closed bool `json:"closed"` } -type PortStatsSnapshot struct { - Streams uint64 `json:"streams"` - Packets uint64 `json:"packets"` - IgnoredPackets uint64 `json:"packets_ignored"` - InputPackets uint64 `json:"packets_input"` - - MuxPackets uint64 `json:"mux_packets"` - MuxBytes uint64 `json:"mux_bytes"` - - AudioPackets uint64 `json:"audio_packets"` - AudioBytes uint64 `json:"audio_bytes"` - - DTMFPackets uint64 `json:"dtmf_packets"` - DTMFBytes uint64 `json:"dtmf_bytes"` - - Closed bool `json:"closed"` -} - -type RoomStatsSnapshot struct { - InputPackets uint64 `json:"input_packets"` - InputBytes uint64 `json:"input_bytes"` - DTMFPackets uint64 `json:"dtmf_packets"` - - MixerSamples uint64 `json:"mixer_samples"` - MixerFrames uint64 `json:"mixer_frames"` - - OutputSamples uint64 `json:"output_samples"` - OutputFrames uint64 `json:"output_frames"` - - Closed bool `json:"closed"` -} - type MixerStatsSnapshot struct { Tracks int64 `json:"tracks"` TracksTotal uint64 `json:"tracks_total"` @@ -97,34 +67,21 @@ type MixerStatsSnapshot struct { OutputFrames uint64 `json:"output_frames"` } +func (s *Stats) Update() { + if s == nil { + return + } + s.Port.Update() + s.Room.Update() +} + func (s *Stats) Load() StatsSnapshot { p := &s.Port r := &s.Room m := &r.Mixer return StatsSnapshot{ - Port: PortStatsSnapshot{ - Streams: p.Streams.Load(), - Packets: p.Packets.Load(), - IgnoredPackets: p.IgnoredPackets.Load(), - InputPackets: p.InputPackets.Load(), - MuxPackets: p.MuxPackets.Load(), - MuxBytes: p.MuxBytes.Load(), - AudioPackets: p.AudioPackets.Load(), - AudioBytes: p.AudioBytes.Load(), - DTMFPackets: p.DTMFPackets.Load(), - DTMFBytes: p.DTMFBytes.Load(), - Closed: p.Closed.Load(), - }, - Room: RoomStatsSnapshot{ - InputPackets: r.InputPackets.Load(), - InputBytes: r.InputBytes.Load(), - DTMFPackets: r.DTMFPackets.Load(), - MixerSamples: r.MixerSamples.Load(), - MixerFrames: r.MixerFrames.Load(), - OutputSamples: r.OutputSamples.Load(), - OutputFrames: r.OutputFrames.Load(), - Closed: r.Closed.Load(), - }, + Port: p.Load(), + Room: r.Load(), Mixer: MixerStatsSnapshot{ Tracks: m.Tracks.Load(), TracksTotal: m.TracksTotal.Load(), @@ -144,10 +101,30 @@ func (s *Stats) Load() StatsSnapshot { } } +func (s *Stats) Log(log logger.Logger, callStart time.Time) { + const expectedSampleRate = RoomSampleRate + st := s.Load() + log.Infow("call statistics", + "stats", st, + "durMin", int(time.Since(callStart).Minutes()), + "sip_rx_ppm", ratePPM(st.Port.AudioRX, expectedSampleRate), + "sip_tx_ppm", ratePPM(st.Port.AudioTX, expectedSampleRate), + "lk_publish_ppm", ratePPM(st.Room.PublishTX, expectedSampleRate), + "expected_pcm_hz", expectedSampleRate, + ) +} + func (s *Stats) MarshalJSON() ([]byte, error) { return json.Marshal(s.Load()) } +func ratePPM(rate float64, expected int) float64 { + if expected <= 0 { + return 0 + } + return (rate - float64(expected)) / float64(expected) * 1_000_000 +} + const ( channels = 1 RoomSampleRate = 48000 diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 65e8729a..aea93cc6 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -18,6 +18,7 @@ import ( "context" "errors" "io" + "math" "net" "net/netip" "strings" @@ -45,6 +46,32 @@ const ( defaultMediaTimeoutInitial = 30 * time.Second ) +type PortStatsSnapshot struct { + Streams uint64 `json:"streams"` + Packets uint64 `json:"packets"` + IgnoredPackets uint64 `json:"packets_ignored"` + InputPackets uint64 `json:"packets_input"` + + MuxPackets uint64 `json:"mux_packets"` + MuxBytes uint64 `json:"mux_bytes"` + + AudioPackets uint64 `json:"audio_packets"` + AudioBytes uint64 `json:"audio_bytes"` + + AudioInFrames uint64 `json:"audio_in_frames"` + AudioInSamples uint64 `json:"audio_in_samples"` + AudioOutFrames uint64 `json:"audio_out_frames"` + AudioOutSamples uint64 `json:"audio_out_samples"` + + AudioRX float64 `json:"audio_rx"` + AudioTX float64 `json:"audio_tx"` + + DTMFPackets uint64 `json:"dtmf_packets"` + DTMFBytes uint64 `json:"dtmf_bytes"` + + Closed bool `json:"closed"` +} + type PortStats struct { Streams atomic.Uint64 Packets atomic.Uint64 @@ -57,10 +84,72 @@ type PortStats struct { AudioPackets atomic.Uint64 AudioBytes atomic.Uint64 + AudioInFrames atomic.Uint64 + AudioInSamples atomic.Uint64 + AudioOutFrames atomic.Uint64 + AudioOutSamples atomic.Uint64 + + AudioRX atomic.Uint64 // based on AudioInSamples + AudioTX atomic.Uint64 // based on AudioOutSamples + DTMFPackets atomic.Uint64 DTMFBytes atomic.Uint64 Closed atomic.Bool + + mu sync.Mutex + last struct { + Time time.Time + AudioInSamples uint64 + AudioOutSamples uint64 + } +} + +func (s *PortStats) Load() PortStatsSnapshot { + return PortStatsSnapshot{ + Streams: s.Streams.Load(), + Packets: s.Packets.Load(), + IgnoredPackets: s.IgnoredPackets.Load(), + InputPackets: s.InputPackets.Load(), + MuxPackets: s.MuxPackets.Load(), + MuxBytes: s.MuxBytes.Load(), + AudioPackets: s.AudioPackets.Load(), + AudioBytes: s.AudioBytes.Load(), + AudioInFrames: s.AudioInFrames.Load(), + AudioInSamples: s.AudioInSamples.Load(), + AudioOutFrames: s.AudioOutFrames.Load(), + AudioOutSamples: s.AudioOutSamples.Load(), + AudioRX: math.Float64frombits(s.AudioRX.Load()), + AudioTX: math.Float64frombits(s.AudioTX.Load()), + DTMFPackets: s.DTMFPackets.Load(), + DTMFBytes: s.DTMFBytes.Load(), + Closed: s.Closed.Load(), + } +} + +func (s *PortStats) Update() { + s.mu.Lock() + defer s.mu.Unlock() + t := time.Now() + dt := t.Sub(s.last.Time).Seconds() + + curAudioInSamples := s.AudioInSamples.Load() + curAudioOutSamples := s.AudioOutSamples.Load() + + if dt > 0 { + rxSamples := curAudioInSamples - s.last.AudioInSamples + txSamples := curAudioOutSamples - s.last.AudioOutSamples + + rxRate := float64(rxSamples) / dt + txRate := float64(txSamples) / dt + + s.AudioRX.Store(math.Float64bits(rxRate)) + s.AudioTX.Store(math.Float64bits(txRate)) + } + + s.last.Time = t + s.last.AudioInSamples = curAudioInSamples + s.last.AudioOutSamples = curAudioOutSamples } type UDPConn interface { @@ -610,6 +699,9 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error { // Encoding pipeline (LK PCM -> SIP RTP) audioOut := p.conf.Audio.Codec.EncodeRTP(p.audioOutRTP) + if p.stats != nil { + audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples) + } if p.conf.Audio.DTMFType != 0 { p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate) @@ -638,7 +730,12 @@ func (p *MediaPort) setupInput() { if p.opts.NoInputResample { p.audioIn.SetSampleRate(codecInfo.SampleRate) } - audioHandler := codec.DecodeRTP(p.audioIn, p.conf.Audio.Type) + + var audioWriter msdk.PCM16Writer = p.audioIn + if p.stats != nil { + audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples) + } + audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type) p.audioInHandler = audioHandler mux := rtp.NewMux(nil) diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index d4640c25..2fe84ba6 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -230,6 +230,7 @@ func (c *outboundCall) waitClose(ctx context.Context, tid traceid.ID) error { case <-ticker.C: c.log.Debugw("sending keep-alive") c.state.ForceFlush(ctx) + c.stats.Update() c.printStats() case <-c.Disconnected(): c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED) @@ -284,7 +285,7 @@ func (c *outboundCall) closeWithTimeout() { } func (c *outboundCall) printStats() { - c.log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes())) + c.stats.Log(c.log, c.callStart) } func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) { diff --git a/pkg/sip/room.go b/pkg/sip/room.go index 4ec68db6..3c0cc013 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -18,7 +18,10 @@ import ( "context" "errors" "io" + "math" + "sync" "sync/atomic" + "time" "github.com/frostbyte73/core" "github.com/pion/webrtc/v4" @@ -38,11 +41,35 @@ import ( "github.com/livekit/sip/pkg/media/opus" ) +type RoomStatsSnapshot struct { + InputPackets uint64 `json:"input_packets"` + InputBytes uint64 `json:"input_bytes"` + DTMFPackets uint64 `json:"dtmf_packets"` + + PublishedFrames uint64 `json:"published_frames"` + PublishedSamples uint64 `json:"published_samples"` + + PublishTX float64 `json:"publish_tx"` + + MixerSamples uint64 `json:"mixer_samples"` + MixerFrames uint64 `json:"mixer_frames"` + + OutputSamples uint64 `json:"output_samples"` + OutputFrames uint64 `json:"output_frames"` + + Closed bool `json:"closed"` +} + type RoomStats struct { InputPackets atomic.Uint64 InputBytes atomic.Uint64 DTMFPackets atomic.Uint64 + PublishedFrames atomic.Uint64 + PublishedSamples atomic.Uint64 + + PublishTX atomic.Uint64 + MixerFrames atomic.Uint64 MixerSamples atomic.Uint64 @@ -52,6 +79,48 @@ type RoomStats struct { OutputSamples atomic.Uint64 Closed atomic.Bool + + mu sync.Mutex + last struct { + Time time.Time + PublishedSamples uint64 + } +} + +func (s *RoomStats) Load() RoomStatsSnapshot { + return RoomStatsSnapshot{ + InputPackets: s.InputPackets.Load(), + InputBytes: s.InputBytes.Load(), + DTMFPackets: s.DTMFPackets.Load(), + PublishedFrames: s.PublishedFrames.Load(), + PublishedSamples: s.PublishedSamples.Load(), + PublishTX: math.Float64frombits(s.PublishTX.Load()), + MixerSamples: s.MixerSamples.Load(), + MixerFrames: s.MixerFrames.Load(), + OutputSamples: s.OutputSamples.Load(), + OutputFrames: s.OutputFrames.Load(), + Closed: s.Closed.Load(), + } +} + +func (s *RoomStats) Update() { + s.mu.Lock() + defer s.mu.Unlock() + t := time.Now() + dt := t.Sub(s.last.Time).Seconds() + + curPublishedSamples := s.PublishedSamples.Load() + + if dt > 0 { + txSamples := curPublishedSamples - s.last.PublishedSamples + + txRate := float64(txSamples) / dt + + s.PublishTX.Store(math.Float64bits(txRate)) + } + + s.last.Time = t + s.last.PublishedSamples = curPublishedSamples } type ParticipantInfo struct { @@ -452,7 +521,7 @@ func (r *Room) NewParticipantTrack(sampleRate int) (msdk.WriteCloser[msdk.PCM16S if err != nil { return nil, err } - return pw, nil + return newMediaWriterCount(pw, &r.stats.PublishedFrames, &r.stats.PublishedSamples), nil } func (r *Room) SendData(data lksdk.DataPacket, opts ...lksdk.DataPublishOption) error {