Skip to content

Commit 08926bc

Browse files
authored
Better logs for media and trace tags for calls. (#507)
1 parent 2bb5e36 commit 08926bc

File tree

8 files changed

+119
-56
lines changed

8 files changed

+119
-56
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
1414
github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b
1515
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
16-
github.com/livekit/protocol v1.42.3-0.20251028133809-672acf9e6a48
16+
github.com/livekit/protocol v1.42.3-0.20251106135512-8c21d2d3727e
1717
github.com/livekit/psrpc v0.7.1-0.20251021235041-bdebea7dacf4
1818
github.com/livekit/server-sdk-go/v2 v2.11.4-0.20251016050252-63f7c8381817
1919
github.com/livekit/sipgo v0.13.2-0.20250601220430-a77cc3f220fb

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b h1:dPf9i0JPyf0Sg
129129
github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8=
130130
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g=
131131
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
132-
github.com/livekit/protocol v1.42.3-0.20251028133809-672acf9e6a48 h1:CuXGJx4Yuo9cIjiwO454u43sml9KV06ruzJxjVLPSog=
133-
github.com/livekit/protocol v1.42.3-0.20251028133809-672acf9e6a48/go.mod h1:TpqU2qCI1ES4Lk7PAWSgYO4RaexfVXb54ZO2hXv0Bmc=
132+
github.com/livekit/protocol v1.42.3-0.20251106135512-8c21d2d3727e h1:NcT7Fb1G4aV14y3X+w89QoK9Jl8pq9byrA12f/kq5Uo=
133+
github.com/livekit/protocol v1.42.3-0.20251106135512-8c21d2d3727e/go.mod h1:TpqU2qCI1ES4Lk7PAWSgYO4RaexfVXb54ZO2hXv0Bmc=
134134
github.com/livekit/psrpc v0.7.1-0.20251021235041-bdebea7dacf4 h1:YA5HMfNW9IPPdTSkwsyCtK9ZcNW8QpPNlkuD7UUToZE=
135135
github.com/livekit/psrpc v0.7.1-0.20251021235041-bdebea7dacf4/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
136136
github.com/livekit/server-sdk-go/v2 v2.11.4-0.20251016050252-63f7c8381817 h1:y5YYe37lcGMOZMw4rvreljGjLj60eDSce/N6Asp7pw8=

pkg/sip/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/livekit/protocol/logger"
3030
"github.com/livekit/protocol/rpc"
3131
"github.com/livekit/protocol/tracer"
32+
"github.com/livekit/protocol/utils/traceid"
3233
"github.com/livekit/psrpc"
3334
"github.com/livekit/sipgo"
3435
"github.com/livekit/sipgo/sip"
@@ -165,8 +166,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
165166
if err != nil {
166167
return nil, err
167168
}
169+
tid := traceid.FromGUID(req.SipCallId)
168170
log = log.WithValues(
169171
"callID", req.SipCallId,
172+
"traceID", tid.String(),
170173
"room", req.RoomName,
171174
"participant", req.ParticipantIdentity,
172175
"participantName", req.ParticipantName,
@@ -224,7 +227,7 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
224227
displayName: req.DisplayName,
225228
}
226229
log.Infow("Creating SIP participant")
227-
call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, state, req.ProjectId)
230+
call, err := c.newCall(ctx, tid, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, state, req.ProjectId)
228231
if err != nil {
229232
return nil, err
230233
}

pkg/sip/inbound.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,21 @@ import (
2828
"sync/atomic"
2929
"time"
3030

31-
msdk "github.com/livekit/media-sdk"
32-
"github.com/livekit/protocol/rpc"
33-
3431
"github.com/frostbyte73/core"
3532
"github.com/icholy/digest"
3633
"github.com/pkg/errors"
3734

35+
msdk "github.com/livekit/media-sdk"
3836
"github.com/livekit/media-sdk/dtmf"
3937
"github.com/livekit/media-sdk/rtp"
4038
"github.com/livekit/media-sdk/sdp"
4139
"github.com/livekit/media-sdk/tones"
4240
"github.com/livekit/protocol/livekit"
4341
"github.com/livekit/protocol/logger"
42+
"github.com/livekit/protocol/rpc"
4443
lksip "github.com/livekit/protocol/sip"
4544
"github.com/livekit/protocol/tracer"
45+
"github.com/livekit/protocol/utils/traceid"
4646
"github.com/livekit/psrpc"
4747
lksdk "github.com/livekit/server-sdk-go/v2"
4848
"github.com/livekit/sipgo/sip"
@@ -150,7 +150,7 @@ func (s *Server) getInvite(sipCallID string) *inProgressInvite {
150150
return is
151151
}
152152

153-
func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
153+
func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
154154
log = log.WithValues(
155155
"username", username,
156156
"passwordHash", hashPassword(password),
@@ -306,10 +306,12 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
306306
return psrpc.NewError(psrpc.MalformedRequest, errors.Wrap(err, "cannot parse source IP"))
307307
}
308308
callID := lksip.NewCallID()
309+
tid := traceid.FromGUID(callID)
309310
tr := callTransportFromReq(req)
310311
legTr := legTransportFromReq(req)
311312
log := s.log.WithValues(
312313
"callID", callID,
314+
"traceID", tid.String(),
313315
"fromIP", src.Addr(),
314316
"toIP", req.Destination(),
315317
"transport", tr,
@@ -446,7 +448,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
446448
cc.Processing()
447449
}
448450
s.getCallInfo(cc.SIPCallID()).countInvite(log, req)
449-
if !s.handleInviteAuth(log, req, tx, from.User, r.Username, r.Password) {
451+
if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password) {
450452
cmon.InviteErrorShort("unauthorized")
451453
// handleInviteAuth will generate the SIP Response as needed
452454
return psrpc.NewErrorf(psrpc.PermissionDenied, "invalid credentials were provided")
@@ -457,9 +459,9 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
457459
// ok
458460
}
459461

460-
call = s.newInboundCall(log, cmon, cc, callInfo, state, nil)
462+
call = s.newInboundCall(tid, log, cmon, cc, callInfo, state, start, nil)
461463
call.joinDur = joinDur
462-
return call.handleInvite(call.ctx, req, r.TrunkID, s.conf)
464+
return call.handleInvite(call.ctx, tid, req, r.TrunkID, s.conf)
463465
}
464466

465467
func (s *Server) onOptions(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
@@ -576,10 +578,12 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans
576578

577579
type inboundCall struct {
578580
s *Server
581+
tid traceid.ID
579582
log logger.Logger
580583
cc *sipInbound
581584
mon *stats.CallMonitor
582585
state *CallState
586+
callStart time.Time
583587
extraAttrs map[string]string
584588
attrsToHdr map[string]string
585589
ctx context.Context
@@ -600,18 +604,22 @@ type inboundCall struct {
600604
}
601605

602606
func (s *Server) newInboundCall(
607+
tid traceid.ID,
603608
log logger.Logger,
604609
mon *stats.CallMonitor,
605610
cc *sipInbound,
606611
call *rpc.SIPCall,
607612
state *CallState,
613+
callStart time.Time,
608614
extra map[string]string,
609615
) *inboundCall {
610616
// Map known headers immediately on join. The rest of the mapping will be available later.
611617
extra = HeadersToAttrs(extra, nil, 0, cc, nil)
612618
c := &inboundCall{
613619
s: s,
614620
log: log,
621+
tid: tid,
622+
callStart: callStart,
615623
mon: mon,
616624
cc: cc,
617625
call: call,
@@ -633,7 +641,7 @@ func (s *Server) newInboundCall(
633641
return c
634642
}
635643

636-
func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkID string, conf *config.Config) error {
644+
func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip.Request, trunkID string, conf *config.Config) error {
637645
c.mon.InviteAccept()
638646
c.mon.CallStart()
639647
defer c.mon.CallEnd()
@@ -697,7 +705,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
697705
}
698706

699707
runMedia := func(enc livekit.SIPMediaEncryption) ([]byte, error) {
700-
answerData, err := c.runMediaConn(req.Body(), enc, conf, disp.EnabledFeatures)
708+
answerData, err := c.runMediaConn(tid, req.Body(), enc, conf, disp.EnabledFeatures)
701709
if err != nil {
702710
isError := true
703711
status, reason := callDropped, "media-failed"
@@ -874,7 +882,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
874882
}
875883
}
876884

877-
func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncryption, conf *config.Config, features []livekit.SIPFeature) (answerData []byte, _ error) {
885+
func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit.SIPMediaEncryption, conf *config.Config, features []livekit.SIPFeature) (answerData []byte, _ error) {
878886
c.mon.SDPSize(len(offerData), true)
879887
c.log.Debugw("SDP offer", "sdp", string(offerData))
880888
e, err := sdpEncryption(enc)
@@ -883,7 +891,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt
883891
return nil, err
884892
}
885893

886-
mp, err := NewMediaPort(c.log, c.mon, &MediaOptions{
894+
mp, err := NewMediaPort(tid, c.log, c.mon, &MediaOptions{
887895
IP: c.s.sconf.MediaIP,
888896
Ports: conf.RTPPort,
889897
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
@@ -1057,7 +1065,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
10571065
}
10581066

10591067
func (c *inboundCall) printStats(log logger.Logger) {
1060-
log.Infow("call statistics", "stats", c.stats.Load())
1068+
log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes()))
10611069
}
10621070

10631071
// close should only be called from handleInvite.
@@ -1095,11 +1103,13 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
10951103

10961104
// Call the handler asynchronously to avoid blocking
10971105
if c.s.handler != nil {
1098-
go c.s.handler.OnSessionEnd(context.Background(), &CallIdentifier{
1099-
ProjectID: c.projectID,
1100-
CallID: c.call.LkCallId,
1101-
SipCallID: c.call.SipCallId,
1102-
}, c.state.callInfo, reason)
1106+
go func(tid traceid.ID) {
1107+
c.s.handler.OnSessionEnd(context.Background(), &CallIdentifier{
1108+
ProjectID: c.projectID,
1109+
CallID: c.call.LkCallId,
1110+
SipCallID: c.call.SipCallId,
1111+
}, c.state.callInfo, reason)
1112+
}(c.tid)
11031113
}
11041114

11051115
c.cancel()

pkg/sip/media_port.go

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/livekit/media-sdk/srtp"
3636
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
3737
"github.com/livekit/protocol/logger"
38+
"github.com/livekit/protocol/utils/traceid"
3839

3940
"github.com/livekit/sip/pkg/stats"
4041
)
@@ -132,11 +133,11 @@ type MediaOptions struct {
132133
EnableJitterBuffer bool
133134
}
134135

135-
func NewMediaPort(log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
136-
return NewMediaPortWith(log, mon, nil, opts, sampleRate)
136+
func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
137+
return NewMediaPortWith(tid, log, mon, nil, opts, sampleRate)
137138
}
138139

139-
func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
140+
func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, conn UDPConn, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
140141
if opts == nil {
141142
opts = &MediaOptions{}
142143
}
@@ -158,6 +159,7 @@ func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, o
158159
}
159160
mediaTimeout := make(chan struct{})
160161
p := &MediaPort{
162+
tid: tid,
161163
log: log,
162164
opts: opts,
163165
mon: mon,
@@ -172,7 +174,7 @@ func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, o
172174
}
173175
p.timeoutInitial.Store(&opts.MediaTimeoutInitial)
174176
p.timeoutGeneral.Store(&opts.MediaTimeout)
175-
go p.timeoutLoop(func() {
177+
go p.timeoutLoop(tid, func() {
176178
close(mediaTimeout)
177179
})
178180
p.log.Debugw("listening for media on UDP", "port", p.Port())
@@ -181,6 +183,7 @@ func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, o
181183

182184
// MediaPort combines all functionality related to sending and accepting SIP media.
183185
type MediaPort struct {
186+
tid traceid.ID
184187
log logger.Logger
185188
opts *MediaOptions
186189
mon *stats.CallMonitor
@@ -221,6 +224,7 @@ func (p *MediaPort) EnableOut() {
221224
}
222225

223226
func (p *MediaPort) disableTimeout() {
227+
p.log.Infow("media timeout disabled")
224228
p.timeoutStart.Store(nil)
225229
}
226230

@@ -256,14 +260,16 @@ func (p *MediaPort) SetTimeout(initial, general time.Duration) {
256260
p.enableTimeout(initial, general)
257261
}
258262

259-
func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
263+
func (p *MediaPort) timeoutLoop(tid traceid.ID, timeoutCallback func()) {
264+
defer p.log.Infow("media timeout loop stopped")
260265
ticker := time.NewTicker(p.opts.MediaTimeout)
261266
defer ticker.Stop()
262267

263268
var (
264269
lastPackets uint64
265270
startPackets uint64
266271
lastTime time.Time
272+
lastLog = time.Now()
267273
)
268274
for {
269275
select {
@@ -273,16 +279,45 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
273279
ticker.Reset(tick)
274280
startPackets = p.packetCount.Load()
275281
lastTime = time.Now()
282+
lastLog = lastTime
276283
p.log.Infow("media timeout reset", "packets", startPackets, "tick", tick)
277284
case <-ticker.C:
285+
log := p.log
278286
curPackets := p.packetCount.Load()
287+
startPtr := p.timeoutStart.Load()
288+
var startTime time.Time
289+
if startPtr != nil {
290+
startTime = *startPtr
291+
}
292+
verbose := false
293+
if now := time.Now(); now.Sub(lastLog) > time.Hour {
294+
verbose = true
295+
lastLog = now
296+
log = log.WithValues(
297+
"startPackets", startPackets,
298+
"packets", curPackets,
299+
"lastPackets", lastPackets,
300+
"sinceLast", time.Since(lastTime),
301+
"sinceStart", time.Since(startTime),
302+
)
303+
if curPackets == startPackets {
304+
log.Warnw("media timout is idle for a long time", nil)
305+
} else {
306+
log.Infow("media timeout stats")
307+
}
308+
}
279309
if curPackets != lastPackets {
280310
lastPackets = curPackets
281311
lastTime = time.Now()
312+
if verbose {
313+
log.Infow("got a new packet")
314+
}
282315
continue // wait for the next tick
283316
}
284-
startPtr := p.timeoutStart.Load()
285317
if startPtr == nil {
318+
if verbose {
319+
log.Infow("timeout is disabled")
320+
}
286321
continue // timeout disabled
287322
}
288323
isInitial := lastPackets == startPackets
@@ -310,6 +345,9 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
310345

311346
// Ticker is allowed to fire earlier than the full timeout interval. Skip if it's not a full timeout yet.
312347
if since+timeout/10 < timeout {
348+
if verbose {
349+
log.Infow("too early to trigger", "since", since, "timeout", timeout)
350+
}
313351
continue
314352
}
315353
p.log.Infow("triggering media timeout",
@@ -456,14 +494,14 @@ func (p *MediaPort) SetConfig(c *MediaConf) error {
456494
p.conf = c
457495
p.sess = sess
458496

459-
if err = p.setupOutput(); err != nil {
497+
if err = p.setupOutput(p.tid); err != nil {
460498
return err
461499
}
462500
p.setupInput()
463501
return nil
464502
}
465503

466-
func (p *MediaPort) rtpLoop(sess rtp.Session) {
504+
func (p *MediaPort) rtpLoop(tid traceid.ID, sess rtp.Session) {
467505
// Need a loop to process all incoming packets.
468506
for {
469507
r, ssrc, err := sess.AcceptStream()
@@ -477,11 +515,11 @@ func (p *MediaPort) rtpLoop(sess rtp.Session) {
477515
p.mediaReceived.Break()
478516
log := p.log.WithValues("ssrc", ssrc)
479517
log.Infow("accepting RTP stream")
480-
go p.rtpReadLoop(log, r)
518+
go p.rtpReadLoop(tid, log, r)
481519
}
482520
}
483521

484-
func (p *MediaPort) rtpReadLoop(log logger.Logger, r rtp.ReadStream) {
522+
func (p *MediaPort) rtpReadLoop(tid traceid.ID, log logger.Logger, r rtp.ReadStream) {
485523
const maxErrors = 50 // 1 sec, given 20 ms frames
486524
buf := make([]byte, rtp.MTUSize+1)
487525
overflow := false
@@ -546,11 +584,11 @@ func (p *MediaPort) rtpReadLoop(log logger.Logger, r rtp.ReadStream) {
546584
}
547585

548586
// Must be called holding the lock
549-
func (p *MediaPort) setupOutput() error {
587+
func (p *MediaPort) setupOutput(tid traceid.ID) error {
550588
if p.closed.IsBroken() {
551589
return errors.New("media is already closed")
552590
}
553-
go p.rtpLoop(p.sess)
591+
go p.rtpLoop(tid, p.sess)
554592
w, err := p.sess.OpenWriteStream()
555593
if err != nil {
556594
return err

0 commit comments

Comments
 (0)