Skip to content

Commit f329df3

Browse files
committed
Cancel inner futures returned by getStageAsync
Fixes #266
1 parent 9b70e2b commit f329df3

File tree

6 files changed

+79
-23
lines changed

6 files changed

+79
-23
lines changed

src/main/java/net/jodah/failsafe/AsyncExecution.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ void executeAsync(boolean asyncExecution) {
182182
if (!asyncExecution)
183183
outerExecutionSupplier.get().whenComplete(this::complete);
184184
else
185-
future.inject(scheduler.schedule(innerExecutionSupplier::get, 0, TimeUnit.NANOSECONDS));
185+
future.injectPolicy(scheduler.schedule(innerExecutionSupplier::get, 0, TimeUnit.NANOSECONDS));
186186
}
187187

188188
/**

src/main/java/net/jodah/failsafe/FailsafeFuture.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ public class FailsafeFuture<T> extends CompletableFuture<T> {
3434
private AbstractExecution execution;
3535

3636
// Mutable state, guarded by "this"
37-
private Future<T> dependency;
37+
private Future<T> policyExecFuture;
38+
private Future<?> dependentStageFuture;
3839
private Runnable cancelFn;
39-
private List<Future<T>> timeoutDependencies;
40+
private List<Future<T>> timeoutFutures;
4041

4142
FailsafeFuture(FailsafeExecutor<T> executor) {
4243
this.executor = executor;
@@ -96,11 +97,11 @@ synchronized boolean completeResult(ExecutionResult result) {
9697
}
9798

9899
synchronized Future<T> getDependency() {
99-
return dependency;
100+
return policyExecFuture;
100101
}
101102

102103
synchronized List<Future<T>> getTimeoutDelegates() {
103-
return timeoutDependencies;
104+
return timeoutFutures;
104105
}
105106

106107
/**
@@ -109,14 +110,16 @@ synchronized List<Future<T>> getTimeoutDelegates() {
109110
*/
110111
synchronized boolean cancelDependencies(boolean interruptDelegate, boolean result) {
111112
execution.interrupted = interruptDelegate;
112-
if (dependency != null)
113-
result = dependency.cancel(interruptDelegate);
113+
if (policyExecFuture != null)
114+
result = policyExecFuture.cancel(interruptDelegate);
115+
if (dependentStageFuture != null)
116+
dependentStageFuture.cancel(interruptDelegate);
114117
if (cancelFn != null)
115118
cancelFn.run();
116-
if (timeoutDependencies != null) {
117-
for (Future<T> timeoutDelegate : timeoutDependencies)
119+
if (timeoutFutures != null) {
120+
for (Future<T> timeoutDelegate : timeoutFutures)
118121
timeoutDelegate.cancel(false);
119-
timeoutDependencies.clear();
122+
timeoutFutures.clear();
120123
}
121124
return result;
122125
}
@@ -126,11 +129,18 @@ void inject(AbstractExecution execution) {
126129
}
127130

128131
/**
129-
* Injects a {@code dependency} to be cancelled when this future is cancelled.
132+
* Injects a {@code policyExecFuture} to be cancelled when this future is cancelled.
130133
*/
131134
@SuppressWarnings({ "unchecked", "rawtypes" })
132-
synchronized void inject(Future<?> dependency) {
133-
this.dependency = (Future) dependency;
135+
synchronized void injectPolicy(Future<?> policyExecFuture) {
136+
this.policyExecFuture = (Future) policyExecFuture;
137+
}
138+
139+
/**
140+
* Injects a {@code dependentStageFuture} to be cancelled when this future is cancelled.
141+
*/
142+
synchronized void injectStage(Future<?> dependentStageFuture) {
143+
this.dependentStageFuture = dependentStageFuture;
134144
}
135145

136146
/**
@@ -141,12 +151,12 @@ synchronized void injectCancelFn(Runnable cancelFn) {
141151
}
142152

143153
/**
144-
* Injects a {@code timeoutDependency} to be cancelled when this future is cancelled.
154+
* Injects a {@code scheduledTimeoutExec} to be cancelled when this future is cancelled.
145155
*/
146156
@SuppressWarnings({ "unchecked", "rawtypes" })
147-
synchronized void injectTimeout(Future<?> timeoutDependency) {
148-
if (timeoutDependencies == null)
149-
timeoutDependencies = new ArrayList<>(3);
150-
timeoutDependencies.add((Future) timeoutDependency);
157+
synchronized void injectTimeout(Future<?> timeoutFuture) {
158+
if (timeoutFutures == null)
159+
timeoutFutures = new ArrayList<>(3);
160+
timeoutFutures.add((Future) timeoutFuture);
151161
}
152162
}

src/main/java/net/jodah/failsafe/FallbackExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
9696
if (!policy.isAsync())
9797
callable.call();
9898
else
99-
future.inject(scheduler.schedule(callable, result.getWaitNanos(), TimeUnit.NANOSECONDS));
99+
future.injectPolicy(scheduler.schedule(callable, result.getWaitNanos(), TimeUnit.NANOSECONDS));
100100
} catch (Throwable t) {
101101
promise.completeExceptionally(t);
102102
}

src/main/java/net/jodah/failsafe/Functions.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ static Supplier<CompletableFuture<ExecutionResult>> getPromiseAsync(
106106

107107
try {
108108
scheduled.set(true);
109-
future.inject(scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS));
109+
future.injectPolicy(scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS));
110110
} catch (Throwable t) {
111111
promise.completeExceptionally(t);
112112
}
@@ -143,14 +143,18 @@ public synchronized CompletableFuture<ExecutionResult> get() {
143143
* Returns a Supplier that pre-executes the {@code execution}, applies the {@code supplier}, records the result and
144144
* returns a promise containing the result.
145145
*/
146+
@SuppressWarnings("rawtypes")
146147
static <T> Supplier<CompletableFuture<ExecutionResult>> getPromiseOfStage(
147-
ContextualSupplier<? extends CompletionStage<? extends T>> supplier, AbstractExecution execution) {
148+
ContextualSupplier<? extends CompletionStage<? extends T>> supplier, AsyncExecution execution) {
148149
Assert.notNull(supplier, "supplier");
149150
return () -> {
150151
CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();
151152
try {
152153
execution.preExecute();
153-
supplier.get(execution).whenComplete((result, failure) -> {
154+
CompletionStage<? extends T> stage = supplier.get(execution);
155+
if (stage instanceof Future)
156+
execution.future.injectPolicy((Future) stage);
157+
stage.whenComplete((result, failure) -> {
154158
if (failure instanceof CompletionException)
155159
failure = failure.getCause();
156160
ExecutionResult r = failure == null ? ExecutionResult.success(result) : ExecutionResult.failure(failure);

src/main/java/net/jodah/failsafe/RetryPolicyExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ else if (postResult != null) {
133133
retryScheduledListener.handle(postResult, execution);
134134

135135
previousResult = postResult;
136-
future.inject(scheduler.schedule(this, postResult.getWaitNanos(), TimeUnit.NANOSECONDS));
136+
future.injectPolicy(scheduler.schedule(this, postResult.getWaitNanos(), TimeUnit.NANOSECONDS));
137137
future.injectCancelFn(() -> {
138138
// Ensure that the promise completes if a scheduled retry is cancelled
139139
if (executionCancelled())
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package net.jodah.failsafe.issues;
2+
3+
import net.jodah.failsafe.Asserts;
4+
import net.jodah.failsafe.Failsafe;
5+
import net.jodah.failsafe.Policy;
6+
import net.jodah.failsafe.RetryPolicy;
7+
import org.testng.annotations.Test;
8+
9+
import java.util.concurrent.CancellationException;
10+
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.CompletionStage;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
import static org.testng.Assert.assertEquals;
15+
16+
@Test
17+
public class Issue266Test {
18+
AtomicInteger cancelledCounter = new AtomicInteger();
19+
20+
public void test() {
21+
Policy<String> retryPolicy = new RetryPolicy<>();
22+
CompletableFuture<String> future = Failsafe.with(retryPolicy).getStageAsync(this::computeSomething);
23+
future.whenComplete((r, t) -> {
24+
if (t instanceof CancellationException) {
25+
cancelledCounter.incrementAndGet();
26+
}
27+
});
28+
future.cancel(true);
29+
Asserts.assertThrows(future::get, CancellationException.class);
30+
assertEquals(cancelledCounter.get(), 2);
31+
}
32+
33+
CompletionStage<String> computeSomething() {
34+
CompletableFuture<String> future = new CompletableFuture<>();
35+
future.whenComplete((r, t) -> {
36+
if (t instanceof CancellationException) {
37+
cancelledCounter.incrementAndGet();
38+
}
39+
});
40+
return future;
41+
}
42+
}

0 commit comments

Comments
 (0)