Skip to content

Commit f0e389d

Browse files
authored
Catch operator panics in go routines with sentry (#1746)
1 parent 1050782 commit f0e389d

File tree

12 files changed

+105
-22
lines changed

12 files changed

+105
-22
lines changed

cli/cluster/logs.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cortexlabs/cortex/pkg/lib/errors"
3030
"github.com/cortexlabs/cortex/pkg/lib/exit"
3131
"github.com/cortexlabs/cortex/pkg/lib/json"
32+
"github.com/cortexlabs/cortex/pkg/lib/routines"
3233
"github.com/cortexlabs/cortex/pkg/operator/schema"
3334
"github.com/gorilla/websocket"
3435
)
@@ -112,7 +113,7 @@ func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[strin
112113
}
113114

114115
func handleConnection(connection *websocket.Conn, done chan struct{}) {
115-
go func() {
116+
routines.RunWithPanicHandler(func() {
116117
defer close(done)
117118
for {
118119
_, message, err := connection.ReadMessage()
@@ -121,7 +122,7 @@ func handleConnection(connection *websocket.Conn, done chan struct{}) {
121122
}
122123
fmt.Println(string(message))
123124
}
124-
}()
125+
}, false)
125126
}
126127

127128
func closeConnection(connection *websocket.Conn, done chan struct{}, interrupt chan os.Signal) {

cli/cmd/lib_manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cortexlabs/cortex/pkg/lib/errors"
3535
"github.com/cortexlabs/cortex/pkg/lib/exit"
3636
"github.com/cortexlabs/cortex/pkg/lib/files"
37+
"github.com/cortexlabs/cortex/pkg/lib/routines"
3738
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
3839
"github.com/cortexlabs/yaml"
3940
dockertypes "github.com/docker/docker/api/types"
@@ -91,12 +92,13 @@ func runManager(containerConfig *container.Config, addNewLineAfterPull bool, cop
9192
c := make(chan os.Signal, 1)
9293
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
9394
caughtCtrlC := false
94-
go func() {
95+
96+
routines.RunWithPanicHandler(func() {
9597
<-c
9698
caughtCtrlC = true
9799
removeContainer()
98100
exit.Error(ErrorDockerCtrlC())
99-
}()
101+
}, false)
100102

101103
for _, copyPath := range copyToPaths {
102104
err = docker.CopyToContainer(containerInfo.ID, copyPath.input, copyPath.containerPath)

pkg/lib/cron/cron.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"time"
2121

2222
"github.com/cortexlabs/cortex/pkg/lib/errors"
23+
"github.com/cortexlabs/cortex/pkg/lib/routines"
2324
)
2425

2526
type Cron struct {
@@ -39,7 +40,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron {
3940
}
4041
}
4142

42-
go func() {
43+
routines.RunWithPanicHandler(func() {
4344
timer := time.NewTimer(0)
4445
defer timer.Stop()
4546
for {
@@ -53,7 +54,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron {
5354
}
5455
timer.Reset(delay)
5556
}
56-
}()
57+
}, false)
5758

5859
return Cron{
5960
cronRun: cronRun,

pkg/lib/parallel/parallel.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ func Run(fn func() error, fns ...func() error) []error {
4141
}
4242

4343
go func() {
44+
defer func() {
45+
if r := recover(); r != nil {
46+
errChannel <- errors.CastRecoverError(r)
47+
}
48+
}()
4449
errChannel <- fn()
4550
}()
4651
}

pkg/lib/routines/routines.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
Copyright 2020 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package routines
18+
19+
import (
20+
"github.com/cortexlabs/cortex/pkg/lib/errors"
21+
"github.com/cortexlabs/cortex/pkg/lib/exit"
22+
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
23+
)
24+
25+
func RunWithPanicHandler(f func(), exitOnPanic bool) {
26+
go func() {
27+
defer func() {
28+
if r := recover(); r != nil {
29+
err := errors.CastRecoverError(r)
30+
errors.PrintStacktrace(err)
31+
if exitOnPanic {
32+
exit.Error(err)
33+
}
34+
telemetry.Error(err)
35+
}
36+
}()
37+
f()
38+
}()
39+
}

pkg/operator/resources/batchapi/api.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/cortexlabs/cortex/pkg/lib/errors"
2424
"github.com/cortexlabs/cortex/pkg/lib/parallel"
25+
"github.com/cortexlabs/cortex/pkg/lib/routines"
2526
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
2627
"github.com/cortexlabs/cortex/pkg/operator/config"
2728
"github.com/cortexlabs/cortex/pkg/operator/operator"
@@ -51,14 +52,20 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string,
5152

5253
err = applyK8sResources(api, prevVirtualService)
5354
if err != nil {
54-
go deleteK8sResources(api.Name)
55+
routines.RunWithPanicHandler(func() {
56+
deleteK8sResources(api.Name)
57+
}, false)
5558
return nil, "", err
5659
}
5760

5861
err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
5962
if err != nil {
60-
go deleteK8sResources(api.Name)
61-
go operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
63+
routines.RunWithPanicHandler(func() {
64+
deleteK8sResources(api.Name)
65+
}, false)
66+
routines.RunWithPanicHandler(func() {
67+
operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
68+
}, false)
6269
return nil, "", err
6370
}
6471

@@ -150,7 +157,9 @@ func deleteS3Resources(apiName string) error {
150157
},
151158
func() error {
152159
prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName)
153-
go config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while
160+
routines.RunWithPanicHandler(func() {
161+
config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while
162+
}, false)
154163
return nil
155164
},
156165
func() error {

pkg/operator/resources/batchapi/job.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"github.com/cortexlabs/cortex/pkg/lib/errors"
24+
"github.com/cortexlabs/cortex/pkg/lib/routines"
2425
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
2526
"github.com/cortexlabs/cortex/pkg/operator/config"
2627
"github.com/cortexlabs/cortex/pkg/operator/operator"
@@ -127,7 +128,9 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err
127128
return nil, err
128129
}
129130

130-
go deployJob(apiSpec, &jobSpec, submission)
131+
routines.RunWithPanicHandler(func() {
132+
deployJob(apiSpec, &jobSpec, submission)
133+
}, false)
131134

132135
return &jobSpec, nil
133136
}
@@ -256,12 +259,16 @@ func deleteJobRuntimeResources(jobKey spec.JobKey) error {
256259
func StopJob(jobKey spec.JobKey) error {
257260
jobState, err := getJobState(jobKey)
258261
if err != nil {
259-
go deleteJobRuntimeResources(jobKey)
262+
routines.RunWithPanicHandler(func() {
263+
deleteJobRuntimeResources(jobKey)
264+
}, false)
260265
return err
261266
}
262267

263268
if !jobState.Status.IsInProgress() {
264-
go deleteJobRuntimeResources(jobKey)
269+
routines.RunWithPanicHandler(func() {
270+
deleteJobRuntimeResources(jobKey)
271+
}, false)
265272
return errors.Wrap(ErrorJobIsNotInProgress(), jobKey.UserString())
266273
}
267274

pkg/operator/resources/batchapi/logs.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
2626
"github.com/cortexlabs/cortex/pkg/lib/cache"
2727
"github.com/cortexlabs/cortex/pkg/lib/errors"
28+
"github.com/cortexlabs/cortex/pkg/lib/routines"
2829
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
2930
s "github.com/cortexlabs/cortex/pkg/lib/strings"
3031
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
@@ -63,9 +64,13 @@ func ReadLogs(jobKey spec.JobKey, socket *websocket.Conn) {
6364
defer close(podCheckCancel)
6465

6566
if jobStatus.Status.IsInProgress() {
66-
go streamFromCloudWatch(jobStatus, podCheckCancel, socket)
67+
routines.RunWithPanicHandler(func() {
68+
streamFromCloudWatch(jobStatus, podCheckCancel, socket)
69+
}, false)
6770
} else {
68-
go fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket)
71+
routines.RunWithPanicHandler(func() {
72+
fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket)
73+
}, false)
6974
}
7075

7176
pumpStdin(socket)

pkg/operator/resources/realtimeapi/api.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cortexlabs/cortex/pkg/lib/k8s"
2626
"github.com/cortexlabs/cortex/pkg/lib/parallel"
2727
"github.com/cortexlabs/cortex/pkg/lib/pointer"
28+
"github.com/cortexlabs/cortex/pkg/lib/routines"
2829
"github.com/cortexlabs/cortex/pkg/operator/config"
2930
"github.com/cortexlabs/cortex/pkg/operator/operator"
3031
"github.com/cortexlabs/cortex/pkg/operator/schema"
@@ -67,14 +68,18 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A
6768
}
6869

6970
if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil {
70-
go deleteK8sResources(api.Name)
71+
routines.RunWithPanicHandler(func() {
72+
deleteK8sResources(api.Name)
73+
}, false)
7174
return nil, "", err
7275
}
7376

7477
if config.Provider == types.AWSProviderType {
7578
err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
7679
if err != nil {
77-
go deleteK8sResources(api.Name)
80+
routines.RunWithPanicHandler(func() {
81+
deleteK8sResources(api.Name)
82+
}, false)
7883
return nil, "", err
7984
}
8085
err = addAPIToDashboard(config.Cluster.ClusterName, api.Name)

pkg/operator/resources/realtimeapi/logs.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
2626
"github.com/cortexlabs/cortex/pkg/lib/cache"
2727
"github.com/cortexlabs/cortex/pkg/lib/errors"
28+
"github.com/cortexlabs/cortex/pkg/lib/routines"
2829
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
2930
s "github.com/cortexlabs/cortex/pkg/lib/strings"
3031
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
@@ -56,7 +57,9 @@ type fluentdLog struct {
5657
func ReadLogs(apiName string, socket *websocket.Conn) {
5758
podCheckCancel := make(chan struct{})
5859
defer close(podCheckCancel)
59-
go streamFromCloudWatch(apiName, podCheckCancel, socket)
60+
routines.RunWithPanicHandler(func() {
61+
streamFromCloudWatch(apiName, podCheckCancel, socket)
62+
}, false)
6063
pumpStdin(socket)
6164
podCheckCancel <- struct{}{}
6265
}

0 commit comments

Comments
 (0)