Skip to content

Commit 6546ce7

Browse files
authored
Merge pull request #200 from timvaillancourt/1.2.0-rsync_v1
1.2.0: Support threaded Rsync upload method (to resolve #147)
2 parents ec37123 + bc85dfb commit 6546ce7

File tree

15 files changed

+305
-24
lines changed

15 files changed

+305
-24
lines changed

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Features
3131
archiving method (*optional*)
3232
- `AWS S3 <https://aws.amazon.com/s3/>`__ Secure Multipart backup uploads (*optional*)
3333
- `Google Cloud Storage <https://cloud.google.com/storage/>`__ Secure backup uploads (*optional*)
34+
- Rsync (over SSH) secure backup uploads (*optional*)
3435
- `Nagios NSCA <https://sourceforge.net/p/nagios/nsca>`__ push
3536
notification support (*optional*)
3637
- Modular backup, archiving, upload and notification components
@@ -220,7 +221,6 @@ Roadmap
220221
- Upload compatibility for ZBackup archive phase *(upload unsupported today)*
221222
- Backup retention/rotation *(eg: delete old backups)*
222223
- Support more notification methods *(Prometheus, PagerDuty, etc)*
223-
- Support more upload methods *(Rsync, etc)*
224224
- Support SSL MongoDB connections
225225
- Documentation for running under Docker with persistent volumes
226226
- Python unit tests

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.0
1+
1.2.0

conf/mongodb-consistent-backup.example.conf

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ production:
1111
location: /var/lib/mongodb-consistent-backup
1212
# mongodump:
1313
# binary: [path] (default: /usr/bin/mongodump)
14-
# compression: [auto|none|gzip] (default: auto - enable gzip if supported)
15-
# threads: [1-16] (default: auto-generated - shards/cpu)
14+
# compression: [auto|none|gzip] (default: auto = enable gzip if supported)
15+
# threads: [1-16] (default: auto-generated, shards/cpu)
1616
#replication:
1717
# max_lag_secs: [1+] (default: 10)
1818
# min_priority: [0-999] (default: 0)
@@ -23,18 +23,19 @@ production:
2323
# wait_secs: [1+] (default: 300)
2424
# ping_secs: [1+] (default: 3)
2525
#oplog:
26-
# compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
26+
# compression: [none|gzip] (default: gzip, if used by backup stage)
2727
# flush:
28-
# max_docs: 100
29-
# max_secs: 1
30-
# resolver_threads: [1+] (default: 2 per CPU)
28+
# max_docs: [1+] (default: 100)
29+
# max_secs: [1+] (default: 1)
30+
# resolver:
31+
# threads: [1+] (default: 2 per CPU)
3132
# tailer:
3233
# enabled: true
3334
# status_interval: 30
3435
archive:
3536
method: tar
3637
# tar:
37-
# compression: [none|gzip] (default: gzip - none if backup is already compressed)
38+
# compression: [none|gzip] (default: gzip, none if backup already compressed)
3839
# threads: [1+] (default: 1 per CPU)
3940
# zbackup:
4041
# binary: [path] (default: /usr/bin/zbackup)
@@ -58,7 +59,15 @@ production:
5859
# secret_key: [Google Cloud Storage Secret Key]
5960
# bucket_name: [Google Cloud Storage Bucket Name]
6061
# bucket_prefix: [prefix] (default: /)
61-
# threads: [1+] (default: 1 per CPU)
62+
# threads: [1+] (default: 4)
63+
# rsync:
64+
# path: [Rsync Destination Path]
65+
# user: [SSH Username]
66+
# host: [SSH Hostname/IP]
67+
# port: [SSH Port Number] (default: 22)
68+
# delete: [true|false] (default: false)
69+
# threads: [1+] (default: 4)
70+
# retries: [1+] (default: 5)
6271
# s3:
6372
# region: [AWS S3 Region] (default: us-east-1)
6473
# access_key: [AWS S3 Access Key]

mongodb_consistent_backup/Common/Util.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import socket
22

33
from dateutil import parser
4+
from select import select
45

56
from mongodb_consistent_backup.Errors import OperationError
67

@@ -31,3 +32,23 @@ def validate_hostname(hostname):
3132
socket.getaddrinfo(hostname, None)
3233
except socket.error, e:
3334
raise OperationError("Could not resolve host '%s', error: %s" % (hostname, e))
35+
36+
37+
def wait_popen(process, stderr_callback, stdout_callback):
    """Stream a subprocess.Popen's output to callbacks until it exits.

    Multiplexes the process's stderr and stdout pipes with select(),
    passing each line (rstripped) to stderr_callback/stdout_callback
    respectively, then flushes any remaining buffered output via
    communicate().

    :param process: a subprocess.Popen started with stdout=PIPE, stderr=PIPE
    :param stderr_callback: callable invoked with each stderr line
    :param stdout_callback: callable invoked with each stdout line
    :return: True when the process has exited and all output was delivered
    """
    while not process.returncode:
        # select() returns (readable, writable, exceptional); we only
        # wait on the read set. The original checked len() of the whole
        # tuple, which is always 3.
        readable, _, _ = select([process.stderr.fileno(), process.stdout.fileno()], [], [])
        for fd in readable:
            if process.stderr and fd == process.stderr.fileno():
                stderr_callback(process.stderr.readline().rstrip())
            if process.stdout and fd == process.stdout.fileno():
                stdout_callback(process.stdout.readline().rstrip())
        if process.poll() is not None:
            break
    # communicate() returns (stdout, stderr) in that order — the original
    # unpacked them swapped, sending the final stdout to the stderr
    # callback and vice versa.
    stdout, stderr = process.communicate()
    stderr_callback(stderr.rstrip())
    stdout_callback(stdout.rstrip())
    return True

mongodb_consistent_backup/Common/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
from Lock import Lock # NOQA
55
from MongoUri import MongoUri # NOQA
66
from Timer import Timer # NOQA
7-
from Util import config_to_string, is_datetime, parse_method, validate_hostname # NOQA
7+
from Util import config_to_string, is_datetime, parse_method, validate_hostname, wait_popen # NOQA

mongodb_consistent_backup/Upload/Gs/Gs.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
2727
super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
2828
self.backup_location = self.config.backup.location
2929
self.remove_uploaded = self.config.upload.remove_uploaded
30+
self.retries = self.config.upload.retries
3031
self.project_id = self.config.upload.gs.project_id
3132
self.access_key = self.config.upload.gs.access_key
3233
self.secret_key = self.config.upload.gs.secret_key
3334
self.bucket = self.config.upload.gs.bucket
3435

35-
self.threads(self.config.upload.gs.threads)
36+
self.threads(self.config.upload.threads)
3637
self._pool = Pool(processes=self.threads())
3738

3839
def close(self):
@@ -69,7 +70,8 @@ def run(self):
6970
self.project_id,
7071
self.access_key,
7172
self.secret_key,
72-
self.remove_uploaded
73+
self.remove_uploaded,
74+
self.retries
7375
).run)
7476
self._pool.close()
7577
self._pool.join()

mongodb_consistent_backup/Upload/Gs/GsUploadThread.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88

99
class GsUploadThread:
10-
def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False):
10+
def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False, retries=5):
1111
self.backup_dir = backup_dir
1212
self.file_path = file_path
1313
self.gs_path = gs_path
@@ -16,6 +16,7 @@ def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_ke
1616
self.access_key = access_key
1717
self.secret_key = secret_key
1818
self.remove_uploaded = remove_uploaded
19+
self.retries = retries
1920

2021
self.path = "%s/%s" % (self.bucket, self.gs_path)
2122
self.meta_data_dir = "mongodb_consistent_backup-META"
@@ -76,10 +77,21 @@ def run(self):
7677
logging.debug("Path %s does not exist, uploading" % self.path)
7778

7879
try:
79-
f = open(self.file_path, 'r')
80-
uri = self.get_uri()
81-
logging.info("Uploading %s to Google Cloud Storage" % self.path)
82-
uri.new_key().set_contents_from_file(f)
80+
f = open(self.file_path, 'r')
81+
uri = self.get_uri()
82+
retry = 0
83+
error = None
84+
while retry < self.retries:
85+
try:
86+
logging.info("Uploading %s to Google Cloud Storage (attempt %i/%i)" % (self.path, retry, self.retries))
87+
uri.new_key().set_contents_from_file(f)
88+
except Exception, e:
89+
logging.error("Received error for Google Cloud Storage upload of %s: %s" % (self.path, e))
90+
error = e
91+
retry += 1
92+
continue
93+
if retry >= self.retries and error:
94+
raise error
8395
finally:
8496
if f:
8597
f.close()

mongodb_consistent_backup/Upload/Gs/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,4 @@ def config(parser):
77
parser.add_argument("--upload.gs.secret_key", dest="upload.gs.secret_key", help="Google Cloud Storage Uploader Secret Key (required for GS upload)", type=str)
88
parser.add_argument("--upload.gs.bucket_name", dest="upload.gs.bucket_name", help="Google Cloud Storage Uploader destination bucket name", type=str)
99
parser.add_argument("--upload.gs.bucket_prefix", dest="upload.gs.bucket_prefix", help="Google Cloud Storage Uploader destination bucket path prefix", type=str)
10-
parser.add_argument("--upload.gs.threads", dest="upload.gs.threads", help="Google Cloud Storage Uploader worker threads (default: 4)", default=4, type=int)
1110
return parser
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import os
import logging
import re

from copy_reg import pickle
from multiprocessing import Pool
from subprocess import check_output
from types import MethodType

from RsyncUploadThread import RsyncUploadThread

from mongodb_consistent_backup.Common import config_to_string
from mongodb_consistent_backup.Errors import OperationError
from mongodb_consistent_backup.Pipeline import Task


# Allows pooled .apply_async()s to work on Class-methods:
def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)


pickle(MethodType, _reduce_method)


class Rsync(Task):
    """Upload a completed backup directory to a remote host via rsync-over-SSH.

    One RsyncUploadThread is dispatched per top-level entry of backup_dir
    through a multiprocessing Pool.
    """

    def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
        super(Rsync, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
        self.backup_location = self.config.backup.location
        self.backup_name     = self.config.backup.name
        self.remove_uploaded = self.config.upload.remove_uploaded
        self.rsync_path      = self.config.upload.rsync.path
        self.rsync_user      = self.config.upload.rsync.user
        self.rsync_host      = self.config.upload.rsync.host
        self.rsync_port      = self.config.upload.rsync.port
        self.rsync_ssh_key   = self.config.upload.rsync.ssh_key
        self.retries         = self.config.upload.rsync.retries
        self.thread_count    = self.config.upload.rsync.threads
        self.rsync_binary    = "rsync"

        self.rsync_flags   = ["--archive", "--compress"]
        self.rsync_version = None
        self._rsync_info   = None

        # Register the configured thread count before sizing the pool.
        # The original never passed thread_count to Task.threads(), so
        # upload.rsync.threads was silently ignored (the Gs uploader
        # registers its count this way).
        # NOTE(review): assumes Task.threads(count) is a setter/getter as
        # used in Gs.py — confirm against Pipeline.Task.
        self.threads(self.thread_count)
        self._pool = Pool(processes=self.threads())

    def init(self):
        """Validate that rsync exists and the source backup dir is present.

        :raises OperationError: when rsync is missing or backup_dir is not a directory
        """
        if not self.host_has_rsync():
            raise OperationError("Cannot find rsync binary on this host!")
        if not os.path.isdir(self.backup_dir):
            msg = "The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir
            logging.error(msg)
            raise OperationError(msg)

    def rsync_info(self):
        """Return {'version': str, 'protocol_version': int} for the local rsync.

        Runs `rsync --version` once and caches the parsed result.

        :raises OperationError: when the version output cannot be parsed
        """
        if not self._rsync_info:
            output = check_output([self.rsync_binary, "--version"])
            # Raw string: the original used unescaped backslashes in a
            # plain string literal.
            search = re.search(r"^rsync\s+version\s([0-9.-]+)\s+protocol\sversion\s(\d+)", output)
            if not search:
                raise OperationError("Could not parse rsync version from output: %s" % output)
            self.rsync_version = search.group(1)
            self._rsync_info   = {"version": self.rsync_version, "protocol_version": int(search.group(2))}
        return self._rsync_info

    def host_has_rsync(self):
        """Return True when a usable rsync binary is found on this host.

        The original let check_output's OSError propagate when rsync was
        absent, making init()'s 'Cannot find rsync binary' error
        unreachable; catch it here so the intended error is raised.
        """
        try:
            return bool(self.rsync_info())
        except Exception:
            return False

    def prepare_dest_dir(self):
        """mkdir -p the rsync destination path on the remote host via ssh.

        :raises Exception: re-raises any ssh/mkdir failure after logging it
        """
        ssh_mkdir_cmd = ["ssh"]
        if self.rsync_ssh_key:
            ssh_mkdir_cmd.extend(["-i", self.rsync_ssh_key])
        ssh_mkdir_cmd.extend([
            "%s@%s" % (self.rsync_user, self.rsync_host),
            "mkdir", "-p", self.base_dir
        ])

        try:
            check_output(ssh_mkdir_cmd)
        except Exception as e:
            logging.error("Creating rsync dest path with ssh failed for %s: %s" % (
                self.rsync_host,
                e
            ))
            raise e

        return True

    def done(self, data):
        # Pool callback: log the result returned by each upload thread.
        logging.info(data)

    def run(self):
        """Upload every entry of backup_dir to the remote host in parallel.

        :raises OperationError: wrapping any failure during preparation or upload
        """
        try:
            self.init()
            self.timer.start(self.timer_name)

            logging.info("Preparing destination path on %s" % self.rsync_host)
            self.prepare_dest_dir()

            rsync_config = {
                "dest":    "%s@%s:%s" % (self.rsync_user, self.rsync_host, self.rsync_path),
                "threads": self.threads(),
                "retries": self.retries
            }
            rsync_config.update(self.rsync_info())
            logging.info("Starting upload using rsync version %s (%s)" % (
                self.rsync_info()['version'],
                config_to_string(rsync_config)
            ))
            for child in os.listdir(self.backup_dir):
                self._pool.apply_async(RsyncUploadThread(
                    os.path.join(self.backup_dir, child),
                    self.base_dir,
                    self.rsync_flags,
                    self.rsync_path,
                    self.rsync_user,
                    self.rsync_host,
                    self.rsync_port,
                    self.rsync_ssh_key,
                    self.remove_uploaded,
                    self.retries
                ).run, callback=self.done)
            self.wait()
        except Exception as e:
            logging.error("Rsync upload failed! Error: %s" % e)
            raise OperationError(e)
        finally:
            self.timer.stop(self.timer_name)
            self.completed = True

    def wait(self):
        """Block until all queued upload threads have finished."""
        if self._pool:
            logging.info("Waiting for Rsync upload threads to stop")
            self._pool.close()
            self._pool.join()

    def close(self):
        """Terminate any in-flight upload threads (forced shutdown)."""
        if self._pool:
            logging.error("Stopping Rsync upload threads")
            self._pool.terminate()
            self._pool.join()

0 commit comments

Comments
 (0)