2626import com .datastax .oss .driver .shaded .guava .common .annotations .VisibleForTesting ;
2727import edu .umd .cs .findbugs .annotations .NonNull ;
2828import edu .umd .cs .findbugs .annotations .Nullable ;
29- import java .util .ArrayDeque ;
3029import java .util .Deque ;
31- import java .util .concurrent .locks . ReentrantLock ;
32- import net . jcip . annotations . GuardedBy ;
30+ import java .util .concurrent .ConcurrentLinkedDeque ;
31+ import java . util . concurrent . atomic . AtomicInteger ;
3332import net .jcip .annotations .ThreadSafe ;
3433import org .slf4j .Logger ;
3534import org .slf4j .LoggerFactory ;
@@ -61,17 +60,12 @@ public class ConcurrencyLimitingRequestThrottler implements RequestThrottler {
6160 private final String logPrefix ;
6261 private final int maxConcurrentRequests ;
6362 private final int maxQueueSize ;
64-
65- private final ReentrantLock lock = new ReentrantLock ();
66-
67- @ GuardedBy ("lock" )
68- private int concurrentRequests ;
69-
70- @ GuardedBy ("lock" )
71- private final Deque <Throttled > queue = new ArrayDeque <>();
72-
73- @ GuardedBy ("lock" )
74- private boolean closed ;
63+ private final AtomicInteger concurrentRequests = new AtomicInteger (0 );
64+ // CLQ is not O(1) for size(), as it forces a full iteration of the queue. So, we track
65+ // the size of the queue explicitly.
66+ private final Deque <Throttled > queue = new ConcurrentLinkedDeque <>();
67+ private final AtomicInteger queueSize = new AtomicInteger (0 );
68+ private volatile boolean closed = false ;
7569
7670 public ConcurrencyLimitingRequestThrottler (DriverContext context ) {
7771 this .logPrefix = context .getSessionName ();
@@ -88,50 +82,62 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) {
8882
8983 @ Override
9084 public void register (@ NonNull Throttled request ) {
91- boolean notifyReadyRequired = false ;
85+ if (closed ) {
86+ LOG .trace ("[{}] Rejecting request after shutdown" , logPrefix );
87+ fail (request , "The session is shutting down" );
88+ return ;
89+ }
9290
93- lock .lock ();
94- try {
95- if (closed ) {
96- LOG .trace ("[{}] Rejecting request after shutdown" , logPrefix );
97- fail (request , "The session is shutting down" );
98- } else if (queue .isEmpty () && concurrentRequests < maxConcurrentRequests ) {
99- // We have capacity for one more concurrent request
91+ // Implementation note: Technically the "concurrent requests" or "queue size"
92+ // could read transiently over the limit, but the queue itself will never grow
93+ // beyond the limit since we always check for that condition and revert if
94+ // over-limit. We do this instead of a CAS-loop to avoid the potential loop.
95+
96+ // If no backlog exists AND we get capacity, we can execute immediately
97+ if (queueSize .get () == 0 ) {
98+ // Take a claim first, and then check if we are OK to proceed
99+ int newConcurrent = concurrentRequests .incrementAndGet ();
100+ if (newConcurrent <= maxConcurrentRequests ) {
100101 LOG .trace ("[{}] Starting newly registered request" , logPrefix );
101- concurrentRequests += 1 ;
102- notifyReadyRequired = true ;
103- } else if (queue .size () < maxQueueSize ) {
104- LOG .trace ("[{}] Enqueuing request" , logPrefix );
105- queue .add (request );
102+ request .onThrottleReady (false );
103+ return ;
106104 } else {
107- LOG .trace ("[{}] Rejecting request because of full queue" , logPrefix );
108- fail (
109- request ,
110- String .format (
111- "The session has reached its maximum capacity "
112- + "(concurrent requests: %d, queue size: %d)" ,
113- maxConcurrentRequests , maxQueueSize ));
105+ // We exceeded the limit, decrement the count and fall through to the queuing logic
106+ concurrentRequests .decrementAndGet ();
114107 }
115- } finally {
116- lock .unlock ();
117108 }
118109
119- // no need to hold the lock while allowing the task to progress
120- if (notifyReadyRequired ) {
121- request .onThrottleReady (false );
110+ // If we have a backlog, or we failed to claim capacity, try to enqueue
111+ int newQueueSize = queueSize .incrementAndGet ();
112+ if (newQueueSize <= maxQueueSize ) {
113+ LOG .trace ("[{}] Enqueuing request" , logPrefix );
114+ queue .offer (request );
115+
116+ // Double-check that we were still supposed to be enqueued; it is possible
117+ // that the session was closed while we were enqueuing, it's also possible
118+ // that it is right now removing the request, so we need to check both
119+ if (closed ) {
120+ if (queue .remove (request )) {
121+ queueSize .decrementAndGet ();
122+ LOG .trace ("[{}] Rejecting late request after shutdown" , logPrefix );
123+ fail (request , "The session is shutting down" );
124+ }
125+ }
126+ } else {
127+ LOG .trace ("[{}] Rejecting request because of full queue" , logPrefix );
128+ queueSize .decrementAndGet ();
129+ fail (
130+ request ,
131+ String .format (
132+ "The session has reached its maximum capacity "
133+ + "(concurrent requests: %d, queue size: %d)" ,
134+ maxConcurrentRequests , maxQueueSize ));
122135 }
123136 }
124137
125138 @ Override
126139 public void signalSuccess (@ NonNull Throttled request ) {
127- Throttled nextRequest = null ;
128- lock .lock ();
129- try {
130- nextRequest = onRequestDoneAndDequeNext ();
131- } finally {
132- lock .unlock ();
133- }
134-
140+ Throttled nextRequest = onRequestDoneAndDequeNext ();
135141 if (nextRequest != null ) {
136142 nextRequest .onThrottleReady (true );
137143 }
@@ -145,17 +151,13 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) {
145151 @ Override
146152 public void signalTimeout (@ NonNull Throttled request ) {
147153 Throttled nextRequest = null ;
148- lock .lock ();
149- try {
150- if (!closed ) {
151- if (queue .remove (request )) { // The request timed out before it was active
152- LOG .trace ("[{}] Removing timed out request from the queue" , logPrefix );
153- } else {
154- nextRequest = onRequestDoneAndDequeNext ();
155- }
154+ if (!closed ) {
155+ if (queue .remove (request )) { // The request timed out before it was active
156+ queueSize .decrementAndGet ();
157+ LOG .trace ("[{}] Removing timed out request from the queue" , logPrefix );
158+ } else {
159+ nextRequest = onRequestDoneAndDequeNext ();
156160 }
157- } finally {
158- lock .unlock ();
159161 }
160162
161163 if (nextRequest != null ) {
@@ -166,35 +168,30 @@ public void signalTimeout(@NonNull Throttled request) {
166168 @ Override
167169 public void signalCancel (@ NonNull Throttled request ) {
168170 Throttled nextRequest = null ;
169- lock .lock ();
170- try {
171- if (!closed ) {
172- if (queue .remove (request )) { // The request has been cancelled before it was active
173- LOG .trace ("[{}] Removing cancelled request from the queue" , logPrefix );
174- } else {
175- nextRequest = onRequestDoneAndDequeNext ();
176- }
171+ if (!closed ) {
172+ if (queue .remove (request )) { // The request has been cancelled before it was active
173+ queueSize .decrementAndGet ();
174+ LOG .trace ("[{}] Removing cancelled request from the queue" , logPrefix );
175+ } else {
176+ nextRequest = onRequestDoneAndDequeNext ();
177177 }
178- } finally {
179- lock .unlock ();
180178 }
181179
182180 if (nextRequest != null ) {
183181 nextRequest .onThrottleReady (true );
184182 }
185183 }
186184
187- @ SuppressWarnings ("GuardedBy" ) // this method is only called with the lock held
188185 @ Nullable
189186 private Throttled onRequestDoneAndDequeNext () {
190- assert lock .isHeldByCurrentThread ();
191187 if (!closed ) {
192- if (queue .isEmpty ()) {
193- concurrentRequests -= 1 ;
188+ Throttled nextRequest = queue .poll ();
189+ if (nextRequest == null ) {
190+ concurrentRequests .decrementAndGet ();
194191 } else {
192+ queueSize .decrementAndGet ();
195193 LOG .trace ("[{}] Starting dequeued request" , logPrefix );
196- // don't touch concurrentRequests since we finished one but started another
197- return queue .poll ();
194+ return nextRequest ;
198195 }
199196 }
200197
@@ -204,45 +201,28 @@ private Throttled onRequestDoneAndDequeNext() {
204201
205202 @ Override
206203 public void close () {
207- lock .lock ();
208- try {
209- closed = true ;
210- LOG .debug ("[{}] Rejecting {} queued requests after shutdown" , logPrefix , queue .size ());
211- for (Throttled request : queue ) {
212- fail (request , "The session is shutting down" );
213- }
214- } finally {
215- lock .unlock ();
204+ closed = true ;
205+
206+ LOG .debug ("[{}] Rejecting {} queued requests after shutdown" , logPrefix , queueSize .get ());
207+ Throttled request ;
208+ while ((request = queue .poll ()) != null ) {
209+ queueSize .decrementAndGet ();
210+ fail (request , "The session is shutting down" );
216211 }
217212 }
218213
219214 public int getQueueSize () {
220- lock .lock ();
221- try {
222- return queue .size ();
223- } finally {
224- lock .unlock ();
225- }
215+ return queueSize .get ();
226216 }
227217
228218 @ VisibleForTesting
229219 int getConcurrentRequests () {
230- lock .lock ();
231- try {
232- return concurrentRequests ;
233- } finally {
234- lock .unlock ();
235- }
220+ return concurrentRequests .get ();
236221 }
237222
238223 @ VisibleForTesting
239224 Deque <Throttled > getQueue () {
240- lock .lock ();
241- try {
242- return queue ;
243- } finally {
244- lock .unlock ();
245- }
225+ return queue ;
246226 }
247227
248228 private static void fail (Throttled request , String message ) {
0 commit comments