Skip to content

Commit c1f514f

Browse files
committed
Use Executor for sync executions
Related #221
1 parent 9b7c82e commit c1f514f

File tree

6 files changed

+212
-94
lines changed

6 files changed

+212
-94
lines changed

src/main/java/net/jodah/failsafe/FailsafeExecutor.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
*/
4444
public class FailsafeExecutor<R> extends PolicyListeners<FailsafeExecutor<R>, R> {
4545
private Scheduler scheduler = Scheduler.DEFAULT;
46+
private Executor executor;
4647
/** Policies sorted outer-most first */
4748
final List<? extends Policy<R>> policies;
4849
private EventHandler<R> completeHandler;
@@ -134,7 +135,7 @@ public <T extends R> T get(ContextualSupplier<T, T> supplier) {
134135
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
135136
*/
136137
public <T extends R> CompletableFuture<T> getAsync(CheckedSupplier<T> supplier) {
137-
return callAsync(future -> getPromise(toCtxSupplier(supplier)), false);
138+
return callAsync(future -> getPromise(toCtxSupplier(supplier), executor), false);
138139
}
139140

140141
/**
@@ -153,7 +154,7 @@ public <T extends R> CompletableFuture<T> getAsync(CheckedSupplier<T> supplier)
153154
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
154155
*/
155156
public <T extends R> CompletableFuture<T> getAsync(ContextualSupplier<T, T> supplier) {
156-
return callAsync(future -> getPromise(supplier), false);
157+
return callAsync(future -> getPromise(supplier, executor), false);
157158
}
158159

159160
/**
@@ -178,7 +179,7 @@ public <T extends R> CompletableFuture<T> getAsync(ContextualSupplier<T, T> supp
178179
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
179180
*/
180181
public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncRunnable<T> runnable) {
181-
return callAsync(future -> getPromiseExecution(runnable), true);
182+
return callAsync(future -> getPromiseExecution(runnable, executor), true);
182183
}
183184

184185
/**
@@ -199,7 +200,7 @@ public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncRunnable<T> run
199200
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
200201
*/
201202
public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extends CompletionStage<T>> supplier) {
202-
return callAsync(future -> getPromiseOfStage(toCtxSupplier(supplier), future), false);
203+
return callAsync(future -> getPromiseOfStage(toCtxSupplier(supplier), future, executor), false);
203204
}
204205

205206
/**
@@ -221,7 +222,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extend
221222
*/
222223
public <T extends R> CompletableFuture<T> getStageAsync(
223224
ContextualSupplier<T, ? extends CompletionStage<T>> supplier) {
224-
return callAsync(future -> getPromiseOfStage(supplier, future), false);
225+
return callAsync(future -> getPromiseOfStage(supplier, future, executor), false);
225226
}
226227

227228
/**
@@ -249,7 +250,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(
249250
*/
250251
public <T extends R> CompletableFuture<T> getStageAsyncExecution(
251252
AsyncSupplier<T, ? extends CompletionStage<T>> supplier) {
252-
return callAsync(future -> getPromiseOfStageExecution(supplier, future), true);
253+
return callAsync(future -> getPromiseOfStageExecution(supplier, future, executor), true);
253254
}
254255

255256
/**
@@ -293,7 +294,7 @@ public void run(ContextualRunnable<Void> runnable) {
293294
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
294295
*/
295296
public CompletableFuture<Void> runAsync(CheckedRunnable runnable) {
296-
return callAsync(future -> getPromise(toCtxSupplier(runnable)), false);
297+
return callAsync(future -> getPromise(toCtxSupplier(runnable), executor), false);
297298
}
298299

299300
/**
@@ -311,7 +312,7 @@ public CompletableFuture<Void> runAsync(CheckedRunnable runnable) {
311312
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
312313
*/
313314
public CompletableFuture<Void> runAsync(ContextualRunnable<Void> runnable) {
314-
return callAsync(future -> getPromise(toCtxSupplier(runnable)), false);
315+
return callAsync(future -> getPromise(toCtxSupplier(runnable), executor), false);
315316
}
316317

317318
/**
@@ -336,7 +337,7 @@ public CompletableFuture<Void> runAsync(ContextualRunnable<Void> runnable) {
336337
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
337338
*/
338339
public CompletableFuture<Void> runAsyncExecution(AsyncRunnable<Void> runnable) {
339-
return callAsync(future -> getPromiseExecution(runnable), true);
340+
return callAsync(future -> getPromiseExecution(runnable, executor), true);
340341
}
341342

342343
/**
@@ -372,33 +373,46 @@ public FailsafeExecutor<R> onSuccess(CheckedConsumer<ExecutionCompletedEvent<R>>
372373
}
373374

374375
/**
375-
* Configures the {@code executor} to use for performing asynchronous executions and listener callbacks.
376+
* Configures the {@code scheduledExecutorService} to use for performing asynchronous executions and listener
377+
* callbacks.
376378
* <p>
377-
* Note: The {@code executor} should have a core pool size of at least 2 in order for {@link Timeout timeouts} to
378-
* work.
379+
* Note: The {@code scheduledExecutorService} should have a core pool size of at least 2 in order for {@link Timeout
380+
* timeouts} to work.
379381
* </p>
380382
*
381-
* @throws NullPointerException if {@code executor} is null
382-
* @throws IllegalArgumentException if the {@code executor} has a core pool size of less than 2
383+
* @throws NullPointerException if {@code scheduledExecutorService} is null
384+
* @throws IllegalArgumentException if the {@code scheduledExecutorService} has a core pool size of less than 2
383385
*/
384-
public FailsafeExecutor<R> with(ScheduledExecutorService executor) {
385-
this.scheduler = Scheduler.of(executor);
386+
public FailsafeExecutor<R> with(ScheduledExecutorService scheduledExecutorService) {
387+
this.scheduler = Scheduler.of(Assert.notNull(scheduledExecutorService, "scheduledExecutorService"));
386388
return this;
387389
}
388390

389391
/**
390-
* Configures the {@code executor} to use for performing asynchronous executions and listener callbacks. For
391-
* executions that require a delay, an internal ScheduledExecutorService will be used for the delay, then the {@code
392-
* executor} will be used for actual execution.
392+
* Configures the {@code executorService} to use for performing asynchronous executions and listener callbacks. For
393+
* async executions that require a delay, an internal ScheduledExecutorService will be used for the delay, then the
394+
* {@code executorService} will be used for actual execution.
393395
* <p>
394-
* Note: The {@code executor} should have a core pool size or parallelism of at least 2 in order for {@link Timeout
395-
* timeouts} to work.
396+
* Note: The {@code executorService} should have a core pool size or parallelism of at least 2 in order for {@link
397+
* Timeout timeouts} to work.
396398
* </p>
397399
*
400+
* @throws NullPointerException if {@code executorService} is null
401+
*/
402+
public FailsafeExecutor<R> with(ExecutorService executorService) {
403+
this.scheduler = Scheduler.of(Assert.notNull(executorService, "executorService"));
404+
return this;
405+
}
406+
407+
/**
408+
* Configures the {@code executor} to use as a wrapper around executions. The {@code executor} is responsible for
409+
* propagating executions. Executions that normally return a result, such as {@link #get(CheckedSupplier)} will return
410+
* {@code null} since the {@link Executor} interface does not support results.
411+
*
398412
* @throws NullPointerException if {@code executor} is null
399413
*/
400414
public FailsafeExecutor<R> with(Executor executor) {
401-
this.scheduler = Scheduler.of(executor);
415+
this.executor = Assert.notNull(executor, "executor");
402416
return this;
403417
}
404418

@@ -422,7 +436,7 @@ public FailsafeExecutor<R> with(Scheduler scheduler) {
422436
*/
423437
@SuppressWarnings({ "unchecked", "rawtypes" })
424438
private <T> T call(ContextualSupplier<T, T> innerSupplier) {
425-
SyncExecutionImpl<T> execution = new SyncExecutionImpl(this, scheduler, Functions.get(innerSupplier));
439+
SyncExecutionImpl<T> execution = new SyncExecutionImpl(this, scheduler, Functions.get(innerSupplier, executor));
426440
ExecutionResult<T> result = execution.executeSync();
427441
Throwable failure = result.getFailure();
428442
if (failure != null) {

src/main/java/net/jodah/failsafe/Functions.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ final class Functions {
3535
*
3636
* @param <R> result type
3737
*/
38-
static <R> Function<SyncExecutionInternal<R>, ExecutionResult<R>> get(ContextualSupplier<R, R> supplier) {
38+
static <R> Function<SyncExecutionInternal<R>, ExecutionResult<R>> get(ContextualSupplier<R, R> supplier,
39+
Executor executor) {
40+
3941
return execution -> {
4042
ExecutionResult<R> result;
4143
Throwable throwable = null;
4244
try {
4345
execution.preExecute();
44-
result = ExecutionResult.success(supplier.get(execution));
46+
result = ExecutionResult.success(withExecutor(supplier, executor).get(execution));
4547
} catch (Throwable t) {
4648
throwable = t;
4749
result = ExecutionResult.failure(t);
@@ -71,14 +73,14 @@ static <R> Function<SyncExecutionInternal<R>, ExecutionResult<R>> get(Contextual
7173
* @param <R> result type
7274
*/
7375
static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>> getPromise(
74-
ContextualSupplier<R, R> supplier) {
76+
ContextualSupplier<R, R> supplier, Executor executor) {
7577

7678
Assert.notNull(supplier, "supplier");
7779
return execution -> {
7880
ExecutionResult<R> result;
7981
try {
8082
execution.preExecute();
81-
result = ExecutionResult.success(supplier.get(execution));
83+
result = ExecutionResult.success(withExecutor(supplier, executor).get(execution));
8284
} catch (Throwable t) {
8385
result = ExecutionResult.failure(t);
8486
}
@@ -95,15 +97,15 @@ static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult
9597
* @param <R> result type
9698
*/
9799
static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>> getPromiseExecution(
98-
AsyncRunnable<R> runnable) {
100+
AsyncRunnable<R> runnable, Executor executor) {
99101

100102
Assert.notNull(runnable, "runnable");
101103
return new Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>>() {
102104
@Override
103105
public synchronized CompletableFuture<ExecutionResult<R>> apply(AsyncExecutionInternal<R> execution) {
104106
try {
105107
execution.preExecute();
106-
runnable.run(execution);
108+
withExecutor(runnable, executor).run(execution);
107109
} catch (Throwable e) {
108110
execution.record(null, e);
109111
}
@@ -122,14 +124,15 @@ public synchronized CompletableFuture<ExecutionResult<R>> apply(AsyncExecutionIn
122124
*/
123125
@SuppressWarnings("unchecked")
124126
static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>> getPromiseOfStage(
125-
ContextualSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future) {
127+
ContextualSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future,
128+
Executor executor) {
126129

127130
Assert.notNull(supplier, "supplier");
128131
return execution -> {
129132
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
130133
try {
131134
execution.preExecute();
132-
CompletionStage<? extends R> stage = supplier.get(execution);
135+
CompletionStage<? extends R> stage = withExecutor(supplier, executor).get(execution);
133136

134137
// Propagate outer cancellations to the stage
135138
if (stage instanceof Future)
@@ -160,15 +163,15 @@ static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult
160163
*/
161164
@SuppressWarnings("unchecked")
162165
static <R> Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>> getPromiseOfStageExecution(
163-
AsyncSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future) {
166+
AsyncSupplier<R, ? extends CompletionStage<? extends R>> supplier, FailsafeFuture<R> future, Executor executor) {
164167

165168
Assert.notNull(supplier, "supplier");
166169
Semaphore asyncFutureLock = new Semaphore(1);
167170
return execution -> {
168171
try {
169172
execution.preExecute();
170173
asyncFutureLock.acquire();
171-
CompletionStage<? extends R> stage = supplier.get(execution);
174+
CompletionStage<? extends R> stage = withExecutor(supplier, executor).get(execution);
172175

173176
// Propagate outer cancellations to the stage
174177
if (stage instanceof Future)
@@ -277,6 +280,52 @@ static <R, T> ContextualSupplier<R, T> toCtxSupplier(CheckedSupplier<T> supplier
277280
return ctx -> supplier.get();
278281
}
279282

283+
static <R, T> ContextualSupplier<R, T> withExecutor(ContextualSupplier<R, T> supplier, Executor executor) {
284+
return executor == null ? supplier : ctx -> {
285+
executor.execute(() -> {
286+
try {
287+
supplier.get(ctx);
288+
} catch (Throwable e) {
289+
handleExecutorThrowablen(e);
290+
}
291+
});
292+
return null;
293+
};
294+
}
295+
296+
static <R> AsyncRunnable<R> withExecutor(AsyncRunnable<R> runnable, Executor executor) {
297+
return executor == null ? runnable : exec -> {
298+
executor.execute(() -> {
299+
try {
300+
runnable.run(exec);
301+
} catch (Throwable e) {
302+
handleExecutorThrowablen(e);
303+
}
304+
});
305+
};
306+
}
307+
308+
static <R, T> AsyncSupplier<R, T> withExecutor(AsyncSupplier<R, T> supplier, Executor executor) {
309+
return executor == null ? supplier : exec -> {
310+
executor.execute(() -> {
311+
try {
312+
supplier.get(exec);
313+
} catch (Throwable e) {
314+
handleExecutorThrowablen(e);
315+
}
316+
});
317+
return null;
318+
};
319+
}
320+
321+
private static void handleExecutorThrowablen(Throwable e) {
322+
if (e instanceof RuntimeException)
323+
throw (RuntimeException) e;
324+
if (e instanceof Error)
325+
throw (Error) e;
326+
throw new FailsafeException(e);
327+
}
328+
280329
static <T, R> CheckedFunction<T, R> toFn(CheckedConsumer<T> consumer) {
281330
return t -> {
282331
consumer.accept(t);

src/main/java/net/jodah/failsafe/internal/util/DelegatingExecutorService.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

src/main/java/net/jodah/failsafe/spi/Scheduler.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package net.jodah.failsafe.spi;
1717

18-
import net.jodah.failsafe.internal.util.Assert;
19-
import net.jodah.failsafe.internal.util.DelegatingExecutorService;
2018
import net.jodah.failsafe.internal.util.DelegatingScheduler;
2119

2220
import java.util.concurrent.*;
@@ -40,25 +38,16 @@ public interface Scheduler {
4038
ScheduledFuture<?> schedule(Callable<?> callable, long delay, TimeUnit unit);
4139

4240
/**
43-
* Returns a Scheduler adapted from the {@code executor}.
44-
*
45-
* @throws NullPointerException if {@code executor} is null
41+
* Returns a Scheduler adapted from the {@code scheduledExecutorService}.
4642
*/
47-
static Scheduler of(final ScheduledExecutorService executor) {
48-
Assert.notNull(executor, "executor");
49-
return executor::schedule;
43+
static Scheduler of(ScheduledExecutorService scheduledExecutorService) {
44+
return scheduledExecutorService::schedule;
5045
}
5146

5247
/**
53-
* Returns a Scheduler adapted from the {@code executor}.
54-
*
55-
* @throws NullPointerException if {@code executor} is null
48+
* Returns a Scheduler adapted from the {@code executorService}.
5649
*/
57-
static Scheduler of(final Executor executor) {
58-
Assert.notNull(executor, "executor");
59-
ExecutorService executorService = executor instanceof ExecutorService ?
60-
(ExecutorService) executor :
61-
new DelegatingExecutorService(executor);
50+
static Scheduler of(ExecutorService executorService) {
6251
return new DelegatingScheduler(executorService);
6352
}
6453
}

src/test/java/net/jodah/failsafe/AsyncExecutionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
@Test
3232
public class AsyncExecutionTest extends Testing {
3333
Function<AsyncExecutionInternal<Object>, CompletableFuture<ExecutionResult<Object>>> innerFn = Functions.getPromise(
34-
ctx -> null);
34+
ctx -> null, null);
3535
ConnectException e = new ConnectException();
3636
AsyncExecutionInternal<Object> exec;
3737
FailsafeFuture<Object> future;

0 commit comments

Comments
 (0)