Skip to content

Commit 4594f28

Browse files
committed
Add async waiting support for bulkheads
1 parent 6971872 commit 4594f28

File tree

12 files changed

+402
-212
lines changed

12 files changed

+402
-212
lines changed

src/main/java/dev/failsafe/internal/BulkheadExecutor.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@
2020
import dev.failsafe.ExecutionContext;
2121
import dev.failsafe.RateLimitExceededException;
2222
import dev.failsafe.spi.ExecutionResult;
23+
import dev.failsafe.spi.FailsafeFuture;
2324
import dev.failsafe.spi.PolicyExecutor;
25+
import dev.failsafe.spi.Scheduler;
2426

2527
import java.time.Duration;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.Future;
30+
import java.util.concurrent.TimeUnit;
2631

2732
/**
2833
* A PolicyExecutor that handles failures according to a {@link Bulkhead}.
@@ -53,6 +58,39 @@ protected ExecutionResult<R> preExecute() {
5358
}
5459
}
5560

61+
@Override
62+
protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler scheduler, FailsafeFuture<R> future) {
63+
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
64+
CompletableFuture<Void> acquireFuture = bulkhead.acquirePermitAsync();
65+
acquireFuture.whenComplete((result, error) -> {
66+
// Signal for execution to proceed
67+
promise.complete(ExecutionResult.none());
68+
});
69+
70+
if (!promise.isDone()) {
71+
try {
72+
// Schedule bulkhead permit timeout
73+
Future<?> timeoutFuture = scheduler.schedule(() -> {
74+
promise.complete(ExecutionResult.failure(new BulkheadFullException(bulkhead)));
75+
acquireFuture.cancel(true);
76+
return null;
77+
}, maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
78+
79+
// Propagate outer cancellations to the promise, bulkhead acquire future, and timeout future
80+
future.setCancelFn(this, (mayInterrupt, cancelResult) -> {
81+
promise.complete(cancelResult);
82+
acquireFuture.cancel(mayInterrupt);
83+
timeoutFuture.cancel(mayInterrupt);
84+
});
85+
} catch (Throwable t) {
86+
// Hard scheduling failure
87+
promise.completeExceptionally(t);
88+
}
89+
}
90+
91+
return promise;
92+
}
93+
5694
@Override
5795
public void onSuccess(ExecutionResult<R> result) {
5896
bulkhead.releasePermit();

src/main/java/dev/failsafe/internal/BulkheadImpl.java

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,31 @@
1717

1818
import dev.failsafe.Bulkhead;
1919
import dev.failsafe.BulkheadConfig;
20-
import dev.failsafe.internal.util.Durations;
20+
import dev.failsafe.internal.util.FutureLinkedList;
2121
import dev.failsafe.spi.PolicyExecutor;
2222

2323
import java.time.Duration;
24-
import java.util.concurrent.Semaphore;
25-
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.*;
2625

2726
/**
28-
* A Bulkhead implementation.
27+
* A Bulkhead implementation that supports sync and async waiting.
2928
*
3029
* @param <R> result type
3130
* @author Jonathan Halterman
3231
*/
3332
public class BulkheadImpl<R> implements Bulkhead<R> {
33+
private static final CompletableFuture<Void> NULL_FUTURE = CompletableFuture.completedFuture(null);
3434
private final BulkheadConfig<R> config;
35-
private final Semaphore semaphore;
35+
private final int maxPermits;
36+
37+
// Mutable state
38+
private int permits;
39+
private final FutureLinkedList futures = new FutureLinkedList();
3640

3741
public BulkheadImpl(BulkheadConfig<R> config) {
3842
this.config = config;
39-
semaphore = new Semaphore(config.getMaxConcurrency(), true);
43+
maxPermits = config.getMaxConcurrency();
44+
permits = maxPermits;
4045
}
4146

4247
@Override
@@ -46,22 +51,57 @@ public BulkheadConfig<R> getConfig() {
4651

4752
@Override
4853
public void acquirePermit() throws InterruptedException {
49-
semaphore.acquire();
54+
try {
55+
acquirePermitAsync().get();
56+
} catch (CancellationException | ExecutionException ignore) {
57+
// Not possible since the future will always be completed with null
58+
}
5059
}
5160

5261
@Override
53-
public boolean tryAcquirePermit() {
54-
return semaphore.tryAcquire();
62+
public synchronized boolean tryAcquirePermit() {
63+
if (permits > 0) {
64+
permits -= 1;
65+
return true;
66+
}
67+
return false;
5568
}
5669

5770
@Override
5871
public boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedException {
59-
return semaphore.tryAcquire(Durations.ofSafeNanos(maxWaitTime).toNanos(), TimeUnit.NANOSECONDS);
72+
CompletableFuture<Void> future = acquirePermitAsync();
73+
if (future == NULL_FUTURE)
74+
return true;
75+
76+
try {
77+
future.get(maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
78+
return true;
79+
} catch (CancellationException | ExecutionException | TimeoutException e) {
80+
return false;
81+
}
82+
}
83+
84+
/**
85+
* Returns a CompletableFuture that is completed when a permit is acquired. Externally completing this future will
86+
* remove the waiter from the bulkhead's internal queue.
87+
*/
88+
synchronized CompletableFuture<Void> acquirePermitAsync() {
89+
if (permits > 0) {
90+
permits -= 1;
91+
return NULL_FUTURE;
92+
} else {
93+
return futures.add();
94+
}
6095
}
6196

6297
@Override
63-
public void releasePermit() {
64-
semaphore.release();
98+
public synchronized void releasePermit() {
99+
if (permits < maxPermits) {
100+
permits += 1;
101+
CompletableFuture<Void> future = futures.pollFirst();
102+
if (future != null)
103+
future.complete(null);
104+
}
65105
}
66106

67107
@Override

src/main/java/dev/failsafe/internal/RateLimiterExecutor.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,17 @@ protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler schedu
6464
promise.complete(ExecutionResult.failure(new RateLimitExceededException(rateLimiter)));
6565
else {
6666
try {
67-
Future<?> scheduledWait = scheduler.schedule(() -> {
68-
// Signal for execution and post-execution to proceed with a non-result
69-
return promise.complete(ExecutionResult.none());
67+
// Wait for the permit
68+
Future<?> permitWaitFuture = scheduler.schedule(() -> {
69+
// Signal for execution to proceed
70+
promise.complete(ExecutionResult.none());
71+
return null;
7072
}, waitNanos, TimeUnit.NANOSECONDS);
7173

72-
// Propagate outer cancellations to the RateLimiter future and its promise
74+
// Propagate outer cancellations to the promise and permit wait future
7375
future.setCancelFn(this, (mayInterrupt, cancelResult) -> {
74-
scheduledWait.cancel(mayInterrupt);
7576
promise.complete(cancelResult);
77+
permitWaitFuture.cancel(mayInterrupt);
7678
});
7779
} catch (Throwable t) {
7880
// Hard scheduling failure

src/main/java/dev/failsafe/internal/RateLimiterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.util.concurrent.TimeUnit;
2929

3030
/**
31-
* A RateLimiter implementation.
31+
* A RateLimiter implementation that supports smooth and bursty rate limiting.
3232
*
3333
* @param <R> result type
3434
*/
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe.internal.util;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
20+
/**
21+
* A LinkedList of CompletableFutures that removes a future from the list when it's completed.
22+
* <p>
23+
* This class is threadsafe.
24+
* </p>
25+
*
26+
* @author Jonathan Halterman
27+
*/
28+
public final class FutureLinkedList {
29+
Node head;
30+
Node tail;
31+
32+
static class Node {
33+
Node previous;
34+
Node next;
35+
CompletableFuture<Void> future;
36+
}
37+
38+
/**
39+
* Adds a new CompletableFuture to the list and returns it. The returned future will be removed from the list when
40+
* it's completed.
41+
*/
42+
public synchronized CompletableFuture<Void> add() {
43+
Node node = new Node();
44+
node.future = new CompletableFuture<>();
45+
node.future.whenComplete((result, error) -> remove(node));
46+
47+
if (head == null)
48+
head = tail = node;
49+
else {
50+
tail.next = node;
51+
node.previous = tail;
52+
tail = node;
53+
}
54+
return node.future;
55+
}
56+
57+
/**
58+
* Returns and removes the first future in the list, else returns {@code null} if the list is empty.
59+
*/
60+
public synchronized CompletableFuture<Void> pollFirst() {
61+
Node previousHead = head;
62+
if (head != null) {
63+
head = head.next;
64+
if (head != null)
65+
head.previous = null;
66+
}
67+
return previousHead == null ? null : previousHead.future;
68+
}
69+
70+
private synchronized void remove(Node node) {
71+
if (node.previous != null)
72+
node.previous.next = node.next;
73+
if (node.next != null)
74+
node.next.previous = node.previous;
75+
if (head == node)
76+
head = node.next;
77+
if (tail == node)
78+
tail = node.previous;
79+
}
80+
}

src/test/java/dev/failsafe/functional/BulkheadTest.java

Lines changed: 21 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,37 @@
1515
*/
1616
package dev.failsafe.functional;
1717

18-
import dev.failsafe.*;
18+
import dev.failsafe.Bulkhead;
19+
import dev.failsafe.BulkheadFullException;
20+
import dev.failsafe.Failsafe;
1921
import dev.failsafe.testing.Testing;
2022
import org.testng.annotations.Test;
2123

2224
import java.time.Duration;
2325

24-
import static dev.failsafe.internal.InternalTesting.resetBulkhead;
25-
import static dev.failsafe.internal.InternalTesting.resetLimiter;
26-
import static org.testng.Assert.assertEquals;
27-
2826
/**
2927
* Tests various Bulkhead scenarios.
3028
*/
3129
@Test
3230
public class BulkheadTest extends Testing {
31+
public void testPermitAcquiredAfterWait() {
32+
// Given
33+
Bulkhead<Object> bulkhead = Bulkhead.builder(2).withMaxWaitTime(Duration.ofSeconds(1)).build();
34+
35+
// When / Then
36+
testGetSuccess(() -> {
37+
bulkhead.tryAcquirePermit();
38+
bulkhead.tryAcquirePermit(); // bulkhead should be full
39+
40+
runInThread(() -> {
41+
Thread.sleep(200);
42+
bulkhead.releasePermit(); // bulkhead should not be full
43+
});
44+
}, Failsafe.with(bulkhead), ctx -> {
45+
return "test";
46+
}, "test");
47+
}
48+
3349
public void shouldThrowBulkheadFullExceptionAfterPermitsExceeded() {
3450
// Given
3551
Bulkhead<Object> bulkhead = Bulkhead.of(2);
@@ -54,52 +70,4 @@ public void testMaxWaitTimeExceeded() {
5470
testRunFailure(Failsafe.with(bulkhead), ctx -> {
5571
}, BulkheadFullException.class);
5672
}
57-
58-
/**
59-
* Tests a scenario where Bulkhead rejects some retried executions, which prevents the user's Supplier from being
60-
* called.
61-
*/
62-
public void testRejectedWithRetries() {
63-
Stats rpStats = new Stats();
64-
Stats rlStats = new Stats();
65-
RetryPolicy<Object> rp = withStatsAndLogs(RetryPolicy.builder().withMaxAttempts(7), rpStats).build();
66-
Bulkhead<Object> bh = withStatsAndLogs(Bulkhead.builder(2), rlStats).build();
67-
bh.tryAcquirePermit();
68-
bh.tryAcquirePermit(); // bulkhead should be full
69-
70-
testRunFailure(() -> {
71-
rpStats.reset();
72-
rlStats.reset();
73-
}, Failsafe.with(rp, bh), ctx -> {
74-
System.out.println("Executing");
75-
throw new Exception();
76-
}, (f, e) -> {
77-
assertEquals(e.getAttemptCount(), 7);
78-
assertEquals(e.getExecutionCount(), 0);
79-
assertEquals(rpStats.failedAttemptCount, 7);
80-
assertEquals(rpStats.retryCount, 6);
81-
}, BulkheadFullException.class);
82-
}
83-
84-
/**
85-
* Asserts that a bulkhead propagates an InterruptedException.
86-
*/
87-
public void testAcquirePermitWithInterrupt() {
88-
Bulkhead<Object> bulkhead = Bulkhead.builder(1).withMaxWaitTime(Duration.ofSeconds(5)).build();
89-
bulkhead.tryAcquirePermit(); // Bulkhead should be full
90-
91-
testRunFailure(() -> {
92-
Thread thread = Thread.currentThread();
93-
runInThread(() -> {
94-
Thread.sleep(100);
95-
thread.interrupt();
96-
});
97-
}, Failsafe.with(bulkhead), ctx -> {
98-
System.out.println("Executing");
99-
throw new Exception();
100-
}, InterruptedException.class);
101-
102-
// Reset interrupt flag
103-
Thread.interrupted();
104-
}
10573
}

src/test/java/dev/failsafe/functional/FailsafeFutureCancellationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,16 @@ public void shouldCancelAsyncRateLimiterWaitingOnPermit() throws Throwable {
105105
});
106106
}
107107

108+
public void shouldCancelBulkheadWaitingOnPermit() throws Throwable {
109+
Bulkhead<Void> bulkhead = Bulkhead.<Void>builder(2).withMaxWaitTime(Duration.ofSeconds(1)).build();
110+
bulkhead.tryAcquirePermit();
111+
bulkhead.tryAcquirePermit(); // bulkhead should be full
112+
113+
assertCancel(Failsafe.with(bulkhead), ctx -> {
114+
fail("Execution should be cancelled during preExecute");
115+
});
116+
}
117+
108118
/**
109119
* Asserts that cancelling a FailsafeFuture causes both retry policies to stop.
110120
*/

0 commit comments

Comments
 (0)