4141 OperationFailure ,
4242 ServerSelectionTimeoutError ,
4343 WriteError )
44+ from pymongo .ismaster import IsMaster
4445from pymongo .monitor import SrvMonitor
4546from pymongo .monotonic import time as _time
4647from pymongo .server import Server
@@ -140,7 +141,8 @@ def target():
140141 executor .open ()
141142
142143 self ._srv_monitor = None
143- if self ._settings .fqdn is not None :
144+ if (self ._settings .fqdn is not None and
145+ not self ._settings .load_balanced ):
144146 self ._srv_monitor = SrvMonitor (self , self ._settings )
145147
146148 def open (self ):
@@ -486,29 +488,38 @@ def pop_all_sessions(self):
486488 with self ._lock :
487489 return self ._session_pool .pop_all ()
488490
489- def get_server_session (self ):
490- """Start or resume a server session, or raise ConfigurationError."""
491- with self ._lock :
492- session_timeout = self ._description .logical_session_timeout_minutes
493- if session_timeout is None :
494- # Maybe we need an initial scan? Can raise ServerSelectionError.
495- if self ._description .topology_type == TOPOLOGY_TYPE .Single :
496- if not self ._description .has_known_servers :
497- self ._select_servers_loop (
498- any_server_selector ,
499- self ._settings .server_selection_timeout ,
500- None )
501- elif not self ._description .readable_servers :
491+ def _check_session_support (self ):
492+ """Internal check for session support on non-load balanced clusters."""
493+ session_timeout = self ._description .logical_session_timeout_minutes
494+ if session_timeout is None :
495+ # Maybe we need an initial scan? Can raise ServerSelectionError.
496+ if self ._description .topology_type == TOPOLOGY_TYPE .Single :
497+ if not self ._description .has_known_servers :
502498 self ._select_servers_loop (
503- readable_server_selector ,
499+ any_server_selector ,
504500 self ._settings .server_selection_timeout ,
505501 None )
502+ elif not self ._description .readable_servers :
503+ self ._select_servers_loop (
504+ readable_server_selector ,
505+ self ._settings .server_selection_timeout ,
506+ None )
506507
507508 session_timeout = self ._description .logical_session_timeout_minutes
508509 if session_timeout is None :
509510 raise ConfigurationError (
510511 "Sessions are not supported by this MongoDB deployment" )
512+ return session_timeout
511513
514+ def get_server_session (self ):
515+ """Start or resume a server session, or raise ConfigurationError."""
516+ with self ._lock :
517+ # Sessions are always supported in load balanced mode.
518+ if not self ._settings .load_balanced :
519+ session_timeout = self ._check_session_support ()
520+ else :
521+ # Sessions never time out in load balanced mode.
522+ session_timeout = float ('inf' )
512523 return self ._session_pool .get_server_session (session_timeout )
513524
514525 def return_server_session (self , server_session , lock ):
@@ -548,6 +559,12 @@ def _ensure_opened(self):
548559 SRV_POLLING_TOPOLOGIES ):
549560 self ._srv_monitor .open ()
550561
562+ if self ._settings .load_balanced :
563+ # Emit initial SDAM events for load balancer mode.
564+ self ._process_change (ServerDescription (
565+ self ._seed_addresses [0 ],
566+ IsMaster ({'ok' : 1 , 'serviceId' : self ._topology_id })))
567+
551568 # Ensure that the monitors are open.
552569 for server in itervalues (self ._servers ):
553570 server .open ()
@@ -601,15 +618,17 @@ def _handle_error(self, address, err_ctx):
601618 err_code = error .details .get ('code' , - 1 )
602619 is_shutting_down = err_code in helpers ._SHUTDOWN_CODES
603620 # Mark server Unknown, clear the pool, and request check.
604- self ._process_change (ServerDescription (address , error = error ))
621+ if not self ._settings .load_balanced :
622+ self ._process_change (ServerDescription (address , error = error ))
605623 if is_shutting_down or (err_ctx .max_wire_version <= 7 ):
606624 # Clear the pool.
607625 server .reset ()
608626 server .request_check ()
609627 elif issubclass (exc_type , ConnectionFailure ):
610628 # "Client MUST replace the server's description with type Unknown
611629 # ... MUST NOT request an immediate check of the server."
612- self ._process_change (ServerDescription (address , error = error ))
630+ if not self ._settings .load_balanced :
631+ self ._process_change (ServerDescription (address , error = error ))
613632 # Clear the pool.
614633 server .reset ()
615634 # "When a client marks a server Unknown from `Network error when
@@ -620,7 +639,9 @@ def _handle_error(self, address, err_ctx):
620639 # Do not request an immediate check since the server is likely
621640 # shutting down.
622641 if error .code in helpers ._NOT_MASTER_CODES :
623- self ._process_change (ServerDescription (address , error = error ))
642+ if not self ._settings .load_balanced :
643+ self ._process_change (
644+ ServerDescription (address , error = error ))
624645 # Clear the pool.
625646 server .reset ()
626647
0 commit comments