Skip to content

Commit 4c6d74d

Browse files
author
Miguel Varela Ramos
authored
Add metrics to AsyncAPI message handler in dequeuer (#2231)
1 parent 9ac8ad7 commit 4c6d74d

File tree

12 files changed

+278
-38
lines changed

12 files changed

+278
-38
lines changed

cmd/dequeuer/main.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ func main() {
141141
exit(log, err, "unable to initialize metrics client")
142142
}
143143

144+
adminHandler := http.NewServeMux()
145+
adminHandler.Handle("/healthz", dequeuer.HealthcheckHandler(func() bool {
146+
return probe.AreProbesHealthy(probes)
147+
}))
148+
144149
var dequeuerConfig dequeuer.SQSDequeuerConfig
145150
var messageHandler dequeuer.MessageHandler
146151

@@ -177,24 +182,22 @@ func main() {
177182
TargetURL: targetURL,
178183
}
179184

180-
messageHandler = dequeuer.NewAsyncMessageHandler(config, awsClient, log)
185+
asyncStatsReporter := dequeuer.NewAsyncPrometheusStatsReporter()
186+
messageHandler = dequeuer.NewAsyncMessageHandler(config, awsClient, asyncStatsReporter, log)
181187
dequeuerConfig = dequeuer.SQSDequeuerConfig{
182188
Region: clusterConfig.Region,
183189
QueueURL: queueURL,
184190
StopIfNoMessages: false,
185191
}
186192

193+
// report prometheus metrics for async api kinds
194+
adminHandler.Handle("/metrics", asyncStatsReporter)
187195
default:
188196
exit(log, err, fmt.Sprintf("kind %s is not supported", apiKind))
189197
}
190198

191199
errCh := make(chan error)
192200

193-
adminHandler := http.NewServeMux()
194-
adminHandler.Handle("/healthz", dequeuer.HealthcheckHandler(func() bool {
195-
return probe.AreProbesHealthy(probes)
196-
}))
197-
198201
go func() {
199202
server := &http.Server{
200203
Addr: ":" + strconv.Itoa(adminPort),
@@ -219,13 +222,17 @@ func main() {
219222
})
220223
}()
221224

222-
for _, probe := range probes {
223-
stopper := probe.StartProbing()
224-
defer func() {
225-
stopper <- struct{}{}
226-
}()
225+
var stopChs []chan struct{}
226+
for _, p := range probes {
227+
stopChs = append(stopChs, p.StartProbing())
227228
}
228229

230+
defer func() {
231+
for _, stopCh := range stopChs {
232+
stopCh <- struct{}{}
233+
}
234+
}()
235+
229236
select {
230237
case err = <-errCh:
231238
exit(log, err, "error during message dequeueing or error from admin server")

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ require (
3030
github.com/morikuni/aec v1.0.0 // indirect
3131
github.com/onsi/ginkgo v1.14.1
3232
github.com/onsi/gomega v1.10.3
33-
github.com/opencontainers/go-digest v1.0.0
33+
github.com/opencontainers/go-digest v1.0.0 // indirect
3434
github.com/ory/dockertest/v3 v3.6.5
3535
github.com/patrickmn/go-cache v2.1.0+incompatible
3636
github.com/pkg/errors v0.9.1

go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,6 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
547547
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
548548
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
549549
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
550-
github.com/onsi/gomega v1.10.2 h1:aY/nuoWlKJud2J6U0E3NWsjlg+0GtwXxgEqthRdzlcs=
551550
github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
552551
github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA=
553552
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=

pkg/dequeuer/async_handler.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"net/http"
2424
"strings"
25+
"time"
2526

2627
"github.com/aws/aws-sdk-go/aws"
2728
"github.com/aws/aws-sdk-go/service/s3"
@@ -39,11 +40,12 @@ const (
3940
)
4041

4142
type AsyncMessageHandler struct {
42-
config AsyncMessageHandlerConfig
43-
aws *awslib.Client
44-
log *zap.SugaredLogger
45-
storagePath string
46-
httpClient *http.Client
43+
config AsyncMessageHandlerConfig
44+
aws *awslib.Client
45+
log *zap.SugaredLogger
46+
storagePath string
47+
httpClient *http.Client
48+
eventHandler RequestEventHandler
4749
}
4850

4951
type AsyncMessageHandlerConfig struct {
@@ -58,13 +60,14 @@ type userPayload struct {
5860
ContentType string
5961
}
6062

61-
func NewAsyncMessageHandler(config AsyncMessageHandlerConfig, awsClient *awslib.Client, logger *zap.SugaredLogger) *AsyncMessageHandler {
63+
func NewAsyncMessageHandler(config AsyncMessageHandlerConfig, awsClient *awslib.Client, eventHandler RequestEventHandler, logger *zap.SugaredLogger) *AsyncMessageHandler {
6264
return &AsyncMessageHandler{
63-
config: config,
64-
aws: awsClient,
65-
log: logger,
66-
storagePath: async.StoragePath(config.ClusterUID, config.APIName),
67-
httpClient: &http.Client{},
65+
config: config,
66+
aws: awsClient,
67+
log: logger,
68+
storagePath: async.StoragePath(config.ClusterUID, config.APIName),
69+
httpClient: &http.Client{},
70+
eventHandler: eventHandler,
6871
}
6972
}
7073

@@ -175,11 +178,18 @@ func (h *AsyncMessageHandler) submitRequest(payload *userPayload, requestID stri
175178

176179
req.Header.Set("Content-Type", payload.ContentType)
177180
req.Header.Set(CortexRequestIDHeader, requestID)
181+
182+
startTime := time.Now()
178183
response, err := h.httpClient.Do(req)
179184
if err != nil {
180185
return nil, ErrorUserContainerNotReachable(err)
181186
}
182187

188+
requestEvent := RequestEvent{
189+
StatusCode: response.StatusCode,
190+
Duration: time.Since(startTime),
191+
}
192+
183193
defer func() {
184194
_ = response.Body.Close()
185195
}()
@@ -197,6 +207,8 @@ func (h *AsyncMessageHandler) submitRequest(payload *userPayload, requestID stri
197207
return nil, ErrorUserContainerResponseNotJSONDecodable()
198208
}
199209

210+
h.eventHandler.HandleEvent(requestEvent)
211+
200212
return result, nil
201213
}
202214

pkg/dequeuer/async_handler_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,17 @@ func TestAsyncMessageHandler_Handle(t *testing.T) {
4949
_, _ = w.Write([]byte("{}"))
5050
}))
5151

52+
var requestEventsCount int
53+
eventHandler := NewRequestEventHandlerFunc(func(event RequestEvent) {
54+
requestEventsCount++
55+
})
56+
5257
asyncHandler := NewAsyncMessageHandler(AsyncMessageHandlerConfig{
5358
ClusterUID: "cortex-test",
5459
Bucket: _testBucket,
5560
APIName: "async-test",
5661
TargetURL: server.URL,
57-
}, awsClient, log)
62+
}, awsClient, eventHandler, log)
5863

5964
_, err := awsClient.S3().CreateBucket(&s3.CreateBucketInput{
6065
Bucket: aws.String(_testBucket),
@@ -75,6 +80,7 @@ func TestAsyncMessageHandler_Handle(t *testing.T) {
7580
fmt.Sprintf("%s/%s/status/%s", asyncHandler.storagePath, requestID, async.StatusCompleted),
7681
)
7782
require.NoError(t, err)
83+
require.Equal(t, 1, requestEventsCount)
7884
}
7985

8086
func TestAsyncMessageHandler_Handle_Errors(t *testing.T) {
@@ -105,12 +111,14 @@ func TestAsyncMessageHandler_Handle_Errors(t *testing.T) {
105111
log := newLogger(t)
106112
awsClient := testAWSClient(t)
107113

114+
eventHandler := NewRequestEventHandlerFunc(func(event RequestEvent) {})
115+
108116
asyncHandler := NewAsyncMessageHandler(AsyncMessageHandlerConfig{
109117
ClusterUID: "cortex-test",
110118
Bucket: _testBucket,
111119
APIName: "async-test",
112120
TargetURL: "http://fake.cortex.dev",
113-
}, awsClient, log)
121+
}, awsClient, eventHandler, log)
114122

115123
for _, tt := range cases {
116124
t.Run(tt.name, func(t *testing.T) {

pkg/dequeuer/async_stats.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dequeuer
18+
19+
import (
20+
"net/http"
21+
"strconv"
22+
23+
"github.com/prometheus/client_golang/prometheus"
24+
"github.com/prometheus/client_golang/prometheus/promauto"
25+
"github.com/prometheus/client_golang/prometheus/promhttp"
26+
)
27+
28+
// AsyncStatsReporter records per-request metrics for an AsyncAPI through
// HandleEvent and serves HTTP requests by delegating to its handler.
type AsyncStatsReporter struct {
29+
// handler is the HTTP handler that ServeHTTP delegates to.
handler http.Handler
30+
// latencies receives one observation, in seconds, per handled event, labelled by status code.
latencies *prometheus.HistogramVec
31+
// requestCount is incremented by one per handled event, labelled by status code.
requestCount *prometheus.CounterVec
32+
}
33+
34+
func NewAsyncPrometheusStatsReporter() *AsyncStatsReporter {
35+
latenciesHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
36+
Name: "cortex_async_latency",
37+
Help: "Histogram of the latencies for an AsyncAPI kind in seconds",
38+
}, []string{"status_code"})
39+
40+
requestCounter := promauto.NewCounterVec(prometheus.CounterOpts{
41+
Name: "cortex_async_request_count",
42+
Help: "Request count for an AsyncAPI",
43+
}, []string{"status_code"})
44+
45+
handler := promhttp.Handler()
46+
47+
return &AsyncStatsReporter{
48+
handler: handler,
49+
latencies: latenciesHist,
50+
requestCount: requestCounter,
51+
}
52+
}
53+
54+
func (r *AsyncStatsReporter) HandleEvent(event RequestEvent) {
55+
labels := map[string]string{
56+
"status_code": strconv.Itoa(event.StatusCode),
57+
}
58+
59+
r.latencies.With(labels).Observe(event.Duration.Seconds())
60+
r.requestCount.With(labels).Add(1)
61+
}
62+
63+
func (r *AsyncStatsReporter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
64+
r.handler.ServeHTTP(w, req)
65+
}

pkg/dequeuer/async_stats_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dequeuer
18+
19+
import (
20+
"net/http/httptest"
21+
"strings"
22+
"testing"
23+
"time"
24+
25+
"github.com/prometheus/client_golang/prometheus"
26+
"github.com/prometheus/client_golang/prometheus/promauto"
27+
"github.com/prometheus/client_golang/prometheus/testutil"
28+
"github.com/stretchr/testify/require"
29+
)
30+
31+
func TestNewAsyncPrometheusStatsReporter(t *testing.T) {
32+
t.Parallel()
33+
34+
statsReporter := NewAsyncPrometheusStatsReporter()
35+
36+
statsReporter.HandleEvent(
37+
RequestEvent{
38+
StatusCode: 200,
39+
Duration: 100 * time.Millisecond,
40+
},
41+
)
42+
43+
w := httptest.NewRecorder()
44+
r := httptest.NewRequest("GET", "/metrics", nil)
45+
statsReporter.ServeHTTP(w, r)
46+
47+
result := w.Body.String()
48+
require.Contains(t, result, "cortex_async_latency")
49+
require.Contains(t, result, "cortex_async_request_count")
50+
}
51+
52+
// TestAsyncStatsReporter_HandleEvent verifies that a single HandleEvent call
// updates both collectors: the request counter reads 1 and the latency
// histogram matches the expected exposition text.
func TestAsyncStatsReporter_HandleEvent(t *testing.T) {
53+
t.Parallel()
54+
55+
// The collectors are registered on a registry created for this test only.
reg := prometheus.NewRegistry()
56+
57+
latenciesHist := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
58+
Name: "cortex_async_latency",
59+
Help: "Histogram of the latencies for an AsyncAPI kind in seconds",
60+
}, []string{"status_code"})
61+
62+
requestCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
63+
Name: "cortex_async_request_count",
64+
Help: "Request count for an AsyncAPI",
65+
}, []string{"status_code"})
66+
67+
// The handler field is left unset; this test calls HandleEvent only, never ServeHTTP.
statsReporter := AsyncStatsReporter{
68+
latencies: latenciesHist,
69+
requestCount: requestCounter,
70+
}
71+
72+
statsReporter.HandleEvent(
73+
RequestEvent{
74+
StatusCode: 200,
75+
Duration: 100 * time.Millisecond,
76+
},
77+
)
78+
79+
// The 100ms observation is expected in the 0.1s bucket and in every larger bucket.
expectedHist := `
80+
# HELP cortex_async_latency Histogram of the latencies for an AsyncAPI kind in seconds
81+
# TYPE cortex_async_latency histogram
82+
cortex_async_latency_bucket{status_code="200",le="0.005"} 0
83+
cortex_async_latency_bucket{status_code="200",le="0.01"} 0
84+
cortex_async_latency_bucket{status_code="200",le="0.025"} 0
85+
cortex_async_latency_bucket{status_code="200",le="0.05"} 0
86+
cortex_async_latency_bucket{status_code="200",le="0.1"} 1
87+
cortex_async_latency_bucket{status_code="200",le="0.25"} 1
88+
cortex_async_latency_bucket{status_code="200",le="0.5"} 1
89+
cortex_async_latency_bucket{status_code="200",le="1"} 1
90+
cortex_async_latency_bucket{status_code="200",le="2.5"} 1
91+
cortex_async_latency_bucket{status_code="200",le="5"} 1
92+
cortex_async_latency_bucket{status_code="200",le="10"} 1
93+
cortex_async_latency_bucket{status_code="200",le="+Inf"} 1
94+
cortex_async_latency_sum{status_code="200"} 0.1
95+
cortex_async_latency_count{status_code="200"} 1
96+
`
97+
98+
// One event was handled, so the counter must read exactly 1.
require.Equal(t, float64(1), testutil.ToFloat64(statsReporter.requestCount))
99+
require.NoError(t, testutil.CollectAndCompare(statsReporter.latencies, strings.NewReader(expectedHist)))
100+
}

pkg/dequeuer/batch_handler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ func (h *BatchMessageHandler) submitRequest(messageBody string, isOnJobComplete
128128
if err != nil {
129129
return ErrorUserContainerNotReachable(err)
130130
}
131-
defer response.Body.Close()
131+
defer func() {
132+
_ = response.Body.Close()
133+
}()
132134

133135
if response.StatusCode == http.StatusNotFound && isOnJobComplete {
134136
return nil

0 commit comments

Comments
 (0)