Skip to content

Commit cd77d41

Browse files
committed
Use one IPv4+IPv6 io_context/thread each for outlet data+service
1 parent fe72a27 commit cd77d41

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

src/stream_outlet_impl.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ stream_outlet_impl::stream_outlet_impl(
2020
1000
2121
: api_config::get_instance()->outlet_buffer_reserve_samples()))),
2222
chunk_size_(chunk_size), info_(std::make_shared<stream_info_impl>(info)),
23-
send_buffer_(std::make_shared<send_buffer>(max_capacity)) {
23+
send_buffer_(std::make_shared<send_buffer>(max_capacity)),
24+
io_ctx_data_(std::make_shared<asio::io_context>(1)),
25+
io_ctx_service_(std::make_shared<asio::io_context>(1)) {
2426
ensure_lsl_initialized();
2527
const api_config *cfg = api_config::get_instance();
2628

@@ -47,7 +49,7 @@ stream_outlet_impl::stream_outlet_impl(
4749

4850
// and start the IO threads to handle them
4951
const std::string name{"IO_" + this->info().name().substr(0, 11)};
50-
for (const auto &io : ios_)
52+
for (const auto &io : {io_ctx_data_, io_ctx_service_})
5153
io_threads_.emplace_back(std::make_shared<std::thread>([io, name]() {
5254
loguru::set_thread_name(name.c_str());
5355
while (true) {
@@ -69,20 +71,18 @@ void stream_outlet_impl::instantiate_stack(tcp tcp_protocol, udp udp_protocol) {
6971
uint16_t multicast_port = cfg->multicast_port();
7072
LOG_F(2, "%s: Trying to listen at address '%s'", info().name().c_str(), listen_address.c_str());
7173
// create TCP data server
72-
ios_.push_back(std::make_shared<asio::io_context>());
7374
tcp_servers_.push_back(std::make_shared<tcp_server>(
74-
info_, ios_.back(), send_buffer_, sample_factory_, tcp_protocol, chunk_size_));
75+
info_, io_ctx_data_, send_buffer_, sample_factory_, tcp_protocol, chunk_size_));
7576
// create UDP time server
76-
ios_.push_back(std::make_shared<asio::io_context>());
77-
udp_servers_.push_back(std::make_shared<udp_server>(info_, *ios_.back(), udp_protocol));
77+
udp_servers_.push_back(std::make_shared<udp_server>(info_, *io_ctx_service_, udp_protocol));
7878
// create UDP multicast responders
7979
for (const auto &mcastaddr : cfg->multicast_addresses()) {
8080
try {
8181
// use only addresses for the protocol that we're supposed to use here
8282
auto address = asio::ip::make_address(mcastaddr);
8383
if (udp_protocol == udp::v4() ? address.is_v4() : address.is_v6())
8484
responders_.push_back(std::make_shared<udp_server>(
85-
info_, *ios_.back(), mcastaddr, multicast_port, multicast_ttl, listen_address));
85+
info_, *io_ctx_service_, mcastaddr, multicast_port, multicast_ttl, listen_address));
8686
} catch (std::exception &e) {
8787
LOG_F(WARNING, "Couldn't create multicast responder for %s (%s)", mcastaddr.c_str(),
8888
e.what());
@@ -105,18 +105,20 @@ stream_outlet_impl::~stream_outlet_impl() {
105105
// 4. waiting a bit and
106106
// 5. detaching thread, i.e. letting it hang and continue tearing down
107107
// the outlet
108-
for (auto &ios : ios_) asio::post(*ios, [ios]() { ios->stop(); });
108+
asio::post(*io_ctx_data_, [io = io_ctx_data_]() { io->stop(); });
109+
asio::post(*io_ctx_service_, [io = io_ctx_service_]() { io->stop(); });
109110
const char *name = this->info().name().c_str();
110111
for (int try_nr = 0; try_nr <= 100; ++try_nr) {
111112
switch (try_nr) {
112113
case 0: DLOG_F(INFO, "Trying to join IO threads for %s", name); break;
113114
case 20: LOG_F(INFO, "Waiting for %s's IO threads to end", name); break;
114115
case 80:
115116
LOG_F(WARNING, "Stopping io_contexts for %s", name);
117+
io_ctx_data_->stop();
118+
io_ctx_service_->stop();
116119
for (std::size_t k = 0; k < io_threads_.size(); k++) {
117120
if (!io_threads_[k]->joinable()) {
118-
LOG_F(ERROR, "Tearing down stream_outlet of %s's thread #%lu", name, k);
119-
ios_[k]->stop();
121+
LOG_F(ERROR, "%s's io thread #%lu still running", name, k);
120122
}
121123
}
122124
break;

src/stream_outlet_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,8 @@ class stream_outlet_impl {
317317
stream_info_impl_p info_;
318318
/// the single-producer, multiple-receiver send buffer
319319
send_buffer_p send_buffer_;
320-
/// the IO service objects (two per stack: one for UDP and one for TCP)
321-
std::vector<io_context_p> ios_;
320+
/// the IO service objects
321+
io_context_p io_ctx_data_, io_ctx_service_;
322322

323323
/// the threaded TCP data server(s); two if using both IP stacks
324324
std::vector<tcp_server_p> tcp_servers_;

0 commit comments

Comments
 (0)