Skip to content

Commit 750e033

Browse files
Limit the number of parallel mongodump workers based on CPU
1 parent 26a1800 commit 750e033

File tree

3 files changed

+22
-4
lines changed

3 files changed

+22
-4
lines changed

MongoBackup/Methods/Dump.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@
1212
# noinspection PyStringFormat
1313
class Dump(Process):
1414
def __init__(self, response_queue, backup_name, host_port, user, password, authdb, base_dir, binary,
15-
dump_gzip=False, verbose=False):
15+
threads=0, dump_gzip=False, verbose=False):
1616
Process.__init__(self)
1717
self.host, port = host_port.split(":")
1818
self.host_port = host_port
@@ -24,6 +24,7 @@ def __init__(self, response_queue, backup_name, host_port, user, password, authd
2424
self.authdb = authdb
2525
self.base_dir = base_dir
2626
self.binary = binary
27+
self.threads = threads
2728
self.dump_gzip = dump_gzip
2829
self.verbose = verbose
2930

@@ -52,6 +53,8 @@ def run(self):
5253
))
5354

5455
mongodump_flags = ["-h", self.host_port, "--oplog", "-o", "%s/dump" % self.backup_dir]
56+
if self.threads > 0:
57+
mongodump_flags.extend(["--numParallelCollections="+str(self.threads)])
5558
if self.dump_gzip:
5659
mongodump_flags.extend(["--gzip"])
5760
if self.authdb and self.authdb != "admin":
@@ -68,6 +71,7 @@ def run(self):
6871
commands.append([self.binary, mongodump_flags])
6972

7073
for (command, command_flags) in commands:
74+
print command_flags
7175
self._command = LocalCommand(command, command_flags, self.verbose)
7276
self._command.run()
7377
except Exception, e:

MongoBackup/Methods/Dumper.py

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,8 @@
11
import logging
22

33
from fabric.api import hide, settings, local
4-
from multiprocessing import Queue
4+
from math import floor
5+
from multiprocessing import Queue, cpu_count
56
from time import sleep
67

78

@@ -22,6 +23,7 @@ def __init__(self, secondaries, base_dir, binary, dump_gzip=False, user=None, pa
2223
self.verbose = verbose
2324

2425
self.config_replset = False
26+
self.cpu_count = cpu_count()
2527
self.response_queue = Queue()
2628
self.threads = []
2729
self._summary = {}
@@ -65,6 +67,16 @@ def wait(self):
6567
raise Exception, "Not all mongodump threads completed successfully!", None
6668

6769
def run(self):
70+
# decide how many parallel dump workers to use based on cpu count vs # of shards (if 3.2+)
71+
self.threads_per_dump = 1
72+
if tuple(self.version.split(".")) >= tuple("3.2.0".split(".")):
73+
self.threads_per_dump = 1
74+
if self.cpu_count > len(self.secondaries):
75+
self.threads_per_dump = int(floor(self.cpu_count / len(self.secondaries)))
76+
else:
77+
self.threads_per_dump = 0
78+
logging.warn("Threading unsupported by mongodump version %s. Use mongodump 3.2.0 or greater to enable per-dump threading." % self.version)
79+
6880
# backup a secondary from each shard:
6981
for shard in self.secondaries:
7082
secondary = self.secondaries[shard]
@@ -77,6 +89,7 @@ def run(self):
7789
self.authdb,
7890
self.base_dir,
7991
self.binary,
92+
self.threads_per_dump,
8093
self.dump_gzip,
8194
self.verbose
8295
)
@@ -87,7 +100,7 @@ def run(self):
87100

88101
# start all threads and wait
89102
logging.info(
90-
"Starting backups in threads using mongodump %s (inline gzip: %s)" % (self.version, str(self.dump_gzip)))
103+
"Starting backups using mongodump %s (inline gzip: %s, threads per dump: %i)" % (self.version, str(self.dump_gzip), self.threads_per_dump))
91104
for thread in self.threads:
92105
thread.start()
93106
self.wait()
@@ -104,6 +117,7 @@ def run(self):
104117
self.authdb,
105118
self.base_dir,
106119
self.binary,
120+
self.threads_per_dump,
107121
self.dump_gzip,
108122
self.verbose
109123
)]

VERSION

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1 +1 @@
1-
0.3.4
1+
0.3.5

0 commit comments

Comments
 (0)