Skip to content

Commit b1efcbd

Browse files
authored
Merge pull request #33 from timvaillancourt/replsethandler_to_backup_py
Use consistent replset info throughout backup + class+method renaming + reuse db connection in MainThread
2 parents 669c7db + dbffeb2 commit b1efcbd

File tree

13 files changed

+412
-367
lines changed

13 files changed

+412
-367
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ VERSION=$(shell cat VERSION)
77

88
all: bin/mongodb-consistent-backup
99

10-
bin/mongodb-consistent-backup: setup.py requirements.txt VERSION scripts/build.sh MongoBackup/*.py MongoBackup/Common/*.py MongoBackup/Notify/*.py MongoBackup/Oplog/*.py MongoBackup/Upload/*.py
10+
bin/mongodb-consistent-backup: setup.py requirements.txt VERSION scripts/build.sh MongoBackup/*.py MongoBackup/Common/*.py MongoBackup/Methods/*.py MongoBackup/Notify/*.py MongoBackup/Oplog/*.py MongoBackup/Upload/*.py
1111
PYTHON_BIN=$(PYTHON_BIN) VIRTUALENV_BIN=$(VIRTUALENV_BIN) bash scripts/build.sh
1212

1313
install: bin/mongodb-consistent-backup

MongoBackup/Backup.py

Lines changed: 66 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
from signal import signal, SIGINT, SIGTERM
99
from time import time
1010

11-
from Common import DB, Lock
12-
from ShardingHandler import ShardingHandler
13-
from Mongodumper import Mongodumper
14-
from Oplog import OplogTailer, OplogResolver
1511
from Archiver import Archiver
12+
from Common import DB, Lock
13+
from Methods import Dumper
1614
from Notify import NotifyNSCA
15+
from Oplog import OplogTailer, OplogResolver
16+
from Replset import Replset, ReplsetSharded
17+
from Sharding import Sharding
1718
from Upload import UploadS3
1819

1920

@@ -34,7 +35,7 @@ def __init__(self, options):
3435
self.backup_location = None
3536
self.dump_gzip = False
3637
self.balancer_wait_secs = 300
37-
self.balancer_sleep = 10
38+
self.balancer_sleep = 5
3839
self.archiver_threads = 1
3940
self.resolver_threads = 1
4041
self.notify_nsca = None
@@ -58,6 +59,7 @@ def __init__(self, options):
5859
self.upload_s3_chunk_size_mb = None
5960
self.archiver = None
6061
self.sharding = None
62+
self.replset = None
6163
self.mongodumper = None
6264
self.oplogtailer = None
6365
self.oplog_resolver = None
@@ -69,6 +71,7 @@ def __init__(self, options):
6971
self.start_time = time()
7072
self.oplog_threads = []
7173
self.oplog_summary = {}
74+
self.secondaries = {}
7275
self.mongodumper_summary = {}
7376

7477
# Setup options are properies and connection to node
@@ -108,9 +111,9 @@ def __init__(self, options):
108111

109112
# Get a DB connection
110113
try:
111-
connection = DB(self.host, self.port, self.user, self.password, self.authdb).connection()
112-
self.is_mongos = connection.is_mongos
113-
connection.close()
114+
self.db = DB(self.host, self.port, self.user, self.password, self.authdb)
115+
self.connection = self.db.connection()
116+
self.is_sharded = self.connection.is_mongos
114117
except Exception, e:
115118
raise e
116119

@@ -140,7 +143,7 @@ def cleanup_and_exit(self, code, frame):
140143
if current_process().name == "MainProcess":
141144
logging.info("Starting cleanup and exit procedure! Killing running threads")
142145

143-
submodules = ['sharding', 'mongodumper', 'oplogtailer', 'archiver', 'uploader_s3']
146+
submodules = ['replset', 'sharding', 'mongodumper', 'oplogtailer', 'archiver', 'uploader_s3']
144147
for submodule_name in submodules:
145148
submodule = getattr(self, submodule_name)
146149
if submodule:
@@ -152,6 +155,9 @@ def cleanup_and_exit(self, code, frame):
152155
self.backup_name
153156
))
154157

158+
if self.db:
159+
self.db.close()
160+
155161
if self._lock:
156162
self._lock.release()
157163

@@ -172,22 +178,37 @@ def run(self):
172178
logging.fatal("Could not acquire lock! Is another %s process running? Exiting" % self.program_name)
173179
sys.exit(1)
174180

175-
if not self.is_mongos:
181+
if not self.is_sharded:
176182
logging.info("Running backup of %s:%s in replset mode" % (self.host, self.port))
177183

178184
self.archiver_threads = 1
179185

186+
# get shard secondary
180187
try:
181-
self.mongodumper = Mongodumper(
182-
self.host,
183-
self.port,
188+
self.replset = Replset(
189+
self.db,
184190
self.user,
185191
self.password,
186192
self.authdb,
193+
self.max_repl_lag_secs
194+
)
195+
secondary = self.replset.find_secondary()
196+
replset_name = secondary['replSet']
197+
198+
self.secondaries[replset_name] = secondary
199+
self.replset.close()
200+
except Exception, e:
201+
self.exception("Problem getting shard secondaries! Error: %s" % e)
202+
203+
try:
204+
self.mongodumper = Dumper(
205+
self.secondaries,
187206
self.backup_root_directory,
188207
self.backup_binary,
189208
self.dump_gzip,
190-
self.max_repl_lag_secs,
209+
self.user,
210+
self.password,
211+
self.authdb,
191212
None,
192213
self.verbose
193214
)
@@ -200,32 +221,48 @@ def run(self):
200221

201222
# connect to balancer and stop it
202223
try:
203-
self.sharding = ShardingHandler(
204-
self.host,
205-
self.port,
224+
self.sharding = Sharding(
225+
self.db,
206226
self.user,
207227
self.password,
208228
self.authdb,
209229
self.balancer_wait_secs,
210230
self.balancer_sleep
211231
)
212232
self.sharding.get_start_state()
233+
except Exception, e:
234+
self.exception("Problem connecting to the balancer! Error: %s" % e)
235+
236+
# get shard secondaries
237+
try:
238+
self.replset = ReplsetSharded(
239+
self.sharding,
240+
self.db,
241+
self.user,
242+
self.password,
243+
self.authdb,
244+
self.max_repl_lag_secs
245+
)
246+
self.secondaries = self.replset.find_secondaries()
247+
except Exception, e:
248+
self.exception("Problem getting shard secondaries! Error: %s" % e)
249+
250+
# Stop the balancer:
251+
try:
213252
self.sharding.stop_balancer()
214253
except Exception, e:
215-
self.exception("Problem connecting-to and/or stopping balancer! Error: %s" % e)
254+
self.exception("Problem stopping the balancer! Error: %s" % e)
216255

217256
# start the oplog tailer threads
218257
if self.no_oplog_tailer:
219258
logging.warning("Oplog tailing disabled! Skipping")
220259
else:
221260
try:
222261
self.oplogtailer = OplogTailer(
262+
self.secondaries,
223263
self.backup_name,
224264
self.backup_root_directory,
225-
self.host,
226-
self.port,
227265
self.dump_gzip,
228-
self.max_repl_lag_secs,
229266
self.user,
230267
self.password,
231268
self.authdb
@@ -236,16 +273,14 @@ def run(self):
236273

237274
# start the mongodumper threads
238275
try:
239-
self.mongodumper = Mongodumper(
240-
self.host,
241-
self.port,
242-
self.user,
243-
self.password,
244-
self.authdb,
276+
self.mongodumper = Dumper(
277+
self.secondaries,
245278
self.backup_root_directory,
246279
self.backup_binary,
247280
self.dump_gzip,
248-
self.max_repl_lag_secs,
281+
self.user,
282+
self.password,
283+
self.authdb,
249284
self.sharding.get_configserver(),
250285
self.verbose
251286
)
@@ -312,6 +347,9 @@ def run(self):
312347
except Exception, e:
313348
self.exception("Problem running NSCA notifier! Error: %s" % e)
314349

350+
if self.db:
351+
self.db.close()
352+
315353
self._lock.release()
316354

317355
logging.info("Backup completed in %s sec" % self.backup_duration)

MongoBackup/Mongodump.py renamed to MongoBackup/Methods/Dump.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
from signal import signal, SIGINT, SIGTERM
66
from time import time
77

8-
from Common import LocalCommand
9-
from Oplog import OplogInfo
8+
from MongoBackup.Common import LocalCommand
9+
from MongoBackup.Oplog import OplogInfo
1010

1111

1212
# noinspection PyStringFormat
13-
class Mongodump(Process):
13+
class Dump(Process):
1414
def __init__(self, response_queue, backup_name, host_port, user, password, authdb, base_dir, binary,
1515
dump_gzip=False, verbose=False):
1616
Process.__init__(self)
@@ -51,7 +51,6 @@ def run(self):
5151

5252
mongodump_flags = ["-h", self.host_port, "--oplog", "-o", "%s/dump" % self.backup_dir]
5353
if self.dump_gzip:
54-
logging.debug("Enabling inline mongodump compression using --gzip flag")
5554
mongodump_flags.extend(["--gzip"])
5655
if self.authdb and self.authdb != "admin":
5756
logging.debug("Using database %s for authentication" % self.authdb)

MongoBackup/Methods/Dumper.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import logging
2+
3+
from fabric.api import hide, settings, local
4+
from multiprocessing import Process, Queue
5+
from time import sleep
6+
7+
from MongoBackup.Methods import Dump
8+
9+
10+
class Dumper:
11+
def __init__(self, secondaries, base_dir, binary, dump_gzip=False, user=None, password=None,
12+
authdb='admin', config_server=None, verbose=False):
13+
self.secondaries = secondaries
14+
self.base_dir = base_dir
15+
self.binary = binary
16+
self.dump_gzip = dump_gzip
17+
self.user = user
18+
self.password = password
19+
self.authdb = authdb
20+
self.config_server = config_server
21+
self.verbose = verbose
22+
23+
self.response_queue = Queue()
24+
self.threads = []
25+
self._summary = {}
26+
27+
if not isinstance(self.secondaries, dict):
28+
raise Exception, "Field 'secondaries' must be a dictionary of secondary info (by shard)!", None
29+
30+
with hide('running', 'warnings'), settings(warn_only=True):
31+
self.version = local("%s --version|awk 'NR >1 {exit}; /version/{print $NF}'" % self.binary, capture=True)
32+
33+
def summary(self):
34+
return self._summary
35+
36+
def run(self):
37+
# backup a secondary from each shard:
38+
for shard in self.secondaries:
39+
secondary = self.secondaries[shard]
40+
thread = Dump(
41+
self.response_queue,
42+
secondary['replSet'],
43+
secondary['host'],
44+
self.user,
45+
self.password,
46+
self.authdb,
47+
self.base_dir,
48+
self.binary,
49+
self.dump_gzip,
50+
self.verbose
51+
)
52+
self.threads.append(thread)
53+
54+
# backup a single config server:
55+
if self.config_server:
56+
thread = Dump(
57+
self.response_queue,
58+
'config',
59+
self.config_server,
60+
self.user,
61+
self.password,
62+
self.authdb,
63+
self.base_dir,
64+
self.binary,
65+
self.dump_gzip,
66+
self.verbose
67+
)
68+
self.threads.append(thread)
69+
70+
if not len(self.threads) > 0:
71+
raise Exception, 'No backup threads started!', None
72+
73+
# start all threads
74+
logging.info(
75+
"Starting backups in threads using mongodump %s (inline gzip: %s)" % (self.version, str(self.dump_gzip)))
76+
for thread in self.threads:
77+
thread.start()
78+
79+
# wait for all threads to finish
80+
for thread in self.threads:
81+
thread.join()
82+
83+
# sleep for 3 sec to fix logging order
84+
sleep(3)
85+
86+
# get oplog summaries from the queue
87+
completed = 0
88+
while not self.response_queue.empty():
89+
backup = self.response_queue.get()
90+
host = backup['host']
91+
port = backup['port']
92+
if host not in self._summary:
93+
self._summary[host] = {}
94+
self._summary[host][port] = backup
95+
if backup['completed']:
96+
completed += 1
97+
98+
# check if all threads completed
99+
if completed == len(self.threads):
100+
logging.info("All mongodump backups completed")
101+
else:
102+
raise Exception, "Not all mongodump threads completed successfully!", None
103+
104+
return self._summary
105+
106+
def close(self):
107+
logging.info("Killing all mongodump threads...")
108+
if len(self.threads) > 0:
109+
for thread in self.threads:
110+
thread.terminate()
111+
logging.info("Killed all mongodump threads")

MongoBackup/Methods/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from Dump import Dump
2+
from Dumper import Dumper

0 commit comments

Comments
 (0)