44#include < asio/ip/tcp.hpp>
55#include < asio/ip/udp.hpp>
66#include < asio/ip/v6_only.hpp>
7+ #include < asio/read.hpp>
8+ #include < asio/use_future.hpp>
9+ #include < asio/write.hpp>
710#include < catch2/catch.hpp>
811#include < chrono>
912#include < condition_variable>
1518// clazy:excludeall=non-pod-global-static
1619
1720using namespace asio ;
21+ using namespace std ::chrono_literals;
1822using err_t = const asio::error_code &;
1923
2024static uint16_t port = 28812 ;
@@ -25,6 +29,20 @@ static std::mutex output_mutex;
2529
2630asio::const_buffer hellobuf () { return asio::const_buffer (hello, sizeof (hello)); }
2731
32+
33+ // / launches a task and waits for the underlying thread to have started
34+ template <typename Fun> std::future<void > launch_task (Fun &&fun) {
35+ std::promise<void > started;
36+ auto started_fut = started.get_future ();
37+ std::future<void > done_fut =
38+ std::async (std::launch::async, [&started, fn = std::forward<Fun>(fun)]() {
39+ started.set_value ();
40+ fn ();
41+ });
42+ started_fut.wait ();
43+ return done_fut;
44+ }
45+
2846#define MINFO (str ) \
2947 { \
3048 std::unique_lock<std::mutex> out_lock (output_mutex); \
@@ -134,6 +152,45 @@ TEST_CASE("cancel streambuf reads", "[streambuf][network][!mayfail]") {
134152 sb_read);
135153}
136154
155+ TEST_CASE (" streambuf split reads" , " [streambuf][network]" ) {
156+ asio::io_context io_ctx;
157+ lsl::cancellable_streambuf sb_read;
158+ ip::tcp::endpoint ep (ip::address_v4::loopback (), port++);
159+ ip::tcp::acceptor remote (io_ctx, ep, true );
160+ remote.listen (1 );
161+ REQUIRE (sb_read.connect (ep) != nullptr );
162+ ip::tcp::socket sock (remote.accept ());
163+ REQUIRE (sock.send (asio::buffer (hello, 3 )) == 3 );
164+
165+ REQUIRE (sb_read.sbumpc () == hello[0 ]);
166+ auto done = launch_task ([&]() {
167+ char buf[sizeof (hello)] = {0 };
168+ auto bytes_read = sb_read.sgetn (buf, sizeof (hello) - 2 );
169+ REQUIRE (bytes_read != std::streambuf::traits_type::eof ());
170+ CHECK (bytes_read == sizeof (hello) - 2 );
171+ REQUIRE (std::string (buf) == hellostr.substr (1 ));
172+ });
173+ sock.send (asio::buffer (hello + 3 , 8 ));
174+ done.wait ();
175+
176+ std::vector<char > in_ (65536 * 16 ), out_ (65536 * 16 );
177+ for (std::size_t i = 0 ; i < out_.size (); ++i) out_[i] = (i >> 8 ^ i) % 127 ;
178+
179+ done = launch_task ([&sb_read, &in_](){
180+ auto *dataptr = in_.data (), *endptr = dataptr + in_.size ();
181+ while (dataptr != endptr) {
182+ std::streamsize bytes_read =
183+ sb_read.sgetn (dataptr, std::min<std::streamsize>(endptr - dataptr, 54 ));
184+ if (bytes_read == std::streambuf::traits_type::eof ()) break ;
185+ dataptr += bytes_read;
186+ }
187+ });
188+ for (const char *outptr = out_.data (), *endptr = outptr + out_.size (); outptr != endptr; outptr+=64 )
189+ sock.send (asio::buffer (outptr, 64 ));
190+ done.wait ();
191+ REQUIRE (std::equal (in_.begin (), in_.end (), out_.begin ()));
192+ }
193+
137194TEST_CASE (" receive v4 packets on v6 socket" , " [ipv6][network]" ) {
138195 const uint16_t test_port = port++;
139196 asio::io_context io_ctx;
@@ -238,3 +295,82 @@ TEST_CASE("bindzero", "[network][basic]") {
238295 sock.bind (asio::ip::udp::endpoint (asio::ip::address_v4::any (), 0 ));
239296 REQUIRE (sock.local_endpoint ().port () != 0 );
240297}
298+
299+ #ifdef CATCH_CONFIG_ENABLE_BENCHMARKING
300+
301+ TEST_CASE (" streambuf throughput" , " [streambuf][network]" ) {
302+ asio::io_context io_ctx;
303+ asio::executor_work_guard<asio::io_context::executor_type> work (io_ctx.get_executor ());
304+ auto background_io = launch_task ([&]() { io_ctx.run (); });
305+
306+ lsl::cancellable_streambuf sb_bench;
307+ ip::tcp::endpoint ep (ip::address_v4::loopback (), port++);
308+ ip::tcp::acceptor remote (io_ctx, ep, true );
309+ remote.listen ();
310+ ip::tcp::socket sock (io_ctx);
311+
312+ auto accept_fut = remote.async_accept (sock, asio::use_future);
313+ REQUIRE (sb_bench.connect (ep) != nullptr );
314+ REQUIRE (accept_fut.wait_for (2s) == std::future_status::ready);
315+
316+ char buf_small[16 ] = " !Hello World!" , buf_medium[256 ]{' \xab ' }, buf_large[4096 ]{' \xab ' };
317+ asio::mutable_buffer bufs[] = {
318+ asio::buffer (buf_small), asio::buffer (buf_medium), asio::buffer (buf_large)};
319+
320+ std::vector<char > dummy_buffer;
321+
322+ for (const auto &buf : bufs) {
323+ for (std::size_t chunksize : {1U , 16U , 256U }) {
324+ BENCHMARK_ADVANCED (" Send;nchunk=" + std::to_string (chunksize) +
325+ " ;buf=" + std::to_string (buf.size ()) +
326+ " ;n=" + std::to_string (chunksize * buf.size ()))
327+ (Catch::Benchmark::Chronometer meter) {
328+
329+ const auto total_bytes = buf.size () * chunksize * meter.runs ();
330+ if (dummy_buffer.size () < total_bytes) dummy_buffer.resize (total_bytes);
331+ auto fut = asio::async_read (
332+ sock, asio::buffer (dummy_buffer.data (), total_bytes), asio::use_future);
333+
334+ asio::steady_timer t (io_ctx, 5s);
335+ t.async_wait ([&](err_t ec) { REQUIRE (ec == asio::error::operation_aborted); });
336+ meter.measure ([&]() {
337+ for (auto chunk = 0U ; chunk < chunksize; ++chunk) {
338+ auto res = sb_bench.sputn (reinterpret_cast <char *>(buf.data ()), buf.size ());
339+ REQUIRE (res != std::streambuf::traits_type::eof ());
340+ }
341+ sb_bench.pubsync ();
342+ });
343+ // Wait for the read operations to finish
344+ fut.wait ();
345+ t.cancel ();
346+ };
347+ }
348+ }
349+ for (const auto &buf : bufs) {
350+ for (int chunksize : {1 , 16 , 256 }) {
351+ BENCHMARK_ADVANCED (" Recv;nchunk=" + std::to_string (chunksize) +
352+ " ;buf=" + std::to_string (buf.size ()) +
353+ " ;n=" + std::to_string (chunksize * buf.size ()))
354+ (Catch::Benchmark::Chronometer meter) {
355+ const auto total_bytes = buf.size () * chunksize * meter.runs ();
356+
357+ if (dummy_buffer.size () < total_bytes) dummy_buffer.resize (total_bytes);
358+ asio::async_write (sock, asio::buffer (dummy_buffer.data (), total_bytes),
359+ [](err_t err, std::size_t /* unused */ ) { REQUIRE (!err); });
360+ std::this_thread::sleep_for (10ms);
361+ asio::steady_timer t (io_ctx, 5s);
362+ t.async_wait ([&](err_t ec) { REQUIRE (ec == asio::error::operation_aborted); });
363+ meter.measure ([&]() {
364+ for (int chunk = 0 ; chunk < chunksize; ++chunk) {
365+ auto res = sb_bench.sgetn (reinterpret_cast <char *>(buf.data ()), buf.size ());
366+ REQUIRE (res != std::streambuf::traits_type::eof ());
367+ }
368+ });
369+ t.cancel ();
370+ };
371+ }
372+ }
373+ asio::post (io_ctx, [&]() { io_ctx.stop (); });
374+ background_io.wait ();
375+ }
376+ #endif
0 commit comments