Skip to content

Commit e50b5c2

Browse files
corey-hammertontimvaillancourt
authored andcommitted
Configuring all Task sub-classes to use their respected threads configuration item and using the Task threads() functionality (#273)
This commit implements the thread system as described in the documentation. And actually make threading configurable.
1 parent e533791 commit e50b5c2

File tree

6 files changed

+13
-11
lines changed

6 files changed

+13
-11
lines changed

mongodb_consistent_backup/Archive/Tar/Tar.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
3131
self._pool = None
3232
self._pooled = []
3333

34+
self.threads(self.config.archive.tar.threads)
35+
3436
def done(self, done_dir):
3537
if done_dir in self._pooled:
3638
logging.debug("Archiving completed for: %s" % done_dir)
@@ -51,9 +53,8 @@ def wait(self):
5153

5254
def run(self):
5355
try:
54-
thread_count = self.threads()
55-
self._pool = Pool(processes=thread_count)
56-
logging.info("Archiving backup directories with pool of %i thread(s)" % thread_count)
56+
self._pool = Pool(processes=self.threads())
57+
logging.info("Archiving backup directories with pool of %i thread(s)" % self.threads())
5758
except Exception, e:
5859
logging.fatal("Could not start pool! Error: %s" % e)
5960
raise Error(e)

mongodb_consistent_backup/Archive/Zbackup/Zbackup.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
1818
self.zbackup_cache_mb = self.config.archive.zbackup.cache_mb
1919
self.zbackup_passwd_file = self.config.archive.zbackup.password_file
2020

21-
if self.config.archive.zbackup.threads and self.config.archive.zbackup.threads > 0:
22-
self.threads(self.config.archive.zbackup.threads)
21+
self.threads(self.config.archive.zbackup.threads)
2322

2423
# only lzma compression supported (for now)
2524
self.compression_method = 'lzma'

mongodb_consistent_backup/Oplog/Resolver/Resolver.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, tailed_oplogs,
4141
self._pool = None
4242
self._pooled = []
4343
self._results = {}
44+
45+
self.threads(self.config.oplog.resolver.threads)
46+
4447
try:
45-
self._pool = Pool(processes=self.threads(None, 2))
48+
self._pool = Pool(processes=self.threads())
4649
except Exception, e:
4750
logging.fatal("Could not start oplog resolver pool! Error: %s" % e)
4851
raise Error(e)

mongodb_consistent_backup/Upload/Rsync/Rsync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
3232
self.backup_name = self.config.backup.name
3333
self.remove_uploaded = self.config.upload.remove_uploaded
3434
self.retries = self.config.upload.retries
35-
self.thread_count = self.config.upload.threads
3635
self.rsync_path = self.config.upload.rsync.path
3736
self.rsync_user = self.config.upload.rsync.user
3837
self.rsync_host = self.config.upload.rsync.host
@@ -44,6 +43,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
4443
self.rsync_version = None
4544
self._rsync_info = None
4645

46+
self.threads(self.config.upload.threads)
4747
self._pool = Pool(processes=self.threads())
4848

4949
def init(self):

mongodb_consistent_backup/Upload/S3/S3.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
1313
super(S3, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
1414
self.remove_uploaded = self.config.upload.remove_uploaded
1515
self.retries = self.config.upload.retries
16-
self.thread_count = self.config.upload.threads
1716
self.region = self.config.upload.s3.region
1817
self.bucket_name = getattr(self.config.upload.s3, 'bucket_name', None)
1918
self.bucket_prefix = getattr(self.config.upload.s3, 'bucket_prefix', None)
@@ -25,6 +24,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
2524
self.s3_acl = self.config.upload.s3.acl
2625
self.key_prefix = base_dir
2726

27+
self.threads(self.config.upload.threads)
2828
self._pool = None
2929

3030
if self.region is None:
@@ -35,7 +35,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
3535
self.region,
3636
self.access_key,
3737
self.secret_key,
38-
self.thread_count,
38+
self.threads(),
3939
self.remove_uploaded,
4040
self.chunk_size,
4141
self.s3_acl
@@ -59,7 +59,7 @@ def run(self):
5959
self.timer.start(self.timer_name)
6060
logging.info("Starting AWS S3 upload to %s (%i threads, %imb multipart chunks, %i retries)" % (
6161
self.bucket_name,
62-
self.thread_count,
62+
self.threads(),
6363
self.chunk_size_mb,
6464
self.retries
6565
))

mongodb_consistent_backup/Upload/S3/S3UploadPool.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ def _reduce_method(m):
2929

3030
class S3UploadPool():
3131
def __init__(self, bucket_name, region, access_key, secret_key, threads=4, remove_uploaded=False, chunk_bytes=50 * 1024 * 1024, key_acl=None):
32-
self.threads = threads
3332
self.bucket_name = bucket_name
3433
self.region = region
3534
self.access_key = access_key

0 commit comments

Comments
 (0)