Skip to content

Commit 6e1d609

Browse files
committed
run ring buffer stress on multiple cores. (#4348)
1 parent 226646f commit 6e1d609

File tree

1 file changed

+117
-128
lines changed

1 file changed

+117
-128
lines changed

tests/api_test/api_test.cpp

Lines changed: 117 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,14 +1156,63 @@ typedef struct _ring_buffer_test_context
11561156
std::promise<void> promise;
11571157
} ring_buffer_test_context_t;
11581158

1159+
void
1160+
trigger_ring_buffer_events(fd_t program_fd, uint32_t expected_event_count, _Inout_ void* data, uint32_t data_size)
1161+
{
1162+
// Get cpu count
1163+
SYSTEM_INFO sysinfo;
1164+
GetSystemInfo(&sysinfo);
1165+
1166+
// Create 2 threads per CPU that invoke the program to trigger ring buffer events.
1167+
const uint32_t thread_count = 2;
1168+
uint32_t total_threads = thread_count * sysinfo.dwNumberOfProcessors;
1169+
// Round up the iterations per thread to ensure at least expected_event_count events are raised.
1170+
uint32_t iterations_per_thread = (expected_event_count + total_threads + 1) / total_threads;
1171+
1172+
std::vector<std::jthread> threads;
1173+
std::atomic<size_t> failure_count = 0;
1174+
for (DWORD i = 0; i < sysinfo.dwNumberOfProcessors; i++) {
1175+
for (uint32_t j = 0; j < thread_count; j++) {
1176+
threads.emplace_back([&, i]() {
1177+
bind_md_t ctx = {};
1178+
bpf_test_run_opts opts = {};
1179+
opts.ctx_in = &ctx;
1180+
opts.ctx_size_in = sizeof(ctx);
1181+
opts.ctx_out = &ctx;
1182+
opts.ctx_size_out = sizeof(ctx);
1183+
opts.cpu = static_cast<uint32_t>(i);
1184+
opts.data_in = data;
1185+
opts.data_size_in = data_size;
1186+
opts.data_out = data;
1187+
opts.data_size_out = data_size;
1188+
1189+
for (uint32_t k = 0; k < iterations_per_thread; k++) {
1190+
int result = bpf_prog_test_run_opts(program_fd, &opts);
1191+
if (result != 0) {
1192+
std::cout << "bpf_prog_test_run_opts failed with " << result << std::endl;
1193+
failure_count++;
1194+
break;
1195+
}
1196+
}
1197+
});
1198+
}
1199+
}
1200+
1201+
// Wait for threads to complete.
1202+
for (auto& t : threads) {
1203+
t.join();
1204+
}
1205+
1206+
REQUIRE(failure_count == 0);
1207+
}
1208+
11591209
TEST_CASE("test_ringbuffer_wraparound", "[stress][ring_buffer]")
11601210
{
11611211
// Load bindmonitor_ringbuf.sys.
11621212
struct bpf_object* object = nullptr;
1163-
fd_t program_fd;
1213+
fd_t program_fd = ebpf_fd_invalid;
11641214
ring_buffer_test_context_t context;
11651215
std::string app_id = "api_test.exe";
1166-
uint32_t thread_count = 2;
11671216
native_module_helper_t native_helper;
11681217
native_helper.initialize("bindmonitor_ringbuf", EBPF_EXECUTION_NATIVE);
11691218

@@ -1177,9 +1226,6 @@ TEST_CASE("test_ringbuffer_wraparound", "[stress][ring_buffer]")
11771226

11781227
uint32_t max_entries = bpf_map__max_entries(bpf_object__find_map_by_name(object, "process_map"));
11791228
uint32_t max_iterations = static_cast<uint32_t>(10 * (max_entries / app_id.size()));
1180-
printf("max_iterations: %d\n", max_iterations);
1181-
REQUIRE(max_iterations % thread_count == 0);
1182-
uint32_t iterations_per_thread = max_iterations / thread_count;
11831229

11841230
// Initialize context.
11851231
context.event_count = 0;
@@ -1198,39 +1244,9 @@ TEST_CASE("test_ringbuffer_wraparound", "[stress][ring_buffer]")
11981244
&context,
11991245
nullptr);
12001246

1201-
// Create 2 threads that invoke the program to trigger ring buffer events.
1202-
std::vector<std::jthread> threads;
1203-
std::atomic<size_t> failure_count = 0;
1204-
for (uint32_t i = 0; i < thread_count; i++) {
1205-
threads.emplace_back([&]() {
1206-
bind_md_t ctx = {};
1207-
bpf_test_run_opts opts = {};
1208-
opts.ctx_in = &ctx;
1209-
opts.ctx_size_in = sizeof(ctx);
1210-
opts.ctx_out = &ctx;
1211-
opts.ctx_size_out = sizeof(ctx);
1212-
opts.data_in = app_id.data();
1213-
opts.data_size_in = static_cast<uint32_t>(app_id.size());
1214-
opts.data_out = app_id.data();
1215-
opts.data_size_out = static_cast<uint32_t>(app_id.size());
1216-
1217-
for (uint32_t i = 0; i < iterations_per_thread; i++) {
1218-
int result = bpf_prog_test_run_opts(program_fd, &opts);
1219-
if (result != 0) {
1220-
std::cout << "bpf_prog_test_run_opts failed with " << result << std::endl;
1221-
failure_count++;
1222-
break;
1223-
}
1224-
}
1225-
});
1226-
}
1227-
1228-
// Wait for threads to complete.
1229-
for (auto& t : threads) {
1230-
t.join();
1231-
}
1232-
1233-
REQUIRE(failure_count == 0);
1247+
// trigger ring buffer events from multiple threads across all CPUs.
1248+
trigger_ring_buffer_events(
1249+
program_fd, context.expected_event_count, app_id.data(), static_cast<uint32_t>(app_id.size()));
12341250

12351251
// Wait for 1 second for the ring buffer to receive all events.
12361252
REQUIRE(ring_buffer_event_callback.wait_for(1s) == std::future_status::ready);
@@ -1242,57 +1258,11 @@ TEST_CASE("test_ringbuffer_wraparound", "[stress][ring_buffer]")
12421258
bpf_object__close(object);
12431259
}
12441260

1245-
TEST_CASE("Test program order", "[native_tests]")
1246-
{
1247-
struct bpf_object* object = nullptr;
1248-
fd_t program_fd;
1249-
uint32_t program_count = 4;
1250-
int result;
1251-
1252-
REQUIRE(
1253-
_program_load_helper(
1254-
"multiple_programs.sys", BPF_PROG_TYPE_SAMPLE, EBPF_EXECUTION_NATIVE, &object, &program_fd) == 0);
1255-
1256-
// Get all 4 programs in the native object, and invoke them using bpf_prog_test_run.
1257-
//
1258-
// If there is a mismatch in the sorting order of bpf2c and ebpfapi, the 4 eBPF programs
1259-
// in this object file will be initialized with wrong handles. That will cause wrong programs
1260-
// to be invoked when bpf_prog_test_run is called. Since each program returns a different value,
1261-
// we can validate that the correct / expected program was invoked by checking the return value.
1262-
for (uint32_t i = 0; i < program_count; i++) {
1263-
bpf_test_run_opts opts = {};
1264-
bind_md_t ctx = {};
1265-
std::string program_name = "program" + std::to_string(i + 1);
1266-
struct bpf_program* program = bpf_object__find_program_by_name(object, program_name.c_str());
1267-
REQUIRE(program != nullptr);
1268-
program_fd = bpf_program__fd(program);
1269-
REQUIRE(program_fd > 0);
1270-
1271-
std::string app_id = "api_test.exe";
1272-
1273-
opts.ctx_in = &ctx;
1274-
opts.ctx_size_in = sizeof(ctx);
1275-
opts.ctx_out = &ctx;
1276-
opts.ctx_size_out = sizeof(ctx);
1277-
opts.data_in = app_id.data();
1278-
opts.data_size_in = static_cast<uint32_t>(app_id.size());
1279-
opts.data_out = app_id.data();
1280-
opts.data_size_out = static_cast<uint32_t>(app_id.size());
1281-
1282-
result = bpf_prog_test_run_opts(program_fd, &opts);
1283-
REQUIRE(result == 0);
1284-
REQUIRE(opts.retval == (i + 1));
1285-
}
1286-
1287-
// Clean up.
1288-
bpf_object__close(object);
1289-
}
1290-
12911261
typedef struct _perf_event_array_test_context
12921262
{
12931263
std::atomic<size_t> event_count = 0;
1294-
size_t expected_event_count = 0;
1295-
size_t cpu_count = 0;
1264+
uint32_t expected_event_count = 0;
1265+
uint32_t cpu_count = 0;
12961266
uint64_t event_length = 0;
12971267
std::atomic<size_t> lost_count = 0;
12981268
std::promise<void> promise;
@@ -1302,13 +1272,12 @@ TEST_CASE("test_perfbuffer", "[stress][perf_buffer]")
13021272
{
13031273
// Load bindmonitor_perf_event_array.sys.
13041274
struct bpf_object* object = nullptr;
1305-
fd_t program_fd;
1275+
fd_t program_fd = ebpf_fd_invalid;
13061276
perf_event_array_test_context_t context;
13071277
std::string app_id = "api_test.exe";
1308-
size_t cpu_count = libbpf_num_possible_cpus();
1278+
uint32_t cpu_count = libbpf_num_possible_cpus();
13091279
CAPTURE(cpu_count);
13101280
context.cpu_count = cpu_count;
1311-
uint32_t thread_count = 2;
13121281
native_module_helper_t native_helper;
13131282
native_helper.initialize("bindmonitor_perf_event_array", EBPF_EXECUTION_NATIVE);
13141283

@@ -1322,11 +1291,7 @@ TEST_CASE("test_perfbuffer", "[stress][perf_buffer]")
13221291

13231292
uint32_t max_entries = bpf_map__max_entries(bpf_object__find_map_by_name(object, "process_map"));
13241293
uint32_t max_iterations = static_cast<uint32_t>(10 * (max_entries / app_id.size()));
1325-
printf("max_iterations: %d\n", max_iterations);
1326-
REQUIRE(max_iterations % thread_count == 0);
1327-
uint32_t iterations_per_thread = max_iterations / thread_count;
1328-
1329-
CAPTURE(max_entries, max_iterations, iterations_per_thread);
1294+
CAPTURE(max_entries, max_iterations);
13301295

13311296
// Initialize context.
13321297
context.event_count = 0;
@@ -1343,7 +1308,14 @@ TEST_CASE("test_perfbuffer", "[stress][perf_buffer]")
13431308
return;
13441309
}
13451310
if (((++context->event_count) + (context->lost_count)) >= context->expected_event_count) {
1346-
context->promise.set_value();
1311+
try {
1312+
context->promise.set_value();
1313+
} catch (const std::future_error& e) {
1314+
// Ignore the exception if the promise was already set.
1315+
if (e.code() != std::future_errc::promise_already_satisfied) {
1316+
throw; // Rethrow if it's a different error.
1317+
}
1318+
}
13471319
}
13481320
return;
13491321
},
@@ -1354,50 +1326,67 @@ TEST_CASE("test_perfbuffer", "[stress][perf_buffer]")
13541326
},
13551327
&context,
13561328
nullptr);
1357-
// Create 2 threads that invoke the program to trigger perf buffer events.
1358-
std::vector<std::jthread> threads;
1359-
std::atomic<size_t> failure_count = 0;
1360-
for (uint32_t i = 0; i < thread_count; i++) {
1361-
threads.emplace_back([&]() {
1362-
bind_md_t ctx = {};
1363-
bpf_test_run_opts opts = {};
1364-
opts.ctx_in = &ctx;
1365-
opts.ctx_size_in = sizeof(ctx);
1366-
opts.ctx_out = &ctx;
1367-
opts.ctx_size_out = sizeof(ctx);
1368-
opts.data_in = app_id.data();
1369-
opts.data_size_in = static_cast<uint32_t>(app_id.size());
1370-
opts.data_out = app_id.data();
1371-
opts.data_size_out = static_cast<uint32_t>(app_id.size());
1372-
1373-
for (uint32_t i = 0; i < iterations_per_thread; i++) {
1374-
int result = bpf_prog_test_run_opts(program_fd, &opts);
1375-
if (result != 0) {
1376-
std::cout << "bpf_prog_test_run_opts failed with " << result << std::endl;
1377-
failure_count++;
1378-
break;
1379-
}
1380-
}
1381-
});
1382-
}
13831329

1384-
// Wait for threads to complete.
1385-
for (auto& t : threads) {
1386-
t.join();
1387-
}
1388-
1389-
REQUIRE(failure_count == 0);
1330+
// Trigger perf buffer events from multiple threads across all CPUs.
1331+
trigger_ring_buffer_events(
1332+
program_fd, context.expected_event_count, app_id.data(), static_cast<uint32_t>(app_id.size()));
13901333

13911334
// Wait for 1 second for the ring buffer to receive all events.
13921335
bool test_complete = perf_buffer_event_callback.wait_for(1s) == std::future_status::ready;
13931336

1394-
CAPTURE(context.event_length, context.event_count, context.expected_event_count, context.lost_count, failure_count);
1337+
CAPTURE(context.event_length, context.event_count, context.expected_event_count, context.lost_count);
13951338

13961339
REQUIRE(test_complete == true);
13971340

13981341
// Unsubscribe from the ring buffer.
13991342
perf_buffer__free(perfbuffer);
14001343

1344+
// Clean up.
1345+
bpf_object__close(object);
1346+
}
1347+
1348+
TEST_CASE("Test program order", "[native_tests]")
1349+
{
1350+
struct bpf_object* object = nullptr;
1351+
fd_t program_fd;
1352+
uint32_t program_count = 4;
1353+
int result;
1354+
1355+
REQUIRE(
1356+
_program_load_helper(
1357+
"multiple_programs.sys", BPF_PROG_TYPE_SAMPLE, EBPF_EXECUTION_NATIVE, &object, &program_fd) == 0);
1358+
1359+
// Get all 4 programs in the native object, and invoke them using bpf_prog_test_run.
1360+
//
1361+
// If there is a mismatch in the sorting order of bpf2c and ebpfapi, the 4 eBPF programs
1362+
// in this object file will be initialized with wrong handles. That will cause wrong programs
1363+
// to be invoked when bpf_prog_test_run is called. Since each program returns a different value,
1364+
// we can validate that the correct / expected program was invoked by checking the return value.
1365+
for (uint32_t i = 0; i < program_count; i++) {
1366+
bpf_test_run_opts opts = {};
1367+
bind_md_t ctx = {};
1368+
std::string program_name = "program" + std::to_string(i + 1);
1369+
struct bpf_program* program = bpf_object__find_program_by_name(object, program_name.c_str());
1370+
REQUIRE(program != nullptr);
1371+
program_fd = bpf_program__fd(program);
1372+
REQUIRE(program_fd > 0);
1373+
1374+
std::string app_id = "api_test.exe";
1375+
1376+
opts.ctx_in = &ctx;
1377+
opts.ctx_size_in = sizeof(ctx);
1378+
opts.ctx_out = &ctx;
1379+
opts.ctx_size_out = sizeof(ctx);
1380+
opts.data_in = app_id.data();
1381+
opts.data_size_in = static_cast<uint32_t>(app_id.size());
1382+
opts.data_out = app_id.data();
1383+
opts.data_size_out = static_cast<uint32_t>(app_id.size());
1384+
1385+
result = bpf_prog_test_run_opts(program_fd, &opts);
1386+
REQUIRE(result == 0);
1387+
REQUIRE(opts.retval == (i + 1));
1388+
}
1389+
14011390
// Clean up.
14021391
bpf_object__close(object);
14031392
}

0 commit comments

Comments
 (0)