@@ -68,6 +68,18 @@ using ipcTestParams =
 struct umfIpcTest : umf_test::test,
                     ::testing::WithParamInterface<ipcTestParams> {
     umfIpcTest() {}
+    size_t getOpenedIpcCacheSize() {
+        const char *max_size_str = getenv("UMF_MAX_OPENED_IPC_HANDLES");
+        if (max_size_str) {
+            char *endptr;
+            size_t max_size = strtoul(max_size_str, &endptr, 10);
+            EXPECT_EQ(*endptr, '\0');
+            if (*endptr == '\0') {
+                return max_size;
+            }
+        }
+        return 0;
+    }
     void SetUp() override {
         test::SetUp();
         auto [pool_ops, pool_params_create, pool_params_destroy, provider_ops,
@@ -80,6 +92,7 @@ struct umfIpcTest : umf_test::test,
         providerParamsCreate = provider_params_create;
         providerParamsDestroy = provider_params_destroy;
         memAccessor = accessor;
+        openedIpcCacheSize = getOpenedIpcCacheSize();
     }

     void TearDown() override { test::TearDown(); }
@@ -160,6 +173,7 @@ struct umfIpcTest : umf_test::test,
     umf_memory_provider_ops_t *providerOps = nullptr;
     pfnProviderParamsCreate providerParamsCreate = nullptr;
     pfnProviderParamsDestroy providerParamsDestroy = nullptr;
+    size_t openedIpcCacheSize = 0;

     void concurrentGetConcurrentPutHandles(bool shuffle) {
         std::vector<void *> ptrs;
@@ -264,6 +278,158 @@ struct umfIpcTest : umf_test::test,
         pool.reset(nullptr);
         EXPECT_EQ(stat.putCount, stat.getCount);
     }
+
+    void concurrentOpenConcurrentCloseHandles(bool shuffle) {
+        umf_result_t ret;
+        std::vector<void *> ptrs;
+        constexpr size_t ALLOC_SIZE = 100;
+        constexpr size_t NUM_POINTERS = 100;
+        umf::pool_unique_handle_t pool = makePool();
+        ASSERT_NE(pool.get(), nullptr);
+
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
+            EXPECT_NE(ptr, nullptr);
+            ptrs.push_back(ptr);
+        }
+
+        std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            umf_ipc_handle_t ipcHandle;
+            size_t handleSize;
+            ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
+            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+            ipcHandles[i] = ipcHandle;
+        }
+
+        std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
+        umf_ipc_handler_handle_t ipcHandler = nullptr;
+        ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
+        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+        ASSERT_NE(ipcHandler, nullptr);
+
+        umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+        auto openHandlesFn = [shuffle, &ipcHandles, &openedIpcHandles,
+                              &syncthreads, ipcHandler](size_t tid) {
+            // Each thread gets its own copy of the IPC handles so it can shuffle them
+            std::array<umf_ipc_handle_t, NUM_POINTERS> localIpcHandles =
+                ipcHandles;
+            if (shuffle) {
+                std::random_device rd;
+                std::mt19937 g(rd());
+                std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
+            }
+            syncthreads();
+            for (auto ipcHandle : localIpcHandles) {
+                void *ptr;
+                umf_result_t ret =
+                    umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
+                ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+                openedIpcHandles[tid].push_back(ptr);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, openHandlesFn);
+
+        auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
+            syncthreads();
+            for (void *ptr : openedIpcHandles[tid]) {
+                umf_result_t ret = umfCloseIPCHandle(ptr);
+                EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, closeHandlesFn);
+
+        for (auto ipcHandle : ipcHandles) {
+            ret = umfPutIPCHandle(ipcHandle);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        for (void *ptr : ptrs) {
+            ret = umfPoolFree(pool.get(), ptr);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        pool.reset(nullptr);
+        EXPECT_EQ(stat.getCount, stat.allocCount);
+        EXPECT_EQ(stat.putCount, stat.getCount);
+        EXPECT_EQ(stat.openCount, stat.allocCount);
+        EXPECT_EQ(stat.openCount, stat.closeCount);
+    }
+
+    void concurrentOpenCloseHandles(bool shuffle) {
+        umf_result_t ret;
+        std::vector<void *> ptrs;
+        constexpr size_t ALLOC_SIZE = 100;
+        constexpr size_t NUM_POINTERS = 100;
+        umf::pool_unique_handle_t pool = makePool();
+        ASSERT_NE(pool.get(), nullptr);
+
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
+            EXPECT_NE(ptr, nullptr);
+            ptrs.push_back(ptr);
+        }
+
+        std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            umf_ipc_handle_t ipcHandle;
+            size_t handleSize;
+            ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
+            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+            ipcHandles[i] = ipcHandle;
+        }
+
+        umf_ipc_handler_handle_t ipcHandler = nullptr;
+        ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
+        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+        ASSERT_NE(ipcHandler, nullptr);
+
+        umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+        auto openCloseHandlesFn = [shuffle, &ipcHandles, &syncthreads,
+                                   ipcHandler](size_t) {
+            // Each thread gets its own copy of the IPC handles so it can shuffle them
+            std::array<umf_ipc_handle_t, NUM_POINTERS> localIpcHandles =
+                ipcHandles;
+            if (shuffle) {
+                std::random_device rd;
+                std::mt19937 g(rd());
+                std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
+            }
+            syncthreads();
+            for (auto ipcHandle : localIpcHandles) {
+                void *ptr;
+                umf_result_t ret =
+                    umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
+                ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+                ret = umfCloseIPCHandle(ptr);
+                EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, openCloseHandlesFn);
+
+        for (auto ipcHandle : ipcHandles) {
+            ret = umfPutIPCHandle(ipcHandle);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        for (void *ptr : ptrs) {
+            ret = umfPoolFree(pool.get(), ptr);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        pool.reset(nullptr);
+        EXPECT_EQ(stat.getCount, stat.allocCount);
+        EXPECT_EQ(stat.putCount, stat.getCount);
+        if (openedIpcCacheSize == 0) {
+            EXPECT_EQ(stat.openCount, stat.allocCount);
+        }
+        EXPECT_EQ(stat.openCount, stat.closeCount);
+    }
 };

 TEST_P(umfIpcTest, GetIPCHandleSize) {
@@ -529,75 +695,20 @@ TEST_P(umfIpcTest, ConcurrentGetPutHandlesShuffled) {
     concurrentGetPutHandles(true);
 }

-TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
-    umf_result_t ret;
-    std::vector<void *> ptrs;
-    constexpr size_t ALLOC_SIZE = 100;
-    constexpr size_t NUM_POINTERS = 100;
-    umf::pool_unique_handle_t pool = makePool();
-    ASSERT_NE(pool.get(), nullptr);
-
-    for (size_t i = 0; i < NUM_POINTERS; ++i) {
-        void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
-        EXPECT_NE(ptr, nullptr);
-        ptrs.push_back(ptr);
-    }
-
-    std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
-    for (size_t i = 0; i < NUM_POINTERS; ++i) {
-        umf_ipc_handle_t ipcHandle;
-        size_t handleSize;
-        ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
-        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-        ipcHandles[i] = ipcHandle;
-    }
-
-    std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
-    umf_ipc_handler_handle_t ipcHandler = nullptr;
-    ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
-    ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-    ASSERT_NE(ipcHandler, nullptr);
-
-    umf_test::syncthreads_barrier syncthreads(NTHREADS);
-
-    auto openHandlesFn = [&ipcHandles, &openedIpcHandles, &syncthreads,
-                          ipcHandler](size_t tid) {
-        syncthreads();
-        for (auto ipcHandle : ipcHandles) {
-            void *ptr;
-            umf_result_t ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
-            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-            openedIpcHandles[tid].push_back(ptr);
-        }
-    };
-
-    umf_test::parallel_exec(NTHREADS, openHandlesFn);
-
-    auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
-        syncthreads();
-        for (void *ptr : openedIpcHandles[tid]) {
-            umf_result_t ret = umfCloseIPCHandle(ptr);
-            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-        }
-    };
-
-    umf_test::parallel_exec(NTHREADS, closeHandlesFn);
+TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandles) {
+    concurrentOpenConcurrentCloseHandles(false);
+}

-    for (auto ipcHandle : ipcHandles) {
-        ret = umfPutIPCHandle(ipcHandle);
-        EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
+TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandlesShuffled) {
+    concurrentOpenConcurrentCloseHandles(true);
+}

-    for (void *ptr : ptrs) {
-        ret = umfPoolFree(pool.get(), ptr);
-        EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
+TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
+    concurrentOpenCloseHandles(false);
+}

-    pool.reset(nullptr);
-    EXPECT_EQ(stat.getCount, stat.allocCount);
-    EXPECT_EQ(stat.putCount, stat.getCount);
-    EXPECT_EQ(stat.openCount, stat.allocCount);
-    EXPECT_EQ(stat.openCount, stat.closeCount);
+TEST_P(umfIpcTest, ConcurrentOpenCloseHandlesShuffled) {
+    concurrentOpenCloseHandles(true);
 }

 TEST_P(umfIpcTest, ConcurrentDestroyIpcHandlers) {
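For context, the sequence that the new helpers exercise from many threads at once is the ordinary single-process IPC lifecycle used throughout these tests: get an IPC handle for an allocation, obtain the pool's IPC handler, open the handle, close it, put the handle back, and free the allocation. The sketch below shows that lifecycle once, without threads. The API calls and types are the ones already used in the diff; the header names (umf/ipc.h, umf/memory_pool.h), the umf_memory_pool_handle_t parameter, and the ipc_roundtrip function name are illustrative assumptions, not code from this change.

// Minimal, single-threaded sketch of the IPC lifecycle these tests run concurrently.
// Assumes an already-created UMF pool; real producer/consumer use would span two processes.
#include <umf/ipc.h>
#include <umf/memory_pool.h>

static int ipc_roundtrip(umf_memory_pool_handle_t pool) {
    void *src = umfPoolMalloc(pool, 100); // producer-side allocation
    if (src == NULL) {
        return -1;
    }

    umf_ipc_handle_t ipcHandle = NULL;
    size_t handleSize = 0;
    if (umfGetIPCHandle(src, &ipcHandle, &handleSize) != UMF_RESULT_SUCCESS) {
        return -1; // get an IPC handle describing the allocation
    }

    umf_ipc_handler_handle_t ipcHandler = NULL;
    if (umfPoolGetIPCHandler(pool, &ipcHandler) != UMF_RESULT_SUCCESS) {
        return -1; // handler used on the consumer side to open handles
    }

    void *mapped = NULL;
    if (umfOpenIPCHandle(ipcHandler, ipcHandle, &mapped) != UMF_RESULT_SUCCESS) {
        return -1; // map the handle; may be served from the opened-handles cache
    }

    // ... use `mapped` ...

    umfCloseIPCHandle(mapped);   // undo umfOpenIPCHandle
    umfPutIPCHandle(ipcHandle);  // undo umfGetIPCHandle
    umfPoolFree(pool, src);      // release the original allocation
    return 0;
}

The guarded assertion in concurrentOpenCloseHandles presumably reflects this caching: when UMF_MAX_OPENED_IPC_HANDLES bounds the opened-handles cache, evictions can force extra provider-level opens, so stat.openCount is only compared against stat.allocCount when no limit is configured.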