Skip to content

Commit 187498f

Browse files
Merge pull request #89 from timvaillancourt/MCB_1.0-cleanup_v6
Mcb 1.0 cleanup v6
2 parents 93900f1 + fc81566 commit 187498f

File tree

15 files changed

+116
-53
lines changed

15 files changed

+116
-53
lines changed

mongodb_consistent_backup/Archive/Archive.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22

33
from Tar import Tar
4-
from mongodb_consistent_backup.Common import config_to_string, parse_method
4+
from mongodb_consistent_backup.Common import Timer, config_to_string, parse_method
55

66

77
class Archive:
@@ -11,6 +11,7 @@ def __init__(self, config, backup_dir):
1111

1212
self.method = None
1313
self._archiver = None
14+
self.timer = Timer()
1415
self.init()
1516

1617
def init(self):
@@ -42,7 +43,12 @@ def archive(self):
4243
if self._archiver:
4344
config_string = config_to_string(self.config.archive[self.method])
4445
logging.info("Archiving with method: %s (options: %s)" % (self.method, config_string))
45-
return self._archiver.run()
46+
self.timer.start()
47+
48+
self._archiver.run()
49+
50+
self.timer.stop()
51+
logging.info("Archiving completed in %s seconds" % self.timer.duration())
4652

4753
def close(self):
4854
if self._archiver:

mongodb_consistent_backup/Archive/Tar/Tar.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,14 @@ def run(self):
6767
self._pool.apply_async(TarThread(subdir_name, output_file, self.do_gzip(), self.verbose, self.binary).run)
6868
except Exception, e:
6969
self._pool.terminate()
70-
logging.fatal("Could not create archiving thread! Error: %s" % e)
70+
logging.fatal("Could not create tar archiving thread! Error: %s" % e)
7171
raise e
7272
self._pool.close()
7373
self._pool.join()
74-
logging.info("Archiver threads completed")
7574

7675
def close(self):
77-
logging.debug("Stopping Archiver threads")
76+
logging.debug("Stopping tar archiving threads")
7877
if self._pool is not None:
7978
self._pool.terminate()
8079
self._pool.join()
81-
logging.info("Stopped all Archiver threads")
80+
logging.info("Stopped all tar archiving threads")

mongodb_consistent_backup/Archive/Tar/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
def config(parser):
55
parser.add_argument("--archive.tar.compression", dest="archive.tar.compression",
6-
help="Archiver compression method (default: gzip)", default='gzip', choices=['gzip', 'none'])
6+
help="Tar archiver compression method (default: gzip)", default='gzip', choices=['gzip', 'none'])
77
parser.add_argument("--archive.tar.threads", dest="archive.tar.threads",
8-
help="Number of threads to use in archive phase (default: 1-per-CPU)", default=0, type=int)
8+
help="Number of Tar archiver threads to use (default: 1-per-CPU)", default=0, type=int)
99
return parser

mongodb_consistent_backup/Backup/Backup.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22

33
from Mongodump import Mongodump
4-
from mongodb_consistent_backup.Common import config_to_string, parse_method
4+
from mongodb_consistent_backup.Common import Timer, config_to_string, parse_method
55

66

77
class Backup:
@@ -13,15 +13,14 @@ def __init__(self, config, backup_dir, secondaries, config_server=None):
1313

1414
self.method = None
1515
self._method = None
16+
self.timer = Timer()
1617
self.init()
1718

1819
def init(self):
1920
backup_method = self.config.backup.method
2021
if not backup_method or parse_method(backup_method) == "none":
2122
raise Exception, 'Must specify a backup method!', None
22-
self.method = parse_method(backup_method)
23-
config_string = config_to_string(self.config.backup[self.method])
24-
logging.info("Using backup method: %s (options: %s)" % (self.method, config_string))
23+
self.method = parse_method(backup_method)
2524
try:
2625
self._method = globals()[self.method.capitalize()](
2726
self.config,
@@ -40,7 +39,16 @@ def is_compressed(self):
4039

4140
def backup(self):
4241
if self._method:
43-
return self._method.run()
42+
config_string = config_to_string(self.config.backup[self.method])
43+
logging.info("Using backup method: %s (options: %s)" % (self.method, config_string))
44+
self.timer.start()
45+
46+
info = self._method.run()
47+
48+
self.timer.stop()
49+
logging.info("Backup completed in %s seconds" % self.timer.duration())
50+
51+
return info
4452

4553
def close(self):
4654
if self._method:

mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33

44
from multiprocessing import Process
55
from signal import signal, SIGINT, SIGTERM
6-
from time import time
76

8-
from mongodb_consistent_backup.Common import LocalCommand
7+
from mongodb_consistent_backup.Common import LocalCommand, Timer
98
from mongodb_consistent_backup.Oplog import Oplog
109

1110

@@ -28,12 +27,12 @@ def __init__(self, response_queue, backup_name, host_port, user, password, authd
2827
self.dump_gzip = dump_gzip
2928
self.verbose = verbose
3029

30+
self.timer = Timer()
3131
self._command = None
3232
self.completed = False
3333
self.backup_dir = "%s/%s" % (self.base_dir, self.backup_name)
3434
self.dump_dir = "%s/dump" % self.backup_dir
3535
self.oplog_file = "%s/oplog.bson" % self.dump_dir
36-
self.start_time = time()
3736

3837
signal(SIGINT, self.close)
3938
signal(SIGTERM, self.close)
@@ -52,6 +51,8 @@ def run(self):
5251
self.port
5352
))
5453

54+
self.timer.start()
55+
5556
mongodump_flags = ["-h", self.host_port, "--oplog", "-o", "%s/dump" % self.backup_dir]
5657
if self.threads > 0:
5758
mongodump_flags.extend(["--numParallelCollections="+str(self.threads)])
@@ -89,7 +90,8 @@ def run(self):
8990
'completed': self.completed
9091
})
9192

92-
time_diff = time() - self.start_time
93+
self.timer.stop()
94+
9395
logging.info("Backup for %s/%s:%s completed in %s sec with %i oplog changes captured to: %s" % (
94-
self.backup_name, self.host, self.port, time_diff, oplog.count(), str(oplog.last_ts())
96+
self.backup_name, self.host, self.port, self.timer.duration(), oplog.count(), str(oplog.last_ts())
9597
))

mongodb_consistent_backup/Common/Config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import mongodb_consistent_backup
12
import sys
23

34
from argparse import Action
@@ -71,7 +72,7 @@ def _get(self, keys, data=None):
7172
return data[keys]
7273

7374
def parse_submodules(self):
74-
for _, modname, ispkg in walk_packages(path="."):
75+
for _, modname, ispkg in walk_packages(path=mongodb_consistent_backup.__path__, prefix=mongodb_consistent_backup.__name__+'.'):
7576
if ispkg:
7677
try:
7778
components = modname.split('.')
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from time import time
2+
3+
4+
class Timer:
5+
def __init__(self):
6+
self.count = 0
7+
self.rounds = {}
8+
9+
def start(self):
10+
self.count += 1
11+
self.rounds[self.count] = { 'start': time(), 'started': True }
12+
13+
def stop(self):
14+
if self.rounds[self.count] and self.rounds[self.count]['started']:
15+
self.rounds[self.count]['started'] = False
16+
self.rounds[self.count]['end'] = time()
17+
18+
def duration(self):
19+
if 'start' in self.rounds[self.count]:
20+
if 'end' in self.rounds[self.count]:
21+
end = self.rounds[self.count]['end']
22+
else:
23+
end = time()
24+
return end - self.rounds[self.count]['start']
25+
return -1

mongodb_consistent_backup/Common/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
from DB import DB
33
from LocalCommand import LocalCommand
44
from Lock import Lock
5+
from Timer import Timer
56
from Util import config_to_string, parse_method, validate_hostname

mongodb_consistent_backup/Main.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44
from datetime import datetime
55
from multiprocessing import current_process
66
from signal import signal, SIGINT, SIGTERM
7-
from time import time
87

98
from Archive import Archive
109
from Backup import Backup
11-
from Common import Config, DB, Lock, validate_hostname
10+
from Common import Config, DB, Lock, Timer, validate_hostname
1211
from Notify import Notify
1312
from Oplog import Tailer, Resolver
1413
from Replication import Replset, ReplsetSharded
@@ -29,18 +28,16 @@ def __init__(self, prog_name="mongodb-consistent-backup"):
2928
self.oplog_resolver = None
3029
self.upload = None
3130
self.lock = None
32-
self.start_time = time()
33-
self.end_time = None
34-
self.backup_duration = None
3531
self.backup_time = None
3632
self.backup_root_directory = None
3733
self.backup_root_subdirectory = None
3834
self.db = None
3935
self.is_sharded = False
36+
self.log_level = None
37+
self.timer = Timer()
4038
self.secondaries = {}
4139
self.oplog_summary = {}
4240
self.backup_summary = {}
43-
self.log_level = None
4441

4542
self.setup_config()
4643
self.setup_logger()
@@ -149,6 +146,7 @@ def run(self):
149146
logging.info("Starting %s version %s (git commit hash: %s)" % (self.program_name, self.config.version, self.config.git_commit))
150147

151148
self.get_lock()
149+
self.timer.start()
152150

153151
# Setup the notifier
154152
try:
@@ -285,8 +283,9 @@ def run(self):
285283
self.exception("Problem restoring balancer lock! Error: %s" % e)
286284

287285
# resolve/merge tailed oplog into mongodump oplog.bson to a consistent point for all shards
288-
if self.config.backup.method == "mongodump" and self.oplogtailer:
286+
if self.backup.method == "mongodump" and self.oplogtailer:
289287
self.oplog_resolver = Resolver(self.config, self.oplog_summary, self.backup_summary)
288+
self.oplog_resolver.compression(self.oplogtailer.compression())
290289
self.oplog_resolver.run()
291290

292291
# archive backup directories
@@ -295,21 +294,20 @@ def run(self):
295294
except Exception, e:
296295
self.exception("Problem performing archiving! Error: %s" % e)
297296

298-
self.end_time = time()
299-
self.backup_duration = self.end_time - self.start_time
300-
301297
# upload backup
302298
try:
303299
self.upload.upload()
304300
except Exception, e:
305301
self.exception("Problem performing upload of backup! Error: %s" % e)
306302

303+
self.timer.stop()
304+
307305
# send notifications of backup state
308306
try:
309307
self.notify.notify("%s: backup '%s' succeeded in %s secs" % (
310308
self.program_name,
311309
self.config.backup.name,
312-
self.backup_duration
310+
self.timer.duration()
313311
), True)
314312
except Exception, e:
315313
self.exception("Problem running Notifier! Error: %s" % e)
@@ -319,4 +317,4 @@ def run(self):
319317

320318
self.release_lock()
321319

322-
logging.info("Backup completed in %s sec" % self.backup_duration)
320+
logging.info("Completed %s in %s sec" % (self.program_name, self.timer.duration()))

mongodb_consistent_backup/Oplog/Resolver/Resolver.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from types import MethodType
1010

1111
from ResolverThread import ResolverThread
12+
from mongodb_consistent_backup.Common import Timer, parse_method
1213

1314

1415
# Allows pooled .apply_async()s to work on Class-methods:
@@ -26,24 +27,35 @@ def __init__(self, config, tailed_oplogs_summary, backup_oplogs_summary):
2627
self.config = config
2728
self.tailed_oplogs = tailed_oplogs_summary
2829
self.backup_oplogs = backup_oplogs_summary
29-
self.thread_count = self.config.oplog.resolver.threads
30-
31-
self.dump_gzip = False
32-
if self.config.oplog.compression == 'gzip':
33-
self.dump_gzip = True
3430

31+
self.timer = Timer()
3532
self.end_ts = None
3633
self.delete_oplogs = {}
3734

38-
if self.thread_count is None or self.thread_count < 1:
39-
self.thread_count = cpu_count() * 2
40-
4135
try:
42-
self._pool = Pool(processes=self.thread_count)
36+
self._pool = Pool(processes=self.threads())
4337
except Exception, e:
44-
logging.fatal("Could not start pool! Error: %s" % e)
38+
logging.fatal("Could not start oplog resolver pool! Error: %s" % e)
4539
raise e
4640

41+
def compression(self, method=None):
42+
if method:
43+
logging.debug("Setting oplog resolver compression to: %s" % method)
44+
self.config.oplog.compression = parse_method(method)
45+
return parse_method(self.config.oplog.compression)
46+
47+
def do_gzip(self):
48+
if self.compression() == 'gzip':
49+
return True
50+
return False
51+
52+
def threads(self, threads=None):
53+
if threads:
54+
self.config.oplog.resolver.threads = int(threads)
55+
if self.config.oplog.resolver.threads is None or self.config.oplog.resolver.threads < 1:
56+
self.config.oplog.resolver.threads = int(cpu_count() * 2)
57+
return int(self.config.oplog.resolver.threads)
58+
4759
def get_consistent_end_ts(self):
4860
ts = None
4961
for host in self.tailed_oplogs:
@@ -55,7 +67,8 @@ def get_consistent_end_ts(self):
5567
return ts
5668

5769
def run(self):
58-
logging.info("Resolving oplogs using %i threads max" % self.thread_count)
70+
logging.info("Resolving oplogs using %i threads max" % self.threads())
71+
self.timer.start()
5972

6073
self.end_ts = self.get_consistent_end_ts()
6174
for host in self.backup_oplogs:
@@ -84,7 +97,7 @@ def run(self):
8497
backup_oplog['file'],
8598
backup_oplog['last_ts'],
8699
self.end_ts,
87-
self.dump_gzip
100+
self.do_gzip()
88101
).run)
89102
except Exception, e:
90103
logging.fatal("Resolve failed for %s:%s! Error: %s" % (host, port, e))
@@ -105,4 +118,5 @@ def run(self):
105118
logging.fatal("Deleting of tailed oplog file %s failed! Error: %s" % (oplog_file, e))
106119
raise e
107120

108-
logging.info("Done resolving oplogs")
121+
self.timer.stop()
122+
logging.info("Oplog resolving completed in %s seconds" % self.timer.duration())

0 commit comments

Comments
 (0)