Skip to content

Commit 47b7345

Browse files
Miguel Varela RamosRobertLucian
andauthored
TaskAPI (#1717)
Co-authored-by: Robert Lucian Chiriac <robert.lucian.chiriac@gmail.com>
1 parent 8c003b4 commit 47b7345

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+3675
-983
lines changed

build/build-image.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ if [[ "$image" == *"-slim" ]]; then
3333
build_args="--build-arg SLIM=true"
3434
fi
3535

36-
if [ "${image}" == "python-predictor-gpu-slim" ]; then
36+
if [ "${image}" == *"gpu-slim" ]; then
3737
cuda=("10.0" "10.1" "10.1" "10.2" "10.2" "11.0" "11.1")
3838
cudnn=("7" "7" "8" "7" "8" "8" "8")
3939
for i in ${!cudnn[@]}; do

cli/cluster/delete.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cortexlabs/cortex/pkg/lib/prompt"
2626
s "github.com/cortexlabs/cortex/pkg/lib/strings"
2727
"github.com/cortexlabs/cortex/pkg/operator/schema"
28+
"github.com/cortexlabs/cortex/pkg/types/userconfig"
2829
)
2930

3031
func Delete(operatorConfig OperatorConfig, apiName string, keepCache bool, force bool) (schema.DeleteResponse, error) {
@@ -73,13 +74,20 @@ func getReadyRealtimeAPIReplicasOrNil(operatorConfig OperatorConfig, apiName str
7374
return &totalReady
7475
}
7576

76-
func StopJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.DeleteResponse, error) {
77+
func StopJob(operatorConfig OperatorConfig, kind userconfig.Kind, apiName string, jobID string) (schema.DeleteResponse, error) {
7778
params := map[string]string{
7879
"apiName": apiName,
7980
"jobID": jobID,
8081
}
8182

82-
httpRes, err := HTTPDelete(operatorConfig, path.Join("/batch", apiName), params)
83+
var endpointComponent string
84+
if kind == userconfig.BatchAPIKind {
85+
endpointComponent = "batch"
86+
} else {
87+
endpointComponent = "tasks"
88+
}
89+
90+
httpRes, err := HTTPDelete(operatorConfig, path.Join("/"+endpointComponent, apiName), params)
8391
if err != nil {
8492
return schema.DeleteResponse{}, err
8593
}

cli/cluster/get.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,31 @@ func GetAPIByID(operatorConfig OperatorConfig, apiName string, apiID string) ([]
6565
return apiRes, nil
6666
}
6767

68-
func GetJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.JobResponse, error) {
68+
func GetBatchJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.BatchJobResponse, error) {
6969
endpoint := path.Join("/batch", apiName)
7070
httpRes, err := HTTPGet(operatorConfig, endpoint, map[string]string{"jobID": jobID})
7171
if err != nil {
72-
return schema.JobResponse{}, err
72+
return schema.BatchJobResponse{}, err
7373
}
7474

75-
var jobRes schema.JobResponse
75+
var jobRes schema.BatchJobResponse
7676
if err = json.Unmarshal(httpRes, &jobRes); err != nil {
77-
return schema.JobResponse{}, errors.Wrap(err, endpoint, string(httpRes))
77+
return schema.BatchJobResponse{}, errors.Wrap(err, endpoint, string(httpRes))
78+
}
79+
80+
return jobRes, nil
81+
}
82+
83+
func GetTaskJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.TaskJobResponse, error) {
84+
endpoint := path.Join("/tasks", apiName)
85+
httpRes, err := HTTPGet(operatorConfig, endpoint, map[string]string{"jobID": jobID})
86+
if err != nil {
87+
return schema.TaskJobResponse{}, err
88+
}
89+
90+
var jobRes schema.TaskJobResponse
91+
if err = json.Unmarshal(httpRes, &jobRes); err != nil {
92+
return schema.TaskJobResponse{}, errors.Wrap(err, endpoint, string(httpRes))
7893
}
7994

8095
return jobRes, nil

cli/cluster/logs.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,6 @@ func StreamJobLogs(operatorConfig OperatorConfig, apiName string, jobID string)
4242
return streamLogs(operatorConfig, "/logs/"+apiName, map[string]string{"jobID": jobID})
4343
}
4444

45-
func GetGCPLogsURL(operatorConfig OperatorConfig, apiName string) (schema.GCPLogsResponse, error) {
46-
httpRes, err := HTTPGet(operatorConfig, "/logs/"+apiName)
47-
if err != nil {
48-
return schema.GCPLogsResponse{}, err
49-
}
50-
51-
var gcpLogsResponse schema.GCPLogsResponse
52-
if err = json.Unmarshal(httpRes, &gcpLogsResponse); err != nil {
53-
return schema.GCPLogsResponse{}, errors.Wrap(err, "/logs/"+apiName, string(httpRes))
54-
}
55-
56-
return gcpLogsResponse, nil
57-
}
58-
5945
func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[string]string) error {
6046
interrupt := make(chan os.Signal, 1)
6147
signal.Notify(interrupt, os.Interrupt)

cli/cmd/delete.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@ var _deleteCmd = &cobra.Command{
7070

7171
var deleteResponse schema.DeleteResponse
7272
if len(args) == 2 {
73-
deleteResponse, err = cluster.StopJob(MustGetOperatorConfig(env.Name), args[0], args[1])
73+
apisRes, err := cluster.GetAPI(MustGetOperatorConfig(env.Name), args[0])
74+
if err != nil {
75+
exit.Error(err)
76+
}
77+
78+
deleteResponse, err = cluster.StopJob(MustGetOperatorConfig(env.Name), apisRes[0].Spec.Kind, args[0], args[1])
7479
if err != nil {
7580
exit.Error(err)
7681
}

cli/cmd/get.go

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,17 @@ var _getCmd = &cobra.Command{
127127
return "", err
128128
}
129129

130-
jobTable, err := getJob(env, args[0], args[1])
130+
apisRes, err := cluster.GetAPI(MustGetOperatorConfig(envName), args[0])
131+
if err != nil {
132+
return "", err
133+
}
134+
135+
var jobTable string
136+
if apisRes[0].Spec.Kind == userconfig.BatchAPIKind {
137+
jobTable, err = getBatchJob(env, args[0], args[1])
138+
} else {
139+
jobTable, err = getTaskJob(env, args[0], args[1])
140+
}
131141
if err != nil {
132142
return "", err
133143
}
@@ -189,6 +199,8 @@ func getAPIsInAllEnvironments() (string, error) {
189199
var allRealtimeAPIEnvs []string
190200
var allBatchAPIs []schema.APIResponse
191201
var allBatchAPIEnvs []string
202+
var allTaskAPIs []schema.APIResponse
203+
var allTaskAPIEnvs []string
192204
var allTrafficSplitters []schema.APIResponse
193205
var allTrafficSplitterEnvs []string
194206

@@ -219,6 +231,9 @@ func getAPIsInAllEnvironments() (string, error) {
219231
case userconfig.RealtimeAPIKind:
220232
allRealtimeAPIEnvs = append(allRealtimeAPIEnvs, env.Name)
221233
allRealtimeAPIs = append(allRealtimeAPIs, api)
234+
case userconfig.TaskAPIKind:
235+
allTaskAPIEnvs = append(allTaskAPIEnvs, env.Name)
236+
allTaskAPIs = append(allTaskAPIs, api)
222237
case userconfig.TrafficSplitterKind:
223238
allTrafficSplitterEnvs = append(allTrafficSplitterEnvs, env.Name)
224239
allTrafficSplitters = append(allTrafficSplitters, api)
@@ -243,7 +258,7 @@ func getAPIsInAllEnvironments() (string, error) {
243258

244259
out := ""
245260

246-
if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTrafficSplitters) == 0 {
261+
if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTrafficSplitters) == 0 && len(allTaskAPIs) == 0 {
247262
// check if any environments errorred
248263
if len(errorsMap) != len(cliConfig.Environments) {
249264
if len(errorsMap) == 0 {
@@ -271,20 +286,26 @@ func getAPIsInAllEnvironments() (string, error) {
271286
out += t.MustFormat()
272287
}
273288

274-
if len(allRealtimeAPIs) > 0 {
275-
t := realtimeAPIsTable(allRealtimeAPIs, allRealtimeAPIEnvs)
276-
289+
if len(allTaskAPIs) > 0 {
290+
t := taskAPIsTable(allTaskAPIs, allTaskAPIEnvs)
277291
if len(allBatchAPIs) > 0 {
278292
out += "\n"
279293
}
294+
out += t.MustFormat()
295+
}
280296

297+
if len(allRealtimeAPIs) > 0 {
298+
t := realtimeAPIsTable(allRealtimeAPIs, allRealtimeAPIEnvs)
299+
if len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 {
300+
out += "\n"
301+
}
281302
out += t.MustFormat()
282303
}
283304

284305
if len(allTrafficSplitters) > 0 {
285306
t := trafficSplitterListTable(allTrafficSplitters, allTrafficSplitterEnvs)
286307

287-
if len(allRealtimeAPIs) > 0 || len(allBatchAPIs) > 0 {
308+
if len(allRealtimeAPIs) > 0 || len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 {
288309
out += "\n"
289310
}
290311

@@ -319,20 +340,23 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {
319340

320341
var allRealtimeAPIs []schema.APIResponse
321342
var allBatchAPIs []schema.APIResponse
343+
var allTaskAPIs []schema.APIResponse
322344
var allTrafficSplitters []schema.APIResponse
323345

324346
for _, api := range apisRes {
325347
switch api.Spec.Kind {
326348
case userconfig.BatchAPIKind:
327349
allBatchAPIs = append(allBatchAPIs, api)
350+
case userconfig.TaskAPIKind:
351+
allTaskAPIs = append(allTaskAPIs, api)
328352
case userconfig.RealtimeAPIKind:
329353
allRealtimeAPIs = append(allRealtimeAPIs, api)
330354
case userconfig.TrafficSplitterKind:
331355
allTrafficSplitters = append(allTrafficSplitters, api)
332356
}
333357
}
334358

335-
if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTrafficSplitters) == 0 {
359+
if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTaskAPIs) == 0 && len(allTrafficSplitters) == 0 {
336360
return console.Bold("no apis are deployed"), nil
337361
}
338362

@@ -350,6 +374,22 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {
350374
out += t.MustFormat()
351375
}
352376

377+
if len(allTaskAPIs) > 0 {
378+
envNames := []string{}
379+
for range allTaskAPIs {
380+
envNames = append(envNames, env.Name)
381+
}
382+
383+
t := taskAPIsTable(allTaskAPIs, envNames)
384+
t.FindHeaderByTitle(_titleEnvironment).Hidden = true
385+
386+
if len(allBatchAPIs) > 0 {
387+
out += "\n"
388+
}
389+
390+
out += t.MustFormat()
391+
}
392+
353393
if len(allRealtimeAPIs) > 0 {
354394
envNames := []string{}
355395
for range allRealtimeAPIs {
@@ -359,7 +399,7 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {
359399
t := realtimeAPIsTable(allRealtimeAPIs, envNames)
360400
t.FindHeaderByTitle(_titleEnvironment).Hidden = true
361401

362-
if len(allBatchAPIs) > 0 {
402+
if len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 {
363403
out += "\n"
364404
}
365405

@@ -375,7 +415,7 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) {
375415
t := trafficSplitterListTable(allTrafficSplitters, envNames)
376416
t.FindHeaderByTitle(_titleEnvironment).Hidden = true
377417

378-
if len(allBatchAPIs) > 0 || len(allRealtimeAPIs) > 0 {
418+
if len(allBatchAPIs) > 0 || len(allTaskAPIs) > 0 || len(allRealtimeAPIs) > 0 {
379419
out += "\n"
380420
}
381421

@@ -412,6 +452,8 @@ func getAPI(env cliconfig.Environment, apiName string) (string, error) {
412452
return trafficSplitterTable(apiRes, env)
413453
case userconfig.BatchAPIKind:
414454
return batchAPITable(apiRes), nil
455+
case userconfig.TaskAPIKind:
456+
return taskAPITable(apiRes), nil
415457
default:
416458
return "", errors.ErrorUnexpected(fmt.Sprintf("encountered unexpected kind %s for api %s", apiRes.Spec.Kind, apiRes.Spec.Name))
417459
}

cli/cmd/lib_batch_apis.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Tab
4949
latestJobID := "-"
5050
runningJobs := 0
5151

52-
for _, job := range batchAPI.JobStatuses {
52+
for _, job := range batchAPI.BatchJobStatuses {
5353
if job.StartTime.After(latestStartTime) {
5454
latestStartTime = job.StartTime
5555
latestJobID = job.ID + fmt.Sprintf(" (submitted %s ago)", libtime.SinceStr(&latestStartTime))
@@ -82,14 +82,14 @@ func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Tab
8282
}
8383

8484
func batchAPITable(batchAPI schema.APIResponse) string {
85-
jobRows := make([][]interface{}, 0, len(batchAPI.JobStatuses))
85+
jobRows := make([][]interface{}, 0, len(batchAPI.BatchJobStatuses))
8686

8787
out := ""
88-
if len(batchAPI.JobStatuses) == 0 {
89-
out = console.Bold("no submitted jobs\n")
88+
if len(batchAPI.BatchJobStatuses) == 0 {
89+
out = console.Bold("no submitted batch jobs\n")
9090
} else {
9191
totalFailed := 0
92-
for _, job := range batchAPI.JobStatuses {
92+
for _, job := range batchAPI.BatchJobStatuses {
9393
succeeded := 0
9494
failed := 0
9595

@@ -144,8 +144,8 @@ func batchAPITable(batchAPI schema.APIResponse) string {
144144
return out
145145
}
146146

147-
func getJob(env cliconfig.Environment, apiName string, jobID string) (string, error) {
148-
resp, err := cluster.GetJob(MustGetOperatorConfig(env.Name), apiName, jobID)
147+
func getBatchJob(env cliconfig.Environment, apiName string, jobID string) (string, error) {
148+
resp, err := cluster.GetBatchJob(MustGetOperatorConfig(env.Name), apiName, jobID)
149149
if err != nil {
150150
return "", err
151151
}
@@ -216,12 +216,11 @@ func getJob(env cliconfig.Environment, apiName string, jobID string) (string, er
216216
out += titleStr("batch stats") + t.MustFormat(&table.Opts{BoldHeader: pointer.Bool(false)})
217217

218218
if job.Status == status.JobEnqueuing {
219-
out += "\nstill enqueuing, workers have not been allocated for this job yet\n"
219+
out += "\n" + "still enqueuing, workers have not been allocated for this job yet\n"
220220
} else if job.Status.IsCompleted() {
221-
out += "\nworker stats are not available because this job is not currently running\n"
221+
out += "\n" + "worker stats are not available because this job is not currently running\n"
222222
} else {
223223
out += titleStr("worker stats")
224-
225224
if job.WorkerCounts != nil {
226225
t := table.Table{
227226
Headers: []table.Header{
@@ -253,7 +252,7 @@ func getJob(env cliconfig.Environment, apiName string, jobID string) (string, er
253252

254253
out += "\n" + console.Bold("job endpoint: ") + resp.Endpoint + "\n"
255254

256-
jobSpecStr, err := libjson.Pretty(job.Job)
255+
jobSpecStr, err := libjson.Pretty(job.BatchJob)
257256
if err != nil {
258257
return "", err
259258
}

0 commit comments

Comments
 (0)