Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 11 additions & 36 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from bson import DEFAULT_CODEC_OPTIONS
from pymongo import _csot, helpers_shared
from pymongo.asynchronous.client_session import _validate_session_write_concern
from pymongo.asynchronous.helpers import _backoff, _handle_reauth
from pymongo.asynchronous.helpers import _handle_reauth
from pymongo.asynchronous.network import command
from pymongo.common import (
MAX_BSON_SIZE,
Expand Down Expand Up @@ -789,8 +789,8 @@ def __init__(
# Also used for: clearing the wait queue
self._max_connecting_cond = _async_create_condition(self.lock)
self._pending = 0
self._max_connecting = self.opts.max_connecting
self._client_id = client_id
self._backoff = 0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
Expand Down Expand Up @@ -846,8 +846,6 @@ async def _reset(
async with self.size_cond:
if self.closed:
return
# Clear the backoff state.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
Expand Down Expand Up @@ -930,11 +928,6 @@ async def _reset(
for conn in sockets:
await conn.close_conn(ConnectionClosedReason.STALE)

@property
def max_connecting(self) -> int:
"""The current max connecting limit for the pool."""
return 1 if self._backoff else self.opts.max_connecting

async def update_is_writable(self, is_writable: Optional[bool]) -> None:
"""Updates the is_writable attribute on all sockets currently in the
Pool.
Expand Down Expand Up @@ -1001,7 +994,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
async with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self.max_connecting:
if self._pending >= self._max_connecting:
return
self._pending += 1
incremented = True
Expand Down Expand Up @@ -1034,24 +1027,11 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in
# Look for an AutoReconnect error raised from a ConnectionResetError with
# errno == errno.ECONNRESET or raised from an OSError that we've created due to
# a closed connection.
# If found, set backoff and add error labels.
# If found, add error labels.
if self.is_sdam or type(error) != AutoReconnect:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we now want to label NetworkTimeouts on connect as overload, should this be isinstance(error, AutoReconnect)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added NetworkTimeout explicitly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good, this caught a logic error in the backoff network timeout test

return
self._backoff += 1
error._add_error_label("SystemOverloadedError")
error._add_error_label("RetryableError")
# Log the pool backoff message.
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_BACKOFF,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
error=ConnectionClosedReason.POOL_BACKOFF,
)

async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection:
"""Connect to Mongo and return a new AsyncConnection.
Expand Down Expand Up @@ -1082,10 +1062,6 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
await asyncio.sleep(_backoff(self._backoff))

# Pass a context to determine if we successfully create a configured socket.
context = dict(has_created_socket=False)

Expand Down Expand Up @@ -1126,9 +1102,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
completed_hello = False
try:
if not self.is_sdam:
await conn.hello()
completed_hello = True
self.is_writable = conn.is_writable
if handler:
handler.contribute_socket(conn, completed_handshake=False)
Expand All @@ -1138,15 +1116,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
except BaseException as e:
async with self.lock:
self.active_contexts.discard(conn.cancel_context)
self._handle_connection_error(e, "hello", conn_id)
if not completed_hello:
self._handle_connection_error(e, "hello", conn_id)
await conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
return conn

@contextlib.asynccontextmanager
Expand Down Expand Up @@ -1323,12 +1300,12 @@ async def _get_conn(
# to be checked back into the pool.
async with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self.max_connecting):
while not (self.conns or self._pending < self._max_connecting):
timeout = deadline - time.monotonic() if deadline else None
if not await _async_cond_wait(self._max_connecting_cond, timeout):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self.max_connecting:
if self.conns or self._pending < self._max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout(checkout_started_time)
Expand Down Expand Up @@ -1469,7 +1446,7 @@ async def _perished(self, conn: AsyncConnection) -> bool:
:class:`~pymongo.errors.AutoReconnect` exceptions on server
hiccups, etc. We only check if the socket was closed by an external
error if it has been > 1 second since the socket was checked into the
pool, or we are in backoff mode, to keep performance reasonable -
pool, to keep performance reasonable -
we can't avoid AutoReconnects completely anyway.
"""
idle_time_seconds = conn.idle_time_seconds()
Expand All @@ -1482,8 +1459,6 @@ async def _perished(self, conn: AsyncConnection) -> bool:
return True

check_interval_seconds = self._check_interval_seconds
if self._backoff:
check_interval_seconds = 0
if check_interval_seconds is not None and (
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds
):
Expand Down
11 changes: 2 additions & 9 deletions pymongo/network_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ def __init__(self, timeout: Optional[float] = None):
self._timeout = timeout
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted these changes from #2509 since they were unrelated

self._closed = asyncio.get_running_loop().create_future()
self._connection_lost = False
self._closing_exception = None

def settimeout(self, timeout: float | None) -> None:
self._timeout = timeout
Expand All @@ -270,11 +269,9 @@ def close(self, exc: Optional[Exception] = None) -> None:
self.transport.abort()
self._resolve_pending(exc)
self._connection_lost = True
self._closing_exception = exc # type:ignore[assignment]

def connection_lost(self, exc: Optional[Exception] = None) -> None:
self._resolve_pending(exc)
self._closing_exception = exc # type:ignore[assignment]
if not self._closed.done():
self._closed.set_result(None)

Expand Down Expand Up @@ -338,11 +335,8 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[
if self._done_messages:
message = await self._done_messages.popleft()
else:
if self._closed.done():
if self._closing_exception:
raise self._closing_exception
else:
raise OSError("connection closed")
if self.transport and self.transport.is_closing():
raise OSError("connection is already closed")
read_waiter = asyncio.get_running_loop().create_future()
self._pending_messages.append(read_waiter)
try:
Expand Down Expand Up @@ -480,7 +474,6 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None:
else:
msg.set_exception(exc)
self._done_messages.append(msg)
self._pending_messages.clear()


class PyMongoKMSProtocol(PyMongoBaseProtocol):
Expand Down
47 changes: 11 additions & 36 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
from pymongo.server_type import SERVER_TYPE
from pymongo.socket_checker import SocketChecker
from pymongo.synchronous.client_session import _validate_session_write_concern
from pymongo.synchronous.helpers import _backoff, _handle_reauth
from pymongo.synchronous.helpers import _handle_reauth
from pymongo.synchronous.network import command

if TYPE_CHECKING:
Expand Down Expand Up @@ -787,8 +787,8 @@ def __init__(
# Also used for: clearing the wait queue
self._max_connecting_cond = _create_condition(self.lock)
self._pending = 0
self._max_connecting = self.opts.max_connecting
self._client_id = client_id
self._backoff = 0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
Expand Down Expand Up @@ -844,8 +844,6 @@ def _reset(
with self.size_cond:
if self.closed:
return
# Clear the backoff state.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
Expand Down Expand Up @@ -928,11 +926,6 @@ def _reset(
for conn in sockets:
conn.close_conn(ConnectionClosedReason.STALE)

@property
def max_connecting(self) -> int:
"""The current max connecting limit for the pool."""
return 1 if self._backoff else self.opts.max_connecting

def update_is_writable(self, is_writable: Optional[bool]) -> None:
"""Updates the is_writable attribute on all sockets currently in the
Pool.
Expand Down Expand Up @@ -997,7 +990,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self.max_connecting:
if self._pending >= self._max_connecting:
return
self._pending += 1
incremented = True
Expand Down Expand Up @@ -1030,24 +1023,11 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in
# Look for an AutoReconnect error raised from a ConnectionResetError with
# errno == errno.ECONNRESET or raised from an OSError that we've created due to
# a closed connection.
# If found, set backoff and add error labels.
# If found, add error labels.
if self.is_sdam or type(error) != AutoReconnect:
return
self._backoff += 1
error._add_error_label("SystemOverloadedError")
error._add_error_label("RetryableError")
# Log the pool backoff message.
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_BACKOFF,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
error=ConnectionClosedReason.POOL_BACKOFF,
)

def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection:
"""Connect to Mongo and return a new Connection.
Expand Down Expand Up @@ -1078,10 +1058,6 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
time.sleep(_backoff(self._backoff))

# Pass a context to determine if we successfully create a configured socket.
context = dict(has_created_socket=False)

Expand Down Expand Up @@ -1122,9 +1098,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
completed_hello = False
try:
if not self.is_sdam:
conn.hello()
completed_hello = True
self.is_writable = conn.is_writable
if handler:
handler.contribute_socket(conn, completed_handshake=False)
Expand All @@ -1134,15 +1112,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
except BaseException as e:
with self.lock:
self.active_contexts.discard(conn.cancel_context)
self._handle_connection_error(e, "hello", conn_id)
if not completed_hello:
self._handle_connection_error(e, "hello", conn_id)
conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
return conn

@contextlib.contextmanager
Expand Down Expand Up @@ -1319,12 +1296,12 @@ def _get_conn(
# to be checked back into the pool.
with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self.max_connecting):
while not (self.conns or self._pending < self._max_connecting):
timeout = deadline - time.monotonic() if deadline else None
if not _cond_wait(self._max_connecting_cond, timeout):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self.max_connecting:
if self.conns or self._pending < self._max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout(checkout_started_time)
Expand Down Expand Up @@ -1465,7 +1442,7 @@ def _perished(self, conn: Connection) -> bool:
:class:`~pymongo.errors.AutoReconnect` exceptions on server
hiccups, etc. We only check if the socket was closed by an external
error if it has been > 1 second since the socket was checked into the
pool, or we are in backoff mode, to keep performance reasonable -
pool, to keep performance reasonable -
we can't avoid AutoReconnects completely anyway.
"""
idle_time_seconds = conn.idle_time_seconds()
Expand All @@ -1478,8 +1455,6 @@ def _perished(self, conn: Connection) -> bool:
return True

check_interval_seconds = self._check_interval_seconds
if self._backoff:
check_interval_seconds = 0
if check_interval_seconds is not None and (
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds
):
Expand Down
Loading
Loading