Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vertical-pod-autoscaler/docs/flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ This document is auto-generated from the flag definitions in the VPA updater cod
| `logtostderr` | | true | log to standard error instead of files |
| `min-replicas` | int | 2 | Minimum number of replicas to perform update |
| `one-output` | severity | | If true, only write logs to their native level (vs also writing to each lower severity level; no effect when -logtostderr=true) |
| `pod-label-selectors` | string | | If present, the updater will only process pods matching the given label selectors. |
| `pod-update-threshold` | float | 0.1 | Ignore updates that have priority lower than the value of this flag |
| `profiling` | int | | Is debug/pprof endpoint enabled |
| `skip-headers` | | | If true, avoid header prefixes in the log messages |
Expand Down
72 changes: 72 additions & 0 deletions vertical-pod-autoscaler/e2e/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
RecommenderDeploymentName = "vpa-recommender"
// RecommenderNamespace is namespace to deploy VPA recommender
RecommenderNamespace = "kube-system"
// UpdaterNamespace is namespace to deploy VPA updater
UpdaterNamespace = "kube-system"
// PollInterval is interval for polling
PollInterval = 10 * time.Second
// PollTimeout is timeout for polling
Expand All @@ -67,6 +69,9 @@ var HamsterTargetRef = &autoscaling.CrossVersionObjectReference{
// RecommenderLabels are labels of VPA recommender
var RecommenderLabels = map[string]string{"app": "vpa-recommender"}

// CustomUpdaterLabels are labels of VPA updater
var CustomUpdaterLabels = map[string]string{"app": "custom-vpa-updater"}

// HamsterLabels are labels of hamster app
var HamsterLabels = map[string]string{"app": "hamster"}

Expand Down Expand Up @@ -215,6 +220,73 @@ func NewVPADeployment(f *framework.Framework, flags []string) *appsv1.Deployment
return d
}

// NewUpdaterDeployment creates a new updater deployment for e2e test purposes
func NewUpdaterDeployment(f *framework.Framework, deploymentName string, flags []string) *appsv1.Deployment {
	dep := framework_deployment.NewDeployment(
		deploymentName, /*deploymentName*/
		1, /*replicas*/
		CustomUpdaterLabels, /*podLabels*/
		"updater", /*imageName*/
		"localhost:5001/vpa-updater", /*image*/
		appsv1.RollingUpdateDeploymentStrategyType, /*strategyType*/
	)
	dep.ObjectMeta.Namespace = f.Namespace.Name
	dep.Spec.Template.Spec.ServiceAccountName = "vpa-updater"

	// Run as the unprivileged "nobody" user.
	nonRoot := true
	uid := int64(65534) // nobody
	dep.Spec.Template.Spec.SecurityContext = &apiv1.PodSecurityContext{
		RunAsNonRoot: &nonRoot,
		RunAsUser:    &uid,
	}

	// All remaining tweaks target the single updater container.
	container := &dep.Spec.Template.Spec.Containers[0]
	container.ImagePullPolicy = apiv1.PullNever // Image must be loaded first
	container.Command = []string{"/updater"}
	container.Args = flags

	// Same as deploy/updater-deployment.yaml
	container.Resources = apiv1.ResourceRequirements{
		Limits: apiv1.ResourceList{
			apiv1.ResourceCPU:    resource.MustParse("200m"),
			apiv1.ResourceMemory: resource.MustParse("1000Mi"),
		},
		Requests: apiv1.ResourceList{
			apiv1.ResourceCPU:    resource.MustParse("50m"),
			apiv1.ResourceMemory: resource.MustParse("500Mi"),
		},
	}

	container.Ports = []apiv1.ContainerPort{{
		Name:          "prometheus",
		ContainerPort: 8943,
	}}

	// Liveness and readiness probes hit the same /health-check endpoint on
	// the "prometheus" port; only the initial delay differs.
	healthProbe := func(initialDelay int32) *apiv1.Probe {
		return &apiv1.Probe{
			ProbeHandler: apiv1.ProbeHandler{
				HTTPGet: &apiv1.HTTPGetAction{
					Path:   "/health-check",
					Port:   intstr.FromString("prometheus"),
					Scheme: apiv1.URISchemeHTTP,
				},
			},
			InitialDelaySeconds: initialDelay,
			PeriodSeconds:       10,
			FailureThreshold:    3,
		}
	}
	container.LivenessProbe = healthProbe(5)
	container.ReadinessProbe = healthProbe(0)

	return dep
}

// NewNHamstersDeployment creates a simple hamster deployment with n containers
// for e2e test purposes.
func NewNHamstersDeployment(f *framework.Framework, n int) *appsv1.Deployment {
Expand Down
151 changes: 151 additions & 0 deletions vertical-pod-autoscaler/e2e/v1/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package autoscaling
import (
"context"
"fmt"
"strconv"
"strings"
"time"

autoscaling "k8s.io/api/autoscaling/v1"
Expand All @@ -30,6 +32,7 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
"k8s.io/kubernetes/test/e2e/framework"
podsecurity "k8s.io/pod-security-admission/api"
"k8s.io/utils/ptr"

ginkgo "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
Expand Down Expand Up @@ -205,8 +208,156 @@ var _ = UpdaterE2eDescribe("Updater", func() {
err := WaitForPodsUpdatedWithoutEviction(f, initialPods)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

// Verifies --pod-label-selectors end to end: a custom updater started with
// the flag must only take control of pods whose labels match the selector,
// which is observed through its vpa_updater_controlled_pods_total metric.
framework.It("filters pods using the --pod-label-selectors flag", framework.WithSerial(), func() {
// Distinct replica counts so the metric value alone tells us which
// deployment's pods the updater is controlling.
const (
testLabelKey = "vpa-updater-test"
testLabelValueMatch = "enabled"
matchingReplicas = 3
nonMatchingReplicas = 2
)
testNamespace := f.Namespace.Name

// Both deployments get a distinct "app" label as well, so their selectors
// do not overlap with each other or with the default hamster pods.
ginkgo.By("Creating pods with non-matching labels")
nonMatchingDeployment := utils.NewNHamstersDeployment(f, 1)
nonMatchingDeployment.Name = "non-matching-hamster"
nonMatchingDeployment.Spec.Replicas = ptr.To(int32(nonMatchingReplicas))
nonMatchingDeployment.Spec.Template.Labels[testLabelKey] = "disabled"
nonMatchingDeployment.Spec.Template.Labels["app"] = "non-matching"
nonMatchingDeployment.Spec.Selector.MatchLabels[testLabelKey] = "disabled"
nonMatchingDeployment.Spec.Selector.MatchLabels["app"] = "non-matching"
utils.StartDeploymentPods(f, nonMatchingDeployment)

ginkgo.By("Creating pods with matching labels")
matchingDeployment := utils.NewNHamstersDeployment(f, 1)
matchingDeployment.Name = "matching-hamster"
matchingDeployment.Spec.Replicas = ptr.To(int32(matchingReplicas))
matchingDeployment.Spec.Template.Labels[testLabelKey] = testLabelValueMatch
matchingDeployment.Spec.Template.Labels["app"] = "matching"
matchingDeployment.Spec.Selector.MatchLabels[testLabelKey] = testLabelValueMatch
matchingDeployment.Spec.Selector.MatchLabels["app"] = "matching"
utils.StartDeploymentPods(f, matchingDeployment)

// One VPA per deployment, both in Recreate mode, so the updater would act
// on both sets of pods if the label selector were not filtering.
ginkgo.By("Creating VPAs for both deployments")
containerName := utils.GetHamsterContainerNameByIndex(0)
nonMatchingVPA := test.VerticalPodAutoscaler().
WithName("non-matching-vpa").
WithNamespace(testNamespace).
WithTargetRef(&autoscaling.CrossVersionObjectReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: nonMatchingDeployment.Name,
}).
WithContainer(containerName).
WithUpdateMode(vpa_types.UpdateModeRecreate).
Get()
utils.InstallVPA(f, nonMatchingVPA)

matchingVPA := test.VerticalPodAutoscaler().
WithName("matching-vpa").
WithNamespace(testNamespace).
WithTargetRef(&autoscaling.CrossVersionObjectReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: matchingDeployment.Name,
}).
WithContainer(containerName).
WithUpdateMode(vpa_types.UpdateModeRecreate).
Get()
utils.InstallVPA(f, matchingVPA)

ginkgo.By("Setting up custom updater deployment with --pod-label-selectors flag")
// we swap the namespace to kube-system and then back to the test namespace
// so our custom updater deployment can use the deployed RBAC
// NOTE(review): mutating f.Namespace.Name is fragile if the framework
// reads it concurrently; confirm nothing else uses f during this window.
originalNamespace := f.Namespace.Name
f.Namespace.Name = utils.UpdaterNamespace
deploymentName := "vpa-updater-with-pod-label-selectors"
// --vpa-object-namespace keeps the updater scoped to this test's pods even
// though the updater itself runs in kube-system.
updaterDeployment := utils.NewUpdaterDeployment(f, deploymentName, []string{
"--updater-interval=10s",
"--use-admission-controller-status=false",
fmt.Sprintf("--pod-label-selectors=%s=%s", testLabelKey, testLabelValueMatch),
fmt.Sprintf("--vpa-object-namespace=%s", testNamespace),
})
utils.StartDeploymentPods(f, updaterDeployment)
f.Namespace.Name = originalNamespace

// Best-effort cleanup: the updater lives outside the test namespace, so
// the framework's namespace teardown will not remove it.
defer func() {
ginkgo.By("Cleaning up custom updater deployment")
f.ClientSet.AppsV1().Deployments(utils.UpdaterNamespace).Delete(context.TODO(), deploymentName, metav1.DeleteOptions{})
}()

// The controlled-pods gauge must settle at exactly the matching replica
// count; seeing the non-matching pods too would push it higher.
ginkgo.By("Waiting for custom updater to report controlled pods count via metrics")
gomega.Eventually(func() (float64, error) {
return getMetricValue(f, utils.UpdaterNamespace, "vpa_updater_controlled_pods_total", map[string]string{
"update_mode": string(vpa_types.UpdateModeRecreate),
})
}, 2*time.Minute, 5*time.Second).Should(gomega.Equal(float64(matchingReplicas)),
"Custom updater should only see %d matching pods (not the %d non-matching pods)",
matchingReplicas, nonMatchingReplicas)
})
})

// getMetricValue scrapes the custom updater's /metrics endpoint — reached
// through the API server's pod "proxy" subresource, so the test runner needs
// no direct network path to the pod — and returns the value of the first
// sample of metricName whose label set contains all of the given labels.
func getMetricValue(f *framework.Framework, namespace, metricName string, labels map[string]string) (float64, error) {
	// Find the custom updater pod by its app label; the e2e deployment runs
	// a single replica, so the first item is used.
	pods, err := f.ClientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: "app=custom-vpa-updater",
	})
	if err != nil {
		return 0, fmt.Errorf("listing updater pods: %w", err)
	}
	if len(pods.Items) == 0 {
		return 0, fmt.Errorf("updater pod not found in namespace %q", namespace)
	}

	// GET .../pods/<name>/proxy/metrics via the API server.
	req := f.ClientSet.CoreV1().RESTClient().Get().
		Namespace(namespace).
		Resource("pods").
		Name(pods.Items[0].Name).
		SubResource("proxy").
		Suffix("metrics")

	body, err := req.Do(context.TODO()).Raw()
	if err != nil {
		return 0, fmt.Errorf("getting metrics via pod proxy: %w", err)
	}

	return parseMetricValue(string(body), metricName, labels)
}

// parseMetricValue scans Prometheus text-format exposition data for the
// first sample of metricName carrying every key="value" pair in labels and
// returns its parsed value.
func parseMetricValue(body, metricName string, labels map[string]string) (float64, error) {
	for _, line := range strings.Split(body, "\n") {
		// Skip "# HELP" / "# TYPE" and other comment lines.
		if strings.HasPrefix(line, "#") {
			continue
		}
		// Require an exact metric-name match: the name must be followed by
		// a label block or whitespace, so that e.g. "foo_total" does not
		// also match samples of "foo_total_other".
		if !strings.HasPrefix(line, metricName) {
			continue
		}
		rest := line[len(metricName):]
		if rest == "" || (rest[0] != '{' && rest[0] != ' ' && rest[0] != '\t') {
			continue
		}

		// Every requested label must appear on the sample line.
		matched := true
		for k, v := range labels {
			if !strings.Contains(line, fmt.Sprintf(`%s="%s"`, k, v)) {
				matched = false
				break
			}
		}
		if !matched {
			continue
		}

		// The sample value is taken as the last whitespace-separated field.
		// (If a timestamp were ever emitted it would follow the value;
		// this mirrors the best-effort parsing the test relies on.)
		fields := strings.Fields(line)
		if len(fields) >= 2 {
			if value, err := strconv.ParseFloat(fields[len(fields)-1], 64); err == nil {
				return value, nil
			}
		}
	}

	return 0, fmt.Errorf("metric %s not found", metricName)
}

// setupPodsForUpscalingEviction creates the hamster pods with small initial
// resource requests (100m CPU / 100Mi memory) — presumably low enough that
// the recommendation exceeds them, triggering upscaling via eviction.
func setupPodsForUpscalingEviction(f *framework.Framework) *apiv1.PodList {
	const (
		startCPU    = "100m"
		startMemory = "100Mi"
	)
	return setupPodsForEviction(f, startCPU, startMemory, nil)
}
Expand Down
32 changes: 28 additions & 4 deletions vertical-pod-autoscaler/pkg/updater/logic/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (

"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
corescheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -102,6 +105,7 @@ func NewUpdater(
namespace string,
ignoredNamespaces []string,
patchCalculators []patch.Calculator,
podLabelSelector labels.Selector,
) (Updater, error) {
evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
// TODO: Create in-place rate limits for the in-place rate limiter
Expand All @@ -118,7 +122,7 @@ func NewUpdater(

return &updater{
vpaLister: vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), namespace),
podLister: newPodLister(kubeClient, namespace),
podLister: newPodLister(kubeClient, namespace, podLabelSelector),
eventRecorder: newEventRecorder(kubeClient),
restrictionFactory: factory,
recommendationProcessor: recommendationProcessor,
Expand Down Expand Up @@ -397,10 +401,30 @@ func filterDeletedPods(pods []*apiv1.Pod) []*apiv1.Pod {
})
}

func newPodLister(kubeClient kube_client.Interface, namespace string) v1lister.PodLister {
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
func newPodLister(kubeClient kube_client.Interface, namespace string, labelSelector labels.Selector) v1lister.PodLister {
fieldSelector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)

listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector.String()
if labelSelector != nil && !labelSelector.Empty() {
options.LabelSelector = labelSelector.String()
}
return kubeClient.CoreV1().Pods(namespace).List(context.TODO(), options)
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector.String()
if labelSelector != nil && !labelSelector.Empty() {
options.LabelSelector = labelSelector.String()
}
return kubeClient.CoreV1().Pods(namespace).Watch(context.TODO(), options)
}

podListWatch := &cache.ListWatch{
ListFunc: listFunc,
WatchFunc: watchFunc,
}

store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podLister := v1lister.NewPodLister(store)
podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)
Expand Down
Loading