Skip to content

Commit 39d6dd7

Browse files
authored
Basic support for reinvites. (#480)
* Basic support for reinvites. * Fix refer tests (again).
1 parent 986b981 commit 39d6dd7

File tree

6 files changed

+66
-48
lines changed

6 files changed

+66
-48
lines changed

pkg/sip/inbound.go

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,6 @@ func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.Se
176176
if h := req.CallID(); h != nil {
177177
sipCallID = h.Value()
178178
}
179-
ci := s.getCallInfo(sipCallID)
180-
ci.countInvite(log, req)
181179
inviteState := s.getInvite(sipCallID)
182180
log = log.WithValues("inviteStateSipCallID", sipCallID)
183181

@@ -333,6 +331,16 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
333331
ctx, span := tracer.Start(ctx, "Server.onInvite")
334332
defer span.End()
335333

334+
s.cmu.RLock()
335+
existing := s.byCallID[cc.SIPCallID()]
336+
s.cmu.RUnlock()
337+
if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() {
338+
log.Infow("accepting reinvite", "sipCallID", existing.cc.ID(), "content-type", req.ContentType(), "content-length", req.ContentLength())
339+
existing.log.Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq())
340+
cc.AcceptAsKeepAlive()
341+
return nil
342+
}
343+
336344
from, to := cc.From(), cc.To()
337345

338346
cmon := s.mon.NewCall(stats.Inbound, from.Host, to.Host)
@@ -352,15 +360,9 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
352360
cc.Processing()
353361
}
354362

355-
// Extract SIP Call ID directly from the request
356-
sipCallID := ""
357-
if h := req.CallID(); h != nil {
358-
sipCallID = h.Value()
359-
}
360-
361363
callInfo := &rpc.SIPCall{
362364
LkCallId: callID,
363-
SipCallId: sipCallID,
365+
SipCallId: cc.SIPCallID(),
364366
SourceIp: src.Addr().String(),
365367
Address: ToSIPUri("", cc.Address()),
366368
From: ToSIPUri("", from),
@@ -421,13 +423,15 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
421423
// We will send password request anyway, so might as well signal that the progress is made.
422424
cc.Processing()
423425
}
426+
s.getCallInfo(cc.SIPCallID()).countInvite(log, req)
424427
if !s.handleInviteAuth(log, req, tx, from.User, r.Username, r.Password) {
425428
cmon.InviteErrorShort("unauthorized")
426429
// handleInviteAuth will generate the SIP Response as needed
427430
return psrpc.NewErrorf(psrpc.PermissionDenied, "invalid credentials were provided")
428431
}
429-
fallthrough
432+
// ok
430433
case AuthAccept:
434+
s.getCallInfo(cc.SIPCallID()).countInvite(log, req)
431435
// ok
432436
}
433437

@@ -446,7 +450,7 @@ func (s *Server) onAck(log *slog.Logger, req *sip.Request, tx sip.ServerTransact
446450
return
447451
}
448452
s.cmu.RLock()
449-
c := s.activeCalls[tag]
453+
c := s.byRemoteTag[tag]
450454
s.cmu.RUnlock()
451455
if c == nil {
452456
return
@@ -463,7 +467,7 @@ func (s *Server) onBye(log *slog.Logger, req *sip.Request, tx sip.ServerTransact
463467
}
464468

465469
s.cmu.RLock()
466-
c := s.activeCalls[tag]
470+
c := s.byRemoteTag[tag]
467471
s.cmu.RUnlock()
468472
if c != nil {
469473
c.cc.AcceptBye(req, tx)
@@ -512,7 +516,7 @@ func (s *Server) OnNoRoute(log *slog.Logger, req *sip.Request, tx sip.ServerTran
512516
}
513517
s.log.Infow("Inbound SIP request not handled",
514518
"method", req.Method.String(),
515-
"callID", callID,
519+
"sipCallID", callID,
516520
"from", from,
517521
"to", to)
518522
tx.Respond(sip.NewResponseFromRequest(req, 405, "Method Not Allowed", nil))
@@ -526,7 +530,7 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans
526530
}
527531

528532
s.cmu.RLock()
529-
c := s.activeCalls[tag]
533+
c := s.byRemoteTag[tag]
530534
s.cmu.RUnlock()
531535
if c != nil {
532536
c.log.Infow("NOTIFY")
@@ -600,8 +604,9 @@ func (s *Server) newInboundCall(
600604
c.log = c.log.WithValues("jitterBuf", c.jitterBuf)
601605
c.ctx, c.cancel = context.WithCancel(context.Background())
602606
s.cmu.Lock()
603-
s.activeCalls[cc.Tag()] = c
604-
s.byLocal[cc.ID()] = c
607+
s.byRemoteTag[cc.Tag()] = c
608+
s.byLocalTag[cc.ID()] = c
609+
s.byCallID[cc.SIPCallID()] = c
605610
s.cmu.Unlock()
606611
return c
607612
}
@@ -1059,8 +1064,9 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
10591064
c.callDur()
10601065
}
10611066
c.s.cmu.Lock()
1062-
delete(c.s.activeCalls, c.cc.Tag())
1063-
delete(c.s.byLocal, c.cc.ID())
1067+
delete(c.s.byRemoteTag, c.cc.Tag())
1068+
delete(c.s.byLocalTag, c.cc.ID())
1069+
delete(c.s.byCallID, c.cc.SIPCallID())
10641070
c.s.cmu.Unlock()
10651071

10661072
c.s.DeregisterTransferSIPParticipant(c.cc.ID())
@@ -1314,28 +1320,30 @@ func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite
13141320
}
13151321
c.to = invite.To()
13161322
if h := invite.CSeq(); h != nil {
1323+
c.inviteCSeq = h.SeqNo
13171324
c.nextRequestCSeq = h.SeqNo + 1
13181325
}
13191326
if callID := invite.CallID(); callID != nil {
1320-
c.callID = callID.Value()
1327+
c.sipCallID = callID.Value()
13211328
}
13221329
return c
13231330
}
13241331

13251332
type sipInbound struct {
1326-
log logger.Logger
1327-
s *Server
1328-
id LocalTag
1329-
tag RemoteTag
1330-
callID string
1331-
invite *sip.Request
1332-
inviteTx sip.ServerTransaction
1333-
contact *sip.ContactHeader
1334-
cancelled chan struct{}
1335-
from *sip.FromHeader
1336-
to *sip.ToHeader
1337-
legTr Transport
1338-
referDone chan error
1333+
log logger.Logger
1334+
s *Server
1335+
id LocalTag // SCL
1336+
tag RemoteTag
1337+
sipCallID string
1338+
invite *sip.Request
1339+
inviteCSeq uint32
1340+
inviteTx sip.ServerTransaction
1341+
contact *sip.ContactHeader
1342+
cancelled chan struct{}
1343+
from *sip.FromHeader
1344+
to *sip.ToHeader
1345+
legTr Transport
1346+
referDone chan error
13391347

13401348
mu sync.RWMutex
13411349
inviteOk *sip.Response
@@ -1347,7 +1355,7 @@ type sipInbound struct {
13471355
}
13481356

13491357
func (c *sipInbound) ValidateInvite() error {
1350-
if c.callID == "" {
1358+
if c.sipCallID == "" {
13511359
return errors.New("no Call-ID header in INVITE")
13521360
}
13531361
if c.from == nil {
@@ -1427,8 +1435,12 @@ func (c *sipInbound) Tag() RemoteTag {
14271435
return c.tag
14281436
}
14291437

1430-
func (c *sipInbound) CallID() string {
1431-
return c.callID
1438+
func (c *sipInbound) SIPCallID() string {
1439+
return c.sipCallID
1440+
}
1441+
1442+
func (c *sipInbound) InviteCSeq() uint32 {
1443+
return c.inviteCSeq
14321444
}
14331445

14341446
func (c *sipInbound) RemoteHeaders() Headers {
@@ -1519,6 +1531,10 @@ func (c *sipInbound) accepted(inviteOK *sip.Response) {
15191531
c.inviteTx = nil
15201532
}
15211533

1534+
func (c *sipInbound) AcceptAsKeepAlive() {
1535+
c.RespondAndDrop(sip.StatusOK, "OK")
1536+
}
1537+
15221538
func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[string]string) error {
15231539
ctx, span := tracer.Start(ctx, "sipInbound.Accept")
15241540
defer span.End()

pkg/sip/outbound.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ func (c *outboundCall) close(err error, status CallStatus, description string, r
314314
go c.c.handler.OnSessionEnd(context.Background(), &CallIdentifier{
315315
ProjectID: c.projectID,
316316
CallID: c.state.callInfo.CallId,
317-
SipCallID: c.cc.CallID(),
317+
SipCallID: c.cc.SIPCallID(),
318318
}, c.state.callInfo, description)
319319
}
320320
})
@@ -734,7 +734,7 @@ func (c *sipOutbound) Tag() RemoteTag {
734734
return c.tag
735735
}
736736

737-
func (c *sipOutbound) CallID() string {
737+
func (c *sipOutbound) SIPCallID() string {
738738
c.mu.RLock()
739739
defer c.mu.RUnlock()
740740
return c.callID

pkg/sip/protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ type Signaling interface {
120120
To() sip.Uri
121121
ID() LocalTag
122122
Tag() RemoteTag
123-
CallID() string
123+
SIPCallID() string
124124
RemoteHeaders() Headers
125125

126126
WriteRequest(req *sip.Request) error

pkg/sip/server.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ type Server struct {
137137

138138
closing core.Fuse
139139
cmu sync.RWMutex
140-
activeCalls map[RemoteTag]*inboundCall
141-
byLocal map[LocalTag]*inboundCall
140+
byRemoteTag map[RemoteTag]*inboundCall
141+
byLocalTag map[LocalTag]*inboundCall
142+
byCallID map[string]*inboundCall
142143

143144
infos struct {
144145
sync.Mutex
@@ -167,8 +168,9 @@ func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats
167168
region: region,
168169
mon: mon,
169170
getIOClient: getIOClient,
170-
activeCalls: make(map[RemoteTag]*inboundCall),
171-
byLocal: make(map[LocalTag]*inboundCall),
171+
byRemoteTag: make(map[RemoteTag]*inboundCall),
172+
byLocalTag: make(map[LocalTag]*inboundCall),
173+
byCallID: make(map[string]*inboundCall),
172174
}
173175
s.infos.byCallID = expirable.NewLRU[string, *inboundCallInfo](maxCallCache, nil, callCacheTTL)
174176
s.initMediaRes()
@@ -315,8 +317,8 @@ func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, tlsConf *tls.C
315317
func (s *Server) Stop() {
316318
s.closing.Break()
317319
s.cmu.Lock()
318-
calls := maps.Values(s.activeCalls)
319-
s.activeCalls = make(map[RemoteTag]*inboundCall)
320+
calls := maps.Values(s.byRemoteTag)
321+
s.byRemoteTag = make(map[RemoteTag]*inboundCall)
320322
s.cmu.Unlock()
321323
for _, c := range calls {
322324
_ = c.Close()

pkg/sip/service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (s *Service) ActiveCalls() ActiveCalls {
157157
s.cli.cmu.Unlock()
158158

159159
s.srv.cmu.Lock()
160-
samples, total = sampleMap(5, s.srv.activeCalls, func(v *inboundCall) string {
160+
samples, total = sampleMap(5, s.srv.byRemoteTag, func(v *inboundCall) string {
161161
if v == nil || v.cc == nil {
162162
return "<nil>"
163163
}
@@ -340,7 +340,7 @@ func (s *Service) processParticipantTransfer(ctx context.Context, callID string,
340340
}
341341

342342
s.srv.cmu.Lock()
343-
in := s.srv.byLocal[LocalTag(callID)]
343+
in := s.srv.byLocalTag[LocalTag(callID)]
344344
s.srv.cmu.Unlock()
345345

346346
if in != nil {

pkg/sip/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func LoggerWithParams(log logger.Logger, c Signaling) logger.Logger {
245245
if tag := c.Tag(); tag != "" {
246246
log = log.WithValues("sipTag", tag)
247247
}
248-
if cid := c.CallID(); cid != "" {
248+
if cid := c.SIPCallID(); cid != "" {
249249
log = log.WithValues("sipCallID", cid)
250250
}
251251
return log
@@ -304,7 +304,7 @@ func HeadersToAttrs(attrs, hdrToAttr map[string]string, opts livekit.SIPHeaderOp
304304
if tag := c.Tag(); tag != "" {
305305
attrs[AttrSIPCallTag] = string(tag)
306306
}
307-
if cid := c.CallID(); cid != "" {
307+
if cid := c.SIPCallID(); cid != "" {
308308
attrs[AttrSIPCallIDFull] = cid
309309
}
310310
}

0 commit comments

Comments
 (0)