@@ -90,10 +90,10 @@ func ManageJobResources() error {
9090 return err
9191 }
9292
93- k8sJobMap := map [string ]* kbatch.Job {}
93+ k8sJobMap := map [string ]kbatch.Job {}
9494 k8sJobIDSet := strset.Set {}
9595 for _ , kJob := range jobs {
96- k8sJobMap [kJob .Labels ["jobID" ]] = & kJob
96+ k8sJobMap [kJob .Labels ["jobID" ]] = kJob
9797 k8sJobIDSet .Add (kJob .Labels ["jobID" ])
9898 }
9999
@@ -103,7 +103,7 @@ func ManageJobResources() error {
103103 queueURL = pointer .String (queueURLMap [jobKey .ID ])
104104 }
105105
106- k8sJob := k8sJobMap [jobKey .ID ]
106+ k8sJob , jobFound := k8sJobMap [jobKey .ID ]
107107
108108 jobLogger , err := operator .GetJobLogger (jobKey )
109109 if err != nil {
@@ -135,7 +135,7 @@ func ManageJobResources() error {
135135 continue
136136 }
137137
138- newStatusCode , msg , err := reconcileInProgressJob (jobState , queueURL , k8sJob )
138+ newStatusCode , msg , err := reconcileInProgressJob (jobState , queueURL , jobFound )
139139 if err != nil {
140140 telemetry .Error (err )
141141 operatorLogger .Error (err )
@@ -150,7 +150,7 @@ func ManageJobResources() error {
150150 continue
151151 }
152152 }
153- if queueURL == nil || k8sJob == nil {
153+ if queueURL == nil {
154154 // job has been submitted within the grace period; it may take a while for newly created queues and jobs to show up in list results
155155 continue
156156 }
@@ -249,7 +249,7 @@ func ManageJobResources() error {
249249}
250250
251251// verifies that a queue exists for an in-progress job and that a k8s job exists for a job in running status; if verification fails, returns a job code that reflects the state
252- func reconcileInProgressJob (jobState * job.State , queueURL * string , k8sJob * kbatch. Job ) (status.JobCode , string , error ) {
252+ func reconcileInProgressJob (jobState * job.State , queueURL * string , jobFound bool ) (status.JobCode , string , error ) {
253253 jobKey := jobState .JobKey
254254
255255 if queueURL == nil {
@@ -275,45 +275,49 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc
275275 return jobState .Status , "" , nil
276276 }
277277
278- if k8sJob == nil { // unexpected k8s job missing
278+ if ! jobFound { // unexpected k8s job missing
279279 return status .JobUnexpectedError , fmt .Sprintf ("terminating job %s; unable to find kubernetes job" , jobKey .UserString ()), nil
280280 }
281281 }
282282
283283 return jobState .Status , "" , nil
284284}
285285
286- func checkIfJobCompleted (jobState * job.State , queueURL string , k8sJob * kbatch.Job ) error {
286+ func checkIfJobCompleted (jobState * job.State , queueURL string , k8sJob kbatch.Job ) error {
287287 jobKey := jobState .JobKey
288288
289289 jobFailed , err := checkForJobFailure (jobKey , k8sJob )
290290 if err != nil || jobFailed {
291291 return err
292292 }
293293
294- queueMessages , err := getQueueMetricsFromURL ( queueURL )
294+ jobLogger , err := operator . GetJobLogger ( jobKey )
295295 if err != nil {
296296 return err
297297 }
298298
299- jobLogger , err := operator .GetJobLogger (jobKey )
299+ // job is still in-progress
300+ if int (k8sJob .Status .Active ) != 0 {
301+ return nil
302+ }
303+
304+ queueMessages , err := getQueueMetricsFromURL (queueURL )
300305 if err != nil {
301306 return err
302307 }
303308
304309 if ! queueMessages .IsEmpty () {
305310 // Give time for queue metrics to reach consistency
306- if k8sJob != nil && int (k8sJob .Status .Active ) == 0 {
307- if _jobsToDelete .Has (jobKey .ID ) {
308- _jobsToDelete .Remove (jobKey .ID )
309- jobLogger .Error ("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress" )
310- return errors .FirstError (
311- job .SetUnexpectedErrorStatus (jobKey ),
312- deleteJobRuntimeResources (jobKey ),
313- )
314- }
315- _jobsToDelete .Add (jobKey .ID )
311+ if _jobsToDelete .Has (jobKey .ID ) {
312+ _jobsToDelete .Remove (jobKey .ID )
313+ jobLogger .Error ("unexpected job status because cluster state indicates job has completed but metrics indicate that job is still in progress" )
314+ return errors .FirstError (
315+ job .SetUnexpectedErrorStatus (jobKey ),
316+ deleteJobRuntimeResources (jobKey ),
317+ )
316318 }
319+ _jobsToDelete .Add (jobKey .ID )
320+
317321 return nil
318322 }
319323
@@ -356,7 +360,7 @@ func checkIfJobCompleted(jobState *job.State, queueURL string, k8sJob *kbatch.Jo
356360 return nil
357361}
358362
359- func checkForJobFailure (jobKey spec.JobKey , k8sJob * kbatch.Job ) (bool , error ) {
363+ func checkForJobFailure (jobKey spec.JobKey , k8sJob kbatch.Job ) (bool , error ) {
360364 jobLogger , err := operator .GetJobLogger (jobKey )
361365 if err != nil {
362366 return false , err
@@ -372,7 +376,7 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
372376 deleteJobRuntimeResources (jobKey ),
373377 )
374378 }
375- if k8sJob != nil && int (k8sJob .Status .Failed ) > 0 {
379+ if int (k8sJob .Status .Failed ) > 0 {
376380 podStatus := k8s .GetPodStatus (& pod )
377381 for _ , containerStatus := range pod .Status .ContainerStatuses {
378382 if containerStatus .LastTerminationState .Terminated != nil {
@@ -394,9 +398,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
394398 }
395399 }
396400
397- if k8sJob == nil {
398- return false , nil
399- }
400401 if int (k8sJob .Status .Failed ) > 0 {
401402 if ! reasonFound {
402403 jobLogger .Error ("workers were killed for unknown reason" )
@@ -405,12 +406,6 @@ func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
405406 job .SetWorkerErrorStatus (jobKey ),
406407 deleteJobRuntimeResources (jobKey ),
407408 )
408- } else if int (k8sJob .Status .Active ) == 0 && int (k8sJob .Status .Failed ) == 0 && len (pods ) == 0 {
409- // really unexpected situation which doesn't hurt if we check
410- return true , errors .FirstError (
411- job .SetUnexpectedErrorStatus (jobKey ),
412- deleteJobRuntimeResources (jobKey ),
413- )
414409 }
415410
416411 return false , nil
0 commit comments