Skip to content

Commit 8fe9126

Browse files
committed
Added support for Executor with getStageAsync
1 parent 9c1e034 commit 8fe9126

File tree

3 files changed

+57
-17
lines changed

3 files changed

+57
-17
lines changed

src/main/java/dev/failsafe/FailsafeExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncRunnable<T> run
210210
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
211211
*/
212212
public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extends CompletionStage<T>> supplier) {
213-
return callAsync(future -> getPromiseOfStage(toCtxSupplier(supplier), future), false);
213+
return callAsync(future -> getPromiseOfStage(toCtxSupplier(supplier), future, executor), false);
214214
}
215215

216216
/**
@@ -219,6 +219,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extend
219219
* <p>Cancelling the resulting {@link CompletableFuture} will automatically cancels the supplied {@link
220220
* CompletionStage} if it's a {@link Future}.</p>
221221
* <ul>
222+
* <li>If the {@code supplier} returns {@code null}, the execution attempt will record a {@code null} result.</li>
222223
* <li>If the execution fails because a {@link Timeout} is exceeded, the resulting future is completed exceptionally
223224
* with {@link TimeoutExceededException}.</li>
224225
* <li>If the execution fails because a {@link CircuitBreaker} is open, the resulting future is completed
@@ -232,7 +233,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extend
232233
*/
233234
public <T extends R> CompletableFuture<T> getStageAsync(
234235
ContextualSupplier<T, ? extends CompletionStage<T>> supplier) {
235-
return callAsync(future -> getPromiseOfStage(supplier, future), false);
236+
return callAsync(future -> getPromiseOfStage(supplier, future, executor), false);
236237
}
237238

238239
/**

src/main/java/dev/failsafe/Functions.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -125,26 +125,33 @@ public synchronized CompletableFuture<ExecutionResult<R>> apply(AsyncExecutionIn
125125
*/
126126
@SuppressWarnings("unchecked")
127127
static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>> getPromiseOfStage(
128-
ContextualSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future) {
128+
ContextualSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future,
129+
Executor executor) {
129130

130131
Assert.notNull(supplier, "supplier");
131132
return execution -> {
132133
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
133134
try {
134135
execution.preExecute();
135-
CompletionStage<? extends R> stage = supplier.get(execution);
136-
137-
// Propagate outer cancellations to the stage
138-
if (stage instanceof Future)
139-
future.propagateCancellation((Future<R>) stage);
136+
CompletionStage<? extends R> stage = withExecutor(supplier, executor).get(execution);
140137

141-
stage.whenComplete((result, failure) -> {
142-
if (failure instanceof CompletionException)
143-
failure = failure.getCause();
144-
ExecutionResult<R> r = failure == null ? ExecutionResult.success(result) : ExecutionResult.failure(failure);
138+
if (stage == null) {
139+
ExecutionResult<R> r = ExecutionResult.success(null);
145140
execution.record(r);
146141
promise.complete(r);
147-
});
142+
} else {
143+
// Propagate outer cancellations to the stage
144+
if (stage instanceof Future)
145+
future.propagateCancellation((Future<R>) stage);
146+
147+
stage.whenComplete((result, failure) -> {
148+
if (failure instanceof CompletionException)
149+
failure = failure.getCause();
150+
ExecutionResult<R> r = failure == null ? ExecutionResult.success(result) : ExecutionResult.failure(failure);
151+
execution.record(r);
152+
promise.complete(r);
153+
});
154+
}
148155
} catch (Throwable t) {
149156
ExecutionResult<R> result = ExecutionResult.failure(t);
150157
execution.record(result);

src/test/java/dev/failsafe/issues/Issue311Test.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,57 @@
22

33
import dev.failsafe.Failsafe;
44
import dev.failsafe.RetryPolicy;
5+
import net.jodah.concurrentunit.Waiter;
56
import org.testng.annotations.Test;
67

8+
import java.util.concurrent.CompletableFuture;
79
import java.util.concurrent.Executor;
810
import java.util.concurrent.Executors;
911
import java.util.concurrent.atomic.AtomicInteger;
1012

11-
import static org.testng.Assert.assertEquals;
13+
import static org.testng.Assert.*;
1214

15+
@Test
1316
public class Issue311Test {
14-
@Test
15-
void failsafeFail() throws Throwable {
17+
public void failsafeFail() throws Throwable {
1618
AtomicInteger counter = new AtomicInteger(0);
1719
Executor executor = Executors.newSingleThreadExecutor();
1820
Failsafe.with(RetryPolicy.builder().handle(RuntimeException.class).withMaxAttempts(2).build())
1921
.with(executor)
2022
.runAsync(() -> {
2123
if (counter.incrementAndGet() == 1)
2224
throw new RuntimeException();
23-
}).get();
25+
})
26+
.get();
2427
assertEquals(counter.get(), 2);
2528
}
29+
30+
public void testNullCompletionStage() throws Throwable {
31+
assertNull(Failsafe.none().getStageAsync(() -> {
32+
return null;
33+
}).get());
34+
}
35+
36+
public void testRunAsyncWithThreadLocalInExecutor() throws Throwable {
37+
ThreadLocal<Boolean> threadLocal = new ThreadLocal<>();
38+
Executor executor = runnable -> {
39+
threadLocal.set(true);
40+
runnable.run();
41+
};
42+
Failsafe.none().with(executor).runAsync(() -> {
43+
assertTrue(threadLocal.get());
44+
}).get();
45+
}
46+
47+
public void testGetStageAsyncWithThreadLocalInExecutor() throws Throwable {
48+
ThreadLocal<Boolean> threadLocal = new ThreadLocal<>();
49+
Executor executor = runnable -> {
50+
threadLocal.set(true);
51+
runnable.run();
52+
};
53+
assertNull(Failsafe.none().with(executor).getStageAsync(() -> {
54+
assertTrue(threadLocal.get());
55+
return CompletableFuture.completedFuture("ignored");
56+
}).get());
57+
}
2658
}

0 commit comments

Comments
 (0)