Skip to content

Commit d267eb4

Browse files
blink1073sleepyStickNoahStappShaneHarveydependabot[bot]
authored
PYTHON-5536 Avoid clearing the connection pool when the server connection rate limiter triggers (#2509)
Co-authored-by: Iris <58442094+sleepyStick@users.noreply.github.com> Co-authored-by: Noah Stapp <noah.stapp@mongodb.com> Co-authored-by: Shane Harvey <shnhrv@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
1 parent c458379 commit d267eb4

File tree

19 files changed

+345
-63
lines changed

19 files changed

+345
-63
lines changed

.evergreen/run-tests.sh

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,9 @@ else
2626
fi
2727

2828
# List the packages.
29-
uv sync ${UV_ARGS} --reinstall
29+
uv sync ${UV_ARGS} --reinstall --quiet
3030
uv pip list
3131

32-
# Ensure we go back to base environment after the test.
33-
trap "uv sync" EXIT HUP
34-
3532
# Start the test runner.
3633
uv run ${UV_ARGS} .evergreen/scripts/run_tests.py "$@"
3734

justfile

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# See https://just.systems/man/en/ for instructions
22
set shell := ["bash", "-c"]
3+
# Do not modify the lock file when running justfile commands.
4+
export UV_FROZEN := "1"
35

46
# Commonly used command segments.
5-
uv_run := "uv run --isolated --frozen "
6-
typing_run := uv_run + "--group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd"
7-
docs_run := uv_run + "--extra docs"
7+
typing_run := "uv run --group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd"
8+
docs_run := "uv run --extra docs"
89
doc_build := "./doc/_build"
910
mypy_args := "--install-types --non-interactive"
1011

@@ -13,51 +14,55 @@ mypy_args := "--install-types --non-interactive"
1314
default:
1415
@just --list
1516

17+
[private]
18+
resync:
19+
@uv sync --quiet --frozen
20+
1621
install:
1722
bash .evergreen/scripts/setup-dev-env.sh
1823

1924
[group('docs')]
20-
docs:
25+
docs: && resync
2126
{{docs_run}} sphinx-build -W -b html doc {{doc_build}}/html
2227

2328
[group('docs')]
24-
docs-serve:
29+
docs-serve: && resync
2530
{{docs_run}} sphinx-autobuild -W -b html doc --watch ./pymongo --watch ./bson --watch ./gridfs {{doc_build}}/serve
2631

2732
[group('docs')]
28-
docs-linkcheck:
33+
docs-linkcheck: && resync
2934
{{docs_run}} sphinx-build -E -b linkcheck doc {{doc_build}}/linkcheck
3035

3136
[group('typing')]
32-
typing:
37+
typing: && resync
3338
just typing-mypy
3439
just typing-pyright
3540

3641
[group('typing')]
37-
typing-mypy:
42+
typing-mypy: && resync
3843
{{typing_run}} mypy {{mypy_args}} bson gridfs tools pymongo
3944
{{typing_run}} mypy {{mypy_args}} --config-file mypy_test.ini test
4045
{{typing_run}} mypy {{mypy_args}} test/test_typing.py test/test_typing_strict.py
4146

4247
[group('typing')]
43-
typing-pyright:
48+
typing-pyright: && resync
4449
{{typing_run}} pyright test/test_typing.py test/test_typing_strict.py
4550
{{typing_run}} pyright -p strict_pyrightconfig.json test/test_typing_strict.py
4651

4752
[group('lint')]
48-
lint:
49-
{{uv_run}} pre-commit run --all-files
53+
lint: && resync
54+
uv run pre-commit run --all-files
5055

5156
[group('lint')]
52-
lint-manual:
53-
{{uv_run}} pre-commit run --all-files --hook-stage manual
57+
lint-manual: && resync
58+
uv run pre-commit run --all-files --hook-stage manual
5459

5560
[group('test')]
56-
test *args="-v --durations=5 --maxfail=10":
57-
{{uv_run}} --extra test pytest {{args}}
61+
test *args="-v --durations=5 --maxfail=10": && resync
62+
uv run --extra test pytest {{args}}
5863

5964
[group('test')]
60-
run-tests *args:
65+
run-tests *args: && resync
6166
bash ./.evergreen/run-tests.sh {{args}}
6267

6368
[group('test')]

pymongo/asynchronous/pool.py

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from bson import DEFAULT_CODEC_OPTIONS
3838
from pymongo import _csot, helpers_shared
3939
from pymongo.asynchronous.client_session import _validate_session_write_concern
40-
from pymongo.asynchronous.helpers import _handle_reauth
40+
from pymongo.asynchronous.helpers import _backoff, _handle_reauth
4141
from pymongo.asynchronous.network import command
4242
from pymongo.common import (
4343
MAX_BSON_SIZE,
@@ -788,9 +788,9 @@ def __init__(
788788
# Enforces: maxConnecting
789789
# Also used for: clearing the wait queue
790790
self._max_connecting_cond = _async_create_condition(self.lock)
791-
self._max_connecting = self.opts.max_connecting
792791
self._pending = 0
793792
self._client_id = client_id
793+
self._backoff = 0
794794
if self.enabled_for_cmap:
795795
assert self.opts._event_listeners is not None
796796
self.opts._event_listeners.publish_pool_created(
@@ -846,6 +846,8 @@ async def _reset(
846846
async with self.size_cond:
847847
if self.closed:
848848
return
849+
# Clear the backoff state.
850+
self._backoff = 0
849851
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
850852
old_state, self.state = self.state, PoolState.PAUSED
851853
self.gen.inc(service_id)
@@ -928,6 +930,11 @@ async def _reset(
928930
for conn in sockets:
929931
await conn.close_conn(ConnectionClosedReason.STALE)
930932

933+
@property
934+
def max_connecting(self) -> int:
935+
"""The current max connecting limit for the pool."""
936+
return 1 if self._backoff else self.opts.max_connecting
937+
931938
async def update_is_writable(self, is_writable: Optional[bool]) -> None:
932939
"""Updates the is_writable attribute on all sockets currently in the
933940
Pool.
@@ -994,7 +1001,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
9941001
async with self._max_connecting_cond:
9951002
# If maxConnecting connections are already being created
9961003
# by this pool then try again later instead of waiting.
997-
if self._pending >= self._max_connecting:
1004+
if self._pending >= self.max_connecting:
9981005
return
9991006
self._pending += 1
10001007
incremented = True
@@ -1022,6 +1029,30 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
10221029
self.requests -= 1
10231030
self.size_cond.notify()
10241031

1032+
def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None:
1033+
# Handle system overload condition for non-sdam pools.
1034+
# Look for an AutoReconnect error raised from a ConnectionResetError with
1035+
# errno == errno.ECONNRESET or raised from an OSError that we've created due to
1036+
# a closed connection.
1037+
# If found, set backoff and add error labels.
1038+
if self.is_sdam or type(error) != AutoReconnect:
1039+
return
1040+
self._backoff += 1
1041+
error._add_error_label("SystemOverloadedError")
1042+
error._add_error_label("RetryableError")
1043+
# Log the pool backoff message.
1044+
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
1045+
_debug_log(
1046+
_CONNECTION_LOGGER,
1047+
message=_ConnectionStatusMessage.POOL_BACKOFF,
1048+
clientId=self._client_id,
1049+
serverHost=self.address[0],
1050+
serverPort=self.address[1],
1051+
driverConnectionId=conn_id,
1052+
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
1053+
error=ConnectionClosedReason.POOL_BACKOFF,
1054+
)
1055+
10251056
async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection:
10261057
"""Connect to Mongo and return a new AsyncConnection.
10271058
@@ -1051,8 +1082,17 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10511082
driverConnectionId=conn_id,
10521083
)
10531084

1085+
# Apply backoff if applicable.
1086+
if self._backoff:
1087+
await asyncio.sleep(_backoff(self._backoff))
1088+
1089+
# Pass a context to determine if we successfully create a configured socket.
1090+
context = dict(has_created_socket=False)
1091+
10541092
try:
1055-
networking_interface = await _configured_protocol_interface(self.address, self.opts)
1093+
networking_interface = await _configured_protocol_interface(
1094+
self.address, self.opts, context=context
1095+
)
10561096
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10571097
except BaseException as error:
10581098
async with self.lock:
@@ -1073,10 +1113,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10731113
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
10741114
error=ConnectionClosedReason.ERROR,
10751115
)
1116+
if context["has_created_socket"]:
1117+
self._handle_connection_error(error, "handshake", conn_id)
10761118
if isinstance(error, (IOError, OSError, *SSLErrors)):
10771119
details = _get_timeout_details(self.opts)
10781120
_raise_connection_failure(self.address, error, timeout_details=details)
1079-
10801121
raise
10811122

10821123
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
@@ -1094,15 +1135,18 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10941135

10951136
await conn.authenticate()
10961137
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
1097-
except BaseException:
1138+
except BaseException as e:
10981139
async with self.lock:
10991140
self.active_contexts.discard(conn.cancel_context)
1141+
self._handle_connection_error(e, "hello", conn_id)
11001142
await conn.close_conn(ConnectionClosedReason.ERROR)
11011143
raise
11021144

11031145
if handler:
11041146
await handler.client._topology.receive_cluster_time(conn._cluster_time)
11051147

1148+
# Clear the backoff state.
1149+
self._backoff = 0
11061150
return conn
11071151

11081152
@contextlib.asynccontextmanager
@@ -1279,12 +1323,12 @@ async def _get_conn(
12791323
# to be checked back into the pool.
12801324
async with self._max_connecting_cond:
12811325
self._raise_if_not_ready(checkout_started_time, emit_event=False)
1282-
while not (self.conns or self._pending < self._max_connecting):
1326+
while not (self.conns or self._pending < self.max_connecting):
12831327
timeout = deadline - time.monotonic() if deadline else None
12841328
if not await _async_cond_wait(self._max_connecting_cond, timeout):
12851329
# Timed out, notify the next thread to ensure a
12861330
# timeout doesn't consume the condition.
1287-
if self.conns or self._pending < self._max_connecting:
1331+
if self.conns or self._pending < self.max_connecting:
12881332
self._max_connecting_cond.notify()
12891333
emitted_event = True
12901334
self._raise_wait_queue_timeout(checkout_started_time)
@@ -1425,8 +1469,8 @@ async def _perished(self, conn: AsyncConnection) -> bool:
14251469
:class:`~pymongo.errors.AutoReconnect` exceptions on server
14261470
hiccups, etc. We only check if the socket was closed by an external
14271471
error if it has been > 1 second since the socket was checked into the
1428-
pool, to keep performance reasonable - we can't avoid AutoReconnects
1429-
completely anyway.
1472+
pool, or we are in backoff mode, to keep performance reasonable -
1473+
we can't avoid AutoReconnects completely anyway.
14301474
"""
14311475
idle_time_seconds = conn.idle_time_seconds()
14321476
# If socket is idle, open a new one.
@@ -1437,8 +1481,11 @@ async def _perished(self, conn: AsyncConnection) -> bool:
14371481
await conn.close_conn(ConnectionClosedReason.IDLE)
14381482
return True
14391483

1440-
if self._check_interval_seconds is not None and (
1441-
self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds
1484+
check_interval_seconds = self._check_interval_seconds
1485+
if self._backoff:
1486+
check_interval_seconds = 0
1487+
if check_interval_seconds is not None and (
1488+
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds
14421489
):
14431490
if conn.conn_closed():
14441491
await conn.close_conn(ConnectionClosedReason.ERROR)

pymongo/asynchronous/topology.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,9 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None
890890
# Clear the pool.
891891
await server.reset(service_id)
892892
elif isinstance(error, ConnectionFailure):
893-
if isinstance(error, WaitQueueTimeoutError):
893+
if isinstance(error, WaitQueueTimeoutError) or error.has_error_label(
894+
"SystemOverloadedError"
895+
):
894896
return
895897
# "Client MUST replace the server's description with type Unknown
896898
# ... MUST NOT request an immediate check of the server."

pymongo/logger.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class _ConnectionStatusMessage(str, enum.Enum):
4242
POOL_READY = "Connection pool ready"
4343
POOL_CLOSED = "Connection pool closed"
4444
POOL_CLEARED = "Connection pool cleared"
45+
POOL_BACKOFF = "Connection pool backoff"
4546

4647
CONN_CREATED = "Connection created"
4748
CONN_READY = "Connection ready"
@@ -88,6 +89,7 @@ class _SDAMStatusMessage(str, enum.Enum):
8889
_VERBOSE_CONNECTION_ERROR_REASONS = {
8990
ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed",
9091
ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed",
92+
ConnectionClosedReason.POOL_BACKOFF: "Connection pool is in backoff",
9193
ConnectionClosedReason.STALE: "Connection pool was stale",
9294
ConnectionClosedReason.ERROR: "An error occurred while using the connection",
9395
ConnectionCheckOutFailedReason.CONN_ERROR: "An error occurred while trying to establish a new connection",

pymongo/monitoring.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,9 @@ class ConnectionClosedReason:
934934
POOL_CLOSED = "poolClosed"
935935
"""The pool was closed, making the connection no longer valid."""
936936

937+
POOL_BACKOFF = "poolBackoff"
938+
"""The pool is in backoff mode."""
939+
937940

938941
class ConnectionCheckOutFailedReason:
939942
"""An enum that defines values for `reason` on a

pymongo/network_layer.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ def __init__(self, timeout: Optional[float] = None):
256256
self._timeout = timeout
257257
self._closed = asyncio.get_running_loop().create_future()
258258
self._connection_lost = False
259+
self._closing_exception = None
259260

260261
def settimeout(self, timeout: float | None) -> None:
261262
self._timeout = timeout
@@ -269,9 +270,11 @@ def close(self, exc: Optional[Exception] = None) -> None:
269270
self.transport.abort()
270271
self._resolve_pending(exc)
271272
self._connection_lost = True
273+
self._closing_exception = exc # type:ignore[assignment]
272274

273275
def connection_lost(self, exc: Optional[Exception] = None) -> None:
274276
self._resolve_pending(exc)
277+
self._closing_exception = exc # type:ignore[assignment]
275278
if not self._closed.done():
276279
self._closed.set_result(None)
277280

@@ -335,8 +338,11 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[
335338
if self._done_messages:
336339
message = await self._done_messages.popleft()
337340
else:
338-
if self.transport and self.transport.is_closing():
339-
raise OSError("connection is already closed")
341+
if self._closed.done():
342+
if self._closing_exception:
343+
raise self._closing_exception
344+
else:
345+
raise OSError("connection closed")
340346
read_waiter = asyncio.get_running_loop().create_future()
341347
self._pending_messages.append(read_waiter)
342348
try:
@@ -474,6 +480,7 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None:
474480
else:
475481
msg.set_exception(exc)
476482
self._done_messages.append(msg)
483+
self._pending_messages.clear()
477484

478485

479486
class PyMongoKMSProtocol(PyMongoBaseProtocol):

pymongo/pool_shared.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ async def _configured_protocol_interface(
250250
address: _Address,
251251
options: PoolOptions,
252252
protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol,
253+
context: dict[str, bool] | None = None,
253254
) -> AsyncNetworkingInterface:
254255
"""Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface.
255256
@@ -261,6 +262,10 @@ async def _configured_protocol_interface(
261262
ssl_context = options._ssl_context
262263
timeout = options.socket_timeout
263264

265+
# Signal that we have created the socket successfully.
266+
if context:
267+
context["has_created_socket"] = True
268+
264269
if ssl_context is None:
265270
return AsyncNetworkingInterface(
266271
await asyncio.get_running_loop().create_connection(
@@ -374,7 +379,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket
374379

375380

376381
def _configured_socket_interface(
377-
address: _Address, options: PoolOptions, *args: Any
382+
address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None
378383
) -> NetworkingInterface:
379384
"""Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket.
380385
@@ -385,6 +390,10 @@ def _configured_socket_interface(
385390
sock = _create_connection(address, options)
386391
ssl_context = options._ssl_context
387392

393+
# Signal that we have created the socket successfully.
394+
if context:
395+
context["has_created_socket"] = True
396+
388397
if ssl_context is None:
389398
sock.settimeout(options.socket_timeout)
390399
return NetworkingInterface(sock)

0 commit comments

Comments
 (0)