Skip to content

Commit 6b4e895

Browse files
adding subclasses for "tasks" and "stages" for uniform code execution
1 parent 740fd8d commit 6b4e895

File tree

14 files changed

+218
-214
lines changed

14 files changed

+218
-214
lines changed

mongodb_consistent_backup/Archive/Archive.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55
class Archive(Stage):
66
def __init__(self, manager, config, timer, base_dir, backup_dir):
77
super(Archive, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
8-
self.method = self.config.archive.method
8+
self.task = self.config.archive.method
99
self.init()

mongodb_consistent_backup/Archive/Tar/Tar.py

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
import logging
33

44
from copy_reg import pickle
5-
from multiprocessing import Pool, cpu_count
5+
from multiprocessing import Pool
66
from types import MethodType
77

88
from TarThread import TarThread
99
from mongodb_consistent_backup.Common import parse_method
10-
from mongodb_consistent_backup.Errors import Error, OperationError
10+
from mongodb_consistent_backup.Errors import Error
11+
from mongodb_consistent_backup.Pipeline import Task
1112

1213

1314
# Allows pooled .apply_async()s to work on Class-methods:
@@ -20,35 +21,13 @@ def _reduce_method(m):
2021
pickle(MethodType, _reduce_method)
2122

2223

23-
class Tar:
24+
class Tar(Task):
2425
def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
25-
self.config = config
26-
self.backup_base_dir = backup_dir
27-
self.verbose = self.config.verbose
28-
self.binary = "tar"
26+
super(Tar, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
27+
self.compression_method = self.config.archive.tar.compression
28+
self.binary = "tar"
2929

30-
self._pool = None
31-
self.stopped = False
32-
self.completed = False
33-
34-
def compression(self, method=None):
35-
if method:
36-
self.config.archive.tar.compression = parse_method(method)
37-
logging.info("Setting tar compression method to: %s" % self.config.archive.tar.compression)
38-
return parse_method(self.config.archive.tar.compression)
39-
40-
def do_gzip(self):
41-
if self.compression() == 'gzip':
42-
return True
43-
return False
44-
45-
def threads(self, thread_count=None):
46-
if thread_count:
47-
self.config.archive.tar.threads = int(thread_count)
48-
logging.info("Setting tar thread count to: %i" % self.config.archive.tar.threads)
49-
if self.config.archive.tar.threads is None or self.config.archive.tar.threads < 1:
50-
self.config.archive.tar.threads = cpu_count()
51-
return int(self.config.archive.tar.threads)
30+
self._pool = None
5231

5332
def run(self):
5433
try:
@@ -59,10 +38,11 @@ def run(self):
5938
logging.fatal("Could not start pool! Error: %s" % e)
6039
raise Error(e)
6140

62-
if os.path.isdir(self.backup_base_dir):
41+
if os.path.isdir(self.backup_dir):
6342
try:
64-
for backup_dir in os.listdir(self.backup_base_dir):
65-
subdir_name = os.path.join(self.backup_base_dir, backup_dir)
43+
self.running = True
44+
for backup_dir in os.listdir(self.backup_dir):
45+
subdir_name = os.path.join(self.backup_dir, backup_dir)
6646
if not os.path.isdir(os.path.join(subdir_name, "dump")):
6747
continue
6848
output_file = "%s.tar" % subdir_name
@@ -73,8 +53,10 @@ def run(self):
7353
self._pool.terminate()
7454
logging.fatal("Could not create tar archiving thread! Error: %s" % e)
7555
raise Error(e)
76-
self._pool.close()
77-
self._pool.join()
56+
finally:
57+
self._pool.close()
58+
self._pool.join()
59+
self.stopped = True
7860
self.completed = True
7961

8062
def close(self):

mongodb_consistent_backup/Backup/Backup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55
class Backup(Stage):
66
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
77
super(Backup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, replsets=replsets, sharding=sharding)
8-
self.method = self.config.backup.method
8+
self.task = self.config.backup.method
99
self.init()

mongodb_consistent_backup/Backup/Mongodump/Mongodump.py

Lines changed: 43 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -5,73 +5,52 @@
55
from fabric.api import hide, settings, local
66
from math import floor
77
from multiprocessing import cpu_count
8-
from signal import signal, SIGINT, SIG_IGN
98
from time import sleep
109

1110
from mongodb_consistent_backup.Common import MongoUri
1211
from mongodb_consistent_backup.Errors import Error, OperationError
1312
from mongodb_consistent_backup.Oplog import OplogState
13+
from mongodb_consistent_backup.Pipeline import Task
1414

1515
from MongodumpThread import MongodumpThread
1616

1717

18-
class Mongodump:
19-
def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
20-
self.manager = manager
21-
self.config = config
22-
self.timer = timer
23-
self.base_dir = base_dir
24-
self.backup_dir = backup_dir
25-
self.binary = self.config.backup.mongodump.binary
26-
self.user = self.config.user
27-
self.password = self.config.password
28-
self.authdb = self.config.authdb
29-
self.verbose = self.config.verbose
18+
class Mongodump(Task):
19+
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
20+
super(Mongodump, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
21+
self.compression_method = self.config.backup.mongodump.compression
22+
self.binary = self.config.backup.mongodump.binary
23+
self.user = self.config.user
24+
self.password = self.config.password
25+
self.authdb = self.config.authdb
26+
self.replsets = replsets
27+
self.sharding = sharding
3028

31-
try:
32-
self.replsets = kwargs['replsets']
33-
except KeyError:
34-
raise Error("'replsets' kwargs required!")
35-
36-
self.sharding = None
37-
if 'sharding' in kwargs:
38-
self.sharding = kwargs['sharding']
39-
40-
signal(SIGINT, SIG_IGN)
41-
signal(SIGINT, self.close)
42-
43-
self.completed = False
44-
self.timer_name = self.__class__.__name__
45-
self.threads_per_dump_max = 16
46-
self.config_replset = False
47-
self.cpu_count = cpu_count()
48-
self.threads = []
49-
self.states = {}
50-
self._summary = {}
51-
self._threads_per_dump = None
29+
self.threads_max = 16
30+
self.config_replset = False
31+
self.dump_threads = []
32+
self.states = {}
33+
self._summary = {}
5234

5335
with hide('running', 'warnings'), settings(warn_only=True):
5436
self.version = local("%s --version|awk 'NR >1 {exit}; /version/{print $NF}'" % self.binary, capture=True)
55-
self.do_gzip = self.can_gzip()
5637

57-
if not self.do_gzip and self.config.backup.mongodump.compression == 'gzip':
38+
if self.can_gzip():
39+
if self.compression() == 'none':
40+
self.compression('gzip')
41+
elif self.compression() == 'gzip':
5842
logging.warning("mongodump gzip compression requested on binary that does not support gzip!")
5943

60-
if not isinstance(self.replsets, dict):
61-
raise Error("Field 'replsets' must be a dictionary of mongodb_consistent_backup.Replication.Replset classes!")
62-
6344
def can_gzip(self):
6445
if os.path.isfile(self.binary) and os.access(self.binary, os.X_OK):
46+
logging.debug("Mongodump binary supports gzip")
6547
if tuple("3.2.0".split(".")) <= tuple(self.version.split(".")):
6648
return True
6749
return False
6850
else:
6951
logging.fatal("Cannot find or execute the mongodump binary file %s!" % self.binary)
7052
sys.exit(1)
7153

72-
def is_compressed(self):
73-
return self.can_gzip()
74-
7554
def summary(self):
7655
return self._summary
7756

@@ -85,14 +64,14 @@ def get_summaries(self):
8564

8665
def wait(self):
8766
completed = 0
88-
start_threads = len(self.threads)
67+
start_threads = len(self.dump_threads)
8968
# wait for all threads to finish
90-
while len(self.threads) > 0:
91-
for thread in self.threads:
69+
while len(self.dump_threads) > 0:
70+
for thread in self.dump_threads:
9271
if not thread.is_alive():
9372
if thread.exitcode == 0:
9473
completed += 1
95-
self.threads.remove(thread)
74+
self.dump_threads.remove(thread)
9675
sleep(0.5)
9776

9877
# sleep for 3 sec to fix logging order before gathering summaries
@@ -106,19 +85,19 @@ def wait(self):
10685
else:
10786
raise OperationError("Not all mongodump threads completed successfully!")
10887

109-
def threads_per_dump(self, threads=None):
88+
def threads(self, threads=None):
11089
if threads:
111-
self._threads_per_dump = int(threads)
112-
elif not self._threads_per_dump:
90+
self.thread_count = int(threads)
91+
elif not self.thread_count:
11392
if tuple(self.version.split(".")) >= tuple("3.2.0".split(".")):
114-
self._threads_per_dump = 1
93+
self.thread_count = 1
11594
if self.cpu_count > len(self.replsets):
116-
self._threads_per_dump = int(floor(self.cpu_count / len(self.replsets)))
117-
if self._threads_per_dump > self.threads_per_dump_max:
118-
self._threads_per_dump = self.threads_per_dump_max
95+
self.thread_count = int(floor(self.cpu_count / len(self.replsets)))
96+
if self.thread_count > self.threads_max:
97+
self.thread_count = self.threads_max
11998
else:
12099
logging.warn("Threading unsupported by mongodump version %s. Use mongodump 3.2.0 or greater to enable per-dump threading." % self.version)
121-
return self._threads_per_dump
100+
return self.thread_count
122101

123102
def run(self):
124103
self.timer.start(self.timer_name)
@@ -137,18 +116,18 @@ def run(self):
137116
self.authdb,
138117
self.backup_dir,
139118
self.binary,
140-
self.threads_per_dump(),
119+
self.threads(),
141120
self.do_gzip,
142121
self.verbose
143122
)
144-
self.threads.append(thread)
123+
self.dump_threads.append(thread)
145124

146-
if not len(self.threads) > 0:
125+
if not len(self.dump_threads) > 0:
147126
raise OperationError('No backup threads started!')
148127

149128
logging.info(
150-
"Starting backups using mongodump %s (options: gzip=%s, threads_per_dump=%i)" % (self.version, str(self.do_gzip), self.threads_per_dump()))
151-
for thread in self.threads:
129+
"Starting backups using mongodump %s (options: compression=%s, threads_per_dump=%i)" % (self.version, self.compression(), self.threads()))
130+
for thread in self.dump_threads:
152131
thread.start()
153132
self.wait()
154133

@@ -159,7 +138,7 @@ def run(self):
159138
logging.info("Using non-replset backup method for config server mongodump")
160139
mongo_uri = MongoUri(config_server['host'], 27019, 'configsvr')
161140
self.states['configsvr'] = OplogState(self.manager, mongo_uri)
162-
self.threads = [MongodumpThread(
141+
self.dump_threads = [MongodumpThread(
163142
self.states['configsvr'],
164143
mongo_uri,
165144
self.timer,
@@ -168,20 +147,20 @@ def run(self):
168147
self.authdb,
169148
self.backup_dir,
170149
self.binary,
171-
self.threads_per_dump(),
150+
self.threads(),
172151
self.do_gzip,
173152
self.verbose
174153
)]
175-
self.threads[0].start()
154+
self.dump_threads[0].start()
176155
self.wait()
177156

178157
self.completed = True
179158
return self._summary
180159

181160
def close(self):
182161
logging.info("Stopping all mongodump threads")
183-
if len(self.threads) > 0:
184-
for thread in self.threads:
162+
if len(self.dump_threads) > 0:
163+
for thread in self.dump_threads:
185164
thread.terminate()
186165
try:
187166
self.timer.stop(self.timer_name)

mongodb_consistent_backup/Main.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ def run(self):
271271
self.replsets
272272
)
273273
if self.backup.is_compressed():
274-
logging.info("Backup method supports gzip compression, disabling compression in archive step")
274+
logging.info("Backup method supports compression, disabling compression in archive step")
275275
self.archive.compression('none')
276276
self.backup_summary = self.backup.run()
277277
self.state.set('backup_oplog', self.backup_summary)
@@ -321,8 +321,9 @@ def run(self):
321321
self.manager,
322322
self.config,
323323
self.timer,
324-
self.replsets,
325-
self.backup_directory
324+
self.backup_root_subdirectory,
325+
self.backup_directory,
326+
self.replsets
326327
)
327328
except Exception, e:
328329
self.exception("Problem initializing oplog tailer! Error: %s" % e, e)
@@ -339,9 +340,9 @@ def run(self):
339340
self.sharding
340341
)
341342
if self.backup.is_compressed():
342-
logging.info("Backup method supports gzip compression, disabling compression in archive step and enabling oplog compression")
343+
logging.info("Backup method supports compression, disabling compression in archive step and enabling oplog compression")
343344
self.archive.compression('none')
344-
self.oplogtailer.compression('gzip')
345+
self.oplogtailer.compression(self.backup.compression())
345346
except Exception, e:
346347
self.exception("Problem initializing backup! Error: %s" % e, e)
347348

@@ -387,8 +388,16 @@ def run(self):
387388
self.db.close()
388389

389390
# resolve/merge tailed oplog into mongodump oplog.bson to a consistent point for all shards
390-
if self.backup.method == "mongodump" and self.oplogtailer:
391-
self.resolver = Resolver(self.manager, self.config, self.timer, self.oplog_summary, self.backup_summary)
391+
if self.backup.task.lower() == "mongodump" and self.oplogtailer:
392+
self.resolver = Resolver(
393+
self.manager,
394+
self.config,
395+
self.timer,
396+
self.backup_root_subdirectory,
397+
self.backup_directory,
398+
self.oplog_summary,
399+
self.backup_summary
400+
)
392401
self.resolver.compression(self.oplogtailer.compression())
393402
resolver_summary = self.resolver.run()
394403
for shard in resolver_summary:

mongodb_consistent_backup/Notify/Notify.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
class Notify(Stage):
88
def __init__(self, manager, config, timer, base_dir, backup_dir):
99
super(Notify, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
10-
self.method = self.config.notify.method
10+
self.task = self.config.notify.method
1111

1212
self.completed = False
1313
self.notifications = []
@@ -18,21 +18,21 @@ def notify(self, message, success=False):
1818
self.notifications.append(notification)
1919

2020
def run(self, *args):
21-
if self._method and len(self.notifications) > 0:
22-
logging.info("Sending %i notification(s) to: %s" % (len(self.notifications), self._method.server))
21+
if self._task and len(self.notifications) > 0:
22+
logging.info("Sending %i notification(s) to: %s" % (len(self.notifications), self._task.server))
2323
self.timers.start(self.stage)
2424
while len(self.notifications) > 0:
2525
try:
2626
(success, message) = self.notifications.pop()
27-
state = self._method.failed
27+
state = self._task.failed
2828
if success:
29-
state = self._method.success
30-
self._method.notify(success, message)
29+
state = self._task.success
30+
self._task.run(success, message)
3131
except:
3232
continue
3333
self.timers.stop(self.stage)
3434
self.completed = True
3535

3636
def close(self):
37-
if self._method:
38-
return self._method.close()
37+
if self._task:
38+
return self._task.close()

0 commit comments

Comments
 (0)