2929import com .apple .foundationdb .record .logging .KeyValueLogMessage ;
3030import com .apple .foundationdb .record .logging .LogMessageKeys ;
3131import com .apple .foundationdb .record .provider .foundationdb .runners .ExponentialDelay ;
32+ import com .apple .foundationdb .record .provider .foundationdb .runners .FutureAutoClose ;
3233import com .apple .foundationdb .record .provider .foundationdb .runners .TransactionalRunner ;
3334import com .apple .foundationdb .record .provider .foundationdb .synchronizedsession .SynchronizedSessionRunner ;
3435import com .apple .foundationdb .record .util .Result ;
3839
3940import javax .annotation .Nonnull ;
4041import javax .annotation .Nullable ;
41- import java .util .ArrayList ;
4242import java .util .List ;
4343import java .util .Map ;
4444import java .util .UUID ;
@@ -57,6 +57,7 @@ public class FDBDatabaseRunnerImpl implements FDBDatabaseRunner {
5757 @ Nonnull
5858 private final FDBDatabase database ;
5959 private final TransactionalRunner transactionalRunner ;
60+ private final FutureAutoClose futureManager ;
6061 @ Nonnull
6162 private FDBRecordContextConfig .Builder contextConfigBuilder ;
6263 @ Nonnull
@@ -67,22 +68,19 @@ public class FDBDatabaseRunnerImpl implements FDBDatabaseRunner {
6768 private long initialDelayMillis ;
6869
6970 private boolean closed ;
70- @ Nonnull
71- private final List <CompletableFuture <?>> futuresToCompleteExceptionally ;
7271
7372 @ API (API .Status .INTERNAL )
7473 FDBDatabaseRunnerImpl (@ Nonnull FDBDatabase database , FDBRecordContextConfig .Builder contextConfigBuilder ) {
7574 this .database = database ;
7675 this .contextConfigBuilder = contextConfigBuilder ;
7776 this .executor = database .newContextExecutor (contextConfigBuilder .getMdcContext ());
7877 this .transactionalRunner = new TransactionalRunner (database , contextConfigBuilder );
78+ this .futureManager = new FutureAutoClose ();
7979
8080 final FDBDatabaseFactory factory = database .getFactory ();
8181 this .maxAttempts = factory .getMaxAttempts ();
8282 this .maxDelayMillis = factory .getMaxDelayMillis ();
8383 this .initialDelayMillis = factory .getInitialDelayMillis ();
84-
85- futuresToCompleteExceptionally = new ArrayList <>();
8684 }
8785
8886 @ Override
@@ -225,7 +223,7 @@ LogMessageKeys.MAX_ATTEMPTS, getMaxAttempts(),
225223 if (getTimer () != null ) {
226224 future = getTimer ().instrument (FDBStoreTimer .Events .RETRY_DELAY , future , executor );
227225 }
228- addFutureToCompleteExceptionally (future );
226+ futureManager . registerFuture (future );
229227 return future .thenApply (vignore -> {
230228 currAttempt ++;
231229 return true ;
@@ -240,8 +238,7 @@ LogMessageKeys.MAX_ATTEMPTS, getMaxAttempts(),
240238 @ SuppressWarnings ("squid:S1181" )
241239 public CompletableFuture <T > runAsync (@ Nonnull final Function <? super FDBRecordContext , CompletableFuture <? extends T >> retriable ,
242240 @ Nonnull final BiFunction <? super T , Throwable , Result <? extends T , ? extends Throwable >> handlePostTransaction ) {
243- CompletableFuture <T > future = new CompletableFuture <>();
244- addFutureToCompleteExceptionally (future );
241+ CompletableFuture <T > future = futureManager .newFuture ();
245242 AsyncUtil .whileTrue (() -> {
246243 try {
247244 return transactionalRunner .runAsync (currAttempt != 0 , retriable )
@@ -314,13 +311,25 @@ public synchronized void close() {
314311 return ;
315312 }
316313 closed = true ;
317- if (!futuresToCompleteExceptionally .stream ().allMatch (CompletableFuture ::isDone )) {
318- final Exception exception = new RunnerClosed ();
319- for (CompletableFuture <?> future : futuresToCompleteExceptionally ) {
320- future .completeExceptionally (exception );
314+ // Ensure we call both close() methods, capturing all exceptions
315+ RuntimeException caught = null ;
316+ try {
317+ futureManager .close ();
318+ } catch (RuntimeException e ) {
319+ caught = e ;
320+ }
321+ try {
322+ transactionalRunner .close ();
323+ } catch (RuntimeException e ) {
324+ if (caught != null ) {
325+ caught .addSuppressed (e );
326+ } else {
327+ caught = e ;
321328 }
322329 }
323- transactionalRunner .close ();
330+ if (caught != null ) {
331+ throw caught ;
332+ }
324333 }
325334
326335 @ Override
@@ -337,15 +346,4 @@ public SynchronizedSessionRunner startSynchronizedSession(@Nonnull Subspace lock
337346 public SynchronizedSessionRunner joinSynchronizedSession (@ Nonnull Subspace lockSubspace , @ Nonnull UUID sessionId , long leaseLengthMillis ) {
338347 return SynchronizedSessionRunner .joinSession (lockSubspace , sessionId , leaseLengthMillis , this );
339348 }
340-
341- private synchronized void addFutureToCompleteExceptionally (@ Nonnull CompletableFuture <?> future ) {
342- if (closed ) {
343- final RunnerClosed exception = new RunnerClosed ();
344- future .completeExceptionally (exception );
345- throw exception ;
346- }
347- futuresToCompleteExceptionally .removeIf (CompletableFuture ::isDone );
348- futuresToCompleteExceptionally .add (future );
349- }
350-
351349}
0 commit comments