Skip to content

Commit 09ef5fd

Browse files
cleaner handling of oplogs in resolver thread
1 parent d06d2e4 commit 09ef5fd

File tree

1 file changed

+20
-9
lines changed

1 file changed

+20
-9
lines changed

mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,49 @@ def __init__(self, state, uri, tailed_oplog, mongodump_oplog, max_end_ts, dump_g
1818
self.max_end_ts = max_end_ts
1919
self.dump_gzip = dump_gzip
2020

21+
self.oplogs = {}
2122
self.changes = 0
2223

2324
def run(self):
24-
self.mongodump_oplog_h = Oplog(self.mongodump_oplog['file'], self.dump_gzip, 'a+')
25-
self.tailed_oplog_fh = Oplog(self.tailed_oplog['file'], self.dump_gzip)
25+
self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.dump_gzip, 'a+')
26+
self.oplogs['tailed'] = Oplog(self.tailed_oplog['file'], self.dump_gzip)
2627

2728
logging.info("Resolving oplog for %s to max ts: %s" % (self.uri, self.max_end_ts))
2829
try:
2930
self.state.set('running', True)
3031
self.state.set('first_ts', self.mongodump_oplog['first_ts'])
3132
if not self.state.get('first_ts'):
3233
self.state.set('first_ts', self.tailed_oplog['first_ts'])
33-
for change in decode_file_iter(self.tailed_oplog_fh):
34+
for change in decode_file_iter(self.oplogs['tailed']):
3435
self.last_ts = change['ts']
3536
if not self.mongodump_oplog['last_ts'] or self.last_ts > self.mongodump_oplog['last_ts']:
3637
if self.last_ts < self.max_end_ts:
37-
self.mongodump_oplog_h.add(change)
38+
self.oplogs['backup'].add(change)
3839
self.changes += 1
3940
elif self.last_ts > self.max_end_ts:
4041
break
41-
self.tailed_oplog_fh.close()
42-
self.mongodump_oplog_h.flush()
43-
self.mongodump_oplog_h.close()
4442

4543
# remove temporary tailed oplog
44+
self.oplogs['tailed'].close()
4645
os.remove(self.tailed_oplog['file'])
46+
del self.oplogs['tailed']
4747

4848
self.state.set('count', self.mongodump_oplog['count'] + self.changes)
4949
self.state.set('last_ts', self.last_ts)
5050
self.state.set('running', False)
51+
self.exit_code = 0
5152
except Exception, e:
5253
logging.exception("Resolving of oplogs failed! Error: %s" % e)
53-
sys.exit(1)
54+
finally:
55+
self.close()
5456

55-
logging.info("Applied %i oplog changes to %s oplog, end ts: %s" % (self.changes, self.uri, self.mongodump_oplog_h.last_ts()))
57+
if self.exit_code == 0:
58+
logging.info("Applied %i oplog changes to %s oplog, end ts: %s" % (self.changes, self.uri, self.mongodump_oplog_h.last_ts()))
59+
sys.exit(self.exit_code)
60+
61+
def close(self):
62+
if len(self.oplogs) > 0:
63+
for oplog in self.oplogs:
64+
self.oplogs[oplog].flush()
65+
self.oplogs[oplog].close()
66+
del self.oplogs[oplog]

0 commit comments

Comments
 (0)