Skip to content

Commit 2e10f3f

Browse files
Add SemaphoreBackPressureHandler as an optional fallback (#1524)
1 parent 8672e55 commit 2e10f3f

File tree

6 files changed

+771
-1
lines changed

6 files changed

+771
-1
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/backpressure/BackPressureHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ public interface BackPressureHandler {
5757
*/
5858
void release(int amount, ReleaseReason reason);
5959

60+
/**
61+
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
62+
* processing was successful or not.
63+
* @param amount the amount of permits to release.
64+
*
65+
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
66+
* Implement {@link #release(int, ReleaseReason)} instead.
67+
*/
68+
@Deprecated
69+
default void release(int amount) {
70+
}
71+
6072
/**
6173
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
6274
* thus no activity is left in the {@link io.awspring.cloud.sqs.listener.source.MessageSource}.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/backpressure/BackPressureHandlerFactories.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,17 @@ public static BackPressureHandlerFactory throughputBackPressureHandler() {
184184
public static BackPressureHandlerFactory fullBatchBackPressureHandler() {
185185
return options -> FullBatchBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build();
186186
}
187+
188+
/**
189+
* Creates a new {@link SemaphoreBackPressureHandler} instance based on the provided {@link ContainerOptions}.
190+
*
191+
* @return the created SemaphoreBackPressureHandler.
192+
*/
193+
@Deprecated
194+
public static BackPressureHandlerFactory semaphoreBackPressureHandler() {
195+
return options -> SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
196+
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
197+
.throughputConfiguration(options.getBackPressureMode()).build();
198+
}
199+
187200
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/backpressure/BatchAwareBackPressureHandler.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,37 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler {
3131
*/
3232
int requestBatch() throws InterruptedException;
3333

34+
/**
35+
* Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all
36+
* permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits
37+
* can be specified.
38+
*
39+
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
40+
* Implement {@link BackPressureHandler#release(int, ReleaseReason)} instead.
41+
*/
42+
@Deprecated
43+
default void releaseBatch() {
44+
}
45+
46+
@Override
47+
default void release(int amount, ReleaseReason reason) {
48+
if (amount == getBatchSize() && reason == ReleaseReason.NONE_FETCHED) {
49+
releaseBatch();
50+
}
51+
else {
52+
release(amount);
53+
}
54+
}
55+
56+
/**
57+
* Return the configured batch size for this handler.
58+
* @return the batch size.
59+
*
60+
* @deprecated This method is deprecated and will not be used by the Spring Cloud AWS SQS listener anymore.
61+
*/
62+
@Deprecated
63+
default int getBatchSize() {
64+
return 0;
65+
}
66+
3467
}
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener.backpressure;
17+
18+
import java.time.Duration;
19+
import java.util.Arrays;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import io.awspring.cloud.sqs.listener.BackPressureMode;
25+
import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* {@link BackPressureHandler} implementation that uses a {@link Semaphore} for handling backpressure.
32+
*
33+
* @author Tomaz Fernandes
34+
* @since 3.0
35+
* @see io.awspring.cloud.sqs.listener.source.PollingMessageSource
36+
*/
37+
@Deprecated
38+
public class SemaphoreBackPressureHandler
39+
implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
40+
41+
private static final Logger logger = LoggerFactory.getLogger(SemaphoreBackPressureHandler.class);
42+
43+
private final Semaphore semaphore;
44+
45+
private final int batchSize;
46+
47+
private final int totalPermits;
48+
49+
private final Duration acquireTimeout;
50+
51+
private final BackPressureMode backPressureConfiguration;
52+
53+
private volatile CurrentThroughputMode currentThroughputMode;
54+
55+
private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false);
56+
57+
private String id;
58+
59+
private SemaphoreBackPressureHandler(Builder builder) {
60+
this.batchSize = builder.batchSize;
61+
this.totalPermits = builder.totalPermits;
62+
this.acquireTimeout = builder.acquireTimeout;
63+
this.backPressureConfiguration = builder.backPressureMode;
64+
this.semaphore = new Semaphore(totalPermits);
65+
this.currentThroughputMode = BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(backPressureConfiguration)
66+
? CurrentThroughputMode.HIGH
67+
: CurrentThroughputMode.LOW;
68+
logger.debug("SemaphoreBackPressureHandler created with configuration {} and {} total permits",
69+
backPressureConfiguration, totalPermits);
70+
}
71+
72+
public static Builder builder() {
73+
return new Builder();
74+
}
75+
76+
@Override
77+
public void setId(String id) {
78+
this.id = id;
79+
}
80+
81+
@Override
82+
public String getId() {
83+
return this.id;
84+
}
85+
86+
@Override
87+
public int request(int amount) throws InterruptedException {
88+
return tryAcquire(amount, this.currentThroughputMode) ? amount : 0;
89+
}
90+
91+
// @formatter:off
92+
@Override
93+
public int requestBatch() throws InterruptedException {
94+
return CurrentThroughputMode.LOW.equals(this.currentThroughputMode)
95+
? requestInLowThroughputMode()
96+
: requestInHighThroughputMode();
97+
}
98+
99+
private int requestInHighThroughputMode() throws InterruptedException {
100+
return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH)
101+
? this.batchSize
102+
: tryAcquirePartial();
103+
}
104+
// @formatter:on
105+
106+
private int tryAcquirePartial() throws InterruptedException {
107+
int availablePermits = this.semaphore.availablePermits();
108+
if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) {
109+
return 0;
110+
}
111+
int permitsToRequest = Math.min(availablePermits, this.batchSize);
112+
CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode;
113+
logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}",
114+
permitsToRequest, availablePermits, this.id, currentThroughputModeNow);
115+
boolean hasAcquiredPartial = tryAcquire(permitsToRequest, currentThroughputModeNow);
116+
return hasAcquiredPartial ? permitsToRequest : 0;
117+
}
118+
119+
private int requestInLowThroughputMode() throws InterruptedException {
120+
// Although LTM can be set / unset by many processes, only the MessageSource thread gets here,
121+
// so no actual concurrency
122+
logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id,
123+
this.semaphore.availablePermits());
124+
boolean hasAcquired = tryAcquire(this.totalPermits, CurrentThroughputMode.LOW);
125+
if (hasAcquired) {
126+
logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits());
127+
// We've acquired all permits - there's no other process currently processing messages
128+
if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) {
129+
logger.warn("hasAcquiredFullPermits was already true. Permits left: {}",
130+
this.semaphore.availablePermits());
131+
}
132+
return this.batchSize;
133+
}
134+
else {
135+
return 0;
136+
}
137+
}
138+
139+
private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputModeNow) throws InterruptedException {
140+
logger.trace("Acquiring {} permits for {} in TM {}", amount, this.id, this.currentThroughputMode);
141+
boolean hasAcquired = this.semaphore.tryAcquire(amount, this.acquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
142+
if (hasAcquired) {
143+
logger.trace("{} permits acquired for {} in TM {}. Permits left: {}", amount, this.id,
144+
currentThroughputModeNow, this.semaphore.availablePermits());
145+
}
146+
else {
147+
logger.trace("Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}", amount,
148+
this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow,
149+
this.semaphore.availablePermits());
150+
}
151+
return hasAcquired;
152+
}
153+
154+
@Override
155+
public void releaseBatch() {
156+
maybeSwitchToLowThroughputMode();
157+
int permitsToRelease = getPermitsToRelease(this.batchSize);
158+
this.semaphore.release(permitsToRelease);
159+
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
160+
this.semaphore.availablePermits());
161+
}
162+
163+
@Override
164+
public int getBatchSize() {
165+
return this.batchSize;
166+
}
167+
168+
private void maybeSwitchToLowThroughputMode() {
169+
if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration)
170+
&& CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) {
171+
logger.debug("Entire batch of permits released for {}, setting TM LOW. Permits left: {}", this.id,
172+
this.semaphore.availablePermits());
173+
this.currentThroughputMode = CurrentThroughputMode.LOW;
174+
}
175+
}
176+
177+
@Override
178+
public void release(int amount) {
179+
logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
180+
this.semaphore.availablePermits());
181+
maybeSwitchToHighThroughputMode(amount);
182+
int permitsToRelease = getPermitsToRelease(amount);
183+
this.semaphore.release(permitsToRelease);
184+
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
185+
this.semaphore.availablePermits());
186+
}
187+
188+
private int getPermitsToRelease(int amount) {
189+
return this.hasAcquiredFullPermits.compareAndSet(true, false)
190+
// The first process that gets here should release all permits except for inflight messages
191+
// We can have only one batch of messages at this point since we have all permits
192+
? this.totalPermits - (this.batchSize - amount)
193+
: amount;
194+
}
195+
196+
private void maybeSwitchToHighThroughputMode(int amount) {
197+
if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
198+
logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
199+
this.semaphore.availablePermits());
200+
this.currentThroughputMode = CurrentThroughputMode.HIGH;
201+
}
202+
}
203+
204+
@Override
205+
public boolean drain(Duration timeout) {
206+
logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(),
207+
this.totalPermits - this.semaphore.availablePermits(), this.id);
208+
try {
209+
return this.semaphore.tryAcquire(this.totalPermits, (int) timeout.getSeconds(), TimeUnit.SECONDS);
210+
}
211+
catch (InterruptedException e) {
212+
Thread.currentThread().interrupt();
213+
throw new IllegalStateException("Interrupted while waiting to acquire permits", e);
214+
}
215+
}
216+
217+
private enum CurrentThroughputMode {
218+
219+
HIGH,
220+
221+
LOW;
222+
223+
}
224+
225+
public static class Builder {
226+
227+
private int batchSize;
228+
229+
private int totalPermits;
230+
231+
private Duration acquireTimeout;
232+
233+
private BackPressureMode backPressureMode;
234+
235+
public Builder batchSize(int batchSize) {
236+
this.batchSize = batchSize;
237+
return this;
238+
}
239+
240+
public Builder totalPermits(int totalPermits) {
241+
this.totalPermits = totalPermits;
242+
return this;
243+
}
244+
245+
public Builder acquireTimeout(Duration acquireTimeout) {
246+
this.acquireTimeout = acquireTimeout;
247+
return this;
248+
}
249+
250+
public Builder throughputConfiguration(BackPressureMode backPressureConfiguration) {
251+
this.backPressureMode = backPressureConfiguration;
252+
return this;
253+
}
254+
255+
public SemaphoreBackPressureHandler build() {
256+
Assert.noNullElements(
257+
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode),
258+
"Missing configuration");
259+
return new SemaphoreBackPressureHandler(this);
260+
}
261+
262+
}
263+
264+
}

0 commit comments

Comments
 (0)