@@ -244,10 +244,12 @@ def create_and_enqueue_job(
244244 raise TypeError (f"Invalid type for when=`{ when } `" )
245245 return job_model
246246
247- def job_handle_success (self , job : JobModel , result : Any , result_ttl : int , connection : ConnectionType ):
247+ def job_handle_success (
248+ self , job : JobModel , result : Any , job_info_ttl : int , result_ttl : int , connection : ConnectionType
249+ ):
248250 """Saves and cleanup job after successful execution"""
249251 job .after_execution (
250- result_ttl ,
252+ job_info_ttl ,
251253 JobStatus .FINISHED ,
252254 prev_registry = self .active_job_registry ,
253255 new_registry = self .finished_job_registry ,
@@ -280,40 +282,26 @@ def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str,
280282 exc_string = exc_string ,
281283 )
282284
283- def run_job (self , job : JobModel ) -> JobModel :
284- """Run the job
285- :param job: The job to run
286- :returns: The job result
287- """
285+ def run_sync (self , job : JobModel ) -> JobModel :
286+ """Run a job synchronously, meaning on the same process the method was called."""
287+ job .prepare_for_execution ("sync" , self .active_job_registry , self .connection )
288288 try :
289289 result = perform_job (job , self .connection )
290290
291- result_ttl = job .success_ttl
292291 with self .connection .pipeline () as pipeline :
293- self .job_handle_success (job , result = result , result_ttl = result_ttl , connection = pipeline )
294- job .expire (result_ttl , connection = pipeline )
292+ self .job_handle_success (
293+ job , result = result , job_info_ttl = job .job_info_ttl , result_ttl = job .success_ttl , connection = pipeline
294+ )
295+
295296 pipeline .execute ()
296- except Exception as e :
297+ except Exception as e : # noqa
297298 logger .warning (f"Job { job .name } failed with exception: { e } " )
298299 with self .connection .pipeline () as pipeline :
299300 exc_string = "" .join (traceback .format_exception (* sys .exc_info ()))
300301 self .job_handle_failure (JobStatus .FAILED , job , exc_string , pipeline )
301302 pipeline .execute ()
302303 return job
303304
304- def run_sync (self , job : JobModel ) -> JobModel :
305- """Run a job synchronously, meaning on the same process the method was called."""
306- job .prepare_for_execution ("sync" , self .active_job_registry , self .connection )
307-
308- try :
309- self .run_job (job )
310- except : # noqa
311- with self .connection .pipeline () as pipeline :
312- exc_string = "" .join (traceback .format_exception (* sys .exc_info ()))
313- self .job_handle_failure (JobStatus .FAILED , job , exc_string , pipeline )
314- pipeline .execute ()
315- return job
316-
317305 @classmethod
318306 def dequeue_any (
319307 cls ,
0 commit comments