1- from typing import List , Any , Optional , Union
1+ from typing import List , Optional , Union
22
33import django
44from django .apps import apps
@@ -37,43 +37,33 @@ def register_sentry(sentry_dsn, **opts):
3737 rq_register_sentry (sentry_dsn , ** opts )
3838
3939
40- def as_text (v : Union [bytes , str ]) -> Optional [str ]:
40+ def as_str (v : Union [bytes , str ]) -> Optional [str ]:
4141 """Converts a bytes value to a string using `utf-8`.
4242
43- :param v: The value (bytes or string )
43+ :param v: The value (None/ bytes/str )
4444 :raises: ValueError: If the value is not bytes or string
4545 :returns: Either the decoded string or None
4646 """
4747 if v is None :
4848 return None
49- elif isinstance (v , bytes ):
49+ if isinstance (v , bytes ):
5050 return v .decode ("utf-8" )
51- elif isinstance (v , str ):
51+ if isinstance (v , str ):
5252 return v
53- else :
54- raise ValueError ("Unknown type %r" % type (v ))
55-
56-
57- def compact (lst : List [Any ]) -> List [Any ]:
58- """Remove `None` values from an iterable object.
59- :param lst: A list (or list-like) object
60- :returns: The list without None values
61- """
62- return [item for item in lst if item is not None ]
53+ raise ValueError ("Unknown type %r" % type (v ))
6354
6455
6556class JobExecution (Job ):
66- def __eq__ (self , other ):
57+ def __eq__ (self , other ) -> bool :
6758 return isinstance (other , Job ) and self .id == other .id
6859
6960 @property
70- def is_scheduled_task (self ):
61+ def is_scheduled_task (self ) -> bool :
7162 return self .meta .get ("scheduled_task_id" , None ) is not None
7263
73- def is_execution_of (self , scheduled_job ) :
64+ def is_execution_of (self , task : "ScheduledTask" ) -> bool :
7465 return (
75- self .meta .get ("task_type" , None ) == scheduled_job .TASK_TYPE
76- and self .meta .get ("scheduled_task_id" , None ) == scheduled_job .id
66+ self .meta .get ("task_type" , None ) == task .TASK_TYPE and self .meta .get ("scheduled_task_id" , None ) == task .id
7767 )
7868
7969 def stop_execution (self , connection : ConnectionType ):
@@ -138,7 +128,7 @@ def _start_scheduler(
138128 proc = self .scheduler .start ()
139129 self ._set_property ("scheduler_pid" , proc .pid )
140130
141- def execute_job (self , job : "Job" , queue : "Queue" ):
131+ def execute_job (self , job : "Job" , queue : "Queue" ) -> None :
142132 if self .fork_job_execution :
143133 super (DjangoWorker , self ).execute_job (job , queue )
144134 else :
@@ -150,16 +140,17 @@ def work(self, **kwargs) -> bool:
150140 kwargs .setdefault ("with_scheduler" , True )
151141 return super (DjangoWorker , self ).work (** kwargs )
152142
153- def _set_property (self , prop_name : str , val , pipeline : Optional [PipelineType ] = None ):
143+ def _set_property (self , prop_name : str , val , pipeline : Optional [PipelineType ] = None ) -> None :
154144 connection = pipeline if pipeline is not None else self .connection
155145 if val is None :
156146 connection .hdel (self .key , prop_name )
157147 else :
158148 connection .hset (self .key , prop_name , val )
159149
160- def _get_property (self , prop_name : str , pipeline : Optional [PipelineType ] = None ):
150+ def _get_property (self , prop_name : str , pipeline : Optional [PipelineType ] = None ) -> Optional [ str ] :
161151 connection = pipeline if pipeline is not None else self .connection
162- return as_text (connection .hget (self .key , prop_name ))
152+ res = connection .hget (self .key , prop_name )
153+ return as_str (res )
163154
164155 def scheduler_pid (self ) -> Optional [int ]:
165156 if len (self .queues ) == 0 :
@@ -170,6 +161,9 @@ def scheduler_pid(self) -> Optional[int]:
170161
171162
172163class DjangoQueue (Queue ):
164+ """A subclass of RQ's QUEUE that allows jobs to be stored temporarily to be enqueued later at the end of Django's
165+ request/response cycle."""
166+
173167 REGISTRIES = dict (
174168 finished = "finished_job_registry" ,
175169 failed = "failed_job_registry" ,
@@ -178,12 +172,8 @@ class DjangoQueue(Queue):
178172 deferred = "deferred_job_registry" ,
179173 canceled = "canceled_job_registry" ,
180174 )
181- """
182- A subclass of RQ's QUEUE that allows jobs to be stored temporarily to be
183- enqueued later at the end of Django's request/response cycle.
184- """
185175
186- def __init__ (self , * args , ** kwargs ):
176+ def __init__ (self , * args , ** kwargs ) -> None :
187177 kwargs ["job_class" ] = JobExecution
188178 super (DjangoQueue , self ).__init__ (* args , ** kwargs )
189179
@@ -196,43 +186,43 @@ def get_registry(self, name: str) -> Union[None, BaseRegistry, "DjangoQueue"]:
196186 return None
197187
198188 @property
199- def finished_job_registry (self ):
189+ def finished_job_registry (self ) -> FinishedJobRegistry :
200190 return FinishedJobRegistry (self .name , self .connection )
201191
202192 @property
203- def started_job_registry (self ):
193+ def started_job_registry (self ) -> StartedJobRegistry :
204194 return StartedJobRegistry (
205195 self .name ,
206196 self .connection ,
207197 job_class = JobExecution ,
208198 )
209199
210200 @property
211- def deferred_job_registry (self ):
201+ def deferred_job_registry (self ) -> DeferredJobRegistry :
212202 return DeferredJobRegistry (
213203 self .name ,
214204 self .connection ,
215205 job_class = JobExecution ,
216206 )
217207
218208 @property
219- def failed_job_registry (self ):
209+ def failed_job_registry (self ) -> FailedJobRegistry :
220210 return FailedJobRegistry (
221211 self .name ,
222212 self .connection ,
223213 job_class = JobExecution ,
224214 )
225215
226216 @property
227- def scheduled_job_registry (self ):
217+ def scheduled_job_registry (self ) -> ScheduledJobRegistry :
228218 return ScheduledJobRegistry (
229219 self .name ,
230220 self .connection ,
231221 job_class = JobExecution ,
232222 )
233223
234224 @property
235- def canceled_job_registry (self ):
225+ def canceled_job_registry (self ) -> CanceledJobRegistry :
236226 return CanceledJobRegistry (
237227 self .name ,
238228 self .connection ,
@@ -250,24 +240,24 @@ def get_all_job_ids(self) -> List[str]:
250240 res .extend (self .canceled_job_registry .get_job_ids ())
251241 return res
252242
253- def get_all_jobs (self ):
243+ def get_all_jobs (self ) -> List [ JobExecution ] :
254244 job_ids = self .get_all_job_ids ()
255- return compact ( [self .fetch_job (job_id ) for job_id in job_ids ])
245+ return list ( filter ( lambda j : j is not None , [self .fetch_job (job_id ) for job_id in job_ids ]) )
256246
257- def clean_registries (self ):
247+ def clean_registries (self ) -> None :
258248 self .started_job_registry .cleanup ()
259249 self .failed_job_registry .cleanup ()
260250 self .finished_job_registry .cleanup ()
261251
262- def remove_job_id (self , job_id : str ):
252+ def remove_job_id (self , job_id : str ) -> None :
263253 self .connection .lrem (self .key , 0 , job_id )
264254
265- def last_job_id (self ):
255+ def last_job_id (self ) -> Optional [ str ] :
266256 return self .connection .lindex (self .key , 0 )
267257
268258
269259class DjangoScheduler (RQScheduler ):
270- def __init__ (self , * args , ** kwargs ):
260+ def __init__ (self , * args , ** kwargs ) -> None :
271261 kwargs .setdefault ("interval" , settings .SCHEDULER_CONFIG .SCHEDULER_INTERVAL )
272262 super (DjangoScheduler , self ).__init__ (* args , ** kwargs )
273263
@@ -281,10 +271,10 @@ def reschedule_all_jobs():
281271 logger .debug (f"Rescheduling { str (item )} " )
282272 item .save ()
283273
284- def work (self ):
274+ def work (self ) -> None :
285275 django .setup ()
286276 super (DjangoScheduler , self ).work ()
287277
288- def enqueue_scheduled_jobs (self ):
278+ def enqueue_scheduled_jobs (self ) -> None :
289279 self .reschedule_all_jobs ()
290280 super (DjangoScheduler , self ).enqueue_scheduled_jobs ()
0 commit comments