Skip to content

Commit 94d85aa

Browse files
committed
Add support for permit reservations to rate limiter
1 parent 68e6c1f commit 94d85aa

File tree

6 files changed

+124
-20
lines changed

6 files changed

+124
-20
lines changed

src/main/java/dev/failsafe/RateLimiter.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
* <ul>
4343
* <li>{@link #tryAcquirePermit()}</li>
4444
* <li>{@link #tryAcquirePermits(int)}</li>
45+
* <li>{@link #reservePermit()}</li>
46+
* <li>{@link #reservePermits(int)}</li>
47+
* <li>{@link #tryReservePermit(Duration)}</li>
48+
* <li>{@link #tryReservePermits(int, Duration)}</li>
4549
* </ul>
4650
* </p>
4751
* <p>
@@ -50,6 +54,10 @@
5054
* permits cannot be acquired, and the {@code tryAcquire} methods return a boolean.
5155
* </p>
5256
* <p>
57+
* The {@code reserve} methods attempt to reserve permits and return an expected wait time before the permit can be
58+
* used. This helps integrate with scenarios where you need to wait externally
59+
* </p>
60+
* <p>
5361
* This class is threadsafe.
5462
* </p>
5563
*
@@ -132,6 +140,7 @@ static <R> RateLimiterBuilder<R> builder(RateLimiterConfig<R> config) {
132140
*
133141
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
134142
* @see #tryAcquirePermit()
143+
* @see #reservePermit()
135144
*/
136145
default void acquirePermit() throws InterruptedException {
137146
acquirePermits(1);
@@ -144,6 +153,7 @@ default void acquirePermit() throws InterruptedException {
144153
* @throws IllegalArgumentException if {@code permits} is < 1
145154
* @throws InterruptedException if the current thread is interrupted while waiting to acquire the {@code permits}
146155
* @see #tryAcquirePermits(int)
156+
* @see #reservePermits(int)
147157
*/
148158
void acquirePermits(int permits) throws InterruptedException;
149159

@@ -196,10 +206,68 @@ default boolean isBursty() {
196206
return getConfig().getPeriod() != null;
197207
}
198208

209+
/**
210+
* Reserves a permit to perform an execution against the rate limiter, and returns the time that the caller is
211+
* expected to wait before acting on the permit. Returns {@code 0} if the permit is immediately available and no
212+
* waiting is needed.
213+
*
214+
* @see #acquirePermit()
215+
* @see #tryAcquirePermit()
216+
*/
217+
default Duration reservePermit() {
218+
return reservePermits(1);
219+
}
220+
221+
/**
222+
* Reserves the {@code permits} to perform executions against the rate limiter, and returns the time that the caller
223+
* is expected to wait before acting on the permits. Returns {@code 0} if the permits are immediately available and no
224+
* waiting is needed.
225+
*
226+
* @throws IllegalArgumentException if {@code permits} is < 1
227+
* @see #acquirePermits(int)
228+
* @see #tryAcquirePermits(int)
229+
*/
230+
Duration reservePermits(int permits);
231+
232+
/**
233+
* Tries to reserve a permit to perform an execution against the rate limiter, and returns the time that the caller is
234+
* expected to wait before acting on the permit, as long as it's less than the {@code maxWaitTime}.
235+
* <ul>
236+
* <li>Returns the expected wait time for the permit if it was successfully reserved.</li>
237+
* <li>Returns {@code 0} if the permit was successfully reserved and no waiting is needed.</li>
238+
* <li>Returns {@code -1 nanoseconds} if the permit was not reserved because the wait time would be greater than the {@code maxWaitTime}.</li>
239+
* </ul>
240+
*
241+
* @throws NullPointerException if {@code maxWaitTime} is null
242+
* @see #acquirePermit(Duration)
243+
* @see #tryAcquirePermit(Duration)
244+
*/
245+
default Duration tryReservePermit(Duration maxWaitTime) {
246+
return tryReservePermits(1, maxWaitTime);
247+
}
248+
249+
/**
250+
* Tries to reserve the {@code permits} to perform executions against the rate limiter, and returns the time that the
251+
* caller is expected to wait before acting on the permits, as long as it's less than the {@code maxWaitTime}.
252+
* <ul>
253+
* <li>Returns the expected wait time for the permits if they were successfully reserved.</li>
254+
* <li>Returns {@code 0} if the permits were successfully reserved and no waiting is needed.</li>
255+
* <li>Returns {@code -1 nanoseconds} if the permits were not reserved because the wait time would be greater than the {@code maxWaitTime}.</li>
256+
* </ul>
257+
*
258+
* @throws IllegalArgumentException if {@code permits} is < 1
259+
* @throws NullPointerException if {@code maxWaitTime} is null
260+
* @see #acquirePermit(Duration)
261+
* @see #tryAcquirePermit(Duration)
262+
*/
263+
Duration tryReservePermits(int permits, Duration maxWaitTime);
264+
199265
/**
200266
* Tries to acquire a permit to perform an execution against the rate limiter, returning immediately without waiting.
201267
*
202268
* @return whether the requested {@code permits} are successfully acquired or not
269+
* @see #acquirePermit()
270+
* @see #reservePermits(int)
203271
*/
204272
default boolean tryAcquirePermit() {
205273
return tryAcquirePermits(1);
@@ -211,6 +279,7 @@ default boolean tryAcquirePermit() {
211279
*
212280
* @return whether the requested {@code permits} are successfully acquired or not
213281
* @throws IllegalArgumentException if {@code permits} is < 1
282+
* @see #acquirePermits(int)
214283
*/
215284
boolean tryAcquirePermits(int permits);
216285

@@ -221,6 +290,7 @@ default boolean tryAcquirePermit() {
221290
* @return whether a permit is successfully acquired
222291
* @throws NullPointerException if {@code maxWaitTime} is null
223292
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
293+
* @see #acquirePermit(Duration)
224294
*/
225295
default boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedException {
226296
return tryAcquirePermits(1, maxWaitTime);
@@ -234,6 +304,7 @@ default boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedExcepti
234304
* @throws IllegalArgumentException if {@code permits} is < 1
235305
* @throws NullPointerException if {@code maxWaitTime} is null
236306
* @throws InterruptedException if the current thread is interrupted while waiting to acquire the {@code permits}
307+
* @see #acquirePermits(int, Duration)
237308
*/
238309
boolean tryAcquirePermits(int permits, Duration maxWaitTime) throws InterruptedException;
239310
}

src/main/java/dev/failsafe/RateLimiterBuilder.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import dev.failsafe.internal.util.Assert;
2020

2121
import java.time.Duration;
22-
import java.time.temporal.ChronoUnit;
2322

2423
/**
2524
* Builds {@link RateLimiter} instances.
@@ -57,6 +56,10 @@ public RateLimiter<R> build() {
5756
/**
5857
* Configures the {@code maxWaitTime} to wait for permits to be available. If permits cannot be acquired before the
5958
* {@code maxWaitTime} is exceeded, then the rate limiter will throw {@link RateLimitExceededException}.
59+
* <p>
60+
* This setting only applies when the resulting RateLimiter is used with the {@link Failsafe} class. It does not apply
61+
* when the RateLimiter is used in a standalone way.
62+
* </p>
6063
*
6164
* @throws NullPointerException if {@code maxWaitTime} is null
6265
*/

src/main/java/dev/failsafe/RateLimiterConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public Duration getPeriod() {
8585
/**
8686
* Returns the max time to wait for permits to be available. If permits cannot be acquired before the max wait time is
8787
* exceeded, then the rate limiter will throw {@link RateLimitExceededException}.
88+
* <p>
89+
* This setting only applies when the RateLimiter is used with the {@link Failsafe} class. It does not apply when the
90+
* RateLimiter is used in a standalone way.
91+
* </p>
8892
*
8993
* @see RateLimiterBuilder#withMaxWaitTime(Duration)
9094
*/

src/main/java/dev/failsafe/internal/RateLimiterExecutor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,20 @@ protected ExecutionResult<R> preExecute() {
4848
try {
4949
return rateLimiter.tryAcquirePermit(maxWaitTime) ?
5050
null :
51-
ExecutionResult.failure(new RateLimitExceededException(rateLimiter));
51+
ExecutionResult.exception(new RateLimitExceededException(rateLimiter));
5252
} catch (InterruptedException e) {
5353
// Set interrupt flag
5454
Thread.currentThread().interrupt();
55-
return ExecutionResult.failure(e);
55+
return ExecutionResult.exception(e);
5656
}
5757
}
5858

5959
@Override
6060
protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler scheduler, FailsafeFuture<R> future) {
6161
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
62-
long waitNanos = rateLimiter.acquirePermitWaitNanos(maxWaitTime);
62+
long waitNanos = rateLimiter.reservePermits(1, maxWaitTime);
6363
if (waitNanos == -1)
64-
promise.complete(ExecutionResult.failure(new RateLimitExceededException(rateLimiter)));
64+
promise.complete(ExecutionResult.exception(new RateLimitExceededException(rateLimiter)));
6565
else {
6666
try {
6767
// Wait for the permit

src/main/java/dev/failsafe/internal/RateLimiterImpl.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@
2121
import dev.failsafe.internal.util.Assert;
2222
import dev.failsafe.internal.util.Durations;
2323
import dev.failsafe.spi.PolicyExecutor;
24-
import dev.failsafe.spi.Scheduler;
2524

2625
import java.time.Duration;
27-
import java.util.concurrent.CompletableFuture;
2826
import java.util.concurrent.TimeUnit;
2927

3028
/**
@@ -54,41 +52,45 @@ public RateLimiterConfig<R> getConfig() {
5452

5553
@Override
5654
public void acquirePermits(int permits) throws InterruptedException {
57-
Assert.isTrue(permits > 0, "permits must be > 0");
58-
long waitNanos = stats.acquirePermits(permits, null);
55+
long waitNanos = reservePermits(permits).toNanos();
5956
if (waitNanos > 0)
6057
TimeUnit.NANOSECONDS.sleep(waitNanos);
6158
}
6259

6360
@Override
64-
public boolean tryAcquirePermits(int permits) {
61+
public Duration reservePermits(int permits) {
6562
Assert.isTrue(permits > 0, "permits must be > 0");
66-
long waitNanos = stats.acquirePermits(permits, Duration.ZERO);
67-
return waitNanos == 0;
63+
return Duration.ofNanos(stats.acquirePermits(permits, null));
64+
}
65+
66+
@Override
67+
public boolean tryAcquirePermits(int permits) {
68+
return reservePermits(permits, Duration.ZERO) == 0;
6869
}
6970

7071
@Override
7172
public boolean tryAcquirePermits(int permits, Duration maxWaitTime) throws InterruptedException {
72-
Assert.isTrue(permits > 0, "permits must be > 0");
73-
Assert.notNull(maxWaitTime, "maxWaitTime");
74-
long waitNanos = stats.acquirePermits(permits, Durations.ofSafeNanos(maxWaitTime));
73+
long waitNanos = reservePermits(permits, maxWaitTime);
7574
if (waitNanos == -1)
7675
return false;
7776
if (waitNanos > 0)
7877
TimeUnit.NANOSECONDS.sleep(waitNanos);
7978
return true;
8079
}
8180

81+
@Override
82+
public Duration tryReservePermits(int permits, Duration maxWaitTime) {
83+
return Duration.ofNanos(reservePermits(permits, maxWaitTime));
84+
}
85+
8286
@Override
8387
public PolicyExecutor<R> toExecutor(int policyIndex) {
8488
return new RateLimiterExecutor<>(this, policyIndex);
8589
}
8690

87-
/**
88-
* Returns the wait nanos for an acquired permit which can be used to externally wait.
89-
*/
90-
long acquirePermitWaitNanos(Duration maxWaitTime) {
91+
long reservePermits(int permits, Duration maxWaitTime) {
92+
Assert.isTrue(permits > 0, "permits must be > 0");
9193
Assert.notNull(maxWaitTime, "maxWaitTime");
92-
return stats.acquirePermits(1, Durations.ofSafeNanos(maxWaitTime));
94+
return stats.acquirePermits(permits, Durations.ofSafeNanos(maxWaitTime));
9395
}
9496
}

src/test/java/dev/failsafe/functional/RateLimiterTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,36 @@
2424
import java.time.Duration;
2525

2626
import static dev.failsafe.internal.InternalTesting.resetLimiter;
27+
import static org.testng.Assert.assertEquals;
28+
import static org.testng.Assert.assertTrue;
2729

2830
/**
2931
* Tests various RateLimiter scenarios.
3032
*/
3133
@Test
3234
public class RateLimiterTest extends Testing {
35+
public void testReservePermit() {
36+
// Given
37+
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(100)).build();
38+
39+
// When / Then
40+
assertEquals(limiter.reservePermit(), Duration.ZERO);
41+
assertTrue(limiter.reservePermit().toMillis() > 0);
42+
assertTrue(limiter.reservePermit().toMillis() > 100);
43+
}
44+
45+
public void testTryReservePermit() {
46+
// Given
47+
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(100)).build();
48+
49+
// When / Then
50+
assertEquals(limiter.tryReservePermit(Duration.ofMillis(1)), Duration.ZERO);
51+
assertEquals(limiter.tryReservePermit(Duration.ofMillis(10)), Duration.ofNanos(-1));
52+
assertTrue(limiter.tryReservePermit(Duration.ofMillis(100)).toMillis() > 0);
53+
assertTrue(limiter.tryReservePermit(Duration.ofMillis(200)).toMillis() > 100);
54+
assertEquals(limiter.tryReservePermit(Duration.ofMillis(100)), Duration.ofNanos(-1));
55+
}
56+
3357
public void testPermitAcquiredAfterWait() {
3458
// Given
3559
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(50))

0 commit comments

Comments
 (0)