Skip to content

Commit 898e7a3

Browse files
authored
Fix golang pointer issue and job success calculation for batch (#1729)
1 parent ee79c66 commit 898e7a3

File tree

3 files changed

+19
-29
lines changed

3 files changed

+19
-29
lines changed

docs/workloads/batch/endpoints.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ RESPONSE:
183183
"config": {<string>: <any>},
184184
"api_id": <string>,
185185
"sqs_url": <string>,
186-
"status": <string>, # will be one of the following values: status_unknown|status_enqueuing|status_running|status_enqueue_failed|status_completed_with_failures|status_succeeded|status_unexpected_error|status_worker_error|status_worker_oom|status_stopped
186+
"status": <string>, # will be one of the following values: status_unknown|status_enqueuing|status_running|status_enqueue_failed|status_completed_with_failures|status_succeeded|status_unexpected_error|status_worker_error|status_worker_oom|status_timed_out|status_stopped
187187
"batches_in_queue": <int> # number of batches remaining in the queue
188188
"batch_metrics": {
189189
"succeeded": <int> # number of succeeded batches

pkg/cortex/serve/start/batch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def renew_message_visibility(receipt_handle: str):
102102
continue
103103
elif e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":
104104
# there may be a delay between the cron deleting the queue and this worker stopping
105-
cx_logger().info(
105+
logger().info(
106106
"failed to renew message visibility because the queue was not found"
107107
)
108108
else:
@@ -266,7 +266,7 @@ def handle_on_job_complete(message):
266266
should_run_on_job_complete = True
267267
time.sleep(10) # verify that the queue is empty one more time
268268
except:
269-
logger.exception("failed to handle on_job_complete")
269+
logger().exception("failed to handle on_job_complete")
270270
raise
271271
finally:
272272
with receipt_handle_mutex:

pkg/operator/resources/batchapi/manage_resources_cron.go

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ func ManageJobResources() error {
8181

8282
k8sJobMap := map[string]*kbatch.Job{}
8383
k8sJobIDSet := strset.Set{}
84-
for _, job := range jobs {
84+
for i := range jobs {
85+
job := jobs[i]
8586
k8sJobMap[job.Labels["jobID"]] = &job
8687
k8sJobIDSet.Add(job.Labels["jobID"])
8788
}
@@ -302,32 +303,21 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job
302303
return err
303304
}
304305

305-
if jobSpec.Workers == int(k8sJob.Status.Succeeded) {
306-
if jobSpec.TotalBatchCount == batchMetrics.Succeeded {
307-
_jobsToDelete.Remove(jobKey.ID)
308-
return errors.FirstError(
309-
setSucceededStatus(jobKey),
310-
deleteJobRuntimeResources(jobKey),
311-
)
312-
}
306+
if jobSpec.TotalBatchCount == batchMetrics.Succeeded {
307+
_jobsToDelete.Remove(jobKey.ID)
308+
return errors.FirstError(
309+
setSucceededStatus(jobKey),
310+
deleteJobRuntimeResources(jobKey),
311+
)
312+
}
313313

314-
// wait one more cycle for the success metrics to reach consistency
315-
if _jobsToDelete.Has(jobKey.ID) {
316-
_jobsToDelete.Remove(jobKey.ID)
317-
return errors.FirstError(
318-
setCompletedWithFailuresStatus(jobKey),
319-
deleteJobRuntimeResources(jobKey),
320-
)
321-
}
322-
} else {
323-
if _jobsToDelete.Has(jobKey.ID) {
324-
_jobsToDelete.Remove(jobKey.ID)
325-
return errors.FirstError(
326-
writeToJobLogStream(jobKey, "unexpected job state; queue is empty but cluster state still indicates that the job is still in progress"),
327-
setUnexpectedErrorStatus(jobKey),
328-
deleteJobRuntimeResources(jobKey),
329-
)
330-
}
314+
// wait one more cycle for the success metrics to reach consistency
315+
if _jobsToDelete.Has(jobKey.ID) {
316+
_jobsToDelete.Remove(jobKey.ID)
317+
return errors.FirstError(
318+
setCompletedWithFailuresStatus(jobKey),
319+
deleteJobRuntimeResources(jobKey),
320+
)
331321
}
332322

333323
// It takes at least 20 seconds for a worker to exit after determining that the queue is empty.

0 commit comments

Comments
 (0)