Skip to content

Commit 665b6e7

Browse files
committed
Add support for async rate limiter waiting
Closes #318
1 parent 054b019 commit 665b6e7

File tree

6 files changed

+158
-54
lines changed

6 files changed

+158
-54
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
### Improvements
44

55
- Issue #309 - Introduced a `Bulkhead` policy.
6+
- Issue #318 - Add non-blocking async waiting for rate limiters.
7+
8+
### SPI Changes
9+
10+
- `PolicyExecutor.preExecuteAsync` was introduced to support async pre-execution. This is backwards compatible with `preExecute`.
611

712
# 3.1.0
813

src/main/java/dev/failsafe/internal/RateLimiterExecutor.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818
import dev.failsafe.RateLimitExceededException;
1919
import dev.failsafe.RateLimiter;
2020
import dev.failsafe.spi.ExecutionResult;
21+
import dev.failsafe.spi.FailsafeFuture;
2122
import dev.failsafe.spi.PolicyExecutor;
23+
import dev.failsafe.spi.Scheduler;
2224

2325
import java.time.Duration;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
2429

2530
/**
2631
* A PolicyExecutor that handles failures according to a {@link RateLimiter}.
@@ -50,4 +55,31 @@ protected ExecutionResult<R> preExecute() {
5055
return ExecutionResult.failure(e);
5156
}
5257
}
58+
59+
@Override
60+
protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler scheduler, FailsafeFuture<R> future) {
61+
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
62+
long waitNanos = rateLimiter.acquirePermitWaitNanos(maxWaitTime);
63+
if (waitNanos == -1)
64+
promise.complete(ExecutionResult.failure(new RateLimitExceededException(rateLimiter)));
65+
else {
66+
try {
67+
Future<?> scheduledWait = scheduler.schedule(() -> {
68+
// Signal for execution and post-execution to proceed with a non-result
69+
return promise.complete(ExecutionResult.none());
70+
}, waitNanos, TimeUnit.NANOSECONDS);
71+
72+
// Propagate outer cancellations to the RateLimiter future and its promise
73+
future.setCancelFn(this, (mayInterrupt, cancelResult) -> {
74+
scheduledWait.cancel(mayInterrupt);
75+
promise.complete(cancelResult);
76+
});
77+
} catch (Throwable t) {
78+
// Hard scheduling failure
79+
promise.completeExceptionally(t);
80+
}
81+
}
82+
83+
return promise;
84+
}
5385
}

src/main/java/dev/failsafe/internal/RateLimiterImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import dev.failsafe.internal.util.Assert;
2222
import dev.failsafe.internal.util.Durations;
2323
import dev.failsafe.spi.PolicyExecutor;
24+
import dev.failsafe.spi.Scheduler;
2425

2526
import java.time.Duration;
27+
import java.util.concurrent.CompletableFuture;
2628
import java.util.concurrent.TimeUnit;
2729

2830
/**
@@ -81,4 +83,12 @@ public boolean tryAcquirePermits(int permits, Duration maxWaitTime) throws Inter
8183
public PolicyExecutor<R> toExecutor(int policyIndex) {
8284
return new RateLimiterExecutor<>(this, policyIndex);
8385
}
86+
87+
/**
88+
* Returns the wait nanos for an acquired permit which can be used to externally wait.
89+
*/
90+
long acquirePermitWaitNanos(Duration maxWaitTime) {
91+
Assert.notNull(maxWaitTime, "maxWaitTime");
92+
return stats.acquirePermits(1, Durations.ofSafeNanos(maxWaitTime));
93+
}
8494
}

src/main/java/dev/failsafe/spi/PolicyExecutor.java

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package dev.failsafe.spi;
1717

18-
import dev.failsafe.internal.EventHandler;
1918
import dev.failsafe.ExecutionContext;
2019
import dev.failsafe.Policy;
20+
import dev.failsafe.internal.EventHandler;
2121

2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.function.Function;
@@ -61,6 +61,17 @@ protected ExecutionResult<R> preExecute() {
6161
return null;
6262
}
6363

64+
/**
65+
* Called before an async execution to return an alternative result or failure such as if execution is not allowed or
66+
* needed. Returns {@code null} if pre execution is not performed. If the resulting future is completed with a {@link
67+
* ExecutionResult#isNonResult() non-result}, then execution and post-execution should still be performed. If the
68+
* resulting future is completed with {@code null}, then the execution is assumed to have been cancelled.
69+
*/
70+
protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler scheduler, FailsafeFuture<R> future) {
71+
ExecutionResult<R> result = preExecute();
72+
return result == null ? null : CompletableFuture.completedFuture(result);
73+
}
74+
6475
/**
6576
* Performs an execution by calling pre-execute else calling the supplier and doing a post-execute.
6677
*/
@@ -69,7 +80,7 @@ public Function<SyncExecutionInternal<R>, ExecutionResult<R>> apply(
6980
return execution -> {
7081
ExecutionResult<R> result = preExecute();
7182
if (result != null) {
72-
// Still need to preExecute when returning an alternative result before making it to the terminal Supplier
83+
// Still need to preExecute when short-circuiting an execution with an alternative result
7384
execution.preExecute();
7485
return result;
7586
}
@@ -78,23 +89,6 @@ public Function<SyncExecutionInternal<R>, ExecutionResult<R>> apply(
7889
};
7990
}
8091

81-
/**
82-
* Performs synchronous post-execution handling for a {@code result}.
83-
*/
84-
public ExecutionResult<R> postExecute(ExecutionInternal<R> execution, ExecutionResult<R> result) {
85-
execution.recordAttempt();
86-
if (isFailure(result)) {
87-
result = onFailure(execution, result.withFailure());
88-
handleFailure(result, execution);
89-
} else {
90-
result = result.withSuccess();
91-
onSuccess(result);
92-
handleSuccess(result, execution);
93-
}
94-
95-
return result;
96-
}
97-
9892
/**
9993
* Performs an async execution by calling pre-execute else calling the supplier and doing a post-execute. Implementors
10094
* must handle a null result from a supplier, which indicates that an async execution has occurred, that a result will
@@ -105,21 +99,65 @@ public Function<AsyncExecutionInternal<R>, CompletableFuture<ExecutionResult<R>>
10599
FailsafeFuture<R> future) {
106100

107101
return execution -> {
102+
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
103+
Runnable runnable = () -> innerFn.apply(execution)
104+
.thenCompose(r -> r == null ? ExecutionResult.nullFuture() : postExecuteAsync(execution, r, scheduler, future))
105+
.whenComplete((postResult, postError) -> {
106+
if (postError != null)
107+
promise.completeExceptionally(postError);
108+
else
109+
promise.complete(postResult);
110+
});
111+
108112
if (!execution.isRecorded()) {
109-
ExecutionResult<R> result = preExecute();
110-
if (result != null) {
111-
// Still need to preExecute when returning an alternative result before making it to the terminal Supplier
112-
execution.preExecute();
113-
return CompletableFuture.completedFuture(result);
113+
CompletableFuture<ExecutionResult<R>> preResult = preExecuteAsync(scheduler, future);
114+
if (preResult != null) {
115+
preResult.whenComplete((result, error) -> {
116+
if (error != null)
117+
promise.completeExceptionally(error);
118+
else if (result != null) {
119+
// Check for non-result, which occurs after a rate limiter preExecute delay
120+
if (result.isNonResult()) {
121+
// Execute and post-execute
122+
runnable.run();
123+
} else {
124+
// Still need to preExecute when short-circuiting an execution with an alternative result
125+
execution.preExecute();
126+
promise.complete(result);
127+
}
128+
} else {
129+
// If result is null, the execution is assumed to have been cancelled
130+
promise.complete(null);
131+
}
132+
});
133+
134+
return promise;
114135
}
115136
}
116137

117-
return innerFn.apply(execution).thenCompose(r -> {
118-
return r == null ? ExecutionResult.nullFuture() : postExecuteAsync(execution, r, scheduler, future);
119-
});
138+
// Execute and post-execute
139+
runnable.run();
140+
return promise;
120141
};
121142
}
122143

144+
/**
145+
* Performs synchronous post-execution handling for a {@code result}.
146+
*/
147+
public ExecutionResult<R> postExecute(ExecutionInternal<R> execution, ExecutionResult<R> result) {
148+
execution.recordAttempt();
149+
if (isFailure(result)) {
150+
result = onFailure(execution, result.withFailure());
151+
handleFailure(result, execution);
152+
} else {
153+
result = result.withSuccess();
154+
onSuccess(result);
155+
handleSuccess(result, execution);
156+
}
157+
158+
return result;
159+
}
160+
123161
/**
124162
* Performs potentially asynchronous post-execution handling for a {@code result}.
125163
*/

src/test/java/dev/failsafe/functional/FailsafeFutureCancellationTest.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package dev.failsafe.functional;
1717

1818
import dev.failsafe.*;
19+
import dev.failsafe.function.ContextualRunnable;
1920
import dev.failsafe.testing.Testing;
2021
import net.jodah.concurrentunit.Waiter;
2122
import dev.failsafe.event.ExecutionCompletedEvent;
@@ -41,13 +42,13 @@ void beforeMethod() {
4142
waiter = new Waiter();
4243
}
4344

44-
private <R> void assertCancel(FailsafeExecutor<R> executor, ContextualSupplier<R, R> supplier) throws Throwable {
45+
private void assertCancel(FailsafeExecutor<Void> executor, ContextualRunnable<Void> runnable) throws Throwable {
4546
// Given
46-
CompletableFuture<R> future = executor.onComplete(e -> {
47+
CompletableFuture<Void> future = executor.onComplete(e -> {
4748
waiter.assertNull(e.getResult());
4849
waiter.assertTrue(e.getFailure() instanceof CancellationException);
4950
waiter.resume();
50-
}).getAsync(supplier);
51+
}).runAsync(runnable);
5152

5253
Testing.sleep(300);
5354

@@ -62,23 +63,45 @@ private <R> void assertCancel(FailsafeExecutor<R> executor, ContextualSupplier<R
6263
assertThrows(future::get, CancellationException.class);
6364
}
6465

65-
public void shouldCancelOnGetAsyncWithRetries() throws Throwable {
66-
assertCancel(Failsafe.with(retryAlways), ctx -> {
66+
public void shouldCancelAsyncRetriesWithPendingDelay() throws Throwable {
67+
RetryPolicy<Void> retryPolicy = RetryPolicy.<Void>builder().withDelay(Duration.ofMinutes(1)).build();
68+
assertCancel(Failsafe.with(retryPolicy), ctx -> {
69+
throw new IllegalStateException();
70+
});
71+
}
72+
73+
public void shouldCancelAsyncRetriesWithBlockedExecution() throws Throwable {
74+
assertCancel(Failsafe.with(RetryPolicy.ofDefaults()), ctx -> {
6775
try {
6876
waiter.assertFalse(ctx.isCancelled());
6977
Thread.sleep(1000);
7078
} catch (InterruptedException e) {
7179
waiter.assertTrue(ctx.isCancelled());
7280
throw e;
7381
}
74-
return false;
7582
});
7683
}
7784

78-
public void shouldCancelOnGetAsyncWithTimeout() throws Throwable {
85+
public void shouldCancelAsyncTimeoutWithBlockedExecution() throws Throwable {
7986
assertCancel(Failsafe.with(Timeout.of(Duration.ofMinutes(1))), ctx -> {
80-
Thread.sleep(1000);
81-
return "test";
87+
try {
88+
waiter.assertFalse(ctx.isCancelled());
89+
Thread.sleep(1000);
90+
} catch (InterruptedException e) {
91+
waiter.assertTrue(ctx.isCancelled());
92+
throw e;
93+
}
94+
});
95+
}
96+
97+
public void shouldCancelAsyncRateLimiterWaitingOnPermit() throws Throwable {
98+
RateLimiter<Void> limiter = RateLimiter.<Void>smoothBuilder(1, Duration.ofSeconds(1))
99+
.withMaxWaitTime(Duration.ofMinutes(1))
100+
.build();
101+
limiter.tryAcquirePermit(); // All permits should be used now
102+
103+
assertCancel(Failsafe.with(limiter), ctx -> {
104+
fail("Execution should be cancelled during preExecute");
82105
});
83106
}
84107

src/test/java/dev/failsafe/functional/RateLimiterTest.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package dev.failsafe.functional;
1717

18-
import dev.failsafe.Failsafe;
19-
import dev.failsafe.RateLimitExceededException;
20-
import dev.failsafe.RateLimiter;
21-
import dev.failsafe.RetryPolicy;
18+
import dev.failsafe.*;
2219
import dev.failsafe.testing.Testing;
2320
import org.testng.annotations.Test;
2421

@@ -49,9 +46,7 @@ public void shouldThrowRateLimitExceededExceptionAfterPermitsExceeded() {
4946
*/
5047
public void testMaxWaitTimeExceeded() {
5148
// Given
52-
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(10))
53-
.withMaxWaitTime(Duration.ofMillis(20))
54-
.build();
49+
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(10)).build();
5550

5651
// When / Then
5752
testRunFailure(() -> {
@@ -90,24 +85,25 @@ public void testRejectedWithRetries() {
9085
}
9186

9287
/**
93-
* Asserts that a rate limiter propagates an InterruptedException.
88+
* Asserts that a rate limiter propagates a sync InterruptedException.
9489
*/
9590
public void testAcquirePermitWithInterrupt() {
9691
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofSeconds(1))
9792
.withMaxWaitTime(Duration.ofSeconds(5))
9893
.build();
94+
limiter.tryAcquirePermit();
9995

100-
testRunFailure(() -> {
101-
resetLimiter(limiter);
102-
limiter.tryAcquirePermit();
103-
Thread thread = Thread.currentThread();
104-
runInThread(() -> {
105-
Thread.sleep(100);
106-
thread.interrupt();
107-
});
108-
}, Failsafe.with(limiter), ctx -> {
96+
Thread thread = Thread.currentThread();
97+
runInThread(() -> {
98+
Thread.sleep(100);
99+
thread.interrupt();
100+
});
101+
assertThrows(() -> Failsafe.with(limiter).run(() -> {
109102
System.out.println("Executing");
110103
throw new Exception();
111-
}, InterruptedException.class);
104+
}), FailsafeException.class, InterruptedException.class);
105+
106+
// Reset interrupt flag
107+
Thread.interrupted();
112108
}
113109
}

0 commit comments

Comments
 (0)