Skip to content

Commit 26939b8

Browse files
authored
Merge branch 'master' into 1.2.0-ssl-pymongo
2 parents b6172a8 + ea53998 commit 26939b8

File tree

16 files changed

+312
-28
lines changed

16 files changed

+312
-28
lines changed

README.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Features
3131
archiving method (*optional*)
3232
- `AWS S3 <https://aws.amazon.com/s3/>`__ Secure Multipart backup uploads (*optional*)
3333
- `Google Cloud Storage <https://cloud.google.com/storage/>`__ Secure backup uploads (*optional*)
34+
- Rsync (over SSH) secure backup uploads (*optional*)
3435
- `Nagios NSCA <https://sourceforge.net/p/nagios/nsca>`__ push
3536
notification support (*optional*)
3637
- Modular backup, archiving, upload and notification components
@@ -78,6 +79,7 @@ To build an CentOS/RedHat RPM of the tool *(recommended)*:
7879
::
7980

8081
$ cd /path/to/mongodb_consistent_backup
82+
$ sudo yum install -y rpm-build
8183
$ make rpm
8284

8385
To build and install from source *(to default '/usr/local/bin/mongodb-consistent-backup')*:
@@ -221,6 +223,7 @@ Roadmap
221223
- Backup retention/rotation *(eg: delete old backups)*
222224
- Support more notification methods *(Prometheus, PagerDuty, etc)*
223225
- Support more upload methods *(Rsync, etc)*
226+
- Support SSL MongoDB connections
224227
- Documentation for running under Docker with persistent volumes
225228
- Python unit tests
226229

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.0
1+
1.2.0

conf/mongodb-consistent-backup.example.conf

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ production:
1717
location: /var/lib/mongodb-consistent-backup
1818
# mongodump:
1919
# binary: [path] (default: /usr/bin/mongodump)
20-
# compression: [auto|none|gzip] (default: auto - enable gzip if supported)
21-
# threads: [1-16] (default: auto-generated - shards/cpu)
20+
# compression: [auto|none|gzip] (default: auto = enable gzip if supported)
21+
# threads: [1-16] (default: auto-generated, shards/cpu)
2222
#replication:
2323
# max_lag_secs: [1+] (default: 10)
2424
# min_priority: [0-999] (default: 0)
@@ -29,18 +29,19 @@ production:
2929
# wait_secs: [1+] (default: 300)
3030
# ping_secs: [1+] (default: 3)
3131
#oplog:
32-
# compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
32+
# compression: [none|gzip] (default: gzip, if used by backup stage)
3333
# flush:
34-
# max_docs: 100
35-
# max_secs: 1
36-
# resolver_threads: [1+] (default: 2 per CPU)
34+
# max_docs: [1+] (default: 100)
35+
# max_secs: [1+] (default: 1)
36+
# resolver:
37+
# threads: [1+] (default: 2 per CPU)
3738
# tailer:
3839
# enabled: true
3940
# status_interval: 30
4041
archive:
4142
method: tar
4243
# tar:
43-
# compression: [none|gzip] (default: gzip - none if backup is already compressed)
44+
# compression: [none|gzip] (default: gzip, none if backup already compressed)
4445
# threads: [1+] (default: 1 per CPU)
4546
# zbackup:
4647
# binary: [path] (default: /usr/bin/zbackup)
@@ -64,7 +65,15 @@ production:
6465
# secret_key: [Google Cloud Storage Secret Key]
6566
# bucket_name: [Google Cloud Storage Bucket Name]
6667
# bucket_prefix: [prefix] (default: /)
67-
# threads: [1+] (default: 1 per CPU)
68+
# threads: [1+] (default: 4)
69+
# rsync:
70+
# path: [Rsync Destination Path]
71+
# user: [SSH Username]
72+
# host: [SSH Hostname/IP]
73+
# port: [SSH Port Number] (default: 22)
74+
# delete: [true|false] (default: false)
75+
# threads: [1+] (default: 4)
76+
# retries: [1+] (default: 5)
6877
# s3:
6978
# region: [AWS S3 Region] (default: us-east-1)
7079
# access_key: [AWS S3 Access Key]

mongodb_consistent_backup/Common/Util.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import socket
22

33
from dateutil import parser
4+
from select import select
45

56
from mongodb_consistent_backup.Errors import OperationError
67

@@ -31,3 +32,23 @@ def validate_hostname(hostname):
3132
socket.getaddrinfo(hostname, None)
3233
except socket.error, e:
3334
raise OperationError("Could not resolve host '%s', error: %s" % (hostname, e))
35+
36+
37+
def wait_popen(process, stderr_callback, stdout_callback):
38+
try:
39+
while not process.returncode:
40+
poll = select([process.stderr.fileno(), process.stdout.fileno()], [], [])
41+
if len(poll) >= 1:
42+
for fd in poll[0]:
43+
if process.stderr and fd == process.stderr.fileno():
44+
stderr_callback(process.stderr.readline().rstrip())
45+
if process.stdout and fd == process.stdout.fileno():
46+
stdout_callback(process.stdout.readline().rstrip())
47+
if process.poll() is not None:
48+
break
49+
stderr, stdout = process.communicate()
50+
stderr_callback(stderr.rstrip())
51+
stdout_callback(stdout.rstrip())
52+
except Exception, e:
53+
raise e
54+
return True

mongodb_consistent_backup/Common/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
from Lock import Lock # NOQA
55
from MongoUri import MongoUri # NOQA
66
from Timer import Timer # NOQA
7-
from Util import config_to_string, is_datetime, parse_method, validate_hostname # NOQA
7+
from Util import config_to_string, is_datetime, parse_method, validate_hostname, wait_popen # NOQA

mongodb_consistent_backup/Upload/Gs/Gs.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
2727
super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
2828
self.backup_location = self.config.backup.location
2929
self.remove_uploaded = self.config.upload.remove_uploaded
30+
self.retries = self.config.upload.retries
3031
self.project_id = self.config.upload.gs.project_id
3132
self.access_key = self.config.upload.gs.access_key
3233
self.secret_key = self.config.upload.gs.secret_key
3334
self.bucket = self.config.upload.gs.bucket
3435

35-
self.threads(self.config.upload.gs.threads)
36+
self.threads(self.config.upload.threads)
3637
self._pool = Pool(processes=self.threads())
3738

3839
def close(self):
@@ -69,7 +70,8 @@ def run(self):
6970
self.project_id,
7071
self.access_key,
7172
self.secret_key,
72-
self.remove_uploaded
73+
self.remove_uploaded,
74+
self.retries
7375
).run)
7476
self._pool.close()
7577
self._pool.join()

mongodb_consistent_backup/Upload/Gs/GsUploadThread.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88

99
class GsUploadThread:
10-
def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False):
10+
def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False, retries=5):
1111
self.backup_dir = backup_dir
1212
self.file_path = file_path
1313
self.gs_path = gs_path
@@ -16,6 +16,7 @@ def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_ke
1616
self.access_key = access_key
1717
self.secret_key = secret_key
1818
self.remove_uploaded = remove_uploaded
19+
self.retries = retries
1920

2021
self.path = "%s/%s" % (self.bucket, self.gs_path)
2122
self.meta_data_dir = "mongodb_consistent_backup-META"
@@ -76,10 +77,21 @@ def run(self):
7677
logging.debug("Path %s does not exist, uploading" % self.path)
7778

7879
try:
79-
f = open(self.file_path, 'r')
80-
uri = self.get_uri()
81-
logging.info("Uploading %s to Google Cloud Storage" % self.path)
82-
uri.new_key().set_contents_from_file(f)
80+
f = open(self.file_path, 'r')
81+
uri = self.get_uri()
82+
retry = 0
83+
error = None
84+
while retry < self.retries:
85+
try:
86+
logging.info("Uploading %s to Google Cloud Storage (attempt %i/%i)" % (self.path, retry, self.retries))
87+
uri.new_key().set_contents_from_file(f)
88+
except Exception, e:
89+
logging.error("Received error for Google Cloud Storage upload of %s: %s" % (self.path, e))
90+
error = e
91+
retry += 1
92+
continue
93+
if retry >= self.retries and error:
94+
raise error
8395
finally:
8496
if f:
8597
f.close()

mongodb_consistent_backup/Upload/Gs/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,4 @@ def config(parser):
77
parser.add_argument("--upload.gs.secret_key", dest="upload.gs.secret_key", help="Google Cloud Storage Uploader Secret Key (required for GS upload)", type=str)
88
parser.add_argument("--upload.gs.bucket_name", dest="upload.gs.bucket_name", help="Google Cloud Storage Uploader destination bucket name", type=str)
99
parser.add_argument("--upload.gs.bucket_prefix", dest="upload.gs.bucket_prefix", help="Google Cloud Storage Uploader destination bucket path prefix", type=str)
10-
parser.add_argument("--upload.gs.threads", dest="upload.gs.threads", help="Google Cloud Storage Uploader worker threads (default: 4)", default=4, type=int)
1110
return parser
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import os
2+
import logging
3+
import re
4+
5+
from copy_reg import pickle
6+
from multiprocessing import Pool
7+
from subprocess import check_output
8+
from types import MethodType
9+
10+
from RsyncUploadThread import RsyncUploadThread
11+
12+
from mongodb_consistent_backup.Common import config_to_string
13+
from mongodb_consistent_backup.Errors import OperationError
14+
from mongodb_consistent_backup.Pipeline import Task
15+
16+
17+
# Allows pooled .apply_async()s to work on Class-methods:
def _reduce_method(m):
    # Pickle a method as (getattr, (owner, name)) so multiprocessing can
    # rebuild it by attribute lookup on the worker side.
    # Python 2 only: relies on im_self/im_class/im_func, which were removed
    # in Python 3 (where plain bound methods pickle natively).
    if m.im_self is None:
        # Unbound method: re-fetch from the class.
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        # Bound method: re-fetch from the instance.
        return getattr, (m.im_self, m.im_func.func_name)


# Register the reducer globally for all instancemethod objects.
pickle(MethodType, _reduce_method)
26+
27+
28+
class Rsync(Task):
    """Upload stage that pushes the completed backup directory to a remote
    host with rsync over SSH, one pooled worker thread per top-level child
    of the backup directory."""

    def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
        super(Rsync, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
        self.backup_location = self.config.backup.location
        self.backup_name     = self.config.backup.name
        self.remove_uploaded = self.config.upload.remove_uploaded
        self.rsync_path      = self.config.upload.rsync.path
        self.rsync_user      = self.config.upload.rsync.user
        self.rsync_host      = self.config.upload.rsync.host
        self.rsync_port      = self.config.upload.rsync.port
        self.rsync_ssh_key   = self.config.upload.rsync.ssh_key
        self.retries         = self.config.upload.rsync.retries
        self.thread_count    = self.config.upload.rsync.threads
        self.rsync_binary    = "rsync"

        self.rsync_flags   = ["--archive", "--compress"]
        self.rsync_version = None
        self._rsync_info   = None

        # threads() presumably resolves thread_count via the Task base class
        # — TODO confirm against Pipeline.Task.
        self._pool = Pool(processes=self.threads())

    def init(self):
        """Validate preconditions; raises OperationError when rsync is
        missing or the local source directory does not exist."""
        if not self.host_has_rsync():
            raise OperationError("Cannot find rsync binary on this host!")
        if not os.path.isdir(self.backup_dir):
            logging.error("The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir)
            raise OperationError("The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir)

    def rsync_info(self):
        """Probe `rsync --version` once and cache a dict with keys
        'version' (str) and 'protocol_version' (int)."""
        if not self._rsync_info:
            output = check_output([self.rsync_binary, "--version"])
            # Raw string: '\s'/'\d' are invalid escapes in a plain literal.
            search = re.search(r"^rsync\s+version\s([0-9.-]+)\s+protocol\sversion\s(\d+)", output)
            if not search:
                # Without this guard a format change would surface as a
                # bare AttributeError on None.group().
                raise OperationError("Could not parse rsync version from: %s" % output)
            self.rsync_version = search.group(1)
            self._rsync_info   = {"version": self.rsync_version, "protocol_version": int(search.group(2))}
        return self._rsync_info

    def host_has_rsync(self):
        """Return True when a usable rsync binary is found, False otherwise.

        Swallows probe errors (e.g. OSError for a missing binary) so that
        init() can raise its clearer OperationError instead of a raw
        exception leaking out of check_output()."""
        try:
            if self.rsync_info():
                return True
        except Exception:
            pass
        return False

    def prepare_dest_dir(self):
        """mkdir -p the rsync destination path on the remote host via ssh.

        Raises whatever ssh/check_output raised on failure; returns True
        on success."""
        # NOTE(review): this creates the LOCAL base_dir path on the remote
        # host — presumably intentional so the remote mirrors the local
        # layout; confirm against RsyncUploadThread's destination handling.
        ssh_mkdir_cmd = ["ssh"]
        if self.rsync_ssh_key:
            ssh_mkdir_cmd.extend(["-i", self.rsync_ssh_key])
        # Honor the configured SSH port (default 22); the original built
        # the ssh command without it, ignoring upload.rsync.port here.
        if self.rsync_port:
            ssh_mkdir_cmd.extend(["-p", str(self.rsync_port)])
        ssh_mkdir_cmd.extend([
            "%s@%s" % (self.rsync_user, self.rsync_host),
            "mkdir", "-p", self.base_dir
        ])

        # run the mkdir via ssh
        try:
            check_output(ssh_mkdir_cmd)
        except Exception as e:
            logging.error("Creating rsync dest path with ssh failed for %s: %s" % (
                self.rsync_host,
                e
            ))
            raise e

        return True

    def done(self, data):
        # apply_async() completion callback: just log the worker's result.
        logging.info(data)

    def run(self):
        """Fan one RsyncUploadThread per child of backup_dir out over the
        worker pool, then wait for all uploads to finish."""
        try:
            self.init()
            self.timer.start(self.timer_name)

            logging.info("Preparing destination path on %s" % self.rsync_host)
            self.prepare_dest_dir()

            rsync_config = {
                "dest":    "%s@%s:%s" % (self.rsync_user, self.rsync_host, self.rsync_path),
                "threads": self.threads(),
                "retries": self.retries
            }
            rsync_config.update(self.rsync_info())
            logging.info("Starting upload using rsync version %s (%s)" % (
                self.rsync_info()['version'],
                config_to_string(rsync_config)
            ))
            for child in os.listdir(self.backup_dir):
                self._pool.apply_async(RsyncUploadThread(
                    os.path.join(self.backup_dir, child),
                    self.base_dir,
                    self.rsync_flags,
                    self.rsync_path,
                    self.rsync_user,
                    self.rsync_host,
                    self.rsync_port,
                    self.rsync_ssh_key,
                    self.remove_uploaded,
                    self.retries
                ).run, callback=self.done)
            self.wait()
        except Exception as e:
            logging.error("Rsync upload failed! Error: %s" % e)
            raise OperationError(e)
        finally:
            self.timer.stop(self.timer_name)
            self.completed = True

    def wait(self):
        # Graceful shutdown: let queued uploads finish.
        if self._pool:
            logging.info("Waiting for Rsync upload threads to stop")
            self._pool.close()
            self._pool.join()

    def close(self):
        # Hard shutdown (error/interrupt path): kill in-flight uploads.
        if self._pool:
            logging.error("Stopping Rsync upload threads")
            self._pool.terminate()
            self._pool.join()

0 commit comments

Comments
 (0)