@@ -148,10 +148,11 @@ class client_session : public std::enable_shared_from_this<client_session> {
148148};
149149
150150tcp_server::tcp_server (stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
151- factory_p factory, tcp protocol, int chunk_size)
151+ factory_p factory, tcp protocol, int chunk_size, bool do_async )
152152 : chunk_size_(chunk_size), shutdown_(false ), info_(std::move(info)), io_(std::move(io)),
153153 factory_ (std::move(factory)), send_buffer_(std::move(sendbuf)),
154- acceptor_(std::make_shared<tcp::acceptor>(*io_)) {
154+ acceptor_(std::make_shared<tcp::acceptor>(*io_)),
155+ transfer_is_async_(do_async) {
155156 // open the server connection
156157 acceptor_->open (protocol);
157158
@@ -222,34 +223,69 @@ void tcp_server::handle_accept_outcome(std::shared_ptr<client_session> newsessio
222223 accept_next_connection ();
223224}
224225
226+ // === synchronous transfer
227+
228+ void tcp_server::write_all_blocking (std::vector<lslboost::asio::const_buffer> buffs) {
229+ std::lock_guard<std::recursive_mutex> lock (inflight_mut_);
230+ std::size_t bytes_sent;
231+ lslboost::system::error_code ec;
232+ for (const auto &x : inflight_ready_) {
233+ if (x.second && x.first ->is_open ()) {
234+ bytes_sent = x.first ->send (buffs, 0 , ec);
235+ if (ec) {
236+ switch (ec.value ()) {
237+ case lslboost::system::errc::broken_pipe:
238+ case lslboost::system::errc::connection_reset:
239+ LOG_F (WARNING, " Broken Pipe / Connection Reset detected. Closing socket." );
240+ inflight_ready_[x.first ] = false ;
241+ post (*io_, [x]() {
242+ close_inflight_socket (x);
243+ });
244+ // We leave it up to the client_session destructor to remove the socket.
245+ break ;
246+ default :
247+ LOG_F (WARNING, " Unhandled write_all_blocking error: %s." , ec.message ().c_str ());
248+ }
249+ }
250+ }
251+ }
252+ }
253+
225254// === graceful cancellation of in-flight sockets ===
226255
227256void tcp_server::register_inflight_socket (const tcp_socket_p &sock) {
228257 std::lock_guard<std::recursive_mutex> lock (inflight_mut_);
229- inflight_ .insert (sock);
258+ inflight_ready_ .insert ({ sock, false } );
230259}
231260
232261void tcp_server::unregister_inflight_socket (const tcp_socket_p &sock) {
233262 std::lock_guard<std::recursive_mutex> lock (inflight_mut_);
234- inflight_.erase (sock);
263+ inflight_ready_[sock] = false ;
264+ inflight_ready_.erase (sock);
265+ }
266+
267+ void tcp_server::close_inflight_socket (std::pair<tcp_socket_p, bool > x) {
268+ try {
269+ if (x.first ->is_open ()) {
270+ try {
271+ // (in some cases shutdown may fail)
272+ x.first ->shutdown (x.first ->shutdown_both );
273+ } catch (...) {}
274+ x.first ->close ();
275+ }
276+ } catch (std::exception &e) {
277+ LOG_F (WARNING, " Error during shutdown_and_close: %s" , e.what ());
278+ }
235279}
236280
237281void tcp_server::close_inflight_sockets () {
238282 std::lock_guard<std::recursive_mutex> lock (inflight_mut_);
239- for (const auto &sock : inflight_)
240- post (*io_, [sock]() {
241- try {
242- if (sock->is_open ()) {
243- try {
244- // (in some cases shutdown may fail)
245- sock->shutdown (sock->shutdown_both );
246- } catch (...) {}
247- sock->close ();
248- }
249- } catch (std::exception &e) {
250- LOG_F (WARNING, " Error during shutdown_and_close: %s" , e.what ());
251- }
283+ for (const auto &x : inflight_ready_) {
284+ inflight_ready_[x.first ] = false ;
285+ post (*io_, [x]() {
286+ close_inflight_socket (x);
252287 });
288+ }
253289}
254290
255291
@@ -509,8 +545,17 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
509545 feedbuf_.consume (n);
510546 // register outstanding work at the server (will be unregistered at session destruction)
511547 work_ = std::make_shared<work_p::element_type>(serv_->io_ ->get_executor ());
512- // spawn a sample transfer thread
548+ serv_-> inflight_ready_ [sock_] = true ;
513549 std::thread (&client_session::transfer_samples_thread, this , shared_from_this ()).detach ();
550+
551+ /*
552+ if (serv_->transfer_is_async_)
553+ // spawn a sample transfer thread
554+ std::thread(&client_session::transfer_samples_thread, this, shared_from_this()).detach();
555+ else
556+ LOG_F(WARNING, "Using synchronous-only transfer for new client session!");
557+ */
558+
514559 } catch (std::exception &e) {
515560 LOG_F (WARNING, " Unexpected error while handling the feedheader send outcome: %s" , e.what ());
516561 }
0 commit comments