Skip to content

Commit 9b7c82e

Browse files
committed
Add support for explicit policy composition.
Fixes #254
1 parent 1142f95 commit 9b7c82e

File tree

8 files changed

+112
-60
lines changed

8 files changed

+112
-60
lines changed

src/main/java/net/jodah/failsafe/Failsafe.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@ public class Failsafe {
3838
* Failsafe.with(fallback, retryPolicy, circuitBreaker).get(supplier);
3939
* </pre>
4040
* </p>
41-
* This results in the following internal composition when executing a {@code runnable} or {@code supplier} and
41+
* <p>
42+
* This is equivalent to composition using the the {@link FailsafeExecutor#compose(Policy) compose} method:
43+
* <pre>
44+
* Failsafe.with(fallback).compose(retryPolicy).compose(circuitBreaker).get(supplier);
45+
* </pre>
46+
* </p>
47+
* These result in the following internal composition when executing a {@code runnable} or {@code supplier} and
4248
* handling its result:
4349
* <p>
4450
* <pre>

src/main/java/net/jodah/failsafe/FailsafeExecutor.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import net.jodah.failsafe.internal.util.Assert;
2121
import net.jodah.failsafe.spi.*;
2222

23+
import java.util.ArrayList;
2324
import java.util.List;
2425
import java.util.concurrent.*;
2526
import java.util.function.Function;
@@ -53,6 +54,44 @@ public class FailsafeExecutor<R> extends PolicyListeners<FailsafeExecutor<R>, R>
5354
this.policies = policies;
5455
}
5556

57+
/**
58+
* Returns the currently configured policies.
59+
*
60+
* @see #compose(Policy)
61+
*/
62+
public List<? extends Policy<R>> getPolicies() {
63+
return policies;
64+
}
65+
66+
/**
67+
* Returns a new {@code FailsafeExecutor} that composes the currently configured policies around the given {@code
68+
* innerPolicy}. For example, consider:
69+
* <p>
70+
* <pre>
71+
* Failsafe.with(fallback).compose(retryPolicy).compose(circuitBreaker);
72+
* </pre>
73+
* </p>
74+
* This results in the following internal composition when executing a {@code runnable} or {@code supplier} and
75+
* handling its result:
76+
* <p>
77+
* <pre>
78+
* Fallback(RetryPolicy(CircuitBreaker(Supplier)))
79+
* </pre>
80+
* </p>
81+
* This means the {@code CircuitBreaker} is first to evaluate the {@code Supplier}'s result, then the {@code
82+
* RetryPolicy}, then the {@code Fallback}. Each policy makes its own determination as to whether the result
83+
* represents a failure. This allows different policies to be used for handling different types of failures.
84+
*
85+
* @throws NullPointerException if {@code innerPolicy} is null
86+
* @see #getPolicies()
87+
*/
88+
public <P extends Policy<R>> FailsafeExecutor<R> compose(P innerPolicy) {
89+
Assert.notNull(innerPolicy, "innerPolicy");
90+
List<Policy<R>> composed = new ArrayList<>(policies);
91+
composed.add(innerPolicy);
92+
return new FailsafeExecutor<>(composed);
93+
}
94+
5695
/**
5796
* Executes the {@code supplier} until a successful result is returned or the configured policies are exceeded.
5897
*

src/main/java/net/jodah/failsafe/Timeout.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,9 @@ public static <R> Timeout<R> of(Duration timeout) {
117117
public PolicyExecutor<R, ? extends Policy<R>> toExecutor(int policyIndex) {
118118
return new TimeoutExecutor<>(this, policyIndex, policyHandlers);
119119
}
120+
121+
@Override
122+
public String toString() {
123+
return "Timeout[timeout=" + timeout + ", interruptable=" + interruptable + ']';
124+
}
120125
}

src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.concurrent.*;
3030
import java.util.concurrent.atomic.AtomicBoolean;
3131
import java.util.concurrent.atomic.AtomicInteger;
32-
import java.util.concurrent.atomic.AtomicReference;
3332
import java.util.function.Function;
3433

3534
import static net.jodah.failsafe.Asserts.assertThrows;
@@ -293,51 +292,6 @@ public void shouldCompleteFutureExternally() throws Throwable {
293292
waiter.await(1000);
294293
}
295294

296-
/**
297-
* Tests a scenario where three timeouts should cause all delegates to be cancelled with interrupts.
298-
*/
299-
public void shouldCancelNestedTimeoutsWithInterrupt() throws Throwable {
300-
// Given
301-
RetryPolicy<Boolean> rp = new RetryPolicy<Boolean>().onRetry(e -> System.out.println("Retrying"));
302-
Timeout<Boolean> timeout1 = Timeout.of(Duration.ofMillis(1000));
303-
Timeout<Boolean> timeout2 = Timeout.<Boolean>of(Duration.ofMillis(200)).withInterrupt(true);
304-
AtomicReference<FailsafeFuture<Boolean>> futureRef = new AtomicReference<>();
305-
CountDownLatch futureLatch = new CountDownLatch(1);
306-
307-
// When
308-
FailsafeFuture<Boolean> future = (FailsafeFuture<Boolean>) Failsafe.with(rp, timeout2, timeout1).onComplete(e -> {
309-
waiter.assertNull(e.getResult());
310-
waiter.assertTrue(e.getFailure() instanceof TimeoutExceededException);
311-
waiter.resume();
312-
}).getAsync(ctx -> {
313-
// Wait for futureRef to be set
314-
futureLatch.await();
315-
waiter.assertTrue(ctx.getLastFailure() == null || ctx.getLastFailure() instanceof TimeoutExceededException);
316-
317-
try {
318-
// Assert not cancelled
319-
waiter.assertFalse(ctx.isCancelled());
320-
// waiter.assertFalse(futureRef.get().cancelFunctions.isEmpty());
321-
Thread.sleep(1000);
322-
} catch (InterruptedException e) {
323-
// Assert cancelled
324-
waiter.assertTrue(ctx.isCancelled());
325-
waiter.resume();
326-
throw e;
327-
}
328-
waiter.fail("Expected interruption");
329-
return false;
330-
});
331-
futureRef.set(future);
332-
futureLatch.countDown();
333-
334-
// Then
335-
waiter.await(1000, 4);
336-
assertFalse(future.isCancelled());
337-
assertTrue(future.isDone());
338-
assertThrows(future::get, ExecutionException.class, TimeoutExceededException.class);
339-
}
340-
341295
private void assertCancel(Function<FailsafeExecutor<?>, Future<?>> executorCallable, Policy<?> policy)
342296
throws Throwable {
343297
// Given

src/test/java/net/jodah/failsafe/functional/BlockedExecutionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void shouldCancelScheduledRetryOnTimeout() {
4343
Timeout<Boolean> timeout = Timeout.of(Duration.ofMillis(100));
4444
RetryPolicy<Boolean> rp = new RetryPolicy<Boolean>().withDelay(Duration.ofMillis(1000)).handleResult(false);
4545

46-
Future<Boolean> future = Failsafe.with(timeout, rp).with(executor).getAsync(() -> {
46+
Future<Boolean> future = Failsafe.with(timeout).compose(rp).with(executor).getAsync(() -> {
4747
// Tie up single thread immediately after execution, before the retry is scheduled
4848
executor.submit(Testing.uncheck(() -> Thread.sleep(1000)));
4949
return false;
@@ -65,7 +65,7 @@ public void shouldCancelScheduledFallbackOnTimeout() {
6565
return true;
6666
}).handleResult(false);
6767

68-
Future<Boolean> future = Failsafe.with(timeout, fallback).with(executor).getAsync(() -> {
68+
Future<Boolean> future = Failsafe.with(timeout).compose(fallback).with(executor).getAsync(() -> {
6969
// Tie up single thread immediately after execution, before the fallback is scheduled
7070
executor.submit(Testing.uncheck(() -> Thread.sleep(1000)));
7171
return false;

src/test/java/net/jodah/failsafe/functional/NestedTimeoutTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package net.jodah.failsafe.functional;
22

3+
import net.jodah.concurrentunit.Waiter;
34
import net.jodah.failsafe.*;
45
import org.testng.annotations.Test;
56

67
import java.time.Duration;
8+
import java.util.concurrent.CountDownLatch;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.Future;
711

8-
import static org.testng.Assert.assertEquals;
9-
import static org.testng.Assert.assertTrue;
12+
import static org.testng.Assert.*;
1013

1114
/**
1215
* Tests nested timeout scenarios.
@@ -84,4 +87,49 @@ public void testFallbackRetryPolicyTimeoutTimeout() {
8487
innerTimeout.withInterrupt(true);
8588
test.run();
8689
}
90+
91+
/**
92+
* Tests a scenario where three timeouts should cause all delegates to be cancelled with interrupts.
93+
*/
94+
// TODO consider removing this test in favor of the ones above
95+
public void shouldCancelNestedTimeoutsWithInterrupt() throws Throwable {
96+
// Given
97+
RetryPolicy<Boolean> rp = new RetryPolicy<Boolean>().onRetry(e -> System.out.println("Retrying"));
98+
Timeout<Boolean> timeout1 = Timeout.of(Duration.ofMillis(1000));
99+
Timeout<Boolean> timeout2 = Timeout.<Boolean>of(Duration.ofMillis(200)).withInterrupt(true);
100+
CountDownLatch futureLatch = new CountDownLatch(1);
101+
Waiter waiter = new Waiter();
102+
103+
// When
104+
Future<Boolean> future = Failsafe.with(rp).compose(timeout2).compose(timeout1).onComplete(e -> {
105+
waiter.assertNull(e.getResult());
106+
waiter.assertTrue(e.getFailure() instanceof TimeoutExceededException);
107+
waiter.resume();
108+
}).getAsync(ctx -> {
109+
// Wait for futureRef to be set
110+
futureLatch.await();
111+
waiter.assertTrue(ctx.getLastFailure() == null || ctx.getLastFailure() instanceof TimeoutExceededException);
112+
113+
try {
114+
// Assert not cancelled
115+
waiter.assertFalse(ctx.isCancelled());
116+
// waiter.assertFalse(futureRef.get().cancelFunctions.isEmpty());
117+
Thread.sleep(1000);
118+
} catch (InterruptedException e) {
119+
// Assert cancelled
120+
waiter.assertTrue(ctx.isCancelled());
121+
waiter.resume();
122+
throw e;
123+
}
124+
waiter.fail("Expected interruption");
125+
return false;
126+
});
127+
futureLatch.countDown();
128+
129+
// Then
130+
waiter.await(1000, 4);
131+
assertFalse(future.isCancelled());
132+
assertTrue(future.isDone());
133+
assertThrows(future::get, ExecutionException.class, TimeoutExceededException.class);
134+
}
87135
}

src/test/java/net/jodah/failsafe/functional/PolicyCompositionTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void testFallbackRetryPolicyCircuitBreaker() {
3636

3737
testRunSuccess(() -> {
3838
resetBreaker(cb);
39-
}, Failsafe.with(fb, rp, cb), ctx -> {
39+
}, Failsafe.with(fb).compose(rp).compose(cb), ctx -> {
4040
throw new IllegalStateException();
4141
}, e -> {
4242
assertEquals(cb.getFailureCount(), 3);
@@ -54,7 +54,7 @@ public void testCircuitBreakerRetryPolicy() {
5454

5555
testRunFailure(() -> {
5656
resetBreaker(cb);
57-
}, Failsafe.with(cb, rp), ctx -> {
57+
}, Failsafe.with(cb).compose(rp), ctx -> {
5858
throw new IllegalStateException();
5959
}, e -> {
6060
assertEquals(e.getAttemptCount(), 3);
@@ -71,7 +71,7 @@ public void testFallbackRetryPolicy() {
7171
RetryPolicy<Object> rp = new RetryPolicy<>().withMaxRetries(2);
7272
Fallback<Object> fb = Fallback.of("test");
7373

74-
testRunSuccess(Failsafe.with(fb, rp), ctx -> {
74+
testRunSuccess(Failsafe.with(fb).compose(rp), ctx -> {
7575
throw new IllegalStateException();
7676
}, e -> {
7777
assertEquals(e.getAttemptCount(), 3);
@@ -85,7 +85,7 @@ public void testRetryPolicyFallback() {
8585
RetryPolicy<Object> rp = new RetryPolicy<>().withMaxRetries(2);
8686
Fallback<Object> fb = Fallback.of("test");
8787

88-
testRunSuccess(Failsafe.with(rp, fb), ctx -> {
88+
testRunSuccess(Failsafe.with(rp).compose(fb), ctx -> {
8989
throw new IllegalStateException();
9090
}, e -> {
9191
assertEquals(e.getAttemptCount(), 1);

src/test/java/net/jodah/failsafe/functional/TimeoutTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public void testRetryThenTimeoutWithPendingRetry() {
163163
executionCounter.set(0);
164164
timeoutStats.reset();
165165
rpStats.reset();
166-
}, Failsafe.with(timeout, retryPolicy), ctx -> {
166+
}, Failsafe.with(timeout).compose(retryPolicy), ctx -> {
167167
System.out.println("Executing");
168168
executionCounter.incrementAndGet();
169169
throw new Exception();
@@ -198,7 +198,7 @@ public void testTimeoutThenFallbackWithBlockedSupplier() {
198198
Runnable test = () -> testRunFailure(() -> {
199199
timeoutStats.reset();
200200
fbStats.reset();
201-
}, Failsafe.with(fallback, timeout), ctx -> {
201+
}, Failsafe.with(fallback).compose(timeout), ctx -> {
202202
System.out.println("Executing");
203203
Thread.sleep(100);
204204
throw new Exception();
@@ -233,7 +233,7 @@ public void testTimeoutThenFallback() {
233233
Runnable test = () -> testRunFailure(() -> {
234234
timeoutStats.reset();
235235
fbStats.reset();
236-
}, Failsafe.with(fallback, timeout), ctx -> {
236+
}, Failsafe.with(fallback).compose(timeout), ctx -> {
237237
System.out.println("Executing");
238238
throw new Exception();
239239
}, e -> {
@@ -266,7 +266,7 @@ public void testFallbackThenTimeoutWithBlockedSupplier() {
266266
Runnable test = () -> testRunFailure(() -> {
267267
timeoutStats.reset();
268268
fbStats.reset();
269-
}, Failsafe.with(timeout, fallback), ctx -> {
269+
}, Failsafe.with(timeout).compose(fallback), ctx -> {
270270
System.out.println("Executing");
271271
Thread.sleep(100);
272272
throw new Exception();
@@ -301,7 +301,7 @@ public void testFallbackThenTimeoutWithBlockedFallback() {
301301
Runnable test = () -> testRunFailure(() -> {
302302
timeoutStats.reset();
303303
fbStats.reset();
304-
}, Failsafe.with(timeout, fallback), ctx -> {
304+
}, Failsafe.with(timeout).compose(fallback), ctx -> {
305305
System.out.println("Executing");
306306
throw new Exception();
307307
}, e -> {

0 commit comments

Comments
 (0)