Skip to content

Commit 4f29f99

Browse files
authored
Get to the point of establishing publishing peer connection and opening signalling data channel. (#706)
* WIP * queue up message sends * WIP * handle more messages * clean up * message queue * signalling version in state * clean up
1 parent 82191cf commit 4f29f99

12 files changed

+428
-59
lines changed

engine.go

Lines changed: 66 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ var (
113113
// -------------------------------------------
114114

115115
const (
116-
reliableDataChannelName = "_reliable"
117-
lossyDataChannelName = "_lossy"
116+
signallingDataChannelName = "_signalling"
117+
reliableDataChannelName = "_reliable"
118+
lossyDataChannelName = "_lossy"
118119

119120
maxReconnectCount = 10
120121
initialReconnectInterval = 300 * time.Millisecond
@@ -124,6 +125,8 @@ const (
124125
type RTCEngine struct {
125126
log protoLogger.Logger
126127

128+
signallingVersion signalling.SignallingVersion
129+
127130
engineHandler engineHandler
128131
cbGetLocalParticipantSID func() string
129132

@@ -136,6 +139,7 @@ type RTCEngine struct {
136139
signalTransport signalling.SignalTransport
137140

138141
dclock sync.RWMutex
142+
signallingDC *webrtc.DataChannel
139143
reliableDC *webrtc.DataChannel
140144
lossyDC *webrtc.DataChannel
141145
reliableDCSub *webrtc.DataChannel
@@ -163,32 +167,39 @@ type RTCEngine struct {
163167
onCloseLock sync.Mutex
164168
}
165169

166-
func NewRTCEngine(engineHandler engineHandler, getLocalParticipantSID func() string) *RTCEngine {
170+
func NewRTCEngine(
171+
signallingVersion signalling.SignallingVersion,
172+
engineHandler engineHandler,
173+
getLocalParticipantSID func() string,
174+
) *RTCEngine {
167175
e := &RTCEngine{
168176
log: logger,
177+
signallingVersion: signallingVersion,
169178
engineHandler: engineHandler,
170179
cbGetLocalParticipantSID: getLocalParticipantSID,
171180
trackPublishedListeners: make(map[string]chan *livekit.TrackPublishedResponse),
172181
joinTimeout: 15 * time.Second,
173182
reliableMsgSeq: 1,
174183
}
175-
// SIGNALLING-V2-TODO: have to instantiate objects based on signal version & transport
176-
e.signalling = signalling.NewSignalling(signalling.SignallingParams{
177-
Logger: e.log,
178-
})
179-
e.signalHandler = signalling.NewSignalHandler(signalling.SignalHandlerParams{
180-
Logger: e.log,
181-
Processor: e,
182-
})
183-
e.signalTransport = signalling.NewSignalTransportWebSocket(signalling.SignalTransportWebSocketParams{
184-
Logger: e.log,
185-
Version: Version,
186-
Protocol: PROTOCOL,
187-
Signalling: e.signalling,
188-
SignalTransportHandler: e,
189-
SignalHandler: e.signalHandler,
190-
})
191-
/*
184+
switch signallingVersion {
185+
case signalling.SignallingVersionV1:
186+
e.signalling = signalling.NewSignalling(signalling.SignallingParams{
187+
Logger: e.log,
188+
})
189+
e.signalHandler = signalling.NewSignalHandler(signalling.SignalHandlerParams{
190+
Logger: e.log,
191+
Processor: e,
192+
})
193+
e.signalTransport = signalling.NewSignalTransportWebSocket(signalling.SignalTransportWebSocketParams{
194+
Logger: e.log,
195+
Version: Version,
196+
Protocol: PROTOCOL,
197+
Signalling: e.signalling,
198+
SignalTransportHandler: e,
199+
SignalHandler: e.signalHandler,
200+
})
201+
202+
case signalling.SignallingVersionV2:
192203
e.signalling = signalling.NewSignallingv2(signalling.Signallingv2Params{
193204
Logger: e.log,
194205
})
@@ -204,7 +215,10 @@ func NewRTCEngine(engineHandler engineHandler, getLocalParticipantSID func() str
204215
Signalling: e.signalling,
205216
SignalHandler: e.signalHandler,
206217
})
207-
*/
218+
219+
default:
220+
return nil
221+
}
208222

209223
e.onClose = []func(){}
210224
return e
@@ -319,8 +333,8 @@ func (e *RTCEngine) setRTT(rtt uint32) {
319333
func (e *RTCEngine) configure(
320334
iceServers []*livekit.ICEServer,
321335
clientConfig *livekit.ClientConfiguration,
322-
subscriberPrimary *bool) error {
323-
336+
subscriberPrimary *bool,
337+
) error {
324338
configuration := e.makeRTCConfiguration(iceServers, clientConfig)
325339

326340
// reset reliable message sequence
@@ -468,6 +482,7 @@ func (e *RTCEngine) configure(
468482
return err
469483
}
470484
e.lossyDC.OnMessage(e.handleDataPacket)
485+
471486
e.reliableDC, err = e.publisher.PeerConnection().CreateDataChannel(reliableDataChannelName, &webrtc.DataChannelInit{
472487
Ordered: &trueVal,
473488
})
@@ -476,6 +491,30 @@ func (e *RTCEngine) configure(
476491
return err
477492
}
478493
e.reliableDC.OnMessage(e.handleDataPacket)
494+
495+
// SIGNALLING-V2-TODO: instantiating this rely on signal transport strategy rather than signalling version
496+
// SIGNALLING-V2-TODO: for signalling v2 instantiate publisher PC before connect and then do just SetConfiguration in OnConnectResponse
497+
if e.signallingVersion == signalling.SignallingVersionV2 {
498+
e.signallingDC, err = e.publisher.PeerConnection().CreateDataChannel(signallingDataChannelName, &webrtc.DataChannelInit{
499+
Ordered: &trueVal,
500+
})
501+
if err != nil {
502+
e.dclock.Unlock()
503+
return err
504+
}
505+
e.signallingDC.OnOpen(func() {
506+
signallingTransportDataChannel := signalling.NewSignalTransportDataChannel(signalling.SignalTransportDataChannelParams{
507+
Logger: e.log,
508+
DataChannel: e.signallingDC,
509+
SignalHandler: e.signalHandler,
510+
})
511+
e.signalTransport.SetAsyncTransport(signallingTransportDataChannel)
512+
})
513+
e.signallingDC.OnClose(func() {
514+
// SIGNALLING-V2-TODO: should call SignalTransportHandler.OnClose
515+
})
516+
e.signallingDC.OnMessage(e.handleSignalling)
517+
}
479518
e.dclock.Unlock()
480519

481520
return nil
@@ -640,6 +679,10 @@ func (e *RTCEngine) readDataPacket(msg webrtc.DataChannelMessage) (*livekit.Data
640679
return dataPacket, err
641680
}
642681

682+
func (e *RTCEngine) handleSignalling(msg webrtc.DataChannelMessage) {
683+
e.signalHandler.HandleEncodedMessage(msg.Data)
684+
}
685+
643686
func (e *RTCEngine) handleDisconnect(fullReconnect bool) {
644687
// do not retry until fully connected
645688
if e.closed.Load() || !e.hasConnected.Load() {

room.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ import (
3838
"github.com/livekit/protocol/livekit"
3939
)
4040

41+
var (
42+
signallingVersion signalling.SignallingVersion = signalling.SignallingVersionV1
43+
)
44+
45+
func WithSignallingVersion(v signalling.SignallingVersion) {
46+
signallingVersion = v
47+
}
48+
4149
var (
4250
_ engineHandler = (*Room)(nil)
4351
)
@@ -204,7 +212,7 @@ func NewRoom(callback *RoomCallback) *Room {
204212
}
205213
r.callback.Merge(callback)
206214

207-
r.engine = NewRTCEngine(r, r.getLocalParticipantSID)
215+
r.engine = NewRTCEngine(signallingVersion, r, r.getLocalParticipantSID)
208216
r.LocalParticipant = newLocalParticipant(r.engine, r.callback, r.serverInfo)
209217
return r
210218
}

signalling/errors.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ var (
2424
ErrInvalidMessageType = errors.New("invalid message type")
2525
ErrInvalidParameter = errors.New("invalid parameter")
2626
ErrCannotDialSignal = errors.New("could not dial signal connection")
27-
ErrEmptyResponse = errors.New("empty response")
27+
ErrMessageQueueNotStarted = errors.New("message queue not started")
28+
ErrMessageQueueFull = errors.New("message queue is full")
2829
)

signalling/interfaces.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,32 @@ import (
2727
"google.golang.org/protobuf/proto"
2828
)
2929

30+
type SignallingVersion int
31+
32+
const (
33+
SignallingVersionUnused SignallingVersion = iota
34+
SignallingVersionV1 // v1
35+
SignallingVersionV2 // v2
36+
)
37+
38+
func (s SignallingVersion) String() string {
39+
switch s {
40+
case SignallingVersionUnused:
41+
return "UNUSED"
42+
43+
case SignallingVersionV1:
44+
return "V1"
45+
46+
case SignallingVersionV2:
47+
return "V2"
48+
49+
default:
50+
return fmt.Sprintf("UNKNOWN: %d", s)
51+
}
52+
}
53+
54+
// ---------------------------
55+
3056
type joinMethod int
3157

3258
const (
@@ -124,6 +150,7 @@ type ConnectParams struct {
124150

125151
type SignalTransport interface {
126152
SetLogger(l protoLogger.Logger)
153+
SetAsyncTransport(asyncTransport SignalTransport)
127154

128155
Start()
129156
IsStarted() bool
@@ -153,6 +180,9 @@ type SignalHandler interface {
153180
SetLogger(l protoLogger.Logger)
154181

155182
HandleMessage(msg proto.Message) error
183+
HandleEncodedMessage(data []byte) error
184+
185+
PruneStaleReassemblies()
156186
}
157187

158188
type SignalProcessor interface {

signalling/messagequeue.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2023 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalling
16+
17+
import (
18+
"sync"
19+
20+
"github.com/livekit/protocol/logger"
21+
"google.golang.org/protobuf/proto"
22+
)
23+
24+
type messageQueueParams struct {
25+
Logger logger.Logger
26+
HandleMessage func(msg proto.Message)
27+
}
28+
29+
type messageQueue struct {
30+
params messageQueueParams
31+
32+
lock sync.RWMutex
33+
isStarted bool
34+
msgChan chan proto.Message
35+
}
36+
37+
func newMessageQueue(params messageQueueParams) *messageQueue {
38+
return &messageQueue{
39+
params: params,
40+
}
41+
}
42+
43+
func (m *messageQueue) SetLogger(l logger.Logger) {
44+
m.params.Logger = l
45+
}
46+
47+
func (m *messageQueue) Start() {
48+
m.lock.Lock()
49+
defer m.lock.Unlock()
50+
51+
if m.isStarted {
52+
return
53+
}
54+
m.isStarted = true
55+
56+
m.msgChan = make(chan proto.Message, 100)
57+
go m.worker(m.msgChan)
58+
}
59+
60+
func (m *messageQueue) IsStarted() bool {
61+
m.lock.RLock()
62+
defer m.lock.RUnlock()
63+
64+
return m.isStarted
65+
}
66+
67+
func (m *messageQueue) Close() {
68+
m.lock.Lock()
69+
defer m.lock.Unlock()
70+
71+
if !m.isStarted {
72+
return
73+
}
74+
m.isStarted = false
75+
76+
close(m.msgChan)
77+
}
78+
79+
func (m *messageQueue) Enqueue(msg proto.Message) error {
80+
if msg == nil {
81+
return nil
82+
}
83+
84+
m.lock.RLock()
85+
defer m.lock.RUnlock()
86+
if !m.isStarted {
87+
return ErrMessageQueueNotStarted
88+
}
89+
90+
select {
91+
case m.msgChan <- msg:
92+
return nil
93+
default:
94+
return ErrMessageQueueFull
95+
}
96+
}
97+
98+
func (m *messageQueue) worker(msgChan chan proto.Message) {
99+
for {
100+
msg, more := <-msgChan
101+
if !more {
102+
return
103+
}
104+
105+
m.params.HandleMessage(msg)
106+
}
107+
}

signalling/signalhandler.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,12 @@ func (s *signalhandler) HandleMessage(msg proto.Message) error {
119119

120120
return nil
121121
}
122+
123+
func (s *signalhandler) HandleEncodedMessage(data []byte) error {
124+
signalResponse := &livekit.SignalResponse{}
125+
if err := proto.Unmarshal(data, signalResponse); err != nil {
126+
return err
127+
}
128+
129+
return s.HandleMessage(signalResponse)
130+
}

signalling/signalhandlerunimplemented.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,9 @@ func (s *signalhandlerUnimplemented) SetLogger(l logger.Logger) {}
2929
func (s *signalhandlerUnimplemented) HandleMessage(msg proto.Message) error {
3030
return nil
3131
}
32+
33+
func (s *signalhandlerUnimplemented) HandleEncodedMessage(data []byte) error {
34+
return nil
35+
}
36+
37+
func (s *signalhandlerUnimplemented) PruneStaleReassemblies() {}

0 commit comments

Comments
 (0)