Skip to content

Commit 740fd8d

Browse files
Fixes for problems when sending Ctrl+C
1 parent 1bd589e commit 740fd8d

File tree

8 files changed

+116
-103
lines changed

8 files changed

+116
-103
lines changed

mongodb_consistent_backup/Archive/Tar/Tar.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
2626
self.backup_base_dir = backup_dir
2727
self.verbose = self.config.verbose
2828
self.binary = "tar"
29-
self._pool = None
29+
30+
self._pool = None
31+
self.stopped = False
32+
self.completed = False
3033

3134
def compression(self, method=None):
3235
if method:
@@ -72,10 +75,12 @@ def run(self):
7275
raise Error(e)
7376
self._pool.close()
7477
self._pool.join()
78+
self.completed = True
7579

7680
def close(self):
7781
logging.debug("Stopping tar archiving threads")
78-
if self._pool is not None:
82+
if not self.stopped and self._pool is not None:
7983
self._pool.terminate()
8084
self._pool.join()
8185
logging.info("Stopped all tar archiving threads")
86+
self.stopped = True

mongodb_consistent_backup/Backup/Mongodump/Mongodump.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
4040
signal(SIGINT, SIG_IGN)
4141
signal(SIGINT, self.close)
4242

43+
self.completed = False
4344
self.timer_name = self.__class__.__name__
4445
self.threads_per_dump_max = 16
4546
self.config_replset = False
@@ -174,6 +175,7 @@ def run(self):
174175
self.threads[0].start()
175176
self.wait()
176177

178+
self.completed = True
177179
return self._summary
178180

179181
def close(self):

mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,16 @@ def run(self):
105105
self._process = Popen(mongodump_cmd, stderr=PIPE)
106106
self.wait()
107107
self.exit_code = self._process.returncode
108+
if self.exit_code > 0:
109+
sys.exit(self.exit_code)
108110
except Exception, e:
109111
logging.exception("Error performing mongodump: %s" % e)
110112

111-
oplog = Oplog(self.oplog_file, self.dump_gzip)
112-
oplog.load()
113+
try:
114+
oplog = Oplog(self.oplog_file, self.dump_gzip)
115+
oplog.load()
116+
except Exception, e:
117+
logging.exception("Error loading oplog: %s" % e)
113118

114119
self.state.set('running', False)
115120
self.state.set('completed', True)
@@ -122,5 +127,3 @@ def run(self):
122127
if oplog.last_ts():
123128
log_msg_extra = "%s, end ts: %s" % (log_msg_extra, oplog.last_ts())
124129
logging.info("Backup %s completed in %.2f seconds, %s" % (self.uri, self.timer.duration(self.timer_name), log_msg_extra))
125-
126-
sys.exit(self.exit_code)

mongodb_consistent_backup/Common/Timer.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,25 @@ def start(self, timer_name):
1111
self.timers[timer_name] = { 'start': time(), 'started': True }
1212

1313
def stop(self, timer_name):
14-
if timer_name in self.timers and 'started' in self.timers[timer_name]:
15-
timer = self.timers.copy()[timer_name]
16-
del timer['started']
17-
timer['end'] = time()
18-
timer['stopped'] = True
19-
timer['duration'] = timer['end'] - timer['start']
20-
self.timers[timer_name] = timer
21-
else:
22-
raise OperationError("No started timer named %s to stop!" % timer_name)
14+
try:
15+
if timer_name in self.timers and 'started' in self.timers[timer_name]:
16+
timer = self.timers.copy()[timer_name]
17+
del timer['started']
18+
timer['end'] = time()
19+
timer['stopped'] = True
20+
timer['duration'] = timer['end'] - timer['start']
21+
self.timers[timer_name] = timer
22+
else:
23+
raise OperationError("No started timer named %s to stop!" % timer_name)
24+
except IOError:
25+
pass
2326

2427
def duration(self, timer_name):
25-
if timer_name in self.timers and 'duration' in self.timers[timer_name]:
26-
return self.timers[timer_name]['duration']
27-
else:
28+
try:
29+
if timer_name in self.timers and 'duration' in self.timers[timer_name]:
30+
return self.timers[timer_name]['duration']
31+
return 0
32+
except IOError:
2833
return 0
2934

3035
def dump(self, timer_name=None):

mongodb_consistent_backup/Notify/Notify.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir):
99
super(Notify, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
1010
self.method = self.config.notify.method
1111

12+
self.completed = False
1213
self.notifications = []
1314
self.init()
1415

@@ -30,6 +31,7 @@ def run(self, *args):
3031
except:
3132
continue
3233
self.timers.stop(self.stage)
34+
self.completed = True
3335

3436
def close(self):
3537
if self._method:

mongodb_consistent_backup/Oplog/Resolver/Resolver.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,25 +65,13 @@ def threads(self, threads=None):
6565
return int(self.config.oplog.resolver.threads)
6666

6767
def get_consistent_end_ts(self):
68-
end_ts = None
69-
min_tailed_ts = None
70-
max_backup_ts = None
68+
ts = None
7169
for shard in self.tailed_oplogs:
7270
instance = self.tailed_oplogs[shard]
7371
if 'last_ts' in instance and instance['last_ts'] is not None:
74-
if min_tailed_ts is None or instance['last_ts'].time < min_tailed_ts.time:
75-
min_tailed_ts = instance['last_ts']
76-
end_ts = min_tailed_ts
77-
for shard in self.backup_oplogs:
78-
instance = self.backup_oplogs[shard]
79-
if 'last_ts' in instance and instance['last_ts'] is not None:
80-
if max_backup_ts is None or instance['last_ts'].time > max_backup_ts.time:
81-
max_backup_ts = instance['last_ts']
82-
if max_backup_ts:
83-
end_ts = Timestamp(max_backup_ts.time + 1, 0)
84-
if end_ts > min_tailed_ts:
85-
return Error("Backup maximum end time is greater than tailed oplog minimum end time. This should not happen!")
86-
return end_ts
72+
if ts is None or instance['last_ts'].time < ts.time:
73+
ts = Timestamp(instance['last_ts'].time, 0)
74+
return ts
8775

8876
def run(self):
8977
logging.info("Resolving oplogs (options: threads=%s,compression=%s)" % (self.threads(), self.compression()))

mongodb_consistent_backup/Pipeline/Stage.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ def __init__(self, stage_name, manager, config, timers, base_dir, backup_dir, **
1515
self.backup_dir = backup_dir
1616
self.args = kwargs
1717

18-
self.running = False
19-
self.stopped = False
18+
self.running = False
19+
self.stopped = False
20+
self.completed = False
2021

2122
self.stage = "mongodb_consistent_backup.%s" % self.stage_name
2223
self.module = None
@@ -72,14 +73,19 @@ def run(self):
7273
try:
7374
self.timers.start(self.stage)
7475
self.running = True
75-
logging.info("Running backup stage %s with method: %s" % (self.stage, self.method.capitalize()))
76+
logging.info("Running stage %s with method: %s" % (self.stage, self.method.capitalize()))
7677
data = self._method.run()
7778
except Exception, e:
7879
raise OperationError(e)
7980
finally:
8081
self.running = False
8182
self.stopped = True
8283
self.timers.stop(self.stage)
83-
logging.info("Completed running backup stage %s with method %s in %.2f seconds" % (self.stage, self.method.capitalize(), self.timers.duration(self.stage)))
84+
if self._method.completed:
85+
logging.info("Completed running stage %s with method %s in %.2f seconds" % (self.stage, self.method.capitalize(), self.timers.duration(self.stage)))
86+
self.completed = True
87+
else:
88+
logging.error("Stage %s did not complete!" % self.stage)
89+
raise OperationError("Stage %s did not complete!" % self.stage)
8490
self.close()
8591
return data

mongodb_consistent_backup/Upload/S3/S3.py

Lines changed: 67 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(self, config, source_dir, key_prefix):
3636
self.chunk_size_mb = self.config.upload.s3.chunk_size_mb
3737
self.chunk_size = self.chunk_size_mb * 1024 * 1024
3838

39+
self.completed = False
3940
self.timer_name = self.__class__.__name__
4041
self._pool = None
4142
self._multipart = None
@@ -51,74 +52,75 @@ def __init__(self, config, source_dir, key_prefix):
5152
def run(self):
5253
if not os.path.isdir(self.source_dir):
5354
logging.error("The source directory: %s does not exist or is not a directory! Skipping AWS S3 Upload!" % self.source_dir)
54-
else:
55-
try:
56-
self.timer.start(self.timer_name)
57-
for file_name in os.listdir(self.source_dir):
58-
if self.bucket_prefix == "/":
59-
key_name = "/%s/%s" % (self.key_prefix, file_name)
60-
else:
61-
key_name = "%s/%s/%s" % (self.bucket_prefix, self.key_prefix, file_name)
62-
63-
file_path = os.path.join(self.source_dir, file_name)
64-
file_size = os.stat(file_path).st_size
65-
chunk_count = int(ceil(file_size / float(self.chunk_size)))
66-
67-
logging.info("Starting multipart AWS S3 upload to key: %s%s using %i threads, %imb chunks, %i retries" % (
55+
return
56+
try:
57+
self.timer.start(self.timer_name)
58+
for file_name in os.listdir(self.source_dir):
59+
if self.bucket_prefix == "/":
60+
key_name = "/%s/%s" % (self.key_prefix, file_name)
61+
else:
62+
key_name = "%s/%s/%s" % (self.bucket_prefix, self.key_prefix, file_name)
63+
64+
file_path = os.path.join(self.source_dir, file_name)
65+
file_size = os.stat(file_path).st_size
66+
chunk_count = int(ceil(file_size / float(self.chunk_size)))
67+
68+
logging.info("Starting multipart AWS S3 upload to key: %s%s using %i threads, %imb chunks, %i retries" % (
69+
self.bucket_name,
70+
key_name,
71+
self.thread_count,
72+
self.chunk_size_mb,
73+
self.retries
74+
))
75+
self._multipart = self.bucket.initiate_multipart_upload(key_name)
76+
self._pool = Pool(processes=self.thread_count)
77+
78+
for i in range(chunk_count):
79+
offset = self.chunk_size * i
80+
byte_count = min(self.chunk_size, file_size - offset)
81+
part_num = i + 1
82+
self._pool.apply_async(S3UploadThread(
6883
self.bucket_name,
69-
key_name,
70-
self.thread_count,
71-
self.chunk_size_mb,
72-
self.retries
73-
))
74-
self._multipart = self.bucket.initiate_multipart_upload(key_name)
75-
self._pool = Pool(processes=self.thread_count)
76-
77-
for i in range(chunk_count):
78-
offset = self.chunk_size * i
79-
byte_count = min(self.chunk_size, file_size - offset)
80-
part_num = i + 1
81-
self._pool.apply_async(S3UploadThread(
82-
self.bucket_name,
83-
self.access_key,
84-
self.secret_key,
85-
self.s3_host,
86-
self._multipart.id,
87-
part_num,
88-
file_path,
89-
offset,
90-
byte_count,
91-
self.retries,
92-
self.secure
93-
).run)
94-
self._pool.close()
95-
self._pool.join()
96-
97-
if len(self._multipart.get_all_parts()) == chunk_count:
98-
self._multipart.complete_upload()
99-
key = self.bucket.get_key(key_name)
100-
key.set_acl(self.s3_acl)
101-
self._upload_done = True
102-
103-
if self.remove_uploaded:
104-
logging.info("Uploaded AWS S3 key: %s%s successfully. Removing local file" % (self.bucket_name, key_name))
105-
os.remove(os.path.join(self.source_dir, file_name))
106-
else:
107-
logging.info("Uploaded AWS S3 key: %s%s successfully" % (self.bucket_name, key_name))
84+
self.access_key,
85+
self.secret_key,
86+
self.s3_host,
87+
self._multipart.id,
88+
part_num,
89+
file_path,
90+
offset,
91+
byte_count,
92+
self.retries,
93+
self.secure
94+
).run)
95+
self._pool.close()
96+
self._pool.join()
97+
98+
if len(self._multipart.get_all_parts()) == chunk_count:
99+
self._multipart.complete_upload()
100+
key = self.bucket.get_key(key_name)
101+
key.set_acl(self.s3_acl)
102+
self._upload_done = True
103+
104+
if self.remove_uploaded:
105+
logging.info("Uploaded AWS S3 key: %s%s successfully. Removing local file" % (self.bucket_name, key_name))
106+
os.remove(os.path.join(self.source_dir, file_name))
108107
else:
109-
self._multipart.cancel_upload()
110-
logging.error("Failed to upload all multiparts for key: %s%s! Upload cancelled" % (self.bucket_name, key_name))
111-
raise OperationError("Failed to upload all multiparts for key: %s%s! Upload cancelled" % (self.bucket_name, key_name))
112-
113-
if self.remove_uploaded:
114-
logging.info("Removing backup source dir after successful AWS S3 upload of all backups")
115-
os.rmdir(self.source_dir)
116-
self.timer.stop(self.timer_name)
117-
except Exception, e:
118-
logging.error("Uploading to AWS S3 failed! Error: %s" % e)
119-
if self._multipart:
108+
logging.info("Uploaded AWS S3 key: %s%s successfully" % (self.bucket_name, key_name))
109+
else:
120110
self._multipart.cancel_upload()
121-
raise OperationError(e)
111+
logging.error("Failed to upload all multiparts for key: %s%s! Upload cancelled" % (self.bucket_name, key_name))
112+
raise OperationError("Failed to upload all multiparts for key: %s%s! Upload cancelled" % (self.bucket_name, key_name))
113+
114+
if self.remove_uploaded:
115+
logging.info("Removing backup source dir after successful AWS S3 upload of all backups")
116+
os.rmdir(self.source_dir)
117+
self.timer.stop(self.timer_name)
118+
except Exception, e:
119+
logging.error("Uploading to AWS S3 failed! Error: %s" % e)
120+
if self._multipart:
121+
self._multipart.cancel_upload()
122+
raise OperationError(e)
123+
self.completed = True
122124

123125
def close(self):
124126
if self._pool:

0 commit comments

Comments
 (0)