Skip to content

Commit dba0513

Browse files
committed
Add outlet speed test program
1 parent a99646e commit dba0513

File tree

2 files changed

+155
-0
lines changed

2 files changed

+155
-0
lines changed

CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ option(LSL_UNITTESTS "Build LSL library unit tests" OFF)
2828
option(LSL_BUNDLED_BOOST "Use the bundled Boost by default" ON)
2929
option(LSL_BUNDLED_PUGIXML "Use the bundled pugixml by default" ON)
3030
option(LSL_SLIMARCHIVE "Use experimental but smaller serialization code" OFF)
31+
option(LSL_TOOLS "Build some experimental tools for in-depth tests" OFF)
3132

3233
mark_as_advanced(LSL_SLIMARCHIVE LSL_FORCE_FANCY_LIBNAME)
3334

@@ -339,6 +340,13 @@ add_executable(lslver testing/lslver.c)
339340
target_link_libraries(lslver PRIVATE lsl)
340341
installLSLApp(lslver)
341342

343+
if(LSL_TOOLS)
344+
add_executable(blackhole testing/blackhole.cpp)
345+
target_link_libraries(blackhole PRIVATE Threads::Threads)
346+
target_include_directories(blackhole PRIVATE "thirdparty/asio/")
347+
installLSLApp(blackhole)
348+
endif()
349+
342350
set(LSL_INSTALL_ROOT ${CMAKE_CURRENT_BINARY_DIR})
343351
if(LSL_UNITTESTS)
344352
add_subdirectory(testing)

testing/blackhole.cpp

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#include <asio/io_context.hpp>
2+
#include <asio/ip/address.hpp>
3+
#include <asio/ip/tcp.hpp>
4+
#include <asio/read.hpp>
5+
#include <asio/signal_set.hpp>
6+
#include <asio/steady_timer.hpp>
7+
#include <asio/write.hpp>
8+
#include <chrono>
9+
#include <cstdint>
10+
#include <iostream>
11+
#include <list>
12+
#include <signal.h>
13+
#include <string>
14+
15+
// LSL outlet stress tester. Run with `./blackhole --help` for more information.
16+
17+
using namespace std::chrono_literals;
18+
using Endpoint = asio::ip::tcp::endpoint;
19+
20+
using socket_t = asio::basic_stream_socket<asio::ip::tcp, asio::io_context::executor_type>;
21+
using Clock = std::chrono::high_resolution_clock;
22+
23+
static char buf[1024 * 1024];
24+
const char handshake[] = "LSL:streamfeed/110 \nMax-buffer-length: 360\r\n\r\n";
25+
26+
class inlet_socket {
27+
private:
28+
socket_t sock;
29+
30+
public:
31+
std::size_t bytes_read{0};
32+
33+
inlet_socket(asio::io_context &ctx, Endpoint endpoint) : sock(ctx) {
34+
sock.async_connect(endpoint, [this](const asio::error_code &ec) {
35+
switch (ec.value()) {
36+
case 0:
37+
asio::write(sock, asio::const_buffer(handshake, sizeof(handshake) - 1));
38+
schedule_receive(0);
39+
break;
40+
41+
case asio::error::interrupted:
42+
std::cout << "Connect was interrupted…" << std::endl;
43+
break;
44+
case asio::error::connection_refused:
45+
default: throw std::runtime_error("Connection refused");
46+
}
47+
});
48+
}
49+
void schedule_receive(std::size_t read) {
50+
bytes_read += read;
51+
sock.async_read_some(
52+
asio::buffer(buf, sizeof(buf)), [this](const asio::error_code &ec, std::size_t read) {
53+
if (!ec) this->schedule_receive(read);
54+
});
55+
}
56+
};
57+
58+
class BlackHoleContainer {
59+
asio::io_context &ctx;
60+
asio::signal_set signals;
61+
Clock::time_point last_print;
62+
std::list<inlet_socket> sockets;
63+
Endpoint endpoint;
64+
65+
public:
66+
BlackHoleContainer(asio::io_context &ctx, Endpoint endpoint)
67+
: ctx(ctx), signals(ctx), last_print(Clock::now()), endpoint(endpoint) {
68+
for (int signal : {SIGUSR1, SIGUSR2, SIGTERM, SIGINT, SIGCONT}) signals.add(signal);
69+
70+
add_socket();
71+
handle_signal(0);
72+
}
73+
void add_socket() { sockets.emplace_back(ctx, endpoint); }
74+
void handle_signal(int signal) {
75+
auto t = std::chrono::high_resolution_clock::now();
76+
if (signal) {
77+
auto time_passed =
78+
std::chrono::duration_cast<std::chrono::milliseconds>(t - last_print);
79+
std::cout.precision(1);
80+
std::cout.setf(std::ios::fixed, std::ios::floatfield);
81+
std::cout << time_passed.count() << ' ';
82+
double total_bandwidth = 0;
83+
for (auto &sock : sockets) {
84+
double inlet_bandwidth = (sock.bytes_read / 1.024 / 1024 / time_passed.count());
85+
total_bandwidth += inlet_bandwidth;
86+
std::cout << inlet_bandwidth << ' ';
87+
sock.bytes_read = 0;
88+
}
89+
std::cout << total_bandwidth << std::endl;
90+
if (signal == SIGUSR2) add_socket();
91+
if (signal == SIGTERM || signal == SIGINT) exit(0);
92+
}
93+
94+
last_print = t;
95+
signals.async_wait(
96+
[this](const asio::error_code &ec, int signal) { this->handle_signal(signal); });
97+
}
98+
};
99+
100+
/// Parse an address specification like "10.0.0.1:16572", "::1:16574"
101+
/// or (for link-local addresses with scope IDs) "
102+
/// or just a port (16573) into an endpoint
103+
Endpoint parse_addr(std::string addr,
104+
asio::ip::address default_address = asio::ip::address_v4::loopback(),
105+
uint16_t default_port = 16572) {
106+
if (!addr.empty()) {
107+
auto pos = addr.find_last_of(':');
108+
if (pos == std::string::npos)
109+
default_port = std::stoi(addr);
110+
else {
111+
asio::error_code ec;
112+
auto addr_part = addr.substr(0, pos);
113+
default_address = asio::ip::make_address(addr_part, ec);
114+
if (ec) throw std::invalid_argument("Invalid IP address " + addr_part);
115+
default_port = std::stoi(addr.substr(pos + 1));
116+
}
117+
}
118+
return {default_address, default_port};
119+
}
120+
121+
int main(int argc, char **argv) {
122+
if (argc > 1 && (argv[1] == std::string("-h") || argv[1] == std::string("--help"))) {
123+
std::cout
124+
<< "LSL outlet stress tester\n"
125+
<< "Usage: " << argv[0] << " [address=16572] [inlet_count=1]\n\n"
126+
<< "address can be either a port or an address and port (127.0.0.1:16572)\n"
127+
<< "Link-local addresses need a scope id, e.g. fe80::264b:feff:fe2d:f4bc%3:16573\n\n"
128+
<< "While running, several signals are handled:"
129+
<< "SIGCONT (Ctrl+Q)\n"
130+
<< "\tSIGUSR1\t\tPrint current stats\n"
131+
<< "\tSIGUSR2\t\tAdd another inlet and print current stats\n"
132+
<< "\tSIGINT (Ctrl+C)\tPrint stats & quit\n";
133+
}
134+
Endpoint endpoint = parse_addr(argc > 1 ? argv[1] : "");
135+
const int startnum = argc > 2 ? std::stoi(argv[2]) : 1;
136+
137+
std::cout << "Connecting to " << endpoint << std::endl;
138+
139+
asio::io_context ctx;
140+
std::list<inlet_socket> sockets;
141+
142+
BlackHoleContainer cnt(ctx, endpoint);
143+
144+
for (int i = 1; i < startnum; ++i) cnt.add_socket();
145+
146+
ctx.run();
147+
}

0 commit comments

Comments
 (0)