Skip to content

Commit ae45464

Browse files
committed
PYTHON-5517 Updates to connection pool backoff
1 parent d267eb4 commit ae45464

File tree

8 files changed

+105
-58
lines changed

8 files changed

+105
-58
lines changed

pymongo/asynchronous/pool.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
DocumentTooLarge,
5353
ExecutionTimeout,
5454
InvalidOperation,
55+
NetworkTimeout,
5556
NotPrimaryError,
5657
OperationFailure,
5758
PyMongoError,
@@ -723,6 +724,7 @@ class PoolState:
723724
PAUSED = 1
724725
READY = 2
725726
CLOSED = 3
727+
BACKOFF = 4
726728

727729

728730
# Do *not* explicitly inherit from object or Jython won't call __del__
@@ -791,6 +793,7 @@ def __init__(
791793
self._pending = 0
792794
self._client_id = client_id
793795
self._backoff = 0
796+
self._backoff_connection_time = -1
794797
if self.enabled_for_cmap:
795798
assert self.opts._event_listeners is not None
796799
self.opts._event_listeners.publish_pool_created(
@@ -817,6 +820,9 @@ def __init__(
817820
async def ready(self) -> None:
818821
# Take the lock to avoid the race condition described in PYTHON-2699.
819822
async with self.lock:
823+
# Do not set the pool as ready if in backoff.
824+
if self._backoff:
825+
return
820826
if self.state != PoolState.READY:
821827
self.state = PoolState.READY
822828
if self.enabled_for_cmap:
@@ -846,7 +852,7 @@ async def _reset(
846852
async with self.size_cond:
847853
if self.closed:
848854
return
849-
# Clear the backoff state.
855+
# Clear the backoff amount.
850856
self._backoff = 0
851857
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
852858
old_state, self.state = self.state, PoolState.PAUSED
@@ -1029,26 +1035,34 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
10291035
self.requests -= 1
10301036
self.size_cond.notify()
10311037

1032-
def _handle_connection_error(self, error: BaseException, phase: str) -> None:
    """Label an overload-style connection failure and trigger pool backoff.

    Nothing happens when ``self.is_sdam`` is set, or when the exact type of
    ``error`` is neither ``AutoReconnect`` nor ``NetworkTimeout``.

    :param error: The exception raised while establishing the connection.
    :param phase: The connection phase supplied by the caller (the visible
        call sites pass "handshake" or "hello"); it is not consulted here.
    """
    # System-overload handling only applies to non-sdam pools.
    if self.is_sdam:
        return
    # Exact type comparison: any other exception class is ignored.
    if type(error) not in (AutoReconnect, NetworkTimeout):
        return
    # Attach the error labels first, then set/increase the backoff.
    for label in ("SystemOverloadedError", "RetryableError"):
        error._add_error_label(label)
    self.backoff()
1047+
1048+
def backoff(self) -> None:
    """Set/increase backoff mode.

    Increments the backoff attempt counter, switches the pool state to
    ``PoolState.BACKOFF`` if it is not already there, stores
    ``_backoff(attempt) + time.monotonic()`` in
    ``self._backoff_connection_time``, and writes a debug log entry when
    connection logging is enabled.
    """
    self._backoff += 1
    # NOTE(review): nesting reconstructed from a flattened diff; the CMAP
    # event is assumed to be published only on the transition into BACKOFF
    # -- confirm against the repository.
    if self.state != PoolState.BACKOFF:
        self.state = PoolState.BACKOFF
        if self.enabled_for_cmap:
            assert self.opts._event_listeners is not None
            self.opts._event_listeners.publish_pool_backoff(self.address, self._backoff)
    # Recomputed on every call so a higher attempt count yields a new time.
    self._backoff_connection_time = _backoff(self._backoff) + time.monotonic()

    # Log the pool backoff message.
    if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
        _debug_log(
            _CONNECTION_LOGGER,
            message=_ConnectionStatusMessage.POOL_BACKOFF % self._backoff,
            clientId=self._client_id,
            serverHost=self.address[0],
            serverPort=self.address[1],
            reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
            error=ConnectionClosedReason.POOL_BACKOFF,
        )
@@ -1082,10 +1096,6 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10821096
driverConnectionId=conn_id,
10831097
)
10841098

1085-
# Apply backoff if applicable.
1086-
if self._backoff:
1087-
await asyncio.sleep(_backoff(self._backoff))
1088-
10891099
# Pass a context to determine if we successfully create a configured socket.
10901100
context = dict(has_created_socket=False)
10911101

@@ -1114,7 +1124,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11141124
error=ConnectionClosedReason.ERROR,
11151125
)
11161126
if context["has_created_socket"]:
1117-
self._handle_connection_error(error, "handshake", conn_id)
1127+
self._handle_connection_error(error, "handshake")
11181128
if isinstance(error, (IOError, OSError, *SSLErrors)):
11191129
details = _get_timeout_details(self.opts)
11201130
_raise_connection_failure(self.address, error, timeout_details=details)
@@ -1138,15 +1148,18 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11381148
except BaseException as e:
11391149
async with self.lock:
11401150
self.active_contexts.discard(conn.cancel_context)
1141-
self._handle_connection_error(e, "hello", conn_id)
1151+
self._handle_connection_error(e, "hello")
11421152
await conn.close_conn(ConnectionClosedReason.ERROR)
11431153
raise
11441154

11451155
if handler:
11461156
await handler.client._topology.receive_cluster_time(conn._cluster_time)
11471157

11481158
# Clear the backoff state.
1149-
self._backoff = 0
1159+
if self._backoff:
1160+
self._backoff = 0
1161+
await self.ready()
1162+
11501163
return conn
11511164

11521165
@contextlib.asynccontextmanager
@@ -1342,6 +1355,9 @@ async def _get_conn(
13421355
if await self._perished(conn):
13431356
conn = None
13441357
continue
1358+
# See if we need to wait for the backoff period.
1359+
elif self._backoff and (self._backoff_connection_time < time.monotonic()):
1360+
continue
13451361
else: # We need to create a new connection
13461362
try:
13471363
conn = await self.connect(handler=handler)

pymongo/logger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class _ConnectionStatusMessage(str, enum.Enum):
4242
POOL_READY = "Connection pool ready"
4343
POOL_CLOSED = "Connection pool closed"
4444
POOL_CLEARED = "Connection pool cleared"
45-
POOL_BACKOFF = "Connection pool backoff"
45+
POOL_BACKOFF = "Connection pool backoff attempt number {%s}"
4646

4747
CONN_CREATED = "Connection created"
4848
CONN_READY = "Connection ready"

pymongo/monitoring.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,28 @@ class PoolClosedEvent(_PoolEvent):
914914
__slots__ = ()
915915

916916

917+
class PoolBackoffEvent(_PoolEvent):
    """Published when a Connection Pool is backing off.

    :param address: The address (host, port) pair of the server this Pool is
       attempting to connect to.
    :param attempt: The backoff attempt number.

    .. versionadded:: 4.16
    """

    __slots__ = ("__attempt",)

    def __init__(self, address: _Address, attempt: int) -> None:
        super().__init__(address)
        self.__attempt = attempt

    @property
    def attempt(self) -> int:
        """The backoff attempt number."""
        # The stored value is the ``int`` passed to ``__init__``.
        return self.__attempt
937+
938+
917939
class ConnectionClosedReason:
918940
"""An enum that defines values for `reason` on a
919941
:class:`ConnectionClosedEvent`.
@@ -1830,6 +1852,15 @@ def publish_pool_closed(self, address: _Address) -> None:
18301852
except Exception:
18311853
_handle_exception()
18321854

1855+
def publish_pool_backoff(self, address: _Address, attempt: int) -> None:
    """Publish a :class:`PoolBackoffEvent` to all pool listeners.

    :param address: The address (host, port) pair of the server the pool
        connects to.
    :param attempt: The backoff attempt number carried by the event.
    """
    event = PoolBackoffEvent(address, attempt)
    for subscriber in self.__cmap_listeners:
        try:
            # Deliver to a dedicated ``pool_backoff`` hook rather than
            # ``pool_closed``, which expects a pool-closed event.
            # NOTE(review): no ``pool_backoff`` hook is visible on the
            # listener base class here, so listeners without one are
            # skipped -- confirm the intended listener API.
            handler = getattr(subscriber, "pool_backoff", None)
            if handler is not None:
                handler(event)
        except Exception:
            _handle_exception()
1863+
18331864
def publish_connection_created(self, address: _Address, connection_id: int) -> None:
18341865
"""Publish a :class:`ConnectionCreatedEvent` to all connection
18351866
listeners.

pymongo/synchronous/pool.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
DocumentTooLarge,
5050
ExecutionTimeout,
5151
InvalidOperation,
52+
NetworkTimeout,
5253
NotPrimaryError,
5354
OperationFailure,
5455
PyMongoError,
@@ -721,6 +722,7 @@ class PoolState:
721722
PAUSED = 1
722723
READY = 2
723724
CLOSED = 3
725+
BACKOFF = 4
724726

725727

726728
# Do *not* explicitly inherit from object or Jython won't call __del__
@@ -789,6 +791,7 @@ def __init__(
789791
self._pending = 0
790792
self._client_id = client_id
791793
self._backoff = 0
794+
self._backoff_connection_time = -1
792795
if self.enabled_for_cmap:
793796
assert self.opts._event_listeners is not None
794797
self.opts._event_listeners.publish_pool_created(
@@ -815,6 +818,9 @@ def __init__(
815818
def ready(self) -> None:
816819
# Take the lock to avoid the race condition described in PYTHON-2699.
817820
with self.lock:
821+
# Do not set the pool as ready if in backoff.
822+
if self._backoff:
823+
return
818824
if self.state != PoolState.READY:
819825
self.state = PoolState.READY
820826
if self.enabled_for_cmap:
@@ -844,7 +850,7 @@ def _reset(
844850
with self.size_cond:
845851
if self.closed:
846852
return
847-
# Clear the backoff state.
853+
# Clear the backoff amount.
848854
self._backoff = 0
849855
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
850856
old_state, self.state = self.state, PoolState.PAUSED
@@ -1025,26 +1031,34 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
10251031
self.requests -= 1
10261032
self.size_cond.notify()
10271033

1028-
def _handle_connection_error(self, error: BaseException, phase: str) -> None:
    """Label an overload-style connection failure and trigger pool backoff.

    Nothing happens when ``self.is_sdam`` is set, or when the exact type of
    ``error`` is neither ``AutoReconnect`` nor ``NetworkTimeout``.

    :param error: The exception raised while establishing the connection.
    :param phase: The connection phase supplied by the caller (the visible
        call sites pass "handshake" or "hello"); it is not consulted here.
    """
    # System-overload handling only applies to non-sdam pools.
    if self.is_sdam:
        return
    # Exact type comparison: any other exception class is ignored.
    if type(error) not in (AutoReconnect, NetworkTimeout):
        return
    # Attach the error labels first, then set/increase the backoff.
    for label in ("SystemOverloadedError", "RetryableError"):
        error._add_error_label(label)
    self.backoff()
1043+
1044+
def backoff(self) -> None:
    """Set/increase backoff mode.

    Increments the backoff attempt counter, switches the pool state to
    ``PoolState.BACKOFF`` if it is not already there, stores
    ``_backoff(attempt) + time.monotonic()`` in
    ``self._backoff_connection_time``, and writes a debug log entry when
    connection logging is enabled.
    """
    self._backoff += 1
    # NOTE(review): nesting reconstructed from a flattened diff; the CMAP
    # event is assumed to be published only on the transition into BACKOFF
    # -- confirm against the repository.
    if self.state != PoolState.BACKOFF:
        self.state = PoolState.BACKOFF
        if self.enabled_for_cmap:
            assert self.opts._event_listeners is not None
            self.opts._event_listeners.publish_pool_backoff(self.address, self._backoff)
    # Recomputed on every call so a higher attempt count yields a new time.
    self._backoff_connection_time = _backoff(self._backoff) + time.monotonic()

    # Log the pool backoff message.
    if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
        _debug_log(
            _CONNECTION_LOGGER,
            message=_ConnectionStatusMessage.POOL_BACKOFF % self._backoff,
            clientId=self._client_id,
            serverHost=self.address[0],
            serverPort=self.address[1],
            reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
            error=ConnectionClosedReason.POOL_BACKOFF,
        )
@@ -1078,10 +1092,6 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10781092
driverConnectionId=conn_id,
10791093
)
10801094

1081-
# Apply backoff if applicable.
1082-
if self._backoff:
1083-
time.sleep(_backoff(self._backoff))
1084-
10851095
# Pass a context to determine if we successfully create a configured socket.
10861096
context = dict(has_created_socket=False)
10871097

@@ -1110,7 +1120,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
11101120
error=ConnectionClosedReason.ERROR,
11111121
)
11121122
if context["has_created_socket"]:
1113-
self._handle_connection_error(error, "handshake", conn_id)
1123+
self._handle_connection_error(error, "handshake")
11141124
if isinstance(error, (IOError, OSError, *SSLErrors)):
11151125
details = _get_timeout_details(self.opts)
11161126
_raise_connection_failure(self.address, error, timeout_details=details)
@@ -1134,15 +1144,18 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
11341144
except BaseException as e:
11351145
with self.lock:
11361146
self.active_contexts.discard(conn.cancel_context)
1137-
self._handle_connection_error(e, "hello", conn_id)
1147+
self._handle_connection_error(e, "hello")
11381148
conn.close_conn(ConnectionClosedReason.ERROR)
11391149
raise
11401150

11411151
if handler:
11421152
handler.client._topology.receive_cluster_time(conn._cluster_time)
11431153

11441154
# Clear the backoff state.
1145-
self._backoff = 0
1155+
if self._backoff:
1156+
self._backoff = 0
1157+
self.ready()
1158+
11461159
return conn
11471160

11481161
@contextlib.contextmanager
@@ -1338,6 +1351,9 @@ def _get_conn(
13381351
if self._perished(conn):
13391352
conn = None
13401353
continue
1354+
# See if we need to wait for the backoff period.
1355+
elif self._backoff and (self._backoff_connection_time < time.monotonic()):
1356+
continue
13411357
else: # We need to create a new connection
13421358
try:
13431359
conn = self.connect(handler=handler)

test/connection_logging/connection-logging.json

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,7 @@
331331
"uriOptions": {
332332
"retryReads": false,
333333
"appname": "clientAppName",
334-
"heartbeatFrequencyMS": 10000,
335-
"socketTimeoutMS": 500,
336-
"connectTimeoutMS": 500
334+
"heartbeatFrequencyMS": 10000
337335
},
338336
"observeLogMessages": {
339337
"connection": "debug"
@@ -357,9 +355,7 @@
357355
"failCommands": [
358356
"saslContinue"
359357
],
360-
"closeConnection": false,
361-
"blockConnection": true,
362-
"blockTimeMS": 1000,
358+
"errorCode": 18,
363359
"appName": "clientAppName"
364360
}
365361
}
@@ -372,7 +368,7 @@
372368
"filter": {}
373369
},
374370
"expectError": {
375-
"isClientError": true
371+
"isError": true
376372
}
377373
}
378374
],

test/connection_monitoring/pool-create-min-size-error.json

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,13 @@
1515
"isMaster",
1616
"hello"
1717
],
18-
"closeConnection": false,
19-
"blockConnection": true,
20-
"blockTimeMS": 1000,
18+
"errorCode": 18,
2119
"appName": "poolCreateMinSizeErrorTest"
2220
}
2321
},
2422
"poolOptions": {
2523
"minPoolSize": 1,
2624
"backgroundThreadIntervalMS": 50,
27-
"socketTimeoutMS": 500,
28-
"connectTimeoutMS": 500,
2925
"appName": "poolCreateMinSizeErrorTest"
3026
},
3127
"operations": [

test/discovery_and_monitoring/unified/auth-network-error.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@
5353
"failCommands": [
5454
"saslContinue"
5555
],
56-
"closeConnection": false,
57-
"blockConnection": true,
58-
"blockTimeMS": 1000,
56+
"errorCode": 18,
5957
"appName": "authNetworkErrorTest"
6058
}
6159
}

0 commit comments

Comments
 (0)