Skip to content

Commit 8b960bb

Browse files
Check status of pooled threads properly
1 parent 7c16b25 commit 8b960bb

File tree

5 files changed

+27
-11
lines changed

5 files changed

+27
-11
lines changed

mongodb_consistent_backup/Archive/Tar/Tar.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,23 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
2727
self.compression_method = self.config.archive.tar.compression
2828
self.binary = "tar"
2929

30-
self._pool = None
30+
self._pool = None
31+
self._pooled = []
32+
33+
def wait(self):
34+
if len(self._pooled) > 0:
35+
self._pool.close()
36+
logging.debug("Waiting for tar threads to stop")
37+
while len(self._pooled) > 0:
38+
try:
39+
item = self._pooled[0]
40+
path, result = item
41+
result.get(1)
42+
logging.debug("Archiving completed for directory: %s" % path)
43+
self._pooled.remove(item)
44+
except TimeoutError:
45+
continue
46+
self.stopped = True
3147

3248
def run(self):
3349
try:
@@ -48,18 +64,17 @@ def run(self):
4864
output_file = "%s.tar" % subdir_name
4965
if self.do_gzip():
5066
output_file = "%s.tgz" % subdir_name
51-
self._pool.apply_async(TarThread(subdir_name, output_file, self.do_gzip(), self.verbose, self.binary).run)
67+
result = self._pool.apply_async(TarThread(subdir_name, output_file, self.do_gzip(), self.verbose, self.binary).run)
68+
self._pooled.append((subdir_name, result))
5269
except Exception, e:
5370
self._pool.terminate()
5471
logging.fatal("Could not create tar archiving thread! Error: %s" % e)
5572
raise Error(e)
5673
finally:
57-
self._pool.close()
58-
self._pool.join()
59-
self.stopped = True
74+
self.wait()
6075
self.completed = True
6176

62-
def close(self):
77+
def close(self, code=None, frame=None):
6378
logging.debug("Stopping tar archiving threads")
6479
if not self.stopped and self._pool is not None:
6580
self._pool.terminate()

mongodb_consistent_backup/Archive/Tar/TarThread.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ def run(self):
3434
backup_base_dir = os.path.dirname(self.backup_dir)
3535
backup_base_name = os.path.basename(self.backup_dir)
3636

37-
log_msg = "Archiving and compressing directory: %s" % self.backup_dir
37+
log_msg = "Archiving directory: %s" % self.backup_dir
3838
cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name]
3939

4040
if self.do_gzip:
41-
log_msg = "Archiving directory: %s" % self.backup_dir
41+
log_msg = "Archiving and compressing directory: %s" % self.backup_dir
4242
cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name]
4343

4444
logging.info(log_msg)

mongodb_consistent_backup/Main.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,5 +460,4 @@ def run(self):
460460
logging.info("Completed %s in %.2f sec" % (self.program_name, self.timer.duration(self.timer_name)))
461461

462462
self.logger.rotate()
463-
self.logger.close()
464463
self.release_lock()

mongodb_consistent_backup/Pipeline/Stage.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ def has_task(self):
5656
return False
5757

5858
def close(self):
59-
if self.has_task():
59+
if self.has_task() and not self.stopped:
6060
logging.debug("Calling close on backup stage %s with task %s" % (self.stage, self.task.capitalize()))
6161
self._task.close()
62+
self.running = False
63+
self.stopped = True
6264

6365
def is_compressed(self):
6466
if self.has_task() and hasattr(self._task, "is_compressed"):

mongodb_consistent_backup/Pipeline/Task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ def threads(self, thread_count=None, default_cpu_multiply=1):
5858
def run(self):
5959
raise Error("Must define a .run() method when using %s class!" % self.__class__.__name__)
6060

61-
def close(self):
61+
def close(self, code=None, frame=None):
6262
raise Error("Must define a .close() method when using %s class!" % self.__class__.__name__)

0 commit comments

Comments
 (0)