Skip to content

Commit 82aad04

Browse files
committed
Executors should not be used for getStage calls
1 parent ef0019e commit 82aad04

File tree

2 files changed

+11
-21
lines changed

2 files changed

+11
-21
lines changed

src/main/java/net/jodah/failsafe/FailsafeExecutor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncRunnable<T> run
200200
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
201201
*/
202202
public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extends CompletionStage<T>> supplier) {
203-
return callAsync(future -> getPromiseOfStage(toCtxSupplier(supplier), future, executor), false);
203+
return callAsync(future -> getPromiseOfStage(toCtxSupplier(supplier), future), false);
204204
}
205205

206206
/**
@@ -222,7 +222,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extend
222222
*/
223223
public <T extends R> CompletableFuture<T> getStageAsync(
224224
ContextualSupplier<T, ? extends CompletionStage<T>> supplier) {
225-
return callAsync(future -> getPromiseOfStage(supplier, future, executor), false);
225+
return callAsync(future -> getPromiseOfStage(supplier, future), false);
226226
}
227227

228228
/**
@@ -250,7 +250,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(
250250
*/
251251
public <T extends R> CompletableFuture<T> getStageAsyncExecution(
252252
AsyncSupplier<T, ? extends CompletionStage<T>> supplier) {
253-
return callAsync(future -> getPromiseOfStageExecution(supplier, future, executor), true);
253+
return callAsync(future -> getPromiseOfStageExecution(supplier, future), true);
254254
}
255255

256256
/**
@@ -408,6 +408,9 @@ public FailsafeExecutor<R> with(ExecutorService executorService) {
408408
* Configures the {@code executor} to use as a wrapper around executions. The {@code executor} is responsible for
409409
* propagating executions. Executions that normally return a result, such as {@link #get(CheckedSupplier)} will return
410410
* {@code null} since the {@link Executor} interface does not support results.
411+
* <p>The {@code executor} will not be used for {@link #getStageAsync(CheckedSupplier) getStageAsync} or {@link
412+
* #getStageAsyncExecution(AsyncSupplier) getStageAsyncExecution} calls since those require a returned result.
413+
* </p>
411414
*
412415
* @throws NullPointerException if {@code executor} is null
413416
*/

src/main/java/net/jodah/failsafe/Functions.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,18 @@ public synchronized CompletableFuture<ExecutionResult<R>> apply(AsyncExecutionIn
121121
* supplier}, records the result and returns a promise containing the result.
122122
*
123123
* @param <R> result type
124+
* @throws UnsupportedOperationException when using
124125
*/
125126
@SuppressWarnings("unchecked")
126127
static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>> getPromiseOfStage(
127-
ContextualSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future,
128-
Executor executor) {
128+
ContextualSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future) {
129129

130130
Assert.notNull(supplier, "supplier");
131131
return execution -> {
132132
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
133133
try {
134134
execution.preExecute();
135-
CompletionStage<? extends R> stage = withExecutor(supplier, executor).get(execution);
135+
CompletionStage<? extends R> stage = supplier.get(execution);
136136

137137
// Propagate outer cancellations to the stage
138138
if (stage instanceof Future)
@@ -163,15 +163,15 @@ static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult
163163
*/
164164
@SuppressWarnings("unchecked")
165165
static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>> getPromiseOfStageExecution(
166-
AsyncSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future, Executor executor) {
166+
AsyncSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future) {
167167

168168
Assert.notNull(supplier, "supplier");
169169
Semaphore asyncFutureLock = new Semaphore(1);
170170
return execution -> {
171171
try {
172172
execution.preExecute();
173173
asyncFutureLock.acquire();
174-
CompletionStage<? extends R> stage = withExecutor(supplier, executor).get(execution);
174+
CompletionStage<? extends R> stage = supplier.get(execution);
175175

176176
// Propagate outer cancellations to the stage
177177
if (stage instanceof Future)
@@ -305,19 +305,6 @@ static <R> AsyncRunnable<R> withExecutor(AsyncRunnable<R> runnable, Executor exe
305305
};
306306
}
307307

308-
static <R, T> AsyncSupplier<R, T> withExecutor(AsyncSupplier<R, T> supplier, Executor executor) {
309-
return executor == null ? supplier : exec -> {
310-
executor.execute(() -> {
311-
try {
312-
supplier.get(exec);
313-
} catch (Throwable e) {
314-
handleExecutorThrowablen(e);
315-
}
316-
});
317-
return null;
318-
};
319-
}
320-
321308
private static void handleExecutorThrowablen(Throwable e) {
322309
if (e instanceof RuntimeException)
323310
throw (RuntimeException) e;

0 commit comments

Comments
 (0)