Skip to content

Commit e7e323a

Browse files
authored
Fix batch bugs (#1471)
1 parent 6ecfcc5 commit e7e323a

File tree

4 files changed

+21
-9
lines changed

4 files changed

+21
-9
lines changed

pkg/operator/resources/batchapi/enqueue.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,13 @@ func enqueueS3FileContents(jobSpec *spec.Job, delimitedFiles *schema.DelimitedFi
236236
s3Path := awslib.S3Path(bucket, *s3Obj.Key)
237237
writeToJobLogStream(jobSpec.JobKey, fmt.Sprintf("enqueuing contents from file %s", s3Path))
238238

239+
awsClientForBucket, err := awslib.NewFromClientS3Path(s3Path, config.AWS)
240+
if err != nil {
241+
return false, err
242+
}
243+
239244
itemIndex := 0
240-
err := config.AWS.S3FileIterator(bucket, s3Obj, _s3DownloadChunkSize, func(readCloser io.ReadCloser, isLastChunk bool) (bool, error) {
245+
err = awsClientForBucket.S3FileIterator(bucket, s3Obj, _s3DownloadChunkSize, func(readCloser io.ReadCloser, isLastChunk bool) (bool, error) {
241246
_, err := bytesBuffer.ReadFrom(readCloser)
242247
if err != nil {
243248
return false, err

pkg/operator/resources/batchapi/job.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,15 @@ func deployJob(apiSpec *spec.API, jobSpec *spec.Job, submission *schema.JobSubmi
191191
return
192192
}
193193

194-
err = setRunningStatus(jobSpec.JobKey)
194+
err = createK8sJob(apiSpec, jobSpec)
195195
if err != nil {
196196
handleJobSubmissionError(jobSpec.JobKey, err)
197-
return
198197
}
199198

200-
err = createK8sJob(apiSpec, jobSpec)
199+
err = setRunningStatus(jobSpec.JobKey)
201200
if err != nil {
202201
handleJobSubmissionError(jobSpec.JobKey, err)
202+
return
203203
}
204204
}
205205

pkg/operator/resources/batchapi/manage_resources_cron.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,12 @@ func ManageJobResources() error {
132132
continue
133133
}
134134

135-
err = checkIfJobCompleted(jobKey, *queueURL, k8sJob)
136-
if err != nil {
137-
telemetry.Error(err)
138-
errors.PrintError(err)
135+
if jobState.Status == status.JobRunning {
136+
err = checkIfJobCompleted(jobKey, *queueURL, k8sJob)
137+
if err != nil {
138+
telemetry.Error(err)
139+
errors.PrintError(err)
140+
}
139141
}
140142
}
141143

pkg/operator/resources/batchapi/s3_iterator.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,12 @@ func s3IteratorFromLister(s3Lister schema.S3Lister, fn func(string, *s3.Object)
5252
return err
5353
}
5454

55-
err = config.AWS.S3Iterator(bucket, key, false, nil, func(s3Obj *s3.Object) (bool, error) {
55+
awsClientForBucket, err := aws.NewFromClientS3Path(s3Path, config.AWS)
56+
if err != nil {
57+
return err
58+
}
59+
60+
err = awsClientForBucket.S3Iterator(bucket, key, false, nil, func(s3Obj *s3.Object) (bool, error) {
5661
s3FilePath := aws.S3Path(bucket, *s3Obj.Key)
5762

5863
shouldSkip := false

0 commit comments

Comments
 (0)