@@ -211,14 +211,9 @@ void tcp_server::end_serving() {
211211
212212void tcp_server::accept_next_connection (tcp_acceptor_p &acceptor) {
213213 try {
214- // Select the IO context for handling the socket
215- // for `transfer_is_sync`, IO is done in the thread calling `push_sample`,
216- // otherwise in the outlet's IO thread / IO context
217- auto &sock_io_ctx = transfer_is_sync_ ? *sync_transfer_io_ctx_ : *io_;
218-
219- // accept a connection on the session's socket
220- acceptor->async_accept (sock_io_ctx, [shared_this = shared_from_this (), &acceptor](
221- err_t err, tcp_socket sock) {
214+ // accept a new connection
215+ acceptor->async_accept (*io_, [shared_this = shared_from_this (), &acceptor](
216+ err_t err, tcp_socket sock) {
222217 if (err == asio::error::operation_aborted || err == asio::error::shut_down) return ;
223218
224219 // no error: create a new session and start processing
@@ -239,34 +234,36 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
239234// === synchronous transfer
240235
241236void tcp_server::write_all_blocking (std::vector<asio::const_buffer> bufs) {
242- int writes_outstanding = 0 ;
243237 bool any_session_broken = false ;
244238
245239 for (auto &sock : sync_sockets_) {
246240 asio::async_write (*sock, bufs,
247- [this , sock, &writes_outstanding ](
241+ [this , & sock, &any_session_broken ](
248242 const asio::error_code &ec, size_t bytes_transferred) {
249- writes_outstanding--;
250243 switch (ec.value ()) {
251244 case 0 : break ; // success
252245 case asio::error::broken_pipe:
253246 case asio::error::connection_reset:
254247 LOG_F (WARNING, " Broken Pipe / Connection Reset detected. Closing socket." );
255- {
248+ any_session_broken = true ;
249+ asio::post (*sync_transfer_io_ctx_, [sock]() {
256250 asio::error_code close_ec;
257251 sock->close (close_ec);
258- }
252+ });
253+ break ;
254+ case asio::error::operation_aborted:
255+ LOG_F (INFO, " Socket wasn't fast enough" );
259256 break ;
260257 default :
261- LOG_F (WARNING , " Unhandled write_all_blocking error: %s." , ec.message ().c_str ());
258+ LOG_F (ERROR , " Unhandled write_all_blocking error: %s." , ec.message ().c_str ());
262259 }
263260 });
264- writes_outstanding++;
265261 }
266262 try {
267- assert (sync_transfer_io_ctx_);
263+ // prepare the io context for new work
268264 sync_transfer_io_ctx_->restart ();
269- while (writes_outstanding) sync_transfer_io_ctx_->run_one ();
265+ sync_transfer_io_ctx_->run ();
266+
270267 if (any_session_broken) {
271268 // remove sessions whose socket was closed
272269 auto new_end_it = std::remove_if (sync_sockets_.begin (), sync_sockets_.end (),
@@ -586,10 +583,19 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
586583
587584 if (serv->transfer_is_sync_ ) {
588585 LOG_F (INFO, " Using synchronous blocking transfers for new client session." );
589- asio::post (*serv->sync_transfer_io_ctx_ ,
590- [serv, sock_p = std::make_shared<tcp_socket>(std::move (sock_))]() {
591- serv->sync_sockets_ .emplace_back (std::move (sock_p));
592- });
586+ auto &sock_io_ctx = *serv->sync_transfer_io_ctx_ ;
587+
588+ // move the socket into the sync_transfer_io_ctx by releasing it from this
589+ // io ctx and re-creating it with sync_transfer_io_ctx.
590+ // See https://stackoverflow.com/q/52671836/73299
591+ // Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets
592+ auto protocol = sock_.local_endpoint ().protocol ();
593+ auto new_sock = std::make_shared<tcp_socket>(sock_io_ctx, protocol, sock_.release ());
594+
595+ asio::post (sock_io_ctx, [serv, sock_p = std::move (new_sock)]() {
596+ LOG_F (1 , " Moved socket to new io_ctx" );
597+ serv->sync_sockets_ .emplace_back (std::move (sock_p));
598+ });
593599 serv->unregister_inflight_session (this );
594600 return ;
595601 }
0 commit comments