9292#![ cfg_attr( feature = "clippy" , plugin( clippy) ) ]
9393#![ cfg_attr( feature = "clippy" , allow(
9494 doc_markdown,
95- // allow double_parens for bson/doc macro.
95+ // allow double_parens for bson/doc macro.
9696 double_parens,
97- // more explicit than catch-alls.
97+ // more explicit than catch-alls.
9898 match_wild_err_arm,
9999 too_many_arguments,
100100) ) ]
@@ -178,7 +178,7 @@ use error::Error::ResponseError;
178178use pool:: PooledStream ;
179179use stream:: StreamConnector ;
180180use topology:: { Topology , TopologyDescription , TopologyType , DEFAULT_HEARTBEAT_FREQUENCY_MS ,
181- DEFAULT_LOCAL_THRESHOLD_MS , DEFAULT_SERVER_SELECTION_TIMEOUT_MS } ;
181+ DEFAULT_LOCAL_THRESHOLD_MS , DEFAULT_SERVER_SELECTION_TIMEOUT_MS } ;
182182use topology:: server:: Server ;
183183
184184/// Interfaces with a MongoDB server or replica set.
@@ -236,16 +236,17 @@ impl ClientOptions {
236236
237237 #[ cfg( feature = "ssl" ) ]
238238 /// Creates a new options struct with a specified SSL certificate and key files.
239- pub fn with_ssl ( ca_file : & str ,
240- certificate_file : & str ,
241- key_file : & str ,
242- verify_peer : bool )
243- -> ClientOptions {
244- let mut options = ClientOptions :: new ( ) ;
245- options. stream_connector = StreamConnector :: with_ssl ( ca_file, certificate_file,
246- key_file, verify_peer) ;
247- options
248- }
239+ pub fn with_ssl (
240+ ca_file : & str ,
241+ certificate_file : & str ,
242+ key_file : & str ,
243+ verify_peer : bool ,
244+ ) -> ClientOptions {
245+ let mut options = ClientOptions :: new ( ) ;
246+ options. stream_connector =
247+ StreamConnector :: with_ssl ( ca_file, certificate_file, key_file, verify_peer) ;
248+ options
249+ }
249250}
250251
251252pub trait ThreadedClient : Sync + Sized {
@@ -261,18 +262,20 @@ pub trait ThreadedClient: Sync + Sized {
261262 fn with_uri_and_options ( uri : & str , options : ClientOptions ) -> Result < Self > ;
262263 /// Create a new Client with manual connection configurations.
263264 /// `connect` and `with_uri` should generally be used as higher-level constructors.
264- fn with_config ( config : ConnectionString ,
265- options : Option < ClientOptions > ,
266- description : Option < TopologyDescription > )
267- -> Result < Self > ;
265+ fn with_config (
266+ config : ConnectionString ,
267+ options : Option < ClientOptions > ,
268+ description : Option < TopologyDescription > ,
269+ ) -> Result < Self > ;
268270 /// Creates a database representation.
269271 fn db ( & self , db_name : & str ) -> Database ;
270272 /// Creates a database representation with custom read and write controls.
271- fn db_with_prefs ( & self ,
272- db_name : & str ,
273- read_preference : Option < ReadPreference > ,
274- write_concern : Option < WriteConcern > )
275- -> Database ;
273+ fn db_with_prefs (
274+ & self ,
275+ db_name : & str ,
276+ read_preference : Option < ReadPreference > ,
277+ write_concern : Option < WriteConcern > ,
278+ ) -> Database ;
276279 /// Acquires a connection stream from the pool, along with slave_ok and should_send_read_pref.
277280 fn acquire_stream ( & self , read_pref : ReadPreference ) -> Result < ( PooledStream , bool , bool ) > ;
278281 /// Acquires a connection stream from the pool for write operations.
@@ -319,75 +322,93 @@ impl ThreadedClient for Client {
319322 Client :: with_config ( config, Some ( options) , None )
320323 }
321324
322- fn with_config ( config : ConnectionString ,
323- options : Option < ClientOptions > ,
324- description : Option < TopologyDescription > )
325- -> Result < Client > {
326-
327- let client_options = options. unwrap_or_else ( ClientOptions :: new) ;
328-
329- let rp = client_options. read_preference
330- . unwrap_or_else ( || ReadPreference :: new ( ReadMode :: Primary , None ) ) ;
331- let wc = client_options. write_concern . unwrap_or_else ( WriteConcern :: new) ;
332-
333- let listener = Listener :: new ( ) ;
334- let file = match client_options. log_file {
335- Some ( string) => {
336- let _ = listener. add_start_hook ( log_command_started) ;
337- let _ = listener. add_completion_hook ( log_command_completed) ;
338- Some ( Mutex :: new ( try!( OpenOptions :: new ( )
339- . write ( true )
340- . append ( true )
341- . create ( true )
342- . open ( & string) ) ) )
343- }
344- None => None ,
345- } ;
346-
347- let client = Arc :: new ( ClientInner {
348- req_id : Arc :: new ( ATOMIC_ISIZE_INIT ) ,
349- topology : try!( Topology :: new ( config. clone ( ) , description, client_options. stream_connector . clone ( ) ) ) ,
350- listener : listener,
351- read_preference : rp,
352- write_concern : wc,
353- log_file : file,
354- } ) ;
355-
356- // Fill servers array and set options
357- {
358- let top_description = & client. topology . description ;
359- let mut top = try!( top_description. write ( ) ) ;
360- top. heartbeat_frequency_ms = client_options. heartbeat_frequency_ms ;
361- top. server_selection_timeout_ms = client_options. server_selection_timeout_ms ;
362- top. local_threshold_ms = client_options. local_threshold_ms ;
363-
364- for host in & config. hosts {
365- let server = Server :: new ( client. clone ( ) , host. clone ( ) , top_description. clone ( ) , true , client_options. stream_connector . clone ( ) ) ;
366-
367- top. servers . insert ( host. clone ( ) , server) ;
368- }
325+ fn with_config (
326+ config : ConnectionString ,
327+ options : Option < ClientOptions > ,
328+ description : Option < TopologyDescription > ,
329+ ) -> Result < Client > {
330+
331+ let client_options = options. unwrap_or_else ( ClientOptions :: new) ;
332+
333+ let rp = client_options. read_preference . unwrap_or_else ( || {
334+ ReadPreference :: new ( ReadMode :: Primary , None )
335+ } ) ;
336+ let wc = client_options. write_concern . unwrap_or_else (
337+ WriteConcern :: new,
338+ ) ;
339+
340+ let listener = Listener :: new ( ) ;
341+ let file = match client_options. log_file {
342+ Some ( string) => {
343+ let _ = listener. add_start_hook ( log_command_started) ;
344+ let _ = listener. add_completion_hook ( log_command_completed) ;
345+ Some ( Mutex :: new ( try!(
346+ OpenOptions :: new ( )
347+ . write ( true )
348+ . append ( true )
349+ . create ( true )
350+ . open ( & string)
351+ ) ) )
352+ }
353+ None => None ,
354+ } ;
355+
356+ let client = Arc :: new ( ClientInner {
357+ req_id : Arc :: new ( ATOMIC_ISIZE_INIT ) ,
358+ topology : try!( Topology :: new (
359+ config. clone ( ) ,
360+ description,
361+ client_options. stream_connector . clone ( ) ,
362+ ) ) ,
363+ listener : listener,
364+ read_preference : rp,
365+ write_concern : wc,
366+ log_file : file,
367+ } ) ;
368+
369+ // Fill servers array and set options
370+ {
371+ let top_description = & client. topology . description ;
372+ let mut top = try!( top_description. write ( ) ) ;
373+ top. heartbeat_frequency_ms = client_options. heartbeat_frequency_ms ;
374+ top. server_selection_timeout_ms = client_options. server_selection_timeout_ms ;
375+ top. local_threshold_ms = client_options. local_threshold_ms ;
376+
377+ for host in & config. hosts {
378+ let server = Server :: new (
379+ client. clone ( ) ,
380+ host. clone ( ) ,
381+ top_description. clone ( ) ,
382+ true ,
383+ client_options. stream_connector . clone ( ) ,
384+ ) ;
385+
386+ top. servers . insert ( host. clone ( ) , server) ;
369387 }
370-
371- Ok ( client)
372388 }
373389
390+ Ok ( client)
391+ }
392+
374393 fn db ( & self , db_name : & str ) -> Database {
375394 Database :: open ( self . clone ( ) , db_name, None , None )
376395 }
377396
378- fn db_with_prefs ( & self ,
379- db_name : & str ,
380- read_preference : Option < ReadPreference > ,
381- write_concern : Option < WriteConcern > )
382- -> Database {
383- Database :: open ( self . clone ( ) , db_name, read_preference, write_concern)
384- }
397+ fn db_with_prefs (
398+ & self ,
399+ db_name : & str ,
400+ read_preference : Option < ReadPreference > ,
401+ write_concern : Option < WriteConcern > ,
402+ ) -> Database {
403+ Database :: open ( self . clone ( ) , db_name, read_preference, write_concern)
404+ }
385405
386- fn acquire_stream ( & self ,
387- read_preference : ReadPreference )
388- -> Result < ( PooledStream , bool , bool ) > {
389- self . topology . acquire_stream ( read_preference)
390- }
406+ fn acquire_stream (
407+ & self ,
408+ read_preference : ReadPreference ,
409+ ) -> Result < ( PooledStream , bool , bool ) > {
410+ self . topology . acquire_stream ( read_preference)
411+ }
391412
392413 fn acquire_write_stream ( & self ) -> Result < PooledStream > {
393414 self . topology . acquire_write_stream ( )
@@ -405,7 +426,8 @@ impl ThreadedClient for Client {
405426 let res = try!( db. command ( doc, CommandType :: ListDatabases , None ) ) ;
406427 if let Some ( & Bson :: Array ( ref batch) ) = res. get ( "databases" ) {
407428 // Extract database names
408- let map = batch. iter ( )
429+ let map = batch
430+ . iter ( )
409431 . filter_map ( |bdoc| {
410432 if let Bson :: Document ( ref doc) = * bdoc {
411433 if let Some ( & Bson :: String ( ref name) ) = doc. get ( "name" ) {
@@ -414,11 +436,13 @@ impl ThreadedClient for Client {
414436 }
415437 None
416438 } )
417- . collect ( ) ;
439+ . collect ( ) ;
418440 return Ok ( map) ;
419441 }
420442
421- Err ( ResponseError ( String :: from ( "Server reply does not contain 'databases'." ) ) )
443+ Err ( ResponseError (
444+ String :: from ( "Server reply does not contain 'databases'." ) ,
445+ ) )
422446 }
423447
424448 fn drop_database ( & self , db_name : & str ) -> Result < ( ) > {
@@ -436,7 +460,9 @@ impl ThreadedClient for Client {
436460
437461 match res. get ( "ismaster" ) {
438462 Some ( & Bson :: Boolean ( is_master) ) => Ok ( is_master) ,
439- _ => Err ( ResponseError ( String :: from ( "Server reply does not contain 'ismaster'." ) ) ) ,
463+ _ => Err ( ResponseError (
464+ String :: from ( "Server reply does not contain 'ismaster'." ) ,
465+ ) ) ,
440466 }
441467 }
442468
0 commit comments