Skip to content

Commit 341904f

Browse files
authored
Merge pull request #219 from timvaillancourt/1.2.0-read_pref_tags
1.2.0: Support Read Preference tag sets (geo/datacenter awareness, etc)
2 parents ea5626a + ffdadbd commit 341904f

File tree

8 files changed

+132
-48
lines changed

8 files changed

+132
-48
lines changed

conf/mongodb-consistent-backup.example.conf

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ production:
2020
# compression: [auto|none|gzip] (default: auto = enable gzip if supported)
2121
# threads: [1-16] (default: auto-generated, shards/cpu)
2222
#replication:
23-
# max_lag_secs: [1+] (default: 10)
24-
# min_priority: [0-999] (default: 0)
25-
# max_priority: [2-1000] (default: 1000)
26-
# hidden_only: [true|false] (default: false)
23+
# max_lag_secs: [1+] (default: 10)
24+
# min_priority: [0-999] (default: 0)
25+
# max_priority: [2-1000] (default: 1000)
26+
# hidden_only: [true|false] (default: false)
27+
# read_pref_tags: [key:value,key:value,...] (default: none)
2728
#sharding:
2829
# balancer:
2930
# wait_secs: [1+] (default: 300)

mongodb_consistent_backup/Backup/Mongodump/Mongodump.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
class Mongodump(Task):
1717
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
1818
super(Mongodump, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
19-
self.compression_method = self.config.backup.mongodump.compression
20-
self.binary = self.config.backup.mongodump.binary
2119
self.user = self.config.username
2220
self.password = self.config.password
2321
self.authdb = self.config.authdb
22+
self.compression_method = self.config.backup.mongodump.compression
23+
self.binary = self.config.backup.mongodump.binary
2424
self.replsets = replsets
2525
self.backup_stop = backup_stop
2626
self.sharding = sharding

mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import os
23
import logging
34
import sys
@@ -8,7 +9,7 @@
89
from signal import signal, SIGINT, SIGTERM, SIG_IGN
910
from subprocess import Popen, PIPE
1011

11-
from mongodb_consistent_backup.Common import is_datetime, parse_config_bool
12+
from mongodb_consistent_backup.Common import is_datetime, parse_config_bool, parse_read_pref_tags
1213
from mongodb_consistent_backup.Oplog import Oplog
1314

1415

@@ -31,6 +32,7 @@ def __init__(self, state, uri, timer, config, base_dir, version, threads=0, dump
3132
self.ssl_ca_file = self.config.ssl.ca_file
3233
self.ssl_crl_file = self.config.ssl.crl_file
3334
self.ssl_client_cert_file = self.config.ssl.client_cert_file
35+
self.read_pref_tags = self.config.replication.read_pref_tags
3436
self.binary = self.config.backup.mongodump.binary
3537

3638
self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset)
@@ -67,6 +69,12 @@ def is_version_gte(self, compare):
6769
return True
6870
return False
6971

72+
def parse_read_pref(self, mode="secondary"):
    """Return the read preference as a compact JSON document string.

    The document always carries a "mode" key set to *mode*. When
    self.read_pref_tags is set (non-empty), a "tags" key is added holding
    the dict produced by parse_read_pref_tags() for that string.

    The result is serialised without whitespace after separators so that it
    can be embedded in a single "--readPreference=..." command-line flag.
    """
    read_pref_doc = dict(mode=mode)
    tags_str = self.read_pref_tags
    if tags_str:
        read_pref_doc["tags"] = parse_read_pref_tags(tags_str)
    compact_separators = (',', ':')
    return json.dumps(read_pref_doc, separators=compact_separators)
77+
7078
def parse_mongodump_line(self, line):
7179
try:
7280
line = line.rstrip()
@@ -130,37 +138,59 @@ def wait(self):
130138
def mongodump_cmd(self):
131139
mongodump_uri = self.uri.get()
132140
mongodump_cmd = [self.binary]
133-
mongodump_flags = ["--host", mongodump_uri.host, "--port", str(mongodump_uri.port), "--oplog", "--out", "%s/dump" % self.backup_dir]
141+
mongodump_flags = [
142+
"--host=%s" % mongodump_uri.host,
143+
"--port=%s" % str(mongodump_uri.port),
144+
"--oplog",
145+
"--out=%s/dump" % self.backup_dir
146+
]
147+
148+
# --numParallelCollections
134149
if self.threads > 0:
135-
mongodump_flags.extend(["--numParallelCollections=" + str(self.threads)])
150+
mongodump_flags.append("--numParallelCollections=%s" % str(self.threads))
136151

152+
# --gzip
137153
if self.dump_gzip:
138-
mongodump_flags.extend(["--gzip"])
139-
140-
if self.is_version_gte("3.4.0"):
141-
mongodump_flags.extend(["--readPreference=secondary"])
142-
154+
mongodump_flags.append("--gzip")
155+
156+
# --readPreference
157+
if self.is_version_gte("3.2.0"):
158+
read_pref = self.parse_read_pref()
159+
if read_pref:
160+
mongodump_flags.append("--readPreference=%s" % read_pref)
161+
elif self.read_pref_tags:
162+
logging.fatal("Mongodump must be >= 3.2.0 to set read preference!")
163+
sys.exit(1)
164+
165+
# --username/--password/--authdb
143166
if self.authdb and self.authdb != "admin":
144167
logging.debug("Using database %s for authentication" % self.authdb)
145-
mongodump_flags.extend(["--authenticationDatabase", self.authdb])
168+
mongodump_flags.append("--authenticationDatabase=%s" % self.authdb)
146169
if self.user and self.password:
147170
# >= 3.0.2 supports password input via stdin to mask from ps
148171
if self.is_version_gte("3.0.2"):
149-
mongodump_flags.extend(["-u", self.user, "-p", '""'])
172+
mongodump_flags.extend([
173+
"--username=%s" % self.user,
174+
"--password=\"\""
175+
])
150176
self.do_stdin_passwd = True
151177
else:
152178
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this")
153-
mongodump_flags.extend(["-u", self.user, "-p", self.password])
179+
mongodump_flags.extend([
180+
"--username=%s" % self.user,
181+
"--password=%s" % self.password
182+
])
154183

184+
# --ssl
155185
if self.do_ssl():
156186
if self.is_version_gte("2.6.0"):
157187
mongodump_flags.append("--ssl")
158188
if self.ssl_ca_file:
159-
mongodump_flags.extend(["--sslCAFile", self.ssl_ca_file])
189+
mongodump_flags.append("--sslCAFile=%s" % self.ssl_ca_file)
160190
if self.ssl_crl_file:
161-
mongodump_flags.extend(["--sslCRLFile", self.ssl_crl_file])
191+
mongodump_flags.append("--sslCRLFile=%s" % self.ssl_crl_file)
162192
if self.client_cert_file:
163-
mongodump_flags.extend(["--sslPEMKeyFile", self.ssl_cert_file])
193+
mongodump_flags.append("--sslPEMKeyFile=%s" % self.ssl_cert_file)
164194
if self.do_ssl_insecure():
165195
mongodump_flags.extend(["--sslAllowInvalidCertificates", "--sslAllowInvalidHostnames"])
166196
else:
@@ -182,7 +212,7 @@ def run(self):
182212
if os.path.isdir(self.dump_dir):
183213
rmtree(self.dump_dir)
184214
os.makedirs(self.dump_dir)
185-
logging.debug("Running mongodump cmd: %s" % mongodump_cmd)
215+
logging.debug("Running mongodump cmd: %s" % " ".join(mongodump_cmd))
186216
self._process = Popen(mongodump_cmd, stdin=PIPE, stderr=PIPE)
187217
self.wait()
188218
self.exit_code = self._process.returncode

mongodb_consistent_backup/Common/DB.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,34 @@
1111
from mongodb_consistent_backup.Errors import DBAuthenticationError, DBConnectionError, DBOperationError, Error
1212

1313

14+
def parse_read_pref_tags(tags_str):
    """Parse a comma-separated "key:value" tag string into a dict.

    Example: "dc:east, rack:1" -> {"dc": "east", "rack": "1"}

    :param tags_str: tag string in "key:value,key:value,..." form; may be
        None or empty, in which case no tags are returned.
    :return: dict mapping each tag key to its (string) value.

    All spaces are removed before parsing. Items that contain no ":" are
    ignored. Each item is split on its FIRST ":" only, so a value may itself
    contain colons (e.g. "host:db1:27017" -> {"host": "db1:27017"}) instead
    of raising ValueError during unpacking.
    """
    tags = {}
    if not tags_str:
        # No tags configured (option default is None): nothing to parse.
        return tags
    for pair in tags_str.replace(" ", "").split(","):
        key, sep, value = pair.partition(":")
        if sep:
            tags[key] = str(value)
    return tags
21+
22+
1423
class DB:
15-
def __init__(self, uri, config, do_replset=False, read_pref='primaryPreferred', do_connect=True, conn_timeout=5000, retries=5):
16-
self.uri = uri
17-
self.config = config
18-
self.do_replset = do_replset
19-
self.read_pref = read_pref
20-
self.do_connect = do_connect
21-
self.conn_timeout = conn_timeout
22-
self.retries = retries
24+
def __init__(self, uri, config, do_replset=False, read_pref='primaryPreferred', do_rp_tags=False,
25+
do_connect=True, conn_timeout=5000, retries=5):
26+
self.uri = uri
27+
self.config = config
28+
self.do_replset = do_replset
29+
self.read_pref = read_pref
30+
self.do_rp_tags = do_rp_tags
31+
self.do_connect = do_connect
32+
self.conn_timeout = conn_timeout
33+
self.retries = retries
34+
35+
self.username = self.config.username
36+
self.password = self.config.password
37+
self.authdb = self.config.authdb
38+
self.ssl_ca_file = self.config.ssl.ca_file
39+
self.ssl_crl_file = self.config.ssl.crl_file
40+
self.ssl_client_cert_file = self.config.ssl.client_cert_file
41+
self.read_pref_tags = self.config.replication.read_pref_tags
2342

2443
self.username = self.config.username
2544
self.password = self.config.password
@@ -56,6 +75,14 @@ def client_opts(self):
5675
"readPreference": self.read_pref,
5776
"w": "majority"
5877
})
78+
if self.do_rp_tags and self.read_pref_tags:
79+
logging.debug("Using read preference mode: %s, tags: %s" % (
80+
self.read_pref,
81+
parse_read_pref_tags(self.read_pref_tags)
82+
))
83+
self.read_pref_tags = self.read_pref_tags.replace(" ", "")
84+
opts["readPreferenceTags"] = self.read_pref_tags
85+
5986
if self.do_ssl():
6087
logging.debug("Using SSL-secured mongodb connection (ca_cert=%s, client_cert=%s, crl_file=%s, insecure=%s)" % (
6188
self.ssl_ca_file,
@@ -76,10 +103,11 @@ def client_opts(self):
76103

77104
def connect(self):
78105
try:
79-
logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s, ssl=%s)" % (
106+
logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s, readPreferenceTags=%s, ssl=%s)" % (
80107
self.uri,
81108
self.replset,
82109
self.read_pref,
110+
self.do_rp_tags,
83111
self.do_ssl(),
84112
))
85113
conn = MongoClient(**self.client_opts())

mongodb_consistent_backup/Common/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from Config import Config, parse_config_bool # NOQA
2-
from DB import DB # NOQA
2+
from DB import DB, parse_read_pref_tags # NOQA
33
from LocalCommand import LocalCommand # NOQA
44
from Lock import Lock # NOQA
55
from MongoUri import MongoUri # NOQA

mongodb_consistent_backup/Oplog/Tailer/TailThread.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
# noinspection PyPackageRequirements
55
from multiprocessing import Process
6-
from pymongo.errors import AutoReconnect, ConnectionFailure, CursorNotFound, ExceededMaxWaiters, ExecutionTimeout, NetworkTimeout, NotMasterError
6+
from pymongo.errors import AutoReconnect, ConnectionFailure, CursorNotFound, ExceededMaxWaiters, ExecutionTimeout, NetworkTimeout, NotMasterError, ServerSelectionTimeoutError
77
from signal import signal, SIGINT, SIGTERM, SIG_IGN
88
from time import sleep, time
99

@@ -88,7 +88,7 @@ def status(self):
8888

8989
def connect(self):
9090
if not self.db:
91-
self.db = DB(self.uri, self.config, True, 'secondary')
91+
self.db = DB(self.uri, self.config, True, 'secondary', True)
9292
return self.db.connection()
9393

9494
def run(self):
@@ -144,21 +144,25 @@ def run(self):
144144
continue
145145
sleep(1)
146146
finally:
147-
self._cursor.close()
147+
if self._cursor:
148+
logging.debug("Stopping oplog cursor on %s" % self.uri)
149+
self._cursor.close()
148150
except OperationError, e:
149151
logging.error("Tailer %s encountered error: %s" % (self.uri, e))
150152
self.exit_code = 1
151153
self.backup_stop.set()
152154
raise OperationError(e)
155+
except ServerSelectionTimeoutError, e:
156+
logging.error("Tailer %s could not connect: %s" % (self.uri, e))
157+
self.exit_code = 1
158+
self.backup_stop.set()
159+
raise OperationError(e)
153160
except Exception, e:
154161
logging.error("Tailer %s encountered an unexpected error: %s" % (self.uri, e))
155162
self.exit_code = 1
156163
self.backup_stop.set()
157164
raise e
158165
finally:
159-
if self._cursor:
160-
logging.debug("Stopping oplog cursor on %s" % self.uri)
161-
self._cursor.close()
162166
oplog.flush()
163167
oplog.close()
164168
self.stopped = True

mongodb_consistent_backup/Replication/Replset.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@
44
from math import ceil
55
from time import mktime
66

7-
from mongodb_consistent_backup.Common import DB, MongoUri
7+
from mongodb_consistent_backup.Common import DB, MongoUri, parse_read_pref_tags
88
from mongodb_consistent_backup.Errors import Error, OperationError
99

1010

1111
class Replset:
1212
def __init__(self, config, db):
13-
self.config = config
14-
self.db = db
15-
self.max_lag_secs = self.config.replication.max_lag_secs
16-
self.min_priority = self.config.replication.min_priority
17-
self.max_priority = self.config.replication.max_priority
18-
self.hidden_only = self.config.replication.hidden_only
13+
self.config = config
14+
self.db = db
15+
self.read_pref_tags = self.config.replication.read_pref_tags
16+
self.max_lag_secs = self.config.replication.max_lag_secs
17+
self.min_priority = self.config.replication.min_priority
18+
self.max_priority = self.config.replication.max_priority
19+
self.hidden_only = self.config.replication.hidden_only
1920

2021
self.state_primary = 1
2122
self.state_secondary = 2
@@ -142,6 +143,18 @@ def is_member_electable(self, member):
142143
return True
143144
return False
144145

146+
def has_read_pref_tags(self, member_config):
    """Check whether a member's config carries every configured tag.

    :param member_config: mapping for one replica set member; it must
        contain a "tags" key.
    :return: True when each key/value parsed from self.read_pref_tags is
        present in member_config["tags"] with an equal value (also True
        when no tags were parsed); False otherwise.
    :raises OperationError: if member_config has no "tags" key.
    """
    if "tags" not in member_config:
        raise OperationError("Member config has no 'tags' field!")
    wanted_tags = parse_read_pref_tags(self.read_pref_tags)
    present_tags = member_config["tags"]
    # Every wanted tag must exist on the member and match its value.
    return all(
        name in present_tags and present_tags[name] == wanted_tags[name]
        for name in wanted_tags
    )
157+
145158
def find_primary(self, force=False, quiet=False):
146159
if force or not self.primary:
147160
rs_status = self.get_rs_status(force, quiet)
@@ -173,8 +186,8 @@ def find_secondary(self, force=False, quiet=False):
173186
self.get_rs_config(force, quiet)
174187
self.get_mongo_config(force, quiet)
175188

176-
quorum = self.get_rs_quorum()
177-
rs_name = rs_status['set']
189+
quorum = self.get_rs_quorum()
190+
rs_name = rs_status['set']
178191

179192
if self.secondary and not force:
180193
return self.secondary
@@ -196,6 +209,14 @@ def find_secondary(self, force=False, quiet=False):
196209
score = self.max_lag_secs * 10
197210
score_scale = 100.00 / float(score)
198211
priority = 0
212+
213+
if self.read_pref_tags and not self.has_read_pref_tags(member_config):
214+
logging.info("Found SECONDARY %s without read preference tags: %s, skipping" % (
215+
member_uri,
216+
parse_read_pref_tags(self.read_pref_tags)
217+
))
218+
continue
219+
199220
if 'hidden' in member_config and member_config['hidden']:
200221
score += (score * self.hidden_weight)
201222
log_data['hidden'] = True

mongodb_consistent_backup/Replication/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ def config(parser):
77
parser.add_argument("--replication.min_priority", dest="replication.min_priority", help="Min priority of secondary members for backup (default: 0)", default=0, type=int)
88
parser.add_argument("--replication.max_priority", dest="replication.max_priority", help="Max priority of secondary members for backup (default: 1000)", default=1000, type=int)
99
parser.add_argument("--replication.hidden_only", dest="replication.hidden_only", help="Only use hidden secondary members for backup (default: false)", default=False, action="store_true")
10-
# todo: add tag-specific backup option
11-
# parser.add_argument("-replication.use_tag", dest="replication.use_tag", help="Only use secondary members with tag for backup", type=str)
10+
parser.add_argument("--replication.read_pref_tags", dest="replication.read_pref_tags", default=None, type=str,
11+
help="Only use members that match replication tags in comma-separated key:value format (default: none)")
1212
return parser

0 commit comments

Comments
 (0)