Skip to content

Commit 6bb750d

Browse files
Merge pull request #110 from timvaillancourt/MCB_1.0-bugfix19
MCB_1.0: Bugfixes 19
2 parents 8236a8e + 3cc5b18 commit 6bb750d

File tree

8 files changed

+75
-38
lines changed

8 files changed

+75
-38
lines changed

mongodb_consistent_backup/Archive/Tar/Tar.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from TarThread import TarThread
1010
from mongodb_consistent_backup.Common import parse_method
11-
from mongodb_consistent_backup.Errors import Error
11+
from mongodb_consistent_backup.Errors import Error, OperationError
1212
from mongodb_consistent_backup.Pipeline import Task
1313

1414

@@ -68,7 +68,7 @@ def run(self):
6868
output_file = "%s.tar" % subdir_name
6969
if self.do_gzip():
7070
output_file = "%s.tgz" % subdir_name
71-
self._pool.apply_async(TarThread(subdir_name, output_file, self.do_gzip(), self.verbose, self.binary).run, callback=self.done)
71+
self._pool.apply_async(TarThread(subdir_name, output_file, self.compression(), self.verbose, self.binary).run, callback=self.done)
7272
self._pooled.append(subdir_name)
7373
except Exception, e:
7474
self._pool.terminate()
Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,28 @@
11
import os
22
import logging
3-
import sys
4-
5-
from signal import signal, SIGINT, SIGTERM
63

74
from mongodb_consistent_backup.Common import LocalCommand
5+
from mongodb_consistent_backup.Pipeline import PoolThread
86

97

10-
class TarThread:
11-
def __init__(self, backup_dir, output_file, do_gzip=False, verbose=False, binary="tar"):
12-
self.backup_dir = backup_dir
13-
self.output_file = output_file
14-
self.do_gzip = do_gzip
15-
self.verbose = verbose
16-
self.binary = binary
17-
18-
self._command = None
8+
class TarThread(PoolThread):
9+
def __init__(self, backup_dir, output_file, compression='none', verbose=False, binary="tar"):
10+
super(TarThread, self).__init__(self.__class__.__name__, compression)
11+
self.compression_method = compression
12+
self.backup_dir = backup_dir
13+
self.output_file = output_file
14+
self.verbose = verbose
15+
self.binary = binary
1916

20-
signal(SIGINT, self.close)
21-
signal(SIGTERM, self.close)
17+
self._command = None
2218

2319
def close(self, exit_code=None, frame=None):
24-
if self._command:
25-
logging.debug("Killing running subprocess/command: %s" % self._command.command)
20+
if self._command and not self.stopped:
21+
logging.debug("Stopping running tar command: %s" % self._command.command)
2622
del exit_code
2723
del frame
2824
self._command.close()
25+
self.stopped = True
2926

3027
def run(self):
3128
if os.path.isdir(self.backup_dir):
@@ -37,17 +34,20 @@ def run(self):
3734
log_msg = "Archiving directory: %s" % self.backup_dir
3835
cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name]
3936

40-
if self.do_gzip:
37+
if self.do_gzip():
4138
log_msg = "Archiving and compressing directory: %s" % self.backup_dir
4239
cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name]
4340

4441
logging.info(log_msg)
42+
self.running = True
4543
self._command = LocalCommand(self.binary, cmd_flags, self.verbose)
46-
self._command.run()
44+
self.exit_code = self._command.run()
4745
except Exception, e:
4846
logging.fatal("Failed archiving file: %s! Error: %s" % (self.output_file, e))
49-
sys.exit(1)
47+
finally:
48+
self.running = False
49+
self.stopped = True
50+
self.completed = True
5051
else:
5152
logging.fatal("Output file: %s already exists!" % self.output_file)
52-
sys.exit(1)
5353
return self.backup_dir

mongodb_consistent_backup/Main.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,9 @@ def run(self):
269269
self.db
270270
)
271271
replset_name = self.replset.get_rs_name()
272+
replset_dir = os.path.join(self.backup_directory, replset_name)
272273
self.replsets[replset_name] = self.replset
273-
state = StateBackupReplset(self.backup_directory, self.config, self.backup_time, replset_name)
274+
state = StateBackupReplset(replset_dir, self.config, self.backup_time, replset_name)
274275
state.load_state(self.replset.summary())
275276
state.write()
276277
except Exception, e:
@@ -392,7 +393,8 @@ def run(self):
392393
try:
393394
rs_sharded_summary = self.replset_sharded.summary()
394395
for shard in rs_sharded_summary:
395-
state = StateBackupReplset(self.backup_directory, self.config, self.backup_time, shard)
396+
shard_dir = os.path.join(self.backup_directory, shard)
397+
state = StateBackupReplset(shard_dir, self.config, self.backup_time, shard)
396398
state.load_state(rs_sharded_summary[shard])
397399
state.write()
398400
self.replset_sharded.close()
@@ -417,7 +419,8 @@ def run(self):
417419
self.resolver.compression(self.oplogtailer.compression())
418420
resolver_summary = self.resolver.run()
419421
for shard in resolver_summary:
420-
state = StateOplog(self.backup_directory, self.config, self.backup_time, shard)
422+
shard_dir = os.path.join(self.backup_directory, shard)
423+
state = StateOplog(shard_dir, self.config, self.backup_time, shard)
421424
state.load_state(resolver_summary[shard])
422425
state.write()
423426
self.resolver.close()

mongodb_consistent_backup/Oplog/Resolver/Resolver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def run(self):
106106
tailed_oplog.copy(),
107107
backup_oplog.copy(),
108108
self.get_consistent_end_ts(),
109-
self.do_gzip()
109+
self.compression()
110110
).run, callback=self.done)
111111
self._pooled.append(uri.str())
112112
except Exception, e:

mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,26 @@
55
from bson import decode_file_iter
66

77
from mongodb_consistent_backup.Oplog import Oplog
8+
from mongodb_consistent_backup.Pipeline import PoolThread
89

910

10-
class ResolverThread:
11-
def __init__(self, state, uri, tailed_oplog, mongodump_oplog, max_end_ts, dump_gzip=False):
12-
self.state = state
13-
self.uri = uri
14-
self.tailed_oplog = tailed_oplog
15-
self.mongodump_oplog = mongodump_oplog
16-
self.max_end_ts = max_end_ts
17-
self.dump_gzip = dump_gzip
11+
class ResolverThread(PoolThread):
12+
def __init__(self, state, uri, tailed_oplog, mongodump_oplog, max_end_ts, compression='none'):
13+
super(ResolverThread, self).__init__(self.__class__.__name__, compression)
14+
self.state = state
15+
self.uri = uri
16+
self.tailed_oplog = tailed_oplog
17+
self.mongodump_oplog = mongodump_oplog
18+
self.max_end_ts = max_end_ts
19+
self.compression_method = compression
1820

1921
self.oplogs = {}
2022
self.changes = 0
2123
self.stopped = True
2224

2325
def run(self):
24-
self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.dump_gzip, 'a+')
25-
self.oplogs['tailed'] = Oplog(self.tailed_oplog['file'], self.dump_gzip)
26+
self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.do_gzip(), 'a+')
27+
self.oplogs['tailed'] = Oplog(self.tailed_oplog['file'], self.do_gzip())
2628

2729
logging.info("Resolving oplog for %s to max ts: %s" % (self.uri, self.max_end_ts))
2830
try:
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import logging
2+
3+
from mongodb_consistent_backup.Errors import Error
4+
5+
6+
class PoolThread(object):
7+
def __init__(self, thread_name, compression_method='none'):
8+
self.thread_name = thread_name
9+
self.compression_method = compression_method
10+
11+
self.timer_name = self.__class__.__name__
12+
self.stopped = False
13+
self.running = False
14+
self.completed = False
15+
self.exit_code = 255
16+
17+
def compression(self, method=None):
18+
if method:
19+
self.compression_method = method
20+
return self.compression_method
21+
22+
def do_gzip(self):
23+
if self.compression() == 'gzip':
24+
return True
25+
return False
26+
27+
def run(self):
28+
raise Error("Must define a .run() method when using %s class!" % self.__class__.__name__)
29+
30+
def close(self, code=None, frame=None):
31+
raise Error("Must define a .close() method when using %s class!" % self.__class__.__name__)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
from PoolThread import PoolThread
12
from Stage import Stage
23
from Task import Task

mongodb_consistent_backup/State.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ def load_state(self, replset):
7373

7474
class StateBackupReplset(StateBaseReplset):
7575
def __init__(self, base_dir, config, backup_time, set_name):
76-
StateBaseReplset.__init__(self, base_dir, config, backup_time, set_name, "set.%s.bson" % set_name)
76+
StateBaseReplset.__init__(self, base_dir, config, backup_time, set_name, "replset.bson")
7777

7878

7979
class StateOplog(StateBaseReplset):
8080
def __init__(self, base_dir, config, backup_time, set_name):
81-
StateBaseReplset.__init__(self, base_dir, config, backup_time, set_name, "oplog.%s.bson" % set_name)
81+
StateBaseReplset.__init__(self, base_dir, config, backup_time, set_name, "oplog.bson")
8282

8383

8484
class StateBackup(StateBase):

0 commit comments

Comments
 (0)