Skip to content

Commit 664042c

Browse files
committed
Added improvements and tests around alternative execution results
1 parent 61a8ac5 commit 664042c

File tree

7 files changed

+232
-17
lines changed

7 files changed

+232
-17
lines changed

src/main/java/net/jodah/failsafe/ExecutionResult.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package net.jodah.failsafe;
1717

1818
import java.util.Objects;
19+
import java.util.concurrent.CompletableFuture;
1920

2021
/**
2122
* The result of an execution. Immutable.
@@ -25,6 +26,8 @@
2526
* @author Jonathan Halterman
2627
*/
2728
public class ExecutionResult {
29+
static final CompletableFuture<ExecutionResult> NULL_FUTURE = CompletableFuture.completedFuture(null);
30+
2831
/** An execution that was completed with a non-result */
2932
static final ExecutionResult NONE = new ExecutionResult(null, null, true, 0, true, true, true);
3033

src/main/java/net/jodah/failsafe/FallbackExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, S
6666
protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
6767
Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler, FailsafeFuture<Object> future) {
6868
return () -> supplier.get().thenCompose(result -> {
69+
if (result == null)
70+
return ExecutionResult.NULL_FUTURE;
71+
6972
CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();
7073
if (executionCancelled()) {
7174
promise.complete(result);

src/main/java/net/jodah/failsafe/Functions.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
* @author Jonathan Halterman
3030
*/
3131
final class Functions {
32-
private static final CompletableFuture<ExecutionResult> NULL_FUTURE = CompletableFuture.completedFuture(null);
33-
3432
interface SettableSupplier<T> extends Supplier<T> {
3533
void set(T value);
3634
}
@@ -136,7 +134,7 @@ public synchronized CompletableFuture<ExecutionResult> get() {
136134
}
137135

138136
// Result will be provided later via AsyncExecution.complete
139-
return NULL_FUTURE;
137+
return ExecutionResult.NULL_FUTURE;
140138
}
141139
};
142140
}
@@ -199,7 +197,7 @@ static <T> Supplier<CompletableFuture<ExecutionResult>> getPromiseOfStageExecuti
199197
}
200198

201199
// Result will be provided later via AsyncExecution.complete
202-
return NULL_FUTURE;
200+
return ExecutionResult.NULL_FUTURE;
203201
};
204202
}
205203

src/main/java/net/jodah/failsafe/PolicyExecutor.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131
public abstract class PolicyExecutor<P extends Policy> {
3232
protected final P policy;
3333
protected final AbstractExecution execution;
34-
/* Index of the policy relative to other policies in a composition, inner-most first */
35-
int policyIndex;
34+
/* Index of the policy relative to other policies in a composition, inner-most first */ int policyIndex;
3635

3736
protected PolicyExecutor(P policy, AbstractExecution execution) {
3837
this.policy = policy;
@@ -53,8 +52,11 @@ protected ExecutionResult preExecute() {
5352
protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, Scheduler scheduler) {
5453
return () -> {
5554
ExecutionResult result = preExecute();
56-
if (result != null)
55+
if (result != null) {
56+
// Still need to preExecute when returning an alternative result before making it to the terminal Supplier
57+
execution.preExecute();
5758
return result;
59+
}
5860

5961
return postExecute(supplier.get());
6062
};
@@ -64,6 +66,7 @@ protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, S
6466
* Performs synchronous post-execution handling for a {@code result}.
6567
*/
6668
protected ExecutionResult postExecute(ExecutionResult result) {
69+
execution.recordAttempt();
6770
if (isFailure(result)) {
6871
result = onFailure(result.withFailure());
6972
callFailureListener(result);
@@ -77,16 +80,23 @@ protected ExecutionResult postExecute(ExecutionResult result) {
7780
}
7881

7982
/**
80-
* Performs an async execution by calling pre-execute else calling the supplier and doing a post-execute.
83+
* Performs an async execution by calling pre-execute else calling the supplier and doing a post-execute. Implementors
84+
* must handle a null result from a supplier, which indicates that an async execution has occurred, a result will come
85+
* later, and postExecute handling should not be performed.
8186
*/
8287
protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
8388
Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler, FailsafeFuture<Object> future) {
8489
return () -> {
8590
ExecutionResult result = preExecute();
86-
if (result != null)
91+
if (result != null) {
92+
// Still need to preExecute when returning an alternative result before making it to the terminal Supplier
93+
execution.preExecute();
8794
return CompletableFuture.completedFuture(result);
95+
}
8896

89-
return supplier.get().thenCompose(s -> postExecuteAsync(s, scheduler, future));
97+
return supplier.get().thenCompose(r -> {
98+
return r == null ? ExecutionResult.NULL_FUTURE : postExecuteAsync(r, scheduler, future);
99+
});
90100
};
91101
}
92102

src/test/java/net/jodah/failsafe/Testing.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
package net.jodah.failsafe;
1717

1818
import net.jodah.failsafe.event.ExecutionCompletedEvent;
19-
import net.jodah.failsafe.function.AsyncRunnable;
20-
import net.jodah.failsafe.function.CheckedConsumer;
21-
import net.jodah.failsafe.function.CheckedRunnable;
22-
import net.jodah.failsafe.function.CheckedSupplier;
19+
import net.jodah.failsafe.function.*;
2320
import net.jodah.failsafe.internal.CircuitBreakerInternals;
2421
import net.jodah.failsafe.internal.CircuitState;
2522

@@ -35,6 +32,8 @@
3532
import java.util.concurrent.atomic.AtomicReference;
3633
import java.util.function.Consumer;
3734

35+
import static org.testng.Assert.assertEquals;
36+
3837
/**
3938
* Utilities to to assist with testing.
4039
*/
@@ -185,6 +184,7 @@ public static <T> T unwrapExceptions(CheckedSupplier<T> supplier) {
185184
sneakyThrow(e.getCause() == null ? e : e.getCause());
186185
return null;
187186
} catch (RuntimeException | Error e) {
187+
e.printStackTrace();
188188
throw e;
189189
} catch (Throwable t) {
190190
throw new RuntimeException(t);
@@ -329,23 +329,56 @@ public static void testSyncAndAsync(FailsafeExecutor<?> failsafe, Runnable given
329329
@SafeVarargs
330330
public static void testAsyncExecution(FailsafeExecutor<?> failsafe, AsyncRunnable when,
331331
Consumer<ExecutionCompletedEvent<?>> then, Class<? extends Throwable>... expectedExceptions) {
332+
AsyncSupplier supplier = ex -> {
333+
when.run(ex);
334+
return null;
335+
};
336+
testAsyncExecution(failsafe, supplier, then, null, expectedExceptions);
337+
}
338+
339+
@SafeVarargs
340+
public static void testAsyncExecution(FailsafeExecutor<?> failsafe, AsyncRunnable when,
341+
Consumer<ExecutionCompletedEvent<?>> then, Object expectedResult,
342+
Class<? extends Throwable>... expectedExceptions) {
343+
AsyncSupplier supplier = ex -> {
344+
when.run(ex);
345+
return null;
346+
};
347+
testAsyncExecution(failsafe, supplier, then, expectedResult, expectedExceptions);
348+
}
349+
350+
@SafeVarargs
351+
public static <T> void testAsyncExecution(FailsafeExecutor<T> failsafe, AsyncSupplier<T> when,
352+
Consumer<ExecutionCompletedEvent<?>> then, Class<? extends Throwable>... expectedExceptions) {
353+
testAsyncExecution(failsafe, when, then, null, expectedExceptions);
354+
}
355+
356+
@SafeVarargs
357+
public static <T> void testAsyncExecution(FailsafeExecutor<T> failsafe, AsyncSupplier<T> when,
358+
Consumer<ExecutionCompletedEvent<?>> then, Object expectedResult,
359+
Class<? extends Throwable>... expectedExceptions) {
332360
AtomicReference<ExecutionCompletedEvent<?>> completedEventRef = new AtomicReference<>();
333361
CheckedConsumer<ExecutionCompletedEvent<?>> setCompletedEventFn = completedEventRef::set;
334362
Runnable postTestFn = () -> {
363+
ExecutionCompletedEvent<?> completedEvent = completedEventRef.get();
335364
if (expectedExceptions.length > 0)
336-
Asserts.assertMatches(completedEventRef.get().getFailure(), Arrays.asList(expectedExceptions));
365+
Asserts.assertMatches(completedEvent.getFailure(), Arrays.asList(expectedExceptions));
366+
else
367+
assertEquals(completedEvent.getResult(), expectedResult);
337368
then.accept(completedEventRef.get());
338369
};
339370

340371
// Async test
341372
System.out.println("\nRunning async execution test");
342373
if (expectedExceptions.length == 0) {
343-
Testing.unwrapExceptions(() -> failsafe.onComplete(setCompletedEventFn::accept).runAsyncExecution(when).get());
374+
Object result = Testing.unwrapExceptions(
375+
() -> failsafe.onComplete(setCompletedEventFn::accept).getAsyncExecution(when).get());
376+
assertEquals(result, expectedResult);
344377
} else {
345378
List<Class<? extends Throwable>> expected = new LinkedList<>();
346379
Collections.addAll(expected, expectedExceptions);
347380
expected.add(0, ExecutionException.class);
348-
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn::accept).runAsyncExecution(when).get(),
381+
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn::accept).getAsyncExecution(when).get(),
349382
expected);
350383
}
351384
postTestFn.run();
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package net.jodah.failsafe.functional;
17+
18+
import net.jodah.failsafe.*;
19+
import net.jodah.failsafe.Testing.Stats;
20+
import org.testng.annotations.Test;
21+
22+
import static net.jodah.failsafe.Testing.*;
23+
import static org.testng.Assert.assertEquals;
24+
25+
/**
26+
* Tests scenarios where a PolicyExecutor's preExecute provides an alternative result, which prevents the user's
27+
* Supplier from being called. This occurs when a CircuitBreaker is open.
28+
*/
29+
@Test
30+
public class AlternativeResultTest {
31+
public void testRejectedSyncAndAsync() {
32+
Stats rpStats = new Stats();
33+
Stats cbStats = new Stats();
34+
RetryPolicy<Object> rp = withStats(new RetryPolicy<>().withMaxAttempts(7), rpStats, true);
35+
CircuitBreaker<Object> cb = withStats(new CircuitBreaker<>().withFailureThreshold(3), cbStats, true);
36+
37+
testSyncAndAsync(Failsafe.with(rp, cb), () -> {
38+
rpStats.reset();
39+
cbStats.reset();
40+
cb.close();
41+
}, () -> {
42+
System.out.println("Executing");
43+
throw new Exception();
44+
}, e -> {
45+
assertEquals(e.getAttemptCount(), 7);
46+
assertEquals(e.getExecutionCount(), 3);
47+
assertEquals(rpStats.failedAttemptCount, 7);
48+
assertEquals(rpStats.retryCount, 6);
49+
assertEquals(cb.getExecutionCount(), 3);
50+
assertEquals(cb.getFailureCount(), 3);
51+
}, CircuitBreakerOpenException.class);
52+
}
53+
54+
public void testRejectedAsyncExecutionWithRetry() {
55+
Stats rpStats = new Stats();
56+
Stats cbStats = new Stats();
57+
RetryPolicy<Object> rp = withStats(new RetryPolicy<>().withMaxAttempts(7), rpStats, true);
58+
CircuitBreaker<Object> cb = withStats(new CircuitBreaker<>().withFailureThreshold(3), cbStats, true);
59+
60+
// Test with retryOn()
61+
testAsyncExecution(Failsafe.with(rp, cb), ex -> {
62+
runAsync(() -> {
63+
System.out.println("Executing");
64+
ex.retryOn(new IllegalStateException());
65+
});
66+
}, e -> {
67+
assertEquals(e.getAttemptCount(), 7);
68+
assertEquals(e.getExecutionCount(), 3);
69+
assertEquals(rpStats.failedAttemptCount, 7);
70+
assertEquals(rpStats.retryCount, 6);
71+
assertEquals(cb.getExecutionCount(), 3);
72+
assertEquals(cb.getFailureCount(), 3);
73+
}, CircuitBreakerOpenException.class);
74+
}
75+
76+
@Test(enabled = false)
77+
public void testRejectedAsyncExecutionWithCompleteAndRetry() {
78+
Stats rpStats = new Stats();
79+
Stats cbStats = new Stats();
80+
RetryPolicy<Object> rp = withStats(new RetryPolicy<>().withMaxAttempts(7), rpStats, true);
81+
CircuitBreaker<Object> cb = withStats(new CircuitBreaker<>().withFailureThreshold(3), cbStats, true);
82+
83+
// Test with complete() and retry()
84+
rpStats.reset();
85+
cbStats.reset();
86+
cb.close();
87+
testAsyncExecution(Failsafe.with(rp, cb), ex -> {
88+
runAsync(() -> {
89+
System.out.println("Executing");
90+
if (!ex.complete(null, new IllegalStateException())) {
91+
ex.retry();
92+
}
93+
});
94+
}, e -> {
95+
assertEquals(e.getAttemptCount(), 7);
96+
assertEquals(e.getExecutionCount(), 3);
97+
assertEquals(rpStats.failedAttemptCount, 7);
98+
assertEquals(rpStats.retryCount, 6);
99+
assertEquals(cb.getExecutionCount(), 3);
100+
assertEquals(cb.getFailureCount(), 3);
101+
}, CircuitBreakerOpenException.class);
102+
}
103+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package net.jodah.failsafe.functional;
17+
18+
import net.jodah.failsafe.*;
19+
import net.jodah.failsafe.Testing.Stats;
20+
import org.testng.annotations.Test;
21+
22+
import java.time.Duration;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.function.Consumer;
25+
26+
import static net.jodah.failsafe.Testing.*;
27+
import static org.testng.Assert.assertEquals;
28+
29+
/**
30+
* Tests AsyncExecution scenarios.
31+
*/
32+
@Test
33+
public class AsyncExecutionTest {
34+
public void testAsyncExecWithPassthroughPolicies() {
35+
Stats rpStats = new Stats();
36+
RetryPolicy<Object> rp = withStats(new RetryPolicy<>().withMaxRetries(3), rpStats, true);
37+
// Passthrough policy that should allow async execution results through
38+
Fallback<Object> fb = Fallback.<Object>of("test").handleIf((r, f) -> false);
39+
Timeout<Object> timeout = Timeout.of(Duration.ofMinutes(1));
40+
AtomicInteger counter = new AtomicInteger();
41+
42+
Consumer<FailsafeExecutor<Object>> test = failsafe -> testAsyncExecution(failsafe, ex -> {
43+
runAsync(() -> {
44+
System.out.println("Executing");
45+
if (counter.getAndIncrement() < 3)
46+
ex.retryOn(new IllegalStateException());
47+
else
48+
ex.complete("done");
49+
});
50+
}, e -> {
51+
assertEquals(e.getAttemptCount(), 4);
52+
assertEquals(e.getExecutionCount(), 4);
53+
assertEquals(rpStats.failedAttemptCount, 3);
54+
assertEquals(rpStats.retryCount, 3);
55+
}, "done");
56+
57+
// Test RetryPolicy, Fallback
58+
test.accept(Failsafe.with(rp, fb));
59+
60+
// Test RetryPolicy, Timeout
61+
rpStats.reset();
62+
counter.set(0);
63+
test.accept(Failsafe.with(rp, timeout));
64+
}
65+
}

0 commit comments

Comments
 (0)