11import logging
22
33from math import ceil
4- from time import mktime
4+ from time import mktime , sleep
55
66from DB import DB
77from ShardingHandler import ShardingHandler
88
99
1010class ReplsetHandler :
11- def __init__ (self , host , port , user , password , authdb , max_lag_secs ):
11+ def __init__ (self , host , port , user , password , authdb , max_lag_secs , retries = 5 ):
1212 self .host = host
1313 self .port = port
1414 self .user = user
1515 self .password = password
1616 self .authdb = authdb
1717 self .max_lag_secs = max_lag_secs
18+ self .retries = retries
1819
1920 try :
2021 self .connection = DB (self .host , self .port , self .user , self .password , self .authdb ).connection ()
@@ -26,46 +27,68 @@ def close(self):
2627 return self .connection .close ()
2728
def get_rs_status(self):
    """Run ``replSetGetStatus`` on the admin database, retrying on failure.

    The command is attempted up to ``self.retries`` times with a one second
    pause between attempts. Both an exception from the driver and an empty
    reply count as a failed attempt.

    Returns:
        The (non-empty) result of the ``replSetGetStatus`` command.

    Raises:
        Exception: if no non-empty result was obtained after
            ``self.retries`` attempts.
    """
    for attempt in range(1, self.retries + 1):
        try:
            status = self.connection['admin'].command("replSetGetStatus")
        except Exception as e:
            logging.error("Error running command 'replSetGetStatus': %s" % e)
        else:
            if status:
                return status
            # An empty reply is treated like an error and retried. The
            # previous code tried to re-raise an exception variable that was
            # not bound yet, which only worked by accident.
            logging.error("Error running command 'replSetGetStatus': %s" % "empty response")
        # Only pause when another attempt will follow; sleeping after the
        # last failure just delays the error.
        if attempt < self.retries:
            sleep(1)
    raise Exception("Could not get output from command: 'replSetGetStatus' after %i retries!" % self.retries)
3444
def find_desirable_secondary(self):
    """Pick the most up-to-date healthy SECONDARY of the replica set.

    The replica set status is fetched via ``self.get_rs_status()``. Members
    without a positive ``health`` value are ignored. Among the remaining
    SECONDARY members, the one with the greatest ``optimeDate`` is chosen.

    Returns:
        dict with keys ``replSet`` (replica set name), ``count`` (number of
        healthy SECONDARY members seen), ``host`` (member name) and
        ``optime`` (the member's ``optimeDate``).

    Raises:
        Exception: if no healthy PRIMARY is found, if the healthy PRIMARY
            plus SECONDARY members are fewer than half of all members
            (rounded up), or if the chosen secondary lags the primary by
            more than ``self.max_lag_secs`` seconds.
    """
    rs_status = self.get_rs_status()
    rs_name = rs_status['set']
    quorum_count = ceil(len(rs_status['members']) / 2.0)
    primary = None
    secondary = None
    # Counted separately from the candidate dict so that every healthy
    # secondary is counted, not only those that replace the current best.
    secondary_count = 0
    for member in rs_status['members']:
        if not ('health' in member and member['health'] > 0):
            continue

        logging.debug("Found %s: %s/%s with optime %s" % (
            member['stateStr'],
            rs_name,
            member['name'],
            str(member['optime']['ts'])
        ))

        if member['stateStr'] == 'PRIMARY':
            primary = {
                'host': member['name'],
                'optime': member['optimeDate']
            }
        elif member['stateStr'] == 'SECONDARY':
            secondary_count += 1
            if secondary is None or secondary['optime'] < member['optimeDate']:
                secondary = {
                    'replSet': rs_status['set'],
                    'host': member['name'],
                    'optime': member['optimeDate']
                }

    if secondary is not None:
        secondary['count'] = secondary_count

    if primary is None:
        logging.fatal("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name)
        raise Exception("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name)

    # "+ 1" accounts for the primary. secondary_count is used here (rather
    # than secondary['count']) so the message can be built even when no
    # secondary was found at all.
    if secondary is None or (secondary_count + 1) < quorum_count:
        logging.fatal("Not enough secondaries in replset %s to take backup! Num replset members: %i, required quorum: %i" % (
            rs_name,
            secondary_count + 1,
            quorum_count
        ))
        raise Exception("Not enough secondaries in replset %s to safely take backup!" % rs_name)

    rep_lag = (mktime(primary['optime'].timetuple()) - mktime(secondary['optime'].timetuple()))
    if rep_lag > self.max_lag_secs:
        logging.fatal("No secondary found in replset %s within %s lag time!" % (rs_name, self.max_lag_secs))
        raise Exception("No secondary found in replset %s within %s lag time!" % (rs_name, self.max_lag_secs))

    logging.debug("Choosing SECONDARY %s for replica set %s" % (secondary['host'], rs_name))
    return secondary
7093
7194
@@ -78,7 +101,7 @@ def __init__(self, host, port, user, password, authdb, max_lag_secs):
78101 self .authdb = authdb
79102 self .max_lag_secs = max_lag_secs
80103
81- self .replset = None
104+ self .replset = None
82105
83106 try :
84107 self .sharding = ShardingHandler (self .host , self .port , self .user , self .password , self .authdb )
@@ -100,6 +123,6 @@ def find_desirable_secondaries(self):
100123 return shard_secondaries
101124
def close(self):
    """Close the replica set handler, if one is set, then the sharding handler."""
    replset = self.replset
    if replset:
        replset.close()
    self.sharding.close()
0 commit comments