Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
case <-ticker.C:
c.log().Debugw("sending keep-alive")
c.state.ForceFlush(ctx)
c.stats.Update()
c.printStats(c.log())
case <-ctx.Done():
c.closeWithHangup()
Expand Down Expand Up @@ -1150,7 +1151,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
}

func (c *inboundCall) printStats(log logger.Logger) {
log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes()))
c.stats.Log(log, c.callStart)
}

// close should only be called from handleInvite.
Expand Down
87 changes: 32 additions & 55 deletions pkg/sip/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"sync/atomic"
"time"

"github.com/pion/interceptor"
prtp "github.com/pion/rtp"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/livekit/media-sdk/rtp"

"github.com/livekit/protocol/logger"
"github.com/livekit/sip/pkg/stats"
)

Expand All @@ -45,38 +47,6 @@ type StatsSnapshot struct {
Closed bool `json:"closed"`
}

type PortStatsSnapshot struct {
Streams uint64 `json:"streams"`
Packets uint64 `json:"packets"`
IgnoredPackets uint64 `json:"packets_ignored"`
InputPackets uint64 `json:"packets_input"`

MuxPackets uint64 `json:"mux_packets"`
MuxBytes uint64 `json:"mux_bytes"`

AudioPackets uint64 `json:"audio_packets"`
AudioBytes uint64 `json:"audio_bytes"`

DTMFPackets uint64 `json:"dtmf_packets"`
DTMFBytes uint64 `json:"dtmf_bytes"`

Closed bool `json:"closed"`
}

type RoomStatsSnapshot struct {
InputPackets uint64 `json:"input_packets"`
InputBytes uint64 `json:"input_bytes"`
DTMFPackets uint64 `json:"dtmf_packets"`

MixerSamples uint64 `json:"mixer_samples"`
MixerFrames uint64 `json:"mixer_frames"`

OutputSamples uint64 `json:"output_samples"`
OutputFrames uint64 `json:"output_frames"`

Closed bool `json:"closed"`
}

type MixerStatsSnapshot struct {
Tracks int64 `json:"tracks"`
TracksTotal uint64 `json:"tracks_total"`
Expand All @@ -97,34 +67,21 @@ type MixerStatsSnapshot struct {
OutputFrames uint64 `json:"output_frames"`
}

func (s *Stats) Update() {
if s == nil {
return
}
s.Port.Update()
s.Room.Update()
}

func (s *Stats) Load() StatsSnapshot {
p := &s.Port
r := &s.Room
m := &r.Mixer
return StatsSnapshot{
Port: PortStatsSnapshot{
Streams: p.Streams.Load(),
Packets: p.Packets.Load(),
IgnoredPackets: p.IgnoredPackets.Load(),
InputPackets: p.InputPackets.Load(),
MuxPackets: p.MuxPackets.Load(),
MuxBytes: p.MuxBytes.Load(),
AudioPackets: p.AudioPackets.Load(),
AudioBytes: p.AudioBytes.Load(),
DTMFPackets: p.DTMFPackets.Load(),
DTMFBytes: p.DTMFBytes.Load(),
Closed: p.Closed.Load(),
},
Room: RoomStatsSnapshot{
InputPackets: r.InputPackets.Load(),
InputBytes: r.InputBytes.Load(),
DTMFPackets: r.DTMFPackets.Load(),
MixerSamples: r.MixerSamples.Load(),
MixerFrames: r.MixerFrames.Load(),
OutputSamples: r.OutputSamples.Load(),
OutputFrames: r.OutputFrames.Load(),
Closed: r.Closed.Load(),
},
Port: p.Load(),
Room: r.Load(),
Mixer: MixerStatsSnapshot{
Tracks: m.Tracks.Load(),
TracksTotal: m.TracksTotal.Load(),
Expand All @@ -144,10 +101,30 @@ func (s *Stats) Load() StatsSnapshot {
}
}

func (s *Stats) Log(log logger.Logger, callStart time.Time) {
const expectedSampleRate = RoomSampleRate
st := s.Load()
log.Infow("call statistics",
"stats", st,
"durMin", int(time.Since(callStart).Minutes()),
"sip_rx_ppm", ratePPM(st.Port.AudioRX, expectedSampleRate),
"sip_tx_ppm", ratePPM(st.Port.AudioTX, expectedSampleRate),
"lk_publish_ppm", ratePPM(st.Room.PublishTX, expectedSampleRate),
"expected_pcm_hz", expectedSampleRate,
)
}

func (s *Stats) MarshalJSON() ([]byte, error) {
return json.Marshal(s.Load())
}

func ratePPM(rate float64, expected int) float64 {
if expected <= 0 {
return 0
}
return (rate - float64(expected)) / float64(expected) * 1_000_000
}

const (
channels = 1
RoomSampleRate = 48000
Expand Down
99 changes: 98 additions & 1 deletion pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"io"
"math"
"net"
"net/netip"
"strings"
Expand Down Expand Up @@ -45,6 +46,32 @@ const (
defaultMediaTimeoutInitial = 30 * time.Second
)

type PortStatsSnapshot struct {
Streams uint64 `json:"streams"`
Packets uint64 `json:"packets"`
IgnoredPackets uint64 `json:"packets_ignored"`
InputPackets uint64 `json:"packets_input"`

MuxPackets uint64 `json:"mux_packets"`
MuxBytes uint64 `json:"mux_bytes"`

AudioPackets uint64 `json:"audio_packets"`
AudioBytes uint64 `json:"audio_bytes"`

AudioInFrames uint64 `json:"audio_in_frames"`
AudioInSamples uint64 `json:"audio_in_samples"`
AudioOutFrames uint64 `json:"audio_out_frames"`
AudioOutSamples uint64 `json:"audio_out_samples"`

AudioRX float64 `json:"audio_rx"`
AudioTX float64 `json:"audio_tx"`

DTMFPackets uint64 `json:"dtmf_packets"`
DTMFBytes uint64 `json:"dtmf_bytes"`

Closed bool `json:"closed"`
}

type PortStats struct {
Streams atomic.Uint64
Packets atomic.Uint64
Expand All @@ -57,10 +84,72 @@ type PortStats struct {
AudioPackets atomic.Uint64
AudioBytes atomic.Uint64

AudioInFrames atomic.Uint64
AudioInSamples atomic.Uint64
AudioOutFrames atomic.Uint64
AudioOutSamples atomic.Uint64

AudioRX atomic.Uint64 // based on AudioInSamples
AudioTX atomic.Uint64 // based on AudioOutSamples

DTMFPackets atomic.Uint64
DTMFBytes atomic.Uint64

Closed atomic.Bool

mu sync.Mutex
last struct {
Time time.Time
AudioInSamples uint64
AudioOutSamples uint64
}
}

func (s *PortStats) Load() PortStatsSnapshot {
return PortStatsSnapshot{
Streams: s.Streams.Load(),
Packets: s.Packets.Load(),
IgnoredPackets: s.IgnoredPackets.Load(),
InputPackets: s.InputPackets.Load(),
MuxPackets: s.MuxPackets.Load(),
MuxBytes: s.MuxBytes.Load(),
AudioPackets: s.AudioPackets.Load(),
AudioBytes: s.AudioBytes.Load(),
AudioInFrames: s.AudioInFrames.Load(),
AudioInSamples: s.AudioInSamples.Load(),
AudioOutFrames: s.AudioOutFrames.Load(),
AudioOutSamples: s.AudioOutSamples.Load(),
AudioRX: math.Float64frombits(s.AudioRX.Load()),
AudioTX: math.Float64frombits(s.AudioTX.Load()),
DTMFPackets: s.DTMFPackets.Load(),
DTMFBytes: s.DTMFBytes.Load(),
Closed: s.Closed.Load(),
}
}

func (s *PortStats) Update() {
s.mu.Lock()
defer s.mu.Unlock()
t := time.Now()
dt := t.Sub(s.last.Time).Seconds()

curAudioInSamples := s.AudioInSamples.Load()
curAudioOutSamples := s.AudioOutSamples.Load()

if dt > 0 {
rxSamples := curAudioInSamples - s.last.AudioInSamples
txSamples := curAudioOutSamples - s.last.AudioOutSamples

rxRate := float64(rxSamples) / dt
txRate := float64(txSamples) / dt

s.AudioRX.Store(math.Float64bits(rxRate))
s.AudioTX.Store(math.Float64bits(txRate))
}

s.last.Time = t
s.last.AudioInSamples = curAudioInSamples
s.last.AudioOutSamples = curAudioOutSamples
}

type UDPConn interface {
Expand Down Expand Up @@ -610,6 +699,9 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error {

// Encoding pipeline (LK PCM -> SIP RTP)
audioOut := p.conf.Audio.Codec.EncodeRTP(p.audioOutRTP)
if p.stats != nil {
audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples)
}

if p.conf.Audio.DTMFType != 0 {
p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate)
Expand Down Expand Up @@ -638,7 +730,12 @@ func (p *MediaPort) setupInput() {
if p.opts.NoInputResample {
p.audioIn.SetSampleRate(codecInfo.SampleRate)
}
audioHandler := codec.DecodeRTP(p.audioIn, p.conf.Audio.Type)

var audioWriter msdk.PCM16Writer = p.audioIn
if p.stats != nil {
audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
}
audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)
p.audioInHandler = audioHandler

mux := rtp.NewMux(nil)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (c *outboundCall) waitClose(ctx context.Context, tid traceid.ID) error {
case <-ticker.C:
c.log.Debugw("sending keep-alive")
c.state.ForceFlush(ctx)
c.stats.Update()
c.printStats()
case <-c.Disconnected():
c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED)
Expand Down Expand Up @@ -284,7 +285,7 @@ func (c *outboundCall) closeWithTimeout() {
}

func (c *outboundCall) printStats() {
c.log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes()))
c.stats.Log(c.log, c.callStart)
}

func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) {
Expand Down
Loading