55
66import croniter
77from django .apps import apps
8+ from django .conf import settings as django_settings
89from django .contrib import admin
910from django .contrib .contenttypes .fields import GenericRelation
1011from django .core .exceptions import ValidationError
12+ from django .core .mail import mail_admins
1113from django .db import models
1214from django .templatetags .tz import utc
1315from django .urls import reverse
2426SCHEDULER_INTERVAL = settings .SCHEDULER_CONFIG ['SCHEDULER_INTERVAL' ]
2527
2628
27- def callback_save_job (job , connection , result , * args , ** kwargs ):
29+ def failure_callback (job , connection , result , * args , ** kwargs ):
30+ model_name = job .meta .get ('task_type' , None )
31+ scheduled_task_id = job .meta .get ('scheduled_task_id' , None )
32+ if model_name is None or scheduled_task_id :
33+ return
34+ model = apps .get_model (app_label = 'scheduler' , model_name = model_name )
35+ task = model .objects .filter (id = scheduled_task_id ).first ()
36+ mail_admins (f'Task { task .id } /{ task .name } has failed' ,
37+ 'See django-admin for logs' , )
38+ pass
39+
40+
41+ def success_callback (job , connection , result , * args , ** kwargs ):
2842 model_name = job .meta .get ('task_type' , None )
2943 if model_name is None :
3044 return
3145 model = apps .get_model (app_label = 'scheduler' , model_name = model_name )
3246 task = model .objects .filter (job_id = job .id ).first ()
33- if task is not None :
34- task .force_schedule ()
47+ if task is None :
48+ return
49+ task .schedule ()
3550
3651
3752class BaseTask (models .Model ):
@@ -107,12 +122,12 @@ def function_string(self) -> str:
107122 return self .callable + f"({ ', ' .join (args_list + kwargs_list )} )"
108123
109124 def parse_args (self ):
110- """Parse args for running job"""
125+ """Parse args for running the job"""
111126 args = self .callable_args .all ()
112127 return [arg .value () for arg in args ]
113128
114129 def parse_kwargs (self ):
115- """Parse kwargs for running job"""
130+ """Parse kwargs for running the job"""
116131 kwargs = self .callable_kwargs .all ()
117132 return dict ([kwarg .value () for kwarg in kwargs ])
118133
@@ -122,12 +137,12 @@ def _next_job_id(self):
122137 return f'{ self .queue } :{ name } :{ addition } '
123138
124139 def _enqueue_args (self ) -> Dict :
125- """args for DjangoQueue.enqueue.
140+ """Args for DjangoQueue.enqueue.
126141 Set all arguments for DjangoQueue.enqueue/enqueue_at.
127142 Particularly:
128143 - set job timeout and ttl
129- - ensure a callback to reschedule job next iteration.
130- - set job-id to proper format
144+ - ensure a callback to reschedule the job next iteration.
145+ - Set job-id to proper format
131146 - set job meta
132147 """
133148 res = dict (
@@ -136,8 +151,8 @@ def _enqueue_args(self) -> Dict:
136151 task_type = self .TASK_TYPE ,
137152 scheduled_task_id = self .id ,
138153 ),
139- on_success = callback_save_job ,
140- on_failure = callback_save_job ,
154+ on_success = success_callback ,
155+ on_failure = failure_callback ,
141156 job_id = self ._next_job_id (),
142157 )
143158 if self .at_front :
@@ -150,37 +165,31 @@ def _enqueue_args(self) -> Dict:
150165
151166 @property
152167 def rqueue (self ) -> DjangoQueue :
153- """Returns django-queue for job
154- """
168+ """Returns redis-queue for job"""
155169 return get_queue (self .queue )
156170
157171 def ready_for_schedule (self ) -> bool :
158- """Is task ready to be scheduled?
172+ """Is the task ready to be scheduled?
159173
160- If task is already scheduled or disabled, then it is not
174+ If the task is already scheduled or disabled, then it is not
161175 ready to be scheduled.
162176
163- :returns: True if task is ready to be scheduled.
177+ :returns: True if the task is ready to be scheduled.
164178 """
165179 if self .is_scheduled ():
166- logger .debug (f'Job { self .name } already scheduled' )
180+ logger .debug (f'Task { self .name } already scheduled' )
167181 return False
168182 if not self .enabled :
169- logger .debug (f'Job { str (self )} disabled, enable job before scheduling' )
183+ logger .debug (f'Task { str (self )} disabled, enable task before scheduling' )
170184 return False
171185 return True
172186
173187 def schedule (self ) -> bool :
174- """Schedule job to run.
175- :returns: True if job was scheduled, False otherwise.
188+ """Schedule the next execution for the task to run.
189+ :returns: True if a job was scheduled, False otherwise.
176190 """
177191 if not self .ready_for_schedule ():
178192 return False
179- self .force_schedule ()
180- return True
181-
182- def force_schedule (self ):
183- """Schedule task regardless of its current status"""
184193 schedule_time = self ._schedule_time ()
185194 kwargs = self ._enqueue_args ()
186195 job = self .rqueue .enqueue_at (
@@ -190,6 +199,7 @@ def force_schedule(self):
190199 ** kwargs , )
191200 self .job_id = job .id
192201 super (BaseTask , self ).save ()
202+ return True
193203
194204 def enqueue_to_run (self ) -> bool :
195205 """Enqueue job to run now."""
@@ -218,7 +228,7 @@ def unschedule(self) -> bool:
218228 return True
219229
220230 def _schedule_time (self ):
221- return utc (self .scheduled_time )
231+ return utc (self .scheduled_time ) if django_settings . USE_TZ else self . scheduled_time
222232
223233 def to_dict (self ) -> Dict :
224234 """Export model to dictionary, so it can be saved as external file backup"""
@@ -258,10 +268,10 @@ def save(self, **kwargs):
258268 update_fields = kwargs .get ('update_fields' , None )
259269 if update_fields :
260270 kwargs ['update_fields' ] = set (update_fields ).union ({'modified' })
261- super (BaseTask , self ).save (** kwargs )
262271 if schedule_job :
263- self .schedule ()
264- super (BaseTask , self ).save ()
272+ self .schedule () # schedule() already calls save()
273+ else :
274+ super (BaseTask , self ).save (** kwargs )
265275
266276 def delete (self , ** kwargs ):
267277 self .unschedule ()
@@ -371,16 +381,20 @@ def _enqueue_args(self):
371381 res ['meta' ]['interval' ] = self .interval_seconds ()
372382 return res
373383
384+ def _schedule_time (self ):
385+ _now = timezone .now ()
386+ if self .scheduled_time >= _now :
387+ return super ()._schedule_time ()
388+ gap = math .ceil ((_now .timestamp () - self .scheduled_time .timestamp ()) / self .interval_seconds ())
389+ if self .repeat is None or self .repeat >= gap :
390+ self .scheduled_time += timedelta (seconds = self .interval_seconds () * gap )
391+ self .repeat = (self .repeat - gap ) if self .repeat is not None else None
392+ return super ()._schedule_time ()
393+
374394 def ready_for_schedule (self ):
375395 if super (RepeatableTask , self ).ready_for_schedule () is False :
376396 return False
377- if self .scheduled_time < timezone .now ():
378- gap = math .ceil ((timezone .now ().timestamp () - self .scheduled_time .timestamp ()) / self .interval_seconds ())
379- if self .repeat is None or self .repeat >= gap :
380- self .scheduled_time += timedelta (seconds = self .interval_seconds () * gap )
381- self .repeat = (self .repeat - gap ) if self .repeat is not None else None
382-
383- if self .scheduled_time < timezone .now ():
397+ if self ._schedule_time () < timezone .now ():
384398 return False
385399 return True
386400
@@ -411,7 +425,8 @@ def clean_cron_string(self):
411425 raise ValidationError ({'cron_string' : ValidationError (_ (str (e )), code = 'invalid' )})
412426
413427 def _schedule_time (self ):
414- return tools .get_next_cron_time (self .cron_string )
428+ self .scheduled_time = tools .get_next_cron_time (self .cron_string )
429+ return super ()._schedule_time ()
415430
416431 class Meta :
417432 verbose_name = _ ('Cron Task' )
0 commit comments