Skip to content

Commit e8ac452

Browse files
committed
Propagate Future cancellations to getStageAsyncExecution supplied stages
Fixes #290
1 parent 5d00d46 commit e8ac452

File tree

4 files changed

+80
-28
lines changed

4 files changed

+80
-28
lines changed

src/main/java/net/jodah/failsafe/FailsafeExecutor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncRunnable<T> run
146146
/**
147147
* Executes the {@code supplier} asynchronously until the resulting future is successfully completed or the configured
148148
* policies are exceeded.
149+
* <p>Cancelling the resulting {@link CompletableFuture} will automatically cancels the supplied {@link
150+
* CompletionStage} if it's a {@link Future}.</p>
149151
* <p>
150152
* If the execution fails because a {@link Timeout} is exceeded, the resulting future is completed exceptionally with
151153
* {@link TimeoutExceededException}.
@@ -165,6 +167,8 @@ public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extend
165167
/**
166168
* Executes the {@code supplier} asynchronously until the resulting future is successfully completed or the configured
167169
* policies are exceeded.
170+
* <p>Cancelling the resulting {@link CompletableFuture} will automatically cancels the supplied {@link
171+
* CompletionStage} if it's a {@link Future}.</p>
168172
* <p>
169173
* If the execution fails because a {@link Timeout} is exceeded, the resulting future is completed exceptionally with
170174
* {@link TimeoutExceededException}.
@@ -191,6 +195,8 @@ public <T extends R> CompletableFuture<T> getStageAsync(
191195
* completed. Any exception that is thrown from the {@code supplier} will automatically be recorded via {@code
192196
* AsyncExecution.recordFailure}.
193197
* </p>
198+
* <p>Cancelling the resulting {@link CompletableFuture} will automatically cancels the supplied {@link
199+
* CompletionStage} if it's a {@link Future}.</p>
194200
* <p>
195201
* If the execution fails because a {@link Timeout} is exceeded, the resulting future is completed exceptionally with
196202
* {@link TimeoutExceededException}.

src/main/java/net/jodah/failsafe/FailsafeFuture.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map.Entry;
2222
import java.util.concurrent.CancellationException;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.Future;
2425
import java.util.function.BiConsumer;
2526

2627
/**
@@ -39,7 +40,7 @@ public class FailsafeFuture<R> extends CompletableFuture<R> {
3940
private AbstractExecution<R> newestExecution;
4041
Map<Integer, BiConsumer<Boolean, ExecutionResult>> cancelFunctions;
4142
// Whether a cancel with interrupt has already occurred
42-
boolean cancelledWithInterrupt;
43+
private boolean cancelledWithInterrupt;
4344

4445
FailsafeFuture(FailsafeExecutor<R> executor) {
4546
this.executor = executor;
@@ -135,4 +136,15 @@ synchronized void injectCancelFn(int policyIndex, BiConsumer<Boolean, ExecutionR
135136
cancelFunctions = new HashMap<>();
136137
cancelFunctions.put(policyIndex, cancelFn);
137138
}
139+
140+
/**
141+
* Propogates any previous cancellation to the {@code future}, either by cancelling it immediately or storing a cancel
142+
* function for later.
143+
*/
144+
synchronized void propagateCancellation(Future<R> future, int policyIndex) {
145+
if (isCancelled())
146+
future.cancel(cancelledWithInterrupt);
147+
else
148+
injectCancelFn(policyIndex, (mayInterrupt, cancelResult) -> future.cancel(mayInterrupt));
149+
}
138150
}

src/main/java/net/jodah/failsafe/Functions.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,8 @@ static <R> Function<AsyncExecution<R>, CompletableFuture<ExecutionResult>> getPr
132132
CompletionStage<? extends R> stage = supplier.get(execution);
133133

134134
// Propagate outer cancellations to the stage
135-
if (stage instanceof Future) {
136-
Future<R> future = (Future<R>) stage;
137-
synchronized (execution.future) {
138-
if (execution.future.isCancelled())
139-
future.cancel(execution.future.cancelledWithInterrupt);
140-
else
141-
execution.future.injectCancelFn(-1, (mayInterrupt, cancelResult) -> future.cancel(mayInterrupt));
142-
}
143-
}
135+
if (stage instanceof Future)
136+
execution.future.propagateCancellation((Future<R>) stage, -1);
144137

145138
stage.whenComplete((result, failure) -> {
146139
if (failure instanceof CompletionException)
@@ -165,6 +158,7 @@ static <R> Function<AsyncExecution<R>, CompletableFuture<ExecutionResult>> getPr
165158
*
166159
* @param <R> result type
167160
*/
161+
@SuppressWarnings("unchecked")
168162
static <R> Function<AsyncExecution<R>, CompletableFuture<ExecutionResult>> getPromiseOfStageExecution(
169163
AsyncSupplier<R, ? extends CompletionStage<? extends R>> supplier) {
170164

@@ -175,6 +169,11 @@ static <R> Function<AsyncExecution<R>, CompletableFuture<ExecutionResult>> getPr
175169
execution.preExecute();
176170
asyncFutureLock.acquire();
177171
CompletionStage<? extends R> stage = supplier.get(execution);
172+
173+
// Propagate outer cancellations to the stage
174+
if (stage instanceof Future)
175+
execution.future.propagateCancellation((Future<R>) stage, -1);
176+
178177
stage.whenComplete((innerResult, failure) -> {
179178
try {
180179
if (failure != null)

src/test/java/net/jodah/failsafe/functional/FailsafeFutureCancellationTest.java

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,22 @@
33
import net.jodah.concurrentunit.Waiter;
44
import net.jodah.failsafe.*;
55
import net.jodah.failsafe.event.ExecutionCompletedEvent;
6+
import net.jodah.failsafe.function.AsyncSupplier;
67
import net.jodah.failsafe.function.CheckedSupplier;
78
import org.testng.annotations.Test;
89

910
import java.time.Duration;
1011
import java.util.concurrent.*;
1112
import java.util.concurrent.atomic.AtomicReference;
13+
import java.util.function.BiConsumer;
1214

1315
import static org.testng.Assert.*;
1416

1517
/**
16-
* Tests behavior when a Failsafe Future is manually cancelled.
18+
* Tests behavior when a FailsafeFuture is manually cancelled.
1719
*/
1820
@Test
19-
public class ManualCancellationTest extends Testing {
21+
public class FailsafeFutureCancellationTest extends Testing {
2022
/**
2123
* Asserts that cancelling a FailsafeFuture causes both retry policies to stop.
2224
*/
@@ -55,29 +57,62 @@ public void testCancelWithNestedRetries() throws Throwable {
5557
/**
5658
* Asserts that FailsafeFuture cancellations are propagated to a CompletionStage.
5759
*/
58-
public void shouldPropagateCancellationToCompletionStage() throws Throwable {
60+
public void shouldPropagateCancellationToStage() throws Throwable {
5961
// Given
6062
Policy<String> retryPolicy = new RetryPolicy<>();
61-
Waiter cancelledWaiter = new Waiter();
62-
CheckedSupplier<CompletionStage<String>> computeSomething = () -> {
63-
CompletableFuture<String> future = new CompletableFuture<>();
64-
future.whenComplete((r, t) -> {
65-
if (t instanceof CancellationException)
66-
cancelledWaiter.resume();
67-
});
68-
return future;
63+
Waiter cancellationWaiter = new Waiter();
64+
BiConsumer<String, Throwable> waiterResumer = (r, t) -> {
65+
if (t instanceof CancellationException)
66+
cancellationWaiter.resume();
67+
};
68+
CheckedSupplier<CompletionStage<String>> doWork = () -> {
69+
CompletableFuture<String> promise = new CompletableFuture<>();
70+
promise.whenComplete(waiterResumer);
71+
72+
// Simulate asynchronous work
73+
runInThread(() -> Thread.sleep(1000));
74+
return promise;
6975
};
7076

7177
// When
72-
CompletableFuture<String> future = Failsafe.with(retryPolicy).getStageAsync(computeSomething);
73-
future.whenComplete((r, t) -> {
78+
CompletableFuture<String> failsafeFuture = Failsafe.with(retryPolicy).getStageAsync(doWork);
79+
failsafeFuture.whenComplete(waiterResumer);
80+
failsafeFuture.cancel(true);
81+
82+
// Then
83+
Asserts.assertThrows(failsafeFuture::get, CancellationException.class);
84+
// Wait for the promise and failsafeFuture to complete with cancellation
85+
cancellationWaiter.await(1000, 2);
86+
}
87+
88+
/**
89+
* Asserts that FailsafeFuture cancellations are propagated to a CompletionStage in an async integration execution.
90+
*/
91+
public void shouldPropagateCancellationToStageAsyncExecution() throws Throwable {
92+
// Given
93+
Policy<String> retryPolicy = new RetryPolicy<>();
94+
Waiter cancellationWaiter = new Waiter();
95+
BiConsumer<String, Throwable> waiterResumer = (r, t) -> {
7496
if (t instanceof CancellationException)
75-
cancelledWaiter.resume();
76-
});
77-
future.cancel(true);
97+
cancellationWaiter.resume();
98+
};
99+
AsyncSupplier<String, CompletionStage<String>> doWork = exec -> {
100+
CompletableFuture<String> promise = new CompletableFuture<>();
101+
promise.whenComplete(waiterResumer);
102+
103+
// Simulate asynchronous work
104+
runInThread(() -> Thread.sleep(1000));
105+
return promise;
106+
};
107+
108+
// When
109+
CompletableFuture<String> failsafeFuture = Failsafe.with(retryPolicy).getStageAsyncExecution(doWork);
110+
failsafeFuture.whenComplete(waiterResumer);
111+
failsafeFuture.cancel(true);
78112

79113
// Then
80-
Asserts.assertThrows(future::get, CancellationException.class);
81-
cancelledWaiter.await(1000, 2);
114+
Asserts.assertThrows(failsafeFuture::get, CancellationException.class);
115+
// Wait for the promise and failsafeFuture to complete with cancellation
116+
cancellationWaiter.await(1000, 2);
82117
}
83118
}

0 commit comments

Comments
 (0)