diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index abaf8ffa..e243ddee 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -30,6 +30,7 @@ import ( "github.com/frostbyte73/core" "github.com/icholy/digest" + psdp "github.com/pion/sdp/v3" "github.com/pkg/errors" msdk "github.com/livekit/media-sdk" @@ -1003,6 +1004,7 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit if err != nil { return nil, err } + c.cc.nextSDPVersion = answer.SDP.Origin.SessionVersion + 1 c.mon.SDPSize(len(answerData), false) c.log().Debugw("SDP answer", "sdp", string(answerData)) @@ -1421,6 +1423,48 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade } +func (c *inboundCall) holdCall(ctx context.Context) error { + c.log().Infow("holding inbound call") + + // Disable media timeout during hold to prevent call termination + if c.media != nil { + c.media.EnableTimeout(false) + c.log().Infow("media timeout disabled for hold") + } + + err := c.cc.holdCall(ctx) + if err != nil { + c.log().Infow("inbound call failed to hold", "error", err) + // Re-enable timeout if hold failed + if c.media != nil { + c.media.EnableTimeout(true) + } + return err + } + + c.log().Infow("inbound call held") + return nil +} + +func (c *inboundCall) unholdCall(ctx context.Context) error { + c.log().Infow("unholding inbound call") + + err := c.cc.unholdCall(ctx) + if err != nil { + c.log().Infow("inbound call failed to unhold", "error", err) + return err + } + + // Re-enable media timeout after unhold + if c.media != nil { + c.media.EnableTimeout(true) + c.log().Infow("media timeout re-enabled after unhold") + } + + c.log().Infow("inbound call unheld") + return nil +} + func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound { c := &sipInbound{ log: log, @@ -1467,6 +1511,7 @@ type sipInbound struct { referDone chan error mu sync.RWMutex + sdpMu sync.RWMutex lastSDP []byte inviteOk *sip.Response nextRequestCSeq uint32 @@ -1474,6 +1519,7 @@ type sipInbound struct { ringing chan struct{} acked core.Fuse setHeaders setHeadersFunc + nextSDPVersion uint64 } func (c *sipInbound) ValidateInvite() error { @@ -1952,3 +1998,136 @@ func (c *sipInbound) CloseWithStatus(code sip.StatusCode, status string) { c.drop() } } + +func (c *sipInbound) setSDPMediaDirection(sdpData []byte, direction string) ([]byte, error) { + if len(sdpData) == 0 { + return sdpData, nil + } + + // Parse SDP using the base Parse function (works for both offers and answers) + desc, err := sdp.Parse(sdpData) + if err != nil { + return nil, fmt.Errorf("failed to parse SDP: %w", err) + } + + // Modify direction attributes in each media description + for _, mediaDesc := range desc.SDP.MediaDescriptions { + if mediaDesc == nil { + continue + } + + // Find and remove existing direction attributes + newAttributes := slices.DeleteFunc(mediaDesc.Attributes, func(attr psdp.Attribute) bool { + switch attr.Key { + case "sendrecv", "sendonly", "recvonly", "inactive": + return true + default: + return false + } + }) + + // Add the new direction attribute + newAttributes = append(newAttributes, psdp.Attribute{ + Key: direction, + Value: "", + }) + + mediaDesc.Attributes = newAttributes + } + + // Set session version to current value plus current unix timestamp + desc.SDP.Origin.SessionVersion = c.nextSDPVersion + c.nextSDPVersion += 1 + + // Marshal back to bytes + modifiedSDP, err := desc.SDP.Marshal() + if err != nil { + return nil, fmt.Errorf("failed to marshal modified SDP: %w", err) + } + + return modifiedSDP, nil +} + +func (c *sipInbound) createMediaUpdateRequest(direction string) (*sip.Request, error) { + c.mu.Lock() + defer c.mu.Unlock() + c.sdpMu.Lock() + defer c.sdpMu.Unlock() + + if c.invite == nil || c.inviteOk == nil { + return nil, psrpc.NewErrorf(psrpc.FailedPrecondition, "can't update media direction for non established call") + } + + // Create INVITE with SDP modified for media update + req := sip.NewRequest(sip.INVITE, c.invite.Recipient) + c.setCSeq(req) + + // Copy headers from original INVITE + req.AppendHeader(c.invite.From()) + req.AppendHeader(c.invite.To()) + req.AppendHeader(c.invite.CallID()) + req.AppendHeader(c.contact) + req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) + + // Modify SDP to set direction + sdpOffer := c.inviteOk.Body() + if len(sdpOffer) > 0 { + modifiedSDP, err := c.setSDPMediaDirection(sdpOffer, direction) + if err != nil { + return nil, err + } + req.SetBody(modifiedSDP) + } + + c.swapSrcDst(req) + return req, nil +} + +func (c *sipInbound) sendMediaUpdateRequest(ctx context.Context, direction string) error { + req, err := c.createMediaUpdateRequest(direction) + if err != nil { + return err + } + + // Send the INVITE request + tx, err := c.Transaction(req) + if err != nil { + return err + } + defer tx.Terminate() + + resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil) + if err != nil { + return err + } + + if resp.StatusCode != sip.StatusOK { + return &livekit.SIPStatus{ + Code: livekit.SIPStatusCode(resp.StatusCode), + Status: resp.Reason, + } + } + + // Send ACK for the unhold INVITE + ack := sip.NewAckRequest(req, resp, nil) + if err := c.WriteRequest(ack); err != nil { + return err + } + + return nil +} + +func (c *sipInbound) holdCall(ctx context.Context) error { + if err := c.sendMediaUpdateRequest(ctx, "sendonly"); err != nil { + return err + } + return nil +} + +func (c *sipInbound) unholdCall(ctx context.Context) error { + if err := c.sendMediaUpdateRequest(ctx, "sendrecv"); err != nil { + return err + } + return nil +}