1919)
2020from scheduler .redis_models import JobStatus , SchedulerLock , Result , ResultType , JobModel
2121from scheduler .settings import logger , SCHEDULER_CONFIG
22- from scheduler .types import ConnectionType , FunctionReferenceType , Self
22+ from scheduler .types import ConnectionType , FunctionReferenceType , Self , PipelineType
2323
2424
2525class InvalidJobOperation (Exception ):
@@ -30,6 +30,10 @@ class NoSuchJobError(Exception):
3030 pass
3131
3232
33+ class NoSuchRegistryError (Exception ):
34+ pass
35+
36+
3337def perform_job (job_model : JobModel , connection : ConnectionType ) -> Any : # noqa
3438 """The main execution method. Invokes the job function with the job arguments.
3539
@@ -45,17 +49,17 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
4549 coro_result = loop .run_until_complete (result )
4650 result = coro_result
4751 if job_model .success_callback :
48- job_model .success_callback (job_model , connection , result ) # type: ignore
52+ job_model .success_callback (job_model , connection , result )
4953 return result
5054 except :
5155 if job_model .failure_callback :
52- job_model .failure_callback (job_model , connection , * sys .exc_info ()) # type: ignore
56+ job_model .failure_callback (job_model , connection , * sys .exc_info ())
5357 raise
5458 finally :
5559 assert job_model is _job_stack .pop ()
5660
5761
58- _job_stack = []
62+ _job_stack : List [ JobModel ] = []
5963
6064
6165class Queue :
@@ -68,14 +72,14 @@ class Queue:
6872 queued = "queued_job_registry" ,
6973 )
7074
71- def __init__ (self , connection : Optional [ ConnectionType ] , name : str , is_async : bool = True ) -> None :
75+ def __init__ (self , connection : ConnectionType , name : str , is_async : bool = True ) -> None :
7276 """Initializes a Queue object.
7377
7478 :param name: The queue name
7579 :param connection: Broker connection
7680 :param is_async: Whether jobs should run "async" (using the worker).
7781 """
78- self .connection = connection
82+ self .connection : ConnectionType = connection
7983 self .name = name
8084 self ._is_async = is_async
8185 self .queued_job_registry = QueuedJobRegistry (connection = self .connection , name = self .name )
@@ -85,11 +89,11 @@ def __init__(self, connection: Optional[ConnectionType], name: str, is_async: bo
8589 self .scheduled_job_registry = ScheduledJobRegistry (connection = self .connection , name = self .name )
8690 self .canceled_job_registry = CanceledJobRegistry (connection = self .connection , name = self .name )
8791
88- def __len__ (self ):
92+ def __len__ (self ) -> int :
8993 return self .count
9094
9195 @property
92- def scheduler_pid (self ) -> int :
96+ def scheduler_pid (self ) -> Optional [ int ] :
9397 lock = SchedulerLock (self .name )
9498 pid = lock .value (self .connection )
9599 return int (pid .decode ()) if pid is not None else None
@@ -155,11 +159,11 @@ def count(self) -> int:
155159 res += getattr (self , registry ).count (connection = self .connection )
156160 return res
157161
158- def get_registry (self , name : str ) -> Union [ None , JobNamesRegistry ] :
162+ def get_registry (self , name : str ) -> JobNamesRegistry :
159163 name = name .lower ()
160164 if name in Queue .REGISTRIES :
161- return getattr (self , Queue .REGISTRIES [name ])
162- return None
165+ return getattr (self , Queue .REGISTRIES [name ]) # type: ignore
166+ raise NoSuchRegistryError ( f"Unknown registry name { name } " )
163167
164168 def get_all_job_names (self ) -> List [str ]:
165169 res = list ()
@@ -178,22 +182,21 @@ def get_all_jobs(self) -> List[JobModel]:
178182 def create_and_enqueue_job (
179183 self ,
180184 func : FunctionReferenceType ,
181- args : Union [Tuple , List , None ] = None ,
182- kwargs : Optional [Dict ] = None ,
185+ args : Union [Tuple [ Any , ...], List [ Any ] , None ] = None ,
186+ kwargs : Optional [Dict [ str , Any ] ] = None ,
183187 when : Optional [datetime ] = None ,
184188 timeout : Optional [int ] = None ,
185189 result_ttl : Optional [int ] = None ,
186190 job_info_ttl : Optional [int ] = None ,
187191 description : Optional [str ] = None ,
188192 name : Optional [str ] = None ,
189193 at_front : bool = False ,
190- meta : Optional [Dict ] = None ,
194+ meta : Optional [Dict [ str , Any ] ] = None ,
191195 on_success : Optional [Callback ] = None ,
192196 on_failure : Optional [Callback ] = None ,
193197 on_stopped : Optional [Callback ] = None ,
194198 task_type : Optional [str ] = None ,
195199 scheduled_task_id : Optional [int ] = None ,
196- pipeline : Optional [ConnectionType ] = None ,
197200 ) -> JobModel :
198201 """Creates a job to represent the delayed function call and enqueues it.
199202 :param when: When to schedule the job (None to enqueue immediately)
@@ -212,7 +215,6 @@ def create_and_enqueue_job(
212215 :param on_stopped: Callback for on stopped
213216 :param task_type: The task type
214217 :param scheduled_task_id: The scheduled task id
215- :param pipeline: The Broker Pipeline
216218 :returns: The enqueued Job
217219 """
218220 status = JobStatus .QUEUED if when is None else JobStatus .SCHEDULED
@@ -236,7 +238,7 @@ def create_and_enqueue_job(
236238 scheduled_task_id = scheduled_task_id ,
237239 )
238240 if when is None :
239- job_model = self .enqueue_job (job_model , connection = pipeline , at_front = at_front )
241+ job_model = self .enqueue_job (job_model , at_front = at_front )
240242 elif isinstance (when , datetime ):
241243 job_model .save (connection = self .connection )
242244 self .scheduled_job_registry .schedule (self .connection , job_model .name , when )
@@ -246,7 +248,7 @@ def create_and_enqueue_job(
246248
247249 def job_handle_success (
248250 self , job : JobModel , result : Any , job_info_ttl : int , result_ttl : int , connection : ConnectionType
249- ):
251+ ) -> None :
250252 """Saves and cleanup job after successful execution"""
251253 job .after_execution (
252254 job_info_ttl ,
@@ -264,7 +266,7 @@ def job_handle_success(
264266 ttl = result_ttl ,
265267 )
266268
267- def job_handle_failure (self , status : JobStatus , job : JobModel , exc_string : str , connection : ConnectionType ):
269+ def job_handle_failure (self , status : JobStatus , job : JobModel , exc_string : str , connection : ConnectionType ) -> None :
268270 # Does not set job status since the job might be stopped
269271 job .after_execution (
270272 SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
@@ -304,10 +306,7 @@ def run_sync(self, job: JobModel) -> JobModel:
304306
305307 @classmethod
306308 def dequeue_any (
307- cls ,
308- queues : List [Self ],
309- timeout : Optional [int ],
310- connection : Optional [ConnectionType ] = None ,
309+ cls , queues : List [Self ], timeout : Optional [int ], connection : ConnectionType
311310 ) -> Tuple [Optional [JobModel ], Optional [Self ]]:
312311 """Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
313312 is important.
@@ -410,19 +409,19 @@ def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
410409 pass
411410
412411 def enqueue_job (
413- self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
412+ self , job_model : JobModel , pipeline : Optional [PipelineType ] = None , at_front : bool = False
414413 ) -> JobModel :
415414 """Enqueues a job for delayed execution without checking dependencies.
416415
417416 If Queue is instantiated with is_async=False, job is executed immediately.
418417 :param job_model: The job redis model
419- :param connection : The Redis Pipeline
418+ :param pipeline : The Broker Pipeline
420419 :param at_front: Whether to enqueue the job at the front
421420
422421 :returns: The enqueued JobModel
423422 """
424423
425- pipe = connection if connection is not None else self .connection .pipeline ()
424+ pipe : PipelineType = pipeline if pipeline is not None else self .connection .pipeline ()
426425 job_model .started_at = None
427426 job_model .ended_at = None
428427 job_model .status = JobStatus .QUEUED
0 commit comments