diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 065686f43a..5936b5f726 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -37,7 +37,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared from pymongo.asynchronous.client_session import _validate_session_write_concern -from pymongo.asynchronous.helpers import _backoff, _handle_reauth +from pymongo.asynchronous.helpers import _handle_reauth from pymongo.asynchronous.network import command from pymongo.common import ( MAX_BSON_SIZE, @@ -52,6 +52,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -789,8 +790,8 @@ def __init__( # Also used for: clearing the wait queue self._max_connecting_cond = _async_create_condition(self.lock) self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id - self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -846,8 +847,6 @@ async def _reset( async with self.size_cond: if self.closed: return - # Clear the backoff state. - self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -930,11 +929,6 @@ async def _reset( for conn in sockets: await conn.close_conn(ConnectionClosedReason.STALE) - @property - def max_connecting(self) -> int: - """The current max connecting limit for the pool.""" - return 1 if self._backoff else self.opts.max_connecting - async def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -1001,7 +995,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: async with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self.max_connecting: + if self._pending >= self._max_connecting: return self._pending += 1 incremented = True @@ -1029,29 +1023,14 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. - # If found, set backoff and add error labels. - if self.is_sdam or type(error) != AutoReconnect: + # Look for errors of type AutoReconnect and add error labels if appropriate. + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return - self._backoff += 1 + assert isinstance(error, AutoReconnect) # Appease type checker. error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") - # Log the pool backoff message. - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_BACKOFF, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), - error=ConnectionClosedReason.POOL_BACKOFF, - ) async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. @@ -1082,17 +1061,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A driverConnectionId=conn_id, ) - # Apply backoff if applicable. - if self._backoff: - await asyncio.sleep(_backoff(self._backoff)) - - # Pass a context to determine if we successfully create a configured socket. - context = dict(has_created_socket=False) - try: - networking_interface = await _configured_protocol_interface( - self.address, self.opts, context=context - ) + networking_interface = await _configured_protocol_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: async with self.lock: @@ -1113,8 +1083,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1126,9 +1095,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: await conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1138,15 +1109,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A except BaseException as e: async with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not completed_hello: + self._handle_connection_error(e) await conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: await handler.client._topology.receive_cluster_time(conn._cluster_time) - # Clear the backoff state. - self._backoff = 0 return conn @contextlib.asynccontextmanager @@ -1323,12 +1293,12 @@ async def _get_conn( # to be checked back into the pool. async with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.max_connecting): + while not (self.conns or self._pending < self._max_connecting): timeout = deadline - time.monotonic() if deadline else None if not await _async_cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self.max_connecting: + if self.conns or self._pending < self._max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1469,7 +1439,7 @@ async def _perished(self, conn: AsyncConnection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, or we are in backoff mode, to keep performance reasonable - + pool to keep performance reasonable - we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() @@ -1482,8 +1452,6 @@ async def _perished(self, conn: AsyncConnection) -> bool: return True check_interval_seconds = self._check_interval_seconds - if self._backoff: - check_interval_seconds = 0 if check_interval_seconds is not None and ( check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 1e91bbe79b..a2b354f7cc 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -890,8 +890,8 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None # Clear the pool. await server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( - "SystemOverloadedError" + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") ): return # "Client MUST replace the server's description with type Unknown diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 9bf46cbc3d..605b8dde9b 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -256,7 +256,6 @@ def __init__(self, timeout: Optional[float] = None): self._timeout = timeout self._closed = asyncio.get_running_loop().create_future() self._connection_lost = False - self._closing_exception = None def settimeout(self, timeout: float | None) -> None: self._timeout = timeout @@ -270,11 +269,9 @@ def close(self, exc: Optional[Exception] = None) -> None: self.transport.abort() self._resolve_pending(exc) self._connection_lost = True - self._closing_exception = exc # type:ignore[assignment] def connection_lost(self, exc: Optional[Exception] = None) -> None: self._resolve_pending(exc) - self._closing_exception = exc # type:ignore[assignment] if not self._closed.done(): self._closed.set_result(None) @@ -338,11 +335,8 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[ if self._done_messages: message = await self._done_messages.popleft() else: - if self._closed.done(): - if self._closing_exception: - raise self._closing_exception - else: - raise OSError("connection closed") + if self.transport and self.transport.is_closing(): + raise OSError("connection is already closed") read_waiter = asyncio.get_running_loop().create_future() self._pending_messages.append(read_waiter) try: @@ -480,7 +474,6 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None: else: msg.set_exception(exc) self._done_messages.append(msg) - self._pending_messages.clear() class PyMongoKMSProtocol(PyMongoBaseProtocol): diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index c555b125df..0536dc3835 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -250,7 +250,6 @@ async def _configured_protocol_interface( address: _Address, options: PoolOptions, protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol, - context: dict[str, bool] | None = None, ) -> AsyncNetworkingInterface: """Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface. @@ -262,10 +261,6 @@ async def _configured_protocol_interface( ssl_context = options._ssl_context timeout = options.socket_timeout - # Signal that we have created the socket successfully. - if context: - context["has_created_socket"] = True - if ssl_context is None: return AsyncNetworkingInterface( await asyncio.get_running_loop().create_connection( @@ -379,7 +374,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket def _configured_socket_interface( - address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None + address: _Address, options: PoolOptions, *args: Any ) -> NetworkingInterface: """Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket. @@ -390,10 +385,6 @@ def _configured_socket_interface( sock = _create_connection(address, options) ssl_context = options._ssl_context - # Signal that we have created the socket successfully. - if context: - context["has_created_socket"] = True - if ssl_context is None: sock.settimeout(options.socket_timeout) return NetworkingInterface(sock) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index d0c517f186..88b09c9b46 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -49,6 +49,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -84,7 +85,7 @@ from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.synchronous.client_session import _validate_session_write_concern -from pymongo.synchronous.helpers import _backoff, _handle_reauth +from pymongo.synchronous.helpers import _handle_reauth from pymongo.synchronous.network import command if TYPE_CHECKING: @@ -787,8 +788,8 @@ def __init__( # Also used for: clearing the wait queue self._max_connecting_cond = _create_condition(self.lock) self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id - self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -844,8 +845,6 @@ def _reset( with self.size_cond: if self.closed: return - # Clear the backoff state. - self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -928,11 +927,6 @@ def _reset( for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) - @property - def max_connecting(self) -> int: - """The current max connecting limit for the pool.""" - return 1 if self._backoff else self.opts.max_connecting - def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -997,7 +991,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self.max_connecting: + if self._pending >= self._max_connecting: return self._pending += 1 incremented = True @@ -1025,29 +1019,14 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. - # If found, set backoff and add error labels. - if self.is_sdam or type(error) != AutoReconnect: + # Look for errors of type AutoReconnect and add error labels if appropriate. + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return - self._backoff += 1 + assert isinstance(error, AutoReconnect) # Appease type checker. error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") - # Log the pool backoff message. - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_BACKOFF, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), - error=ConnectionClosedReason.POOL_BACKOFF, - ) def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. @@ -1078,17 +1057,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect driverConnectionId=conn_id, ) - # Apply backoff if applicable. - if self._backoff: - time.sleep(_backoff(self._backoff)) - - # Pass a context to determine if we successfully create a configured socket. - context = dict(has_created_socket=False) - try: - networking_interface = _configured_socket_interface( - self.address, self.opts, context=context - ) + networking_interface = _configured_socket_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: with self.lock: @@ -1109,8 +1079,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1122,9 +1091,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1134,15 +1105,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect except BaseException as e: with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not completed_hello: + self._handle_connection_error(e) conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: handler.client._topology.receive_cluster_time(conn._cluster_time) - # Clear the backoff state. - self._backoff = 0 return conn @contextlib.contextmanager @@ -1319,12 +1289,12 @@ def _get_conn( # to be checked back into the pool. with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.max_connecting): + while not (self.conns or self._pending < self._max_connecting): timeout = deadline - time.monotonic() if deadline else None if not _cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self.max_connecting: + if self.conns or self._pending < self._max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1465,7 +1435,7 @@ def _perished(self, conn: Connection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, or we are in backoff mode, to keep performance reasonable - + pool to keep performance reasonable - we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() @@ -1478,8 +1448,6 @@ def _perished(self, conn: Connection) -> bool: return True check_interval_seconds = self._check_interval_seconds - if self._backoff: - check_interval_seconds = 0 if check_interval_seconds is not None and ( check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 0f6592dfc0..e967c2089f 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -888,8 +888,8 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None: # Clear the pool. server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( - "SystemOverloadedError" + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") ): return # "Client MUST replace the server's description with type Unknown diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 6cbdf7a65c..2f0d5fc962 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -514,35 +514,10 @@ async def test_connection_timeout_message(self): str(error.exception), ) - async def test_pool_check_backoff(self): - # Test that Pool recovers from two connection failures in a row. - # This exercises code at the end of Pool._check(). - cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) - self.addAsyncCleanup(cx_pool.close) - - async with cx_pool.checkout() as conn: - # Simulate a closed socket without telling the Connection it's - # closed. - await conn.conn.close() - - # Enable backoff. - cx_pool._backoff = 1 - - # Swap pool's address with a bad one. - address, cx_pool.address = cx_pool.address, ("foo.com", 1234) - with self.assertRaises(AutoReconnect): - async with cx_pool.checkout(): - pass - - # Back to normal, semaphore was correctly released. - cx_pool.address = address - async with cx_pool.checkout(): - pass - @async_client_context.require_failCommand_appName - async def test_pool_backoff_preserves_existing_connections(self): + async def test_pool_backpressure_preserves_existing_connections(self): client = await self.async_rs_or_single_client() - coll = self.db.t + coll = client.pymongo_test.t pool = await async_get_pool(client) await coll.insert_many([{"x": 1} for _ in range(10)]) t = SocketGetter(self.c, pool) @@ -564,9 +539,6 @@ async def test_pool_backoff_preserves_existing_connections(self): async with self.fail_point(mock_connection_fail): await coll.find_one({}) - # Make sure the pool is out of backoff state. - assert pool._backoff == 0 - # Make sure the existing socket was not affected. assert not t.sock.conn_closed() @@ -575,16 +547,6 @@ async def test_pool_backoff_preserves_existing_connections(self): await t.join() await pool.close() - async def test_pool_backoff_limits_maxConnecting(self): - client = await self.async_rs_or_single_client(maxConnecting=10) - pool = await async_get_pool(client) - assert pool.max_connecting == 10 - pool._backoff = 1 - assert pool.max_connecting == 1 - pool._backoff = 0 - assert pool.max_connecting == 10 - await client.close() - class TestPoolMaxSize(_TestPoolingBase): async def test_max_pool_size(self): diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 60190c7dc0..5799e834d7 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -331,9 +331,7 @@ "uriOptions": { "retryReads": false, "appname": "clientAppName", - "heartbeatFrequencyMS": 10000, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "heartbeatFrequencyMS": 10000 }, "observeLogMessages": { "connection": "debug" @@ -357,9 +355,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "clientAppName" } } diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 8ec958780d..4334ce2571 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -15,17 +15,13 @@ "isMaster", "hello" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "errorCode": 91, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 656b291366..84763af32e 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -53,9 +53,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "authNetworkErrorTest" } } @@ -77,8 +75,6 @@ ], "uriOptions": { "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appname": "authNetworkErrorTest" } } diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json new file mode 100644 index 0000000000..a806df571a --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -0,0 +1,171 @@ +{ + "description": "backpressure-network-error-fail", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-error-fail", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Apply backpressure on network connection errors during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "serverDescriptionChangedEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "serverMonitoringMode": "poll", + "directConnection": true, + "appname": "backpressureNetworkErrorFailTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-error-fail" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "backpressureNetworkErrorFailTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolReadyEvent": {} + } + ] + }, + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "serverHeartbeatSucceededEvent": {} + }, + { + "serverDescriptionChangedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json new file mode 100644 index 0000000000..0222493f7f --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -0,0 +1,174 @@ +{ + "description": "backpressure-network-timeout-error", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-timeout-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Apply backpressure on network timeout error during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "serverDescriptionChangedEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "appname": "backpressureNetworkTimeoutErrorTest", + "serverMonitoringMode": "poll", + "directConnection": true, + "connectTimeoutMS": 250, + "socketTimeoutMS": 250 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-timeout-error" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "blockConnection": true, + "blockTimeMS": 500, + "appName": "backpressureNetworkTimeoutErrorTest" + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolReadyEvent": {} + } + ] + }, + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "serverHeartbeatSucceededEvent": {} + }, + { + "serverDescriptionChangedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index b9842b8017..8974c4779c 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -32,9 +32,9 @@ "useMultipleMongoses": false, "uriOptions": { "appname": "lbSDAMErrorTestClient", - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, - "retryWrites": false + "retryWrites": false, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500 }, "observeEvents": [ "connectionCreatedEvent", @@ -66,9 +66,7 @@ "id": "multiClient", "useMultipleMongoses": true, "uriOptions": { - "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "retryWrites": false }, "observeEvents": [ "connectionCreatedEvent", @@ -350,8 +348,7 @@ "failCommands": [ "saslContinue" ], - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "lbSDAMErrorTestClient" } } @@ -412,8 +409,7 @@ "failCommands": [ "getMore" ], - "closeConnection": true, - "appName": "lbSDAMErrorTestClient" + "closeConnection": true } } } diff --git a/test/test_pooling.py b/test/test_pooling.py index f3bfcf4ba2..0f7ef144f6 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -512,35 +512,10 @@ def test_connection_timeout_message(self): str(error.exception), ) - def test_pool_check_backoff(self): - # Test that Pool recovers from two connection failures in a row. - # This exercises code at the end of Pool._check(). - cx_pool = self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) - self.addCleanup(cx_pool.close) - - with cx_pool.checkout() as conn: - # Simulate a closed socket without telling the Connection it's - # closed. - conn.conn.close() - - # Enable backoff. - cx_pool._backoff = 1 - - # Swap pool's address with a bad one. - address, cx_pool.address = cx_pool.address, ("foo.com", 1234) - with self.assertRaises(AutoReconnect): - with cx_pool.checkout(): - pass - - # Back to normal, semaphore was correctly released. - cx_pool.address = address - with cx_pool.checkout(): - pass - @client_context.require_failCommand_appName - def test_pool_backoff_preserves_existing_connections(self): + def test_pool_backpressure_preserves_existing_connections(self): client = self.rs_or_single_client() - coll = self.db.t + coll = client.pymongo_test.t pool = get_pool(client) coll.insert_many([{"x": 1} for _ in range(10)]) t = SocketGetter(self.c, pool) @@ -562,9 +537,6 @@ def test_pool_backoff_preserves_existing_connections(self): with self.fail_point(mock_connection_fail): coll.find_one({}) - # Make sure the pool is out of backoff state. - assert pool._backoff == 0 - # Make sure the existing socket was not affected. assert not t.sock.conn_closed() @@ -573,16 +545,6 @@ def test_pool_backoff_preserves_existing_connections(self): t.join() pool.close() - def test_pool_backoff_limits_maxConnecting(self): - client = self.rs_or_single_client(maxConnecting=10) - pool = get_pool(client) - assert pool.max_connecting == 10 - pool._backoff = 1 - assert pool.max_connecting == 1 - pool._backoff = 0 - assert pool.max_connecting == 10 - client.close() - class TestPoolMaxSize(_TestPoolingBase): def test_max_pool_size(self):