@@ -34,7 +34,7 @@ class NoSuchRegistryError(Exception):
3434 pass
3535
3636
37- def perform_job (job_model : JobModel , connection : ConnectionType ) -> Any : # noqa
37+ def queue_perform_job (job_model : JobModel , connection : ConnectionType ) -> Any : # noqa
3838 """The main execution method. Invokes the job function with the job arguments.
3939
4040 :returns: The job's return value
@@ -82,21 +82,12 @@ def __init__(self, connection: ConnectionType, name: str, is_async: bool = True)
8282 self .connection : ConnectionType = connection
8383 self .name = name
8484 self ._is_async = is_async
85- self .queued_job_registry = QueuedJobRegistry (connection = self .connection , name = self .name )
86- self .active_job_registry = ActiveJobRegistry (connection = self .connection , name = self .name )
87- self .failed_job_registry = FailedJobRegistry (connection = self .connection , name = self .name )
88- self .finished_job_registry = FinishedJobRegistry (connection = self .connection , name = self .name )
89- self .scheduled_job_registry = ScheduledJobRegistry (connection = self .connection , name = self .name )
90- self .canceled_job_registry = CanceledJobRegistry (connection = self .connection , name = self .name )
91-
92- def refresh_connection (self , connection : ConnectionType ) -> None :
93- self .connection = connection
94- self .queued_job_registry .connection = connection
95- self .active_job_registry .connection = connection
96- self .failed_job_registry .connection = connection
97- self .finished_job_registry .connection = connection
98- self .scheduled_job_registry .connection = connection
99- self .canceled_job_registry .connection = connection
85+ self .queued_job_registry = QueuedJobRegistry (name = self .name )
86+ self .active_job_registry = ActiveJobRegistry (name = self .name )
87+ self .failed_job_registry = FailedJobRegistry (name = self .name )
88+ self .finished_job_registry = FinishedJobRegistry (name = self .name )
89+ self .scheduled_job_registry = ScheduledJobRegistry (name = self .name )
90+ self .canceled_job_registry = CanceledJobRegistry (name = self .name )
10091
10192 def __len__ (self ) -> int :
10293 return self .count
@@ -114,7 +105,7 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
114105 Removed jobs are added to the global failed job queue.
115106 """
116107 before_score = timestamp or current_timestamp ()
117- self .queued_job_registry .compact ()
108+ self .queued_job_registry .compact (self . connection )
118109 started_jobs : List [Tuple [str , float ]] = self .active_job_registry .get_job_names_before (
119110 self .connection , before_score
120111 )
@@ -142,7 +133,7 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
142133 getattr (self , registry ).cleanup (connection = self .connection , timestamp = before_score )
143134
144135 def first_queued_job_name (self ) -> Optional [str ]:
145- return self .queued_job_registry .get_first ()
136+ return self .queued_job_registry .get_first (self . connection )
146137
147138 @property
148139 def count (self ) -> int :
@@ -160,12 +151,12 @@ def get_registry(self, name: str) -> JobNamesRegistry:
160151
161152 def get_all_job_names (self ) -> List [str ]:
162153 all_job_names = list ()
163- all_job_names .extend (self .queued_job_registry .all ())
164- all_job_names .extend (self .finished_job_registry .all ())
165- all_job_names .extend (self .active_job_registry .all ())
166- all_job_names .extend (self .failed_job_registry .all ())
167- all_job_names .extend (self .scheduled_job_registry .all ())
168- all_job_names .extend (self .canceled_job_registry .all ())
154+ all_job_names .extend (self .queued_job_registry .all (self . connection ))
155+ all_job_names .extend (self .finished_job_registry .all (self . connection ))
156+ all_job_names .extend (self .active_job_registry .all (self . connection ))
157+ all_job_names .extend (self .failed_job_registry .all (self . connection ))
158+ all_job_names .extend (self .scheduled_job_registry .all (self . connection ))
159+ all_job_names .extend (self .canceled_job_registry .all (self . connection ))
169160 res = list (filter (lambda job_name : JobModel .exists (job_name , self .connection ), all_job_names ))
170161 return res
171162
@@ -280,7 +271,7 @@ def run_sync(self, job: JobModel) -> JobModel:
280271 """Run a job synchronously, meaning on the same process the method was called."""
281272 job .prepare_for_execution ("sync" , self .active_job_registry , self .connection )
282273 try :
283- result = perform_job (job , self .connection )
274+ result = queue_perform_job (job , self .connection )
284275 self .job_handle_success (job , result = result , job_info_ttl = job .job_info_ttl , result_ttl = job .success_ttl )
285276 except Exception as e : # noqa
286277 logger .warning (f"Job { job .name } failed with exception: { e } " )
@@ -307,7 +298,7 @@ def dequeue_any(
307298 while True :
308299 registries = [q .queued_job_registry for q in queues ]
309300 for registry in registries :
310- registry .compact ()
301+ registry .compact (connection )
311302
312303 registry_key , job_name = QueuedJobRegistry .pop (connection , registries , timeout )
313304 if job_name is None :
@@ -416,7 +407,7 @@ def enqueue_job(
416407 if at_front :
417408 score = current_timestamp ()
418409 else :
419- score = self .queued_job_registry .get_last_timestamp () or current_timestamp ()
410+ score = self .queued_job_registry .get_last_timestamp (self . connection ) or current_timestamp ()
420411 self .scheduled_job_registry .delete (connection = pipe , job_name = job_model .name )
421412 self .queued_job_registry .add (connection = pipe , score = score , job_name = job_model .name )
422413 pipe .execute ()
0 commit comments