5 changes: 4 additions & 1 deletion .evergreen/run-tests.sh
@@ -26,9 +26,12 @@ else
fi

# List the packages.
uv sync ${UV_ARGS} --reinstall --quiet
uv sync ${UV_ARGS} --reinstall
uv pip list

# Ensure we go back to base environment after the test.
trap "uv sync" EXIT HUP

# Start the test runner.
uv run ${UV_ARGS} .evergreen/scripts/run_tests.py "$@"

37 changes: 16 additions & 21 deletions justfile
@@ -1,11 +1,10 @@
# See https://just.systems/man/en/ for instructions
set shell := ["bash", "-c"]
# Do not modify the lock file when running justfile commands.
export UV_FROZEN := "1"

# Commonly used command segments.
typing_run := "uv run --group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd"
docs_run := "uv run --extra docs"
uv_run := "uv run --isolated --frozen "
typing_run := uv_run + "--group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd"
docs_run := uv_run + "--extra docs"
doc_build := "./doc/_build"
mypy_args := "--install-types --non-interactive"

@@ -14,55 +13,51 @@ mypy_args := "--install-types --non-interactive"
default:
@just --list

[private]
resync:
@uv sync --quiet --frozen

install:
bash .evergreen/scripts/setup-dev-env.sh

[group('docs')]
docs: && resync
docs:
{{docs_run}} sphinx-build -W -b html doc {{doc_build}}/html

[group('docs')]
docs-serve: && resync
docs-serve:
{{docs_run}} sphinx-autobuild -W -b html doc --watch ./pymongo --watch ./bson --watch ./gridfs {{doc_build}}/serve

[group('docs')]
docs-linkcheck: && resync
docs-linkcheck:
{{docs_run}} sphinx-build -E -b linkcheck doc {{doc_build}}/linkcheck

[group('typing')]
typing: && resync
typing:
just typing-mypy
just typing-pyright

[group('typing')]
typing-mypy: && resync
typing-mypy:
{{typing_run}} mypy {{mypy_args}} bson gridfs tools pymongo
{{typing_run}} mypy {{mypy_args}} --config-file mypy_test.ini test
{{typing_run}} mypy {{mypy_args}} test/test_typing.py test/test_typing_strict.py

[group('typing')]
typing-pyright: && resync
typing-pyright:
{{typing_run}} pyright test/test_typing.py test/test_typing_strict.py
{{typing_run}} pyright -p strict_pyrightconfig.json test/test_typing_strict.py

[group('lint')]
lint: && resync
uv run pre-commit run --all-files
lint:
{{uv_run}} pre-commit run --all-files

[group('lint')]
lint-manual: && resync
uv run pre-commit run --all-files --hook-stage manual
lint-manual:
{{uv_run}} pre-commit run --all-files --hook-stage manual

[group('test')]
test *args="-v --durations=5 --maxfail=10": && resync
uv run --extra test pytest {{args}}
test *args="-v --durations=5 --maxfail=10":
{{uv_run}} --extra test pytest {{args}}

[group('test')]
run-tests *args: && resync
run-tests *args:
bash ./.evergreen/run-tests.sh {{args}}

[group('test')]
71 changes: 12 additions & 59 deletions pymongo/asynchronous/pool.py
@@ -37,7 +37,7 @@
from bson import DEFAULT_CODEC_OPTIONS
from pymongo import _csot, helpers_shared
from pymongo.asynchronous.client_session import _validate_session_write_concern
from pymongo.asynchronous.helpers import _backoff, _handle_reauth
from pymongo.asynchronous.helpers import _handle_reauth
from pymongo.asynchronous.network import command
from pymongo.common import (
MAX_BSON_SIZE,
@@ -788,9 +788,9 @@ def __init__(
# Enforces: maxConnecting
# Also used for: clearing the wait queue
self._max_connecting_cond = _async_create_condition(self.lock)
self._max_connecting = self.opts.max_connecting
self._pending = 0
self._client_id = client_id
self._backoff = 0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
@@ -846,8 +846,6 @@ async def _reset(
async with self.size_cond:
if self.closed:
return
# Clear the backoff state.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
@@ -930,11 +928,6 @@ async def _reset(
for conn in sockets:
await conn.close_conn(ConnectionClosedReason.STALE)

@property
def max_connecting(self) -> int:
"""The current max connecting limit for the pool."""
return 1 if self._backoff else self.opts.max_connecting

async def update_is_writable(self, is_writable: Optional[bool]) -> None:
"""Updates the is_writable attribute on all sockets currently in the
Pool.
@@ -1001,7 +994,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
async with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self.max_connecting:
if self._pending >= self._max_connecting:
return
self._pending += 1
incremented = True
@@ -1029,30 +1022,6 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
self.requests -= 1
self.size_cond.notify()

def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None:
# Handle system overload condition for non-sdam pools.
# Look for an AutoReconnect error raised from a ConnectionResetError with
# errno == errno.ECONNRESET or raised from an OSError that we've created due to
# a closed connection.
# If found, set backoff and add error labels.
if self.is_sdam or type(error) != AutoReconnect:
return
self._backoff += 1
error._add_error_label("SystemOverloadedError")
error._add_error_label("RetryableError")
# Log the pool backoff message.
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_BACKOFF,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
error=ConnectionClosedReason.POOL_BACKOFF,
)

async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection:
"""Connect to Mongo and return a new AsyncConnection.

@@ -1082,17 +1051,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
await asyncio.sleep(_backoff(self._backoff))

# Pass a context to determine if we successfully create a configured socket.
context = dict(has_created_socket=False)

try:
networking_interface = await _configured_protocol_interface(
self.address, self.opts, context=context
)
networking_interface = await _configured_protocol_interface(self.address, self.opts)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as error:
async with self.lock:
@@ -1113,11 +1073,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if context["has_created_socket"]:
self._handle_connection_error(error, "handshake", conn_id)
if isinstance(error, (IOError, OSError, *SSLErrors)):
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)

raise

conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
@@ -1135,18 +1094,15 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A

await conn.authenticate()
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as e:
except BaseException:
async with self.lock:
self.active_contexts.discard(conn.cancel_context)
self._handle_connection_error(e, "hello", conn_id)
await conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
return conn

@contextlib.asynccontextmanager
@@ -1323,12 +1279,12 @@ async def _get_conn(
# to be checked back into the pool.
async with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self.max_connecting):
while not (self.conns or self._pending < self._max_connecting):
timeout = deadline - time.monotonic() if deadline else None
if not await _async_cond_wait(self._max_connecting_cond, timeout):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self.max_connecting:
if self.conns or self._pending < self._max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout(checkout_started_time)
@@ -1469,8 +1425,8 @@ async def _perished(self, conn: AsyncConnection) -> bool:
:class:`~pymongo.errors.AutoReconnect` exceptions on server
hiccups, etc. We only check if the socket was closed by an external
error if it has been > 1 second since the socket was checked into the
pool, or we are in backoff mode, to keep performance reasonable -
we can't avoid AutoReconnects completely anyway.
pool, to keep performance reasonable - we can't avoid AutoReconnects
completely anyway.
"""
idle_time_seconds = conn.idle_time_seconds()
# If socket is idle, open a new one.
@@ -1481,11 +1437,8 @@ async def _perished(self, conn: AsyncConnection) -> bool:
await conn.close_conn(ConnectionClosedReason.IDLE)
return True

check_interval_seconds = self._check_interval_seconds
if self._backoff:
check_interval_seconds = 0
if check_interval_seconds is not None and (
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds
if self._check_interval_seconds is not None and (
self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds
):
if conn.conn_closed():
await conn.close_conn(ConnectionClosedReason.ERROR)
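For orientation, here is a condensed, self-contained sketch of the backoff behavior this file drops. The names (_backoff, max_connecting, the "SystemOverloadedError"/"RetryableError" labels) come from the deleted lines above; the exponential-delay helper and the simplified class are illustrative stand-ins, not the real Pool implementation.

# Condensed sketch of the removed pool backoff flow (illustrative only).
import asyncio
import random


def _backoff(attempts: int, cap: float = 10.0) -> float:
    # Assumption: the removed helpers._backoff computed a jittered, capped
    # exponential delay; this stand-in only mimics that shape.
    return min(cap, random.random() * 0.05 * (2 ** attempts))


class _PoolBackoffSketch:
    def __init__(self, configured_max_connecting: int = 2) -> None:
        self._configured_max_connecting = configured_max_connecting
        self._backoff = 0  # bumped on overload errors, cleared on success or reset

    @property
    def max_connecting(self) -> int:
        # While backing off, only one connection attempt may be in flight.
        return 1 if self._backoff else self._configured_max_connecting

    def record_overload_error(self) -> None:
        # The removed _handle_connection_error also attached the
        # "SystemOverloadedError" and "RetryableError" labels to the
        # AutoReconnect exception before incrementing the counter.
        self._backoff += 1

    async def sleep_before_connect(self) -> None:
        # connect() slept before each attempt while the pool was backing off.
        if self._backoff:
            await asyncio.sleep(_backoff(self._backoff))

    def record_successful_handshake(self) -> None:
        self._backoff = 0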
4 changes: 1 addition & 3 deletions pymongo/asynchronous/topology.py
@@ -890,9 +890,7 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None
# Clear the pool.
await server.reset(service_id)
elif isinstance(error, ConnectionFailure):
if isinstance(error, WaitQueueTimeoutError) or error.has_error_label(
"SystemOverloadedError"
):
if isinstance(error, WaitQueueTimeoutError):
return
# "Client MUST replace the server's description with type Unknown
# ... MUST NOT request an immediate check of the server."
2 changes: 0 additions & 2 deletions pymongo/logger.py
@@ -42,7 +42,6 @@ class _ConnectionStatusMessage(str, enum.Enum):
POOL_READY = "Connection pool ready"
POOL_CLOSED = "Connection pool closed"
POOL_CLEARED = "Connection pool cleared"
POOL_BACKOFF = "Connection pool backoff"

CONN_CREATED = "Connection created"
CONN_READY = "Connection ready"
@@ -89,7 +88,6 @@ class _SDAMStatusMessage(str, enum.Enum):
_VERBOSE_CONNECTION_ERROR_REASONS = {
ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed",
ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed",
ConnectionClosedReason.POOL_BACKOFF: "Connection pool is in backoff",
ConnectionClosedReason.STALE: "Connection pool was stale",
ConnectionClosedReason.ERROR: "An error occurred while using the connection",
ConnectionCheckOutFailedReason.CONN_ERROR: "An error occurred while trying to establish a new connection",
3 changes: 0 additions & 3 deletions pymongo/monitoring.py
@@ -934,9 +934,6 @@ class ConnectionClosedReason:
POOL_CLOSED = "poolClosed"
"""The pool was closed, making the connection no longer valid."""

POOL_BACKOFF = "poolBackoff"
"""The pool is in backoff mode."""


class ConnectionCheckOutFailedReason:
"""An enum that defines values for `reason` on a
11 changes: 2 additions & 9 deletions pymongo/network_layer.py
@@ -256,7 +256,6 @@ def __init__(self, timeout: Optional[float] = None):
self._timeout = timeout
self._closed = asyncio.get_running_loop().create_future()
self._connection_lost = False
self._closing_exception = None

def settimeout(self, timeout: float | None) -> None:
self._timeout = timeout
@@ -270,11 +269,9 @@ def close(self, exc: Optional[Exception] = None) -> None:
self.transport.abort()
self._resolve_pending(exc)
self._connection_lost = True
self._closing_exception = exc # type:ignore[assignment]

def connection_lost(self, exc: Optional[Exception] = None) -> None:
self._resolve_pending(exc)
self._closing_exception = exc # type:ignore[assignment]
if not self._closed.done():
self._closed.set_result(None)

@@ -338,11 +335,8 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[
if self._done_messages:
message = await self._done_messages.popleft()
else:
if self._closed.done():
if self._closing_exception:
raise self._closing_exception
else:
raise OSError("connection closed")
if self.transport and self.transport.is_closing():
raise OSError("connection is already closed")
read_waiter = asyncio.get_running_loop().create_future()
self._pending_messages.append(read_waiter)
try:
@@ -480,7 +474,6 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None:
else:
msg.set_exception(exc)
self._done_messages.append(msg)
self._pending_messages.clear()


class PyMongoKMSProtocol(PyMongoBaseProtocol):
11 changes: 1 addition & 10 deletions pymongo/pool_shared.py
@@ -250,7 +250,6 @@ async def _configured_protocol_interface(
address: _Address,
options: PoolOptions,
protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol,
context: dict[str, bool] | None = None,
) -> AsyncNetworkingInterface:
"""Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface.

@@ -262,10 +261,6 @@
ssl_context = options._ssl_context
timeout = options.socket_timeout

# Signal that we have created the socket successfully.
if context:
context["has_created_socket"] = True

if ssl_context is None:
return AsyncNetworkingInterface(
await asyncio.get_running_loop().create_connection(
@@ -379,7 +374,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket


def _configured_socket_interface(
address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None
address: _Address, options: PoolOptions, *args: Any
) -> NetworkingInterface:
"""Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket.

@@ -390,10 +385,6 @@ def _configured_socket_interface(
sock = _create_connection(address, options)
ssl_context = options._ssl_context

# Signal that we have created the socket successfully.
if context:
context["has_created_socket"] = True

if ssl_context is None:
sock.settimeout(options.socket_timeout)
return NetworkingInterface(sock)
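As a companion to the pool.py hunks above, a minimal sketch of the removed context hand-off, which let the caller tell a failure to create the socket apart from a failure during the TLS/hello handshake. The function names here are shortened for illustration; only has_created_socket and the overall shape come from the deleted lines.

from typing import Any


async def _configured_interface_sketch(context: dict[str, bool] | None = None) -> Any:
    # ... create the raw connection here ...
    if context is not None:
        # Signal that the socket itself was created successfully, so a later
        # failure can be attributed to the handshake rather than socket setup.
        context["has_created_socket"] = True
    # ... wrap the socket (TLS etc.) and return the networking interface ...


async def connect_sketch() -> None:
    context = dict(has_created_socket=False)
    try:
        await _configured_interface_sketch(context=context)
    except BaseException:
        if context["has_created_socket"]:
            # Only errors raised after socket creation fed the backoff logic.
            pass
        raise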