Skip to content

Commit e4146af

Browse files
committed
Adding single goroutine to batch-delete invites at intervals
1 parent 7e3a124 commit e4146af

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

pkg/sip/inbound.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,24 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo {
137137
return c
138138
}
139139

140+
func (s *Server) cleanupInvites() {
141+
ticker := time.NewTicker(5 * time.Minute) // Periodic cleanup every 5 minutes
142+
defer ticker.Stop()
143+
for {
144+
select {
145+
case <-s.closing.Watch():
146+
return
147+
case <-ticker.C:
148+
s.imu.Lock()
149+
for it := s.inviteTimeoutQueue.IterateRemoveAfter(inviteCredentialValidity); it.Next(); {
150+
key := it.Item().Value
151+
delete(s.inProgressInvites, *key)
152+
}
153+
s.imu.Unlock()
154+
}
155+
}
156+
}
157+
140158
func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
141159
key := dialogKey{
142160
sipCallID: sipCallID,
@@ -149,15 +167,13 @@ func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
149167
if ok {
150168
return is
151169
}
152-
is = &inProgressInvite{sipCallID: sipCallID}
170+
is = &inProgressInvite{
171+
sipCallID: sipCallID,
172+
expireAt: time.Now().Add(inviteCredentialValidity),
173+
}
153174
s.inProgressInvites[key] = is
175+
s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key})
154176

155-
go func() {
156-
time.Sleep(inviteCredentialValidity)
157-
s.imu.Lock()
158-
defer s.imu.Unlock()
159-
delete(s.inProgressInvites, key)
160-
}()
161177
return is
162178
}
163179

@@ -1324,7 +1340,6 @@ func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request,
13241340
c := &sipInbound{
13251341
log: log,
13261342
s: s,
1327-
id: "unassigned",
13281343
invite: invite,
13291344
inviteTx: inviteTx,
13301345
legTr: legTransportFromReq(invite),

pkg/sip/server.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net"
2525
"net/netip"
2626
"sync"
27+
"sync/atomic"
2728
"time"
2829

2930
"github.com/frostbyte73/core"
@@ -35,6 +36,7 @@ import (
3536
"github.com/livekit/protocol/livekit"
3637
"github.com/livekit/protocol/logger"
3738
"github.com/livekit/protocol/rpc"
39+
"github.com/livekit/protocol/utils"
3840
"github.com/livekit/sipgo"
3941
"github.com/livekit/sipgo/sip"
4042

@@ -137,8 +139,10 @@ type Server struct {
137139
sipListeners []io.Closer
138140
sipUnhandled RequestHandler
139141

140-
imu sync.Mutex
141-
inProgressInvites map[dialogKey]*inProgressInvite
142+
imu sync.Mutex
143+
inProgressInvites map[dialogKey]*inProgressInvite
144+
inviteTimeoutQueue utils.TimeoutQueue[*dialogKey]
145+
isCleanupTaskRunning atomic.Bool
142146

143147
closing core.Fuse
144148
cmu sync.RWMutex
@@ -161,6 +165,7 @@ type inProgressInvite struct {
161165
sipCallID string
162166
challenge digest.Challenge
163167
lkCallID string // SCL_* LiveKit call ID assigned to this dialog
168+
expireAt time.Time
164169
}
165170

166171
func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats.Monitor, getIOClient GetIOInfoClient) *Server {
@@ -316,6 +321,9 @@ func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, tlsConf *tls.C
316321
}
317322
}
318323

324+
// Start the cleanup task
325+
go s.cleanupInvites()
326+
319327
return nil
320328
}
321329

0 commit comments

Comments
 (0)