1919import pickle
2020import json
2121import msgpack
22+ import time
2223
2324from cortex .lib import util
2425from cortex .lib .exceptions import CortexException
@@ -114,18 +115,32 @@ def _get_matching_s3_keys_generator(self, prefix="", suffix=""):
114115 def _upload_string_to_s3 (self , string , key ):
115116 self .s3 .put_object (Bucket = self .bucket , Key = key , Body = string )
116117
117- def _read_bytes_from_s3 (self , key , allow_missing = False , ext_bucket = None ):
118+ def _read_bytes_from_s3 (
119+ self , key , allow_missing = False , ext_bucket = None , num_retries = 0 , retry_delay_sec = 2
120+ ):
121+ while True :
122+ try :
123+ return self ._read_bytes_from_s3_single (
124+ key , allow_missing = allow_missing , ext_bucket = ext_bucket
125+ )
126+ except :
127+ if num_retries <= 0 :
128+ raise
129+ num_retries -= 1
130+ time .sleep (retry_delay_sec )
131+
132+ def _read_bytes_from_s3_single (self , key , allow_missing = False , ext_bucket = None ):
118133 bucket = self .bucket
119134 if ext_bucket is not None :
120135 bucket = ext_bucket
121136
122137 try :
123138 try :
124139 byte_array = self .s3 .get_object (Bucket = bucket , Key = key )["Body" ].read ()
125- except self .s3 .exceptions .NoSuchKey as e :
140+ except self .s3 .exceptions .NoSuchKey :
126141 if allow_missing :
127142 return None
128- raise e
143+ raise
129144 except Exception as e :
130145 raise CortexException (
131146 'key "{}" in bucket "{}" could not be accessed; ' .format (key , bucket )
@@ -140,26 +155,41 @@ def search(self, prefix="", suffix=""):
140155 def put_json (self , obj , key ):
141156 self ._upload_string_to_s3 (json .dumps (obj ), key )
142157
143- def get_json (self , key , allow_missing = False ):
144- obj = self ._read_bytes_from_s3 (key , allow_missing )
158+ def get_json (self , key , allow_missing = False , num_retries = 0 , retry_delay_sec = 2 ):
159+ obj = self ._read_bytes_from_s3 (
160+ key ,
161+ allow_missing = allow_missing ,
162+ num_retries = num_retries ,
163+ retry_delay_sec = retry_delay_sec ,
164+ )
145165 if obj is None :
146166 return None
147167 return json .loads (obj .decode ("utf-8" ))
148168
149169 def put_msgpack (self , obj , key ):
150170 self ._upload_string_to_s3 (msgpack .dumps (obj ), key )
151171
152- def get_msgpack (self , key , allow_missing = False ):
153- obj = self ._read_bytes_from_s3 (key , allow_missing )
172+ def get_msgpack (self , key , allow_missing = False , num_retries = 0 , retry_delay_sec = 2 ):
173+ obj = self ._read_bytes_from_s3 (
174+ key ,
175+ allow_missing = allow_missing ,
176+ num_retries = num_retries ,
177+ retry_delay_sec = retry_delay_sec ,
178+ )
154179 if obj == None :
155180 return None
156181 return msgpack .loads (obj , raw = False )
157182
158183 def put_pyobj (self , obj , key ):
159184 self ._upload_string_to_s3 (pickle .dumps (obj ), key )
160185
161- def get_pyobj (self , key , allow_missing = False ):
162- obj = self ._read_bytes_from_s3 (key , allow_missing )
186+ def get_pyobj (self , key , allow_missing = False , num_retries = 0 , retry_delay_sec = 2 ):
187+ obj = self ._read_bytes_from_s3 (
188+ key ,
189+ allow_missing = allow_missing ,
190+ num_retries = num_retries ,
191+ retry_delay_sec = retry_delay_sec ,
192+ )
163193 if obj is None :
164194 return None
165195 return pickle .loads (obj )
@@ -207,9 +237,15 @@ def download_file_external(self, s3_path, local_path):
207237 + "it may not exist, or you may not have suffienct permissions"
208238 ) from e
209239
210- def get_json_external (self , s3_path ):
240+ def get_json_external (self , s3_path , num_retries = 0 , retry_delay_sec = 2 ):
211241 bucket , key = self .deconstruct_s3_path (s3_path )
212- obj = self ._read_bytes_from_s3 (key , ext_bucket = bucket )
242+ obj = self ._read_bytes_from_s3 (
243+ key ,
244+ allow_missing = False ,
245+ ext_bucket = bucket ,
246+ num_retries = num_retries ,
247+ retry_delay_sec = retry_delay_sec ,
248+ )
213249 if obj is None :
214250 return None
215251 return json .loads (obj .decode ("utf-8" ))
0 commit comments