Skip to content

Commit 7789620

Browse files
authored
Merge pull request #12 from timvaillancourt/thread_completed_fix
Moving thread-complete check to response queue
2 parents 79e9b7d + 778bdab commit 7789620

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

MongoBackup/Mongodump.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
import logging
33

4-
from multiprocessing import Process, Event
4+
from multiprocessing import Process
55
from signal import signal, SIGINT, SIGTERM
66
from time import time
77

@@ -28,7 +28,7 @@ def __init__(self, response_queue, backup_name, host_port, user, password, authd
2828
self.verbose = verbose
2929

3030
self._command = None
31-
self._complete = Event()
31+
self.completed = False
3232
self.backup_dir = "%s/%s" % (self.base_dir, self.backup_name)
3333
self.dump_dir = "%s/dump" % self.backup_dir
3434
self.oplog_file = "%s/oplog.bson" % self.dump_dir
@@ -37,9 +37,6 @@ def __init__(self, response_queue, backup_name, host_port, user, password, authd
3737
signal(SIGINT, self.close)
3838
signal(SIGTERM, self.close)
3939

40-
def complete(self):
41-
return self._complete.is_set()
42-
4340
def close(self, exit_code=None, frame=None):
4441
if self._command:
4542
logging.debug("Killing running subprocess/command: %s" % self._command.command)
@@ -77,15 +74,16 @@ def run(self):
7774
return None
7875

7976
oplog = OplogInfo(self.oplog_file, self.dump_gzip)
77+
self.completed = True
8078
self.response_queue.put({
8179
'host': self.host,
8280
'port': self.port,
8381
'file': self.oplog_file,
8482
'count': oplog.count(),
8583
'last_ts': oplog.last_ts(),
86-
'first_ts': oplog.first_ts()
84+
'first_ts': oplog.first_ts(),
85+
'completed': self.completed
8786
})
88-
self._complete.set()
8987

9088
time_diff = time() - self.start_time
9189
logging.info("Backup for %s/%s:%s completed in %s sec with %i oplog changes captured to: %s" % (

MongoBackup/Mongodumper.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,25 +113,30 @@ def run(self):
113113
for thread in self.threads:
114114
thread.start()
115115

116-
# wait for all threads to finish and check their status
116+
# wait for all threads to finish
117117
for thread in self.threads:
118118
thread.join()
119-
if not thread.complete():
120-
raise Exception, "Not all mongodump threads completed successfully!", None
121119

122120
# sleep for 3 sec to fix logging order
123121
sleep(3)
124122

125123
# get oplog summaries from the queue
124+
completed = 0
126125
while not self.response_queue.empty():
127126
backup = self.response_queue.get()
128127
host = backup['host']
129128
port = backup['port']
130129
if host not in self._summary:
131130
self._summary[host] = {}
132131
self._summary[host][port] = backup
132+
if backup['completed']:
133+
completed += 1
133134

134-
logging.info("All mongodump backups completed")
135+
# check if all threads completed
136+
if completed == len(self.threads):
137+
logging.info("All mongodump backups completed")
138+
else:
139+
raise Exception, "Not all mongodump threads completed successfully!", None
135140

136141
return self._summary
137142

0 commit comments

Comments
 (0)