Skip to content

Commit b082720

Browse files
committed
PYTHON-2634 Only update pools for data-bearing servers (#590)
Fixes a noisy "OperationFailure: Authentication failed" error. Do not attempt to create unneeded connections to arbiters, ghosts, hidden members, or unknown members. (Cherry-picked from commit 4c7718e.) Conflicts: pymongo/topology.py, test/test_client.py, test/test_cmap.py.
1 parent c58dee8 commit b082720

File tree

6 files changed

+137
-39
lines changed

6 files changed

+137
-39
lines changed

pymongo/topology.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -431,15 +431,26 @@ def handle_getlasterror(self, address, error_msg):
431431
ServerDescription(address, error=error), True)
432432
server.request_check()
433433

434+
def data_bearing_servers(self):
435+
"""Return a list of all data-bearing servers.
436+
437+
This includes any server that might be selected for an operation.
438+
"""
439+
if self._description.topology_type == TOPOLOGY_TYPE.Single:
440+
return self._description.known_servers
441+
return self._description.readable_servers
442+
434443
def update_pool(self, all_credentials):
435444
# Remove any stale sockets and add new sockets if pool is too small.
436445
servers = []
437446
with self._lock:
438-
for server in self._servers.values():
439-
servers.append((server, server._pool.generation))
447+
# Only update pools for data-bearing servers.
448+
for sd in self.data_bearing_servers():
449+
server = self._servers[sd.address]
450+
servers.append((server, server.pool.generation))
440451

441452
for server, generation in servers:
442-
server._pool.remove_stale_sockets(generation, all_credentials)
453+
server.pool.remove_stale_sockets(generation, all_credentials)
443454

444455
def close(self):
445456
"""Clear pools and terminate monitors. Topology reopens on demand."""

test/pymongo_mocks.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@ def _check_once(self):
8585
class MockClient(MongoClient):
8686
def __init__(
8787
self, standalones, members, mongoses, ismaster_hosts=None,
88-
*args, **kwargs):
88+
arbiters=None, down_hosts=None, *args, **kwargs):
8989
"""A MongoClient connected to the default server, with a mock topology.
9090
91-
standalones, members, mongoses determine the configuration of the
92-
topology. They are formatted like ['a:1', 'b:2']. ismaster_hosts
93-
provides an alternative host list for the server's mocked ismaster
94-
response; see test_connect_with_internal_ips.
91+
standalones, members, mongoses, arbiters, and down_hosts determine the
92+
configuration of the topology. They are formatted like ['a:1', 'b:2'].
93+
ismaster_hosts provides an alternative host list for the server's
94+
mocked ismaster response; see test_connect_with_internal_ips.
9595
"""
9696
self.mock_standalones = standalones[:]
9797
self.mock_members = members[:]
@@ -101,6 +101,9 @@ def __init__(
101101
else:
102102
self.mock_primary = None
103103

104+
# Hosts that should be considered an arbiter.
105+
self.mock_arbiters = arbiters[:] if arbiters else []
106+
104107
if ismaster_hosts is not None:
105108
self.mock_ismaster_hosts = ismaster_hosts
106109
else:
@@ -109,7 +112,7 @@ def __init__(
109112
self.mock_mongoses = mongoses[:]
110113

111114
# Hosts that should raise socket errors.
112-
self.mock_down_hosts = []
115+
self.mock_down_hosts = down_hosts[:] if down_hosts else []
113116

114117
# Hostname -> (min wire version, max wire version)
115118
self.mock_wire_versions = {}
@@ -182,6 +185,10 @@ def mock_is_master(self, host):
182185

183186
if self.mock_primary:
184187
response['primary'] = self.mock_primary
188+
189+
if host in self.mock_arbiters:
190+
response['arbiterOnly'] = True
191+
response['secondary'] = False
185192
elif host in self.mock_mongoses:
186193
response = {
187194
'ok': 1,

test/test_client.py

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from bson.son import SON
3636
from bson.tz_util import utc
3737
import pymongo
38-
from pymongo import auth, message
38+
from pymongo import auth, message, monitoring
3939
from pymongo.common import CONNECT_TIMEOUT, _UUID_REPRESENTATIONS
4040
from pymongo.command_cursor import CommandCursor
4141
from pymongo.compression_support import _HAVE_SNAPPY, _HAVE_ZSTD
@@ -58,7 +58,7 @@
5858
from pymongo.pool import SocketInfo, _METADATA
5959
from pymongo.read_preferences import ReadPreference
6060
from pymongo.server_description import ServerDescription
61-
from pymongo.server_selectors import (any_server_selector,
61+
from pymongo.server_selectors import (readable_server_selector,
6262
writable_server_selector)
6363
from pymongo.server_type import SERVER_TYPE
6464
from pymongo.settings import TOPOLOGY_TYPE
@@ -78,6 +78,7 @@
7878
from test.pymongo_mocks import MockClient
7979
from test.utils import (assertRaisesExactly,
8080
connected,
81+
CMAPListener,
8182
delay,
8283
FunctionCallRecorder,
8384
get_pool,
@@ -454,21 +455,25 @@ def test_uri_security_options(self):
454455

455456
class TestClient(IntegrationTest):
456457

457-
def test_max_idle_time_reaper(self):
458+
def test_max_idle_time_reaper_default(self):
458459
with client_knobs(kill_cursor_frequency=0.1):
459460
# Assert reaper doesn't remove sockets when maxIdleTimeMS not set
460461
client = rs_or_single_client()
461-
server = client._get_topology().select_server(any_server_selector)
462+
server = client._get_topology().select_server(
463+
readable_server_selector)
462464
with server._pool.get_socket({}) as sock_info:
463465
pass
464466
self.assertEqual(1, len(server._pool.sockets))
465467
self.assertTrue(sock_info in server._pool.sockets)
466468
client.close()
467469

470+
def test_max_idle_time_reaper_removes_stale_minPoolSize(self):
471+
with client_knobs(kill_cursor_frequency=0.1):
468472
# Assert reaper removes idle socket and replaces it with a new one
469473
client = rs_or_single_client(maxIdleTimeMS=500,
470474
minPoolSize=1)
471-
server = client._get_topology().select_server(any_server_selector)
475+
server = client._get_topology().select_server(
476+
readable_server_selector)
472477
with server._pool.get_socket({}) as sock_info:
473478
pass
474479
# When the reaper runs at the same time as the get_socket, two
@@ -480,11 +485,14 @@ def test_max_idle_time_reaper(self):
480485
"replace stale socket")
481486
client.close()
482487

488+
def test_max_idle_time_reaper_does_not_exceed_maxPoolSize(self):
489+
with client_knobs(kill_cursor_frequency=0.1):
483490
# Assert reaper respects maxPoolSize when adding new sockets.
484491
client = rs_or_single_client(maxIdleTimeMS=500,
485492
minPoolSize=1,
486493
maxPoolSize=1)
487-
server = client._get_topology().select_server(any_server_selector)
494+
server = client._get_topology().select_server(
495+
readable_server_selector)
488496
with server._pool.get_socket({}) as sock_info:
489497
pass
490498
# When the reaper runs at the same time as the get_socket,
@@ -496,9 +504,12 @@ def test_max_idle_time_reaper(self):
496504
"replace stale socket")
497505
client.close()
498506

507+
def test_max_idle_time_reaper_removes_stale(self):
508+
with client_knobs(kill_cursor_frequency=0.1):
499509
# Assert reaper has removed idle socket and NOT replaced it
500510
client = rs_or_single_client(maxIdleTimeMS=500)
501-
server = client._get_topology().select_server(any_server_selector)
511+
server = client._get_topology().select_server(
512+
readable_server_selector)
502513
with server._pool.get_socket({}) as sock_info_one:
503514
pass
504515
# Assert that the pool does not close sockets prematurely.
@@ -514,12 +525,14 @@ def test_max_idle_time_reaper(self):
514525
def test_min_pool_size(self):
515526
with client_knobs(kill_cursor_frequency=.1):
516527
client = rs_or_single_client()
517-
server = client._get_topology().select_server(any_server_selector)
528+
server = client._get_topology().select_server(
529+
readable_server_selector)
518530
self.assertEqual(0, len(server._pool.sockets))
519531

520532
# Assert that pool started up at minPoolSize
521533
client = rs_or_single_client(minPoolSize=10)
522-
server = client._get_topology().select_server(any_server_selector)
534+
server = client._get_topology().select_server(
535+
readable_server_selector)
523536
wait_until(lambda: 10 == len(server._pool.sockets),
524537
"pool initialized with 10 sockets")
525538

@@ -534,7 +547,8 @@ def test_max_idle_time_checkout(self):
534547
# Use high frequency to test _get_socket_no_auth.
535548
with client_knobs(kill_cursor_frequency=99999999):
536549
client = rs_or_single_client(maxIdleTimeMS=500)
537-
server = client._get_topology().select_server(any_server_selector)
550+
server = client._get_topology().select_server(
551+
readable_server_selector)
538552
with server._pool.get_socket({}) as sock_info:
539553
pass
540554
self.assertEqual(1, len(server._pool.sockets))
@@ -548,7 +562,8 @@ def test_max_idle_time_checkout(self):
548562

549563
# Test that sockets are reused if maxIdleTimeMS is not set.
550564
client = rs_or_single_client()
551-
server = client._get_topology().select_server(any_server_selector)
565+
server = client._get_topology().select_server(
566+
readable_server_selector)
552567
with server._pool.get_socket({}) as sock_info:
553568
pass
554569
self.assertEqual(1, len(server._pool.sockets))
@@ -2016,5 +2031,60 @@ def timeout_task():
20162031
self.assertIsNone(ct.get())
20172032

20182033

2034+
class TestClientPool(MockClientTest):
2035+
2036+
def test_rs_client_does_not_maintain_pool_to_arbiters(self):
2037+
listener = CMAPListener()
2038+
c = MockClient(
2039+
standalones=[],
2040+
members=['a:1', 'b:2', 'c:3', 'd:4'],
2041+
mongoses=[],
2042+
arbiters=['c:3'], # c:3 is an arbiter.
2043+
down_hosts=['d:4'], # d:4 is unreachable.
2044+
host=['a:1', 'b:2', 'c:3', 'd:4'],
2045+
replicaSet='rs',
2046+
minPoolSize=1, # minPoolSize
2047+
event_listeners=[listener],
2048+
)
2049+
self.addCleanup(c.close)
2050+
2051+
wait_until(lambda: len(c.nodes) == 3, 'connect')
2052+
self.assertEqual(c.address, ('a', 1))
2053+
self.assertEqual(c.arbiters, set([('c', 3)]))
2054+
# Assert that we create 2 and only 2 pooled connections.
2055+
listener.wait_for_event(monitoring.ConnectionReadyEvent, 2)
2056+
self.assertEqual(
2057+
listener.event_count(monitoring.ConnectionCreatedEvent), 2)
2058+
# Assert that we do not create connections to arbiters.
2059+
arbiter = c._topology.get_server_by_address(('c', 3))
2060+
self.assertFalse(arbiter.pool.sockets)
2061+
# Assert that we do not create connections to unknown servers.
2062+
arbiter = c._topology.get_server_by_address(('d', 4))
2063+
self.assertFalse(arbiter.pool.sockets)
2064+
2065+
def test_direct_client_maintains_pool_to_arbiter(self):
2066+
listener = CMAPListener()
2067+
c = MockClient(
2068+
standalones=[],
2069+
members=['a:1', 'b:2', 'c:3'],
2070+
mongoses=[],
2071+
arbiters=['c:3'], # c:3 is an arbiter.
2072+
host='c:3',
2073+
directConnection=True,
2074+
minPoolSize=1, # minPoolSize
2075+
event_listeners=[listener],
2076+
)
2077+
self.addCleanup(c.close)
2078+
2079+
wait_until(lambda: len(c.nodes) == 1, 'connect')
2080+
self.assertEqual(c.address, ('c', 3))
2081+
# Assert that we create 1 pooled connection.
2082+
listener.wait_for_event(monitoring.ConnectionReadyEvent, 1)
2083+
self.assertEqual(
2084+
listener.event_count(monitoring.ConnectionCreatedEvent), 1)
2085+
arbiter = c._topology.get_server_by_address(('c', 3))
2086+
self.assertEqual(len(arbiter.pool.sockets), 1)
2087+
2088+
20192089
if __name__ == "__main__":
20202090
unittest.main()

test/test_heartbeat_monitoring.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ def _check_with_socket(self, *args, **kwargs):
5151
# monitor thread may run multiple times during the execution
5252
# of this test.
5353
wait_until(
54-
lambda: len(listener.results) >= expected_len,
54+
lambda: len(listener.events) >= expected_len,
5555
"publish all events")
5656

5757
try:
5858
# zip gives us len(expected_results) pairs.
59-
for expected, actual in zip(expected_results, listener.results):
59+
for expected, actual in zip(expected_results, listener.events):
6060
self.assertEqual(expected,
6161
actual.__class__.__name__)
6262
self.assertEqual(actual.connection_id,

test/test_streaming_protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ def hb_failed(event):
203203
self.assertTrue(hb_failed_events[0].awaited)
204204
# Depending on thread scheduling, the failed heartbeat could occur on
205205
# the second or third check.
206-
events = [type(e) for e in hb_listener.results[:4]]
206+
events = [type(e) for e in hb_listener.events[:4]]
207207
if events == [monitoring.ServerHeartbeatStartedEvent,
208208
monitoring.ServerHeartbeatSucceededEvent,
209209
monitoring.ServerHeartbeatStartedEvent,

test/utils.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
monitoring, operations, read_preferences)
4040
from pymongo.collection import ReturnDocument
4141
from pymongo.errors import ConfigurationError, OperationFailure
42-
from pymongo.monitoring import _SENSITIVE_COMMANDS, ConnectionPoolListener
42+
from pymongo.monitoring import _SENSITIVE_COMMANDS
4343
from pymongo.pool import (_CancellationContext,
4444
PoolOptions)
4545
from pymongo.read_concern import ReadConcern
@@ -63,7 +63,7 @@
6363
IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=50)
6464

6565

66-
class CMAPListener(ConnectionPoolListener):
66+
class BaseListener(object):
6767
def __init__(self):
6868
self.events = []
6969

@@ -74,8 +74,26 @@ def add_event(self, event):
7474
self.events.append(event)
7575

7676
def event_count(self, event_type):
77-
return len([event for event in self.events[:]
78-
if isinstance(event, event_type)])
77+
return len(self.events_by_type(event_type))
78+
79+
def events_by_type(self, event_type):
80+
"""Return the matching events by event class.
81+
82+
event_type can be a single class or a tuple of classes.
83+
"""
84+
return self.matching(lambda e: isinstance(e, event_type))
85+
86+
def matching(self, matcher):
87+
"""Return the matching events."""
88+
return [event for event in self.events[:] if matcher(event)]
89+
90+
def wait_for_event(self, event, count):
91+
"""Wait for a number of events to be published, or fail."""
92+
wait_until(lambda: self.event_count(event) >= count,
93+
'find %s %s event(s)' % (count, event))
94+
95+
96+
class CMAPListener(BaseListener, monitoring.ConnectionPoolListener):
7997

8098
def connection_created(self, event):
8199
self.add_event(event)
@@ -217,25 +235,17 @@ class ServerAndTopologyEventListener(ServerEventListener,
217235
"""Listens to Server and Topology events."""
218236

219237

220-
class HeartbeatEventListener(monitoring.ServerHeartbeatListener):
238+
class HeartbeatEventListener(BaseListener, monitoring.ServerHeartbeatListener):
221239
"""Listens to only server heartbeat events."""
222240

223-
def __init__(self):
224-
self.results = []
225-
226241
def started(self, event):
227-
self.results.append(event)
242+
self.add_event(event)
228243

229244
def succeeded(self, event):
230-
self.results.append(event)
245+
self.add_event(event)
231246

232247
def failed(self, event):
233-
self.results.append(event)
234-
235-
def matching(self, matcher):
236-
"""Return the matching events."""
237-
results = self.results[:]
238-
return [event for event in results if matcher(event)]
248+
self.add_event(event)
239249

240250

241251
class MockSocketInfo(object):

0 commit comments

Comments
 (0)