1- use std:: {
2- sync:: { Arc , Weak } ,
3- time:: Duration ,
4- } ;
1+ use std:: { sync:: Weak , time:: Duration } ;
52
63use bson:: { bson, doc} ;
74use lazy_static:: lazy_static;
@@ -15,6 +12,7 @@ use crate::{
1512 cmap:: { Command , Connection } ,
1613 error:: Result ,
1714 is_master:: IsMasterReply ,
15+ options:: { ClientOptions , StreamAddress } ,
1816 sdam:: update_topology,
1917} ;
2018
@@ -25,27 +23,50 @@ lazy_static! {
2523 pub ( crate ) static ref MIN_HEARTBEAT_FREQUENCY : time:: Duration = time:: Duration :: milliseconds( 500 ) ;
2624}
2725
28- /// Starts a monitoring thread associated with a given Server. A weak reference is used to ensure
29- /// that the monitoring thread doesn't keep the server alive after it's been removed from the
30- /// topology or the client has been dropped.
31- pub ( super ) fn monitor_server (
32- mut conn : Connection ,
26+ pub ( super ) struct Monitor {
27+ address : StreamAddress ,
28+ connection : Option < Connection > ,
3329 server : Weak < Server > ,
34- heartbeat_frequency : Option < Duration > ,
35- ) {
36- std:: thread:: spawn ( move || {
37- let mut server_type = ServerType :: Unknown ;
38- let heartbeat_frequency = heartbeat_frequency. unwrap_or ( DEFAULT_HEARTBEAT_FREQUENCY ) ;
30+ server_type : ServerType ,
31+ options : ClientOptions ,
32+ }
33+
34+ impl Monitor {
35+ /// Starts a monitoring thread associated with a given Server. A weak reference is used to
36+ /// ensure that the monitoring thread doesn't keep the server alive after it's been removed
37+ /// from the topology or the client has been dropped.
38+ pub ( super ) fn start (
39+ address : StreamAddress ,
40+ server : Weak < Server > ,
41+ options : ClientOptions ,
42+ ) -> Result < ( ) > {
43+ let mut monitor = Self {
44+ address,
45+ connection : None ,
46+ server,
47+ server_type : ServerType :: Unknown ,
48+ options,
49+ } ;
50+
51+ std:: thread:: spawn ( move || {
52+ monitor. execute ( ) ;
53+ } ) ;
54+
55+ Ok ( ( ) )
56+ }
57+
58+ fn execute ( & mut self ) {
59+ let heartbeat_frequency = self
60+ . options
61+ . heartbeat_freq
62+ . unwrap_or ( DEFAULT_HEARTBEAT_FREQUENCY ) ;
3963
4064 loop {
41- server_type = match monitor_server_check ( & mut conn, server_type, & server) {
42- Some ( server_type) => server_type,
43- None => return ,
44- } ;
65+ self . check_server_and_update_topology ( ) ;
4566
4667 let last_check = PreciseTime :: now ( ) ;
4768
48- let timed_out = match server. upgrade ( ) {
69+ let timed_out = match self . server . upgrade ( ) {
4970 Some ( server) => server. wait_timeout ( heartbeat_frequency) ,
5071 None => return ,
5172 } ;
@@ -56,68 +77,100 @@ pub(super) fn monitor_server(
5677 if duration_since_last_check < * MIN_HEARTBEAT_FREQUENCY {
5778 let remaining_time = * MIN_HEARTBEAT_FREQUENCY - duration_since_last_check;
5879
59- // Since MIN_HEARTBEAT_FREQUENCY is 500 and `duration_since_last_check` is less
60- // than it but still positive, we can be sure that the time::Duration can be
61- // successfully converted to a std::time::Duration. However, in the case of some
62- // bug causing this not to be true, rather than panicking the monitoring thread,
63- // we instead just don't sleep and proceed to checking the server a bit early.
80+ // Since MIN_HEARTBEAT_FREQUENCY is 500 ms and `duration_since_last_check` is
81+ // less than it but still positive, we can be sure that the
82+ // time::Duration can be successfully converted to a
83+ // std::time::Duration. However, in the case of some bug causing
84+ // this not to be true, rather than panicking the monitoring thread,
85+ // we instead just don't sleep and proceed to checking the server a
86+ // bit early.
6487 if let Ok ( remaining_time) = remaining_time. to_std ( ) {
6588 std:: thread:: sleep ( remaining_time) ;
6689 }
6790 }
6891 }
6992 }
70- } ) ;
71- }
93+ }
7294
73- fn monitor_server_check (
74- conn : & mut Connection ,
75- mut server_type : ServerType ,
76- server : & Weak < Server > ,
77- ) -> Option < ServerType > {
78- // If the server has been dropped, terminate the monitoring thread.
79- let server = match server. upgrade ( ) {
80- Some ( server) => server,
81- None => return None ,
82- } ;
83-
84- // If the topology has been dropped, terminate the monitoring thread.
85- let topology = match server. topology ( ) {
86- Some ( topology) => topology,
87- None => return None ,
88- } ;
89-
90- // Send an isMaster to the server.
91- let server_description = check_server ( conn, server_type, & server) ;
92- server_type = server_description. server_type ;
93-
94- update_topology ( topology, server_description) ;
95-
96- Some ( server_type)
97- }
95+ /// Checks the server by running an `isMaster` command. If an I/O error occurs, the
96+ /// connection will be replaced with a new one.
97+ fn check_server_and_update_topology ( & mut self ) {
98+ // If the server has been dropped, skip the check; `execute` exits its loop once the upgrade fails.
99+ let server = match self . server . upgrade ( ) {
100+ Some ( server) => server,
101+ None => return ,
102+ } ;
103+
104+ // If the topology has been dropped, there is nothing to update, so skip the check.
105+ let topology = match server. topology ( ) {
106+ Some ( topology) => topology,
107+ None => return ,
108+ } ;
109+
110+ // Send an isMaster to the server.
111+ let server_description = self . check_server ( ) ;
112+ self . server_type = server_description. server_type ;
113+
114+ update_topology ( topology, server_description) ;
115+ }
98116
99- fn check_server (
100- conn : & mut Connection ,
101- server_type : ServerType ,
102- server : & Arc < Server > ,
103- ) -> ServerDescription {
104- let address = conn. address ( ) . clone ( ) ;
117+ fn check_server ( & mut self ) -> ServerDescription {
118+ let address = self . address . clone ( ) ;
105119
106- match is_master ( conn ) {
107- Ok ( reply) => return ServerDescription :: new ( address, Some ( Ok ( reply) ) ) ,
108- Err ( e) => {
109- server . clear_connection_pool ( ) ;
120+ match self . perform_is_master ( ) {
121+ Ok ( reply) => ServerDescription :: new ( address, Some ( Ok ( reply) ) ) ,
122+ Err ( e) => {
123+ self . clear_connection_pool ( ) ;
110124
111- if server_type == ServerType :: Unknown {
112- return ServerDescription :: new ( address, Some ( Err ( e) ) ) ;
125+ if self . server_type == ServerType :: Unknown {
126+ return ServerDescription :: new ( address, Some ( Err ( e) ) ) ;
127+ }
128+
129+ ServerDescription :: new ( address, Some ( self . perform_is_master ( ) ) )
113130 }
114131 }
115132 }
116133
117- ServerDescription :: new ( address, Some ( is_master ( conn) ) )
134+ fn perform_is_master ( & mut self ) -> Result < IsMasterReply > {
135+ let connection = self . resolve_connection ( ) ?;
136+ let result = is_master ( connection) ;
137+
138+ if result
139+ . as_ref ( )
140+ . err ( )
141+ . map ( |e| e. kind . is_network_error ( ) )
142+ . unwrap_or ( false )
143+ {
144+ self . connection . take ( ) ;
145+ }
146+
147+ result
148+ }
149+
150+ fn resolve_connection ( & mut self ) -> Result < & mut Connection > {
151+ if let Some ( ref mut connection) = self . connection {
152+ return Ok ( connection) ;
153+ }
154+
155+ let connection = Connection :: new_monitoring (
156+ self . address . clone ( ) ,
157+ self . options . connect_timeout ,
158+ self . options . tls_options ( ) ,
159+ ) ?;
160+
161+ // Since the connection was not `Some` above, this will always insert the new connection and
162+ // return a reference to it.
163+ Ok ( self . connection . get_or_insert ( connection) )
164+ }
165+
166+ fn clear_connection_pool ( & self ) {
167+ if let Some ( server) = self . server . upgrade ( ) {
168+ server. clear_connection_pool ( ) ;
169+ }
170+ }
118171}
119172
120- fn is_master ( conn : & mut Connection ) -> Result < IsMasterReply > {
173+ fn is_master ( connection : & mut Connection ) -> Result < IsMasterReply > {
121174 let command = Command :: new_read (
122175 "isMaster" . into ( ) ,
123176 "admin" . into ( ) ,
@@ -126,14 +179,13 @@ fn is_master(conn: &mut Connection) -> Result<IsMasterReply> {
126179 ) ;
127180
128181 let start_time = PreciseTime :: now ( ) ;
129- let command_response = conn . send_command ( command, None ) ?;
182+ let command_response = connection . send_command ( command, None ) ?;
130183 let end_time = PreciseTime :: now ( ) ;
131184
132185 let command_response = command_response. body ( ) ?;
133186
134187 Ok ( IsMasterReply {
135188 command_response,
136- // TODO RUST-193: Round-trip time
137189 round_trip_time : Some ( start_time. to ( end_time) . to_std ( ) . unwrap ( ) ) ,
138190 } )
139191}
0 commit comments