@@ -526,114 +526,6 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails
526526
527527 assertThatThrownBy (downloadDirectory .completionFuture ()::join ).hasCause (exception );
528528 }
529- @ Test
530- void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations () throws Exception {
531- // Create many S3 objects to test concurrency
532- int totalFiles = 50 ;
533- String [] keys = new String [totalFiles ];
534- for (int i = 0 ; i < totalFiles ; i ++) {
535- keys [i ] = "file" + i + ".txt" ;
536- }
537- stubSuccessfulListObjects (listObjectsHelper , keys );
538-
539- // Configuration with expected concurrency limit
540- int configuredMaxConcurrency = 5 ;
541-
542- // Track concurrent operations
543- AtomicInteger currentlyActive = new AtomicInteger (0 );
544- AtomicInteger peakConcurrency = new AtomicInteger (0 );
545-
546- // Use a Phaser to coordinate phases
547- Phaser phaser = new Phaser (1 ); // Start with test thread
548-
549- // Mock the single download function to track concurrent operations
550- when (singleDownloadFunction .apply (any ())).thenAnswer (invocation -> {
551- phaser .register (); // Each operation registers
552-
553- CompletableFuture <CompletedFileDownload > future = new CompletableFuture <>();
554-
555- CompletableFuture .runAsync (() -> {
556- try {
557- // Track entry
558- int current = currentlyActive .incrementAndGet ();
559- peakConcurrency .updateAndGet (max -> Math .max (max , current ));
560-
561- // Wait at barrier - this forces all operations to be active simultaneously
562- phaser .arriveAndAwaitAdvance ();
563-
564- // Complete
565- future .complete (CompletedFileDownload .builder ()
566- .response (GetObjectResponse .builder ().eTag ("test" ).build ())
567- .build ());
568-
569- } catch (Exception e ) {
570- future .completeExceptionally (e );
571- } finally {
572- currentlyActive .decrementAndGet ();
573- phaser .arriveAndDeregister ();
574- }
575- });
576-
577- return new DefaultFileDownload (future ,
578- new DefaultTransferProgress (DefaultTransferProgressSnapshot .builder ()
579- .transferredBytes (0L )
580- .build ()),
581- () -> DownloadFileRequest .builder ()
582- .getObjectRequest (GetObjectRequest .builder ().build ())
583- .destination (Paths .get ("." ))
584- .build (),
585- null );
586- });
587-
588- // Configure with our expected limit
589- // To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is
590- // configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1
591- TransferManagerConfiguration customConfig = TransferManagerConfiguration .builder ()
592- .transferDirectoryMaxConcurrency (configuredMaxConcurrency )
593- .build ();
594-
595- DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper (
596- customConfig , listObjectsHelper , singleDownloadFunction );
597-
598- // Start download asynchronously
599- CompletableFuture <Void > downloadTask = CompletableFuture .runAsync (() -> {
600- DirectoryDownload download = customHelper .downloadDirectory (
601- DownloadDirectoryRequest .builder ()
602- .destination (directory )
603- .bucket ("bucket" )
604- .build ()
605- );
606- download .completionFuture ().join ();
607- });
608-
609- // Wait for operations to register (but not complete the phase)
610- // We wait for more than the configured limit to ensure we'd catch violations
611- Duration maxWait = Duration .ofSeconds (5 );
612- long deadline = System .nanoTime () + maxWait .toNanos ();
613- int current = phaser .getRegisteredParties () - 1 ; // Subtract 1 for main thread
614- while (current < configuredMaxConcurrency ) {
615- if (System .nanoTime () >= deadline ) {
616- throw new AssertionError (
617- "Timed out waiting for registrations: current=" + current +
618- ", configuredMaxConcurrency=" + configuredMaxConcurrency );
619- }
620- LockSupport .parkNanos (10_000_000L ); // ~10 ms
621- current = phaser .getRegisteredParties () - 1 ;
622- }
623-
624- // Check peak BEFORE releasing the phase
625- int observedPeak = peakConcurrency .get ();
626- assertThat (observedPeak )
627- .as ("Implementation allowed %d concurrent operations but was configured for %d" ,
628- observedPeak , configuredMaxConcurrency )
629- .isEqualTo (configuredMaxConcurrency );
630-
631- // Release the phase to let operations complete
632- phaser .arriveAndDeregister ();
633-
634- // Complete the test
635- downloadTask .get (2 , TimeUnit .SECONDS );
636- }
637529
638530 private static DefaultFileDownload completedDownload () {
639531 return new DefaultFileDownload (CompletableFuture .completedFuture (CompletedFileDownload .builder ()
0 commit comments