Skip to content

Commit ab68890

Browse files
authored
Merge pull request #23 from timvaillancourt/hidden_mode
Score-based SECONDARY decision mode for ReplsetHandler (re-attempt) + Bump to 0.2.0 version
2 parents 09ac126 + f9a3860 commit ab68890

File tree

3 files changed

+105
-52
lines changed

3 files changed

+105
-52
lines changed

MongoBackup/DB.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
import logging
22

33
from pymongo import MongoClient
4+
from time import sleep
45

56

67
class DB:
7-
def __init__(self, host='localhost', port=27017, username=None, password=None, authdb="admin", conn_timeout=5000):
8-
self.host = host
9-
self.port = port
10-
self.username = username
11-
self.password = password
12-
self.authdb = authdb
8+
def __init__(self, host='localhost', port=27017, username=None, password=None, authdb="admin", conn_timeout=5000, retries=5):
9+
self.host = host
10+
self.port = port
11+
self.username = username
12+
self.password = password
13+
self.authdb = authdb
1314
self.conn_timeout = conn_timeout
15+
self.retries = retries
16+
1417
self._conn = None
1518
self.connect()
1619
self.auth_if_required()
@@ -33,14 +36,38 @@ def connect(self):
3336
def auth_if_required(self):
3437
if self.username is not None and self.password is not None:
3538
try:
36-
logging.debug("Authenticating MongoDB connection")
39+
logging.debug("Authenticating connection with username: %s" % self.username)
3740
self._conn[self.authdb].authenticate(self.username, self.password)
3841
except Exception, e:
3942
logging.fatal("Unable to authenticate with host %s:%s: %s" % (self.host, self.port, e))
4043
raise e
4144
else:
4245
pass
4346

47+
def admin_command(self, admin_command):
    """Run a command against the 'admin' database, retrying on failure.

    The command is attempted up to ``self.retries`` times, pausing one
    second between attempts. An attempt counts as failed when the driver
    raises or when the returned value is falsy.

    :param admin_command: value passed straight to ``command()`` on the
        'admin' database of ``self._conn``
    :return: the first truthy command result
    :raises Exception: when no attempt produced a truthy result
    """
    for attempt in range(self.retries):
        # Only wait *between* attempts; sleeping after the last failure
        # would just delay the error below.
        if attempt:
            sleep(1)
        try:
            status = self._conn['admin'].command(admin_command)
        except Exception as e:
            logging.error("Error running admin command '%s': %s" % (admin_command, e))
            continue
        if status:
            return status
        # An empty result is treated like a failed attempt. The original
        # signalled this with `raise e`, which referenced an unbound (or
        # stale) exception object.
        logging.error("Error running admin command '%s': %s" % (admin_command, "empty response"))
    raise Exception("Could not get output from command: '%s' after %i retries!" % (admin_command, self.retries))
62+
63+
def server_version(self):
    """Return the server version reported by the 'serverStatus' command.

    Anything after the first '-' in the reported version string is
    dropped before splitting on '.'.

    :return: tuple of version components as strings, e.g. ('3', '0', '4').
        Components are NOT converted to integers, so comparing two such
        tuples is lexicographic; convert before comparing numerically.
    :raises Exception: when the command output has no 'version' key, or
        when ``admin_command`` itself fails
    """
    status = self.admin_command('serverStatus')
    if 'version' not in status:
        # The original formatted this message with an undefined name `e`,
        # which raised NameError instead of the intended error.
        raise Exception("Could not get server version using admin command 'serverStatus'! Error: no 'version' field in command output")
    release = status['version'].split('-')[0]
    return tuple(release.split('.'))
70+
4471
def connection(self):
    """Return the connection object stored on this instance (``self._conn``)."""
    conn = self._conn
    return conn
4673

MongoBackup/ReplsetHandler.py

Lines changed: 70 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
11
import logging
22

33
from math import ceil
4-
from time import mktime, sleep
4+
from time import mktime
55

66
from DB import DB
77
from ShardingHandler import ShardingHandler
88

99

1010
class ReplsetHandler:
11-
def __init__(self, host, port, user, password, authdb, max_lag_secs, retries=5):
11+
def __init__(self, host, port, user, password, authdb, max_lag_secs):
1212
self.host = host
1313
self.port = port
1414
self.user = user
1515
self.password = password
1616
self.authdb = authdb
1717
self.max_lag_secs = max_lag_secs
18-
self.retries = retries
1918

2019
try:
21-
self.connection = DB(self.host, self.port, self.user, self.password, self.authdb).connection()
20+
self.db = DB(self.host, self.port, self.user, self.password, self.authdb)
21+
self.connection = self.db.connection()
2222
except Exception, e:
2323
logging.fatal("Could not get DB connection! Error: %s" % e)
2424
raise e
@@ -27,54 +27,85 @@ def close(self):
2727
return self.connection.close()
2828

2929
def get_rs_status(self):
30-
tries = 0
31-
status = None
32-
while not status and tries < self.retries:
33-
try:
34-
status = self.connection['admin'].command("replSetGetStatus")
35-
if not status:
36-
raise e
37-
except Exception, e:
38-
logging.error("Error running command 'replSetGetStatus': %s" % e)
39-
tries += 1
40-
sleep(1)
41-
if not status:
42-
raise Exception, "Could not get output from command: 'replSetGetStatus' after %i retries!" % self.retries, None
43-
return status
30+
try:
31+
return self.db.admin_command('replSetGetStatus')
32+
except Exception, e:
33+
raise Exception, "Error getting replica set status! Error: %s" % e, None
34+
35+
def get_rs_config(self):
    """Return the replica set configuration document.

    Servers at version 3.0 or newer are asked via the 'replSetGetConfig'
    admin command (its 'config' field is returned); older servers are
    read from the 'system.replset' collection of the 'local' database.

    :return: the replica set config document
    :raises Exception: wrapping any error raised while fetching the
        version or the config
    """
    def leading_int(part):
        # Tolerate components such as '0rc1' by keeping the leading digits.
        digits = ''
        for char in str(part):
            if not char.isdigit():
                break
            digits += char
        return int(digits) if digits else 0

    try:
        # server_version() yields string components; compare numerically,
        # otherwise '10' sorts before '2'.
        version = tuple(leading_int(part) for part in self.db.server_version())
        # NOTE(review): threshold raised from "> 2.4.0" to ">= 3.0" because
        # MongoDB documents replSetGetConfig as new in 3.0 - confirm against
        # the server versions this tool must support.
        if version >= (3, 0):
            output = self.db.admin_command('replSetGetConfig')
            return output['config']
        return self.connection['local'].system.replset.find_one()
    except Exception as e:
        raise Exception("Error getting replica set config! Error: %s" % e)
4444

4545
def find_desirable_secondary(self):
4646
rs_status = self.get_rs_status()
47+
rs_config = self.get_rs_config()
4748
rs_name = rs_status['set']
4849
quorum_count = ceil(len(rs_status['members']) / 2.0)
49-
secondary = None
50-
primary = None
50+
51+
primary = None
5152
for member in rs_status['members']:
52-
if 'health' in member and member['health'] > 0:
53-
logging.debug("Found %s: %s/%s with optime %s" % (
54-
member['stateStr'],
53+
if member['stateStr'] == 'PRIMARY' and member['health'] > 0:
54+
primary = {
55+
'host': member['name'],
56+
'optime': member['optimeDate']
57+
}
58+
optime = member['optime']
59+
if 'ts' in member['optime']:
60+
optime = member['optime']['ts']
61+
logging.debug("Found PRIMARY: %s/%s with optime %s" % (
5562
rs_name,
5663
member['name'],
57-
str(member['optime']['ts'])
64+
str(optime)
5865
))
66+
if primary is None:
67+
logging.fatal("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name)
68+
raise Exception, "Unable to locate a PRIMARY member for replset %s, giving up" % rs_name, None
5969

60-
if member['stateStr'] == 'PRIMARY':
61-
primary = {
62-
'host': member['name'],
63-
'optime': member['optimeDate']
64-
}
65-
elif member['stateStr'] == 'SECONDARY':
66-
if secondary is None or secondary['optime'] < member['optimeDate']:
70+
secondary = None
71+
for member in rs_status['members']:
72+
if member['stateStr'] == 'SECONDARY' and member['health'] > 0:
73+
score = self.max_lag_secs * 10
74+
score_scale = 100 / score
75+
log_data = {}
76+
77+
hidden_weight = 0.20
78+
for member_config in rs_config['members']:
79+
if member_config['host'] == member['name']:
80+
if 'hidden' in member_config and member_config['hidden'] == True:
81+
score += (score * hidden_weight)
82+
log_data['hidden'] = True
83+
if 'priority' in member_config:
84+
log_data['priority'] = int(member_config['priority'])
85+
if member_config['priority'] > 0:
86+
score = score - member_config['priority']
87+
break
88+
89+
rep_lag = (mktime(primary['optime'].timetuple()) - mktime(member['optimeDate'].timetuple()))
90+
score = ceil((score - rep_lag) * score_scale)
91+
if rep_lag < self.max_lag_secs:
92+
if secondary is None or score > secondary['score']:
6793
secondary = {
68-
'replSet': rs_status['set'],
94+
'replSet': rs_name,
6995
'count': 1 if secondary is None else secondary['count'] + 1,
7096
'host': member['name'],
71-
'optime': member['optimeDate']
97+
'optime': member['optimeDate'],
98+
'score': score
7299
}
73-
74-
if primary is None:
75-
logging.fatal("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name)
76-
raise Exception, "Unable to locate a PRIMARY member for replset %s, giving up" % rs_name, None
77-
100+
log_msg = "Found SECONDARY %s/%s" % (rs_name, member['name'])
101+
else:
102+
log_msg = "Found SECONDARY %s/%s with too-high replication lag! Skipping" % (rs_name, member['name'])
103+
104+
log_data['optime'] = member['optime']
105+
if 'ts' in member['optime']:
106+
log_data['optime'] = member['optime']['ts']
107+
log_data['score'] = int(score)
108+
logging.debug("%s: %s" % (log_msg, str(log_data)))
78109
if secondary is None or (secondary['count'] + 1) < quorum_count:
79110
logging.fatal("Not enough secondaries in replset %s to take backup! Num replset members: %i, required quorum: %i" % (
80111
rs_name,
@@ -83,12 +114,7 @@ def find_desirable_secondary(self):
83114
))
84115
raise Exception, "Not enough secondaries in replset %s to safely take backup!" % rs_name, None
85116

86-
rep_lag = (mktime(primary['optime'].timetuple()) - mktime(secondary['optime'].timetuple()))
87-
if rep_lag > self.max_lag_secs:
88-
logging.fatal("No secondary found in replset %s within %s lag time!" % (rs_name, self.max_lag_secs))
89-
raise Exception, "No secondary found in replset %s within %s lag time!" % (rs_name, self.max_lag_secs), None
90-
91-
logging.debug("Choosing SECONDARY %s for replica set %s" % (secondary['host'], rs_name))
117+
logging.debug("Choosing SECONDARY %s for replica set %s (score: %i)" % (secondary['host'], rs_name, secondary['score']))
92118
return secondary
93119

94120

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.0.1beta
1+
0.2.0

0 commit comments

Comments
 (0)