Skip to content

Commit 2e8040c

Browse files
committed
Allow interruption with Call cancellation, and add some tests
1 parent e2dd21f commit 2e8040c

File tree

12 files changed

+152
-41
lines changed

12 files changed

+152
-41
lines changed

core/src/main/java/dev/failsafe/Call.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,19 @@ public interface Call<R> {
3838
R execute();
3939

4040
/**
41-
* Cancels a synchronous execution by calling the most recent {@link ExecutionContext#onCancel(CheckedRunnable)
42-
* cancelCallback} that was registered during the execution and marking the execution as cancelled. The execution is
43-
* still allowed to complete and return a result. In addition to using a {@link ExecutionContext#onCancel(CheckedRunnable)
44-
* cancelCallback}, executions can cooperate with cancellation by checking {@link ExecutionContext#isCancelled()}.
41+
* Cancels a synchronous execution and calls the most recent {@link ExecutionContext#onCancel(CheckedRunnable)
42+
* cancelCallback} that was registered. The execution is still allowed to complete and return a result. In addition to
43+
* using a {@link ExecutionContext#onCancel(CheckedRunnable) cancelCallback}, executions can cooperate with
44+
* cancellation by checking {@link ExecutionContext#isCancelled()}.
45+
*
46+
* @param mayInterruptIfRunning whether the execution should be interrupted
47+
* @return whether cancellation was successful or not. Returns {@code false} if the execution was already cancelled or
48+
* completed.
49+
*/
50+
boolean cancel(boolean mayInterruptIfRunning);
51+
52+
/**
53+
* Returns whether the call has been cancelled.
4554
*/
46-
void cancel();
55+
boolean isCancelled();
4756
}

core/src/main/java/dev/failsafe/CallImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,15 @@ public R execute() {
3434
}
3535

3636
@Override
37-
public void cancel() {
38-
execution.cancel();
37+
public boolean cancel(boolean mayInterruptIfRunning) {
38+
boolean result = execution.cancel();
39+
if (mayInterruptIfRunning)
40+
execution.interrupt();
41+
return result;
42+
}
43+
44+
@Override
45+
public boolean isCancelled() {
46+
return execution.isCancelled();
3947
}
4048
}

core/src/main/java/dev/failsafe/ExecutionImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,11 @@ synchronized ExecutionResult<R> postExecute(ExecutionResult<R> result) {
161161
return result;
162162
}
163163

164-
/** Called by users. */
164+
/** Called indorectly by users. */
165165
@Override
166-
public void cancel() {
167-
if (!isCancelled()) {
166+
public boolean cancel() {
167+
boolean cancelled = isCancelled();
168+
if (!cancelled) {
168169
cancelledIndex = Integer.MAX_VALUE;
169170
if (cancelCallback != null) {
170171
try {
@@ -173,6 +174,7 @@ public void cancel() {
173174
}
174175
}
175176
}
177+
return !cancelled && !completed;
176178
}
177179

178180
/** Called by policies. */

core/src/main/java/dev/failsafe/SyncExecutionImpl.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ final class SyncExecutionImpl<R> extends ExecutionImpl<R> implements SyncExecuti
3535

3636
// An optional Failsafe executor
3737
private final FailsafeExecutor<R> executor;
38-
// An optoinal Failsafe call
38+
// An optional Failsafe call
3939
private final CallImpl<R> call;
4040
// The outer-most function that executions begin with
4141
private Function<SyncExecutionInternal<R>, ExecutionResult<R>> outerFn;
42+
// The interruptable execution thread
43+
private final Thread executionThread;
4244
// Whether the execution is currently interruptable
4345
private volatile boolean interruptable;
4446
// Whether the execution has been internally interrupted
@@ -59,6 +61,7 @@ final class SyncExecutionImpl<R> extends ExecutionImpl<R> implements SyncExecuti
5961
executor = null;
6062
call = null;
6163
initial = this;
64+
executionThread = Thread.currentThread();
6265
preExecute();
6366
}
6467

@@ -71,6 +74,7 @@ final class SyncExecutionImpl<R> extends ExecutionImpl<R> implements SyncExecuti
7174
this.executor = executor;
7275
this.call = call;
7376
initial = this;
77+
executionThread = Thread.currentThread();
7478
if (call != null)
7579
call.setExecution(this);
7680

@@ -89,6 +93,7 @@ private SyncExecutionImpl(SyncExecutionImpl<R> execution) {
8993
interruptable = execution.interruptable;
9094
interrupted = execution.interrupted;
9195
initial = execution.initial;
96+
executionThread = execution.executionThread;
9297
if (call != null)
9398
call.setExecution(this);
9499
}
@@ -150,11 +155,6 @@ synchronized ExecutionResult<R> postExecute(ExecutionResult<R> result) {
150155
return result;
151156
}
152157

153-
@Override
154-
public boolean isInterruptable() {
155-
return interruptable;
156-
}
157-
158158
@Override
159159
public boolean isInterrupted() {
160160
return interrupted;
@@ -166,8 +166,14 @@ public void setInterruptable(boolean interruptable) {
166166
}
167167

168168
@Override
169-
public void setInterrupted(boolean interrupted) {
170-
this.interrupted = interrupted;
169+
public void interrupt() {
170+
// Guard against race with the execution completing
171+
synchronized (getInitial()) {
172+
if (interruptable) {
173+
interrupted = true;
174+
executionThread.interrupt();
175+
}
176+
}
171177
}
172178

173179
@Override

core/src/main/java/dev/failsafe/internal/TimeoutExecutor.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public Function<SyncExecutionInternal<R>, ExecutionResult<R>> apply(
6565
// Coordinates a result between the timeout and execution threads
6666
AtomicReference<ExecutionResult<R>> result = new AtomicReference<>();
6767
Future<?> timeoutFuture;
68-
Thread executionThread = Thread.currentThread();
6968

7069
try {
7170
// Schedule timeout check
@@ -76,15 +75,8 @@ public Function<SyncExecutionInternal<R>, ExecutionResult<R>> apply(
7675
// Cancel and interrupt
7776
execution.record(cancelResult);
7877
execution.cancel(this);
79-
if (config.canInterrupt()) {
80-
// Guard against race with the execution completing
81-
synchronized (execution.getInitial()) {
82-
if (execution.isInterruptable()) {
83-
execution.setInterrupted(true);
84-
executionThread.interrupt();
85-
}
86-
}
87-
}
78+
if (config.canInterrupt())
79+
execution.interrupt();
8880
}
8981
return null;
9082
}, config.getTimeout().toNanos(), TimeUnit.NANOSECONDS);

core/src/main/java/dev/failsafe/spi/ExecutionInternal.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ public interface ExecutionInternal<R> extends ExecutionContext<R> {
5353

5454
/**
5555
* Marks the execution as having been cancelled externally, which will cancel pending executions of all policies.
56+
*
57+
* @return whether cancellation was successful or not. Returns {@code false} if the execution was already cancelled or
58+
* completed.
5659
*/
57-
void cancel();
60+
boolean cancel();
5861

5962
/**
6063
* Marks the execution as having been cancelled by the {@code policyExecutor}, which will also cancel pending

core/src/main/java/dev/failsafe/spi/SyncExecutionInternal.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@ public interface SyncExecutionInternal<R> extends ExecutionInternal<R>, Executio
3030
*/
3131
SyncExecutionInternal<R> getInitial();
3232

33-
/**
34-
* Returns whether the execution is currently interruptable.
35-
*/
36-
boolean isInterruptable();
37-
3833
/**
3934
* Returns whether the execution is currently interrupted.
4035
*/
@@ -46,9 +41,9 @@ public interface SyncExecutionInternal<R> extends ExecutionInternal<R>, Executio
4641
void setInterruptable(boolean interruptable);
4742

4843
/**
49-
* Sets whether the execution has been internally {@code interrupted}.
44+
* Interrupts the execution.
5045
*/
51-
void setInterrupted(boolean interrupted);
46+
void interrupt();
5247

5348
/**
5449
* Returns a new copy of the SyncExecutionInternal if it is not standalone, else returns {@code this} since standalone
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package dev.failsafe.functional;
2+
3+
import dev.failsafe.*;
4+
import dev.failsafe.function.CheckedConsumer;
5+
import dev.failsafe.function.ContextualRunnable;
6+
import dev.failsafe.testing.Testing;
7+
import net.jodah.concurrentunit.Waiter;
8+
import org.testng.annotations.BeforeMethod;
9+
import org.testng.annotations.Test;
10+
11+
import java.time.Duration;
12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
14+
import static org.testng.Assert.assertTrue;
15+
16+
@Test
17+
public class CallCancellationTest extends Testing {
18+
Waiter waiter;
19+
20+
@BeforeMethod
21+
void beforeMethod() {
22+
waiter = new Waiter();
23+
}
24+
25+
private void assertCancel(FailsafeExecutor<Void> executor, ContextualRunnable<Void> runnable,
26+
boolean testWithoutInterrupt) throws Throwable {
27+
CheckedConsumer<Boolean> test = interrupt -> {
28+
// Given
29+
Call<Void> call = executor.onComplete(e -> {
30+
waiter.assertNull(e.getResult());
31+
if (!interrupt)
32+
waiter.assertNull(e.getException());
33+
else
34+
waiter.assertTrue(e.getException() instanceof InterruptedException);
35+
waiter.resume();
36+
}).newCall(runnable);
37+
38+
// When
39+
runInThread(() -> {
40+
Testing.sleep(300);
41+
waiter.assertTrue(call.cancel(interrupt));
42+
});
43+
if (!interrupt)
44+
call.execute();
45+
else
46+
assertThrows(call::execute, FailsafeException.class, InterruptedException.class);
47+
48+
waiter.await(1000);
49+
50+
// Then
51+
assertTrue(call.isCancelled());
52+
};
53+
54+
// Test without interrupt
55+
if (testWithoutInterrupt)
56+
test.accept(false);
57+
58+
// Test with interrupt
59+
test.accept(true);
60+
}
61+
62+
public void shouldCancelRetriesWithBlockedExecution() throws Throwable {
63+
assertCancel(Failsafe.with(RetryPolicy.ofDefaults()), ctx -> {
64+
try {
65+
waiter.assertFalse(ctx.isCancelled());
66+
Thread.sleep(1000);
67+
} catch (InterruptedException e) {
68+
waiter.assertTrue(ctx.isCancelled());
69+
throw e;
70+
}
71+
}, true);
72+
}
73+
74+
public void shouldCancelRetriesWithPendingDelay() throws Throwable {
75+
RetryPolicy<Void> retryPolicy = RetryPolicy.<Void>builder().withDelay(Duration.ofMinutes(1)).build();
76+
assertCancel(Failsafe.with(retryPolicy), ctx -> {
77+
throw new IllegalStateException();
78+
}, false);
79+
}
80+
81+
public void shouldPropagateCancelToCallback() throws Throwable {
82+
AtomicBoolean callbackCalled = new AtomicBoolean();
83+
assertCancel(Failsafe.with(RetryPolicy.ofDefaults()), ctx -> {
84+
ctx.onCancel(() -> callbackCalled.set(true));
85+
86+
try {
87+
waiter.assertFalse(ctx.isCancelled());
88+
Thread.sleep(1000);
89+
} catch (InterruptedException e) {
90+
waiter.assertTrue(ctx.isCancelled());
91+
waiter.assertTrue(callbackCalled.get());
92+
throw e;
93+
}
94+
}, true);
95+
}
96+
}
97+

core/src/test/java/dev/failsafe/functional/FailsafeFutureCancellationTest.java renamed to core/src/test/java/dev/failsafe/functional/FutureCancellationTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import dev.failsafe.testing.Testing;
2121
import net.jodah.concurrentunit.Waiter;
2222
import dev.failsafe.event.ExecutionCompletedEvent;
23-
import dev.failsafe.function.ContextualSupplier;
2423
import org.testng.annotations.BeforeMethod;
2524
import org.testng.annotations.Test;
2625

@@ -34,7 +33,7 @@
3433
* Tests behavior when a FailsafeFuture is explicitly cancelled.
3534
*/
3635
@Test
37-
public class FailsafeFutureCancellationTest extends Testing {
36+
public class FutureCancellationTest extends Testing {
3837
Waiter waiter;
3938

4039
@BeforeMethod

core/src/test/java/dev/failsafe/functional/FailsafeFutureCompletionTest.java renamed to core/src/test/java/dev/failsafe/functional/FutureCompletionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Tests behavior when a FailsafeFuture is explicitly completed.
2929
*/
3030
@Test
31-
public class FailsafeFutureCompletionTest extends Testing {
31+
public class FutureCompletionTest extends Testing {
3232
/**
3333
* Asserts that an externally completed FailsafeFuture works as expected.
3434
*/

0 commit comments

Comments
 (0)