Skip to content

Commit 4d34228

Browse files
author
Miguel Varela Ramos
authored
Ensure in-flight are initialized to zero when min_replicas=0 (#2349)
1 parent fe1e319 commit 4d34228

File tree

8 files changed

+159
-20
lines changed

8 files changed

+159
-20
lines changed

cmd/activator/main.go

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ import (
4444
func main() {
4545
var (
4646
port int
47+
adminPort int
4748
inCluster bool
4849
autoscalerURL string
4950
namespace string
5051
clusterConfigPath string
5152
)
5253

5354
flag.IntVar(&port, "port", 8000, "port where the activator server will be exposed")
55+
flag.IntVar(&adminPort, "admin-port", 15000, "port where the admin server will be exposed")
5456
flag.BoolVar(&inCluster, "in-cluster", false, "use when autoscaler runs in-cluster")
5557
flag.StringVar(&autoscalerURL, "autoscaler-url", "", "the URL for the cortex autoscaler endpoint")
5658
flag.StringVar(&namespace, "namespace", os.Getenv("CORTEX_NAMESPACE"),
@@ -114,6 +116,8 @@ func main() {
114116
kubeClient := k8sClient.ClientSet()
115117
autoscalerClient := autoscaler.NewClient(autoscalerURL)
116118

119+
prometheusStatsReporter := activator.NewPrometheusStatsReporter()
120+
117121
istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions(
118122
istioClient, 10*time.Second, // TODO: check how much makes sense
119123
istioinformers.WithNamespace(namespace),
@@ -129,12 +133,29 @@ func main() {
129133
)
130134
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments().Informer()
131135

132-
act := activator.New(virtualServiceClient, deploymentInformer, virtualServiceInformer, autoscalerClient, log)
136+
act := activator.New(
137+
virtualServiceClient,
138+
deploymentInformer,
139+
virtualServiceInformer,
140+
autoscalerClient,
141+
prometheusStatsReporter,
142+
log,
143+
)
133144

134145
handler := activator.NewHandler(act, log)
135-
server := &http.Server{
136-
Addr: ":" + strconv.Itoa(port),
137-
Handler: handler,
146+
147+
adminHandler := http.NewServeMux()
148+
adminHandler.Handle("/metrics", prometheusStatsReporter)
149+
150+
servers := map[string]*http.Server{
151+
"activator": {
152+
Addr: ":" + strconv.Itoa(port),
153+
Handler: handler,
154+
},
155+
"admin": {
156+
Addr: ":" + strconv.Itoa(adminPort),
157+
Handler: adminHandler,
158+
},
138159
}
139160

140161
stopCh := make(chan struct{})
@@ -145,10 +166,12 @@ func main() {
145166
}()
146167

147168
errCh := make(chan error)
148-
go func() {
149-
log.Infof("Starting activator server on %s", server.Addr)
150-
errCh <- server.ListenAndServe()
151-
}()
169+
for name, server := range servers {
170+
go func(name string, server *http.Server) {
171+
log.Infof("Starting %s server on %s", name, server.Addr)
172+
errCh <- server.ListenAndServe()
173+
}(name, server)
174+
}
152175

153176
sigint := make(chan os.Signal, 1)
154177
signal.Notify(sigint, os.Interrupt)
@@ -159,10 +182,14 @@ func main() {
159182
case <-sigint:
160183
// We received an interrupt signal, shut down.
161184
log.Info("Received TERM signal, handling a graceful shutdown...")
162-
log.Info("Shutting down server")
163-
if err = server.Shutdown(context.Background()); err != nil {
164-
// Error from closing listeners, or context timeout:
165-
log.Warnw("HTTP server Shutdown Error", zap.Error(err))
185+
186+
for name, server := range servers {
187+
log.Infof("Shutting down %s server", name)
188+
if err = server.Shutdown(context.Background()); err != nil {
189+
// Error from closing listeners, or context timeout:
190+
log.Warnw("HTTP server Shutdown Error", zap.Error(err))
191+
telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error"))
192+
}
166193
}
167194
log.Info("Shutdown complete, exiting...")
168195
}

cmd/proxy/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,13 @@ func main() {
175175

176176
for name, server := range servers {
177177
log.Infof("Shutting down %s server", name)
178-
if err := server.Shutdown(context.Background()); err != nil {
178+
if err = server.Shutdown(context.Background()); err != nil {
179179
// Error from closing listeners, or context timeout:
180180
log.Warnw("HTTP server Shutdown Error", zap.Error(err))
181181
telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error"))
182182
}
183183
}
184184
log.Info("Shutdown complete, exiting...")
185-
telemetry.Close()
186185
}
187186
}
188187

manager/manifests/activator.yaml.j2

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,10 @@ spec:
8383
- "--namespace=default"
8484
- "--cluster-config=/configs/cluster/cluster.yaml"
8585
ports:
86-
- containerPort: 8000
86+
- name: http
87+
containerPort: 8000
88+
- name: admin
89+
containerPort: 15000
8790
livenessProbe:
8891
httpGet:
8992
port: 8000

manager/manifests/grafana/grafana-dashboard-realtime.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ data:
427427
"steppedLine": false,
428428
"targets": [
429429
{
430-
"expr": "count(cortex_in_flight_requests{api_name=~\"$api_name\"}) by (api_name)",
430+
"expr": "count(cortex_in_flight_requests{api_name=~\"$api_name\", container!=\"activator\"}) by (api_name)",
431431
"interval": "",
432432
"legendFormat": "{{api_name}}",
433433
"refId": "Active Replicas"
@@ -2027,7 +2027,7 @@ data:
20272027
"value": "None"
20282028
},
20292029
"datasource": null,
2030-
"definition": "label_values(cortex_in_flight_requests{api_kind=\"RealtimeAPI\"}, api_name)",
2030+
"definition": "label_values(cortex_in_flight_requests, api_name)",
20312031
"description": null,
20322032
"error": null,
20332033
"hide": 0,
@@ -2037,7 +2037,7 @@ data:
20372037
"name": "api_name",
20382038
"options": [],
20392039
"query": {
2040-
"query": "label_values(cortex_in_flight_requests{api_kind=\"RealtimeAPI\"}, api_name)",
2040+
"query": "label_values(cortex_in_flight_requests, api_name)",
20412041
"refId": "StandardVariableQuery"
20422042
},
20432043
"refresh": 1,

manager/manifests/prometheus-monitoring.yaml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,3 +347,47 @@ spec:
347347
selector:
348348
matchLabels:
349349
cortex.dev/name: operator
350+
351+
---
352+
353+
apiVersion: monitoring.coreos.com/v1
354+
kind: PodMonitor
355+
metadata:
356+
name: activator-stats
357+
labels:
358+
monitoring.cortex.dev: "activator"
359+
spec:
360+
selector:
361+
matchLabels:
362+
app: activator
363+
matchExpressions:
364+
- { key: prometheus-ignore, operator: DoesNotExist }
365+
namespaceSelector:
366+
any: true
367+
jobLabel: activator-stats
368+
podMetricsEndpoints:
369+
- path: /metrics
370+
scheme: http
371+
interval: 10s
372+
port: admin
373+
relabelings:
374+
- action: keep
375+
sourceLabels: [ __meta_kubernetes_pod_container_name ]
376+
regex: "activator"
377+
- sourceLabels: [ __address__, __meta_kubernetes_pod_annotation_prometheus_io_port ]
378+
action: replace
379+
regex: ([^:]+)(?::\d+)?;(\d+)
380+
replacement: $1:$2
381+
targetLabel: __address__
382+
- action: labeldrop
383+
regex: "__meta_kubernetes_pod_label_(.+)"
384+
- sourceLabels: [ __meta_kubernetes_namespace ]
385+
action: replace
386+
targetLabel: namespace
387+
- sourceLabels: [ __meta_kubernetes_pod_name ]
388+
action: replace
389+
targetLabel: pod_name
390+
metricRelabelings:
391+
- action: keep
392+
sourceLabels: [__name__]
393+
regex: "cortex_(.+)"

pkg/activator/activator.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ type ctxValue string
3636

3737
const APINameCtxKey ctxValue = "apiName"
3838

39+
type StatsReporter interface {
40+
AddAPI(apiName string)
41+
RemoveAPI(apiName string)
42+
}
43+
3944
type Activator interface {
4045
Try(ctx context.Context, fn func() error) error
4146
}
@@ -47,6 +52,7 @@ type activator struct {
4752
apiActivators map[string]*apiActivator
4853
readinessTrackers map[string]*readinessTracker
4954
istioClient istionetworkingclient.VirtualServiceInterface
55+
reporter StatsReporter
5056
logger *zap.SugaredLogger
5157
}
5258

@@ -55,6 +61,7 @@ func New(
5561
deploymentInformer cache.SharedIndexInformer,
5662
virtualServiceInformer cache.SharedIndexInformer,
5763
autoscalerClient autoscaler.Client,
64+
reporter StatsReporter,
5865
logger *zap.SugaredLogger,
5966
) Activator {
6067
log := logger.With(zap.String("apiKind", userconfig.RealtimeAPIKind.String()))
@@ -65,6 +72,7 @@ func New(
6572
istioClient: istioClient,
6673
logger: log,
6774
autoscalerClient: autoscalerClient,
75+
reporter: reporter,
6876
}
6977

7078
virtualServiceInformer.AddEventHandler(
@@ -169,6 +177,8 @@ func (a *activator) addAPI(obj interface{}) {
169177
a.apiActivators[apiName] = newAPIActivator(apiMetadata.maxQueueLength, apiMetadata.maxConcurrency)
170178
}
171179
a.activatorsMux.Unlock()
180+
181+
a.reporter.AddAPI(apiName)
172182
}
173183

174184
func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) {
@@ -218,6 +228,8 @@ func (a *activator) removeAPI(obj interface{}) {
218228
a.activatorsMux.Lock()
219229
delete(a.apiActivators, apiMetadata.apiName)
220230
a.activatorsMux.Unlock()
231+
232+
a.reporter.RemoveAPI(apiMetadata.apiName)
221233
}
222234

223235
func (a *activator) awakenAPI(apiName string) {

pkg/activator/request_stats.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package activator
18+
19+
import (
20+
"net/http"
21+
22+
"github.com/prometheus/client_golang/prometheus"
23+
"github.com/prometheus/client_golang/prometheus/promauto"
24+
"github.com/prometheus/client_golang/prometheus/promhttp"
25+
)
26+
27+
type PrometheusStatsReporter struct {
28+
handler http.Handler
29+
inFlightRequests *prometheus.GaugeVec
30+
}
31+
32+
func NewPrometheusStatsReporter() *PrometheusStatsReporter {
33+
inFlightRequestsGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
34+
Name: "cortex_in_flight_requests",
35+
Help: "The number of in-flight requests for a cortex API",
36+
}, []string{"api_name"})
37+
38+
return &PrometheusStatsReporter{
39+
handler: promhttp.Handler(),
40+
inFlightRequests: inFlightRequestsGauge,
41+
}
42+
}
43+
44+
func (r *PrometheusStatsReporter) AddAPI(apiName string) {
45+
r.inFlightRequests.WithLabelValues(apiName).Set(0)
46+
}
47+
48+
func (r *PrometheusStatsReporter) RemoveAPI(apiName string) {
49+
r.inFlightRequests.DeleteLabelValues(apiName)
50+
}
51+
52+
func (r *PrometheusStatsReporter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
53+
r.handler.ServeHTTP(w, req)
54+
}

pkg/autoscaler/realtime_scaler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ func (s *RealtimeScaler) GetInFlightRequests(apiName string, window time.Duratio
101101

102102
// PromQL query:
103103
// sum(sum_over_time(cortex_in_flight_requests{api_name="<apiName>"}[60s])) /
104-
// sum(count_over_time(cortex_in_flight_requests{api_name="<apiName>"}[60s]))
104+
// sum(count_over_time(cortex_in_flight_requests{api_name="<apiName>", container!="activator"}[60s]))
105105
query := fmt.Sprintf(
106106
"sum(sum_over_time(cortex_in_flight_requests{api_name=\"%s\"}[%ds])) / "+
107-
"max(count_over_time(cortex_in_flight_requests{api_name=\"%s\"}[%ds]))",
107+
"max(count_over_time(cortex_in_flight_requests{api_name=\"%s\", container!=\"activator\"}[%ds]))",
108108
apiName, windowSeconds,
109109
apiName, windowSeconds,
110110
)

0 commit comments

Comments
 (0)