Skip to content

Commit 8ff681e

Browse files
authored
Check SQS queue length before checking k8s job (#1925)
1 parent 8b42e59 commit 8ff681e

File tree

1 file changed

+10
-14
lines changed
  • pkg/operator/resources/job/batchapi

1 file changed

+10
-14
lines changed

pkg/operator/resources/job/batchapi/cron.go

Lines changed: 10 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -296,28 +296,24 @@ func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob kbatch.Job
296 296
return err
297 297
}
298 298

299-
// job is still in-progress
300-
if int(k8sJob.Status.Active) != 0 {
301-
return nil
302-
}
303-
304 299
queueMessages, err := getQueueMetricsFromURL(queueURL)
305 300
if err != nil {
306 301
return err
307 302
}
308 303

309 304
if !queueMessages.IsEmpty() {
310 305
// Give time for queue metrics to reach consistency
311-
if _jobsToDelete.Has(jobKey.ID) {
312-
_jobsToDelete.Remove(jobKey.ID)
313-
jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
314-
return errors.FirstError(
315-
job.SetUnexpectedErrorStatus(jobKey),
316-
deleteJobRuntimeResources(jobKey),
317-
)
306+
if int(k8sJob.Status.Active) == 0 {
307+
if _jobsToDelete.Has(jobKey.ID) {
308+
_jobsToDelete.Remove(jobKey.ID)
309+
jobLogger.Error("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress")
310+
return errors.FirstError(
311+
job.SetUnexpectedErrorStatus(jobKey),
312+
deleteJobRuntimeResources(jobKey),
313+
)
314+
}
315+
_jobsToDelete.Add(jobKey.ID)
318 316
}
319-
_jobsToDelete.Add(jobKey.ID)
320-
321 317
return nil
322 318
}
323 319

0 commit comments

Comments
 (0)