1515import inspect
1616import json
1717import os
18+ import sys
1819import pathlib
1920import threading
2021import time
2728from cortex_internal .lib .concurrency import LockedFile
2829from cortex_internal .lib .storage import S3
2930from cortex_internal .lib .log import configure_logger
31+ from cortex_internal .lib .exceptions import CortexException
3032
3133logger = configure_logger ("cortex" , os .environ ["CORTEX_LOG_CONFIG_FILE" ])
3234
@@ -268,9 +270,8 @@ def handle_on_job_complete(message):
268270 break
269271 should_run_on_job_complete = True
270272 time .sleep (10 ) # verify that the queue is empty one more time
271- except :
272- logger .exception ("failed to handle on_job_complete" )
273- raise
273+ except Exception as err :
274+ raise CortexException ("failed to handle on_job_complete" ) from err
274275 finally :
275276 with receipt_handle_mutex :
276277 stop_renewal .add (receipt_handle )
@@ -310,11 +311,19 @@ def start():
310311 storage , api_spec = get_spec (provider , api_spec_path , cache_dir , region )
311312 job_spec = get_job_spec (storage , cache_dir , job_spec_path )
312313
313- client = api .predictor .initialize_client (
314- tf_serving_host = tf_serving_host , tf_serving_port = tf_serving_port
315- )
316- logger .info ("loading the predictor from {}" .format (api .predictor .path ))
317- predictor_impl = api .predictor .initialize_impl (project_dir , client , job_spec )
314+ try :
315+ client = api .predictor .initialize_client (
316+ tf_serving_host = tf_serving_host , tf_serving_port = tf_serving_port
317+ )
318+ logger .info ("loading the predictor from {}" .format (api .predictor .path ))
319+ predictor_impl = api .predictor .initialize_impl (project_dir , client , job_spec )
320+ except CortexException as err :
321+ err .wrap (f"failed to start job { job_spec ['job_id' ]} " )
322+ logger .error (str (err ), exc_info = True )
323+ sys .exit (1 )
324+ except :
325+ logger .error (f"failed to start job { job_spec ['job_id' ]} " , exc_info = True )
326+ sys .exit (1 )
318327
319328 local_cache ["api_spec" ] = api
320329 local_cache ["provider" ] = provider
@@ -326,7 +335,15 @@ def start():
326335 open ("/mnt/workspace/api_readiness.txt" , "a" ).close ()
327336
328337 logger .info ("polling for batches..." )
329- sqs_loop ()
338+ try :
339+ sqs_loop ()
340+ except CortexException as err :
341+ err .wrap (f"failed to run job { job_spec ['job_id' ]} " )
342+ logger .error (str (err ), exc_info = True )
343+ sys .exit (1 )
344+ except :
345+ logger .error (f"failed to run job { job_spec ['job_id' ]} " , exc_info = True )
346+ sys .exit (1 )
330347
331348
332349if __name__ == "__main__" :
0 commit comments