Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/frostbyte73/core"
"github.com/icholy/digest"
psdp "github.com/pion/sdp/v3"
"github.com/pkg/errors"

msdk "github.com/livekit/media-sdk"
Expand Down Expand Up @@ -1003,6 +1004,7 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit
if err != nil {
return nil, err
}
c.cc.nextSDPVersion = answer.SDP.Origin.SessionVersion + 1
c.mon.SDPSize(len(answerData), false)
c.log().Debugw("SDP answer", "sdp", string(answerData))

Expand Down Expand Up @@ -1421,6 +1423,48 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade

}

func (c *inboundCall) holdCall(ctx context.Context) error {
c.log().Infow("holding inbound call")

// Disable media timeout during hold to prevent call termination
if c.media != nil {
c.media.EnableTimeout(false)
c.log().Infow("media timeout disabled for hold")
}

err := c.cc.holdCall(ctx)
if err != nil {
c.log().Infow("inbound call failed to hold", "error", err)
// Re-enable timeout if hold failed
if c.media != nil {
c.media.EnableTimeout(true)
}
return err
}

c.log().Infow("inbound call held")
return nil
}

func (c *inboundCall) unholdCall(ctx context.Context) error {
c.log().Infow("unholding inbound call")

err := c.cc.unholdCall(ctx)
if err != nil {
c.log().Infow("inbound call failed to unhold", "error", err)
return err
}

// Re-enable media timeout after unhold
if c.media != nil {
c.media.EnableTimeout(true)
c.log().Infow("media timeout re-enabled after unhold")
}

c.log().Infow("inbound call unheld")
return nil
}

func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound {
c := &sipInbound{
log: log,
Expand Down Expand Up @@ -1467,13 +1511,15 @@ type sipInbound struct {
referDone chan error

mu sync.RWMutex
sdpMu sync.RWMutex
lastSDP []byte
inviteOk *sip.Response
nextRequestCSeq uint32
referCseq uint32
ringing chan struct{}
acked core.Fuse
setHeaders setHeadersFunc
nextSDPVersion uint64
}

func (c *sipInbound) ValidateInvite() error {
Expand Down Expand Up @@ -1952,3 +1998,136 @@ func (c *sipInbound) CloseWithStatus(code sip.StatusCode, status string) {
c.drop()
}
}

func (c *sipInbound) setSDPMediaDirection(sdpData []byte, direction string) ([]byte, error) {
if len(sdpData) == 0 {
return sdpData, nil
}

// Parse SDP using the base Parse function (works for both offers and answers)
desc, err := sdp.Parse(sdpData)
if err != nil {
return nil, fmt.Errorf("failed to parse SDP: %w", err)
}

// Modify direction attributes in each media description
for _, mediaDesc := range desc.SDP.MediaDescriptions {
if mediaDesc == nil {
continue
}

// Find and remove existing direction attributes
newAttributes := slices.DeleteFunc(mediaDesc.Attributes, func(attr psdp.Attribute) bool {
switch attr.Key {
case "sendrecv", "sendonly", "recvonly", "inactive":
return true
default:
return false
}
})

// Add the new direction attribute
newAttributes = append(newAttributes, psdp.Attribute{
Key: direction,
Value: "",
})

mediaDesc.Attributes = newAttributes
}

// Set session version to current value plus current unix timestamp
desc.SDP.Origin.SessionVersion = c.nextSDPVersion
c.nextSDPVersion += 1

// Marshal back to bytes
modifiedSDP, err := desc.SDP.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal modified SDP: %w", err)
}

return modifiedSDP, nil
}

func (c *sipInbound) createMediaUpdateRequest(direction string) (*sip.Request, error) {
c.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want a second mutex here, like holdMu or sdpMu that will prevent multiple calls to hold/unhold.

defer c.mu.Unlock()
c.sdpMu.Lock()
defer c.sdpMu.Unlock()

if c.invite == nil || c.inviteOk == nil {
return nil, psrpc.NewErrorf(psrpc.FailedPrecondition, "can't update media direction for non established call")
}

// Create INVITE with SDP modified for media update
req := sip.NewRequest(sip.INVITE, c.invite.Recipient)
c.setCSeq(req)

// Copy headers from original INVITE
req.AppendHeader(c.invite.From())
req.AppendHeader(c.invite.To())
req.AppendHeader(c.invite.CallID())
req.AppendHeader(c.contact)
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))

// Modify SDP to set direction
sdpOffer := c.inviteOk.Body()
if len(sdpOffer) > 0 {
modifiedSDP, err := c.setSDPMediaDirection(sdpOffer, direction)
if err != nil {
return nil, err
}
req.SetBody(modifiedSDP)
}

c.swapSrcDst(req)
return req, nil
}

func (c *sipInbound) sendMediaUpdateRequest(ctx context.Context, direction string) error {
req, err := c.createMediaUpdateRequest(direction)
if err != nil {
return err
}

// Send the INVITE request
tx, err := c.Transaction(req)
if err != nil {
return err
}
defer tx.Terminate()

resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil)
if err != nil {
return err
}

if resp.StatusCode != sip.StatusOK {
return &livekit.SIPStatus{
Code: livekit.SIPStatusCode(resp.StatusCode),
Status: resp.Reason,
}
}

// Send ACK for the unhold INVITE
ack := sip.NewAckRequest(req, resp, nil)
if err := c.WriteRequest(ack); err != nil {
return err
}

return nil
}

func (c *sipInbound) holdCall(ctx context.Context) error {
if err := c.sendMediaUpdateRequest(ctx, "sendonly"); err != nil {
return err
}
return nil
}

func (c *sipInbound) unholdCall(ctx context.Context) error {
if err := c.sendMediaUpdateRequest(ctx, "sendrecv"); err != nil {
return err
}
return nil
}