2020import java .util .concurrent .CancellationException ;
2121import java .util .concurrent .CompletableFuture ;
2222import java .util .concurrent .Future ;
23+ import java .util .function .BiConsumer ;
2324
2425/**
2526 * A CompletableFuture implementation that propagates cancellations and calls completion handlers.
@@ -34,9 +35,8 @@ public class FailsafeFuture<T> extends CompletableFuture<T> {
3435 private AbstractExecution execution ;
3536
3637 // Mutable state, guarded by "this"
37- private Future <T > policyExecFuture ;
3838 private Future <?> dependentStageFuture ;
39- private Runnable cancelFn ;
39+ private BiConsumer < Boolean , ExecutionResult > cancelFn ;
4040 private List <Future <T >> timeoutFutures ;
4141 private boolean cancelWithInterrupt ;
4242
@@ -72,7 +72,7 @@ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
7272 this .cancelWithInterrupt = mayInterruptIfRunning ;
7373 execution .cancelledIndex = Integer .MAX_VALUE ;
7474 boolean cancelResult = super .cancel (mayInterruptIfRunning );
75- cancelResult = cancelDependencies (mayInterruptIfRunning , cancelResult );
75+ cancelDependencies (mayInterruptIfRunning , null );
7676 ExecutionResult result = ExecutionResult .failure (new CancellationException ());
7777 super .completeExceptionally (result .getFailure ());
7878 executor .handleComplete (result , execution );
@@ -98,46 +98,31 @@ synchronized boolean completeResult(ExecutionResult result) {
9898 return completed ;
9999 }
100100
101- synchronized Future <T > getDependency () {
102- return policyExecFuture ;
103- }
104-
105101 synchronized List <Future <T >> getTimeoutDelegates () {
106102 return timeoutFutures ;
107103 }
108104
109105 /**
110- * Cancels the dependency passing in the {@code interruptDelegate } flag, applies the retry cancel fn, and cancels all
106+ * Cancels the dependency passing in the {@code mayInterrupt } flag, applies the retry cancel fn, and cancels all
111107 * timeout dependencies.
112108 */
113- synchronized boolean cancelDependencies (boolean interruptDelegate , boolean result ) {
114- execution .interrupted = interruptDelegate ;
115- if (policyExecFuture != null )
116- result = policyExecFuture .cancel (interruptDelegate );
109+ synchronized void cancelDependencies (boolean mayInterrupt , ExecutionResult cancelResult ) {
110+ execution .interrupted = mayInterrupt ;
117111 if (dependentStageFuture != null )
118- dependentStageFuture .cancel (interruptDelegate );
119- if (cancelFn != null )
120- cancelFn .run ();
112+ dependentStageFuture .cancel (mayInterrupt );
121113 if (timeoutFutures != null ) {
122114 for (Future <T > timeoutDelegate : timeoutFutures )
123115 timeoutDelegate .cancel (false );
124116 timeoutFutures .clear ();
125117 }
126- return result ;
118+ if (cancelFn != null )
119+ cancelFn .accept (mayInterrupt , cancelResult );
127120 }
128121
129122 void inject (AbstractExecution execution ) {
130123 this .execution = execution ;
131124 }
132125
133- /**
134- * Injects a {@code policyExecFuture} to be cancelled when this future is cancelled.
135- */
136- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
137- synchronized void injectPolicy (Future <?> policyExecFuture ) {
138- this .policyExecFuture = (Future ) policyExecFuture ;
139- }
140-
141126 /**
142127 * Injects a {@code dependentStageFuture} to be cancelled when this future is cancelled.
143128 */
@@ -152,7 +137,7 @@ synchronized void injectStage(Future<?> dependentStageFuture) {
152137 /**
153138 * Injects a {@code cancelFn} to be called when this future is cancelled.
154139 */
155- synchronized void injectCancelFn (Runnable cancelFn ) {
140+ synchronized void injectCancelFn (BiConsumer < Boolean , ExecutionResult > cancelFn ) {
156141 this .cancelFn = cancelFn ;
157142 }
158143
0 commit comments