Skip to content

Commit 66b8ca3

Browse files
authored
Ratelimiter implementation (#313)
Creates a RateLimiter policy with smooth and bursty variants.
1 parent ff160d4 commit 66b8ca3

29 files changed

+1666
-149
lines changed

CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
# 3.1
2+
3+
### Improvements
4+
5+
- Issue #308 - Introduce a `RateLimiter` policy.
6+
17
# 3.0.2
28

39
### Bug Fixes
@@ -17,7 +23,6 @@
1723

1824
# 3.0
1925

20-
2126
### API Changes
2227

2328
This release introduces some breaking changes to the API:

src/main/java/dev/failsafe/CircuitBreaker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ enum State {
107107
* Attempts to acquire a permit for the circuit breaker and throws {@link CircuitBreakerOpenException} if a permit
108108
* could not be acquired. Permission will be automatically released when a result or failure is recorded.
109109
*
110-
* @throws CircuitBreakerOpenException Thrown when the circuit breaker is in a half-open state and no permits remain
111-
* according to the configured success or failure thresholding capacity.
110+
* @throws CircuitBreakerOpenException if the circuit breaker is in a half-open state and no permits remain according
111+
* to the configured success or failure thresholding capacity.
112112
* @see #tryAcquirePermit()
113113
* @see #recordResult(Object)
114114
* @see #recordFailure(Throwable)

src/main/java/dev/failsafe/FailsafeException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
package dev.failsafe;
1717

1818
/**
19-
* Thrown when a synchronous Failsafe execution fails with an exception. Use {@link Throwable#getCause()} to learn the
20-
* cause of the failure.
21-
*
19+
* Thrown when a synchronous Failsafe execution fails with an {@link Exception}, wrapping the underlying exception. Use
20+
* {@link Throwable#getCause()} to learn the cause of the failure.
21+
*
2222
* @author Jonathan Halterman
2323
*/
2424
public class FailsafeException extends RuntimeException {

src/main/java/dev/failsafe/FailsafeExecutor.java

Lines changed: 87 additions & 82 deletions
Large diffs are not rendered by default.

src/main/java/dev/failsafe/Functions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ static <R> Function<SyncExecutionInternal<R>, ExecutionResult<R>> get(Contextual
5454
synchronized (execution.getInitial()) {
5555
execution.setInterruptable(false);
5656
if (execution.isInterrupted()) {
57-
// Clear interrupt flag if interruption was intended
57+
// Clear interrupt flag if interruption was performed by Failsafe
5858
Thread.interrupted();
5959
return execution.getResult();
6060
} else if (throwable instanceof InterruptedException)
61-
// Set interrupt flag if interrupt occurred but was not intended
61+
// Set interrupt flag if interruption was not performed by Failsafe
6262
Thread.currentThread().interrupt();
6363
}
6464

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe;
17+
18+
/**
19+
* Thrown when an execution exceeds or would exceed a {@link RateLimiter}.
20+
*
21+
* @author Jonathan Halterman
22+
*/
23+
public class RateLimitExceededException extends FailsafeException {
24+
private static final long serialVersionUID = 1L;
25+
26+
private final RateLimiter<?> rateLimiter;
27+
28+
public RateLimitExceededException(RateLimiter<?> rateLimiter) {
29+
this.rateLimiter = rateLimiter;
30+
}
31+
32+
/** Returns the {@link RateLimiter} that caused the exception. */
33+
public RateLimiter<?> getRateLimiter() {
34+
return rateLimiter;
35+
}
36+
}
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe;
17+
18+
import java.time.Duration;
19+
20+
/**
21+
* A rate limiter allows you to control the rate of executions as a way of preventing system overload.
22+
* <p>
23+
* There are two types of rate limiting: <i>smooth</i> and <i>bursty</i>. <i>Smooth</i> rate limiting will evenly spread
24+
* out execution requests over-time, effectively smoothing out uneven execution request rates. <i>Bursty</i> rate
25+
* limiting allows potential bursts of executions to occur, up to a configured max per time period.</p>
26+
* <p>Rate limiting is based on permits, which can be requested in order to perform rate limited execution.
27+
* Permits are automatically refreshed over time based on the rate limiter's configuration.</p>
28+
* <p>
29+
* This class provides methods that block while waiting for permits to become available, and also methods that return
30+
* immediately. The blocking methods include:
31+
* <ul>
32+
* <li>{@link #acquirePermit()}</li>
33+
* <li>{@link #acquirePermits(int)}</li>
34+
* <li>{@link #acquirePermit(Duration)}</li>
35+
* <li>{@link #acquirePermits(int, Duration)}</li>
36+
* <li>{@link #tryAcquirePermit(Duration)}</li>
37+
* <li>{@link #tryAcquirePermits(int, Duration)}</li>
38+
* </ul>
39+
* The methods that return immediately include:
40+
* <ul>
41+
* <li>{@link #tryAcquirePermit()}</li>
42+
* <li>{@link #tryAcquirePermits(int)}</li>
43+
* </ul>
44+
* </p>
45+
* <p>
46+
* This class also provides methods that throw {@link RateLimitExceededException} when permits cannot be acquired, and
47+
* also methods that return a boolean. The {@code acquire} methods all throw {@link RateLimitExceededException} when
48+
* permits cannot be acquired, and the {@code tryAcquire} methods return a boolean.
49+
* </p>
50+
* <p>
51+
* This class is threadsafe.
52+
* </p>
53+
*
54+
* @param <R> result type
55+
* @author Jonathan Halterman
56+
* @see RateLimiterConfig
57+
* @see RateLimiterBuilder
58+
* @see RateLimitExceededException
59+
*/
60+
public interface RateLimiter<R> extends Policy<R> {
61+
/**
62+
* Returns a smooth {@link RateLimiterBuilder} for the {@code maxExecutions} and {@code period}, which control how
63+
* frequently an execution is permitted. The individual execution rate is computed as {@code period / maxExecutions}.
64+
* For example, with {@code maxExecutions} of {@code 100} and a {@code period} of {@code 1000 millis}, individual
65+
* executions will be permitted at a max rate of one every 10 millis.
66+
* <p>By default, the returned {@link RateLimiterBuilder} will have a {@link RateLimiterBuilder#withMaxWaitTime max
67+
* wait time} of {@code 0}.
68+
* <p>
69+
* Executions are performed with no delay until they exceed the max rate, after which executions are either rejected
70+
* or will block and wait until the {@link RateLimiterBuilder#withMaxWaitTime(Duration) max wait time} is exceeded.
71+
*
72+
* @param maxExecutions The max number of permitted executions per {@code period}
73+
* @param period The period after which permitted executions are reset to the {@code maxExecutions}
74+
*/
75+
static <R> RateLimiterBuilder<R> smoothBuilder(long maxExecutions, Duration period) {
76+
return new RateLimiterBuilder<>(period.dividedBy(maxExecutions));
77+
}
78+
79+
/**
80+
* Returns a smooth {@link RateLimiterBuilder} for the {@code maxRate}, which controls how frequently an execution is
81+
* permitted. For example, a {@code maxRate} of {@code Duration.ofMillis(10)} would allow up to one execution every 10
82+
* milliseconds.
83+
* <p>By default, the returned {@link RateLimiterBuilder} will have a {@link RateLimiterBuilder#withMaxWaitTime max
84+
* wait time} of {@code 0}.
85+
* <p>
86+
* Executions are performed with no delay until they exceed the {@code maxRate}, after which executions are either
87+
* rejected or will block and wait until the {@link RateLimiterBuilder#withMaxWaitTime(Duration) max wait time} is
88+
* exceeded.
89+
*
90+
* @param maxRate at which individual executions should be permitted
91+
*/
92+
static <R> RateLimiterBuilder<R> smoothBuilder(Duration maxRate) {
93+
return new RateLimiterBuilder<>(maxRate);
94+
}
95+
96+
/**
97+
* Returns a bursty {@link RateLimiterBuilder} for the {@code maxExecutions} per {@code period}. For example, a {@code
98+
* maxExecutions} value of {@code 100} with a {@code period} of {@code Duration.ofSeconds(1)} would allow up to 100
99+
* executions every 1 second.
100+
* <p>By default, the returned {@link RateLimiterBuilder} will have a {@link RateLimiterBuilder#withMaxWaitTime max
101+
* wait time} of {@code 0}.
102+
* <p>
103+
* Executions are performed with no delay up until the {@code maxExecutions} are reached for the current {@code
104+
* period}, after which executions are either rejected or will block and wait until the {@link
105+
* RateLimiterBuilder#withMaxWaitTime(Duration) max wait time} is exceeded.
106+
*
107+
* @param maxExecutions The max number of permitted executions per {@code period}
108+
* @param period The period after which permitted executions are reset to the {@code maxExecutions}
109+
*/
110+
static <R> RateLimiterBuilder<R> burstyBuilder(long maxExecutions, Duration period) {
111+
return new RateLimiterBuilder<>(maxExecutions, period);
112+
}
113+
114+
/**
115+
* Creates a new RateLimiterBuilder that will be based on the {@code config}.
116+
*/
117+
static <R> RateLimiterBuilder<R> builder(RateLimiterConfig<R> config) {
118+
return new RateLimiterBuilder<>(config);
119+
}
120+
121+
/**
122+
* Returns the {@link RateLimiterConfig} that the RateLimiter was built with.
123+
*/
124+
@Override
125+
RateLimiterConfig<R> getConfig();
126+
127+
/**
128+
* Attempts to acquire a permit to perform an execution against the rate limiter, waiting until one is available or
129+
* the thread is interrupted.
130+
*
131+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
132+
* @see #tryAcquirePermit()
133+
*/
134+
default void acquirePermit() throws InterruptedException {
135+
acquirePermits(1);
136+
}
137+
138+
/**
139+
* Attempts to acquire the requested {@code permits} to perform executions against the rate limiter, waiting until
140+
* they are available or the thread is interrupted.
141+
*
142+
* @throws IllegalArgumentException if {@code permits} is < 1
143+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire the {@code permits}
144+
* @see #tryAcquirePermits(int)
145+
*/
146+
void acquirePermits(int permits) throws InterruptedException;
147+
148+
/**
149+
* Attempts to acquire a permit to perform an execution against the rate limiter, waiting up to the {@code
150+
* maxWaitTime} until one is available, else throwing {@link RateLimitExceededException} if a permit will not be
151+
* available in time.
152+
*
153+
* @throws NullPointerException if {@code maxWaitTime} is null
154+
* @throws RateLimitExceededException if the rate limiter cannot acquire a permit within the {@code maxWaitTime}
155+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
156+
* @see #tryAcquirePermit(Duration)
157+
*/
158+
default void acquirePermit(Duration maxWaitTime) throws InterruptedException {
159+
acquirePermits(1, maxWaitTime);
160+
}
161+
162+
/**
163+
* Attempts to acquire the requested {@code permits} to perform executions against the rate limiter, waiting up to the
164+
* {@code maxWaitTime} until they are available, else throwing {@link RateLimitExceededException} if the permits will
165+
* not be available in time.
166+
*
167+
* @throws IllegalArgumentException if {@code permits} is < 1
168+
* @throws NullPointerException if {@code maxWaitTime} is null
169+
* @throws RateLimitExceededException if the rate limiter cannot acquire a permit within the {@code maxWaitTime}
170+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire the {@code permits}
171+
* @see #tryAcquirePermits(int, Duration)
172+
*/
173+
default void acquirePermits(int permits, Duration maxWaitTime) throws InterruptedException {
174+
if (!tryAcquirePermits(permits, maxWaitTime))
175+
throw new RateLimitExceededException(this);
176+
}
177+
178+
/**
179+
* Returns whether the rate limiter is smooth.
180+
*
181+
* @see #smoothBuilder(long, Duration)
182+
* @see #smoothBuilder(Duration)
183+
*/
184+
default boolean isSmooth() {
185+
return getConfig().getMaxRate() != null;
186+
}
187+
188+
/**
189+
* Returns whether the rate limiter is bursty.
190+
*
191+
* @see #burstyBuilder(long, Duration)
192+
*/
193+
default boolean isBursty() {
194+
return getConfig().getPeriod() != null;
195+
}
196+
197+
/**
198+
* Tries to acquire a permit to perform an execution against the rate limiter, returning immediately without waiting.
199+
*
200+
* @return whether the requested {@code permits} are successfully acquired or not
201+
*/
202+
default boolean tryAcquirePermit() {
203+
return tryAcquirePermits(1);
204+
}
205+
206+
/**
207+
* Tries to acquire the requested {@code permits} to perform executions against the rate limiter, returning
208+
* immediately without waiting.
209+
*
210+
* @return whether the requested {@code permits} are successfully acquired or not
211+
* @throws IllegalArgumentException if {@code permits} is < 1
212+
*/
213+
boolean tryAcquirePermits(int permits);
214+
215+
/**
216+
* Tries to acquire a permit to perform an execution against the rate limiter, waiting up to the {@code maxWaitTime}
217+
* until they are available.
218+
*
219+
* @return whether a permit is successfully acquired
220+
* @throws NullPointerException if {@code maxWaitTime} is null
221+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
222+
*/
223+
default boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedException {
224+
return tryAcquirePermits(1, maxWaitTime);
225+
}
226+
227+
/**
228+
* Tries to acquire the requested {@code permits} to perform executions against the rate limiter, waiting up to the
229+
* {@code maxWaitTime} until they are available.
230+
*
231+
* @return whether the requested {@code permits} are successfully acquired or not
232+
* @throws IllegalArgumentException if {@code permits} is < 1
233+
* @throws NullPointerException if {@code maxWaitTime} is null
234+
* @throws InterruptedException if the current thread is interrupted while waiting to acquire the {@code permits}
235+
*/
236+
boolean tryAcquirePermits(int permits, Duration maxWaitTime) throws InterruptedException;
237+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License
15+
*/
16+
package dev.failsafe;
17+
18+
import dev.failsafe.internal.RateLimiterImpl;
19+
import dev.failsafe.internal.util.Assert;
20+
21+
import java.time.Duration;
22+
import java.time.temporal.ChronoUnit;
23+
24+
/**
25+
* Builds {@link RateLimiter} instances.
26+
* <p>
27+
* This class is <i>not</i> threadsafe.
28+
* </p>
29+
*
30+
* @param <R> result type
31+
* @author Jonathan Halterman
32+
* @see RateLimiterConfig
33+
* @see RateLimitExceededException
34+
*/
35+
public class RateLimiterBuilder<R> extends PolicyBuilder<RateLimiterBuilder<R>, RateLimiterConfig<R>, R> {
36+
RateLimiterBuilder(Duration executionRate) {
37+
super(new RateLimiterConfig<>(executionRate));
38+
config.maxWaitTime = Duration.ZERO;
39+
}
40+
41+
RateLimiterBuilder(long maxPermits, Duration period) {
42+
super(new RateLimiterConfig<>(maxPermits, period));
43+
config.maxWaitTime = Duration.ZERO;
44+
}
45+
46+
RateLimiterBuilder(RateLimiterConfig<R> config) {
47+
super(new RateLimiterConfig<>(config));
48+
}
49+
50+
/**
51+
* Builds a new {@link RateLimiter} using the builder's configuration.
52+
*/
53+
public RateLimiter<R> build() {
54+
return new RateLimiterImpl<>(new RateLimiterConfig<>(config));
55+
}
56+
57+
/**
58+
* Configures the {@code maxWaitTime} to wait for permits to be available. If permits cannot be acquired before the
59+
* {@code maxWaitTime} is exceeded, then the rate limiter will throw {@link RateLimitExceededException}.
60+
*
61+
* @throws NullPointerException if {@code maxWaitTime} is null
62+
*/
63+
public RateLimiterBuilder<R> withMaxWaitTime(Duration maxWaitTime) {
64+
config.maxWaitTime = Assert.notNull(maxWaitTime, "maxWaitTime");
65+
return this;
66+
}
67+
}

0 commit comments

Comments
 (0)