Skip to content

Commit 15d16aa

Browse files
committed
Refactor sync tcp server; add sync_transfer_handler
1 parent 8302e84 commit 15d16aa

File tree

2 files changed

+44
-25
lines changed

2 files changed

+44
-25
lines changed

src/tcp_server.cpp

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,31 @@ class client_session : public std::enable_shared_from_this<client_session> {
143143
std::condition_variable completion_cond_;
144144
};
145145

146+
class sync_transfer_handler {
147+
bool transfer_is_sync_;
148+
// sockets that should receive data in sync mode
149+
std::vector<tcp_socket_p> sync_sockets_;
150+
// io context for sync mode, app is responsible for running it
151+
asio::io_context io_ctx_;
152+
public:
153+
sync_transfer_handler(): io_ctx_(1) {
154+
155+
}
156+
157+
/// schedules a native socket handle to be added the next time a push operation is done
158+
void add_socket(const tcp_socket::native_handle_type handle, tcp_socket::protocol_type protocol) {
159+
asio::post(io_ctx_, [=](){
160+
sync_sockets_.push_back(std::make_unique<tcp_socket>(io_ctx_, protocol, handle));
161+
});
162+
}
163+
void write_all_blocking(const std::vector<asio::const_buffer> &bufs);
164+
};
165+
146166
tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
147167
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync)
148168
: chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)),
149-
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) {
150-
if (transfer_is_sync_) sync_transfer_io_ctx_ = std::make_unique<asio::io_context>(1);
169+
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) {
170+
if (do_sync) sync_handler = std::make_unique<sync_transfer_handler>();
151171
// assign connection-dependent fields
152172
info_->session_id(api_config::get_instance()->session_id());
153173
info_->reset_uid();
@@ -180,6 +200,11 @@ tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p s
180200
throw std::runtime_error("Failed to instantiate socket acceptors for the TCP server");
181201
}
182202

203+
tcp_server::~tcp_server() noexcept
204+
{
205+
// defined here so the compiler can generate the destructor for the sync_handler
206+
}
207+
183208

184209
// === externally issued asynchronous commands ===
185210

@@ -233,7 +258,12 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
233258

234259
// === synchronous transfer
235260

236-
void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
261+
void tcp_server::write_all_blocking(const std::vector<asio::const_buffer> &bufs)
262+
{
263+
sync_handler->write_all_blocking(bufs);
264+
}
265+
266+
void sync_transfer_handler::write_all_blocking(const std::vector<asio::const_buffer> &bufs) {
237267
bool any_session_broken = false;
238268

239269
for (auto &sock : sync_sockets_) {
@@ -246,7 +276,7 @@ void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
246276
case asio::error::connection_reset:
247277
LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
248278
any_session_broken = true;
249-
asio::post(*sync_transfer_io_ctx_, [sock]() {
279+
asio::post(io_ctx_, [sock]() {
250280
asio::error_code close_ec;
251281
sock->close(close_ec);
252282
});
@@ -261,8 +291,8 @@ void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
261291
}
262292
try {
263293
// prepare the io context for new work
264-
sync_transfer_io_ctx_->restart();
265-
sync_transfer_io_ctx_->run();
294+
io_ctx_.restart();
295+
io_ctx_.run();
266296

267297
if (any_session_broken) {
268298
// remove sessions whose socket was closed
@@ -581,21 +611,14 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
581611
// convenient for unit tests
582612
if (max_buffered_ <= 0) return;
583613

584-
if (serv->transfer_is_sync_) {
614+
if (serv->sync_handler) {
585615
LOG_F(INFO, "Using synchronous blocking transfers for new client session.");
586-
auto &sock_io_ctx = *serv->sync_transfer_io_ctx_;
587-
616+
auto protocol = sock_.local_endpoint().protocol();
588617
// move the socket into the sync_transfer_io_ctx by releasing it from this
589618
// io ctx and re-creating it with sync_transfer_io_ctx.
590619
// See https://stackoverflow.com/q/52671836/73299
591620
// Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets
592-
auto protocol = sock_.local_endpoint().protocol();
593-
auto new_sock = std::make_shared<tcp_socket>(sock_io_ctx, protocol, sock_.release());
594-
595-
asio::post(sock_io_ctx, [serv, sock_p = std::move(new_sock)]() {
596-
LOG_F(1, "Moved socket to new io_ctx");
597-
serv->sync_sockets_.emplace_back(std::move(sock_p));
598-
});
621+
serv->sync_handler->add_socket(sock_.release(), protocol);
599622
serv->unregister_inflight_session(this);
600623
return;
601624
}

src/tcp_server.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
5353
tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory,
5454
int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false);
5555

56+
~tcp_server() noexcept;
57+
5658
/**
5759
* Begin serving TCP connections.
5860
*
@@ -73,7 +75,7 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
7375
* Write directly to each socket. This should only be used when server initialized with
7476
* do_sync = true.
7577
*/
76-
void write_all_blocking(std::vector<asio::const_buffer> bufs);
78+
void write_all_blocking(const std::vector<asio::const_buffer>& bufs);
7779

7880
private:
7981
friend class client_session;
@@ -104,14 +106,8 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
104106
tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket
105107

106108

107-
// sync mode fields
108-
109-
// Flag to indicate that new client_sessions should use synchronous blocking data transfer.
110-
bool transfer_is_sync_;
111-
// sockets that should receive data in sync mode
112-
std::vector<tcp_socket_p> sync_sockets_;
113-
// io context for sync mode, app is responsible for running it
114-
std::unique_ptr<asio::io_context> sync_transfer_io_ctx_;
109+
// optional pointer to a handler class for synchronous transfers
110+
std::unique_ptr<class sync_transfer_handler> sync_handler;
115111

116112
// registry of in-flight asessions (for cancellation)
117113
std::map<void *, std::weak_ptr<client_session>> inflight_;

0 commit comments

Comments
 (0)