11import logging
22
33from math import ceil
4- from time import mktime
4+ from time import mktime , sleep
55
66from DB import DB
77from ShardingHandler import ShardingHandler
88
99
1010class ReplsetHandler :
11- def __init__ (self , host , port , user , password , authdb , max_lag_secs ):
11+ def __init__ (self , host , port , user , password , authdb , max_lag_secs , retries = 5 ):
1212 self .host = host
1313 self .port = port
1414 self .user = user
1515 self .password = password
1616 self .authdb = authdb
1717 self .max_lag_secs = max_lag_secs
18+ self .retries = retries
1819
1920 try :
2021 self .connection = DB (self .host , self .port , self .user , self .password , self .authdb ).connection ()
@@ -27,31 +28,47 @@ def close(self):
2728
2829 def get_rs_status (self ):
2930 try :
30- return self .connection ['admin' ].command ("replSetGetStatus" )
31+ tries = 0
32+ status = None
33+ while not status and tries < self .retries :
34+ status = self .connection ['admin' ].command ("replSetGetStatus" )
35+ tries = tries + 1
36+ sleep (1 )
37+ if not status :
38+ raise Exception , "Could not get output from command: 'replSetGetStatus' after %i retries!" % self .retries , None
39+ return status
3140 except Exception , e :
3241 logging .fatal ("Failed to execute command! Error: %s" % e )
3342 raise e
3443
3544 def find_desirable_secondary (self ):
3645 rs_status = self .get_rs_status ()
37- rs_name = rs_status ['set' ]
46+ rs_name = rs_status ['set' ]
3847 quorum_count = ceil (len (rs_status ['members' ]) / 2.0 )
3948 secondary = None
4049 primary = None
4150 for member in rs_status ['members' ]:
42- if member [ 'stateStr ' ] == 'PRIMARY' :
43- primary = {
44- 'host' : member ['name ' ],
45- 'optime' : member [ 'optimeDate' ]
46- }
47- elif member ['stateStr' ] == 'SECONDARY' :
48- if secondary is None or secondary [ 'optime' ] < member [ 'optimeDate' ]:
49- secondary = {
50- 'replSet' : rs_status [ 'set' ],
51- 'count' : 1 if secondary is None else secondary [ 'count' ] + 1 ,
51+ if 'health' in member and member [ 'health ' ] > 0 :
52+ logging . info ( "Found %s: %s/%s with optime %s" % (
53+ member ['stateStr ' ],
54+ rs_name ,
55+ member [ 'name' ],
56+ str ( member ['optime' ])
57+ ))
58+
59+ if member [ 'stateStr' ] == 'PRIMARY' :
60+ primary = {
5261 'host' : member ['name' ],
5362 'optime' : member ['optimeDate' ]
5463 }
64+ elif member ['stateStr' ] == 'SECONDARY' :
65+ if secondary is None or secondary ['optime' ] < member ['optimeDate' ]:
66+ secondary = {
67+ 'replSet' : rs_status ['set' ],
68+ 'count' : 1 if secondary is None else secondary ['count' ] + 1 ,
69+ 'host' : member ['name' ],
70+ 'optime' : member ['optimeDate' ]
71+ }
5572
5673 if primary is None :
5774 logging .fatal ("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name )
@@ -66,6 +83,8 @@ def find_desirable_secondary(self):
6683 logging .fatal ("No secondary found in replset %s within %s lag time!" % (rs_name , self .max_lag_secs ))
6784 raise Exception , "No secondary found in replset %s within %s lag time!" % (rs_name , self .max_lag_secs ), None
6885
86+ logging .info ("Choosing SECONDARY %s for replica set %s" % (secondary ['host' ], rs_name ))
87+
6988 return secondary
7089
7190
@@ -78,7 +97,7 @@ def __init__(self, host, port, user, password, authdb, max_lag_secs):
7897 self .authdb = authdb
7998 self .max_lag_secs = max_lag_secs
8099
81- self .replset = None
100+ self .replset = None
82101
83102 try :
84103 self .sharding = ShardingHandler (self .host , self .port , self .user , self .password , self .authdb )
@@ -100,6 +119,6 @@ def find_desirable_secondaries(self):
100119 return shard_secondaries
101120
102121 def close (self ):
103- if self .replset :
104- self .replset .close ()
122+ if self .replset :
123+ self .replset .close ()
105124 self .sharding .close ()
0 commit comments