Skip to content

Commit 09c2bbc

Browse files
author
Tim Vaillancourt
committed
use the response queue to check for success instead of a method call (which seems to work inconsistently with threading)
1 parent 79e9b7d commit 09c2bbc

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

MongoBackup/Mongodump.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
import logging
33

4-
from multiprocessing import Process, Event
4+
from multiprocessing import Process
55
from signal import signal, SIGINT, SIGTERM
66
from time import time
77

@@ -28,7 +28,7 @@ def __init__(self, response_queue, backup_name, host_port, user, password, authd
2828
self.verbose = verbose
2929

3030
self._command = None
31-
self._complete = Event()
31+
self.completed = False
3232
self.backup_dir = "%s/%s" % (self.base_dir, self.backup_name)
3333
self.dump_dir = "%s/dump" % self.backup_dir
3434
self.oplog_file = "%s/oplog.bson" % self.dump_dir
@@ -37,13 +37,18 @@ def __init__(self, response_queue, backup_name, host_port, user, password, authd
3737
signal(SIGINT, self.close)
3838
signal(SIGTERM, self.close)
3939

40-
def complete(self):
41-
return self._complete.is_set()
42-
4340
def close(self, exit_code=None, frame=None):
4441
if self._command:
4542
logging.debug("Killing running subprocess/command: %s" % self._command.command)
4643
self._command.close()
44+
self.response_queue.put({
45+
'host': self.host,
46+
'port': self.port,
47+
'file': self.oplog_file,
48+
'count': oplog.count(),
49+
'last_ts': oplog.last_ts(),
50+
'first_ts': oplog.first_ts()
51+
})
4752

4853
def run(self):
4954
logging.info("Starting mongodump (with oplog) backup of %s/%s:%i" % (
@@ -77,15 +82,16 @@ def run(self):
7782
return None
7883

7984
oplog = OplogInfo(self.oplog_file, self.dump_gzip)
85+
self.completed = True
8086
self.response_queue.put({
8187
'host': self.host,
8288
'port': self.port,
8389
'file': self.oplog_file,
8490
'count': oplog.count(),
8591
'last_ts': oplog.last_ts(),
86-
'first_ts': oplog.first_ts()
92+
'first_ts': oplog.first_ts(),
93+
'completed': self.completed
8794
})
88-
self._complete.set()
8995

9096
time_diff = time() - self.start_time
9197
logging.info("Backup for %s/%s:%s completed in %s sec with %i oplog changes captured to: %s" % (

MongoBackup/Mongodumper.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,28 @@ def run(self):
113113
for thread in self.threads:
114114
thread.start()
115115

116-
# wait for all threads to finish and check their status
116+
# wait for all threads to finish
117117
for thread in self.threads:
118118
thread.join()
119-
if not thread.complete():
120-
raise Exception, "Not all mongodump threads completed successfully!", None
121119

122120
# sleep for 3 sec to fix logging order
123121
sleep(3)
124122

125123
# get oplog summaries from the queue
124+
completed = 0
126125
while not self.response_queue.empty():
127126
backup = self.response_queue.get()
128127
host = backup['host']
129128
port = backup['port']
130129
if host not in self._summary:
131130
self._summary[host] = {}
132131
self._summary[host][port] = backup
132+
if backup['completed']:
133+
completed += 1
134+
135+
# fail if all threads did not complete
136+
if not completed == len(self.threads):
137+
raise Exception, "Not all mongodump threads completed successfully!", None
133138

134139
logging.info("All mongodump backups completed")
135140

0 commit comments

Comments
 (0)