Skip to content

Commit 4f6e2e2

Browse files
vbabaninstIncMale
andauthored
Ignore maxWaitTime when CSOT is enabled. (#1744)
JAVA-5409 --------- Co-authored-by: Valentin Kovalenko <valentin.male.kovalenko@gmail.com>
1 parent 66c7e57 commit 4f6e2e2

File tree

9 files changed

+217
-47
lines changed

9 files changed

+217
-47
lines changed

driver-core/src/main/com/mongodb/MongoOperationTimeoutException.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.mongodb.annotations.Alpha;
1919
import com.mongodb.annotations.Reason;
20+
import com.mongodb.lang.Nullable;
2021

2122
import java.util.concurrent.TimeUnit;
2223

@@ -56,7 +57,7 @@ public MongoOperationTimeoutException(final String message) {
5657
* @param message the message
5758
* @param cause the cause
5859
*/
59-
public MongoOperationTimeoutException(final String message, final Throwable cause) {
60+
public MongoOperationTimeoutException(final String message, @Nullable final Throwable cause) {
6061
super(message, cause);
6162
}
6263
}

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public static MongoOperationTimeoutException createMongoTimeoutException(final T
7878
return createMongoTimeoutException("Operation exceeded the timeout limit: " + cause.getMessage(), cause);
7979
}
8080

81-
public static MongoOperationTimeoutException createMongoTimeoutException(final String message, final Throwable cause) {
81+
public static MongoOperationTimeoutException createMongoTimeoutException(final String message, @Nullable final Throwable cause) {
8282
if (cause instanceof MongoOperationTimeoutException) {
8383
return (MongoOperationTimeoutException) cause;
8484
}
@@ -449,7 +449,10 @@ public TimeoutContext withComputedServerSelectionTimeoutContext() {
449449
return this;
450450
}
451451

452-
public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
452+
public Timeout startMaxWaitTimeout(final StartTime checkoutStart) {
453+
if (hasTimeoutMS()) {
454+
return assertNotNull(timeout);
455+
}
453456
final long ms = getTimeoutSettings().getMaxWaitTimeMS();
454457
return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
455458
}

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.mongodb.event.ConnectionPoolListener;
4343
import com.mongodb.event.ConnectionPoolReadyEvent;
4444
import com.mongodb.event.ConnectionReadyEvent;
45+
import com.mongodb.internal.TimeoutContext;
4546
import com.mongodb.internal.VisibleForTesting;
4647
import com.mongodb.internal.async.SingleResultCallback;
4748
import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue;
@@ -98,6 +99,7 @@
9899
import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
99100
import static com.mongodb.internal.Locks.lockInterruptibly;
100101
import static com.mongodb.internal.Locks.withLock;
102+
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
101103
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
102104
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
103105
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
@@ -110,12 +112,12 @@
110112
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_CONNECTING;
111113
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_IDLE_TIME_MS;
112114
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_POOL_SIZE;
115+
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_WAIT_TIMEOUT_MS;
113116
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MIN_POOL_SIZE;
114117
import static com.mongodb.internal.logging.LogMessage.Entry.Name.REASON_DESCRIPTION;
115118
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST;
116119
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT;
117120
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVICE_ID;
118-
import static com.mongodb.internal.logging.LogMessage.Entry.Name.WAIT_QUEUE_TIMEOUT_MS;
119121
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
120122
import static java.lang.String.format;
121123
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -190,12 +192,12 @@ public int getGeneration(@NonNull final ObjectId serviceId) {
190192
@Override
191193
public InternalConnection get(final OperationContext operationContext) {
192194
StartTime checkoutStart = connectionCheckoutStarted(operationContext);
193-
Timeout waitQueueTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart);
195+
Timeout maxWaitTimeout = operationContext.getTimeoutContext().startMaxWaitTimeout(checkoutStart);
194196
try {
195197
stateAndGeneration.throwIfClosedOrPaused();
196-
PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart);
198+
PooledConnection connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext.getTimeoutContext());
197199
if (!connection.opened()) {
198-
connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, waitQueueTimeout, checkoutStart);
200+
connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, maxWaitTimeout, checkoutStart);
199201
}
200202
connection.checkedOutForOperation(operationContext);
201203
connectionCheckedOut(operationContext, connection, checkoutStart);
@@ -208,7 +210,7 @@ public InternalConnection get(final OperationContext operationContext) {
208210
@Override
209211
public void getAsync(final OperationContext operationContext, final SingleResultCallback<InternalConnection> callback) {
210212
StartTime checkoutStart = connectionCheckoutStarted(operationContext);
211-
Timeout maxWaitTimeout = checkoutStart.timeoutAfterOrInfiniteIfNegative(settings.getMaxWaitTime(NANOSECONDS), NANOSECONDS);
213+
Timeout maxWaitTimeout = operationContext.getTimeoutContext().startMaxWaitTimeout(checkoutStart);
212214
SingleResultCallback<PooledConnection> eventSendingCallback = (connection, failure) -> {
213215
SingleResultCallback<InternalConnection> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
214216
if (failure == null) {
@@ -225,13 +227,13 @@ public void getAsync(final OperationContext operationContext, final SingleResult
225227
eventSendingCallback.onResult(null, e);
226228
return;
227229
}
228-
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, t -> {
230+
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, operationContext.getTimeoutContext(), t -> {
229231
if (t != null) {
230232
eventSendingCallback.onResult(null, t);
231233
} else {
232234
PooledConnection connection;
233235
try {
234-
connection = getPooledConnection(maxWaitTimeout, checkoutStart);
236+
connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext.getTimeoutContext());
235237
} catch (Exception e) {
236238
eventSendingCallback.onResult(null, e);
237239
return;
@@ -330,22 +332,24 @@ public int getGeneration() {
330332
return stateAndGeneration.generation();
331333
}
332334

333-
private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException {
335+
private PooledConnection getPooledConnection(final Timeout maxWaitTimeout,
336+
final StartTime startTime,
337+
final TimeoutContext timeoutContext) throws MongoTimeoutException {
334338
try {
335-
UsageTrackingInternalConnection internalConnection = waitQueueTimeout.call(NANOSECONDS,
339+
UsageTrackingInternalConnection internalConnection = maxWaitTimeout.call(NANOSECONDS,
336340
() -> pool.get(-1L, NANOSECONDS),
337341
(ns) -> pool.get(ns, NANOSECONDS),
338342
() -> pool.get(0L, NANOSECONDS));
339343
while (shouldPrune(internalConnection)) {
340344
pool.release(internalConnection, true);
341-
internalConnection = waitQueueTimeout.call(NANOSECONDS,
345+
internalConnection = maxWaitTimeout.call(NANOSECONDS,
342346
() -> pool.get(-1L, NANOSECONDS),
343347
(ns) -> pool.get(ns, NANOSECONDS),
344348
() -> pool.get(0L, NANOSECONDS));
345349
}
346350
return new PooledConnection(internalConnection);
347351
} catch (MongoTimeoutException e) {
348-
throw createTimeoutException(startTime, e);
352+
throw createTimeoutException(startTime, e, timeoutContext);
349353
}
350354
}
351355

@@ -359,14 +363,17 @@ private PooledConnection getPooledConnectionImmediate() {
359363
return internalConnection == null ? null : new PooledConnection(internalConnection);
360364
}
361365

362-
private MongoTimeoutException createTimeoutException(final StartTime startTime, @Nullable final MongoTimeoutException cause) {
366+
private MongoTimeoutException createTimeoutException(final StartTime startTime,
367+
@Nullable final MongoTimeoutException cause,
368+
final TimeoutContext timeoutContext) {
363369
long elapsedMs = startTime.elapsed().toMillis();
364370
int numPinnedToCursor = pinnedStatsManager.getNumPinnedToCursor();
365371
int numPinnedToTransaction = pinnedStatsManager.getNumPinnedToTransaction();
372+
String errorMessage;
373+
366374
if (numPinnedToCursor == 0 && numPinnedToTransaction == 0) {
367-
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s.",
368-
elapsedMs, serverId.getAddress()),
369-
cause);
375+
errorMessage = format("Timed out after %d ms while waiting for a connection to server %s.",
376+
elapsedMs, serverId.getAddress());
370377
} else {
371378
int maxSize = pool.getMaxSize();
372379
int numInUse = pool.getInUseCount();
@@ -395,14 +402,15 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime,
395402
int numOtherInUse = numInUse - numPinnedToCursor - numPinnedToTransaction;
396403
assertTrue(numOtherInUse >= 0);
397404
assertTrue(numPinnedToCursor + numPinnedToTransaction + numOtherInUse <= maxSize);
398-
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s. Details: "
405+
errorMessage = format("Timed out after %d ms while waiting for a connection to server %s. Details: "
399406
+ "maxPoolSize: %s, connections in use by cursors: %d, connections in use by transactions: %d, "
400407
+ "connections in use by other operations: %d",
401408
elapsedMs, serverId.getAddress(),
402409
sizeToString(maxSize), numPinnedToCursor, numPinnedToTransaction,
403-
numOtherInUse),
404-
cause);
410+
numOtherInUse);
405411
}
412+
413+
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage, cause) : new MongoTimeoutException(errorMessage, cause);
406414
}
407415

408416
@VisibleForTesting(otherwise = PRIVATE)
@@ -499,7 +507,7 @@ private void connectionPoolCreated(final ConnectionPoolListener connectionPoolLi
499507
entries.add(new LogMessage.Entry(MIN_POOL_SIZE, settings.getMinSize()));
500508
entries.add(new LogMessage.Entry(MAX_POOL_SIZE, settings.getMaxSize()));
501509
entries.add(new LogMessage.Entry(MAX_CONNECTING, settings.getMaxConnecting()));
502-
entries.add(new LogMessage.Entry(WAIT_QUEUE_TIMEOUT_MS, settings.getMaxWaitTime(MILLISECONDS)));
510+
entries.add(new LogMessage.Entry(MAX_WAIT_TIMEOUT_MS, settings.getMaxWaitTime(MILLISECONDS)));
503511

504512
logMessage("Connection pool created", clusterId, message, entries);
505513
}
@@ -905,11 +913,11 @@ private final class OpenConcurrencyLimiter {
905913
}
906914

907915
PooledConnection openOrGetAvailable(final OperationContext operationContext, final PooledConnection connection,
908-
final Timeout waitQueueTimeout, final StartTime startTime)
916+
final Timeout maxWaitTimeout, final StartTime startTime)
909917
throws MongoTimeoutException {
910918
PooledConnection result = openWithConcurrencyLimit(
911919
operationContext, connection, OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE,
912-
waitQueueTimeout, startTime);
920+
maxWaitTimeout, startTime);
913921
return assertNotNull(result);
914922
}
915923

@@ -952,7 +960,7 @@ void openImmediatelyAndTryHandOverOrRelease(final OperationContext operationCont
952960
* </ol>
953961
*
954962
* @param operationContext the operation context
955-
* @param waitQueueTimeout Applies only to the first phase.
963+
* @param maxWaitTimeout Applies only to the first phase.
956964
* @return An {@linkplain PooledConnection#opened() opened} connection which is either the specified
957965
* {@code connection}, or potentially a different one if {@code mode} is
958966
* {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}, or {@code null} if {@code mode} is
@@ -961,13 +969,14 @@ void openImmediatelyAndTryHandOverOrRelease(final OperationContext operationCont
961969
*/
962970
@Nullable
963971
private PooledConnection openWithConcurrencyLimit(final OperationContext operationContext,
964-
final PooledConnection connection, final OpenWithConcurrencyLimitMode mode,
965-
final Timeout waitQueueTimeout, final StartTime startTime)
972+
final PooledConnection connection, final OpenWithConcurrencyLimitMode mode,
973+
final Timeout maxWaitTimeout, final StartTime startTime)
966974
throws MongoTimeoutException {
967975
PooledConnection availableConnection;
968976
try {//phase one
969977
availableConnection = acquirePermitOrGetAvailableOpenedConnection(
970-
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime);
978+
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, maxWaitTimeout, startTime,
979+
operationContext.getTimeoutContext());
971980
} catch (Exception e) {
972981
connection.closeSilently();
973982
throw e;
@@ -1009,7 +1018,8 @@ void openWithConcurrencyLimitAsync(
10091018
final SingleResultCallback<PooledConnection> callback) {
10101019
PooledConnection availableConnection;
10111020
try {//phase one
1012-
availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime);
1021+
availableConnection =
1022+
acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime, operationContext.getTimeoutContext());
10131023
} catch (Exception e) {
10141024
connection.closeSilently();
10151025
callback.onResult(null, e);
@@ -1040,7 +1050,8 @@ void openWithConcurrencyLimitAsync(
10401050
*/
10411051
@Nullable
10421052
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable,
1043-
final Timeout waitQueueTimeout, final StartTime startTime)
1053+
final Timeout maxWaitTimeout, final StartTime startTime,
1054+
final TimeoutContext timeoutContext)
10441055
throws MongoTimeoutException, MongoInterruptedException {
10451056
PooledConnection availableConnection = null;
10461057
boolean expressedDesireToGetAvailableConnection = false;
@@ -1068,10 +1079,10 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
10681079
& !stateAndGeneration.throwIfClosedOrPaused()
10691080
& (availableConnection = tryGetAvailable ? tryGetAvailableConnection() : null) == null) {
10701081

1071-
Timeout.onExistsAndExpired(waitQueueTimeout, () -> {
1072-
throw createTimeoutException(startTime, null);
1082+
Timeout.onExistsAndExpired(maxWaitTimeout, () -> {
1083+
throw createTimeoutException(startTime, null, timeoutContext);
10731084
});
1074-
waitQueueTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition,
1085+
maxWaitTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition,
10751086
() -> "acquiring permit or getting available opened connection");
10761087
}
10771088
if (availableConnection == null) {
@@ -1391,10 +1402,15 @@ final class Task {
13911402
private final Timeout timeout;
13921403
private final StartTime startTime;
13931404
private final Consumer<RuntimeException> action;
1405+
private final TimeoutContext timeoutContext;
13941406
private boolean completed;
13951407

1396-
Task(final Timeout timeout, final StartTime startTime, final Consumer<RuntimeException> action) {
1408+
Task(final Timeout timeout,
1409+
final StartTime startTime,
1410+
final TimeoutContext timeoutContext,
1411+
final Consumer<RuntimeException> action) {
13971412
this.timeout = timeout;
1413+
this.timeoutContext = timeoutContext;
13981414
this.startTime = startTime;
13991415
this.action = action;
14001416
}
@@ -1408,7 +1424,7 @@ void failAsClosed() {
14081424
}
14091425

14101426
void failAsTimedOut() {
1411-
doComplete(() -> createTimeoutException(startTime, null));
1427+
doComplete(() -> createTimeoutException(startTime, null, timeoutContext));
14121428
}
14131429

14141430
private void doComplete(final Supplier<RuntimeException> failureSupplier) {

driver-core/src/main/com/mongodb/internal/logging/LogMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public enum Name {
120120
MIN_POOL_SIZE("minPoolSize"),
121121
MAX_POOL_SIZE("maxPoolSize"),
122122
MAX_CONNECTING("maxConnecting"),
123-
WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"),
123+
MAX_WAIT_TIMEOUT_MS("waitQueueTimeoutMS"),
124124
SELECTOR("selector"),
125125
TOPOLOGY_DESCRIPTION("topologyDescription"),
126126
REMAINING_TIME_MS("remainingTimeMS"),

driver-core/src/main/com/mongodb/internal/time/StartTime.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
* A point in time used to track how much time has elapsed. In contrast to a
2323
* Timeout, it is guaranteed to not be in the future, and is never infinite.
2424
*
25+
* Implementations of this interface must be immutable.
26+
*
2527
* @see TimePoint
2628
*/
2729
public interface StartTime {

driver-core/src/main/com/mongodb/internal/time/Timeout.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import com.mongodb.internal.function.CheckedFunction;
2222
import com.mongodb.internal.function.CheckedRunnable;
2323
import com.mongodb.internal.function.CheckedSupplier;
24-
import com.mongodb.lang.Nullable;
2524
import com.mongodb.lang.NonNull;
25+
import com.mongodb.lang.Nullable;
2626

2727
import java.util.Arrays;
2828
import java.util.Collections;
@@ -40,6 +40,8 @@
4040
/**
4141
* A Timeout is a "deadline", point in time by which something must happen.
4242
*
43+
* Implementations of this interface must be immutable.
44+
*
4345
* @see TimePoint
4446
*/
4547
public interface Timeout {

0 commit comments

Comments
 (0)