Commit 939ad01

Use multiprocessing.Pool callback= to track thread state
1 parent 748037f commit 939ad01

4 files changed: +60, -35 lines

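The diffs below all apply the same pattern, sketched here in a minimal, self-contained form. The names (`Archiver`, `archive_dir`) are illustrative stand-ins, not the project's API: each `Pool.apply_async()` call registers a `callback=` that the pool invokes in the parent process with the worker's return value, so the parent can cross finished items off a pending list instead of polling `AsyncResult.get()` with timeouts as the old code did.

# A minimal sketch (not project code) of Pool callback-based completion tracking.
from multiprocessing import Pool
from time import sleep


def archive_dir(path):
    # Stand-in for TarThread.run(): do the work, then return an identifier
    # the parent can use to mark this item finished.
    return path


class Archiver(object):
    def __init__(self, threads=2):
        self._pool = Pool(processes=threads)
        self._pooled = []  # paths still being archived

    def done(self, done_dir):
        # Called in the parent process when a worker finishes; the argument
        # is whatever the worker function returned.
        self._pooled.remove(done_dir)

    def run(self, dirs):
        for path in dirs:
            self._pool.apply_async(archive_dir, (path,), callback=self.done)
            self._pooled.append(path)

    def wait(self):
        # Same polling loop as the commit: stop accepting work, wait for the
        # pending list to drain via callbacks, then shut the pool down.
        self._pool.close()
        while len(self._pooled):
            sleep(1)
        self._pool.terminate()


if __name__ == '__main__':
    archiver = Archiver()
    archiver.run(['/tmp/a', '/tmp/b'])
    archiver.wait()

One caveat worth noting for Python 2, which this codebase targets (see the `except Exception, e` syntax): `apply_async()` has no `error_callback` there, so a worker that raises an exception never fires the callback and its entry stays in the pending list.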

mongodb_consistent_backup/Archive/Tar/Tar.py

Lines changed: 18 additions & 16 deletions
@@ -3,6 +3,7 @@
 
 from copy_reg import pickle
 from multiprocessing import Pool, TimeoutError
+from time import sleep
 from types import MethodType
 
 from TarThread import TarThread
@@ -30,20 +31,23 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
         self._pool = None
         self._pooled = []
 
+    def done(self, done_dir):
+        if done_dir in self._pooled:
+            logging.debug("Archiving completed for: %s" % done_dir)
+            self._pooled.remove(done_dir)
+        else:
+            raise OperationError("Unexpected response from tar thread: %s" % done_dir)
+
     def wait(self):
         if len(self._pooled) > 0:
             self._pool.close()
-            logging.debug("Waiting for tar threads to stop")
-            while len(self._pooled) > 0:
-                try:
-                    item = self._pooled[0]
-                    path, result = item
-                    result.get(1)
-                    logging.debug("Archiving completed for directory: %s" % path)
-                    self._pooled.remove(item)
-                except TimeoutError:
-                    continue
+            while len(self._pooled):
+                logging.debug("Waiting for %i tar thread(s) to stop" % len(self._pooled))
+                sleep(2)
+            self._pool.terminate()
+            logging.debug("Stopped all tar threads")
         self.stopped = True
+        self.running = False
 
     def run(self):
         try:
@@ -64,20 +68,18 @@ def run(self):
                 output_file = "%s.tar" % subdir_name
                 if self.do_gzip():
                     output_file = "%s.tgz" % subdir_name
-                result = self._pool.apply_async(TarThread(subdir_name, output_file, self.do_gzip(), self.verbose, self.binary).run)
-                self._pooled.append((subdir_name, result))
+                self._pool.apply_async(TarThread(subdir_name, output_file, self.do_gzip(), self.verbose, self.binary).run, callback=self.done)
+                self._pooled.append(subdir_name)
         except Exception, e:
             self._pool.terminate()
             logging.fatal("Could not create tar archiving thread! Error: %s" % e)
             raise Error(e)
-        finally:
-            self.wait()
+        self.wait()
         self.completed = True
 
     def close(self, code=None, frame=None):
-        logging.debug("Stopping tar archiving threads")
         if not self.stopped and self._pool is not None:
+            logging.debug("Stopping tar archiving threads")
            self._pool.terminate()
-            self._pool.join()
             logging.info("Stopped all tar archiving threads")
             self.stopped = True

mongodb_consistent_backup/Archive/Tar/TarThread.py

Lines changed: 1 addition & 0 deletions
@@ -50,3 +50,4 @@ def run(self):
         else:
             logging.fatal("Output file: %s already exists!" % self.output_file)
             sys.exit(1)
+        return self.backup_dir
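
This single added line is what makes the `callback=self.done` wiring in Tar.py work: the callback receives exactly what the pooled function returns, so `TarThread.run()` has to return its `backup_dir` for `done()` to find it in `self._pooled` (further down, `ResolverThread.run()` gets the same treatment and returns `uri.str()`). A tiny standalone illustration, with hypothetical names (`work`, `on_done`):

# Tiny illustration (hypothetical names): the value a pooled function returns
# is exactly what the parent's callback= receives.
from multiprocessing import Pool


def work(item):
    # analogous to TarThread.run() returning self.backup_dir
    return item


def on_done(result):
    # receives 'dump/shard-0', the worker's return value
    print("finished: %s" % result)


if __name__ == '__main__':
    pool = Pool(processes=1)
    pool.apply_async(work, ('dump/shard-0',), callback=on_done)
    pool.close()
    pool.join()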

mongodb_consistent_backup/Oplog/Resolver/Resolver.py

Lines changed: 32 additions & 8 deletions
@@ -4,7 +4,8 @@
 # noinspection PyPackageRequirements
 from bson.timestamp import Timestamp
 from copy_reg import pickle
-from multiprocessing import Pool
+from multiprocessing import Pool, TimeoutError
+from time import sleep
 from types import MethodType
 
 from ResolverThread import ResolverThread
@@ -34,16 +35,20 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, tailed_oplogs,
         self.resolver_summary = {}
         self.resolver_state = {}
 
+        self._pool = None
+        self._pooled = []
         try:
             self._pool = Pool(processes=self.threads(None, 2))
         except Exception, e:
             logging.fatal("Could not start oplog resolver pool! Error: %s" % e)
             raise Error(e)
 
-    def close(self):
-        if self._pool:
+    def close(self, code=None, frame=None):
+        if self._pool and not self.stopped:
+            logging.debug("Stopping all oplog resolver threads")
             self._pool.terminate()
-            self._pool.join()
+            logging.info("Stopped all oplog resolver threads")
+            self.stopped = True
 
     def get_consistent_end_ts(self):
         ts = None
@@ -54,8 +59,26 @@ def get_consistent_end_ts(self):
                 ts = Timestamp(instance['last_ts'].time, 0)
         return ts
 
+    def done(self, done_uri):
+        if done_uri in self._pooled:
+            logging.debug("Resolving completed for: %s" % done_uri)
+            self._pooled.remove(done_uri)
+        else:
+            raise OperationError("Unexpected response from resolver thread: %s" % done_uri)
+
+    def wait(self):
+        if len(self._pooled) > 0:
+            self._pool.close()
+            while len(self._pooled):
+                logging.debug("Waiting for %i oplog resolver thread(s) to stop" % len(self._pooled))
+                sleep(2)
+            self._pool.terminate()
+            logging.debug("Stopped all oplog resolve threads")
+        self.stopped = True
+        self.running = False
+
     def run(self):
-        logging.info("Resolving oplogs (options: threads=%s,compression=%s)" % (self.threads(), self.compression()))
+        logging.info("Resolving oplogs (options: threads=%s, compression=%s)" % (self.threads(), self.compression()))
         self.timer.start(self.timer_name)
 
         for shard in self.backup_oplogs:
@@ -80,14 +103,15 @@ def run(self):
                         backup_oplog.copy(),
                         self.get_consistent_end_ts(),
                         self.do_gzip()
-                    ).run)
+                    ).run, callback=self.done)
+                    self._pooled.append(uri.str())
                 except Exception, e:
                     logging.fatal("Resolve failed for %s! Error: %s" % (uri, e))
                     raise Error(e)
             else:
                 logging.info("No tailed oplog for host %s" % uri)
-        self._pool.close()
-        self._pool.join()
+        self.wait()
+        self.completed = True
 
         self.timer.stop(self.timer_name)
         logging.info("Oplog resolving completed in %.2f seconds" % self.timer.duration(self.timer_name))

mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py

Lines changed: 9 additions & 11 deletions
@@ -18,13 +18,7 @@ def __init__(self, state, uri, tailed_oplog, mongodump_oplog, max_end_ts, dump_g
 
         self.oplogs = {}
         self.changes = 0
-
-    def cleanup(self):
-        if 'tailed' in self.oplogs:
-            self.oplogs['tailed'].close()
-            del self.oplogs['tailed']
-        if 'file' in self.tailed_oplog and os.path.isfile(self.tailed_oplog['file']):
-            os.remove(self.tailed_oplog['file'])
+        self.stopped = True
 
     def run(self):
         self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.dump_gzip, 'a+')
@@ -55,12 +49,16 @@ def run(self):
         self.close()
 
         if self.exit_code == 0:
-            logging.info("Applied %i oplog changes to %s oplog, end ts: %s" % (self.changes, self.uri, self.mongodump_oplog_h.last_ts()))
+            logging.info("Applied %i oplog changes to %s oplog, end ts: %s" % (self.changes, self.uri, self.last_ts))
+            return self.uri.str()
 
     def close(self):
-        self.cleanup()
-        if len(self.oplogs) > 0:
+        if len(self.oplogs) > 0 and not self.stopped:
+            logging.debug("Closing oplog file handles")
             for oplog in self.oplogs:
                 self.oplogs[oplog].flush()
                 self.oplogs[oplog].close()
-                del self.oplogs[oplog]
+            self.stopped = True
+        if 'file' in self.tailed_oplog and os.path.isfile(self.tailed_oplog['file']):
+            logging.debug("Removing temporary/tailed oplog file: %s" % self.tailed_oplog['file'])
+            os.remove(self.tailed_oplog['file'])
