@@ -144,11 +144,31 @@ class client_session : public std::enable_shared_from_this<client_session> {
144144 std::condition_variable completion_cond_;
145145};
146146
147+ class sync_transfer_handler {
148+ bool transfer_is_sync_;
149+ // sockets that should receive data in sync mode
150+ std::vector<tcp_socket_p> sync_sockets_;
151+ // io context for sync mode, app is responsible for running it
152+ asio::io_context io_ctx_;
153+ public:
154+ sync_transfer_handler (): io_ctx_(1 ) {
155+
156+ }
157+
158+ // / schedules a native socket handle to be added the next time a push operation is done
159+ void add_socket (const tcp_socket::native_handle_type handle, tcp_socket::protocol_type protocol) {
160+ asio::post (io_ctx_, [=](){
161+ sync_sockets_.push_back (std::make_unique<tcp_socket>(io_ctx_, protocol, handle));
162+ });
163+ }
164+ void write_all_blocking (const std::vector<asio::const_buffer> &bufs);
165+ };
166+
147167tcp_server::tcp_server (stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
148168 factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync)
149169 : chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)),
150- factory_ (std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) {
151- if (transfer_is_sync_) sync_transfer_io_ctx_ = std::make_unique<asio::io_context>( 1 );
170+ factory_ (std::move(factory)), send_buffer_(std::move(sendbuf)) {
171+ if (do_sync) sync_handler = std::make_unique<sync_transfer_handler>( );
152172 // assign connection-dependent fields
153173 info_->session_id (api_config::get_instance ()->session_id ());
154174 info_->reset_uid ();
@@ -181,6 +201,11 @@ tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p s
181201 throw std::runtime_error (" Failed to instantiate socket acceptors for the TCP server" );
182202}
183203
204+ tcp_server::~tcp_server () noexcept
205+ {
206+ // defined here so the compiler can generate the destructor for the sync_handler
207+ }
208+
184209
185210// === externally issued asynchronous commands ===
186211
@@ -234,7 +259,12 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
234259
235260// === synchronous transfer
236261
237- void tcp_server::write_all_blocking (std::vector<asio::const_buffer> bufs) {
262+ void tcp_server::write_all_blocking (const std::vector<asio::const_buffer> &bufs)
263+ {
264+ sync_handler->write_all_blocking (bufs);
265+ }
266+
267+ void sync_transfer_handler::write_all_blocking (const std::vector<asio::const_buffer> &bufs) {
238268 bool any_session_broken = false ;
239269
240270 for (auto &sock : sync_sockets_) {
@@ -247,7 +277,7 @@ void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
247277 case asio::error::connection_reset:
248278 LOG_F (WARNING, " Broken Pipe / Connection Reset detected. Closing socket." );
249279 any_session_broken = true ;
250- asio::post (*sync_transfer_io_ctx_ , [sock]() {
280+ asio::post (io_ctx_ , [sock]() {
251281 asio::error_code close_ec;
252282 sock->close (close_ec);
253283 });
@@ -262,8 +292,8 @@ void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
262292 }
263293 try {
264294 // prepare the io context for new work
265- sync_transfer_io_ctx_-> restart ();
266- sync_transfer_io_ctx_-> run ();
295+ io_ctx_. restart ();
296+ io_ctx_. run ();
267297
268298 if (any_session_broken) {
269299 // remove sessions whose socket was closed
@@ -577,21 +607,14 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
577607 // convenient for unit tests
578608 if (max_buffered_ <= 0 ) return ;
579609
580- if (serv->transfer_is_sync_ ) {
610+ if (serv->sync_handler ) {
581611 LOG_F (INFO, " Using synchronous blocking transfers for new client session." );
582- auto &sock_io_ctx = *serv->sync_transfer_io_ctx_ ;
583-
612+ auto protocol = sock_.local_endpoint ().protocol ();
584613 // move the socket into the sync_transfer_io_ctx by releasing it from this
585614 // io ctx and re-creating it with sync_transfer_io_ctx.
586615 // See https://stackoverflow.com/q/52671836/73299
587616 // Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets
588- auto protocol = sock_.local_endpoint ().protocol ();
589- auto new_sock = std::make_shared<tcp_socket>(sock_io_ctx, protocol, sock_.release ());
590-
591- asio::post (sock_io_ctx, [serv, sock_p = std::move (new_sock)]() {
592- LOG_F (1 , " Moved socket to new io_ctx" );
593- serv->sync_sockets_ .emplace_back (std::move (sock_p));
594- });
617+ serv->sync_handler ->add_socket (sock_.release (), protocol);
595618 serv->unregister_inflight_session (this );
596619 return ;
597620 }
0 commit comments