Skip to content

Commit cf97ba5

Browse files
cboulaytstenner
authored andcommitted
Implement synchronous outlet for zero-copy writes.
1 parent 05f7e16 commit cf97ba5

File tree

7 files changed

+138
-15
lines changed

7 files changed

+138
-15
lines changed

examples/ReceiveDataInChunks.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ int main(int argc, char **argv) {
1818
try {
1919

2020
std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
21-
double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.;
21+
double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.;
2222
bool flush = argc > 3;
2323
// resolve the stream of interest & make an inlet
24+
int32_t buf_samples = (int32_t)(max_buflen * 1000);
2425
lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0);
25-
lsl::stream_inlet inlet(inlet_info, max_buffered);
26+
lsl::stream_inlet inlet(inlet_info, max_buflen, transp_bufsize_thousandths);
2627

2728
// Use set_postprocessing to get the timestamps in a common base clock.
2829
// Do not use if this application will record timestamps to disk -- it is better to

examples/SendDataInChunks.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,24 @@ struct fake_device {
9191

9292
int main(int argc, char **argv) {
9393
std::cout << "SendDataInChunks" << std::endl;
94-
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
94+
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered "
95+
"chunk_rate nodata use_sync"
96+
<< std::endl;
9597
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
9698
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;
97-
99+
std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into "
100+
"the buffer."
101+
<< std::endl;
102+
std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl;
103+
98104
std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
99105
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
100106
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device.
101107
double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.;
102108
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
109+
bool nodata = argc > 7;
110+
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;
111+
103112
int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
104113
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk
105114

@@ -117,7 +126,9 @@ int main(int argc, char **argv) {
117126
chn.append_child_value("type", type);
118127
}
119128
int32_t buf_samples = max_buffered * samplingrate;
120-
lsl::stream_outlet outlet(info, chunk_samples, buf_samples);
129+
auto flags = static_cast<lsl_transport_options_t>(
130+
(do_sync ? transp_sync_blocking : transp_default) | transp_bufsize_samples);
131+
lsl::stream_outlet outlet(info, chunk_samples, buf_samples, flags);
121132
info = outlet.info(); // Refresh info with whatever the outlet captured.
122133
std::cout << "Stream UID: " << info.uid() << std::endl;
123134

@@ -127,6 +138,7 @@ int main(int argc, char **argv) {
127138
// Prepare buffer to get data from 'device'.
128139
// The buffer should be larger than you think you need. Here we make it 4x as large.
129140
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
141+
std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0);
130142

131143
std::cout << "Now sending data..." << std::endl;
132144

@@ -140,7 +152,7 @@ int main(int argc, char **argv) {
140152
std::this_thread::sleep_until(next_chunk_time);
141153

142154
// Get data from device
143-
std::size_t returned_samples = my_device.get_data(chunk_buffer);
155+
std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata);
144156

145157
// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
146158
// other push_chunk methods are easier but slightly slower.

include/lsl/common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ typedef enum {
161161
/// The supplied max_buf should be scaled by 0.001.
162162
transp_bufsize_thousandths = 2,
163163

164+
/// The outlet will use synchronous (blocking) calls to asio to push data
165+
transp_sync_blocking = 4,
166+
164167
// prevent compilers from assuming an instance fits in a single byte
165168
_lsl_transport_options_maxval = 0x7f000000
166169
} lsl_transport_options_t;

src/stream_outlet_impl.cpp

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,14 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu
2222
chunk_size_(info.calc_transport_buf_samples(requested_bufsize, flags)),
2323
info_(std::make_shared<stream_info_impl>(info)),
2424
send_buffer_(std::make_shared<send_buffer>(chunk_size_)),
25-
io_ctx_data_(std::make_shared<asio::io_context>(1)),
25+
do_sync_(flags & transp_sync_blocking), io_ctx_data_(std::make_shared<asio::io_context>(1)),
2626
io_ctx_service_(std::make_shared<asio::io_context>(1)) {
27+
28+
if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) {
29+
LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async.");
30+
do_sync_ = false;
31+
}
32+
2733
ensure_lsl_initialized();
2834
const api_config *cfg = api_config::get_instance();
2935

@@ -148,8 +154,22 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
148154
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
149155
sample_p smp(
150156
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
151-
smp->assign_untyped(data);
152-
send_buffer_->push_sample(smp);
157+
if (!do_sync_) {
158+
smp->assign_untyped(data); // Note: Makes a copy!
159+
send_buffer_->push_sample(smp);
160+
} else {
161+
if (timestamp == DEDUCED_TIMESTAMP) {
162+
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
163+
} else {
164+
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
165+
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
166+
}
167+
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
168+
if (pushthrough) {
169+
tcp_server_->write_all_blocking(sync_buffs_);
170+
sync_buffs_.clear();
171+
}
172+
}
153173
}
154174

155175
bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
@@ -163,8 +183,22 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou
163183
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
164184
sample_p smp(
165185
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
166-
smp->assign_typed(data);
167-
send_buffer_->push_sample(smp);
186+
if (!do_sync_) {
187+
smp->assign_typed(data);
188+
send_buffer_->push_sample(smp);
189+
} else {
190+
if (timestamp == DEDUCED_TIMESTAMP) {
191+
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
192+
} else {
193+
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
194+
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
195+
}
196+
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
197+
if (pushthrough) {
198+
tcp_server_->write_all_blocking(sync_buffs_);
199+
sync_buffs_.clear();
200+
}
201+
}
168202
}
169203

170204
template void stream_outlet_impl::enqueue<char>(const char *data, double, bool);

src/stream_outlet_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "common.h"
55
#include "forward.h"
66
#include "stream_info_impl.h"
7+
#include <asio/buffer.hpp>
78
#include <cstdint>
89
#include <loguru.hpp>
910
#include <memory>
@@ -319,6 +320,8 @@ class stream_outlet_impl {
319320
stream_info_impl_p info_;
320321
/// the single-producer, multiple-receiver send buffer
321322
send_buffer_p send_buffer_;
323+
/// Flag to indicate that push_* operations should be blocking synchronous. false by default.
324+
bool do_sync_;
322325
/// the IO service objects
323326
io_context_p io_ctx_data_, io_ctx_service_;
324327

@@ -331,6 +334,8 @@ class stream_outlet_impl {
331334
std::vector<udp_server_p> responders_;
332335
/// threads that handle the I/O operations (two per stack: one for UDP and one for TCP)
333336
std::vector<thread_p> io_threads_;
337+
/// buffers used in synchronous call to gather-write data directly to the socket.
338+
std::vector<asio::const_buffer> sync_buffs_;
334339
};
335340

336341
} // namespace lsl

src/tcp_server.cpp

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ class client_session : public std::enable_shared_from_this<client_session> {
142142
};
143143

144144
tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
145-
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6)
145+
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync)
146146
: chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)),
147-
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) {
147+
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) {
148148
// assign connection-dependent fields
149149
info_->session_id(api_config::get_instance()->session_id());
150150
info_->reset_uid();
@@ -231,7 +231,47 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
231231
}
232232

233233

234-
// === graceful cancellation of in-flight sockets ===
234+
// === synchronous transfer ===
235+
236+
/**
 * Write the given buffer sequence to every socket in sync_sockets_ and return only after
 * every started write has invoked its completion handler.
 *
 * Sockets whose write fails with broken_pipe or connection_reset are closed and then removed
 * from sync_sockets_, so later calls no longer try to write to them.
 *
 * @param bufs Buffer sequence to send; the same sequence is handed to each socket's write.
 *             The memory the buffers point to is not copied here, so it must stay valid
 *             until this function returns.
 */
void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
	// Check the precondition before anything is queued.
	assert(sync_transfer_io_ctx_);

	int writes_outstanding = 0;
	bool any_session_broken = false;

	for (auto &sock : sync_sockets_) {
		// The handler captures the locals by reference. This is only safe because the loop
		// below does not leave this function until every handler has run.
		asio::async_write(*sock, bufs,
			[sock, &writes_outstanding, &any_session_broken](
				const asio::error_code &ec, size_t /*bytes_transferred*/) {
				writes_outstanding--;
				switch (ec.value()) {
				case 0: break; // success
				case asio::error::broken_pipe:
				case asio::error::connection_reset:
					LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
					{
						asio::error_code close_ec;
						sock->close(close_ec);
					}
					// Fix: record the failure so the closed socket is pruned below.
					// Previously this flag was never set, so closed sockets stayed in
					// sync_sockets_ and were written to again on every later call.
					any_session_broken = true;
					break;
				default:
					LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str());
				}
			});
		writes_outstanding++;
	}
	try {
		// NOTE(review): new sockets are registered via handlers posted to
		// sync_transfer_io_ctx_. As far as this function shows, that context is only run
		// while writes are outstanding -- confirm that pending registrations are drained
		// even when sync_sockets_ is still empty.
		sync_transfer_io_ctx_->restart();
		while (writes_outstanding) sync_transfer_io_ctx_->run_one();
		if (any_session_broken) {
			// remove sessions whose socket was closed
			auto new_end_it = std::remove_if(sync_sockets_.begin(), sync_sockets_.end(),
				[](const tcp_socket_p &sock) { return !sock->is_open(); });
			sync_sockets_.erase(new_end_it, sync_sockets_.end());
		}
	} catch (std::exception &e) {
		// NOTE(review): if this is reached while writes are still outstanding, their handlers
		// keep references to this function's locals -- confirm this cannot happen.
		LOG_F(ERROR, "Error during write_all_blocking: %s", e.what());
	}
}
235275

236276
void tcp_server::register_inflight_session(const std::shared_ptr<client_session> &session) {
237277
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
@@ -539,6 +579,16 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
539579
// convenient for unit tests
540580
if (max_buffered_ <= 0) return;
541581

582+
if (serv->transfer_is_sync_) {
583+
LOG_F(INFO, "Using synchronous blocking transfers for new client session.");
584+
asio::post(*serv->sync_transfer_io_ctx_,
585+
[serv, sock_p = std::make_shared<tcp_socket>(std::move(sock_))]() {
586+
serv->sync_sockets_.emplace_back(std::move(sock_p));
587+
});
588+
serv->unregister_inflight_session(this);
589+
return;
590+
}
591+
542592
// determine transfer parameters
543593
auto queue = serv->send_buffer_->new_consumer(max_buffered_);
544594

src/tcp_server.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
4747
* @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server.
4848
* @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines
4949
* the effective chunking.
50+
* @param do_sync Set true to indicate data transfer should happen synchronously in a blocking
51+
* call. Default false -- asynchronous transfer in a thread (copies data).
5052
*/
5153
tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory,
52-
int chunk_size, bool allow_v4, bool allow_v6);
54+
int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false);
5355

5456
/**
5557
* Begin serving TCP connections.
@@ -67,6 +69,12 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
6769
*/
6870
void end_serving();
6971

72+
/**
73+
* Write directly to each socket. This should only be used when server initialized with
74+
* do_sync = true.
75+
*/
76+
void write_all_blocking(std::vector<asio::const_buffer> buffs);
77+
7078
private:
7179
friend class client_session;
7280

@@ -95,6 +103,16 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
95103
// acceptor socket
96104
tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket
97105

106+
107+
// sync mode fields
108+
109+
// Flag to indicate that new client_sessions should use synchronous blocking data transfer.
110+
bool transfer_is_sync_;
111+
// sockets that should receive data in sync mode
112+
std::vector<tcp_socket_p> sync_sockets_;
113+
// io context for sync mode, app is responsible for running it
114+
std::unique_ptr<asio::io_context> sync_transfer_io_ctx_;
115+
98116
// registry of in-flight asessions (for cancellation)
99117
std::map<void *, std::weak_ptr<client_session>> inflight_;
100118
std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access

0 commit comments

Comments
 (0)