Skip to content

Commit 6336187

Browse files
Fix async api responses' statuses (status would be stuck in in_queue state) (#2072)
Co-authored-by: vishal <vishalbollu@users.noreply.github.com>
1 parent bc4eb1a commit 6336187

File tree

4 files changed

+79
-15
lines changed

4 files changed

+79
-15
lines changed

async-gateway/service.go

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ func (s *service) CreateWorkload(id string, payload io.Reader, contentType strin
6666
return "", err
6767
}
6868

69-
statusPath := fmt.Sprintf("%s/%s/status", prefix, id)
69+
statusPath := fmt.Sprintf("%s/%s/%s", prefix, id, StatusInQueue)
7070
log.Debug(fmt.Sprintf("setting status to %s", StatusInQueue))
71-
if err := s.storage.Upload(statusPath, strings.NewReader(string(StatusInQueue)), "text/plain"); err != nil {
71+
if err := s.storage.Upload(statusPath, strings.NewReader(""), "text/plain"); err != nil {
7272
return "", err
7373
}
7474

@@ -77,32 +77,24 @@ func (s *service) CreateWorkload(id string, payload io.Reader, contentType strin
7777

7878
// GetWorkload retrieves the status and result, if available, of a given workload
7979
func (s *service) GetWorkload(id string) (GetWorkloadResponse, error) {
80-
prefix := s.workloadStoragePrefix()
8180
log := s.logger.With(zap.String("id", id))
8281

83-
// download workload status
84-
statusPath := fmt.Sprintf("%s/%s/status", prefix, id)
85-
log.Debug("downloading status file", zap.String("path", statusPath))
86-
statusBuf, err := s.storage.Download(statusPath)
82+
status, err := s.getStatus(id)
8783
if err != nil {
8884
return GetWorkloadResponse{}, err
8985
}
9086

91-
status := Status(statusBuf[:])
92-
switch status {
93-
case StatusFailed, StatusInProgress, StatusInQueue:
87+
if status != StatusCompleted {
9488
return GetWorkloadResponse{
9589
ID: id,
9690
Status: status,
9791
}, nil
98-
case StatusCompleted: // continues execution after switch/case, below
99-
default:
100-
return GetWorkloadResponse{}, fmt.Errorf("invalid workload status: %s", status)
10192
}
10293

10394
// attempt to download user result
95+
prefix := s.workloadStoragePrefix()
10496
resultPath := fmt.Sprintf("%s/%s/result.json", prefix, id)
105-
log.Debug("donwloading user result", zap.String("path", resultPath))
97+
log.Debug("downloading user result", zap.String("path", resultPath))
10698
resultBuf, err := s.storage.Download(resultPath)
10799
if err != nil {
108100
return GetWorkloadResponse{}, err
@@ -127,6 +119,38 @@ func (s *service) GetWorkload(id string) (GetWorkloadResponse, error) {
127119
}, nil
128120
}
129121

122+
func (s *service) getStatus(id string) (Status, error) {
123+
prefix := s.workloadStoragePrefix()
124+
log := s.logger.With(zap.String("id", id))
125+
126+
// download workload status
127+
log.Debug("checking status", zap.String("path", fmt.Sprintf("%s/%s/*", prefix, id)))
128+
files, err := s.storage.List(fmt.Sprintf("%s/%s", prefix, id))
129+
if err != nil {
130+
return "", err
131+
}
132+
133+
// determine request status
134+
status := StatusInQueue
135+
for _, file := range files {
136+
fileStatus := Status(file)
137+
138+
if !fileStatus.Valid() {
139+
status = fileStatus
140+
return "", fmt.Errorf("invalid workload status: %s", status)
141+
}
142+
if fileStatus == StatusInProgress {
143+
status = fileStatus
144+
}
145+
if fileStatus == StatusCompleted || fileStatus == StatusFailed {
146+
status = fileStatus
147+
break
148+
}
149+
}
150+
151+
return status, nil
152+
}
153+
130154
// workloadStoragePrefix returns the storage key prefix under which the
// workloads of this service's API are stored:
// "<clusterName>/apis/<apiName>/workloads".
func (s *service) workloadStoragePrefix() string {
	const layout = "%s/apis/%s/workloads"
	return fmt.Sprintf(layout, s.clusterName, s.apiName)
}

async-gateway/storage.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package main
1818

1919
import (
2020
"io"
21+
"path"
22+
"strings"
2123
"time"
2224

2325
"github.com/aws/aws-sdk-go/aws"
@@ -30,6 +32,7 @@ import (
3032
// Storage is the set of object storage operations the gateway relies on.
type Storage interface {
	// Upload stores payload under key with the given content type.
	Upload(key string, payload io.Reader, contentType string) error

	// Download returns the content stored under key.
	Download(key string) ([]byte, error)

	// List returns the names of the files stored under key.
	List(key string) ([]string, error)

	// GetLastModified returns the last modified timestamp of the object
	// stored under key.
	GetLastModified(key string) (time.Time, error)
}
3538

@@ -80,6 +83,30 @@ func (s *s3) Download(key string) ([]byte, error) {
8083
return buff.Bytes(), nil
8184
}
8285

86+
// List lists a set of files from a given S3 path.
87+
// Works only for one level deep sub-paths.
88+
func (s *s3) List(key string) ([]string, error) {
89+
if key != "" && !strings.HasSuffix(key, "/") {
90+
key += "/"
91+
}
92+
93+
result, err := s.client.ListObjectsV2(&awss3.ListObjectsV2Input{
94+
Prefix: aws.String(key),
95+
Bucket: aws.String(s.bucket),
96+
})
97+
98+
if err != nil {
99+
return nil, err
100+
}
101+
102+
files := []string{}
103+
for _, obj := range result.Contents {
104+
_, file := path.Split(*obj.Key)
105+
files = append(files, file)
106+
}
107+
return files, nil
108+
}
109+
83110
// GetLastModified retrieves the last modified timestamp of an S3 object
84111
func (s *s3) GetLastModified(key string) (time.Time, error) {
85112
input := awss3.GetObjectInput{

async-gateway/types.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,19 @@ const (
3232
StatusCompleted Status = "completed"
3333
)
3434

35+
func (status Status) String() string {
36+
return string(status)
37+
}
38+
39+
func (status Status) Valid() bool {
40+
switch status {
41+
case StatusFailed, StatusInProgress, StatusInQueue, StatusCompleted:
42+
return true
43+
default:
44+
return false
45+
}
46+
}
47+
3548
// CreateWorkloadResponse represents the response returned to the user on workload creation
3649
type CreateWorkloadResponse struct {
3750
ID string `json:"id"`

pkg/cortex/serve/cortex_internal/lib/api/async_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def statsd(self):
9494
return self.__statsd
9595

9696
def update_status(self, request_id: str, status: str):
97-
self.storage.put_str(status, f"{self.storage_path}/{request_id}/status")
97+
self.storage.put_str("", f"{self.storage_path}/{request_id}/{status}")
9898

9999
def upload_result(self, request_id: str, result: Dict[str, Any]):
100100
if not isinstance(result, dict):

0 commit comments

Comments
 (0)