Skip to content

Commit e80dab7

Browse files
committed
Adding single goroutine to batch-delete invites at intervals
1 parent 20e5b67 commit e80dab7

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

pkg/sip/inbound.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,24 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo {
137137
return c
138138
}
139139

140+
func (s *Server) cleanupInvites() {
141+
ticker := time.NewTicker(5 * time.Minute) // Periodic cleanup every 5 minutes
142+
defer ticker.Stop()
143+
for {
144+
select {
145+
case <-s.closing.Watch():
146+
return
147+
case <-ticker.C:
148+
s.imu.Lock()
149+
for it := s.inviteTimeoutQueue.IterateRemoveAfter(inviteCredentialValidity); it.Next(); {
150+
key := it.Item().Value
151+
delete(s.inProgressInvites, *key)
152+
}
153+
s.imu.Unlock()
154+
}
155+
}
156+
}
157+
140158
func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
141159
key := dialogKey{
142160
sipCallID: sipCallID,
@@ -149,15 +167,13 @@ func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
149167
if ok {
150168
return is
151169
}
152-
is = &inProgressInvite{sipCallID: sipCallID}
170+
is = &inProgressInvite{
171+
sipCallID: sipCallID,
172+
expireAt: time.Now().Add(inviteCredentialValidity),
173+
}
153174
s.inProgressInvites[key] = is
175+
s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key})
154176

155-
go func() {
156-
time.Sleep(inviteCredentialValidity)
157-
s.imu.Lock()
158-
defer s.imu.Unlock()
159-
delete(s.inProgressInvites, key)
160-
}()
161177
return is
162178
}
163179

@@ -1403,7 +1419,6 @@ func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request,
14031419
c := &sipInbound{
14041420
log: log,
14051421
s: s,
1406-
id: "unassigned",
14071422
invite: invite,
14081423
inviteTx: inviteTx,
14091424
legTr: legTransportFromReq(invite),

pkg/sip/server.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net"
2525
"net/netip"
2626
"sync"
27+
"sync/atomic"
2728
"time"
2829

2930
"github.com/frostbyte73/core"
@@ -35,6 +36,7 @@ import (
3536
"github.com/livekit/protocol/livekit"
3637
"github.com/livekit/protocol/logger"
3738
"github.com/livekit/protocol/rpc"
39+
"github.com/livekit/protocol/utils"
3840
"github.com/livekit/protocol/utils/traceid"
3941
"github.com/livekit/sipgo"
4042
"github.com/livekit/sipgo/sip"
@@ -142,8 +144,10 @@ type Server struct {
142144
sipListeners []io.Closer
143145
sipUnhandled RequestHandler
144146

145-
imu sync.Mutex
146-
inProgressInvites map[dialogKey]*inProgressInvite
147+
imu sync.Mutex
148+
inProgressInvites map[dialogKey]*inProgressInvite
149+
inviteTimeoutQueue utils.TimeoutQueue[*dialogKey]
150+
isCleanupTaskRunning atomic.Bool
147151

148152
closing core.Fuse
149153
cmu sync.RWMutex
@@ -167,6 +171,7 @@ type inProgressInvite struct {
167171
sipCallID string
168172
challenge digest.Challenge
169173
lkCallID string // SCL_* LiveKit call ID assigned to this dialog
174+
expireAt time.Time
170175
}
171176

172177
type ServerOption func(s *Server)
@@ -337,6 +342,9 @@ func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, tlsConf *tls.C
337342
}
338343
}
339344

345+
// Start the cleanup task
346+
go s.cleanupInvites()
347+
340348
return nil
341349
}
342350

0 commit comments

Comments
 (0)