@@ -212,14 +212,9 @@ void tcp_server::end_serving() {
212212
213213void tcp_server::accept_next_connection (tcp_acceptor_p &acceptor) {
214214 try {
215- // Select the IO context for handling the socket
216- // for `transfer_is_sync`, IO is done in the thread calling `push_sample`,
217- // otherwise in the outlet's IO thread / IO context
218- auto &sock_io_ctx = transfer_is_sync_ ? *sync_transfer_io_ctx_ : *io_;
219-
220- // accept a connection on the session's socket
221- acceptor->async_accept (sock_io_ctx, [shared_this = shared_from_this (), &acceptor](
222- err_t err, tcp_socket sock) {
215+ // accept a new connection
216+ acceptor->async_accept (*io_, [shared_this = shared_from_this (), &acceptor](
217+ err_t err, tcp_socket sock) {
223218 if (err == asio::error::operation_aborted || err == asio::error::shut_down) return ;
224219
225220 // no error: create a new session and start processing
@@ -240,34 +235,36 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
240235// === synchronous transfer
241236
242237void tcp_server::write_all_blocking (std::vector<asio::const_buffer> bufs) {
243- int writes_outstanding = 0 ;
244238 bool any_session_broken = false ;
245239
246240 for (auto &sock : sync_sockets_) {
247241 asio::async_write (*sock, bufs,
248- [this , sock, &writes_outstanding ](
242+ [this , & sock, &any_session_broken ](
249243 const asio::error_code &ec, size_t bytes_transferred) {
250- writes_outstanding--;
251244 switch (ec.value ()) {
252245 case 0 : break ; // success
253246 case asio::error::broken_pipe:
254247 case asio::error::connection_reset:
255248 LOG_F (WARNING, " Broken Pipe / Connection Reset detected. Closing socket." );
256- {
249+ any_session_broken = true ;
250+ asio::post (*sync_transfer_io_ctx_, [sock]() {
257251 asio::error_code close_ec;
258252 sock->close (close_ec);
259- }
253+ });
254+ break ;
255+ case asio::error::operation_aborted:
256+ LOG_F (INFO, " Socket wasn't fast enough" );
260257 break ;
261258 default :
262- LOG_F (WARNING , " Unhandled write_all_blocking error: %s." , ec.message ().c_str ());
259+ LOG_F (ERROR , " Unhandled write_all_blocking error: %s." , ec.message ().c_str ());
263260 }
264261 });
265- writes_outstanding++;
266262 }
267263 try {
268- assert (sync_transfer_io_ctx_);
264+ // prepare the io context for new work
269265 sync_transfer_io_ctx_->restart ();
270- while (writes_outstanding) sync_transfer_io_ctx_->run_one ();
266+ sync_transfer_io_ctx_->run ();
267+
271268 if (any_session_broken) {
272269 // remove sessions whose socket was closed
273270 auto new_end_it = std::remove_if (sync_sockets_.begin (), sync_sockets_.end (),
@@ -582,10 +579,19 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
582579
583580 if (serv->transfer_is_sync_ ) {
584581 LOG_F (INFO, " Using synchronous blocking transfers for new client session." );
585- asio::post (*serv->sync_transfer_io_ctx_ ,
586- [serv, sock_p = std::make_shared<tcp_socket>(std::move (sock_))]() {
587- serv->sync_sockets_ .emplace_back (std::move (sock_p));
588- });
582+ auto &sock_io_ctx = *serv->sync_transfer_io_ctx_ ;
583+
584+ // move the socket into the sync_transfer_io_ctx by releasing it from this
585+ // io ctx and re-creating it with sync_transfer_io_ctx.
586+ // See https://stackoverflow.com/q/52671836/73299
587+ // Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets
588+ auto protocol = sock_.local_endpoint ().protocol ();
589+ auto new_sock = std::make_shared<tcp_socket>(sock_io_ctx, protocol, sock_.release ());
590+
591+ asio::post (sock_io_ctx, [serv, sock_p = std::move (new_sock)]() {
592+ LOG_F (1 , " Moved socket to new io_ctx" );
593+ serv->sync_sockets_ .emplace_back (std::move (sock_p));
594+ });
589595 serv->unregister_inflight_session (this );
590596 return ;
591597 }
0 commit comments