-
Notifications
You must be signed in to change notification settings - Fork 117
Reuse the same ID for both auth-less and auth-ful INVITEs #488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2fe16cd
c0df8e4
7f24c54
edae62c
20e5b67
e80dab7
301744f
3bcbc8c
c47e899
73a4029
9118c31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ import ( | |
| "github.com/livekit/protocol/rpc" | ||
| lksip "github.com/livekit/protocol/sip" | ||
| "github.com/livekit/protocol/tracer" | ||
| "github.com/livekit/protocol/utils" | ||
| "github.com/livekit/protocol/utils/traceid" | ||
| "github.com/livekit/psrpc" | ||
| lksdk "github.com/livekit/server-sdk-go/v2" | ||
|
|
@@ -64,6 +65,8 @@ const ( | |
| inviteOKRetryAttempts = 5 | ||
| inviteOKRetryAttemptsNoACK = 2 | ||
| inviteOkAckLateTimeout = inviteOkRetryIntervalMax | ||
|
|
||
| inviteCredentialValidity = 60 * time.Minute // Allow reuse of credentials for 1h | ||
| ) | ||
|
|
||
| var errNoACK = errors.New("no ACK received for 200 OK") | ||
|
|
@@ -134,23 +137,50 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo { | |
| return c | ||
| } | ||
|
|
||
| func (s *Server) getInvite(sipCallID string) *inProgressInvite { | ||
| s.imu.Lock() | ||
| defer s.imu.Unlock() | ||
| for i := range s.inProgressInvites { | ||
| if s.inProgressInvites[i].sipCallID == sipCallID { | ||
| return s.inProgressInvites[i] | ||
| func (s *Server) cleanupInvites() { | ||
| ticker := time.NewTicker(5 * time.Minute) // Periodic cleanup every 5 minutes | ||
| defer ticker.Stop() | ||
| for { | ||
| select { | ||
| case <-s.closing.Watch(): | ||
| return | ||
| case <-ticker.C: | ||
| s.imu.Lock() | ||
| for it := s.inviteTimeoutQueue.IterateRemoveAfter(inviteCredentialValidity); it.Next(); { | ||
| key := it.Item().Value | ||
| delete(s.inProgressInvites, key) | ||
| } | ||
| s.imu.Unlock() | ||
| } | ||
| } | ||
| if len(s.inProgressInvites) >= digestLimit { | ||
| s.inProgressInvites = s.inProgressInvites[1:] | ||
alexlivekit marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite { | ||
| key := dialogKey{ | ||
| sipCallID: sipCallID, | ||
| toTag: toTag, | ||
| fromTag: fromTag, | ||
| } | ||
|
|
||
| s.imu.RLock() | ||
| is, exists := s.inProgressInvites[key] | ||
| s.imu.RUnlock() | ||
| if !exists { | ||
| s.imu.Lock() | ||
| is, exists = s.inProgressInvites[key] | ||
| if !exists { | ||
| is = &inProgressInvite{sipCallID: sipCallID, timeoutLink: utils.TimeoutQueueItem[dialogKey]{Value: key}} | ||
| s.inProgressInvites[key] = is | ||
| } | ||
| s.imu.Unlock() | ||
| } | ||
| is := &inProgressInvite{sipCallID: sipCallID} | ||
| s.inProgressInvites = append(s.inProgressInvites, is) | ||
|
|
||
| // Always reset the timeout link, whether just created or not | ||
| s.inviteTimeoutQueue.Reset(&is.timeoutLink) | ||
| return is | ||
| } | ||
|
|
||
| func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) { | ||
| func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string, inviteState *inProgressInvite) (ok bool) { | ||
| log = log.WithValues( | ||
| "username", username, | ||
| "passwordHash", hashPassword(password), | ||
|
|
@@ -171,14 +201,6 @@ func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Re | |
| _ = tx.Respond(sip.NewResponseFromRequest(req, 100, "Processing", nil)) | ||
| } | ||
|
|
||
| // Extract SIP Call ID for tracking in-progress invites | ||
| sipCallID := "" | ||
| if h := req.CallID(); h != nil { | ||
| sipCallID = h.Value() | ||
| } | ||
| inviteState := s.getInvite(sipCallID) | ||
| log = log.WithValues("inviteStateSipCallID", sipCallID) | ||
alexlivekit marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| h := req.GetHeader("Proxy-Authorization") | ||
| if h == nil { | ||
| inviteState.challenge = digest.Challenge{ | ||
|
|
@@ -230,7 +252,6 @@ func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Re | |
| // Check if we have a valid challenge state | ||
| if inviteState.challenge.Realm == "" { | ||
| log.Warnw("No challenge state found for authentication attempt", errors.New("missing challenge state"), | ||
| "sipCallID", sipCallID, | ||
alexlivekit marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "expectedRealm", UserAgent, | ||
| ) | ||
| _ = tx.Respond(sip.NewResponseFromRequest(req, 401, "Bad credentials", nil)) | ||
|
|
@@ -305,20 +326,18 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE | |
| s.log.Errorw("cannot parse source IP", err, "fromIP", src) | ||
| return psrpc.NewError(psrpc.MalformedRequest, errors.Wrap(err, "cannot parse source IP")) | ||
| } | ||
| callID := lksip.NewCallID() | ||
| tid := traceid.FromGUID(callID) | ||
| sipCallID := legCallIDFromReq(req) | ||
| tr := callTransportFromReq(req) | ||
| legTr := legTransportFromReq(req) | ||
| log := s.log.WithValues( | ||
| "callID", callID, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The one log line that will not have |
||
| "traceID", tid.String(), | ||
| "sipCallID", sipCallID, | ||
| "fromIP", src.Addr(), | ||
| "toIP", req.Destination(), | ||
| "transport", tr, | ||
| ) | ||
|
|
||
| var call *inboundCall | ||
| cc := s.newInbound(log, LocalTag(callID), s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string { | ||
| cc := s.newInbound(log, s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string { | ||
| c := call | ||
| if c == nil || len(c.attrsToHdr) == 0 { | ||
| return headers | ||
|
|
@@ -331,25 +350,53 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE | |
| }) | ||
| log = LoggerWithParams(log, cc) | ||
| log = LoggerWithHeaders(log, cc) | ||
| cc.log = log | ||
| log.Infow("processing invite") | ||
alexlivekit marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if err := cc.ValidateInvite(); err != nil { | ||
| log.Errorw("invalid invite", err) | ||
| if s.conf.HideInboundPort { | ||
| cc.Drop() | ||
| } else { | ||
| cc.RespondAndDrop(sip.StatusBadRequest, "Bad request") | ||
| } | ||
| return psrpc.NewError(psrpc.InvalidArgument, errors.Wrap(err, "invite validation failed")) | ||
| } | ||
|
|
||
| // Establish ID | ||
| fromTag, _ := req.From().Params.Get("tag") // always exists, via ValidateInvite() check | ||
| toParams := req.To().Params // To() always exists, via ValidateInvite() check | ||
| if toParams == nil { | ||
| toParams = sip.NewParams() | ||
| req.To().Params = toParams | ||
| } | ||
| toTag, ok := toParams.Get("tag") | ||
| if !ok { | ||
| // No to-tag on the invite means we need to generate one per RFC 3261 section 12. | ||
| // Generate a new to-tag early, to make sure both INVITES have the same ID. | ||
| toTag = utils.NewGuid("") | ||
| toParams.Add("tag", toTag) | ||
| } | ||
| inviteProgress := s.getInvite(sipCallID, toTag, fromTag) | ||
| callID := inviteProgress.lkCallID | ||
alexlivekit marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if callID == "" { | ||
| callID = lksip.NewCallID() | ||
| inviteProgress.lkCallID = callID | ||
| } | ||
| cc.id = LocalTag(callID) | ||
| tid := traceid.FromGUID(sipCallID) | ||
|
|
||
| log = log.WithValues("callID", callID) | ||
| log = log.WithValues("traceID", tid.String()) | ||
| cc.log = log | ||
| log.Infow("processing invite") | ||
|
|
||
| ctx, span := tracer.Start(ctx, "Server.onInvite") | ||
| defer span.End() | ||
|
|
||
| s.cmu.RLock() | ||
| existing := s.byCallID[cc.SIPCallID()] | ||
| existing := s.byCallID[sipCallID] | ||
| s.cmu.RUnlock() | ||
| if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() { | ||
| log.Infow("accepting reinvite", "sipCallID", existing.cc.ID(), "content-type", req.ContentType(), "content-length", req.ContentLength()) | ||
| log.Infow("accepting reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength()) | ||
| existing.log().Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq()) | ||
| cc.AcceptAsKeepAlive(existing.cc.OwnSDP()) | ||
| return nil | ||
|
|
@@ -376,7 +423,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE | |
|
|
||
| callInfo := &rpc.SIPCall{ | ||
| LkCallId: callID, | ||
| SipCallId: cc.SIPCallID(), | ||
| SipCallId: sipCallID, | ||
| SourceIp: src.Addr().String(), | ||
| Address: ToSIPUri("", cc.Address()), | ||
| From: ToSIPUri("", from), | ||
|
|
@@ -447,15 +494,15 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE | |
| // We will send password request anyway, so might as well signal that the progress is made. | ||
| cc.Processing() | ||
| } | ||
| s.getCallInfo(cc.SIPCallID()).countInvite(log, req) | ||
| if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password) { | ||
| s.getCallInfo(sipCallID).countInvite(log, req) | ||
| if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password, inviteProgress) { | ||
| cmon.InviteErrorShort("unauthorized") | ||
| // handleInviteAuth will generate the SIP Response as needed | ||
| return psrpc.NewErrorf(psrpc.PermissionDenied, "invalid credentials were provided") | ||
| } | ||
| // ok | ||
| case AuthAccept: | ||
| s.getCallInfo(cc.SIPCallID()).countInvite(log, req) | ||
| s.getCallInfo(sipCallID).countInvite(log, req) | ||
| // ok | ||
| } | ||
|
|
||
|
|
@@ -1366,11 +1413,10 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade | |
|
|
||
| } | ||
|
|
||
| func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound { | ||
| func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound { | ||
| c := &sipInbound{ | ||
| log: log, | ||
| s: s, | ||
| id: id, | ||
| invite: invite, | ||
| inviteTx: inviteTx, | ||
| legTr: legTransportFromReq(invite), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -843,7 +843,7 @@ authLoop: | |
| if err != nil { | ||
| return nil, fmt.Errorf("invalid challenge %q: %w", challengeStr, err) | ||
| } | ||
| toHeader := resp.To() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that we're doing the right validation on the inbound side, the outbound side E2E test caught this error! |
||
| toHeader = resp.To() | ||
| if toHeader == nil { | ||
| return nil, errors.New("no 'To' header on Response") | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure this is smart, might want to extend this to max_call_duration or something.
Also, keep in mind that this is per Call-ID for now, so new calls would still need re-auth.