Skip to content

Commit 61a5639

Browse files
author
Tim Vaillancourt
committed
unbroke non-replset mode
1 parent edadd46 commit 61a5639

File tree

4 files changed

+74
-43
lines changed

4 files changed

+74
-43
lines changed

MongoBackup/Methods/Dumper.py

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,32 @@ def __init__(self, secondaries, base_dir, binary, dump_gzip=False, user=None, pa
3333
def summary(self):
3434
return self._summary
3535

36+
def wait(self):
37+
# wait for all threads to finish
38+
for thread in self.threads:
39+
thread.join()
40+
41+
# sleep for 3 sec to fix logging order
42+
sleep(3)
43+
44+
# get oplog summaries from the queue
45+
completed = 0
46+
while not self.response_queue.empty():
47+
backup = self.response_queue.get()
48+
host = backup['host']
49+
port = backup['port']
50+
if host not in self._summary:
51+
self._summary[host] = {}
52+
self._summary[host][port] = backup
53+
if backup['completed']:
54+
completed += 1
55+
56+
# check if all threads completed
57+
if completed == len(self.threads):
58+
logging.info("All mongodump backups completed")
59+
else:
60+
raise Exception, "Not all mongodump threads completed successfully!", None
61+
3662
def run(self):
3763
# backup a secondary from each shard:
3864
for shard in self.secondaries:
@@ -51,8 +77,8 @@ def run(self):
5177
)
5278
self.threads.append(thread)
5379

54-
# backup a single config server:
55-
if self.config_server and self.config_server['replset']:
80+
# backup a single replica-set config server alongside the shards, if exists:
81+
if self.config_server and self.config_server['replSet']:
5682
thread = Dump(
5783
self.response_queue,
5884
'config',
@@ -70,36 +96,33 @@ def run(self):
7096
if not len(self.threads) > 0:
7197
raise Exception, 'No backup threads started!', None
7298

73-
# start all threads
99+
# start all threads and wait
74100
logging.info(
75101
"Starting backups in threads using mongodump %s (inline gzip: %s)" % (self.version, str(self.dump_gzip)))
76102
for thread in self.threads:
77103
thread.start()
104+
self.wait()
78105

79-
# wait for all threads to finish
80-
for thread in self.threads:
81-
thread.join()
82-
83-
# sleep for 3 sec to fix logging order
84-
sleep(3)
85-
86-
# get oplog summaries from the queue
87-
completed = 0
88-
while not self.response_queue.empty():
89-
backup = self.response_queue.get()
90-
host = backup['host']
91-
port = backup['port']
92-
if host not in self._summary:
93-
self._summary[host] = {}
94-
self._summary[host][port] = backup
95-
if backup['completed']:
96-
completed += 1
97-
98-
# check if all threads completed
99-
if completed == len(self.threads):
100-
logging.info("All mongodump backups completed")
101-
else:
102-
raise Exception, "Not all mongodump threads completed successfully!", None
106+
# backup a single non-replset config server, if exists:
107+
if self.config_server and not self.config_server['replSet']:
108+
logging.debug("Using non-replset backup method for config server")
109+
thread = Dump(
110+
self.response_queue,
111+
'config',
112+
self.config_server['host'],
113+
self.user,
114+
self.password,
115+
self.authdb,
116+
self.base_dir,
117+
self.binary,
118+
self.dump_gzip,
119+
self.verbose
120+
)
121+
self.threads = [thread]
122+
self.threads[0].start()
123+
if not len(self.threads) == 1:
124+
raise Exception, 'No backup threads started!', None
125+
self.wait()
103126

104127
return self._summary
105128

MongoBackup/Replset.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ def __init__(self, db, user=None, password=None, authdb='admin', max_lag_secs=5)
1111
self.authdb = authdb
1212
self.max_lag_secs = max_lag_secs
1313

14+
self.rs_config = None
1415
self.rs_status = None
1516
self.primary = None
1617
self.secondary = None
@@ -36,21 +37,23 @@ def get_rs_status(self, force=False, quiet=False):
3637
except Exception, e:
3738
raise Exception, "Error getting replica set status! Error: %s" % e, None
3839

39-
def get_rs_config(self, quiet=False):
40-
try:
41-
if self.db.server_version() >= tuple("3.0.0".split(".")):
42-
output = self.db.admin_command('replSetGetConfig', quiet)
43-
return output['config']
44-
else:
45-
return self.connection['local'].system.replset.find_one()
46-
except Exception, e:
47-
raise Exception, "Error getting replica set config! Error: %s" % e, None
40+
def get_rs_config(self, force=False, quiet=False):
41+
if force or not self.rs_config:
42+
try:
43+
if self.db.server_version() >= tuple("3.0.0".split(".")):
44+
output = self.db.admin_command('replSetGetConfig', quiet)
45+
self.rs_config = output['config']
46+
else:
47+
self.rs_config = self.connection['local'].system.replset.find_one()
48+
except Exception, e:
49+
raise Exception, "Error getting replica set config! Error: %s" % e, None
50+
return self.rs_config
4851

4952
def rs_name(self):
5053
return self.get_rs_status['set']
5154

52-
def find_primary(self, force=False):
53-
rs_status = self.get_rs_status(force)
55+
def find_primary(self, force=False, quiet=False):
56+
rs_status = self.get_rs_status(force, quiet)
5457
rs_name = rs_status['set']
5558
for member in rs_status['members']:
5659
if member['stateStr'] == 'PRIMARY' and member['health'] > 0:
@@ -71,9 +74,9 @@ def find_primary(self, force=False):
7174
raise Exception, "Unable to locate a PRIMARY member for replset %s, giving up" % rs_name, None
7275
return self.primary
7376

74-
def find_secondary(self):
75-
rs_status = self.get_rs_status()
76-
rs_config = self.get_rs_config()
77+
def find_secondary(self, force=False, quiet=False):
78+
rs_status = self.get_rs_status(force, quiet)
79+
rs_config = self.get_rs_config(force, quiet)
7780
rs_name = rs_status['set']
7881
quorum_count = ceil(len(rs_status['members']) / 2.0)
7982

@@ -125,6 +128,10 @@ def find_secondary(self):
125128
secondary_count,
126129
quorum_count
127130
))
131+
132+
import pprint
133+
pprint.pprint(rs_status)
134+
128135
raise Exception, "Not enough secondaries in replset %s to safely take backup!" % rs_name, None
129136

130137
logging.info("Choosing SECONDARY %s for replica set %s (score: %i)" % (self.secondary['host'], rs_name, self.secondary['score']))

MongoBackup/ReplsetSharded.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22

33
from Common import DB
4+
from Replset import Replset
45
from Sharding import Sharding
56

67

@@ -52,7 +53,7 @@ def get_replsets(self, force=False):
5253
rs_db = self.get_replset_connection(host, port)
5354
self.replsets[shard_name] = Replset(rs_db, self.user, self.password, self.authdb, self.max_lag_secs)
5455
except Exception, e:
55-
logging.fatal("Could not get Replset class object for replset %s! Error: %s" % (rs_name, e))
56+
logging.fatal("Could not get Replset class object for replset %s! Error: %s" % (shard_name, e))
5657
raise e
5758
return self.replsets
5859

MongoBackup/Sharding.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def get_configserver(self, force=False):
119119
db = DB(config_host, config_port, self.user, self.password, self.authdb)
120120
rs = Replset(db, self.user, self.password, self.authdb)
121121
try:
122-
self.config_server = rs.find_secondary()
122+
self.config_server = rs.find_secondary(True, True)
123123
except Exception:
124124
self.config_server = {'host': config_list[0], 'replSet': None}
125125
except Exception, e:

0 commit comments

Comments
 (0)