Skip to content

Commit 5b7166d

Browse files
authored
Merge pull request #34 from timvaillancourt/config_replset
Replset-based config server feature, version 0.3.0
2 parents b1efcbd + a6dde33 commit 5b7166d

File tree

7 files changed

+230
-163
lines changed

7 files changed

+230
-163
lines changed

MongoBackup/Backup.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,8 @@
1313
from Methods import Dumper
1414
from Notify import NotifyNSCA
1515
from Oplog import OplogTailer, OplogResolver
16-
from Replset import Replset, ReplsetSharded
16+
from Replset import Replset
17+
from ReplsetSharded import ReplsetSharded
1718
from Sharding import Sharding
1819
from Upload import UploadS3
1920

@@ -281,7 +282,7 @@ def run(self):
281282
self.user,
282283
self.password,
283284
self.authdb,
284-
self.sharding.get_configserver(),
285+
self.sharding.get_config_server(),
285286
self.verbose
286287
)
287288
self.mongodumper_summary = self.mongodumper.run()

MongoBackup/Common/DB.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def auth_if_required(self):
4444
else:
4545
pass
4646

47-
def admin_command(self, admin_command):
47+
def admin_command(self, admin_command, quiet=False):
4848
tries = 0
4949
status = None
5050
while not status and tries < self.retries:
@@ -53,7 +53,8 @@ def admin_command(self, admin_command):
5353
if not status:
5454
raise e
5555
except Exception, e:
56-
logging.error("Error running admin command '%s': %s" % (admin_command, e))
56+
if not quiet:
57+
logging.error("Error running admin command '%s': %s" % (admin_command, e))
5758
tries += 1
5859
sleep(1)
5960
if not status:

MongoBackup/Methods/Dumper.py

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@ def __init__(self, secondaries, base_dir, binary, dump_gzip=False, user=None, pa
2020
self.config_server = config_server
2121
self.verbose = verbose
2222

23+
self.config_replset = False
2324
self.response_queue = Queue()
2425
self.threads = []
2526
self._summary = {}
2627

28+
if not isinstance(self.config_server, dict) and self.config_server in self.secondaries:
29+
self.config_replset = True
30+
2731
if not isinstance(self.secondaries, dict):
2832
raise Exception, "Field 'secondaries' must be a dictionary of secondary info (by shard)!", None
2933

@@ -33,6 +37,32 @@ def __init__(self, secondaries, base_dir, binary, dump_gzip=False, user=None, pa
3337
def summary(self):
3438
return self._summary
3539

40+
def wait(self):
41+
# wait for all threads to finish
42+
for thread in self.threads:
43+
thread.join()
44+
45+
# sleep for 3 sec to fix logging order
46+
sleep(3)
47+
48+
# get oplog summaries from the queue
49+
completed = 0
50+
while not self.response_queue.empty():
51+
backup = self.response_queue.get()
52+
host = backup['host']
53+
port = backup['port']
54+
if host not in self._summary:
55+
self._summary[host] = {}
56+
self._summary[host][port] = backup
57+
if backup['completed']:
58+
completed += 1
59+
60+
# check if all threads completed
61+
if completed == len(self.threads):
62+
logging.info("All mongodump backups completed")
63+
else:
64+
raise Exception, "Not all mongodump threads completed successfully!", None
65+
3666
def run(self):
3767
# backup a secondary from each shard:
3868
for shard in self.secondaries:
@@ -51,55 +81,33 @@ def run(self):
5181
)
5282
self.threads.append(thread)
5383

54-
# backup a single config server:
55-
if self.config_server:
56-
thread = Dump(
57-
self.response_queue,
58-
'config',
59-
self.config_server,
60-
self.user,
61-
self.password,
62-
self.authdb,
63-
self.base_dir,
64-
self.binary,
65-
self.dump_gzip,
66-
self.verbose
67-
)
68-
self.threads.append(thread)
69-
7084
if not len(self.threads) > 0:
7185
raise Exception, 'No backup threads started!', None
7286

73-
# start all threads
87+
# start all threads and wait
7488
logging.info(
7589
"Starting backups in threads using mongodump %s (inline gzip: %s)" % (self.version, str(self.dump_gzip)))
7690
for thread in self.threads:
7791
thread.start()
92+
self.wait()
7893

79-
# wait for all threads to finish
80-
for thread in self.threads:
81-
thread.join()
82-
83-
# sleep for 3 sec to fix logging order
84-
sleep(3)
85-
86-
# get oplog summaries from the queue
87-
completed = 0
88-
while not self.response_queue.empty():
89-
backup = self.response_queue.get()
90-
host = backup['host']
91-
port = backup['port']
92-
if host not in self._summary:
93-
self._summary[host] = {}
94-
self._summary[host][port] = backup
95-
if backup['completed']:
96-
completed += 1
97-
98-
# check if all threads completed
99-
if completed == len(self.threads):
100-
logging.info("All mongodump backups completed")
101-
else:
102-
raise Exception, "Not all mongodump threads completed successfully!", None
94+
# backup a single non-replset config server, if exists:
95+
if not self.config_replset and isinstance(self.config_server, dict):
96+
logging.info("Using non-replset backup method for config server mongodump")
97+
self.threads = [Dump(
98+
self.response_queue,
99+
'configsvr',
100+
self.config_server['host'],
101+
self.user,
102+
self.password,
103+
self.authdb,
104+
self.base_dir,
105+
self.binary,
106+
self.dump_gzip,
107+
self.verbose
108+
)]
109+
self.threads[0].start()
110+
self.wait()
103111

104112
return self._summary
105113

MongoBackup/Replset.py

Lines changed: 44 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
import logging
22

33
from math import ceil
4-
from time import mktime
54

65
from Common import DB
7-
from Sharding import Sharding
86

97

108
class Replset:
11-
def __init__(self, db, user, password, authdb, max_lag_secs):
9+
def __init__(self, db, user=None, password=None, authdb='admin', max_lag_secs=5):
1210
self.db = db
1311
self.user = user
1412
self.password = password
1513
self.authdb = authdb
1614
self.max_lag_secs = max_lag_secs
1715

16+
self.rs_config = None
1817
self.rs_status = None
1918
self.primary = None
2019
self.secondary = None
@@ -32,49 +31,54 @@ def __init__(self, db, user, password, authdb, max_lag_secs):
3231
def close(self):
3332
pass
3433

35-
def get_rs_status(self, force=False):
34+
def get_rs_status(self, force=False, quiet=False):
3635
try:
3736
if force or not self.rs_status:
38-
self.rs_status = self.db.admin_command('replSetGetStatus')
37+
self.rs_status = self.db.admin_command('replSetGetStatus', quiet)
3938
return self.rs_status
4039
except Exception, e:
4140
raise Exception, "Error getting replica set status! Error: %s" % e, None
4241

43-
def get_rs_config(self):
44-
try:
45-
if self.db.server_version() >= tuple("3.0.0".split(".")):
46-
output = self.db.admin_command('replSetGetConfig')
47-
return output['config']
48-
else:
49-
return self.connection['local'].system.replset.find_one()
50-
except Exception, e:
51-
raise Exception, "Error getting replica set config! Error: %s" % e, None
42+
def get_rs_config(self, force=False, quiet=False):
43+
if force or not self.rs_config:
44+
try:
45+
if self.db.server_version() >= tuple("3.0.0".split(".")):
46+
output = self.db.admin_command('replSetGetConfig', quiet)
47+
self.rs_config = output['config']
48+
else:
49+
self.rs_config = self.connection['local'].system.replset.find_one()
50+
except Exception, e:
51+
raise Exception, "Error getting replica set config! Error: %s" % e, None
52+
return self.rs_config
53+
54+
def get_rs_name(self):
55+
return self.get_rs_status()['set']
5256

53-
def find_primary(self):
54-
rs_status = self.get_rs_status()
57+
def find_primary(self, force=False, quiet=False):
58+
rs_status = self.get_rs_status(force, quiet)
5559
rs_name = rs_status['set']
5660
for member in rs_status['members']:
5761
if member['stateStr'] == 'PRIMARY' and member['health'] > 0:
62+
optime_ts = member['optime']
63+
if isinstance(member['optime'], dict) and 'ts' in member['optime']:
64+
optime_ts = member['optime']['ts']
5865
self.primary = {
5966
'host': member['name'],
60-
'optime': member['optimeDate']
67+
'optime': optime_ts
6168
}
62-
optime = member['optime']
63-
if isinstance(member['optime'], dict) and 'ts' in member['optime']:
64-
optime = member['optime']['ts']
6569
logging.info("Found PRIMARY: %s/%s with optime %s" % (
6670
rs_name,
6771
member['name'],
68-
str(optime)
72+
str(optime_ts)
6973
))
7074
if self.primary is None:
71-
logging.fatal("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name)
75+
logging.error("Unable to locate a PRIMARY member for replset %s, giving up" % rs_name)
7276
raise Exception, "Unable to locate a PRIMARY member for replset %s, giving up" % rs_name, None
7377
return self.primary
7478

75-
def find_secondary(self):
76-
rs_status = self.get_rs_status()
77-
rs_config = self.get_rs_config()
79+
def find_secondary(self, force=False, quiet=False):
80+
rs_status = self.get_rs_status(force, quiet)
81+
rs_config = self.get_rs_config(force, quiet)
7882
rs_name = rs_status['set']
7983
quorum_count = ceil(len(rs_status['members']) / 2.0)
8084

@@ -96,30 +100,37 @@ def find_secondary(self):
96100
score = score - member_config['priority']
97101
break
98102

99-
rep_lag = (mktime(self.primary_optime().timetuple()) - mktime(member['optimeDate'].timetuple()))
103+
optime_ts = member['optime']
104+
if isinstance(member['optime'], dict) and 'ts' in member['optime']:
105+
optime_ts = member['optime']['ts']
106+
107+
rep_lag = (self.primary_optime().time - optime_ts.time)
100108
score = ceil((score - rep_lag) * score_scale)
101109
if rep_lag < self.max_lag_secs:
102110
if self.secondary is None or score > self.secondary['score']:
103111
self.secondary = {
104112
'replSet': rs_name,
105113
'count': 1 if self.secondary is None else self.secondary['count'] + 1,
106114
'host': member['name'],
107-
'optime': member['optimeDate'],
115+
'optime': optime_ts,
108116
'score': score
109117
}
110118
log_msg = "Found SECONDARY %s/%s" % (rs_name, member['name'])
111119
else:
112120
log_msg = "Found SECONDARY %s/%s with too-high replication lag! Skipping" % (rs_name, member['name'])
113121

114-
log_data['optime'] = member['optime']
115-
if isinstance(member['optime'], dict) and 'ts' in member['optime']:
116-
log_data['optime'] = member['optime']['ts']
122+
if 'configsvr' in rs_status and rs_status['configsvr']:
123+
log_data['configsvr'] = True
124+
125+
log_data['lag'] = rep_lag
126+
log_data['optime'] = optime_ts
117127
log_data['score'] = int(score)
118128
logging.info("%s: %s" % (log_msg, str(log_data)))
119129
if self.secondary is None or (self.secondary['count'] + 1) < quorum_count:
120-
logging.fatal("Not enough secondaries in replset %s to take backup! Num replset members: %i, required quorum: %i" % (
130+
secondary_count = self.secondary['count'] + 1 if self.secondary else 0
131+
logging.error("Not enough secondaries in replset %s to take backup! Num replset members: %i, required quorum: %i" % (
121132
rs_name,
122-
self.secondary['count'] + 1,
133+
secondary_count,
123134
quorum_count
124135
))
125136
raise Exception, "Not enough secondaries in replset %s to safely take backup!" % rs_name, None
@@ -128,74 +139,6 @@ def find_secondary(self):
128139
return self.secondary
129140

130141
def primary_optime(self):
131-
rs_status = self.get_rs_status()
132-
rs_primary = self.find_primary()
142+
rs_primary = self.find_primary(True)
133143
if 'optime' in rs_primary:
134144
return rs_primary['optime']
135-
136-
137-
class ReplsetSharded:
138-
def __init__(self, sharding, db, user, password, authdb, max_lag_secs):
139-
self.sharding = sharding
140-
self.db = db
141-
self.user = user
142-
self.password = password
143-
self.authdb = authdb
144-
self.max_lag_secs = max_lag_secs
145-
146-
self.replsets = {}
147-
self.replset_conns = {}
148-
149-
# Check Sharding class:
150-
if not isinstance(self.sharding, Sharding):
151-
raise Exception, "'sharding' field is not an instance of class: 'Sharding'!", None
152-
153-
# Get a DB connection
154-
try:
155-
if isinstance(self.db, DB):
156-
self.connection = self.db.connection()
157-
if not self.connection.is_mongos:
158-
raise Exception, 'MongoDB connection is not to a mongos!', None
159-
else:
160-
raise Exception, "'db' field is not an instance of class: 'DB'!", None
161-
except Exception, e:
162-
logging.fatal("Could not get DB connection! Error: %s" % e)
163-
raise e
164-
165-
def get_replset_connection(self, host, port, force=False):
166-
conn_name = "%s-%i" % (host, port)
167-
if force or not conn_name in self.replset_conns:
168-
try:
169-
self.replset_conns[conn_name] = DB(host, port, self.user, self.password, self.authdb)
170-
except Exception, e:
171-
logging.fatal("Could not get DB connection to %s:%i! Error: %s" % (host, port, e))
172-
raise e
173-
return self.replset_conns[conn_name]
174-
175-
def get_replsets(self, force=False):
176-
for shard in self.sharding.shards():
177-
shard_name, members = shard['host'].split('/')
178-
host, port = members.split(',')[0].split(":")
179-
port = int(port)
180-
if force or not shard_name in self.replsets:
181-
try:
182-
rs_db = self.get_replset_connection(host, port)
183-
self.replsets[shard_name] = Replset(rs_db, self.user, self.password, self.authdb, self.max_lag_secs)
184-
except Exception, e:
185-
logging.fatal("Could not get Replset class object for replset %s! Error: %s" % (rs_name, e))
186-
raise e
187-
return self.replsets
188-
189-
def find_secondaries(self):
190-
shard_secondaries = {}
191-
for rs_name in self.get_replsets():
192-
replset = self.replsets[rs_name]
193-
secondary = replset.find_secondary()
194-
shard_secondaries[rs_name] = secondary
195-
return shard_secondaries
196-
197-
def close(self):
198-
for rs_name in self.replsets:
199-
self.replsets[rs_name].close()
200-
for conn_name in self.replset_conns:
201-
self.replset_conns[conn_name].close()

0 commit comments

Comments
 (0)