3030import java .util .Iterator ;
3131import java .util .concurrent .ConcurrentLinkedDeque ;
3232import java .util .concurrent .TimeUnit ;
33- import java .util .concurrent .atomic .AtomicInteger ;
3433import java .util .concurrent .locks .Condition ;
3534import java .util .concurrent .locks .ReentrantLock ;
36- import java .util .concurrent .locks .ReentrantReadWriteLock ;
3735import java .util .function .Consumer ;
3836import java .util .function .Supplier ;
3937
4038import static com .mongodb .assertions .Assertions .assertNotNull ;
4139import static com .mongodb .assertions .Assertions .assertTrue ;
4240import static com .mongodb .assertions .Assertions .notNull ;
4341import static com .mongodb .internal .Locks .lockInterruptibly ;
44- import static com .mongodb .internal .Locks .lockInterruptiblyUnfair ;
45- import static com .mongodb .internal .Locks .withUnfairLock ;
42+ import static com .mongodb .internal .Locks .withLock ;
4643import static com .mongodb .internal .VisibleForTesting .AccessModifier .PRIVATE ;
4744import static com .mongodb .internal .thread .InterruptionUtil .interruptAndCreateMongoInterruptedException ;
4845
@@ -171,9 +168,9 @@ public T get(final long timeout, final TimeUnit timeUnit) {
171168 * and returns {@code null} instead of throwing {@link MongoTimeoutException}.
172169 */
173170 @ Nullable
174- T getImmediateUnfair () {
171+ T getImmediate () {
175172 T element = null ;
176- if (stateAndPermits .acquirePermitImmediateUnfair ()) {
173+ if (stateAndPermits .acquirePermitImmediate ()) {
177174 element = available .pollLast ();
178175 if (element == null ) {
179176 stateAndPermits .releasePermit ();
@@ -321,57 +318,26 @@ private static final class StateAndPermits {
321318 private volatile boolean closed ;
322319 private final int maxPermits ;
323320 private volatile int permits ;
324- /** When there are not enough available permits to serve all threads requesting a permit, threads are queued and wait on
325- * {@link #permitAvailableOrClosedOrPausedCondition}. Because of this waiting, we want threads to acquire the lock fairly,
326- * to avoid a situation when some threads are sitting in the queue for a long time while others barge in and acquire
327- * the lock without waiting in the queue. Fair locking reduces high percentiles of {@link #acquirePermit(long, TimeUnit)} latencies
328- * but reduces its throughput: it makes latencies roughly equally high for everyone, while keeping them lower than the highest
329- * latencies with unfair locking. The fair approach is in accordance with the
330- * <a href="https://github.com/mongodb/specifications/blob/568093ce7f0e1394cf4952c417e1e7dacc5fef53/source/connection-monitoring-and-pooling/connection-monitoring-and-pooling.rst#waitqueue">
331- * connection pool specification</a>.
332- * <p>
333- * When there are enough available permits to serve all threads requesting a permit, threads still have to acquire the lock,
334- * and still are queued, but since they are not waiting on {@link #permitAvailableOrClosedOrPausedCondition},
335- * threads spend less time in the queue. This results in having smaller high percentiles
336- * of {@link #acquirePermit(long, TimeUnit)} latencies, and we do not want to sacrifice the throughput
337- * to further reduce the high percentiles by acquiring the lock fairly.</p>
338- * <p>
339- * While there is a chance that the expressed reasoning is flawed, it is supported by the results of experiments reported in
340- * comments in <a href="https://jira.mongodb.org/browse/JAVA-4452">JAVA-4452</a>.</p>
341- * <p>
342- * {@link ReentrantReadWriteLock#hasWaiters(Condition)} requires holding the lock to be called, therefore we cannot use it
343- * to discriminate between the two cases described above, and we use {@link #waitersEstimate} instead.
344- * This approach results in sometimes acquiring a lock unfairly when it should have been acquired fairly, and vice versa.
345- * But it appears to be a good enough compromise, that results in having enough throughput when there are enough
346- * available permits and tolerable high percentiles of latencies when there are not enough available permits.</p>
347- * <p>
348- * It may seem viable to use {@link #permits} > 0 as a way to decide that there are likely no waiters,
349- * but benchmarking shows that with this approach high percentiles of contended {@link #acquirePermit(long, TimeUnit)} latencies
350- * (when the number of threads that use the pool is higher than the maximum pool size) become similar to a situation when no
351- * fair locking is used. That is, this approach does not result in the behavior we want.</p>
352- */
353- private final AtomicInteger waitersEstimate ;
354321 @ Nullable
355322 private Supplier <MongoException > causeSupplier ;
356323
357324 StateAndPermits (final int maxPermits , final Supplier <MongoServerUnavailableException > poolClosedExceptionSupplier ) {
358325 this .poolClosedExceptionSupplier = poolClosedExceptionSupplier ;
359- lock = new ReentrantLock (true );
326+ lock = new ReentrantLock ();
360327 permitAvailableOrClosedOrPausedCondition = lock .newCondition ();
361328 paused = false ;
362329 closed = false ;
363330 this .maxPermits = maxPermits ;
364331 permits = maxPermits ;
365- waitersEstimate = new AtomicInteger ();
366332 causeSupplier = null ;
367333 }
368334
369335 int permits () {
370336 return permits ;
371337 }
372338
373- boolean acquirePermitImmediateUnfair () {
374- return withUnfairLock (lock , () -> {
339+ boolean acquirePermitImmediate () {
340+ return withLock (lock , () -> {
375341 throwIfClosedOrPaused ();
376342 if (permits > 0 ) {
377343 //noinspection NonAtomicOperationOnVolatileField
@@ -391,17 +357,12 @@ boolean acquirePermitImmediateUnfair() {
391357 */
392358 boolean acquirePermit (final long timeout , final TimeUnit unit ) throws MongoInterruptedException {
393359 long remainingNanos = unit .toNanos (timeout );
394- if (waitersEstimate .get () == 0 ) {
395- lockInterruptiblyUnfair (lock );
396- } else {
397- lockInterruptibly (lock );
398- }
360+ lockInterruptibly (lock );
399361 try {
400362 while (permits == 0
401363 // the absence of short-circuiting is of importance
402364 & !throwIfClosedOrPaused ()) {
403365 try {
404- waitersEstimate .incrementAndGet ();
405366 if (timeout < 0 || remainingNanos == Long .MAX_VALUE ) {
406367 permitAvailableOrClosedOrPausedCondition .await ();
407368 } else if (remainingNanos >= 0 ) {
@@ -411,8 +372,6 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
411372 }
412373 } catch (InterruptedException e ) {
413374 throw interruptAndCreateMongoInterruptedException (null , e );
414- } finally {
415- waitersEstimate .decrementAndGet ();
416375 }
417376 }
418377 assertTrue (permits > 0 );
@@ -425,7 +384,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
425384 }
426385
427386 void releasePermit () {
428- withUnfairLock (lock , () -> {
387+ withLock (lock , () -> {
429388 assertTrue (permits < maxPermits );
430389 //noinspection NonAtomicOperationOnVolatileField
431390 permits ++;
@@ -434,7 +393,7 @@ void releasePermit() {
434393 }
435394
436395 void pause (final Supplier <MongoException > causeSupplier ) {
437- withUnfairLock (lock , () -> {
396+ withLock (lock , () -> {
438397 if (!paused ) {
439398 this .paused = true ;
440399 permitAvailableOrClosedOrPausedCondition .signalAll ();
@@ -445,7 +404,7 @@ void pause(final Supplier<MongoException> causeSupplier) {
445404
446405 void ready () {
447406 if (paused ) {
448- withUnfairLock (lock , () -> {
407+ withLock (lock , () -> {
449408 this .paused = false ;
450409 this .causeSupplier = null ;
451410 });
@@ -457,7 +416,7 @@ void ready() {
457416 */
458417 boolean close () {
459418 if (!closed ) {
460- return withUnfairLock (lock , () -> {
419+ return withLock (lock , () -> {
461420 if (!closed ) {
462421 closed = true ;
463422 permitAvailableOrClosedOrPausedCondition .signalAll ();
0 commit comments