@@ -1,28 +1,11 @@
 import os
 import logging
 
-import boto.s3.multipart
-from copy_reg import pickle
-from math import ceil
-from multiprocessing import Pool
-from types import MethodType
-
-from S3Session import S3Session
-from S3UploadThread import S3UploadThread
+from S3UploadPool import S3UploadPool
 
 from mongodb_consistent_backup.Errors import OperationError
 from mongodb_consistent_backup.Pipeline import Task
-
-
-# Allows pooled .apply_async()s to work on Class-methods:
-def _reduce_method(m):
-    if m.im_self is None:
-        return getattr, (m.im_class, m.im_func.func_name)
-    else:
-        return getattr, (m.im_self, m.im_func.func_name)
-
-
-pickle(MethodType, _reduce_method)
+from mongodb_consistent_backup.Upload.Util import get_upload_files
 
 
 class S3(Task):
@@ -32,122 +15,67 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
         self.retries = self.config.upload.retries
         self.thread_count = self.config.upload.threads
         self.region = self.config.upload.s3.region
-        self.bucket_name = self.config.upload.s3.bucket_name
-        self.bucket_prefix = self.config.upload.s3.bucket_prefix
-        self.bucket_explicit_key = self.config.upload.s3.bucket_explicit_key
-        self.access_key = self.config.upload.s3.access_key
-        self.secret_key = self.config.upload.s3.secret_key
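+        # getattr() with a None default: these settings may be absent from
+        # the config, and a missing value should mean "unset", not an error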
+        self.bucket_name = getattr(self.config.upload.s3, 'bucket_name', None)
+        self.bucket_prefix = getattr(self.config.upload.s3, 'bucket_prefix', None)
+        self.bucket_explicit_key = getattr(self.config.upload.s3, 'bucket_explicit_key', None)
+        self.access_key = getattr(self.config.upload.s3, 'access_key', None)
+        self.secret_key = getattr(self.config.upload.s3, 'secret_key', None)
         self.chunk_size_mb = self.config.upload.s3.chunk_size_mb
         self.chunk_size = self.chunk_size_mb * 1024 * 1024
-        self.secure = self.config.upload.s3.secure
         self.s3_acl = self.config.upload.s3.acl
         self.key_prefix = base_dir
 
-        self._pool = None
-        self._multipart = None
-        self._upload_done = False
+        self._pool = None
+
         if None in (self.access_key, self.secret_key, self.region):
-            raise OperationError("Invalid S3 security key or region detected!")
-        try:
-            self.s3_conn = S3Session(self.region, self.access_key, self.secret_key, self.bucket_name)
-            self.bucket = self.s3_conn.get_bucket(self.bucket_name)
-        except Exception, e:
-            raise OperationError(e)
+            raise OperationError("Invalid or missing AWS S3 access key, secret key or region detected!")
+
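+        # a single shared upload pool now owns the worker threads, multipart
+        # chunking, ACLs and removal of local files after upload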
+        self._pool = S3UploadPool(
+            self.bucket_name,
+            self.region,
+            self.access_key,
+            self.secret_key,
+            self.thread_count,
+            self.remove_uploaded,
+            self.chunk_size,
+            self.s3_acl
+        )
+
+    def get_key_name(self, file_path):
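+        # an explicit key overrides everything; otherwise the key is built
+        # from the bucket prefix, the backup base dir and the file's path
+        # relative to the backup dir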
+        rel_path = os.path.relpath(file_path, self.backup_dir)
+        if self.bucket_explicit_key:
+            key_name = self.bucket_explicit_key
+        elif self.bucket_prefix == "/":
+            key_name = "/%s/%s" % (self.key_prefix, rel_path)
+        else:
+            key_name = "%s/%s/%s" % (self.bucket_prefix, self.key_prefix, rel_path)
+        return key_name
 
     def run(self):
         if not os.path.isdir(self.backup_dir):
             logging.error("The source directory: %s does not exist or is not a directory! Skipping AWS S3 Upload!" % self.backup_dir)
             return
         try:
             self.timer.start(self.timer_name)
-            for file_name in os.listdir(self.backup_dir):
-                file_path = os.path.join(self.backup_dir, file_name)
-                # skip mongodb-consistent-backup_META dir
-                if os.path.isdir(file_path):
-                    continue
-                file_size = os.stat(file_path).st_size
-                chunk_count = int(ceil(file_size / float(self.chunk_size)))
-
-                if self.bucket_explicit_key:
-                    key_name = self.bucket_explicit_key
-                else:
-                    if self.bucket_prefix == "/":
-                        key_name = "/%s/%s" % (self.key_prefix, file_name)
-                    else:
-                        key_name = "%s/%s/%s" % (self.bucket_prefix, self.key_prefix, file_name)
-
-                logging.info("Starting multipart AWS S3 upload to key: %s%s using %i threads, %imb chunks, %i retries" % (
-                    self.bucket_name,
-                    key_name,
-                    self.thread_count,
-                    self.chunk_size_mb,
-                    self.retries
-                ))
-                self._multipart = self.bucket.initiate_multipart_upload(key_name)
-                self._pool = Pool(processes=self.thread_count)
-
-                for i in range(chunk_count):
-                    offset = self.chunk_size * i
-                    byte_count = min(self.chunk_size, file_size - offset)
-                    part_num = i + 1
-                    self._pool.apply_async(S3UploadThread(
-                        self.bucket_name,
-                        self.region,
-                        self.access_key,
-                        self.secret_key,
-                        self._multipart.id,
-                        part_num,
-                        file_path,
-                        offset,
-                        byte_count,
-                        self.retries,
-                        self.secure
-                    ).run)
-                self._pool.close()
-                self._pool.join()
-
-                part_count = 0
-                for part in boto.s3.multipart.part_lister(self._multipart):
-                    part_count += 1
-                if part_count == chunk_count:
-                    self._multipart.complete_upload()
-                    if self.s3_acl:
-                        try:
-                            self.bucket.set_acl(self.s3_acl, key_name)
-                        except Exception:
-                            logging.exception("Unable to set ACLs on uploaded key: {}.".format(key_name))
-                    self._upload_done = True
-
-                    if self.remove_uploaded:
-                        logging.info("Uploaded AWS S3 key: %s%s successfully. Removing local file" % (self.bucket_name, key_name))
-                        os.remove(os.path.join(self.backup_dir, file_name))
-                    else:
-                        logging.info("Uploaded AWS S3 key: %s%s successfully" % (self.bucket_name, key_name))
-                else:
-                    self._multipart.cancel_upload()
-                    logging.error("Failed to upload all multiparts for key: %s%s! Upload cancelled" % (self.bucket_name, key_name))
-                    raise OperationError("Failed to upload all multiparts for key: %s%s! Upload cancelled" % (self.bucket_name, key_name))
-
-            if self.remove_uploaded:
-                logging.info("Removing backup source dir after successful AWS S3 upload of all backups")
-                os.rmdir(self.backup_dir)
-            self.timer.stop(self.timer_name)
+            logging.info("Starting AWS S3 upload to %s (%i threads, %imb multipart chunks, %i retries)" % (
+                self.bucket_name,
+                self.thread_count,
+                self.chunk_size_mb,
+                self.retries
+            ))
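+            # queue every backup file for upload, then block until the pool
+            # has worked through everything that was queued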
+            for file_path in get_upload_files(self.backup_dir):
+                key_name = self.get_key_name(file_path)
+                self._pool.upload(file_path, key_name)
+            self._pool.wait()
         except Exception, e:
-            logging.error("Uploading to AWS S3 failed! Error: %s" % e)
-            if self._multipart:
-                self._multipart.cancel_upload()
+            logging.error("Uploading to AWS S3 failed! Error: %s (error type: %s)" % (e, type(e)))
             raise OperationError(e)
+        finally:
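+            # stop the timer and shut the pool down whether the upload
+            # succeeded or failed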
+            self.timer.stop(self.timer_name)
+            self._pool.close()
+
         self.completed = True
 
-    def close(self):
+    def close(self, code=None, frame=None):
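+        # code/frame are accepted (and ignored) so close() can also be
+        # registered as a signal handler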
         if self._pool:
-            logging.error("Terminating multipart AWS S3 upload threads")
-            self._pool.terminate()
-            self._pool.join()
-
-        if self._multipart and not self._upload_done:
-            logging.error("Cancelling incomplete multipart AWS S3 upload")
-            self._multipart.cancel_upload()
-
-        if self.s3_conn:
-            self.s3_conn.close()
+            self._pool.close()