Skip to content

Commit 34818a2

Browse files
1.1.0 Release (#188)
* Only apply quorum checking to voting members * Only apply quorum checking to electable members (not arbiter or priority=0) * 'make docker' must depend on the bin/mongodb-consistent-backup Makefile-step (#167) * Fix issue #163, move to OperationError() (#168) * Remove 'secondary_count' logic, only count electable nodes for new quorum logic (#169) * 1.0.4: Print more mongodump details (#170) * Print more details about mongodump * Version must be set first * Check if version unknown * 1.0.4: Ensure Resolver consistent timestamp is not before end of mongodump (#164) (#171) * Don't return an consistent_end_ts that is earlier than the last_ts of mongodump oplogs * Some code cleanup and fixed rounding-up logic * 1.0.4: Support handling of TailThread cursor/connection failures (#174) Support better handling/logging of TailThread cursor/connection failures Fsync the tailed oplog to disk on time or doc-writes thresholds. Pass 'backup_stop' event to threads to signal failure to other child threads. Allow oplog tailing to be disabled. Added script to simulate failure of cursor/query. * Fix disabling of oplog tailer * Fix disabling of oplog tailer (#175) * 1.0.4: Mongodump handle error (#176) * Handle mongodump's "Failed: ..." messages as a proper error * 1.0.4: Support Google Cloud Storage upload v1 (#178) * 1.1.0 VERSION (#180) * Readme gcs (#181) * Readme gcs 2 (#182) * 1.1.0 requirements.txt update (#183) * Update Fabric and boto versions * 1.1.0 repllag default 10s (#184) * Raise default repl lag max to 10s, 5s has shown to be too low on many hosts. 3.4 adds a no-op oplog update every 10s so this seems to align nicely with it * 1.1.0 gcs upload threading (#185)
1 parent 90e7f2f commit 34818a2

32 files changed

+614
-195
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ rpm: bin/mongodb-consistent-backup
3434
cp -f $(PWD)/conf/mongodb-consistent-backup.example.conf build/rpm/SOURCES/mongodb-consistent-backup.conf
3535
rpmbuild -D "_topdir $(PWD)/build/rpm" -D "version $(VERSION)" -bb scripts/$(NAME).spec
3636

37-
docker:
37+
docker: bin/mongodb-consistent-backup
3838
docker build --no-cache --tag $(DOCKER_TAG) --build-arg "RELEASE=$(VERSION)" .
3939

4040
clean:

README.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Features
2424
- Block de-duplication and optional AES encryption at rest via `ZBackup <http://zbackup.org/>`__
2525
archiving method (*optional*)
2626
- `AWS S3 <https://aws.amazon.com/s3/>`__ Secure Multipart backup uploads (*optional*)
27+
- `Google Cloud Storage <https://cloud.google.com/storage/>`__ Secure backup uploads (*optional*)
2728
- `Nagios NSCA <https://sourceforge.net/p/nagios/nsca>`__ push
2829
notification support (*optional*)
2930
- Modular backup, archiving, upload and notification components
@@ -212,7 +213,7 @@ Roadmap
212213
- Upload compatibility for ZBackup archive phase *(upload unsupported today)*
213214
- Backup retention/rotation *(eg: delete old backups)*
214215
- Support more notification methods *(Prometheus, PagerDuty, etc)*
215-
- Support more upload methods *(Google Cloud Storage, Rsync, etc)*
216+
- Support more upload methods *(Rsync, etc)*
216217
- Support SSL MongoDB connections
217218
- Documentation for running under Docker with persistent volumes
218219
- Python unit tests

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.3
1+
1.1.0

conf/mongodb-consistent-backup.example.conf

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ production:
1414
# compression: [auto|none|gzip] (default: auto - enable gzip if supported)
1515
# threads: [1-16] (default: auto-generated - shards/cpu)
1616
#replication:
17-
# max_lag_secs: [1+] (default: 5)
17+
# max_lag_secs: [1+] (default: 10)
1818
# min_priority: [0-999] (default: 0)
1919
# max_priority: [2-1000] (default: 1000)
2020
# hidden_only: [true|false] (default: false)
@@ -24,8 +24,12 @@ production:
2424
# ping_secs: [1+] (default: 3)
2525
#oplog:
2626
# compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
27+
# flush:
28+
# max_docs: 100
29+
# max_secs: 1
2730
# resolver_threads: [1+] (default: 2 per CPU)
2831
# tailer:
32+
# enabled: true
2933
# status_interval: 30
3034
archive:
3135
method: tar
@@ -48,6 +52,13 @@ production:
4852
upload:
4953
method: none
5054
# remove_uploaded: [true|false] (default: false)
55+
# gs:
56+
# project_id: [Google Cloud Project ID]
57+
# access_key: [Google Cloud Storage Access Key]
58+
# secret_key: [Google Cloud Storage Secret Key]
59+
# bucket_name: [Google Cloud Storage Bucket Name]
60+
# bucket_prefix: [prefix] (default: /)
61+
# threads: [1+] (default: 1 per CPU)
5162
# s3:
5263
# region: [AWS S3 Region] (default: us-east-1)
5364
# access_key: [AWS S3 Access Key]

mongodb_consistent_backup/Backup/Backup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44

55
class Backup(Stage):
6-
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
7-
super(Backup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, replsets=replsets, sharding=sharding)
6+
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
7+
super(Backup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, replsets=replsets, backup_stop=backup_stop, sharding=sharding)
88
self.task = self.config.backup.method
99
self.init()

mongodb_consistent_backup/Backup/Mongodump/Mongodump.py

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
import logging
33
import signal
44

5-
from fabric.api import hide, settings, local
65
from math import floor
76
from multiprocessing import cpu_count
7+
from subprocess import check_output
88
from time import sleep
99

10-
from mongodb_consistent_backup.Common import MongoUri
10+
from mongodb_consistent_backup.Common import MongoUri, config_to_string
1111
from mongodb_consistent_backup.Errors import Error, OperationError
1212
from mongodb_consistent_backup.Oplog import OplogState
1313
from mongodb_consistent_backup.Pipeline import Task
@@ -16,31 +16,46 @@
1616

1717

1818
class Mongodump(Task):
19-
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
19+
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
2020
super(Mongodump, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
2121
self.compression_method = self.config.backup.mongodump.compression
2222
self.binary = self.config.backup.mongodump.binary
2323
self.user = self.config.username
2424
self.password = self.config.password
2525
self.authdb = self.config.authdb
2626
self.replsets = replsets
27+
self.backup_stop = backup_stop
2728
self.sharding = sharding
2829

2930
self.compression_supported = ['auto', 'none', 'gzip']
3031
self.version = 'unknown'
32+
self.version_extra = {}
3133
self.threads_max = 16
3234
self.config_replset = False
3335
self.dump_threads = []
3436
self.states = {}
3537
self._summary = {}
3638

39+
self.parse_mongodump_version()
40+
self.choose_compression()
41+
3742
if self.config.backup.mongodump.threads and self.config.backup.mongodump.threads > 0:
3843
self.threads(self.config.backup.mongodump.threads)
3944

40-
with hide('running', 'warnings'), settings(warn_only=True):
41-
self.version = local("%s --version|awk 'NR >1 {exit}; /version/{print $NF}'" % self.binary, capture=True)
42-
43-
self.choose_compression()
45+
def parse_mongodump_version(self):
46+
if os.path.isfile(self.binary):
47+
output = check_output([self.binary, "--version"])
48+
lines = output.rstrip().split("\n")
49+
for line in lines:
50+
if "version:" in line:
51+
name, version_num = line.split(" version: ")
52+
if name == 'mongodump':
53+
self.version = version_num
54+
if '-' in version_num:
55+
self.version = version_num.split("-")[0]
56+
self.version_extra[name.lower()] = version_num
57+
return self.version, self.version_extra
58+
raise OperationError("Could not parse mongodump --version output!")
4459

4560
def choose_compression(self):
4661
if self.can_gzip():
@@ -76,6 +91,9 @@ def wait(self):
7691
start_threads = len(self.dump_threads)
7792
# wait for all threads to finish
7893
while len(self.dump_threads) > 0:
94+
if self.backup_stop and self.backup_stop.is_set():
95+
logging.error("Received backup stop event due to error(s), stopping backup!")
96+
raise OperationError("Received backup stop event due to error(s)")
7997
for thread in self.dump_threads:
8098
if not thread.is_alive():
8199
if thread.exitcode == 0:
@@ -97,7 +115,7 @@ def wait(self):
97115
def threads(self, threads=None):
98116
if threads:
99117
self.thread_count = int(threads)
100-
elif not self.thread_count:
118+
elif not self.thread_count and self.version is not 'unknown':
101119
if tuple(self.version.split(".")) >= tuple("3.2.0".split(".")):
102120
self.thread_count = 1
103121
if self.cpu_count > len(self.replsets):
@@ -120,23 +138,24 @@ def run(self):
120138
self.states[shard],
121139
mongo_uri,
122140
self.timer,
123-
self.user,
124-
self.password,
125-
self.authdb,
141+
self.config,
126142
self.backup_dir,
127-
self.binary,
128143
self.version,
129144
self.threads(),
130-
self.do_gzip(),
131-
self.verbose
145+
self.do_gzip()
132146
)
133147
self.dump_threads.append(thread)
134148

135149
if not len(self.dump_threads) > 0:
136150
raise OperationError('No backup threads started!')
137151

152+
options = {
153+
'compression': self.compression(),
154+
'threads_per_dump': self.threads()
155+
}
156+
options.update(self.version_extra)
138157
logging.info(
139-
"Starting backups using mongodump %s (options: compression=%s, threads_per_dump=%i)" % (self.version, self.compression(), self.threads()))
158+
"Starting backups using mongodump %s (options: %s)" % (self.version, config_to_string(options)))
140159
for thread in self.dump_threads:
141160
thread.start()
142161
self.wait()
@@ -152,20 +171,17 @@ def run(self):
152171
self.states['configsvr'],
153172
mongo_uri,
154173
self.timer,
155-
self.user,
156-
self.password,
157-
self.authdb,
174+
self.config,
158175
self.backup_dir,
159-
self.binary,
160176
self.version,
161177
self.threads(),
162-
self.do_gzip(),
163-
self.verbose
178+
self.do_gzip()
164179
)]
165180
self.dump_threads[0].start()
166181
self.dump_threads[0].join()
167182

168183
self.completed = True
184+
self.stopped = True
169185
return self._summary
170186

171187
def close(self):

mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,25 @@
1414

1515
# noinspection PyStringFormat
1616
class MongodumpThread(Process):
17-
def __init__(self, state, uri, timer, user, password, authdb, base_dir, binary, version,
18-
threads=0, dump_gzip=False, verbose=False):
17+
def __init__(self, state, uri, timer, config, base_dir, version, threads=0, dump_gzip=False):
1918
Process.__init__(self)
2019
self.state = state
2120
self.uri = uri
2221
self.timer = timer
23-
self.user = user
24-
self.password = password
25-
self.authdb = authdb
22+
self.config = config
2623
self.base_dir = base_dir
27-
self.binary = binary
2824
self.version = version
2925
self.threads = threads
3026
self.dump_gzip = dump_gzip
31-
self.verbose = verbose
27+
28+
self.user = self.config.username
29+
self.password = self.config.password
30+
self.authdb = self.config.authdb
31+
self.binary = self.config.backup.mongodump.binary
3232

3333
self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset)
3434
self.exit_code = 1
35+
self.error_message = None
3536
self._command = None
3637
self.do_stdin_passwd = False
3738
self.stdin_passwd_sent = False
@@ -76,6 +77,17 @@ def handle_password_prompt(self):
7677
self._process.stdin.flush()
7778
self.stdin_passwd_sent = True
7879

80+
def is_failed_line(self, line):
81+
if line and line.startswith("Failed: "):
82+
return True
83+
return False
84+
85+
def handle_failure(self, line):
86+
self.error_message = line.replace("Failed: ", "").capitalize()
87+
logging.error("Mongodump error: %s" % self.error_message)
88+
self.exit_code = 1
89+
self.close()
90+
7991
def wait(self):
8092
try:
8193
while self._process.stderr:
@@ -88,6 +100,9 @@ def wait(self):
88100
continue
89101
elif self.is_password_prompt(read):
90102
self.handle_password_prompt()
103+
elif self.is_failed_line(read):
104+
self.handle_failure(read)
105+
break
91106
else:
92107
logging.info(line)
93108
if self._process.poll() != None:

mongodb_consistent_backup/Common/Config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def to_dict(self, data):
100100
for key in data:
101101
value = self.to_dict(data[key])
102102
if value and key is not ('merge'):
103-
if key == "password":
103+
if key == "password" or key == "secret_key":
104104
value = "******"
105105
ret[key] = value
106106
return ret

mongodb_consistent_backup/Common/DB.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import logging
22

3-
from pymongo import MongoClient
3+
from bson.codec_options import CodecOptions
4+
from inspect import currentframe, getframeinfo
5+
from pymongo import DESCENDING, CursorType, MongoClient
46
from pymongo.errors import ConnectionFailure, OperationFailure, ServerSelectionTimeoutError
57
from time import sleep
68

@@ -117,6 +119,26 @@ def replset(self):
117119
return isMaster['setName']
118120
return None
119121

122+
def get_oplog_rs(self):
123+
if not self._conn:
124+
self.connect()
125+
db = self._conn['local']
126+
return db.oplog.rs.with_options(codec_options=CodecOptions(unicode_decode_error_handler="ignore"))
127+
128+
def get_oplog_tail_ts(self):
129+
logging.debug("Gathering oldest 'ts' in %s oplog" % self.uri)
130+
return self.get_oplog_rs().find_one(sort=[('$natural', DESCENDING)])['ts']
131+
132+
def get_oplog_cursor_since(self, caller, ts=None):
133+
frame = getframeinfo(currentframe().f_back)
134+
comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno)
135+
if not ts:
136+
ts = self.get_oplog_tail_ts()
137+
query = {'ts':{'$gte':ts}}
138+
logging.debug("Querying oplog on %s with query: %s" % (self.uri, query))
139+
# http://api.mongodb.com/python/current/examples/tailable.html
140+
return self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True).comment(comment)
141+
120142
def close(self):
121143
if self._conn:
122144
logging.debug("Closing connection to: %s" % self.uri)

mongodb_consistent_backup/Common/Util.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66

77

88
def config_to_string(config):
9-
config_vars = ""
9+
config_pairs = []
1010
for key in config:
11-
config_vars += "%s=%s, " % (key, config[key])
12-
return config_vars[:-1]
11+
config_pairs.append("%s=%s" % (key, config[key]))
12+
return ", ".join(config_pairs)
1313

1414
def is_datetime(string):
1515
try:

0 commit comments

Comments
 (0)