Skip to content

Commit be39ba7

Browse files
RobertLucian and Miguel Varela Ramos authored
Report errors to sentry for the async gateway / enqueuer (#2138)
Co-authored-by: Miguel Varela Ramos <miguel@cortexlabs.com>
1 parent ac4de04 commit be39ba7

File tree

38 files changed

+211
-1798
lines changed

38 files changed

+211
-1798
lines changed

async-gateway/go.mod

Lines changed: 0 additions & 10 deletions
This file was deleted.

async-gateway/go.sum

Lines changed: 0 additions & 76 deletions
This file was deleted.

async-gateway/main.go renamed to cmd/async-gateway/main.go

Lines changed: 62 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,110 +20,115 @@ import (
2020
"flag"
2121
"net/http"
2222
"os"
23-
"strings"
2423

25-
"github.com/aws/aws-sdk-go/aws"
26-
"github.com/aws/aws-sdk-go/aws/session"
24+
gateway "github.com/cortexlabs/cortex/pkg/async-gateway"
25+
"github.com/cortexlabs/cortex/pkg/lib/aws"
26+
"github.com/cortexlabs/cortex/pkg/lib/errors"
27+
"github.com/cortexlabs/cortex/pkg/lib/logging"
28+
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
29+
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
30+
"github.com/cortexlabs/cortex/pkg/types/userconfig"
2731
"github.com/gorilla/handlers"
2832
"github.com/gorilla/mux"
29-
"go.uber.org/zap"
30-
"go.uber.org/zap/zapcore"
3133
)
3234

3335
const (
3436
_defaultPort = "8080"
3537
)
3638

37-
func createLogger() (*zap.Logger, error) {
38-
logLevelEnv := os.Getenv("CORTEX_LOG_LEVEL")
39-
disableJSONLogging := os.Getenv("CORTEX_DISABLE_JSON_LOGGING")
40-
41-
var logLevelZap zapcore.Level
42-
switch logLevelEnv {
43-
case "DEBUG":
44-
logLevelZap = zapcore.DebugLevel
45-
case "WARNING":
46-
logLevelZap = zapcore.WarnLevel
47-
case "ERROR":
48-
logLevelZap = zapcore.ErrorLevel
49-
default:
50-
logLevelZap = zapcore.InfoLevel
39+
var (
40+
gatewayLogger = logging.GetLogger()
41+
)
42+
43+
func Exit(err error, wrapStrs ...string) {
44+
for _, str := range wrapStrs {
45+
err = errors.Wrap(err, str)
5146
}
5247

53-
encoderConfig := zap.NewProductionEncoderConfig()
54-
encoderConfig.MessageKey = "message"
48+
if err != nil && !errors.IsNoTelemetry(err) {
49+
telemetry.Error(err)
50+
}
5551

56-
encoding := "json"
57-
if strings.ToLower(disableJSONLogging) == "true" {
58-
encoding = "console"
52+
if err != nil && !errors.IsNoPrint(err) {
53+
gatewayLogger.Error(err)
5954
}
6055

61-
return zap.Config{
62-
Level: zap.NewAtomicLevelAt(logLevelZap),
63-
Encoding: encoding,
64-
EncoderConfig: encoderConfig,
65-
OutputPaths: []string{"stdout"},
66-
ErrorOutputPaths: []string{"stderr"},
67-
}.Build()
56+
telemetry.Close()
57+
58+
os.Exit(1)
6859
}
6960

7061
// usage: ./gateway -port <port> -queue <queue-url> -cluster-config <cluster-config-path> <apiName>
7162
func main() {
72-
log, err := createLogger()
73-
if err != nil {
74-
panic(err)
75-
}
63+
log := logging.GetLogger()
7664
defer func() {
7765
_ = log.Sync()
7866
}()
7967

8068
var (
81-
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
82-
queueURL = flag.String("queue", "", "SQS queue URL")
83-
region = flag.String("region", "", "AWS region")
84-
bucket = flag.String("bucket", "", "AWS bucket")
85-
clusterUID = flag.String("cluster-uid", "", "cluster UID")
69+
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
70+
queueURL = flag.String("queue", "", "SQS queue URL")
71+
clusterConfigPath = flag.String("cluster-config", "", "cluster config path")
8672
)
8773
flag.Parse()
8874

8975
switch {
9076
case *queueURL == "":
9177
log.Fatal("missing required option: -queue")
92-
case *region == "":
93-
log.Fatal("missing required option: -region")
94-
case *bucket == "":
95-
log.Fatal("missing required option: -bucket")
96-
case *clusterUID == "":
97-
log.Fatal("missing required option: -cluster-uid")
78+
case *clusterConfigPath == "":
79+
log.Fatal("missing required option: -cluster-config")
9880
}
9981

10082
apiName := flag.Arg(0)
10183
if apiName == "" {
10284
log.Fatal("apiName argument was not provided")
10385
}
10486

105-
sess, err := session.NewSessionWithOptions(session.Options{
106-
Config: aws.Config{
107-
Region: region,
87+
clusterConfig, err := clusterconfig.NewForFile(*clusterConfigPath)
88+
if err != nil {
89+
Exit(err)
90+
}
91+
92+
awsClient, err := aws.NewForRegion(clusterConfig.Region)
93+
if err != nil {
94+
Exit(err)
95+
}
96+
97+
_, userID, err := awsClient.CheckCredentials()
98+
if err != nil {
99+
Exit(err)
100+
}
101+
102+
err = telemetry.Init(telemetry.Config{
103+
Enabled: clusterConfig.Telemetry,
104+
UserID: userID,
105+
Properties: map[string]string{
106+
"kind": userconfig.AsyncAPIKind.String(),
107+
"image_type": "async-gateway",
108108
},
109-
SharedConfigState: session.SharedConfigEnable,
109+
Environment: "api",
110+
LogErrors: true,
111+
BackoffMode: telemetry.BackoffDuplicateMessages,
110112
})
111113
if err != nil {
112-
log.Fatal("failed to create AWS session: %s", zap.Error(err))
114+
Exit(err)
113115
}
114116

115-
s3Storage := NewS3(sess, *bucket)
116-
sqsQueue := NewSQS(*queueURL, sess)
117+
sess := awsClient.Session()
118+
s3Storage := gateway.NewS3(sess, clusterConfig.Bucket)
119+
sqsQueue := gateway.NewSQS(*queueURL, sess)
117120

118-
svc := NewService(*clusterUID, apiName, sqsQueue, s3Storage, log)
119-
ep := NewEndpoint(svc, log)
121+
svc := gateway.NewService(clusterConfig.ClusterUID, apiName, sqsQueue, s3Storage, log)
122+
ep := gateway.NewEndpoint(svc, log)
120123

121124
router := mux.NewRouter()
122125
router.HandleFunc("/", ep.CreateWorkload).Methods("POST")
123126
router.HandleFunc(
124127
"/healthz",
125128
func(w http.ResponseWriter, r *http.Request) {
126-
respondPlainText(w, http.StatusOK, "ok")
129+
w.WriteHeader(http.StatusOK)
130+
w.Header().Set("Content-Type", "text/plain")
131+
_, _ = w.Write([]byte("ok"))
127132
},
128133
)
129134
router.HandleFunc("/{id}", ep.GetWorkload).Methods("GET")
@@ -140,6 +145,6 @@ func main() {
140145

141146
log.Info("Running on port " + *port)
142147
if err = http.ListenAndServe(":"+*port, handlers.CORS(corsOptions...)(router)); err != nil {
143-
log.Fatal("failed to start server", zap.Error(err))
148+
Exit(err)
144149
}
145150
}

enqueuer/main.go renamed to cmd/enqueuer/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"strings"
2323

2424
"github.com/cortexlabs/cortex/pkg/consts"
25+
"github.com/cortexlabs/cortex/pkg/enqueuer"
2526
"go.uber.org/zap"
2627
"go.uber.org/zap/zapcore"
2728
)
@@ -105,7 +106,7 @@ func main() {
105106
log.Fatal("-jobID is a required option")
106107
}
107108

108-
envConfig := EnvConfig{
109+
envConfig := enqueuer.EnvConfig{
109110
ClusterUID: clusterUID,
110111
Region: region,
111112
Version: version,
@@ -114,17 +115,17 @@ func main() {
114115
JobID: jobID,
115116
}
116117

117-
enqueuer, err := NewEnqueuer(envConfig, queueURL, log)
118+
eqr, err := enqueuer.NewEnqueuer(envConfig, queueURL, log)
118119
if err != nil {
119120
log.Fatal("failed to create enqueuer", zap.Error(err))
120121
}
121122

122-
totalBatches, err := enqueuer.Enqueue()
123+
totalBatches, err := eqr.Enqueue()
123124
if err != nil {
124125
log.Fatal("failed to enqueue batches", zap.Error(err))
125126
}
126127

127-
if err = enqueuer.UploadBatchCount(totalBatches); err != nil {
128+
if err = eqr.UploadBatchCount(totalBatches); err != nil {
128129
log.Fatal("failed to upload batch count", zap.Error(err))
129130
}
130131

pkg/operator/main.go renamed to cmd/operator/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import (
2323
"github.com/cortexlabs/cortex/pkg/config"
2424
"github.com/cortexlabs/cortex/pkg/lib/cron"
2525
"github.com/cortexlabs/cortex/pkg/lib/errors"
26+
"github.com/cortexlabs/cortex/pkg/lib/logging"
2627
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
2728
"github.com/cortexlabs/cortex/pkg/operator/endpoints"
2829
"github.com/cortexlabs/cortex/pkg/operator/lib/exit"
29-
"github.com/cortexlabs/cortex/pkg/operator/lib/logging"
3030
"github.com/cortexlabs/cortex/pkg/operator/operator"
3131
"github.com/cortexlabs/cortex/pkg/operator/resources/asyncapi"
3232
"github.com/cortexlabs/cortex/pkg/operator/resources/job/taskapi"
@@ -37,7 +37,7 @@ import (
3737
"github.com/prometheus/client_golang/prometheus/promhttp"
3838
)
3939

40-
var operatorLogger = logging.GetOperatorLogger()
40+
var operatorLogger = logging.GetLogger()
4141

4242
const _operatorPortStr = "8888"
4343

File renamed without changes.

dev/operator_local.sh

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fi
7676
export CORTEX_OPERATOR_IN_CLUSTER=false
7777
export CORTEX_CLUSTER_CONFIG_PATH=~/.cortex/cluster-dev.yaml
7878
export CORTEX_DISABLE_JSON_LOGGING=true
79-
export CORTEX_OPERATOR_LOG_LEVEL=debug
79+
export CORTEX_LOG_LEVEL=debug
8080
export CORTEX_PROMETHEUS_URL="http://localhost:9090"
8181

8282
portForwardCMD="kubectl port-forward -n default prometheus-prometheus-0 9090"
@@ -89,17 +89,17 @@ mkdir -p $ROOT/bin
8989

9090
if [ "$operator_only" = "true" ]; then
9191
kill $(pgrep -f rerun) >/dev/null 2>&1 || true
92-
rerun -watch $ROOT/pkg $ROOT/dev/config -run sh -c \
93-
"clear && echo 'building operator...' && go build -o $ROOT/bin/operator $ROOT/pkg/operator && echo 'starting local operator...' && $ROOT/bin/operator"
92+
rerun -watch $ROOT/pkg $ROOT/dev/config $ROOT/cmd/operator -run sh -c \
93+
"clear && echo 'building operator...' && go build -o $ROOT/bin/operator $ROOT/cmd/operator && echo 'starting local operator...' && $ROOT/bin/operator"
9494
elif [ "$debug" = "true" ]; then
95-
DEBUG_CMD="dlv --listen=:2345 --headless=true --api-version=2 debug $ROOT/pkg/operator --output ${ROOT}/bin/__debug_bin"
95+
DEBUG_CMD="dlv --listen=:2345 --headless=true --api-version=2 debug $ROOT/cmd/operator --output ${ROOT}/bin/__debug_bin"
9696
kill $(pgrep -f "${DEBUG_CMD}") >/dev/null 2>&1 || true
9797
kill $(pgrep -f __debug_bin) >/dev/null 2>&1 || true
9898
echo 'starting local operator in debug mode...' && eval "${DEBUG_CMD}"
9999
else
100100
kill $(pgrep -f rerun) >/dev/null 2>&1 || true
101-
rerun -watch $ROOT/pkg $ROOT/cli $ROOT/dev/config -run sh -c \
102-
"clear && echo 'building cli...' && go build -o $ROOT/bin/cortex $ROOT/cli && echo 'building operator...' && go build -o $ROOT/bin/operator $ROOT/pkg/operator && echo 'starting local operator...' && $ROOT/bin/operator"
101+
rerun -watch $ROOT/pkg $ROOT/cli $ROOT/dev/config $ROOT/cmd/operator -run sh -c \
102+
"clear && echo 'building cli...' && go build -o $ROOT/bin/cortex $ROOT/cli && echo 'building operator...' && go build -o $ROOT/bin/operator $ROOT/cmd/operator && echo 'starting local operator...' && $ROOT/bin/operator"
103103
fi
104104

105-
# go run -race $ROOT/pkg/operator/main.go # Check for race conditions. Doesn't seem to catch them all?
105+
# go run -race $ROOT/cmd/operator # Check for race conditions. Doesn't seem to catch them all?

enqueuer/go.mod

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)