Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit b0623ac

Browse files
authored
Merge pull request #988 from starwarfan/internal-rework
Refactor internal transport for reducing listening ports
2 parents b506ad4 + 2d6071e commit b0623ac

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+4054
-1747
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
Internal Transport
2+
3+
# 1 Overview
4+
5+
Internal transport uses client-server model.
6+
7+
An internal server for each process and one server can host multiple streams.
8+
9+
An internal client for each remote stream, client connects server to receive stream data.
10+
11+
The internal connection can be TCP/TLS/QUIC.
12+
13+
![plot](./pics/internal_structure.png)
14+
15+
# 2 Workflow
16+
17+
![plot](./pics/internal_connect.png)
18+
19+
(Certificate to verify client)
20+
21+
Drop data policy:
22+
23+
1. Drop non-key frame data for video.
24+
25+
2. Drop low level data for audio.
26+
27+
3. Drop oldest data.
28+
29+
4. Other customized policy based on media information.
30+
31+
# 3 Data format
32+
33+
The transport message over internal connection is in following format:
34+
35+
| 4 bytes (payload length K) | K bytes (payload data) |
36+
37+
Payload data = | 1 byte (message type) | (k – 1 ) bytes format data |
38+
39+
The “message type” can be: (protobuf)
40+
41+
1. Feedback (client to server)
42+
43+
2. MediaFrame (server to client)
44+
45+
3. MetaData (server to client)
46+
47+
The payload format of each type can be referred from MediaFramePipeline.cpp.
48+
49+
Feedback – class FeedbackMsg
50+
51+
MediaFrame – class Frame
52+
53+
MetaData – class MetaData
54+
55+
# 4 Control layer
56+
57+
## 4.1 Spread call
58+
59+
Every stream has an internal address of server (ip, port).
60+
61+
For each agent process, there’re local and remote streams like client
62+
SDK.
63+
64+
`Local stream`: stream which is created on local process.
65+
66+
`Remote stream`: stream which is spread from other nodes through
67+
internal connection.
68+
69+
If we want to spread a stream to local, only internal address (ip, port) of that stream is needed.
70+
71+
RPC calls for spreading:
72+
73+
Conference node –(linkup with stream address)–> Target node (do spread and linkup)
74+
75+
Linkup call:
76+
RPC linkup(
77+
targetId, /* linkup destination, like webrtc subscription Id */
78+
from = { /* linkup source */
79+
type /* audio, video... */: {id, ip, port} /* remote stream description */
80+
},
81+
)
82+
83+
84+
## 4.2 Lifetime
85+
86+
A `TransportServer` lives during the `workingNode` process.
87+
88+
A `TransportClient` is created for a `Remote stream`, is destroyed
89+
by disconnect event or close call.
12 KB
Loading
6.31 KB
Loading
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright (C) <2021> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#ifndef BUILDING_NODE_EXTENSION
6+
#define BUILDING_NODE_EXTENSION
7+
#endif
8+
9+
#include "InternalClientWrapper.h"
10+
11+
using namespace v8;
12+
13+
DEFINE_LOGGER(InternalClient, "InternalClientWrapper");
14+
15+
Nan::Persistent<Function> InternalClient::constructor;
16+
17+
InternalClient::InternalClient() {
18+
async_stats_ = new uv_async_t;
19+
stats_callback_ = nullptr;
20+
uv_async_init(uv_default_loop(), async_stats_, &InternalClient::statsCallback);
21+
}
22+
23+
static void destroyAsyncHandle(uv_handle_t *handle) {
24+
delete handle;
25+
handle = nullptr;
26+
}
27+
28+
InternalClient::~InternalClient() {
29+
boost::mutex::scoped_lock lock(stats_lock);
30+
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(async_stats_))) {
31+
ELOG_DEBUG("Closing Stats handle");
32+
uv_close(reinterpret_cast<uv_handle_t*>(async_stats_), destroyAsyncHandle);
33+
}
34+
async_stats_ = nullptr;
35+
}
36+
37+
NAN_MODULE_INIT(InternalClient::Init) {
38+
// Constructor template
39+
Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
40+
tpl->SetClassName(Nan::New("InternalClient").ToLocalChecked());
41+
tpl->InstanceTemplate()->SetInternalFieldCount(1);
42+
43+
// Prototype
44+
Nan::SetPrototypeMethod(tpl, "close", close);
45+
Nan::SetPrototypeMethod(tpl, "addDestination", addDestination);
46+
Nan::SetPrototypeMethod(tpl, "removeDestination", removeDestination);
47+
48+
constructor.Reset(Nan::GetFunction(tpl).ToLocalChecked());
49+
Nan::Set(target, Nan::New("InternalClient").ToLocalChecked(),
50+
Nan::GetFunction(tpl).ToLocalChecked());
51+
}
52+
53+
NAN_METHOD(InternalClient::New) {
54+
if (info.Length() < 4) {
55+
Nan::ThrowError("Wrong number of arguments");
56+
}
57+
58+
if (info.IsConstructCall()) {
59+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
60+
std::string streamId = std::string(*param0);
61+
62+
Nan::Utf8String param1(Nan::To<v8::String>(info[1]).ToLocalChecked());
63+
std::string protocol = std::string(*param1);
64+
65+
Nan::Utf8String param2(Nan::To<v8::String>(info[2]).ToLocalChecked());
66+
std::string ip = std::string(*param2);
67+
68+
unsigned int port = Nan::To<unsigned int>(info[3]).FromJust();
69+
70+
InternalClient* obj = new InternalClient();
71+
obj->me = new owt_base::InternalClient(
72+
streamId, protocol, ip, port, obj);
73+
if (info.Length() > 4 && info[4]->IsFunction()) {
74+
obj->stats_callback_ = new Nan::Callback(info[4].As<Function>());
75+
}
76+
obj->src = obj->me;
77+
obj->Wrap(info.This());
78+
info.GetReturnValue().Set(info.This());
79+
} else {
80+
ELOG_WARN("Not construct call");
81+
}
82+
}
83+
84+
NAN_METHOD(InternalClient::close) {
85+
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
86+
obj->src = nullptr;
87+
delete obj->me;
88+
obj->me = nullptr;
89+
}
90+
91+
NAN_METHOD(InternalClient::addDestination) {
92+
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
93+
owt_base::InternalClient* me = obj->me;
94+
95+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
96+
std::string track = std::string(*param0);
97+
98+
FrameDestination* param =
99+
ObjectWrap::Unwrap<FrameDestination>(
100+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
101+
owt_base::FrameDestination* dest = param->dest;
102+
103+
if (track == "audio") {
104+
me->addAudioDestination(dest);
105+
} else if (track == "video") {
106+
me->addVideoDestination(dest);
107+
}
108+
}
109+
110+
NAN_METHOD(InternalClient::removeDestination) {
111+
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
112+
owt_base::InternalClient* me = obj->me;
113+
114+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
115+
std::string track = std::string(*param0);
116+
117+
FrameDestination* param =
118+
ObjectWrap::Unwrap<FrameDestination>(
119+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
120+
owt_base::FrameDestination* dest = param->dest;
121+
122+
if (track == "audio") {
123+
me->removeAudioDestination(dest);
124+
} else if (track == "video") {
125+
me->removeVideoDestination(dest);
126+
}
127+
}
128+
129+
NAUV_WORK_CB(InternalClient::statsCallback) {
130+
Nan::HandleScope scope;
131+
InternalClient* obj = reinterpret_cast<InternalClient*>(async->data);
132+
if (!obj || !obj->me || !obj->stats_callback_) {
133+
return;
134+
}
135+
136+
boost::mutex::scoped_lock lock(obj->stats_lock);
137+
while (!obj->stats_messages.empty()) {
138+
Local<Value> args[] = {
139+
Nan::New(obj->stats_messages.front().c_str()).ToLocalChecked()
140+
};
141+
Nan::AsyncResource resource("InternalServer.statsCallback");
142+
resource.runInAsyncScope(Nan::GetCurrentContext()->Global(),
143+
obj->stats_callback_->GetFunction(),
144+
1, args);
145+
obj->stats_messages.pop();
146+
}
147+
}
148+
149+
void InternalClient::onConnected() {
150+
boost::mutex::scoped_lock lock(stats_lock);
151+
if (!async_stats_ || !stats_callback_) {
152+
return;
153+
}
154+
stats_messages.push("connected");
155+
async_stats_->data = this;
156+
uv_async_send(async_stats_);
157+
}
158+
159+
void InternalClient::onDisconnected() {
160+
boost::mutex::scoped_lock lock(stats_lock);
161+
if (!async_stats_ || !stats_callback_) {
162+
return;
163+
}
164+
stats_messages.push("disconnected");
165+
async_stats_->data = this;
166+
uv_async_send(async_stats_);
167+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright (C) <2021> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#ifndef INTERNALCLIENTWRAPPER_H
6+
#define INTERNALCLIENTWRAPPER_H
7+
8+
#include "../../addons/common/MediaFramePipelineWrapper.h"
9+
#include <InternalClient.h>
10+
#include <logger.h>
11+
#include <nan.h>
12+
13+
/*
14+
* Wrapper class of owt_base::InternalClient
15+
*/
16+
class InternalClient : public FrameSource,
17+
public owt_base::InternalClient::Listener {
18+
public:
19+
DECLARE_LOGGER();
20+
static NAN_MODULE_INIT(Init);
21+
22+
owt_base::InternalClient* me;
23+
boost::mutex stats_lock;
24+
std::queue<std::string> stats_messages;
25+
26+
// Implements owt_base::InternalClient::Listener
27+
void onConnected() override;
28+
void onDisconnected() override;
29+
30+
private:
31+
InternalClient();
32+
~InternalClient();
33+
34+
Nan::Callback *stats_callback_;
35+
uv_async_t *async_stats_;
36+
37+
static Nan::Persistent<v8::Function> constructor;
38+
39+
static NAN_METHOD(New);
40+
41+
static NAN_METHOD(close);
42+
43+
static NAN_METHOD(addDestination);
44+
45+
static NAN_METHOD(removeDestination);
46+
47+
static NAUV_WORK_CB(statsCallback);
48+
};
49+
50+
#endif

source/agent/addons/internalIO/InternalConfig.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
#endif
88

99
#include "InternalConfig.h"
10-
#include <RawTransport.h>
10+
#include <TransportBase.h>
1111

1212
using namespace v8;
1313

1414
void setPassphrase(const FunctionCallbackInfo<Value>& args) {
15-
String::Utf8Value param0(args[0]->ToString());
15+
Isolate* isolate = Isolate::GetCurrent();
16+
String::Utf8Value param0(isolate, args[0]->ToString());
1617
std::string p = std::string(*param0);
17-
owt_base::RawTransport<owt_base::Protocol::TCP>::setPassphrase(p);
18+
owt_base::TransportSecret::setPassphrase(p);
1819
}
1920

2021
void InitInternalConfig(v8::Local<v8::Object> exports) {

0 commit comments

Comments
 (0)