Skip to content

Commit 5dfebf0

Browse files
authored
Remove job state update when calculating job state (#1718)
1 parent 56094bf commit 5dfebf0

File tree

2 files changed

+20
-55
lines changed

2 files changed

+20
-55
lines changed

pkg/operator/resources/batchapi/job_state.go

Lines changed: 19 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package batchapi
1818

1919
import (
20-
"fmt"
2120
"path"
2221
"path/filepath"
2322
"time"
@@ -215,7 +214,12 @@ func setStatusForJob(jobKey spec.JobKey, jobStatus status.JobCode) error {
215214
}
216215

217216
func setEnqueuingStatus(jobKey spec.JobKey) error {
218-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobEnqueuing.String()))
217+
err := updateLiveness(jobKey)
218+
if err != nil {
219+
return err
220+
}
221+
222+
err = config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobEnqueuing.String()))
219223
if err != nil {
220224
return err
221225
}
@@ -354,88 +358,49 @@ func setTimedOutStatus(jobKey spec.JobKey) error {
354358
return nil
355359
}
356360

357-
func getJobStatusFromJobState(initialJobState *JobState, k8sJob *kbatch.Job, pods []kcore.Pod) (*status.JobStatus, error) {
358-
jobKey := initialJobState.JobKey
361+
func getJobStatusFromJobState(jobState *JobState, k8sJob *kbatch.Job, pods []kcore.Pod) (*status.JobStatus, error) {
362+
jobKey := jobState.JobKey
359363

360364
jobSpec, err := downloadJobSpec(jobKey)
361365
if err != nil {
362366
return nil, err
363367
}
364368

365-
latestJobState := initialJobState // Refetch the state of an in progress job in case the cron modifies the job state between the time the initial fetch and now
366-
367-
if initialJobState.Status.IsInProgress() {
368-
queueURL, err := getJobQueueURL(jobKey)
369-
if err != nil {
370-
return nil, err
371-
}
372-
373-
latestJobCode, message, err := reconcileInProgressJob(initialJobState, &queueURL, k8sJob)
374-
if err != nil {
375-
return nil, err
376-
}
377-
378-
if latestJobCode != initialJobState.Status {
379-
err := errors.FirstError(
380-
writeToJobLogStream(jobKey, message),
381-
setStatusForJob(jobKey, latestJobCode),
382-
)
383-
if err != nil {
384-
return nil, err
385-
}
386-
}
387-
388-
latestJobState, err = getJobState(jobKey)
389-
if err != nil {
390-
return nil, err
391-
}
392-
}
393-
394369
jobStatus := status.JobStatus{
395370
Job: *jobSpec,
396-
EndTime: latestJobState.EndTime,
397-
Status: latestJobState.Status,
371+
EndTime: jobState.EndTime,
372+
Status: jobState.Status,
398373
}
399374

400-
if latestJobState.Status.IsInProgress() {
375+
if jobState.Status.IsInProgress() {
401376
queueMetrics, err := getQueueMetrics(jobKey)
402377
if err != nil {
403378
return nil, err
404379
}
405380

406381
jobStatus.BatchesInQueue = queueMetrics.TotalUserMessages()
407382

408-
if latestJobState.Status == status.JobEnqueuing {
383+
if jobState.Status == status.JobEnqueuing {
409384
jobStatus.TotalBatchCount = queueMetrics.TotalUserMessages()
410385
}
411386

412-
if latestJobState.Status == status.JobRunning {
387+
if jobState.Status == status.JobRunning {
413388
metrics, err := getRealTimeBatchMetrics(jobKey)
414389
if err != nil {
415390
return nil, err
416391
}
417392
jobStatus.BatchMetrics = metrics
418393

419-
if k8sJob == nil {
420-
err := setUnexpectedErrorStatus(jobKey)
421-
if err != nil {
422-
return nil, err
423-
}
424-
425-
writeToJobLogStream(jobKey, fmt.Sprintf("unexpected: kubernetes job not found"))
426-
deleteJobRuntimeResources(jobKey)
427-
428-
jobStatus.Status = status.JobUnexpectedError
429-
return &jobStatus, nil
394+
// There can be race conditions where the job state is temporarily out of sync with the cluster state
395+
if k8sJob != nil {
396+
workerCounts := getWorkerCountsForJob(*k8sJob, pods)
397+
jobStatus.WorkerCounts = &workerCounts
430398
}
431-
432-
workerCounts := getWorkerCountsForJob(*k8sJob, pods)
433-
jobStatus.WorkerCounts = &workerCounts
434399
}
435400
}
436401

437-
if latestJobState.Status.IsCompleted() {
438-
metrics, err := getCompletedBatchMetrics(jobKey, jobSpec.StartTime, *latestJobState.EndTime)
402+
if jobState.Status.IsCompleted() {
403+
metrics, err := getCompletedBatchMetrics(jobKey, jobSpec.StartTime, *jobState.EndTime)
439404
if err != nil {
440405
return nil, err
441406
}

pkg/operator/resources/batchapi/manage_resources_cron.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -249,7 +249,7 @@ func reconcileInProgressJob(jobState *JobState, queueURL *string, k8sJob *kbatch
249249
return status.JobUnexpectedError, fmt.Sprintf("terminating job %s; sqs queue with url %s was not found", jobKey.UserString(), expectedQueueURL), nil
250250
}
251251

252-
if jobState.Status == status.JobEnqueuing && time.Now().Sub(jobState.LastUpdatedMap[_enqueuingLivenessFile]) >= _enqueuingLivenessPeriod+_enqueuingLivenessBuffer {
252+
if jobState.Status == status.JobEnqueuing && time.Since(jobState.LastUpdatedMap[_enqueuingLivenessFile]) >= _enqueuingLivenessPeriod+_enqueuingLivenessBuffer {
253253
return status.JobEnqueueFailed, fmt.Sprintf("terminating job %s; enqueuing liveness check failed", jobKey.UserString()), nil
254254
}
255255

0 commit comments

Comments
 (0)