Skip to content

Commit 44e2418

Browse files
milos-lkdennwc
andauthored
Logging rx and tx sample/frame rate (#498)
* Logging rx and tx sample/frame rate * passing logger correclty * Integrate media rx/tx stats into existing call statistics. --------- Co-authored-by: Denys Smirnov <dennwc@pm.me>
1 parent ca13542 commit 44e2418

File tree

5 files changed

+204
-59
lines changed

5 files changed

+204
-59
lines changed

pkg/sip/inbound.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -945,6 +945,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
945945
case <-ticker.C:
946946
c.log().Debugw("sending keep-alive")
947947
c.state.ForceFlush(ctx)
948+
c.stats.Update()
948949
c.printStats(c.log())
949950
case <-ctx.Done():
950951
c.closeWithHangup()
@@ -1150,7 +1151,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
11501151
}
11511152

11521153
func (c *inboundCall) printStats(log logger.Logger) {
1153-
log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes()))
1154+
c.stats.Log(log, c.callStart)
11541155
}
11551156

11561157
// close should only be called from handleInvite.

pkg/sip/media.go

Lines changed: 32 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"strconv"
2121
"sync/atomic"
22+
"time"
2223

2324
"github.com/pion/interceptor"
2425
prtp "github.com/pion/rtp"
@@ -27,6 +28,7 @@ import (
2728

2829
"github.com/livekit/media-sdk/rtp"
2930

31+
"github.com/livekit/protocol/logger"
3032
"github.com/livekit/sip/pkg/stats"
3133
)
3234

@@ -45,38 +47,6 @@ type StatsSnapshot struct {
4547
Closed bool `json:"closed"`
4648
}
4749

48-
type PortStatsSnapshot struct {
49-
Streams uint64 `json:"streams"`
50-
Packets uint64 `json:"packets"`
51-
IgnoredPackets uint64 `json:"packets_ignored"`
52-
InputPackets uint64 `json:"packets_input"`
53-
54-
MuxPackets uint64 `json:"mux_packets"`
55-
MuxBytes uint64 `json:"mux_bytes"`
56-
57-
AudioPackets uint64 `json:"audio_packets"`
58-
AudioBytes uint64 `json:"audio_bytes"`
59-
60-
DTMFPackets uint64 `json:"dtmf_packets"`
61-
DTMFBytes uint64 `json:"dtmf_bytes"`
62-
63-
Closed bool `json:"closed"`
64-
}
65-
66-
type RoomStatsSnapshot struct {
67-
InputPackets uint64 `json:"input_packets"`
68-
InputBytes uint64 `json:"input_bytes"`
69-
DTMFPackets uint64 `json:"dtmf_packets"`
70-
71-
MixerSamples uint64 `json:"mixer_samples"`
72-
MixerFrames uint64 `json:"mixer_frames"`
73-
74-
OutputSamples uint64 `json:"output_samples"`
75-
OutputFrames uint64 `json:"output_frames"`
76-
77-
Closed bool `json:"closed"`
78-
}
79-
8050
type MixerStatsSnapshot struct {
8151
Tracks int64 `json:"tracks"`
8252
TracksTotal uint64 `json:"tracks_total"`
@@ -97,34 +67,21 @@ type MixerStatsSnapshot struct {
9767
OutputFrames uint64 `json:"output_frames"`
9868
}
9969

70+
func (s *Stats) Update() {
71+
if s == nil {
72+
return
73+
}
74+
s.Port.Update()
75+
s.Room.Update()
76+
}
77+
10078
func (s *Stats) Load() StatsSnapshot {
10179
p := &s.Port
10280
r := &s.Room
10381
m := &r.Mixer
10482
return StatsSnapshot{
105-
Port: PortStatsSnapshot{
106-
Streams: p.Streams.Load(),
107-
Packets: p.Packets.Load(),
108-
IgnoredPackets: p.IgnoredPackets.Load(),
109-
InputPackets: p.InputPackets.Load(),
110-
MuxPackets: p.MuxPackets.Load(),
111-
MuxBytes: p.MuxBytes.Load(),
112-
AudioPackets: p.AudioPackets.Load(),
113-
AudioBytes: p.AudioBytes.Load(),
114-
DTMFPackets: p.DTMFPackets.Load(),
115-
DTMFBytes: p.DTMFBytes.Load(),
116-
Closed: p.Closed.Load(),
117-
},
118-
Room: RoomStatsSnapshot{
119-
InputPackets: r.InputPackets.Load(),
120-
InputBytes: r.InputBytes.Load(),
121-
DTMFPackets: r.DTMFPackets.Load(),
122-
MixerSamples: r.MixerSamples.Load(),
123-
MixerFrames: r.MixerFrames.Load(),
124-
OutputSamples: r.OutputSamples.Load(),
125-
OutputFrames: r.OutputFrames.Load(),
126-
Closed: r.Closed.Load(),
127-
},
83+
Port: p.Load(),
84+
Room: r.Load(),
12885
Mixer: MixerStatsSnapshot{
12986
Tracks: m.Tracks.Load(),
13087
TracksTotal: m.TracksTotal.Load(),
@@ -144,10 +101,30 @@ func (s *Stats) Load() StatsSnapshot {
144101
}
145102
}
146103

104+
func (s *Stats) Log(log logger.Logger, callStart time.Time) {
105+
const expectedSampleRate = RoomSampleRate
106+
st := s.Load()
107+
log.Infow("call statistics",
108+
"stats", st,
109+
"durMin", int(time.Since(callStart).Minutes()),
110+
"sip_rx_ppm", ratePPM(st.Port.AudioRX, expectedSampleRate),
111+
"sip_tx_ppm", ratePPM(st.Port.AudioTX, expectedSampleRate),
112+
"lk_publish_ppm", ratePPM(st.Room.PublishTX, expectedSampleRate),
113+
"expected_pcm_hz", expectedSampleRate,
114+
)
115+
}
116+
147117
func (s *Stats) MarshalJSON() ([]byte, error) {
148118
return json.Marshal(s.Load())
149119
}
150120

121+
func ratePPM(rate float64, expected int) float64 {
122+
if expected <= 0 {
123+
return 0
124+
}
125+
return (rate - float64(expected)) / float64(expected) * 1_000_000
126+
}
127+
151128
const (
152129
channels = 1
153130
RoomSampleRate = 48000

pkg/sip/media_port.go

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"errors"
2020
"io"
21+
"math"
2122
"net"
2223
"net/netip"
2324
"strings"
@@ -45,6 +46,32 @@ const (
4546
defaultMediaTimeoutInitial = 30 * time.Second
4647
)
4748

49+
type PortStatsSnapshot struct {
50+
Streams uint64 `json:"streams"`
51+
Packets uint64 `json:"packets"`
52+
IgnoredPackets uint64 `json:"packets_ignored"`
53+
InputPackets uint64 `json:"packets_input"`
54+
55+
MuxPackets uint64 `json:"mux_packets"`
56+
MuxBytes uint64 `json:"mux_bytes"`
57+
58+
AudioPackets uint64 `json:"audio_packets"`
59+
AudioBytes uint64 `json:"audio_bytes"`
60+
61+
AudioInFrames uint64 `json:"audio_in_frames"`
62+
AudioInSamples uint64 `json:"audio_in_samples"`
63+
AudioOutFrames uint64 `json:"audio_out_frames"`
64+
AudioOutSamples uint64 `json:"audio_out_samples"`
65+
66+
AudioRX float64 `json:"audio_rx"`
67+
AudioTX float64 `json:"audio_tx"`
68+
69+
DTMFPackets uint64 `json:"dtmf_packets"`
70+
DTMFBytes uint64 `json:"dtmf_bytes"`
71+
72+
Closed bool `json:"closed"`
73+
}
74+
4875
type PortStats struct {
4976
Streams atomic.Uint64
5077
Packets atomic.Uint64
@@ -57,10 +84,72 @@ type PortStats struct {
5784
AudioPackets atomic.Uint64
5885
AudioBytes atomic.Uint64
5986

87+
AudioInFrames atomic.Uint64
88+
AudioInSamples atomic.Uint64
89+
AudioOutFrames atomic.Uint64
90+
AudioOutSamples atomic.Uint64
91+
92+
AudioRX atomic.Uint64 // based on AudioInSamples
93+
AudioTX atomic.Uint64 // based on AudioOutSamples
94+
6095
DTMFPackets atomic.Uint64
6196
DTMFBytes atomic.Uint64
6297

6398
Closed atomic.Bool
99+
100+
mu sync.Mutex
101+
last struct {
102+
Time time.Time
103+
AudioInSamples uint64
104+
AudioOutSamples uint64
105+
}
106+
}
107+
108+
func (s *PortStats) Load() PortStatsSnapshot {
109+
return PortStatsSnapshot{
110+
Streams: s.Streams.Load(),
111+
Packets: s.Packets.Load(),
112+
IgnoredPackets: s.IgnoredPackets.Load(),
113+
InputPackets: s.InputPackets.Load(),
114+
MuxPackets: s.MuxPackets.Load(),
115+
MuxBytes: s.MuxBytes.Load(),
116+
AudioPackets: s.AudioPackets.Load(),
117+
AudioBytes: s.AudioBytes.Load(),
118+
AudioInFrames: s.AudioInFrames.Load(),
119+
AudioInSamples: s.AudioInSamples.Load(),
120+
AudioOutFrames: s.AudioOutFrames.Load(),
121+
AudioOutSamples: s.AudioOutSamples.Load(),
122+
AudioRX: math.Float64frombits(s.AudioRX.Load()),
123+
AudioTX: math.Float64frombits(s.AudioTX.Load()),
124+
DTMFPackets: s.DTMFPackets.Load(),
125+
DTMFBytes: s.DTMFBytes.Load(),
126+
Closed: s.Closed.Load(),
127+
}
128+
}
129+
130+
func (s *PortStats) Update() {
131+
s.mu.Lock()
132+
defer s.mu.Unlock()
133+
t := time.Now()
134+
dt := t.Sub(s.last.Time).Seconds()
135+
136+
curAudioInSamples := s.AudioInSamples.Load()
137+
curAudioOutSamples := s.AudioOutSamples.Load()
138+
139+
if dt > 0 {
140+
rxSamples := curAudioInSamples - s.last.AudioInSamples
141+
txSamples := curAudioOutSamples - s.last.AudioOutSamples
142+
143+
rxRate := float64(rxSamples) / dt
144+
txRate := float64(txSamples) / dt
145+
146+
s.AudioRX.Store(math.Float64bits(rxRate))
147+
s.AudioTX.Store(math.Float64bits(txRate))
148+
}
149+
150+
s.last.Time = t
151+
s.last.AudioInSamples = curAudioInSamples
152+
s.last.AudioOutSamples = curAudioOutSamples
64153
}
65154

66155
type UDPConn interface {
@@ -610,6 +699,9 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error {
610699

611700
// Encoding pipeline (LK PCM -> SIP RTP)
612701
audioOut := p.conf.Audio.Codec.EncodeRTP(p.audioOutRTP)
702+
if p.stats != nil {
703+
audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples)
704+
}
613705

614706
if p.conf.Audio.DTMFType != 0 {
615707
p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate)
@@ -638,7 +730,12 @@ func (p *MediaPort) setupInput() {
638730
if p.opts.NoInputResample {
639731
p.audioIn.SetSampleRate(codecInfo.SampleRate)
640732
}
641-
audioHandler := codec.DecodeRTP(p.audioIn, p.conf.Audio.Type)
733+
734+
var audioWriter msdk.PCM16Writer = p.audioIn
735+
if p.stats != nil {
736+
audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
737+
}
738+
audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)
642739
p.audioInHandler = audioHandler
643740

644741
mux := rtp.NewMux(nil)

pkg/sip/outbound.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ func (c *outboundCall) waitClose(ctx context.Context, tid traceid.ID) error {
230230
case <-ticker.C:
231231
c.log.Debugw("sending keep-alive")
232232
c.state.ForceFlush(ctx)
233+
c.stats.Update()
233234
c.printStats()
234235
case <-c.Disconnected():
235236
c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED)
@@ -284,7 +285,7 @@ func (c *outboundCall) closeWithTimeout() {
284285
}
285286

286287
func (c *outboundCall) printStats() {
287-
c.log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes()))
288+
c.stats.Log(c.log, c.callStart)
288289
}
289290

290291
func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) {

0 commit comments

Comments
 (0)