Skip to content

Commit 5aefd89

Browse files
authored
Add HPA to Async API Gateway to handle different loads of traffic (#2079)
1 parent aaa382e commit 5aefd89

File tree

5 files changed

+143
-41
lines changed

5 files changed

+143
-41
lines changed

pkg/lib/k8s/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
ErrParseLabel = "k8s.parse_label"
3131
ErrParseAnnotation = "k8s.parse_annotation"
3232
ErrParseQuantity = "k8s.parse_quantity"
33+
ErrMissingMetrics = "k8s.missing_metrics"
3334
)
3435

3536
func ErrorLabelNotFound(labelName string) error {
@@ -66,3 +67,10 @@ func ErrorParseQuantity(qtyStr string) error {
6667
Message: fmt.Sprintf("%s: invalid kubernetes quantity, some valid examples are 1, 200m, 500Mi, 2G (see here for more information: https://docs.cortex.dev/v/%s/)", qtyStr, consts.CortexVersionMinor),
6768
})
6869
}
70+
71+
func ErrorMissingMetrics() error {
72+
return errors.WithStack(&errors.Error{
73+
Kind: ErrMissingMetrics,
74+
Message: "must specify at least one metric",
75+
})
76+
}

pkg/lib/k8s/hpa.go

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,42 @@ type HPASpec struct {
3737
MinReplicas int32
3838
MaxReplicas int32
3939
TargetCPUUtilization int32
40+
TargetMemUtilization int32
4041
Labels map[string]string
4142
Annotations map[string]string
4243
}
4344

44-
func HPA(spec *HPASpec) *kautoscaling.HorizontalPodAutoscaler {
45+
func HPA(spec *HPASpec) (*kautoscaling.HorizontalPodAutoscaler, error) {
46+
metrics := []kautoscaling.MetricSpec{}
47+
if spec.TargetCPUUtilization > 0 {
48+
metrics = append(metrics, kautoscaling.MetricSpec{
49+
Type: kautoscaling.ResourceMetricSourceType,
50+
Resource: &kautoscaling.ResourceMetricSource{
51+
Name: kcore.ResourceCPU,
52+
Target: kautoscaling.MetricTarget{
53+
Type: kautoscaling.UtilizationMetricType,
54+
AverageUtilization: &spec.TargetCPUUtilization,
55+
},
56+
},
57+
})
58+
}
59+
if spec.TargetMemUtilization > 0 {
60+
metrics = append(metrics, kautoscaling.MetricSpec{
61+
Type: kautoscaling.ResourceMetricSourceType,
62+
Resource: &kautoscaling.ResourceMetricSource{
63+
Name: kcore.ResourceMemory,
64+
Target: kautoscaling.MetricTarget{
65+
Type: kautoscaling.UtilizationMetricType,
66+
AverageUtilization: &spec.TargetMemUtilization,
67+
},
68+
},
69+
})
70+
}
71+
72+
if len(metrics) == 0 {
73+
return nil, ErrorMissingMetrics()
74+
}
75+
4576
hpa := &kautoscaling.HorizontalPodAutoscaler{
4677
TypeMeta: _hpaTypeMeta,
4778
ObjectMeta: kmeta.ObjectMeta{
@@ -52,26 +83,15 @@ func HPA(spec *HPASpec) *kautoscaling.HorizontalPodAutoscaler {
5283
Spec: kautoscaling.HorizontalPodAutoscalerSpec{
5384
MinReplicas: &spec.MinReplicas,
5485
MaxReplicas: spec.MaxReplicas,
55-
Metrics: []kautoscaling.MetricSpec{
56-
{
57-
Type: kautoscaling.ResourceMetricSourceType,
58-
Resource: &kautoscaling.ResourceMetricSource{
59-
Name: kcore.ResourceCPU,
60-
Target: kautoscaling.MetricTarget{
61-
Type: kautoscaling.UtilizationMetricType,
62-
AverageUtilization: &spec.TargetCPUUtilization,
63-
},
64-
},
65-
},
66-
},
86+
Metrics: metrics,
6787
ScaleTargetRef: kautoscaling.CrossVersionObjectReference{
6888
Kind: _deploymentTypeMeta.Kind,
6989
Name: spec.DeploymentName,
7090
APIVersion: _deploymentTypeMeta.APIVersion,
7191
},
7292
},
7393
}
74-
return hpa
94+
return hpa, nil
7595
}
7696

7797
func (c *Client) CreateHPA(hpa *kautoscaling.HorizontalPodAutoscaler) (*kautoscaling.HorizontalPodAutoscaler, error) {
@@ -166,7 +186,7 @@ func HPAMap(hpas []kautoscaling.HorizontalPodAutoscaler) map[string]kautoscaling
166186
return hpaMap
167187
}
168188

169-
func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas int32, maxReplicas int32, targetCPUUtilization int32) bool {
189+
func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas, maxReplicas, targetCPUUtilization, targetMemUtilization int32) bool {
170190
if hpa == nil {
171191
return false
172192
}
@@ -179,21 +199,26 @@ func IsHPAUpToDate(hpa *kautoscaling.HorizontalPodAutoscaler, minReplicas int32,
179199
return false
180200
}
181201

182-
if len(hpa.Spec.Metrics) != 1 {
183-
return false
184-
}
185-
metric := hpa.Spec.Metrics[0]
186-
if metric.Type != kautoscaling.ResourceMetricSourceType || metric.Resource == nil {
187-
return false
188-
}
189-
if metric.Resource.Name != kcore.ResourceCPU {
202+
if len(hpa.Spec.Metrics) != 2 {
190203
return false
191204
}
192-
if metric.Resource.Target.Type != kautoscaling.UtilizationMetricType || metric.Resource.Target.AverageUtilization == nil {
193-
return false
194-
}
195-
if *metric.Resource.Target.AverageUtilization != targetCPUUtilization {
196-
return false
205+
206+
for _, metric := range hpa.Spec.Metrics {
207+
if metric.Type != kautoscaling.ResourceMetricSourceType || metric.Resource == nil {
208+
return false
209+
}
210+
if metric.Resource.Target.Type != kautoscaling.UtilizationMetricType || metric.Resource.Target.AverageUtilization == nil {
211+
return false
212+
}
213+
if metric.Resource.Name != kcore.ResourceCPU && metric.Resource.Name != kcore.ResourceMemory {
214+
return false
215+
}
216+
if metric.Resource.Name == kcore.ResourceCPU && *metric.Resource.Target.AverageUtilization != targetCPUUtilization {
217+
return false
218+
}
219+
if metric.Resource.Name == kcore.ResourceMemory && *metric.Resource.Target.AverageUtilization != targetMemUtilization {
220+
return false
221+
}
197222
}
198223

199224
return true

pkg/operator/resources/asyncapi/api.go

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cortexlabs/cortex/pkg/types/userconfig"
3535
istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1"
3636
kapps "k8s.io/api/apps/v1"
37+
kautoscaling "k8s.io/api/autoscaling/v2beta2"
3738
kcore "k8s.io/api/core/v1"
3839
)
3940

@@ -51,6 +52,7 @@ type resources struct {
5152
apiDeployment *kapps.Deployment
5253
gatewayDeployment *kapps.Deployment
5354
gatewayService *kcore.Service
55+
gatewayHPA *kautoscaling.HorizontalPodAutoscaler
5456
gatewayVirtualService *istioclientnetworking.VirtualService
5557
}
5658

@@ -287,6 +289,7 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) {
287289
var deployment *kapps.Deployment
288290
var gatewayDeployment *kapps.Deployment
289291
var gatewayService *kcore.Service
292+
var gatewayHPA *kautoscaling.HorizontalPodAutoscaler
290293
var gatewayVirtualService *istioclientnetworking.VirtualService
291294

292295
gatewayK8sName := getGatewayK8sName(apiConfig.Name)
@@ -308,6 +311,11 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) {
308311
gatewayService, err = config.K8s.GetService(apiK8sName)
309312
return err
310313
},
314+
func() error {
315+
var err error
316+
gatewayHPA, err = config.K8s.GetHPA(gatewayK8sName)
317+
return err
318+
},
311319
func() error {
312320
var err error
313321
gatewayVirtualService, err = config.K8s.GetVirtualService(apiK8sName)
@@ -319,22 +327,43 @@ func getK8sResources(apiConfig userconfig.API) (resources, error) {
319327
apiDeployment: deployment,
320328
gatewayDeployment: gatewayDeployment,
321329
gatewayService: gatewayService,
330+
gatewayHPA: gatewayHPA,
322331
gatewayVirtualService: gatewayVirtualService,
323332
}, err
324333
}
325334

326335
func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string) error {
327336
apiDeployment := apiDeploymentSpec(api, prevK8sResources.apiDeployment, queueURL)
328337
gatewayDeployment := gatewayDeploymentSpec(api, prevK8sResources.gatewayDeployment, queueURL)
338+
gatewayHPA, err := gatewayHPASpec(api)
339+
if err != nil {
340+
return err
341+
}
329342
gatewayService := gatewayServiceSpec(api)
330343
gatewayVirtualService := gatewayVirtualServiceSpec(api)
331344

332345
return parallel.RunFirstErr(
333346
func() error {
334-
return applyK8sDeployment(api, prevK8sResources.apiDeployment, &apiDeployment)
347+
err := applyK8sDeployment(prevK8sResources.apiDeployment, &apiDeployment)
348+
if err != nil {
349+
return err
350+
}
351+
352+
if err := UpdateMetricsCron(&apiDeployment); err != nil {
353+
return err
354+
}
355+
356+
if err := UpdateAutoscalerCron(&apiDeployment, api); err != nil {
357+
return err
358+
}
359+
360+
return nil
361+
},
362+
func() error {
363+
return applyK8sDeployment(prevK8sResources.gatewayDeployment, &gatewayDeployment)
335364
},
336365
func() error {
337-
return applyK8sDeployment(api, prevK8sResources.gatewayDeployment, &gatewayDeployment)
366+
return applyK8sHPA(prevK8sResources.gatewayHPA, &gatewayHPA)
338367
},
339368
func() error {
340369
return applyK8sService(prevK8sResources.gatewayService, &gatewayService)
@@ -345,7 +374,7 @@ func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string
345374
)
346375
}
347376

348-
func applyK8sDeployment(api spec.API, prevDeployment *kapps.Deployment, newDeployment *kapps.Deployment) error {
377+
func applyK8sDeployment(prevDeployment *kapps.Deployment, newDeployment *kapps.Deployment) error {
349378
if prevDeployment == nil {
350379
_, err := config.K8s.CreateDeployment(newDeployment)
351380
if err != nil {
@@ -364,15 +393,19 @@ func applyK8sDeployment(api spec.API, prevDeployment *kapps.Deployment, newDeplo
364393
return err
365394
}
366395
}
396+
return nil
397+
}
367398

368-
if err := UpdateMetricsCron(newDeployment); err != nil {
369-
return err
399+
func applyK8sHPA(prevHPA *kautoscaling.HorizontalPodAutoscaler, newHPA *kautoscaling.HorizontalPodAutoscaler) error {
400+
var err error
401+
if prevHPA == nil {
402+
_, err = config.K8s.CreateHPA(newHPA)
403+
} else {
404+
_, err = config.K8s.UpdateHPA(newHPA)
370405
}
371-
372-
if err := UpdateAutoscalerCron(newDeployment, api); err != nil {
406+
if err != nil {
373407
return err
374408
}
375-
376409
return nil
377410
}
378411

@@ -403,6 +436,7 @@ func deleteBucketResources(apiName string) error {
403436

404437
func deleteK8sResources(apiName string) error {
405438
apiK8sName := operator.K8sName(apiName)
439+
gatewayK8sName := getGatewayK8sName(apiName)
406440

407441
err := parallel.RunFirstErr(
408442
func() error {
@@ -419,10 +453,13 @@ func deleteK8sResources(apiName string) error {
419453
return err
420454
},
421455
func() error {
422-
gatewayK8sName := getGatewayK8sName(apiName)
423456
_, err := config.K8s.DeleteDeployment(gatewayK8sName)
424457
return err
425458
},
459+
func() error {
460+
_, err := config.K8s.DeleteHPA(gatewayK8sName)
461+
return err
462+
},
426463
func() error {
427464
_, err := config.K8s.DeleteService(apiK8sName)
428465
return err

pkg/operator/resources/asyncapi/k8s_specs.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@ import (
2626
"github.com/cortexlabs/cortex/pkg/types/userconfig"
2727
"istio.io/client-go/pkg/apis/networking/v1beta1"
2828
kapps "k8s.io/api/apps/v1"
29+
kautoscaling "k8s.io/api/autoscaling/v2beta2"
2930
kcore "k8s.io/api/core/v1"
3031
)
3132

32-
var _terminationGracePeriodSeconds int64 = 60 // seconds
33+
var _terminationGracePeriodSeconds int64 = 60 // seconds
34+
var _gatewayHPATargetCPUUtilization int32 = 80 // percentage
35+
var _gatewayHPATargetMemUtilization int32 = 80 // percentage
3336

3437
func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL string) kapps.Deployment {
3538
container := operator.AsyncGatewayContainers(api, queueURL)
3639
return *k8s.Deployment(&k8s.DeploymentSpec{
3740
Name: getGatewayK8sName(api.Name),
38-
Replicas: getRequestedReplicasFromDeployment(api, prevDeployment),
41+
Replicas: 1,
3942
MaxSurge: pointer.String(api.UpdateStrategy.MaxSurge),
4043
MaxUnavailable: pointer.String(api.UpdateStrategy.MaxUnavailable),
4144
Selector: map[string]string{
@@ -78,6 +81,35 @@ func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queue
7881
})
7982
}
8083

84+
func gatewayHPASpec(api spec.API) (kautoscaling.HorizontalPodAutoscaler, error) {
85+
var maxReplicas int32 = 1
86+
if api.Autoscaling != nil {
87+
maxReplicas = api.Autoscaling.MaxReplicas
88+
}
89+
hpa, err := k8s.HPA(&k8s.HPASpec{
90+
DeploymentName: getGatewayK8sName(api.Name),
91+
MinReplicas: 1,
92+
MaxReplicas: maxReplicas,
93+
TargetCPUUtilization: _gatewayHPATargetCPUUtilization,
94+
TargetMemUtilization: _gatewayHPATargetMemUtilization,
95+
Labels: map[string]string{
96+
"apiName": api.Name,
97+
"apiKind": api.Kind.String(),
98+
"apiID": api.ID,
99+
"specID": api.SpecID,
100+
"deploymentID": api.DeploymentID,
101+
"predictorID": api.PredictorID,
102+
"cortex.dev/api": "true",
103+
"cortex.dev/async": "hpa",
104+
},
105+
})
106+
107+
if err != nil {
108+
return kautoscaling.HorizontalPodAutoscaler{}, err
109+
}
110+
return *hpa, nil
111+
}
112+
81113
func gatewayServiceSpec(api spec.API) kcore.Service {
82114
return *k8s.Service(&k8s.ServiceSpec{
83115
Name: operator.K8sName(api.Name),

test/e2e/tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,9 @@ def pytest_configure(config):
122122
"workload_timeout": 180, # measured in seconds
123123
},
124124
"task": {
125-
"jobs": 10 ** 2,
125+
"jobs": 10 ** 4,
126126
"concurrency": 4,
127-
"submit_timeout": 60, # measured in seconds
127+
"submit_timeout": 180, # measured in seconds
128128
"workload_timeout": 180, # measured in seconds
129129
},
130130
},

0 commit comments

Comments
 (0)