Skip to content

Commit 9fd19b2

Browse files
Merge remote-tracking branch 'origin/1.2.0-rsync_v1' into 1.2.0-upload-thread-retries
2 parents 5c0a633 + bc85dfb commit 9fd19b2

File tree

10 files changed

+280
-12
lines changed

10 files changed

+280
-12
lines changed

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Features
3131
archiving method (*optional*)
3232
- `AWS S3 <https://aws.amazon.com/s3/>`__ Secure Multipart backup uploads (*optional*)
3333
- `Google Cloud Storage <https://cloud.google.com/storage/>`__ Secure backup uploads (*optional*)
34+
- Rsync (over SSH) secure backup uploads (*optional*)
3435
- `Nagios NSCA <https://sourceforge.net/p/nagios/nsca>`__ push
3536
notification support (*optional*)
3637
- Modular backup, archiving, upload and notification components
@@ -219,7 +220,6 @@ Roadmap
219220
- Upload compatibility for ZBackup archive phase *(upload unsupported today)*
220221
- Backup retention/rotation *(eg: delete old backups)*
221222
- Support more notification methods *(Prometheus, PagerDuty, etc)*
222-
- Support more upload methods *(Rsync, etc)*
223223
- Support SSL MongoDB connections
224224
- Documentation for running under Docker with persistent volumes
225225
- Python unit tests

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.0
1+
1.2.0

conf/mongodb-consistent-backup.example.conf

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ production:
1111
location: /var/lib/mongodb-consistent-backup
1212
# mongodump:
1313
# binary: [path] (default: /usr/bin/mongodump)
14-
# compression: [auto|none|gzip] (default: auto - enable gzip if supported)
15-
# threads: [1-16] (default: auto-generated - shards/cpu)
14+
# compression: [auto|none|gzip] (default: auto = enable gzip if supported)
15+
# threads: [1-16] (default: auto-generated, shards/cpu)
1616
#replication:
1717
# max_lag_secs: [1+] (default: 10)
1818
# min_priority: [0-999] (default: 0)
@@ -23,18 +23,19 @@ production:
2323
# wait_secs: [1+] (default: 300)
2424
# ping_secs: [1+] (default: 3)
2525
#oplog:
26-
# compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
26+
# compression: [none|gzip] (default: gzip, if used by backup stage)
2727
# flush:
28-
# max_docs: 100
29-
# max_secs: 1
30-
# resolver_threads: [1+] (default: 2 per CPU)
28+
# max_docs: [1+] (default: 100)
29+
# max_secs: [1+] (default: 1)
30+
# resolver:
31+
# threads: [1+] (default: 2 per CPU)
3132
# tailer:
3233
# enabled: true
3334
# status_interval: 30
3435
archive:
3536
method: tar
3637
# tar:
37-
# compression: [none|gzip] (default: gzip - none if backup is already compressed)
38+
# compression: [none|gzip] (default: gzip, none if backup already compressed)
3839
# threads: [1+] (default: 1 per CPU)
3940
# zbackup:
4041
# binary: [path] (default: /usr/bin/zbackup)
@@ -58,7 +59,15 @@ production:
5859
# secret_key: [Google Cloud Storage Secret Key]
5960
# bucket_name: [Google Cloud Storage Bucket Name]
6061
# bucket_prefix: [prefix] (default: /)
61-
# threads: [1+] (default: 1 per CPU)
62+
# threads: [1+] (default: 4)
63+
# rsync:
64+
# path: [Rsync Destination Path]
65+
# user: [SSH Username]
66+
# host: [SSH Hostname/IP]
67+
# port: [SSH Port Number] (default: 22)
68+
# delete: [true|false] (default: false)
69+
# threads: [1+] (default: 4)
70+
# retries: [1+] (default: 5)
6271
# s3:
6372
# region: [AWS S3 Region] (default: us-east-1)
6473
# access_key: [AWS S3 Access Key]

mongodb_consistent_backup/Common/Util.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import socket
22

33
from dateutil import parser
4+
from select import select
45

56
from mongodb_consistent_backup.Errors import OperationError
67

@@ -31,3 +32,23 @@ def validate_hostname(hostname):
3132
socket.getaddrinfo(hostname, None)
3233
except socket.error, e:
3334
raise OperationError("Could not resolve host '%s', error: %s" % (hostname, e))
35+
36+
37+
def wait_popen(process, stderr_callback, stdout_callback):
    """Relay a child process's output to callbacks until the process exits.

    Lines are read from the process's stderr/stdout pipes as they become
    readable and handed, right-stripped, to the matching callback. Once the
    process has exited (or both pipes reached EOF) whatever is left on each
    pipe is passed to its callback in one final call, stderr first.

    Args:
        process: a subprocess.Popen-like object. A pipe that is None (not
            captured) is simply skipped.
        stderr_callback: callable taking one chunk of stderr output.
        stdout_callback: callable taking one chunk of stdout output.

    Returns:
        True once the process has exited and its pipes are drained. The exit
        status is NOT inspected here; callers should check
        ``process.returncode``.
    """
    # Route each captured pipe to its own callback.
    handlers = {}
    if process.stderr:
        handlers[process.stderr] = stderr_callback
    if process.stdout:
        handlers[process.stdout] = stdout_callback

    # Fixed stderr-then-stdout order for the final drain.
    pipes = [pipe for pipe in (process.stderr, process.stdout) if pipe]
    pending = list(pipes)

    while pending and process.poll() is None:
        for stream in select(pending, [], [])[0]:
            line = stream.readline()
            if line:
                handlers[stream](line.rstrip())
            else:
                # An empty read means EOF: stop polling this pipe, otherwise
                # select() reports it readable forever and the loop spins.
                pending.remove(stream)

    # Drain through the file objects rather than process.communicate():
    # communicate() reads the raw descriptors and would miss anything that
    # readline() has already pulled into the file object's buffer.
    for stream in pipes:
        handlers[stream](stream.read().rstrip())
        stream.close()

    # Reap the child so process.returncode is populated for the caller.
    process.wait()
    return True

mongodb_consistent_backup/Common/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
from Lock import Lock # NOQA
55
from MongoUri import MongoUri # NOQA
66
from Timer import Timer # NOQA
7-
from Util import config_to_string, is_datetime, parse_method, validate_hostname # NOQA
7+
from Util import config_to_string, is_datetime, parse_method, validate_hostname, wait_popen # NOQA
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import os
2+
import logging
3+
import re
4+
5+
from copy_reg import pickle
6+
from multiprocessing import Pool
7+
from subprocess import check_output
8+
from types import MethodType
9+
10+
from RsyncUploadThread import RsyncUploadThread
11+
12+
from mongodb_consistent_backup.Common import config_to_string
13+
from mongodb_consistent_backup.Errors import OperationError
14+
from mongodb_consistent_backup.Pipeline import Task
15+
16+
17+
# Allows pooled .apply_async()s to work on Class-methods:
18+
def _reduce_method(m):
19+
if m.im_self is None:
20+
return getattr, (m.im_class, m.im_func.func_name)
21+
else:
22+
return getattr, (m.im_self, m.im_func.func_name)
23+
24+
25+
pickle(MethodType, _reduce_method)
26+
27+
28+
class Rsync(Task):
    """Upload task that copies each entry of the backup directory to a remote
    host with rsync, one RsyncUploadThread per entry, run through a
    multiprocessing Pool.
    """

    def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
        super(Rsync, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
        self.backup_location = self.config.backup.location
        self.backup_name = self.config.backup.name
        self.remove_uploaded = self.config.upload.remove_uploaded
        self.rsync_path = self.config.upload.rsync.path
        self.rsync_user = self.config.upload.rsync.user
        self.rsync_host = self.config.upload.rsync.host
        self.rsync_port = self.config.upload.rsync.port
        self.rsync_ssh_key = self.config.upload.rsync.ssh_key
        self.retries = self.config.upload.rsync.retries
        self.thread_count = self.config.upload.rsync.threads
        self.rsync_binary = "rsync"

        self.rsync_flags = ["--archive", "--compress"]
        self.rsync_version = None
        self._rsync_info = None

        self._pool = Pool(processes=self.threads())

    def init(self):
        """Check that rsync is available and the source directory exists.

        Raises:
            OperationError: if rsync cannot be run or backup_dir is not a directory.
        """
        if not self.host_has_rsync():
            raise OperationError("Cannot find rsync binary on this host!")
        if not os.path.isdir(self.backup_dir):
            logging.error("The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir)
            raise OperationError("The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir)

    def rsync_info(self):
        """Return (and cache) a dict with rsync's 'version' and 'protocol_version'.

        Raises:
            OperationError: if the output of 'rsync --version' cannot be parsed.
        """
        if not self._rsync_info:
            output = check_output([self.rsync_binary, "--version"])
            search = re.search(r"^rsync\s+version\s([0-9.-]+)\s+protocol\sversion\s(\d+)", output)
            if not search:
                # Without this guard an unexpected banner surfaces as an
                # opaque AttributeError on None.
                raise OperationError("Could not parse version from '%s --version' output!" % self.rsync_binary)
            self.rsync_version = search.group(1)
            self._rsync_info = {"version": self.rsync_version, "protocol_version": int(search.group(2))}
        return self._rsync_info

    def host_has_rsync(self):
        """Return True if the rsync binary can be executed on this host."""
        try:
            return bool(self.rsync_info())
        except OSError:
            # check_output() raises OSError when the binary does not exist;
            # report that as "no rsync" so init() can raise its own error.
            return False

    def _ssh_target(self):
        """Return 'user@host', or just 'host' when no user is configured."""
        if self.rsync_user:
            return "%s@%s" % (self.rsync_user, self.rsync_host)
        return self.rsync_host

    def prepare_dest_dir(self):
        """Create the remote destination directory with 'mkdir -p' over ssh.

        Returns:
            True on success.

        Raises:
            Exception: whatever check_output raised, after logging it.
        """
        ssh_mkdir_cmd = ["ssh"]
        if self.rsync_port:
            ssh_mkdir_cmd.extend(["-p", str(self.rsync_port)])
        if self.rsync_ssh_key:
            ssh_mkdir_cmd.extend(["-i", self.rsync_ssh_key])
        ssh_mkdir_cmd.extend([
            self._ssh_target(),
            # Must match the path RsyncUploadThread uploads into
            # (rsync_path joined with base_dir), not base_dir alone.
            "mkdir", "-p", os.path.join(self.rsync_path, self.base_dir)
        ])

        # run the mkdir via ssh
        try:
            check_output(ssh_mkdir_cmd)
        except Exception as e:
            logging.error("Creating rsync dest path with ssh failed for %s: %s" % (
                self.rsync_host,
                e
            ))
            raise

        return True

    def done(self, data):
        """Pool callback: log the value returned by a finished upload."""
        logging.info(data)

    def run(self):
        """Upload every entry of backup_dir and wait for all uploads to finish.

        Raises:
            OperationError: if preparation fails or any upload raised or
                reported that it did not complete.
        """
        try:
            self.init()
            self.timer.start(self.timer_name)

            logging.info("Preparing destination path on %s" % self.rsync_host)
            self.prepare_dest_dir()

            rsync_config = {
                "dest": "%s:%s" % (self._ssh_target(), self.rsync_path),
                "threads": self.threads(),
                "retries": self.retries
            }
            rsync_config.update(self.rsync_info())
            logging.info("Starting upload using rsync version %s (%s)" % (
                self.rsync_info()['version'],
                config_to_string(rsync_config)
            ))
            results = []
            for child in os.listdir(self.backup_dir):
                results.append(self._pool.apply_async(RsyncUploadThread(
                    os.path.join(self.backup_dir, child),
                    self.base_dir,
                    self.rsync_flags,
                    self.rsync_path,
                    self.rsync_user,
                    self.rsync_host,
                    self.rsync_port,
                    self.rsync_ssh_key,
                    self.remove_uploaded,
                    self.retries
                ).run, callback=self.done))
            self.wait()

            # apply_async() swallows worker exceptions unless the result is
            # fetched; get() re-raises them here so failures are not silent.
            for result in results:
                completed, src_path = result.get()
                if not completed:
                    raise OperationError("Rsync upload of %s did not complete!" % src_path)

            # Only flag the task as completed when every upload succeeded.
            self.completed = True
        except Exception as e:
            logging.error("Rsync upload failed! Error: %s" % e)
            raise OperationError(e)
        finally:
            self.timer.stop(self.timer_name)

    def wait(self):
        """Stop accepting new uploads and block until queued ones finish."""
        if self._pool:
            logging.info("Waiting for Rsync upload threads to stop")
            self._pool.close()
            self._pool.join()

    def close(self):
        """Terminate the worker pool immediately."""
        if self._pool:
            logging.error("Stopping Rsync upload threads")
            self._pool.terminate()
            self._pool.join()
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import logging
2+
import os
3+
4+
from shutil import rmtree
5+
from subprocess import Popen, PIPE
6+
7+
from mongodb_consistent_backup.Common import wait_popen
8+
9+
10+
class RsyncUploadThread:
    """Uploads one source path to a remote host by running rsync over ssh."""

    def __init__(self, src_path, base_path, rsync_flags, rsync_path, rsync_user, rsync_host,
                 rsync_port=22, rsync_ssh_key=None, remove_uploaded=False, retries=5,
                 rsync_binary="rsync"):
        self.src_path = src_path
        self.base_path = base_path
        self.rsync_flags = rsync_flags
        self.rsync_path = rsync_path
        self.rsync_user = rsync_user
        self.rsync_host = rsync_host
        self.rsync_port = rsync_port
        self.rsync_ssh_key = rsync_ssh_key
        self.remove_uploaded = remove_uploaded
        self.retries = retries
        self.rsync_binary = rsync_binary

        self.completed = False
        self.rsync_url = None
        self.rsync_cmd = None
        self.meta_dir = "mongodb-consistent-backup_META"
        # Set up front so close() is safe even if rsync was never started.
        self._command = None

    def ssh_target(self):
        """Return 'user@host', or just 'host' when no user is configured."""
        if self.rsync_user:
            return "%s@%s" % (self.rsync_user, self.rsync_host)
        return self.rsync_host

    def get_ssh_cmd(self):
        """Return the remote-shell command handed to rsync via '-e'."""
        ssh_cmd = "ssh"
        if self.rsync_port:
            ssh_cmd += " -p %s" % self.rsync_port
        if self.rsync_ssh_key:
            key = self.rsync_ssh_key
            # rsync splits the -e string on spaces; quotes keep a key path
            # containing spaces together as one argument.
            if " " in key:
                key = '"%s"' % key
            ssh_cmd += " -i %s" % key
        return ssh_cmd

    def init(self):
        """Build the destination URL and the rsync command line."""
        self.rsync_url = "%s:%s" % (self.ssh_target(), self.get_dest_path())
        self.rsync_cmd = [self.rsync_binary]
        self.rsync_cmd.extend(self.rsync_flags)
        # Pass the configured SSH port/key through to rsync's transport.
        self.rsync_cmd.extend(["-e", self.get_ssh_cmd()])
        self.rsync_cmd.extend([self.src_path, self.rsync_url])

    def get_dest_path(self):
        """Return the remote directory: rsync_path joined with base_path."""
        return os.path.join(self.rsync_path, self.base_path)

    def handle_success(self):
        """Remove the uploaded source when remove_uploaded is set.

        Paths containing the metadata directory name are never removed.
        """
        if self.remove_uploaded:
            if self.meta_dir in self.src_path:
                logging.info("Skipping removal of metadata path: %s" % self.src_path)
            else:
                logging.info("Removing uploaded path: %s" % self.src_path)
                if os.path.isdir(self.src_path):
                    rmtree(self.src_path)
                else:
                    # rmtree() fails on plain files.
                    os.remove(self.src_path)

    def stderr(self, data):
        """Log a non-empty chunk of rsync stderr output as an error."""
        if data:
            logging.error(data)

    def stdout(self, data):
        """Log a non-empty chunk of rsync stdout output."""
        if data:
            logging.info(data)

    def do_rsync(self):
        """Run rsync once, relaying its output to the log.

        Returns:
            True if rsync exited with status 0, otherwise False.
        """
        self._command = Popen(self.rsync_cmd, stderr=PIPE, stdout=PIPE)
        wait_popen(self._command, self.stderr, self.stdout)
        return self._command.returncode == 0

    def run(self):
        """Upload src_path, making at most `retries` attempts (minimum one).

        Returns:
            Tuple of (completed, src_path); completed is True only when an
            rsync attempt exited with status 0.
        """
        self.init()
        try:
            logging.info("Uploading to %s" % (self.rsync_url))
            logging.debug("Rsync cmd: %s" % self.rsync_cmd)

            attempts = max(1, self.retries or 1)
            for attempt in range(1, attempts + 1):
                if self.do_rsync():
                    self.completed = True
                    break
                logging.error("Rsync upload of %s failed with exit code %s (attempt %i/%i)" % (
                    self.src_path,
                    self._command.returncode,
                    attempt,
                    attempts
                ))

            # Only a successful upload may trigger removal of the source.
            if self.completed:
                self.handle_success()
        finally:
            self.close()
        return self.completed, self.src_path

    def close(self, code=None, frame=None):
        """Log the stop and terminate rsync if it is still running."""
        logging.info("Stopping upload to %s:%s" % (
            self.ssh_target(),
            self.get_dest_path()
        ))
        # Only signal a process that is still running.
        if not self.completed and self._command and self._command.poll() is None:
            self._command.terminate()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from Rsync import Rsync # NOQA
2+
3+
4+
def config(parser):
    """Register the rsync uploader's options on *parser* and return it."""
    # (option suffix, help text, default, type) for each --upload.rsync.* flag
    options = (
        ("path", "Rsync upload base destination path (default: /)", '/', str),
        ("user", "Rsync upload SSH username (default: current)", None, str),
        ("host", "Rsync upload SSH hostname/IP", None, str),
        ("port", "Rsync upload SSH port number (default: 22)", 22, int),
        ("ssh_key", "Rsync upload SSH key path", None, str),
        ("retries", "Rsync upload retries (default: 5)", 5, int),
        ("threads", "Rsync upload threads (default: 4)", 4, int),
    )
    for suffix, help_text, default_value, value_type in options:
        key = "upload.rsync.%s" % suffix
        parser.add_argument("--%s" % key, dest=key, help=help_text, default=default_value, type=value_type)
    return parser

mongodb_consistent_backup/Upload/Upload.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from mongodb_consistent_backup.Upload.Gs import Gs # NOQA
22
from mongodb_consistent_backup.Upload.S3 import S3 # NOQA
3+
from mongodb_consistent_backup.Upload.Rsync import Rsync # NOQA
34
from mongodb_consistent_backup.Pipeline import Stage
45

56

mongodb_consistent_backup/Upload/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33

44
def config(parser):
5-
parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['gs', 's3', 'none'])
5+
parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['gs', 'rsync', 's3', 'none'])
66
parser.add_argument("--upload.remove_uploaded", dest="upload.remove_uploaded", help="Remove source files after successful upload (default: false)", default=False, action="store_true")
77
parser.add_argument("--upload.retries", dest="upload.retries", help="Number of times to retry upload attempts (default: 5)", default=5, type=int)
88
parser.add_argument("--upload.threads", dest="upload.threads", help="Number of threads to use for upload (default: 4)", default=4, type=int)

0 commit comments

Comments
 (0)