From d6dd35ad7ae19657f76b8a351919f3f807af001f Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 07:40:42 -0600 Subject: [PATCH 01/18] remove backoff state --- pymongo/asynchronous/pool.py | 44 ++++++++---------------------------- pymongo/synchronous/pool.py | 44 ++++++++---------------------------- 2 files changed, 18 insertions(+), 70 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 065686f43a..e048713409 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -37,7 +37,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared from pymongo.asynchronous.client_session import _validate_session_write_concern -from pymongo.asynchronous.helpers import _backoff, _handle_reauth +from pymongo.asynchronous.helpers import _handle_reauth from pymongo.asynchronous.network import command from pymongo.common import ( MAX_BSON_SIZE, @@ -790,7 +790,6 @@ def __init__( self._max_connecting_cond = _async_create_condition(self.lock) self._pending = 0 self._client_id = client_id - self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -846,8 +845,6 @@ async def _reset( async with self.size_cond: if self.closed: return - # Clear the backoff state. - self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -930,11 +927,6 @@ async def _reset( for conn in sockets: await conn.close_conn(ConnectionClosedReason.STALE) - @property - def max_connecting(self) -> int: - """The current max connecting limit for the pool.""" - return 1 if self._backoff else self.opts.max_connecting - async def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -1001,7 +993,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: async with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self.max_connecting: + if self._pending >= self.opts.max_connecting: return self._pending += 1 incremented = True @@ -1034,24 +1026,11 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in # Look for an AutoReconnect error raised from a ConnectionResetError with # errno == errno.ECONNRESET or raised from an OSError that we've created due to # a closed connection. - # If found, set backoff and add error labels. + # If found, add error labels. if self.is_sdam or type(error) != AutoReconnect: return - self._backoff += 1 error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") - # Log the pool backoff message. - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_BACKOFF, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), - error=ConnectionClosedReason.POOL_BACKOFF, - ) async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. 
@@ -1082,10 +1061,6 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A driverConnectionId=conn_id, ) - # Apply backoff if applicable. - if self._backoff: - await asyncio.sleep(_backoff(self._backoff)) - # Pass a context to determine if we successfully create a configured socket. context = dict(has_created_socket=False) @@ -1126,9 +1101,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: await conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1138,15 +1115,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A except BaseException as e: async with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not completed_hello: + self._handle_connection_error(e, "hello", conn_id) await conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: await handler.client._topology.receive_cluster_time(conn._cluster_time) - # Clear the backoff state. - self._backoff = 0 return conn @contextlib.asynccontextmanager @@ -1323,7 +1299,7 @@ async def _get_conn( # to be checked back into the pool. async with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.max_connecting): + while not (self.conns or self._pending < self.opts.max_connecting): timeout = deadline - time.monotonic() if deadline else None if not await _async_cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a @@ -1469,7 +1445,7 @@ async def _perished(self, conn: AsyncConnection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, or we are in backoff mode, to keep performance reasonable - + pool to keep performance reasonable - we can't avoid AutoReconnects completely anyway. 
""" idle_time_seconds = conn.idle_time_seconds() @@ -1482,8 +1458,6 @@ async def _perished(self, conn: AsyncConnection) -> bool: return True check_interval_seconds = self._check_interval_seconds - if self._backoff: - check_interval_seconds = 0 if check_interval_seconds is not None and ( check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index d0c517f186..66cb50ca88 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -84,7 +84,7 @@ from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.synchronous.client_session import _validate_session_write_concern -from pymongo.synchronous.helpers import _backoff, _handle_reauth +from pymongo.synchronous.helpers import _handle_reauth from pymongo.synchronous.network import command if TYPE_CHECKING: @@ -788,7 +788,6 @@ def __init__( self._max_connecting_cond = _create_condition(self.lock) self._pending = 0 self._client_id = client_id - self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -844,8 +843,6 @@ def _reset( with self.size_cond: if self.closed: return - # Clear the backoff state. - self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -928,11 +925,6 @@ def _reset( for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) - @property - def max_connecting(self) -> int: - """The current max connecting limit for the pool.""" - return 1 if self._backoff else self.opts.max_connecting - def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -997,7 +989,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self.max_connecting: + if self._pending >= self.opts.max_connecting: return self._pending += 1 incremented = True @@ -1030,24 +1022,11 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in # Look for an AutoReconnect error raised from a ConnectionResetError with # errno == errno.ECONNRESET or raised from an OSError that we've created due to # a closed connection. - # If found, set backoff and add error labels. + # If found, add error labels. if self.is_sdam or type(error) != AutoReconnect: return - self._backoff += 1 error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") - # Log the pool backoff message. - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_BACKOFF, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), - error=ConnectionClosedReason.POOL_BACKOFF, - ) def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. @@ -1078,10 +1057,6 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect driverConnectionId=conn_id, ) - # Apply backoff if applicable. 
- if self._backoff: - time.sleep(_backoff(self._backoff)) - # Pass a context to determine if we successfully create a configured socket. context = dict(has_created_socket=False) @@ -1122,9 +1097,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1134,15 +1111,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect except BaseException as e: with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not completed_hello: + self._handle_connection_error(e, "hello", conn_id) conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: handler.client._topology.receive_cluster_time(conn._cluster_time) - # Clear the backoff state. - self._backoff = 0 return conn @contextlib.contextmanager @@ -1319,7 +1295,7 @@ def _get_conn( # to be checked back into the pool. with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.max_connecting): + while not (self.conns or self._pending < self.opts.max_connecting): timeout = deadline - time.monotonic() if deadline else None if not _cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a @@ -1465,7 +1441,7 @@ def _perished(self, conn: Connection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, or we are in backoff mode, to keep performance reasonable - + pool to keep performance reasonable - we can't avoid AutoReconnects completely anyway. 
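With the pool-level backoff state removed by this patch, an AutoReconnect raised during connection establishment now carries the "SystemOverloadedError" and "RetryableError" labels instead of triggering backoff inside the pool, which leaves any backoff to the caller. A minimal sketch of honoring those labels with caller-side backoff via the public has_error_label API; the helper name and delay values here are illustrative assumptions, not part of this patch:

    import time

    from pymongo.errors import AutoReconnect

    def run_with_backoff(operation, attempts=5, initial_delay=0.5):
        delay = initial_delay
        for attempt in range(attempts):
            try:
                return operation()
            except AutoReconnect as exc:
                overloaded = exc.has_error_label("SystemOverloadedError")
                retryable = exc.has_error_label("RetryableError")
                if not (overloaded and retryable) or attempt == attempts - 1:
                    raise
                # Back off in the caller instead of inside the connection pool.
                time.sleep(delay)
                delay *= 2

    # e.g. run_with_backoff(lambda: collection.find_one({}))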
""" idle_time_seconds = conn.idle_time_seconds() @@ -1478,8 +1454,6 @@ def _perished(self, conn: Connection) -> bool: return True check_interval_seconds = self._check_interval_seconds - if self._backoff: - check_interval_seconds = 0 if check_interval_seconds is not None and ( check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): From 91f950e35fa9b5da8c9d390e4e3c7cc7d9e0edc9 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 07:44:06 -0600 Subject: [PATCH 02/18] undo changes to specs --- .../connection-logging.json | 40 +++++++++---------- .../pool-create-min-size-error.json | 18 ++++----- .../unified/auth-network-error.json | 6 +-- test/load_balancer/sdam-error-handling.json | 21 ++++------ 4 files changed, 34 insertions(+), 51 deletions(-) diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 60190c7dc0..72103b3cab 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -331,9 +331,7 @@ "uriOptions": { "retryReads": false, "appname": "clientAppName", - "heartbeatFrequencyMS": 10000, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "heartbeatFrequencyMS": 10000 }, "observeLogMessages": { "connection": "debug" @@ -357,9 +355,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "clientAppName" } } @@ -450,6 +446,22 @@ } } }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool cleared", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, { "level": "debug", "component": "connection", @@ -502,22 +514,6 @@ ] } } - }, - { - "level": "debug", - "component": "connection", - "data": { - "message": "Connection pool cleared", - "serverHost": { - "$$type": "string" - }, - "serverPort": { - "$$type": [ - "int", - "long" - ] - } - } } ] } diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 8ec958780d..509b2a2356 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -9,23 +9,21 @@ ], "failPoint": { "configureFailPoint": "failCommand", - "mode": "alwaysOn", + "mode": { + "times": 50 + }, "data": { "failCommands": [ "isMaster", "hello" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ @@ -51,15 +49,15 @@ "type": "ConnectionCreated", "address": 42 }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, { "type": "ConnectionClosed", "address": 42, "connectionId": 42, "reason": "error" - }, - { - "type": "ConnectionPoolCleared", - "address": 42 } ], "ignore": [ diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 656b291366..84763af32e 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -53,9 +53,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - 
"blockTimeMS": 1000, + "closeConnection": true, "appName": "authNetworkErrorTest" } } @@ -77,8 +75,6 @@ ], "uriOptions": { "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appname": "authNetworkErrorTest" } } diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index b9842b8017..47323fae4f 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -32,8 +32,6 @@ "useMultipleMongoses": false, "uriOptions": { "appname": "lbSDAMErrorTestClient", - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "retryWrites": false }, "observeEvents": [ @@ -66,9 +64,7 @@ "id": "multiClient", "useMultipleMongoses": true, "uriOptions": { - "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "retryWrites": false }, "observeEvents": [ "connectionCreatedEvent", @@ -286,8 +282,7 @@ "isMaster", "hello" ], - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "lbSDAMErrorTestClient" } } @@ -350,8 +345,7 @@ "failCommands": [ "saslContinue" ], - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "lbSDAMErrorTestClient" } } @@ -378,6 +372,9 @@ { "connectionCreatedEvent": {} }, + { + "poolClearedEvent": {} + }, { "connectionClosedEvent": { "reason": "error" @@ -387,9 +384,6 @@ "connectionCheckOutFailedEvent": { "reason": "connectionError" } - }, - { - "poolClearedEvent": {} } ] } @@ -412,8 +406,7 @@ "failCommands": [ "getMore" ], - "closeConnection": true, - "appName": "lbSDAMErrorTestClient" + "closeConnection": true } } } From 3ff6da85a21f8cb70932beb474dc91865c2ad0ce Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 13:57:53 -0600 Subject: [PATCH 03/18] cleanup --- pymongo/asynchronous/pool.py | 7 +++--- pymongo/synchronous/pool.py | 7 +++--- test/asynchronous/test_pooling.py | 38 ------------------------------- test/test_pooling.py | 38 ------------------------------- 4 files changed, 8 insertions(+), 82 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index e048713409..0e37fa905d 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -789,6 +789,7 @@ def __init__( # Also used for: clearing the wait queue self._max_connecting_cond = _async_create_condition(self.lock) self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id if self.enabled_for_cmap: assert self.opts._event_listeners is not None @@ -993,7 +994,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: async with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self.opts.max_connecting: + if self._pending >= self._max_connecting: return self._pending += 1 incremented = True @@ -1299,12 +1300,12 @@ async def _get_conn( # to be checked back into the pool. async with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.opts.max_connecting): + while not (self.conns or self._pending < self._max_connecting): timeout = deadline - time.monotonic() if deadline else None if not await _async_cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. 
- if self.conns or self._pending < self.max_connecting: + if self.conns or self._pending < self._max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 66cb50ca88..94ae6844c5 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -787,6 +787,7 @@ def __init__( # Also used for: clearing the wait queue self._max_connecting_cond = _create_condition(self.lock) self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id if self.enabled_for_cmap: assert self.opts._event_listeners is not None @@ -989,7 +990,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self.opts.max_connecting: + if self._pending >= self._max_connecting: return self._pending += 1 incremented = True @@ -1295,12 +1296,12 @@ def _get_conn( # to be checked back into the pool. with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.opts.max_connecting): + while not (self.conns or self._pending < self._max_connecting): timeout = deadline - time.monotonic() if deadline else None if not _cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self.max_connecting: + if self.conns or self._pending < self._max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 6cbdf7a65c..80aa023765 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -514,31 +514,6 @@ async def test_connection_timeout_message(self): str(error.exception), ) - async def test_pool_check_backoff(self): - # Test that Pool recovers from two connection failures in a row. - # This exercises code at the end of Pool._check(). - cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) - self.addAsyncCleanup(cx_pool.close) - - async with cx_pool.checkout() as conn: - # Simulate a closed socket without telling the Connection it's - # closed. - await conn.conn.close() - - # Enable backoff. - cx_pool._backoff = 1 - - # Swap pool's address with a bad one. - address, cx_pool.address = cx_pool.address, ("foo.com", 1234) - with self.assertRaises(AutoReconnect): - async with cx_pool.checkout(): - pass - - # Back to normal, semaphore was correctly released. - cx_pool.address = address - async with cx_pool.checkout(): - pass - @async_client_context.require_failCommand_appName async def test_pool_backoff_preserves_existing_connections(self): client = await self.async_rs_or_single_client() @@ -564,9 +539,6 @@ async def test_pool_backoff_preserves_existing_connections(self): async with self.fail_point(mock_connection_fail): await coll.find_one({}) - # Make sure the pool is out of backoff state. - assert pool._backoff == 0 - # Make sure the existing socket was not affected. 
assert not t.sock.conn_closed() @@ -575,16 +547,6 @@ async def test_pool_backoff_preserves_existing_connections(self): await t.join() await pool.close() - async def test_pool_backoff_limits_maxConnecting(self): - client = await self.async_rs_or_single_client(maxConnecting=10) - pool = await async_get_pool(client) - assert pool.max_connecting == 10 - pool._backoff = 1 - assert pool.max_connecting == 1 - pool._backoff = 0 - assert pool.max_connecting == 10 - await client.close() - class TestPoolMaxSize(_TestPoolingBase): async def test_max_pool_size(self): diff --git a/test/test_pooling.py b/test/test_pooling.py index f3bfcf4ba2..27cf3762e2 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -512,31 +512,6 @@ def test_connection_timeout_message(self): str(error.exception), ) - def test_pool_check_backoff(self): - # Test that Pool recovers from two connection failures in a row. - # This exercises code at the end of Pool._check(). - cx_pool = self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) - self.addCleanup(cx_pool.close) - - with cx_pool.checkout() as conn: - # Simulate a closed socket without telling the Connection it's - # closed. - conn.conn.close() - - # Enable backoff. - cx_pool._backoff = 1 - - # Swap pool's address with a bad one. - address, cx_pool.address = cx_pool.address, ("foo.com", 1234) - with self.assertRaises(AutoReconnect): - with cx_pool.checkout(): - pass - - # Back to normal, semaphore was correctly released. - cx_pool.address = address - with cx_pool.checkout(): - pass - @client_context.require_failCommand_appName def test_pool_backoff_preserves_existing_connections(self): client = self.rs_or_single_client() @@ -562,9 +537,6 @@ def test_pool_backoff_preserves_existing_connections(self): with self.fail_point(mock_connection_fail): coll.find_one({}) - # Make sure the pool is out of backoff state. - assert pool._backoff == 0 - # Make sure the existing socket was not affected. 
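For reference, the deleted test above exercised maxConnecting, the client/URI option behind opts.max_connecting. With the backoff state gone, the pool no longer drops this cap to 1 under overload and simply keeps the configured value. A minimal sketch, assuming a locally running mongod (the driver default for this option is 2):

    from pymongo import MongoClient

    # Cap concurrent connection establishment per pool at a fixed value.
    client = MongoClient("mongodb://localhost:27017", maxConnecting=4)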
assert not t.sock.conn_closed() @@ -573,16 +545,6 @@ def test_pool_backoff_preserves_existing_connections(self): t.join() pool.close() - def test_pool_backoff_limits_maxConnecting(self): - client = self.rs_or_single_client(maxConnecting=10) - pool = get_pool(client) - assert pool.max_connecting == 10 - pool._backoff = 1 - assert pool.max_connecting == 1 - pool._backoff = 0 - assert pool.max_connecting == 10 - client.close() - class TestPoolMaxSize(_TestPoolingBase): def test_max_pool_size(self): From 30cadb34a50f5281a2894d8722f8b7d29233ab3d Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 14:35:15 -0600 Subject: [PATCH 04/18] try with alwaysOn --- test/connection_monitoring/pool-create-min-size-error.json | 4 +--- test/load_balancer/sdam-error-handling.json | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 509b2a2356..abf775aa73 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -9,9 +9,7 @@ ], "failPoint": { "configureFailPoint": "failCommand", - "mode": { - "times": 50 - }, + "mode": "alwaysOn", "data": { "failCommands": [ "isMaster", diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index 47323fae4f..485addd6cc 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -274,9 +274,7 @@ "client": "failPointClient", "failPoint": { "configureFailPoint": "failCommand", - "mode": { - "times": 1 - }, + "mode": "alwaysOn", "data": { "failCommands": [ "isMaster", From 07d1e638baac87ed1365575472bb73b6d38665fe Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 14:49:51 -0600 Subject: [PATCH 05/18] add new tests --- test/asynchronous/test_pooling.py | 2 +- .../backpressure-network-error-fail.json | 147 +++++++++++++++++ .../backpressure-network-timeout-fail.json | 148 ++++++++++++++++++ test/test_pooling.py | 2 +- 4 files changed, 297 insertions(+), 2 deletions(-) create mode 100644 test/discovery_and_monitoring/unified/backpressure-network-error-fail.json create mode 100644 test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 80aa023765..8f90e3987d 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -515,7 +515,7 @@ async def test_connection_timeout_message(self): ) @async_client_context.require_failCommand_appName - async def test_pool_backoff_preserves_existing_connections(self): + async def test_pool_backpressure_preserves_existing_connections(self): client = await self.async_rs_or_single_client() coll = self.db.t pool = await async_get_pool(client) diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json new file mode 100644 index 0000000000..481b9caa74 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -0,0 +1,147 @@ +{ + "description": "backpressure-network-error-fail", + "schemaVersion": "1.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + 
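The new spec tests below, like the ones edited above, drive these scenarios with the failCommand fail point. Outside the unified test runner the same fail point can be toggled with a plain admin command; a rough sketch mirroring the JSON configuration, assuming a test deployment started with enableTestCommands=1 and an existing client:

    client.admin.command({
        "configureFailPoint": "failCommand",
        "mode": "alwaysOn",
        "data": {
            "failCommands": ["isMaster", "hello"],
            "closeConnection": True,
            # appName limits the failure to connections from the test client.
            "appName": "backpressureNetworkErrorFailTest",
        },
    })
    try:
        ...  # exercise the connection-establishment failure path
    finally:
        client.admin.command({"configureFailPoint": "failCommand", "mode": "off"})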
"useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-error-fail", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Apply backpressure on network connection errors during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "commandStartedEvent", + "poolbackpressureEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "serverMonitoringMode": "poll", + "appname": "backpressureNetworkErrorFailTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-error-fail" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "backpressureNetworkErrorFailTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolReadyEvent": {} + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json new file mode 100644 index 0000000000..b38719b540 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -0,0 +1,148 @@ +{ + "description": "backpressure-network-timeout-error", + "schemaVersion": "1.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-timeout-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Apply backpressure on network timeout error during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "commandStartedEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "appname": "backpressureNetworkTimeoutErrorTest", + "serverMonitoringMode": "poll", + "connectTimeoutMS": 250, + "socketTimeoutMS": 250 + } + } + }, + { + "database": { + "id": 
"database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-timeout-error" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": true, + "appName": "backpressureNetworkTimeoutErrorTest" + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolReadyEvent": {} + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/test/test_pooling.py b/test/test_pooling.py index 27cf3762e2..adc2b599f8 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -513,7 +513,7 @@ def test_connection_timeout_message(self): ) @client_context.require_failCommand_appName - def test_pool_backoff_preserves_existing_connections(self): + def test_pool_backpressure_preserves_existing_connections(self): client = self.rs_or_single_client() coll = self.db.t pool = get_pool(client) From 51960c4e21a4949f41a908180b7b10008d3add11 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 15:43:11 -0600 Subject: [PATCH 06/18] test updates --- pymongo/network_layer.py | 11 ++----- .../connection-logging.json | 32 +++++++++---------- .../pool-create-min-size-error.json | 13 +++++--- test/load_balancer/sdam-error-handling.json | 14 ++++---- 4 files changed, 33 insertions(+), 37 deletions(-) diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 9bf46cbc3d..605b8dde9b 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -256,7 +256,6 @@ def __init__(self, timeout: Optional[float] = None): self._timeout = timeout self._closed = asyncio.get_running_loop().create_future() self._connection_lost = False - self._closing_exception = None def settimeout(self, timeout: float | None) -> None: self._timeout = timeout @@ -270,11 +269,9 @@ def close(self, exc: Optional[Exception] = None) -> None: self.transport.abort() self._resolve_pending(exc) self._connection_lost = True - self._closing_exception = exc # type:ignore[assignment] def connection_lost(self, exc: Optional[Exception] = None) -> None: self._resolve_pending(exc) - self._closing_exception = exc # type:ignore[assignment] if not self._closed.done(): self._closed.set_result(None) @@ -338,11 +335,8 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[ if self._done_messages: message = await self._done_messages.popleft() else: - if self._closed.done(): - if self._closing_exception: - raise self._closing_exception - else: - raise OSError("connection closed") + if self.transport and self.transport.is_closing(): + raise OSError("connection is already closed") read_waiter = asyncio.get_running_loop().create_future() self._pending_messages.append(read_waiter) try: @@ -480,7 +474,6 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None: else: 
msg.set_exception(exc) self._done_messages.append(msg) - self._pending_messages.clear() class PyMongoKMSProtocol(PyMongoBaseProtocol): diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 72103b3cab..5799e834d7 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -446,22 +446,6 @@ } } }, - { - "level": "debug", - "component": "connection", - "data": { - "message": "Connection pool cleared", - "serverHost": { - "$$type": "string" - }, - "serverPort": { - "$$type": [ - "int", - "long" - ] - } - } - }, { "level": "debug", "component": "connection", @@ -514,6 +498,22 @@ ] } } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool cleared", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } } ] } diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index abf775aa73..32d0027e74 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -15,13 +15,16 @@ "isMaster", "hello" ], - "closeConnection": true, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ @@ -47,15 +50,15 @@ "type": "ConnectionCreated", "address": 42 }, - { - "type": "ConnectionPoolCleared", - "address": 42 - }, { "type": "ConnectionClosed", "address": 42, "connectionId": 42, "reason": "error" + }, + { + "type": "ConnectionPoolCleared", + "address": 42 } ], "ignore": [ diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index 485addd6cc..9da37930a3 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -274,7 +274,9 @@ "client": "failPointClient", "failPoint": { "configureFailPoint": "failCommand", - "mode": "alwaysOn", + "mode": { + "times": 1 + }, "data": { "failCommands": [ "isMaster", @@ -294,9 +296,7 @@ "x": 1 } }, - "expectError": { - "isClientError": true - } + "ignoreResultAndError": true } ], "expectEvents": [ @@ -370,9 +370,6 @@ { "connectionCreatedEvent": {} }, - { - "poolClearedEvent": {} - }, { "connectionClosedEvent": { "reason": "error" @@ -382,6 +379,9 @@ "connectionCheckOutFailedEvent": { "reason": "connectionError" } + }, + { + "poolClearedEvent": {} } ] } From 5a9ebd9dfcebfb170d27b1fcfef0b54cac8907d3 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 15:49:05 -0600 Subject: [PATCH 07/18] update load balancer sdam tests --- test/load_balancer/sdam-error-handling.json | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index 9da37930a3..8974c4779c 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -32,7 +32,9 @@ "useMultipleMongoses": false, "uriOptions": { "appname": "lbSDAMErrorTestClient", - "retryWrites": false + "retryWrites": false, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500 }, "observeEvents": [ "connectionCreatedEvent", @@ -282,7 +284,8 @@ "isMaster", "hello" ], - "closeConnection": true, + 
"blockConnection": true, + "blockTimeMS": 1000, "appName": "lbSDAMErrorTestClient" } } @@ -296,7 +299,9 @@ "x": 1 } }, - "ignoreResultAndError": true + "expectError": { + "isClientError": true + } } ], "expectEvents": [ From 1f1825e776235e40ea1ecb6527f35a1b8d66836d Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 10 Nov 2025 17:52:10 -0600 Subject: [PATCH 08/18] fix tests on replica sets --- .../unified/backpressure-network-error-fail.json | 3 +-- .../unified/backpressure-network-timeout-fail.json | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json index 481b9caa74..12437e3f32 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -49,8 +49,6 @@ "useMultipleMongoses": false, "observeEvents": [ "serverHeartbeatSucceededEvent", - "commandStartedEvent", - "poolbackpressureEvent", "poolReadyEvent", "poolClearedEvent" ], @@ -58,6 +56,7 @@ "retryWrites": false, "heartbeatFrequencyMS": 10000, "serverMonitoringMode": "poll", + "directConnection": true, "appname": "backpressureNetworkErrorFailTest" } } diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json index b38719b540..68a0738d38 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -49,7 +49,6 @@ "useMultipleMongoses": false, "observeEvents": [ "serverHeartbeatSucceededEvent", - "commandStartedEvent", "poolReadyEvent", "poolClearedEvent" ], @@ -58,6 +57,7 @@ "heartbeatFrequencyMS": 10000, "appname": "backpressureNetworkTimeoutErrorTest", "serverMonitoringMode": "poll", + "directConnection": true, "connectTimeoutMS": 250, "socketTimeoutMS": 250 } From 829453e3554fb57348b2404eb3042fb52c8ac490 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Nov 2025 09:25:51 -0600 Subject: [PATCH 09/18] formatting --- .../unified/backpressure-network-error-fail.json | 2 +- .../unified/backpressure-network-timeout-fail.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json index 12437e3f32..27a9e296b5 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -143,4 +143,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json index 68a0738d38..c2f9329495 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -145,4 +145,4 @@ ] } ] -} \ No newline at end of file +} From 7868ac50951e232f86bbb6cb25659569f6a13c96 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Nov 2025 09:26:29 -0600 Subject: [PATCH 10/18] test without new changes --- pymongo/asynchronous/pool.py | 4 ++-- pymongo/synchronous/pool.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff 
--git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 0e37fa905d..d3433166c0 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -1030,8 +1030,8 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in # If found, add error labels. if self.is_sdam or type(error) != AutoReconnect: return - error._add_error_label("SystemOverloadedError") - error._add_error_label("RetryableError") + # error._add_error_label("SystemOverloadedError") + # error._add_error_label("RetryableError") async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 94ae6844c5..4470e12048 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -1026,8 +1026,8 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in # If found, add error labels. if self.is_sdam or type(error) != AutoReconnect: return - error._add_error_label("SystemOverloadedError") - error._add_error_label("RetryableError") + # error._add_error_label("SystemOverloadedError") + # error._add_error_label("RetryableError") def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. From 05b4984afb76da9fffd8dc71a90bea67ff9fb0e4 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Nov 2025 09:52:39 -0600 Subject: [PATCH 11/18] check for both labels --- pymongo/asynchronous/pool.py | 4 ++-- pymongo/asynchronous/topology.py | 5 +++-- pymongo/synchronous/pool.py | 4 ++-- pymongo/synchronous/topology.py | 5 +++-- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index d3433166c0..0e37fa905d 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -1030,8 +1030,8 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in # If found, add error labels. if self.is_sdam or type(error) != AutoReconnect: return - # error._add_error_label("SystemOverloadedError") - # error._add_error_label("RetryableError") + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 1e91bbe79b..00eae565ff 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -890,8 +890,9 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None # Clear the pool. await server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( - "SystemOverloadedError" + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") + and error.has_error_label("RetryableError") ): return # "Client MUST replace the server's description with type Unknown diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 4470e12048..94ae6844c5 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -1026,8 +1026,8 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in # If found, add error labels. 
if self.is_sdam or type(error) != AutoReconnect: return - # error._add_error_label("SystemOverloadedError") - # error._add_error_label("RetryableError") + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 0f6592dfc0..5cbe7cfd11 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -888,8 +888,9 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None: # Clear the pool. server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( - "SystemOverloadedError" + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") + and error.has_error_label("RetryableError") ): return # "Client MUST replace the server's description with type Unknown From 071626f5989314684e9e66863775009e83110d5a Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Nov 2025 16:10:58 -0600 Subject: [PATCH 12/18] update error behavior and add tests for server description changed --- pymongo/asynchronous/pool.py | 19 ++++---------- pymongo/pool_shared.py | 11 +------- pymongo/synchronous/pool.py | 19 ++++---------- .../backpressure-network-error-fail.json | 25 +++++++++++++++++++ .../backpressure-network-timeout-fail.json | 25 +++++++++++++++++++ 5 files changed, 61 insertions(+), 38 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 0e37fa905d..f912b6bebf 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -1022,12 +1022,9 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. - # If found, add error labels. + # Look for errors of type AutoReconnect and add error labels if appropriate. if self.is_sdam or type(error) != AutoReconnect: return error._add_error_label("SystemOverloadedError") @@ -1062,13 +1059,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A driverConnectionId=conn_id, ) - # Pass a context to determine if we successfully create a configured socket. - context = dict(has_created_socket=False) - try: - networking_interface = await _configured_protocol_interface( - self.address, self.opts, context=context - ) + networking_interface = await _configured_protocol_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. 
except BaseException as error: async with self.lock: @@ -1089,8 +1081,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1117,7 +1108,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A async with self.lock: self.active_contexts.discard(conn.cancel_context) if not completed_hello: - self._handle_connection_error(e, "hello", conn_id) + self._handle_connection_error(e) await conn.close_conn(ConnectionClosedReason.ERROR) raise diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index c555b125df..0536dc3835 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -250,7 +250,6 @@ async def _configured_protocol_interface( address: _Address, options: PoolOptions, protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol, - context: dict[str, bool] | None = None, ) -> AsyncNetworkingInterface: """Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface. @@ -262,10 +261,6 @@ async def _configured_protocol_interface( ssl_context = options._ssl_context timeout = options.socket_timeout - # Signal that we have created the socket successfully. - if context: - context["has_created_socket"] = True - if ssl_context is None: return AsyncNetworkingInterface( await asyncio.get_running_loop().create_connection( @@ -379,7 +374,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket def _configured_socket_interface( - address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None + address: _Address, options: PoolOptions, *args: Any ) -> NetworkingInterface: """Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket. @@ -390,10 +385,6 @@ def _configured_socket_interface( sock = _create_connection(address, options) ssl_context = options._ssl_context - # Signal that we have created the socket successfully. - if context: - context["has_created_socket"] = True - if ssl_context is None: sock.settimeout(options.socket_timeout) return NetworkingInterface(sock) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 94ae6844c5..855a2741e5 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -1018,12 +1018,9 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. - # If found, add error labels. + # Look for errors of type AutoReconnect and add error labels if appropriate. 
if self.is_sdam or type(error) != AutoReconnect: return error._add_error_label("SystemOverloadedError") @@ -1058,13 +1055,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect driverConnectionId=conn_id, ) - # Pass a context to determine if we successfully create a configured socket. - context = dict(has_created_socket=False) - try: - networking_interface = _configured_socket_interface( - self.address, self.opts, context=context - ) + networking_interface = _configured_socket_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: with self.lock: @@ -1085,8 +1077,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1113,7 +1104,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect with self.lock: self.active_contexts.discard(conn.cancel_context) if not completed_hello: - self._handle_connection_error(e, "hello", conn_id) + self._handle_connection_error(e) conn.close_conn(ConnectionClosedReason.ERROR) raise diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json index 27a9e296b5..28b02304d7 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -49,6 +49,7 @@ "useMultipleMongoses": false, "observeEvents": [ "serverHeartbeatSucceededEvent", + "serverDescriptionChangedEvent", "poolReadyEvent", "poolClearedEvent" ], @@ -89,6 +90,17 @@ "count": 1 } }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, { "name": "failPoint", "object": "testRunner", @@ -139,6 +151,19 @@ "poolReadyEvent": {} } ] + }, + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "serverHeartbeatSucceededEvent": {} + }, + { + "serverDescriptionChangedEvent": {} + } + ] } ] } diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json index c2f9329495..a86dba3ff6 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -49,6 +49,7 @@ "useMultipleMongoses": false, "observeEvents": [ "serverHeartbeatSucceededEvent", + "serverDescriptionChangedEvent", "poolReadyEvent", "poolClearedEvent" ], @@ -91,6 +92,17 @@ "count": 1 } }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, { "name": "failPoint", "object": "testRunner", @@ -141,6 +153,19 @@ "poolReadyEvent": {} } ] + }, + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "serverHeartbeatSucceededEvent": {} + }, + { + 
"serverDescriptionChangedEvent": {} + } + ] } ] } From 373b04e0339d71a168bd0d228961e6705846d861 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Nov 2025 16:12:04 -0600 Subject: [PATCH 13/18] formatting --- .../unified/backpressure-network-error-fail.json | 4 ++-- .../unified/backpressure-network-timeout-fail.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json index 28b02304d7..7188329eba 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -158,10 +158,10 @@ "ignoreExtraEvents": false, "events": [ { - "serverHeartbeatSucceededEvent": {} + "serverHeartbeatSucceededEvent": {} }, { - "serverDescriptionChangedEvent": {} + "serverDescriptionChangedEvent": {} } ] } diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json index a86dba3ff6..cfbf14a301 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -160,10 +160,10 @@ "ignoreExtraEvents": false, "events": [ { - "serverHeartbeatSucceededEvent": {} + "serverHeartbeatSucceededEvent": {} }, { - "serverDescriptionChangedEvent": {} + "serverDescriptionChangedEvent": {} } ] } From 0f040ea443de8ad87ab9702d0ec5b6233130b317 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 11 Nov 2025 19:21:07 -0600 Subject: [PATCH 14/18] bump schema versions --- .../unified/backpressure-network-error-fail.json | 2 +- .../unified/backpressure-network-timeout-fail.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json index 7188329eba..a806df571a 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -1,6 +1,6 @@ { "description": "backpressure-network-error-fail", - "schemaVersion": "1.4", + "schemaVersion": "1.17", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json index cfbf14a301..94314f3b46 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -1,6 +1,6 @@ { "description": "backpressure-network-timeout-error", - "schemaVersion": "1.4", + "schemaVersion": "1.17", "runOnRequirements": [ { "minServerVersion": "4.4", From 631c2e88888512d3809cb9e3881964f01c79c845 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 12 Nov 2025 12:57:42 -0600 Subject: [PATCH 15/18] address review --- pymongo/asynchronous/pool.py | 3 ++- pymongo/synchronous/pool.py | 3 ++- test/asynchronous/test_pooling.py | 2 +- test/test_pooling.py | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index f912b6bebf..3c66dad2e6 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -52,6 
+52,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -1025,7 +1026,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. # Look for errors of type AutoReconnect and add error labels if appropriate. - if self.is_sdam or type(error) != AutoReconnect: + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 855a2741e5..2597ab7580 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -49,6 +49,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -1021,7 +1022,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. # Look for errors of type AutoReconnect and add error labels if appropriate. - if self.is_sdam or type(error) != AutoReconnect: + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 8f90e3987d..2f0d5fc962 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -517,7 +517,7 @@ async def test_connection_timeout_message(self): @async_client_context.require_failCommand_appName async def test_pool_backpressure_preserves_existing_connections(self): client = await self.async_rs_or_single_client() - coll = self.db.t + coll = client.pymongo_test.t pool = await async_get_pool(client) await coll.insert_many([{"x": 1} for _ in range(10)]) t = SocketGetter(self.c, pool) diff --git a/test/test_pooling.py b/test/test_pooling.py index adc2b599f8..0f7ef144f6 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -515,7 +515,7 @@ def test_connection_timeout_message(self): @client_context.require_failCommand_appName def test_pool_backpressure_preserves_existing_connections(self): client = self.rs_or_single_client() - coll = self.db.t + coll = client.pymongo_test.t pool = get_pool(client) coll.insert_many([{"x": 1} for _ in range(10)]) t = SocketGetter(self.c, pool) From be8fb9797fd8f6480d71263915d7844db25b4452 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 12 Nov 2025 13:12:06 -0600 Subject: [PATCH 16/18] fix typing --- pymongo/asynchronous/pool.py | 1 + pymongo/synchronous/pool.py | 1 + 2 files changed, 2 insertions(+) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 3c66dad2e6..5936b5f726 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -1028,6 +1028,7 @@ def _handle_connection_error(self, error: BaseException) -> None: # Look for errors of type AutoReconnect and add error labels if appropriate. if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return + assert isinstance(error, AutoReconnect) # Appease type checker. 
error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 2597ab7580..88b09c9b46 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -1024,6 +1024,7 @@ def _handle_connection_error(self, error: BaseException) -> None: # Look for errors of type AutoReconnect and add error labels if appropriate. if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return + assert isinstance(error, AutoReconnect) # Appease type checker. error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") From 2ab0f4560f46186c4840be4f49fc3611859b7ccf Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 12 Nov 2025 13:53:10 -0600 Subject: [PATCH 17/18] fix tests --- test/connection_monitoring/pool-create-min-size-error.json | 5 +---- .../unified/backpressure-network-timeout-fail.json | 3 ++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 32d0027e74..4334ce2571 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -15,16 +15,13 @@ "isMaster", "hello" ], - "blockConnection": true, - "blockTimeMS": 1000, + "errorCode": 91, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json index 94314f3b46..0222493f7f 100644 --- a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -116,7 +116,8 @@ "isMaster", "hello" ], - "closeConnection": true, + "blockConnection": true, + "blockTimeMS": 500, "appName": "backpressureNetworkTimeoutErrorTest" } } From af427fa093d0210f91f38a988868bb98c098e3e4 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 12 Nov 2025 15:10:36 -0600 Subject: [PATCH 18/18] address review --- pymongo/asynchronous/topology.py | 1 - pymongo/synchronous/topology.py | 1 - 2 files changed, 2 deletions(-) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 00eae565ff..a2b354f7cc 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -892,7 +892,6 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None elif isinstance(error, ConnectionFailure): if isinstance(error, WaitQueueTimeoutError) or ( error.has_error_label("SystemOverloadedError") - and error.has_error_label("RetryableError") ): return # "Client MUST replace the server's description with type Unknown diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 5cbe7cfd11..e967c2089f 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -890,7 +890,6 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None: elif isinstance(error, ConnectionFailure): if isinstance(error, WaitQueueTimeoutError) or ( error.has_error_label("SystemOverloadedError") - and error.has_error_label("RetryableError") ): return # "Client MUST replace the server's 
description with type Unknown