2525
2626
2727def callback_save_job (job , connection , result , * args , ** kwargs ):
28- model_name = job .meta .get ('job_type ' , None )
28+ model_name = job .meta .get ('task_type ' , None )
2929 if model_name is None :
3030 return
3131 model = apps .get_model (app_label = 'scheduler' , model_name = model_name )
3232 task = model .objects .filter (job_id = job .id ).first ()
3333 if task is not None :
34- task .unschedule ()
35- task .schedule ()
34+ task .force_schedule ()
3635
3736
3837class BaseTask (models .Model ):
@@ -80,18 +79,20 @@ def callable_func(self):
8079 """Translate callable string to callable"""
8180 return tools .callable_func (self .callable )
8281
83- @admin .display (boolean = True , description = _ ('is next scheduled?' ))
82+ @admin .display (boolean = True , description = _ ('is scheduled?' ))
8483 def is_scheduled (self ) -> bool :
8584 """Check whether a next job for this task is queued/scheduled to be executed"""
86- if not self .job_id : # no job_id => is not scheduled
85+ if self .job_id is None : # no job_id => is not scheduled
8786 return False
8887 # check whether job_id is in scheduled/enqueued/active jobs
8988 scheduled_jobs = self .rqueue .scheduled_job_registry .get_job_ids ()
9089 enqueued_jobs = self .rqueue .get_job_ids ()
90+ active_jobs = self .rqueue .started_job_registry .get_job_ids ()
9191 res = ((self .job_id in scheduled_jobs )
92- or (self .job_id in enqueued_jobs ))
93- # If the job_id is not scheduled/enqueued, update the job_id to None.
94- # (The job_id belongs to a previous run which is completed or currently running)
92+ or (self .job_id in enqueued_jobs )
93+ or (self .job_id in active_jobs ))
94+ # If the job_id is not scheduled/enqueued/started,
95+ # update the job_id to None. (The job_id belongs to a previous run which is completed)
9596 if not res :
9697 self .job_id = None
9798 super (BaseTask , self ).save ()
@@ -132,8 +133,8 @@ def _enqueue_args(self) -> Dict:
132133 res = dict (
133134 meta = dict (
134135 repeat = self .repeat ,
135- job_type = self .TASK_TYPE ,
136- scheduled_job_id = self .id ,
136+ task_type = self .TASK_TYPE ,
137+ scheduled_task_id = self .id ,
137138 ),
138139 on_success = callback_save_job ,
139140 on_failure = callback_save_job ,
@@ -175,6 +176,11 @@ def schedule(self) -> bool:
175176 """
176177 if not self .ready_for_schedule ():
177178 return False
179+ self .force_schedule ()
180+ return True
181+
182+ def force_schedule (self ):
183+ """Schedule task regardless of its current status"""
178184 schedule_time = self ._schedule_time ()
179185 kwargs = self ._enqueue_args ()
180186 job = self .rqueue .enqueue_at (
@@ -184,19 +190,17 @@ def schedule(self) -> bool:
184190 ** kwargs , )
185191 self .job_id = job .id
186192 super (BaseTask , self ).save ()
187- return True
188193
189194 def enqueue_to_run (self ) -> bool :
190- """Enqueue job to run now.
191- """
195+ """Enqueue job to run now."""
192196 kwargs = self ._enqueue_args ()
193197 job = self .rqueue .enqueue (
194198 tools .run_task ,
195199 args = (self .TASK_TYPE , self .id ),
196200 ** kwargs ,
197201 )
198202 self .job_id = job .id
199- super ( BaseTask , self ) .save ()
203+ self .save (schedule_job = False )
200204 return True
201205
202206 def unschedule (self ) -> bool :
@@ -217,8 +221,7 @@ def _schedule_time(self):
217221 return utc (self .scheduled_time )
218222
219223 def to_dict (self ) -> Dict :
220- """Export model to dictionary, so it can be saved as external file backup
221- """
224+ """Export model to dictionary, so it can be saved as external file backup"""
222225 res = dict (
223226 model = self .TASK_TYPE ,
224227 name = self .name ,
0 commit comments