From 2fe16cdfde99025815a1b0a518cfea176eefd9a5 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 20 Oct 2025 14:19:54 -0700 Subject: [PATCH 01/11] inProgressInvite lifecycle, stable ID generation, minor log field reshuffling --- pkg/sip/inbound.go | 62 +++++++++++++++++++++++++++++++-------------- pkg/sip/protocol.go | 7 +++++ pkg/sip/server.go | 25 +++++++++--------- 3 files changed, 63 insertions(+), 31 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index a7fe57b7..aa3ed0f4 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + uuid "github.com/satori/go.uuid" + "github.com/frostbyte73/core" "github.com/icholy/digest" "github.com/pkg/errors" @@ -64,6 +66,8 @@ const ( inviteOKRetryAttempts = 5 inviteOKRetryAttemptsNoACK = 2 inviteOkAckLateTimeout = inviteOkRetryIntervalMax + + inviteCredentialValidity = 60 * time.Minute // Allow reuse of credentials for 1h ) var errNoACK = errors.New("no ACK received for 200 OK") @@ -137,20 +141,23 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo { func (s *Server) getInvite(sipCallID string) *inProgressInvite { s.imu.Lock() defer s.imu.Unlock() - for i := range s.inProgressInvites { - if s.inProgressInvites[i].sipCallID == sipCallID { - return s.inProgressInvites[i] - } + is, ok := s.inProgressInvites[sipCallID] + if ok { + return is } - if len(s.inProgressInvites) >= digestLimit { - s.inProgressInvites = s.inProgressInvites[1:] - } - is := &inProgressInvite{sipCallID: sipCallID} - s.inProgressInvites = append(s.inProgressInvites, is) + is = &inProgressInvite{sipCallID: sipCallID} + s.inProgressInvites[sipCallID] = is + + go func() { + time.Sleep(inviteCredentialValidity) + s.imu.Lock() + defer s.imu.Unlock() + delete(s.inProgressInvites, sipCallID) + }() return is } -func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) { +func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string, inviteState *inProgressInvite) (ok bool) { log = log.WithValues( "username", username, "passwordHash", hashPassword(password), @@ -176,8 +183,6 @@ func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Re if h := req.CallID(); h != nil { sipCallID = h.Value() } - inviteState := s.getInvite(sipCallID) - log = log.WithValues("inviteStateSipCallID", sipCallID) h := req.GetHeader("Proxy-Authorization") if h == nil { @@ -230,7 +235,6 @@ func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Re // Check if we have a valid challenge state if inviteState.challenge.Realm == "" { log.Warnw("No challenge state found for authentication attempt", errors.New("missing challenge state"), - "sipCallID", sipCallID, "expectedRealm", UserAgent, ) _ = tx.Respond(sip.NewResponseFromRequest(req, 401, "Bad credentials", nil)) @@ -305,12 +309,12 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE s.log.Errorw("cannot parse source IP", err, "fromIP", src) return psrpc.NewError(psrpc.MalformedRequest, errors.Wrap(err, "cannot parse source IP")) } - callID := lksip.NewCallID() + sipCallID := legCallIDFromReq(req) tid := traceid.FromGUID(callID) tr := callTransportFromReq(req) legTr := legTransportFromReq(req) log := s.log.WithValues( - "callID", callID, + "sipCallID", sipCallID, "traceID", tid.String(), "fromIP", src.Addr(), "toIP", req.Destination(), @@ -318,7 +322,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE ) var call *inboundCall - cc := s.newInbound(log, LocalTag(callID), s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string { + cc := s.newInbound(log, "unassigned", s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string { c := call if c == nil || len(c.attrsToHdr) == 0 { return headers @@ -331,8 +335,6 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE }) log = LoggerWithParams(log, cc) log = LoggerWithHeaders(log, cc) - cc.log = log - log.Infow("processing invite") if err := cc.ValidateInvite(); err != nil { if s.conf.HideInboundPort { @@ -342,6 +344,28 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE } return psrpc.NewError(psrpc.InvalidArgument, errors.Wrap(err, "invite validation failed")) } + + // Establish ID + if _, ok := req.To().Params.Get("tag"); !ok { + // No to-tag on the invite means we need to generate one per RFC 3261 section 12. + if !inviteHasAuth(req) { + // No auth = a 407 response and another INVITE+auth. + // Generate a new to-tag early, to make sure both INVITES have the same ID. + uuid, _ := uuid.NewV4() // Same as NewResponseFromRequest in sipgo + req.To().Params.Add("tag", uuid.String()) + } + } + inviteProgress := s.getInvite(req.CallID().Value()) + callID := inviteProgress.lkCallID + if callID == "" { + callID = lksip.NewCallID() + inviteProgress.lkCallID = callID + } + + log = log.WithValues("callID", callID) + cc.log = log + log.Infow("processing invite") + ctx, span := tracer.Start(ctx, "Server.onInvite") defer span.End() @@ -448,7 +472,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE cc.Processing() } s.getCallInfo(cc.SIPCallID()).countInvite(log, req) - if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password) { + if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password, inviteProgress) { cmon.InviteErrorShort("unauthorized") // handleInviteAuth will generate the SIP Response as needed return psrpc.NewErrorf(psrpc.PermissionDenied, "invalid credentials were provided") diff --git a/pkg/sip/protocol.go b/pkg/sip/protocol.go index 3a233fa2..ae5ee5ed 100644 --- a/pkg/sip/protocol.go +++ b/pkg/sip/protocol.go @@ -173,6 +173,13 @@ func legTransportFromReq(req *sip.Request) Transport { return "" } +func legCallIDFromReq(req *sip.Request) string { + if callID := req.CallID(); callID != nil { + return callID.Value() + } + return "" +} + func transportPort(c *config.Config, t Transport) int { if t == TransportTLS { if tc := c.TLS; tc != nil { diff --git a/pkg/sip/server.go b/pkg/sip/server.go index bedfcd9e..fa68921b 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -44,8 +44,7 @@ import ( ) const ( - UserAgent = "LiveKit" - digestLimit = 500 + UserAgent = "LiveKit" ) const ( @@ -138,7 +137,7 @@ type Server struct { sipUnhandled RequestHandler imu sync.Mutex - inProgressInvites []*inProgressInvite + inProgressInvites map[string]*inProgressInvite closing core.Fuse cmu sync.RWMutex @@ -161,6 +160,7 @@ type Server struct { type inProgressInvite struct { sipCallID string challenge digest.Challenge + lkCallID string // SCL_* LiveKit call ID assigned to this dialog } type ServerOption func(s *Server) @@ -178,15 +178,16 @@ func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats log = logger.GetLogger() } s := &Server{ - log: log, - conf: conf, - region: region, - mon: mon, - getIOClient: getIOClient, - getRoom: DefaultGetRoomFunc, - byRemoteTag: make(map[RemoteTag]*inboundCall), - byLocalTag: make(map[LocalTag]*inboundCall), - byCallID: make(map[string]*inboundCall), + log: log, + conf: conf, + region: region, + mon: mon, + getIOClient: getIOClient, + getRoom: DefaultGetRoomFunc, + inProgressInvites: make(map[string]*inProgressInvite), + byRemoteTag: make(map[RemoteTag]*inboundCall), + byLocalTag: make(map[LocalTag]*inboundCall), + byCallID: make(map[string]*inboundCall), } for _, option := range options { option(s) From c0df8e47a2df3b5f099d6cfc7ad365ae2ee77de7 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 20 Oct 2025 14:28:35 -0700 Subject: [PATCH 02/11] adding test --- pkg/sip/service_test.go | 144 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/pkg/sip/service_test.go b/pkg/sip/service_test.go index 5b21fb7c..fb76d05a 100644 --- a/pkg/sip/service_test.go +++ b/pkg/sip/service_test.go @@ -782,3 +782,147 @@ func TestCANCELSendsBothResponses(t *testing.T) { // Verify we received the critical 487 response require.True(t, invite487Received, "Should have received 487 Request Terminated response to INVITE when CANCEL is sent") } + +// TestSameCallIDForAuthFlow verifies that the same LiveKit call ID is assigned to both +// the initial INVITE (without auth) and the subsequent INVITE (with auth) +func TestSameCallIDForAuthFlow(t *testing.T) { + const ( + fromUser = "test@example.com" + toUser = "agent@example.com" + username = "testuser" + password = "testpass" + callID = "same-call-id@test.com" + ) + + var capturedCallIDs []string + var mu sync.Mutex + + h := &TestHandler{ + GetAuthCredentialsFunc: func(ctx context.Context, call *rpc.SIPCall) (AuthInfo, error) { + // Capture the LiveKit call ID from the first request + mu.Lock() + capturedCallIDs = append(capturedCallIDs, call.LkCallId) + mu.Unlock() + + return AuthInfo{ + Result: AuthPassword, + Username: username, + Password: password, + }, nil + }, + DispatchCallFunc: func(ctx context.Context, info *CallInfo) CallDispatch { + return CallDispatch{ + Result: DispatchNoRuleReject, + // No room config needed for reject + } + }, + OnSessionEndFunc: func(ctx context.Context, callIdentifier *CallIdentifier, callInfo *livekit.SIPCallInfo, reason string) { + // No-op for tests to avoid async logging issues + }, + } + + // Create service with authentication enabled + sipPort := rand.Intn(testPortSIPMax-testPortSIPMin) + testPortSIPMin + localIP, err := config.GetLocalIP() + require.NoError(t, err) + + sipServerAddress := fmt.Sprintf("%s:%d", localIP, sipPort) + + mon, err := stats.NewMonitor(&config.Config{MaxCpuUtilization: 0.9}) + require.NoError(t, err) + + // Use a no-op logger to avoid panics from async logging after test completion + log := logger.LogRLogger(logr.Discard()) + s, err := NewService("", &config.Config{ + HideInboundPort: false, // Enable authentication + SIPPort: sipPort, + SIPPortListen: sipPort, + RTPPort: rtcconfig.PortRange{Start: testPortRTPMin, End: testPortRTPMax}, + }, mon, log, func(projectID string) rpc.IOInfoClient { return nil }) + require.NoError(t, err) + require.NotNil(t, s) + t.Cleanup(s.Stop) + + s.SetHandler(h) + require.NoError(t, s.Start()) + + sipUserAgent, err := sipgo.NewUA( + sipgo.WithUserAgent(fromUser), + sipgo.WithUserAgentLogger(slog.New(logger.ToSlogHandler(s.log))), + ) + require.NoError(t, err) + + sipClient, err := sipgo.NewClient(sipUserAgent) + require.NoError(t, err) + + offer, err := sdp.NewOffer(localIP, 0xB0B, sdp.EncryptionNone) + require.NoError(t, err) + offerData, err := offer.SDP.Marshal() + require.NoError(t, err) + + // Create first INVITE request (without auth) + inviteRecipient := sip.Uri{User: toUser, Host: sipServerAddress} + inviteRequest1 := sip.NewRequest(sip.INVITE, inviteRecipient) + inviteRequest1.SetDestination(sipServerAddress) + inviteRequest1.SetBody(offerData) + inviteRequest1.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + inviteRequest1.AppendHeader(sip.NewHeader("Call-ID", callID)) + + tx1, err := sipClient.TransactionRequest(inviteRequest1) + require.NoError(t, err) + t.Cleanup(tx1.Terminate) + + // Should receive 100 Trying first, then 407 Unauthorized + res1 := getResponseOrFail(t, tx1) + require.Equal(t, sip.StatusCode(100), res1.StatusCode, "First request should receive 100 Trying") + res1 = getResponseOrFail(t, tx1) + require.Equal(t, sip.StatusCode(407), res1.StatusCode, "First request should receive 407 Unauthorized") + + // Get the challenge from first response + authHeader1 := res1.GetHeader("Proxy-Authenticate") + require.NotNil(t, authHeader1, "First response should have Proxy-Authenticate header") + challenge1 := authHeader1.Value() + + // Parse the challenge to extract nonce and realm + challenge, err := digest.ParseChallenge(challenge1) + require.NoError(t, err, "Should be able to parse challenge") + + // Compute the digest response using the challenge and credentials + cred, err := digest.Digest(challenge, digest.Options{ + Method: "INVITE", + URI: inviteRecipient.String(), + Username: username, + Password: password, + }) + require.NoError(t, err, "Should be able to compute digest response") + + // Create second INVITE request (with auth) using the SAME Call-ID + inviteRequest2 := sip.NewRequest(sip.INVITE, inviteRecipient) + inviteRequest2.SetDestination(sipServerAddress) + inviteRequest2.SetBody(offerData) + inviteRequest2.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + inviteRequest2.AppendHeader(sip.NewHeader("Call-ID", callID)) + inviteRequest2.AppendHeader(sip.NewHeader("Proxy-Authorization", cred.String())) + + tx2, err := sipClient.TransactionRequest(inviteRequest2) + require.NoError(t, err) + t.Cleanup(tx2.Terminate) + + // Should receive 100 Trying first, then proceed with authentication + res2 := getResponseOrFail(t, tx2) + require.Equal(t, sip.StatusCode(100), res2.StatusCode, "Second request should receive 100 Trying") + + // Wait a bit for the handler to be called + time.Sleep(100 * time.Millisecond) + + // Verify we captured exactly 2 call IDs + mu.Lock() + require.Len(t, capturedCallIDs, 2, "Should have captured 2 call IDs") + require.Equal(t, capturedCallIDs[0], capturedCallIDs[1], "Both requests should have the same LiveKit call ID") + require.NotEmpty(t, capturedCallIDs[0], "Call ID should not be empty") + require.Contains(t, capturedCallIDs[0], "SCL_", "Call ID should have SCL_ prefix") + mu.Unlock() + + t.Logf("First call ID: %s", capturedCallIDs[0]) + t.Logf("Second call ID: %s", capturedCallIDs[1]) +} From 7f24c54bcf2e539d86d330849b6bc1876babc54e Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 20 Oct 2025 14:58:02 -0700 Subject: [PATCH 03/11] self-review --- pkg/sip/inbound.go | 32 +++++++++++++++++--------------- pkg/sip/service_test.go | 18 +++++++++++++++++- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index aa3ed0f4..684d843d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -138,21 +138,22 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo { return c } -func (s *Server) getInvite(sipCallID string) *inProgressInvite { +func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { + key := fmt.Sprintf("%s:%s:%s", sipCallID, toTag, fromTag) s.imu.Lock() defer s.imu.Unlock() - is, ok := s.inProgressInvites[sipCallID] + is, ok := s.inProgressInvites[key] if ok { return is } is = &inProgressInvite{sipCallID: sipCallID} - s.inProgressInvites[sipCallID] = is + s.inProgressInvites[key] = is go func() { time.Sleep(inviteCredentialValidity) s.imu.Lock() defer s.imu.Unlock() - delete(s.inProgressInvites, sipCallID) + delete(s.inProgressInvites, key) }() return is } @@ -322,7 +323,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE ) var call *inboundCall - cc := s.newInbound(log, "unassigned", s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string { + cc := s.newInbound(log, s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string { c := call if c == nil || len(c.attrsToHdr) == 0 { return headers @@ -346,21 +347,22 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE } // Establish ID - if _, ok := req.To().Params.Get("tag"); !ok { + fromTag, _ := req.From().Params.Get("tag") // always exists, via ValidateInvite() check + toTag, ok := req.To().Params.Get("tag") // To() always exists, via ValidateInvite() check + if !ok { // No to-tag on the invite means we need to generate one per RFC 3261 section 12. - if !inviteHasAuth(req) { - // No auth = a 407 response and another INVITE+auth. - // Generate a new to-tag early, to make sure both INVITES have the same ID. - uuid, _ := uuid.NewV4() // Same as NewResponseFromRequest in sipgo - req.To().Params.Add("tag", uuid.String()) - } + // Generate a new to-tag early, to make sure both INVITES have the same ID. + uuid, _ := uuid.NewV4() // Same as NewResponseFromRequest in sipgo + toTag = uuid.String() + req.To().Params.Add("tag", toTag) } - inviteProgress := s.getInvite(req.CallID().Value()) + inviteProgress := s.getInvite(sipCallID, toTag, fromTag) callID := inviteProgress.lkCallID if callID == "" { callID = lksip.NewCallID() inviteProgress.lkCallID = callID } + cc.id = LocalTag(callID) log = log.WithValues("callID", callID) cc.log = log @@ -1390,11 +1392,11 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade } -func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound { +func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound { c := &sipInbound{ log: log, s: s, - id: id, + id: "unassigned", invite: invite, inviteTx: inviteTx, legTr: legTransportFromReq(invite), diff --git a/pkg/sip/service_test.go b/pkg/sip/service_test.go index fb76d05a..8cf137e8 100644 --- a/pkg/sip/service_test.go +++ b/pkg/sip/service_test.go @@ -792,6 +792,7 @@ func TestSameCallIDForAuthFlow(t *testing.T) { username = "testuser" password = "testpass" callID = "same-call-id@test.com" + fromTag = "fixed-from-tag-12345" ) var capturedCallIDs []string @@ -860,6 +861,12 @@ func TestSameCallIDForAuthFlow(t *testing.T) { offerData, err := offer.SDP.Marshal() require.NoError(t, err) + inviteFromHeader := sip.FromHeader{ + DisplayName: fromUser, + Address: sip.Uri{User: fromUser, Host: sipServerAddress}, + Params: sip.NewParams().Add("tag", fromTag), // Key bit here + } + // Create first INVITE request (without auth) inviteRecipient := sip.Uri{User: toUser, Host: sipServerAddress} inviteRequest1 := sip.NewRequest(sip.INVITE, inviteRecipient) @@ -867,6 +874,7 @@ func TestSameCallIDForAuthFlow(t *testing.T) { inviteRequest1.SetBody(offerData) inviteRequest1.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) inviteRequest1.AppendHeader(sip.NewHeader("Call-ID", callID)) + inviteRequest1.AppendHeader(&inviteFromHeader) tx1, err := sipClient.TransactionRequest(inviteRequest1) require.NoError(t, err) @@ -878,6 +886,12 @@ func TestSameCallIDForAuthFlow(t *testing.T) { res1 = getResponseOrFail(t, tx1) require.Equal(t, sip.StatusCode(407), res1.StatusCode, "First request should receive 407 Unauthorized") + // Get the To tag from the 407 response + toHeader := res1.To() + require.NotNil(t, toHeader, "407 response should have To header") + _, ok := toHeader.Params.Get("tag") + require.True(t, ok, "407 response To header should have tag parameter") + // Get the challenge from first response authHeader1 := res1.GetHeader("Proxy-Authenticate") require.NotNil(t, authHeader1, "First response should have Proxy-Authenticate header") @@ -896,13 +910,15 @@ func TestSameCallIDForAuthFlow(t *testing.T) { }) require.NoError(t, err, "Should be able to compute digest response") - // Create second INVITE request (with auth) using the SAME Call-ID + // Create second INVITE request (with auth) using the SAME Call-ID, From tag, and To tag inviteRequest2 := sip.NewRequest(sip.INVITE, inviteRecipient) inviteRequest2.SetDestination(sipServerAddress) inviteRequest2.SetBody(offerData) inviteRequest2.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) inviteRequest2.AppendHeader(sip.NewHeader("Call-ID", callID)) inviteRequest2.AppendHeader(sip.NewHeader("Proxy-Authorization", cred.String())) + inviteRequest2.AppendHeader(&inviteFromHeader) + inviteRequest2.AppendHeader(toHeader) tx2, err := sipClient.TransactionRequest(inviteRequest2) require.NoError(t, err) From edae62c9c1aea68428402499a2a97968cda405c4 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 20 Oct 2025 15:27:15 -0700 Subject: [PATCH 04/11] bugfix! --- pkg/sip/outbound.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 27b2cd6a..62107a3b 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -843,7 +843,7 @@ authLoop: if err != nil { return nil, fmt.Errorf("invalid challenge %q: %w", challengeStr, err) } - toHeader := resp.To() + toHeader = resp.To() if toHeader == nil { return nil, errors.New("no 'To' header on Response") } From 20e5b67f3238ffe406b348c975ed424defbbd677 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 21 Oct 2025 20:36:21 -0700 Subject: [PATCH 05/11] PR comments, except one --- go.mod | 2 ++ pkg/sip/inbound.go | 21 ++++++++++++++------- pkg/sip/server.go | 10 ++++++++-- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index c301cce9..387c6d6c 100644 --- a/go.mod +++ b/go.mod @@ -142,3 +142,5 @@ require ( google.golang.org/grpc v1.76.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) + +replace github.com/livekit/psrpc => ../psrpc diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 684d843d..2434217d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -28,8 +28,6 @@ import ( "sync/atomic" "time" - uuid "github.com/satori/go.uuid" - "github.com/frostbyte73/core" "github.com/icholy/digest" "github.com/pkg/errors" @@ -42,6 +40,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" lksip "github.com/livekit/protocol/sip" "github.com/livekit/protocol/tracer" "github.com/livekit/protocol/utils/traceid" @@ -139,7 +138,11 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo { } func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { - key := fmt.Sprintf("%s:%s:%s", sipCallID, toTag, fromTag) + key := dialogKey{ + sipCallID: sipCallID, + toTag: toTag, + fromTag: fromTag, + } s.imu.Lock() defer s.imu.Unlock() is, ok := s.inProgressInvites[key] @@ -338,6 +341,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE log = LoggerWithHeaders(log, cc) if err := cc.ValidateInvite(); err != nil { + log.Errorw("invalid invite", err) if s.conf.HideInboundPort { cc.Drop() } else { @@ -348,13 +352,16 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE // Establish ID fromTag, _ := req.From().Params.Get("tag") // always exists, via ValidateInvite() check - toTag, ok := req.To().Params.Get("tag") // To() always exists, via ValidateInvite() check + toParams := req.To().Params // To() always exists, via ValidateInvite() check + if toParams == nil { + toParams = sip.NewParams() + req.To().Params = toParams + } + toTag, ok := toParams.Get("tag") if !ok { // No to-tag on the invite means we need to generate one per RFC 3261 section 12. // Generate a new to-tag early, to make sure both INVITES have the same ID. - uuid, _ := uuid.NewV4() // Same as NewResponseFromRequest in sipgo - toTag = uuid.String() - req.To().Params.Add("tag", toTag) + toParams.Add("tag", utils.NewGuid("")) } inviteProgress := s.getInvite(sipCallID, toTag, fromTag) callID := inviteProgress.lkCallID diff --git a/pkg/sip/server.go b/pkg/sip/server.go index fa68921b..46433845 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -126,6 +126,12 @@ type Handler interface { OnSessionEnd(ctx context.Context, callIdentifier *CallIdentifier, callInfo *livekit.SIPCallInfo, reason string) } +type dialogKey struct { + sipCallID string + toTag string + fromTag string +} + type Server struct { log logger.Logger mon *stats.Monitor @@ -137,7 +143,7 @@ type Server struct { sipUnhandled RequestHandler imu sync.Mutex - inProgressInvites map[string]*inProgressInvite + inProgressInvites map[dialogKey]*inProgressInvite closing core.Fuse cmu sync.RWMutex @@ -184,7 +190,7 @@ func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats mon: mon, getIOClient: getIOClient, getRoom: DefaultGetRoomFunc, - inProgressInvites: make(map[string]*inProgressInvite), + inProgressInvites: make(map[dialogKey]*inProgressInvite), byRemoteTag: make(map[RemoteTag]*inboundCall), byLocalTag: make(map[LocalTag]*inboundCall), byCallID: make(map[string]*inboundCall), From e80dab7bfc0df0991109e4039e2aee6f158ec75c Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 22 Oct 2025 16:20:59 -0700 Subject: [PATCH 06/11] Adding single goroutine to batch-delete invites at intervals --- pkg/sip/inbound.go | 31 +++++++++++++++++++++++-------- pkg/sip/server.go | 12 ++++++++++-- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 2434217d..b6316342 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -137,6 +137,24 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo { return c } +func (s *Server) cleanupInvites() { + ticker := time.NewTicker(5 * time.Minute) // Periodic cleanup every 5 minutes + defer ticker.Stop() + for { + select { + case <-s.closing.Watch(): + return + case <-ticker.C: + s.imu.Lock() + for it := s.inviteTimeoutQueue.IterateRemoveAfter(inviteCredentialValidity); it.Next(); { + key := it.Item().Value + delete(s.inProgressInvites, *key) + } + s.imu.Unlock() + } + } +} + func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { key := dialogKey{ sipCallID: sipCallID, @@ -149,15 +167,13 @@ func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { if ok { return is } - is = &inProgressInvite{sipCallID: sipCallID} + is = &inProgressInvite{ + sipCallID: sipCallID, + expireAt: time.Now().Add(inviteCredentialValidity), + } s.inProgressInvites[key] = is + s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key}) - go func() { - time.Sleep(inviteCredentialValidity) - s.imu.Lock() - defer s.imu.Unlock() - delete(s.inProgressInvites, key) - }() return is } @@ -1403,7 +1419,6 @@ func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request, c := &sipInbound{ log: log, s: s, - id: "unassigned", invite: invite, inviteTx: inviteTx, legTr: legTransportFromReq(invite), diff --git a/pkg/sip/server.go b/pkg/sip/server.go index 46433845..75e8387b 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -24,6 +24,7 @@ import ( "net" "net/netip" "sync" + "sync/atomic" "time" "github.com/frostbyte73/core" @@ -35,6 +36,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/traceid" "github.com/livekit/sipgo" "github.com/livekit/sipgo/sip" @@ -142,8 +144,10 @@ type Server struct { sipListeners []io.Closer sipUnhandled RequestHandler - imu sync.Mutex - inProgressInvites map[dialogKey]*inProgressInvite + imu sync.Mutex + inProgressInvites map[dialogKey]*inProgressInvite + inviteTimeoutQueue utils.TimeoutQueue[*dialogKey] + isCleanupTaskRunning atomic.Bool closing core.Fuse cmu sync.RWMutex @@ -167,6 +171,7 @@ type inProgressInvite struct { sipCallID string challenge digest.Challenge lkCallID string // SCL_* LiveKit call ID assigned to this dialog + expireAt time.Time } type ServerOption func(s *Server) @@ -337,6 +342,9 @@ func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, tlsConf *tls.C } } + // Start the cleanup task + go s.cleanupInvites() + return nil } From 301744f98c49819b9afe3f587a00d21bd46a2ca9 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 22 Oct 2025 16:25:42 -0700 Subject: [PATCH 07/11] self-review --- go.mod | 2 -- pkg/sip/inbound.go | 5 +---- pkg/sip/server.go | 1 - 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 387c6d6c..c301cce9 100644 --- a/go.mod +++ b/go.mod @@ -142,5 +142,3 @@ require ( google.golang.org/grpc v1.76.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) - -replace github.com/livekit/psrpc => ../psrpc diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index b6316342..b60a515f 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -167,10 +167,7 @@ func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { if ok { return is } - is = &inProgressInvite{ - sipCallID: sipCallID, - expireAt: time.Now().Add(inviteCredentialValidity), - } + is = &inProgressInvite{sipCallID: sipCallID} s.inProgressInvites[key] = is s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key}) diff --git a/pkg/sip/server.go b/pkg/sip/server.go index 75e8387b..94affec2 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -171,7 +171,6 @@ type inProgressInvite struct { sipCallID string challenge digest.Challenge lkCallID string // SCL_* LiveKit call ID assigned to this dialog - expireAt time.Time } type ServerOption func(s *Server) From 3bcbc8c23a043d0247f9979f5efdb636c428aeac Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 22 Oct 2025 16:31:28 -0700 Subject: [PATCH 08/11] fix test --- pkg/sip/inbound.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index b60a515f..0f404daf 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -374,7 +374,8 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE if !ok { // No to-tag on the invite means we need to generate one per RFC 3261 section 12. // Generate a new to-tag early, to make sure both INVITES have the same ID. - toParams.Add("tag", utils.NewGuid("")) + toTag = utils.NewGuid("") + toParams.Add("tag", toTag) } inviteProgress := s.getInvite(sipCallID, toTag, fromTag) callID := inviteProgress.lkCallID From c47e8996ce924f41b9474c332e5a09b29690283c Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 4 Nov 2025 17:05:10 -0800 Subject: [PATCH 09/11] Finally got around to adressing review comments --- pkg/sip/inbound.go | 22 ++++++++++++++-------- pkg/sip/server.go | 33 ++++++++++++++++----------------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 0f404daf..5b907d41 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -148,7 +148,7 @@ func (s *Server) cleanupInvites() { s.imu.Lock() for it := s.inviteTimeoutQueue.IterateRemoveAfter(inviteCredentialValidity); it.Next(); { key := it.Item().Value - delete(s.inProgressInvites, *key) + delete(s.inProgressInvites, key) } s.imu.Unlock() } @@ -161,16 +161,22 @@ func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { toTag: toTag, fromTag: fromTag, } + s.imu.Lock() - defer s.imu.Unlock() - is, ok := s.inProgressInvites[key] - if ok { - return is + is, exists := s.inProgressInvites[key] + s.imu.Unlock() + if !exists { + s.imu.Lock() + is, exists = s.inProgressInvites[key] + if !exists { + is = &inProgressInvite{sipCallID: sipCallID, timeoutLink: utils.TimeoutQueueItem[dialogKey]{Value: key}} + s.inProgressInvites[key] = is + } + s.imu.Unlock() } - is = &inProgressInvite{sipCallID: sipCallID} - s.inProgressInvites[key] = is - s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key}) + // Always reset the timeout link, whether just created or not + s.inviteTimeoutQueue.Reset(&is.timeoutLink) return is } diff --git a/pkg/sip/server.go b/pkg/sip/server.go index 94affec2..3edc4d1b 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -24,7 +24,6 @@ import ( "net" "net/netip" "sync" - "sync/atomic" "time" "github.com/frostbyte73/core" @@ -135,19 +134,18 @@ type dialogKey struct { } type Server struct { - log logger.Logger - mon *stats.Monitor - region string - sipSrv *sipgo.Server - getIOClient GetIOInfoClient - getRoom GetRoomFunc - sipListeners []io.Closer - sipUnhandled RequestHandler - - imu sync.Mutex - inProgressInvites map[dialogKey]*inProgressInvite - inviteTimeoutQueue utils.TimeoutQueue[*dialogKey] - isCleanupTaskRunning atomic.Bool + log logger.Logger + mon *stats.Monitor + region string + sipSrv *sipgo.Server + getIOClient GetIOInfoClient + getRoom GetRoomFunc + sipListeners []io.Closer + sipUnhandled RequestHandler + inviteTimeoutQueue utils.TimeoutQueue[dialogKey] + + imu sync.Mutex + inProgressInvites map[dialogKey]*inProgressInvite closing core.Fuse cmu sync.RWMutex @@ -168,9 +166,10 @@ type Server struct { } type inProgressInvite struct { - sipCallID string - challenge digest.Challenge - lkCallID string // SCL_* LiveKit call ID assigned to this dialog + sipCallID string + challenge digest.Challenge + lkCallID string // SCL_* LiveKit call ID assigned to this dialog + timeoutLink utils.TimeoutQueueItem[dialogKey] } type ServerOption func(s *Server) From 73a402943721fd07431c1e3b94935282f579511e Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 14 Nov 2025 11:03:16 -0800 Subject: [PATCH 10/11] PR comment - switched to RWmutex --- pkg/sip/inbound.go | 4 ++-- pkg/sip/server.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 5b907d41..393eeace 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -162,9 +162,9 @@ func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { fromTag: fromTag, } - s.imu.Lock() + s.imu.RLock() is, exists := s.inProgressInvites[key] - s.imu.Unlock() + s.imu.RUnlock() if !exists { s.imu.Lock() is, exists = s.inProgressInvites[key] diff --git a/pkg/sip/server.go b/pkg/sip/server.go index 3edc4d1b..5212f1d6 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -144,7 +144,7 @@ type Server struct { sipUnhandled RequestHandler inviteTimeoutQueue utils.TimeoutQueue[dialogKey] - imu sync.Mutex + imu sync.RWMutex inProgressInvites map[dialogKey]*inProgressInvite closing core.Fuse From 9118c313bd1ac5d7923cd82d424c972a600ad4c2 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 14 Nov 2025 14:22:09 -0800 Subject: [PATCH 11/11] Updating changes the followed this PR --- pkg/sip/inbound.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 393eeace..35061803 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -40,9 +40,9 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" lksip "github.com/livekit/protocol/sip" "github.com/livekit/protocol/tracer" + "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/traceid" "github.com/livekit/psrpc" lksdk "github.com/livekit/server-sdk-go/v2" @@ -201,12 +201,6 @@ func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Re _ = tx.Respond(sip.NewResponseFromRequest(req, 100, "Processing", nil)) } - // Extract SIP Call ID for tracking in-progress invites - sipCallID := "" - if h := req.CallID(); h != nil { - sipCallID = h.Value() - } - h := req.GetHeader("Proxy-Authorization") if h == nil { inviteState.challenge = digest.Challenge{ @@ -333,12 +327,10 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE return psrpc.NewError(psrpc.MalformedRequest, errors.Wrap(err, "cannot parse source IP")) } sipCallID := legCallIDFromReq(req) - tid := traceid.FromGUID(callID) tr := callTransportFromReq(req) legTr := legTransportFromReq(req) log := s.log.WithValues( "sipCallID", sipCallID, - "traceID", tid.String(), "fromIP", src.Addr(), "toIP", req.Destination(), "transport", tr, @@ -390,8 +382,10 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE inviteProgress.lkCallID = callID } cc.id = LocalTag(callID) + tid := traceid.FromGUID(sipCallID) log = log.WithValues("callID", callID) + log = log.WithValues("traceID", tid.String()) cc.log = log log.Infow("processing invite") @@ -399,10 +393,10 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE defer span.End() s.cmu.RLock() - existing := s.byCallID[cc.SIPCallID()] + existing := s.byCallID[sipCallID] s.cmu.RUnlock() if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() { - log.Infow("accepting reinvite", "sipCallID", existing.cc.ID(), "content-type", req.ContentType(), "content-length", req.ContentLength()) + log.Infow("accepting reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength()) existing.log().Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq()) cc.AcceptAsKeepAlive(existing.cc.OwnSDP()) return nil @@ -429,7 +423,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE callInfo := &rpc.SIPCall{ LkCallId: callID, - SipCallId: cc.SIPCallID(), + SipCallId: sipCallID, SourceIp: src.Addr().String(), Address: ToSIPUri("", cc.Address()), From: ToSIPUri("", from), @@ -500,7 +494,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE // We will send password request anyway, so might as well signal that the progress is made. cc.Processing() } - s.getCallInfo(cc.SIPCallID()).countInvite(log, req) + s.getCallInfo(sipCallID).countInvite(log, req) if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password, inviteProgress) { cmon.InviteErrorShort("unauthorized") // handleInviteAuth will generate the SIP Response as needed @@ -508,7 +502,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE } // ok case AuthAccept: - s.getCallInfo(cc.SIPCallID()).countInvite(log, req) + s.getCallInfo(sipCallID).countInvite(log, req) // ok }