Skip to content

Commit 6dc6d67

Browse files
committed
Implement synchronous outlet for zero-copy writes.
1 parent cca1428 commit 6dc6d67

File tree

7 files changed

+138
-15
lines changed

7 files changed

+138
-15
lines changed

examples/ReceiveDataInChunks.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ int main(int argc, char **argv) {
1818
try {
1919

2020
std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
21-
double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.;
21+
double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.;
2222
bool flush = argc > 3;
2323
// resolve the stream of interest & make an inlet
24+
int32_t buf_samples = (int32_t)(max_buflen * 1000);
2425
lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0);
25-
lsl::stream_inlet inlet(inlet_info, (int32_t)max_buffered);
26+
lsl::stream_inlet inlet(inlet_info, max_buflen, transp_bufsize_thousandths);
2627

2728
// Use set_postprocessing to get the timestamps in a common base clock.
2829
// Do not use if this application will record timestamps to disk -- it is better to

examples/SendDataInChunks.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,24 @@ struct fake_device {
9292

9393
int main(int argc, char **argv) {
9494
std::cout << "SendDataInChunks" << std::endl;
95-
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
95+
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered "
96+
"chunk_rate nodata use_sync"
97+
<< std::endl;
9698
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
9799
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;
98-
100+
std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into "
101+
"the buffer."
102+
<< std::endl;
103+
std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl;
104+
99105
std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
100106
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
101107
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from the device.
102108
double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.;
103109
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
110+
bool nodata = argc > 7;
111+
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;
112+
104113
int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
105114
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk
106115

@@ -118,7 +127,9 @@ int main(int argc, char **argv) {
118127
chn.append_child_value("type", type);
119128
}
120129
int32_t buf_samples = (int32_t)(max_buffered * samplingrate);
121-
lsl::stream_outlet outlet(info, chunk_samples, buf_samples);
130+
auto flags = static_cast<lsl_transport_options_t>(
131+
(do_sync ? transp_sync_blocking : transp_default) | transp_bufsize_samples);
132+
lsl::stream_outlet outlet(info, chunk_samples, buf_samples, flags);
122133
info = outlet.info(); // Refresh info with whatever the outlet captured.
123134
std::cout << "Stream UID: " << info.uid() << std::endl;
124135

@@ -128,6 +139,7 @@ int main(int argc, char **argv) {
128139
// Prepare buffer to get data from 'device'.
129140
// The buffer should be larger than you think you need. Here we make it 4x as large.
130141
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
142+
std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0);
131143

132144
std::cout << "Now sending data..." << std::endl;
133145

@@ -141,7 +153,7 @@ int main(int argc, char **argv) {
141153
std::this_thread::sleep_until(next_chunk_time);
142154

143155
// Get data from device
144-
std::size_t returned_samples = my_device.get_data(chunk_buffer);
156+
std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata);
145157

146158
// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
147159
// other push_chunk methods are easier but slightly slower.

include/lsl/common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ typedef enum {
161161
/// The supplied max_buf should be scaled by 0.001.
162162
transp_bufsize_thousandths = 2,
163163

164+
/// The outlet will use synchronous (blocking) calls to asio to push data
165+
transp_sync_blocking = 4,
166+
164167
// prevent compilers from assuming an instance fits in a single byte
165168
_lsl_transport_options_maxval = 0x7f000000
166169
} lsl_transport_options_t;

src/stream_outlet_impl.cpp

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,14 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu
2222
chunk_size_(info.calc_transport_buf_samples(requested_bufsize, flags)),
2323
info_(std::make_shared<stream_info_impl>(info)),
2424
send_buffer_(std::make_shared<send_buffer>(chunk_size_)),
25-
io_ctx_data_(std::make_shared<asio::io_context>(1)),
25+
do_sync_(flags & transp_sync_blocking), io_ctx_data_(std::make_shared<asio::io_context>(1)),
2626
io_ctx_service_(std::make_shared<asio::io_context>(1)) {
27+
28+
if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) {
29+
LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async.");
30+
do_sync_ = false;
31+
}
32+
2733
ensure_lsl_initialized();
2834
const api_config *cfg = api_config::get_instance();
2935

@@ -147,8 +153,22 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
147153
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
148154
sample_p smp(
149155
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
150-
smp->assign_untyped(data);
151-
send_buffer_->push_sample(smp);
156+
if (!do_sync_) {
157+
smp->assign_untyped(data); // Note: Makes a copy!
158+
send_buffer_->push_sample(smp);
159+
} else {
160+
if (timestamp == DEDUCED_TIMESTAMP) {
161+
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
162+
} else {
163+
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
164+
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
165+
}
166+
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
167+
if (pushthrough) {
168+
tcp_server_->write_all_blocking(sync_buffs_);
169+
sync_buffs_.clear();
170+
}
171+
}
152172
}
153173

154174
bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
@@ -162,8 +182,22 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou
162182
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
163183
sample_p smp(
164184
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
165-
smp->assign_typed(data);
166-
send_buffer_->push_sample(smp);
185+
if (!do_sync_) {
186+
smp->assign_typed(data);
187+
send_buffer_->push_sample(smp);
188+
} else {
189+
if (timestamp == DEDUCED_TIMESTAMP) {
190+
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
191+
} else {
192+
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
193+
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
194+
}
195+
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
196+
if (pushthrough) {
197+
tcp_server_->write_all_blocking(sync_buffs_);
198+
sync_buffs_.clear();
199+
}
200+
}
167201
}
168202

169203
template void stream_outlet_impl::enqueue<char>(const char *data, double, bool);

src/stream_outlet_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "common.h"
55
#include "forward.h"
66
#include "stream_info_impl.h"
7+
#include <asio/buffer.hpp>
78
#include <cstdint>
89
#include <loguru.hpp>
910
#include <memory>
@@ -319,6 +320,8 @@ class stream_outlet_impl {
319320
stream_info_impl_p info_;
320321
/// the single-producer, multiple-receiver send buffer
321322
send_buffer_p send_buffer_;
323+
/// Flag to indicate that push_* operations should be blocking synchronous. false by default.
324+
bool do_sync_;
322325
/// the IO service objects
323326
io_context_p io_ctx_data_, io_ctx_service_;
324327

@@ -331,6 +334,8 @@ class stream_outlet_impl {
331334
std::vector<udp_server_p> responders_;
332335
/// threads that handle the I/O operations (two per stack: one for UDP and one for TCP)
333336
std::vector<thread_p> io_threads_;
337+
/// buffers used in synchronous call to gather-write data directly to the socket.
338+
std::vector<asio::const_buffer> sync_buffs_;
334339
};
335340

336341
} // namespace lsl

src/tcp_server.cpp

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ class client_session : public std::enable_shared_from_this<client_session> {
143143
};
144144

145145
tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
146-
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6)
146+
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync)
147147
: chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)),
148-
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) {
148+
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) {
149149
// assign connection-dependent fields
150150
info_->session_id(api_config::get_instance()->session_id());
151151
info_->reset_uid();
@@ -232,7 +232,47 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
232232
}
233233

234234

235-
// === graceful cancellation of in-flight sockets ===
235+
// === synchronous transfer
236+
237+
/**
 * Gather-write the given buffers to every socket in sync_sockets_ and block until all of the
 * issued writes have completed (successfully or with an error).
 *
 * One asio::async_write is started per socket; afterwards sync_transfer_io_ctx_ is restarted
 * and run one handler at a time until the outstanding-write counter drops to zero.
 * Sockets that report broken_pipe or connection_reset are closed and then removed from
 * sync_sockets_, so later calls do not keep writing to dead connections.
 *
 * NOTE(review): this presumes the completion handlers of the sockets are dispatched by
 * sync_transfer_io_ctx_ -- confirm against the code that creates the sockets.
 *
 * @param bufs Buffer sequence sent to each socket. The memory the buffers refer to is not
 * copied here, so it has to remain valid until this function returns.
 */
void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
	int writes_outstanding = 0;
	bool any_session_broken = false;

	for (auto &sock : sync_sockets_) {
		// The handler holds its own reference to the socket and reports back through the two
		// locals, which outlive it because we wait for all handlers below.
		asio::async_write(*sock, bufs,
			[sock, &writes_outstanding, &any_session_broken](
				const asio::error_code &ec, size_t /*bytes_transferred*/) {
				writes_outstanding--;
				switch (ec.value()) {
				case 0: break; // success
				case asio::error::broken_pipe:
				case asio::error::connection_reset:
					LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
					{
						asio::error_code close_ec;
						sock->close(close_ec);
					}
					// Remember that at least one socket was closed so that the cleanup pass
					// below actually runs.
					any_session_broken = true;
					break;
				default:
					LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str());
				}
			});
		writes_outstanding++;
	}
	try {
		assert(sync_transfer_io_ctx_);
		// restart() is required so that run_one() works again after the context ran out of work
		sync_transfer_io_ctx_->restart();
		while (writes_outstanding) sync_transfer_io_ctx_->run_one();
		if (any_session_broken) {
			// remove sessions whose socket was closed
			auto new_end_it = std::remove_if(sync_sockets_.begin(), sync_sockets_.end(),
				[](const tcp_socket_p &sock) { return !sock->is_open(); });
			sync_sockets_.erase(new_end_it, sync_sockets_.end());
		}
	} catch (std::exception &e) { LOG_F(ERROR, "Error during write_all_blocking: %s", e.what()); }
}
236276

237277
void tcp_server::register_inflight_session(const std::shared_ptr<client_session> &session) {
238278
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
@@ -535,6 +575,16 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
535575
// convenient for unit tests
536576
if (max_buffered_ <= 0) return;
537577

578+
if (serv->transfer_is_sync_) {
579+
LOG_F(INFO, "Using synchronous blocking transfers for new client session.");
580+
asio::post(*serv->sync_transfer_io_ctx_,
581+
[serv, sock_p = std::make_shared<tcp_socket>(std::move(sock_))]() {
582+
serv->sync_sockets_.emplace_back(std::move(sock_p));
583+
});
584+
serv->unregister_inflight_session(this);
585+
return;
586+
}
587+
538588
// determine transfer parameters
539589
auto queue = serv->send_buffer_->new_consumer(max_buffered_);
540590

src/tcp_server.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
4747
* @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server.
4848
* @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines
4949
* the effective chunking.
50+
* @param do_sync Set true to indicate data transfer should happen synchronously in a blocking
51+
* call. Default false -- asynchronous transfer in a thread (copies data).
5052
*/
5153
tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory,
52-
int chunk_size, bool allow_v4, bool allow_v6);
54+
int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false);
5355

5456
/**
5557
* Begin serving TCP connections.
@@ -67,6 +69,12 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
6769
*/
6870
void end_serving();
6971

72+
/**
73+
* Write directly to each socket. This should only be used when server initialized with
74+
* do_sync = true.
75+
*/
76+
void write_all_blocking(std::vector<asio::const_buffer> buffs);
77+
7078
private:
7179
friend class client_session;
7280

@@ -95,6 +103,16 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
95103
// acceptor socket
96104
tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket
97105

106+
107+
// sync mode fields
108+
109+
// Flag to indicate that new client_sessions should use synchronous blocking data transfer.
110+
bool transfer_is_sync_;
111+
// sockets that should receive data in sync mode
112+
std::vector<tcp_socket_p> sync_sockets_;
113+
// io context for sync mode, app is responsible for running it
114+
std::unique_ptr<asio::io_context> sync_transfer_io_ctx_;
115+
98116
// registry of in-flight sessions (for cancellation)
99117
std::map<void *, std::weak_ptr<client_session>> inflight_;
100118
std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access

0 commit comments

Comments
 (0)