2828
2929import java .util .Iterator ;
3030import java .util .concurrent .TimeUnit ;
31+ import java .util .concurrent .atomic .AtomicInteger ;
3132import java .util .concurrent .locks .Condition ;
32- import java .util .concurrent .locks .ReadWriteLock ;
33+ import java .util .concurrent .locks .Lock ;
34+ import java .util .concurrent .locks .ReentrantLock ;
3335import java .util .concurrent .locks .ReentrantReadWriteLock ;
3436import java .util .function .Consumer ;
3537import java .util .function .Supplier ;
@@ -162,7 +164,7 @@ public T get() {
162164 public T get (final long timeout , final TimeUnit timeUnit ) {
163165 stateAndPermits .throwIfClosedOrPaused ();
164166
165- if (!stateAndPermits .acquirePermitFair (timeout , timeUnit )) {
167+ if (!stateAndPermits .acquirePermit (timeout , timeUnit )) {
166168 throw new MongoTimeoutException (String .format ("Timeout waiting for a pooled item after %d %s" , timeout , timeUnit ));
167169 }
168170
@@ -221,7 +223,7 @@ public void prune() {
221223 public void ensureMinSize (final int minSize , final Consumer <T > initAndRelease ) {
222224 stateAndPermits .throwIfClosedOrPaused ();
223225 while (getCount () < minSize ) {
224- if (!stateAndPermits .acquirePermitFair (0 , TimeUnit .MILLISECONDS )) {
226+ if (!stateAndPermits .acquirePermit (0 , TimeUnit .MILLISECONDS )) {
225227 break ;
226228 }
227229 initAndRelease .accept (createNewAndReleasePermitIfFailure ());
@@ -246,7 +248,7 @@ private T createNewAndReleasePermitIfFailure() {
246248 */
247249 @ VisibleForTesting (otherwise = PRIVATE )
248250 boolean acquirePermit (final long timeout , final TimeUnit timeUnit ) {
249- return stateAndPermits .acquirePermitFair (timeout , timeUnit );
251+ return stateAndPermits .acquirePermit (timeout , timeUnit );
250252 }
251253
252254 /**
@@ -330,12 +332,42 @@ static boolean isPoolClosedException(final Throwable e) {
330332 @ ThreadSafe
331333 private static final class StateAndPermits {
332334 private final Supplier <MongoServerUnavailableException > poolClosedExceptionSupplier ;
333- private final ReadWriteLock lock ;
335+ private final ReentrantReadWriteLock lock ;
334336 private final Condition permitAvailableOrClosedOrPausedCondition ;
335337 private volatile boolean paused ;
336338 private volatile boolean closed ;
337339 private final int maxPermits ;
338340 private volatile int permits ;
341+ /** When there are not enough available permits to serve all threads requesting a permit, threads are queued and wait on
342+ * {@link #permitAvailableOrClosedOrPausedCondition}. Because of this waiting, we want threads to acquire the lock fairly,
343+ * to avoid a situation when some threads are sitting in the queue for a long time while others barge in and acquire
344+ * the lock without waiting in the queue. Fair locking reduces high percentiles of {@link #acquirePermit(long, TimeUnit)} latencies
345+ * but reduces its throughput: it makes latencies roughly equally high for everyone, while keeping them lower than the highest
346+ * latencies with unfair locking. The fair approach is in accordance with the
347+ * <a href="https://github.com/mongodb/specifications/blob/568093ce7f0e1394cf4952c417e1e7dacc5fef53/source/connection-monitoring-and-pooling/connection-monitoring-and-pooling.rst#waitqueue">
348+ * connection pool specification</a>.
349+ * <p>
350+ * When there are enough available permits to serve all threads requesting a permit, threads still have to acquire the lock,
351+ * and still are queued, but since they are not waiting on {@link #permitAvailableOrClosedOrPausedCondition},
352+ * threads spend less time in the queue. This results in having smaller high percentiles
353+ * of {@link #acquirePermit(long, TimeUnit)} latencies, and we do not want to sacrifice the throughput
354+ * to further reduce the high percentiles by acquiring the lock fairly.</p>
355+ * <p>
356+ * While there is a chance that the expressed reasoning is flawed, it is supported by the results of experiments reported in
357+ * comments in <a href="https://jira.mongodb.org/browse/JAVA-4452">JAVA-4452</a>.</p>
358+ * <p>
359+ * {@link ReentrantReadWriteLock#hasWaiters(Condition)} requires holding the lock to be called, therefore we cannot use it
360+ * to discriminate between the two cases described above, and we use {@link #waitersEstimate} instead.
361+ * This approach results in sometimes acquiring a lock unfairly when it should have been acquired fairly, and vice versa.
362+ * But it appears to be a good enough compromise, that results in having enough throughput when there are enough
363+ * available permits and tolerable high percentiles of latencies when there are not enough available permits.</p>
364+ * <p>
365+ * It may seem viable to use {@link #permits} &gt; 0 as a way to decide that there are likely no waiters,
366+ * but benchmarking shows that with this approach high percentiles of contended {@link #acquirePermit(long, TimeUnit)} latencies
367+ * (when the number of threads that use the pool is higher than the maximum pool size) become similar to a situation when no
368+ * fair locking is used. That is, this approach does not result in the behavior we want.</p>
369+ */
370+ private final AtomicInteger waitersEstimate ;
339371 @ Nullable
340372 private Supplier <MongoException > causeSupplier ;
341373
@@ -347,6 +379,7 @@ private static final class StateAndPermits {
347379 closed = false ;
348380 this .maxPermits = maxPermits ;
349381 permits = maxPermits ;
382+ waitersEstimate = new AtomicInteger ();
350383 causeSupplier = null ;
351384 }
352385
@@ -355,9 +388,7 @@ int permits() {
355388 }
356389
357390 boolean acquirePermitImmediateUnfair () {
358- if (!lock .writeLock ().tryLock ()) { // unfair
359- lock .writeLock ().lock ();
360- }
391+ lockUnfair (lock .writeLock ());
361392 try {
362393 throwIfClosedOrPaused ();
363394 if (permits > 0 ) {
@@ -373,21 +404,24 @@ boolean acquirePermitImmediateUnfair() {
373404 }
374405
375406 /**
407+ * This method also emulates the eager {@link InterruptedException} behavior of
408+ * {@link java.util.concurrent.Semaphore#tryAcquire(long, TimeUnit)}.
409+ *
376410 * @param timeout See {@link com.mongodb.internal.Timeout#startNow(long, TimeUnit)}.
377411 */
378- boolean acquirePermitFair (final long timeout , final TimeUnit unit ) throws MongoInterruptedException {
412+ boolean acquirePermit (final long timeout , final TimeUnit unit ) throws MongoInterruptedException {
379413 long remainingNanos = unit .toNanos (timeout );
380- try {
381- // preserve the eager InterruptedException behavior of `Semaphore.tryAcquire(long, TimeUnit)`
382- lock .writeLock ().lockInterruptibly ();
383- } catch (InterruptedException e ) {
384- throw new MongoInterruptedException (null , e );
414+ if (waitersEstimate .get () == 0 ) {
415+ lockInterruptiblyUnfair (lock .writeLock ());
416+ } else {
417+ lockInterruptibly (lock .writeLock ());
385418 }
386419 try {
387420 while (permits == 0
388421 // the absence of short-circuiting is of importance
389422 & !throwIfClosedOrPaused ()) {
390423 try {
424+ waitersEstimate .incrementAndGet ();
391425 if (timeout < 0 || remainingNanos == Long .MAX_VALUE ) {
392426 permitAvailableOrClosedOrPausedCondition .await ();
393427 } else if (remainingNanos >= 0 ) {
@@ -397,6 +431,8 @@ boolean acquirePermitFair(final long timeout, final TimeUnit unit) throws MongoI
397431 }
398432 } catch (InterruptedException e ) {
399433 throw new MongoInterruptedException (null , e );
434+ } finally {
435+ waitersEstimate .decrementAndGet ();
400436 }
401437 }
402438 assertTrue (permits > 0 );
@@ -409,7 +445,7 @@ boolean acquirePermitFair(final long timeout, final TimeUnit unit) throws MongoI
409445 }
410446
411447 void releasePermit () {
412- lock .writeLock (). lock ( );
448+ lockUnfair ( lock .writeLock ());
413449 try {
414450 assertTrue (permits < maxPermits );
415451 //noinspection NonAtomicOperationOnVolatileField
@@ -421,7 +457,7 @@ void releasePermit() {
421457 }
422458
423459 void pause (final Supplier <MongoException > causeSupplier ) {
424- lock .writeLock (). lock ( );
460+ lockUnfair ( lock .writeLock ());
425461 try {
426462 if (!paused ) {
427463 this .paused = true ;
@@ -435,7 +471,7 @@ void pause(final Supplier<MongoException> causeSupplier) {
435471
436472 void ready () {
437473 if (paused ) {
438- lock .writeLock (). lock ( );
474+ lockUnfair ( lock .writeLock ());
439475 try {
440476 this .paused = false ;
441477 this .causeSupplier = null ;
@@ -450,7 +486,7 @@ void ready() {
450486 */
451487 boolean close () {
452488 if (!closed ) {
453- lock .writeLock (). lock ( );
489+ lockUnfair ( lock .writeLock ());
454490 try {
455491 if (!closed ) {
456492 closed = true ;
@@ -501,4 +537,45 @@ boolean closed() {
501537 static String sizeToString (final int size ) {
502538 return size == INFINITE_SIZE ? "infinite" : Integer .toString (size );
503539 }
540+
541+ static void lockInterruptibly (final Lock lock ) throws MongoInterruptedException {
542+ try {
543+ lock .lockInterruptibly ();
544+ } catch (InterruptedException e ) {
545+ throw new MongoInterruptedException (null , e );
546+ }
547+ }
548+
549+ private static void lockInterruptiblyUnfair (final ReentrantReadWriteLock .WriteLock lock ) throws MongoInterruptedException {
550+ throwIfInterrupted ();
551+ // `WriteLock.tryLock` is unfair
552+ if (!lock .tryLock ()) {
553+ try {
554+ lock .lockInterruptibly ();
555+ } catch (InterruptedException e ) {
556+ Thread .currentThread ().interrupt ();
557+ throw new MongoInterruptedException (null , new InterruptedException ());
558+ }
559+ }
560+ }
561+
562+ static void lockUnfair (final ReentrantLock lock ) {
563+ // `ReentrantLock.tryLock` is unfair
564+ if (!lock .tryLock ()) {
565+ lock .lock ();
566+ }
567+ }
568+
569+ private static void lockUnfair (final ReentrantReadWriteLock .WriteLock lock ) {
570+ // `WriteLock.tryLock` is unfair
571+ if (!lock .tryLock ()) {
572+ lock .lock ();
573+ }
574+ }
575+
576+ private static void throwIfInterrupted () throws MongoInterruptedException {
577+ if (Thread .currentThread ().isInterrupted ()) {
578+ throw new MongoInterruptedException (null , new InterruptedException ());
579+ }
580+ }
504581}
0 commit comments