From c640996c6c3fb8022ec464d2c8e9c69eb1b56699 Mon Sep 17 00:00:00 2001 From: Chris Motley Date: Fri, 7 Nov 2025 18:21:49 -0800 Subject: [PATCH] feat: add pod-label-selectors flag to vpa updater --- vertical-pod-autoscaler/docs/flags.md | 1 + vertical-pod-autoscaler/e2e/utils/common.go | 72 +++++++++ vertical-pod-autoscaler/e2e/v1/updater.go | 151 ++++++++++++++++++ .../pkg/updater/logic/updater.go | 32 +++- .../pkg/updater/logic/updater_test.go | 105 ++++++++++++ vertical-pod-autoscaler/pkg/updater/main.go | 15 ++ 6 files changed, 372 insertions(+), 4 deletions(-) diff --git a/vertical-pod-autoscaler/docs/flags.md b/vertical-pod-autoscaler/docs/flags.md index 66dc939535c..57bd8962951 100644 --- a/vertical-pod-autoscaler/docs/flags.md +++ b/vertical-pod-autoscaler/docs/flags.md @@ -164,6 +164,7 @@ This document is auto-generated from the flag definitions in the VPA updater cod | `logtostderr` | | true | log to standard error instead of files | | `min-replicas` | int | 2 | Minimum number of replicas to perform update | | `one-output` | severity | | If true, only write logs to their native level (vs also writing to each lower severity level; no effect when -logtostderr=true) | +| `pod-label-selectors` | string | | If present, the updater will only process pods matching the given label selectors. 
// NewUpdaterDeployment creates a VPA updater Deployment object for e2e test
// purposes. It mirrors deploy/updater-deployment.yaml (service account,
// resources, security context, probes); the supplied flags become the updater
// container's command-line arguments, which lets individual specs run the
// updater with custom configuration (e.g. --pod-label-selectors).
func NewUpdaterDeployment(f *framework.Framework, deploymentName string, flags []string) *appsv1.Deployment {
	d := framework_deployment.NewDeployment(
		deploymentName, /*deploymentName*/
		1,  /*replicas*/
		CustomUpdaterLabels, /*podLabels*/
		"updater",           /*imageName*/
		"localhost:5001/vpa-updater", /*image*/
		appsv1.RollingUpdateDeploymentStrategyType, /*strategyType*/
	)
	d.ObjectMeta.Namespace = f.Namespace.Name
	// The image is side-loaded into the test cluster's local registry, so it
	// must never be pulled from a remote.
	d.Spec.Template.Spec.Containers[0].ImagePullPolicy = apiv1.PullNever // Image must be loaded first
	d.Spec.Template.Spec.ServiceAccountName = "vpa-updater"
	d.Spec.Template.Spec.Containers[0].Command = []string{"/updater"}
	d.Spec.Template.Spec.Containers[0].Args = flags

	// Run unprivileged, matching the production manifest.
	runAsNonRoot := true
	var runAsUser int64 = 65534 // nobody
	d.Spec.Template.Spec.SecurityContext = &apiv1.PodSecurityContext{
		RunAsNonRoot: &runAsNonRoot,
		RunAsUser:    &runAsUser,
	}

	// Same as deploy/updater-deployment.yaml
	d.Spec.Template.Spec.Containers[0].Resources = apiv1.ResourceRequirements{
		Limits: apiv1.ResourceList{
			apiv1.ResourceCPU:    resource.MustParse("200m"),
			apiv1.ResourceMemory: resource.MustParse("1000Mi"),
		},
		Requests: apiv1.ResourceList{
			apiv1.ResourceCPU:    resource.MustParse("50m"),
			apiv1.ResourceMemory: resource.MustParse("500Mi"),
		},
	}

	// Port 8943 serves both Prometheus metrics and the /health-check
	// endpoint that the probes below hit.
	d.Spec.Template.Spec.Containers[0].Ports = []apiv1.ContainerPort{{
		Name:          "prometheus",
		ContainerPort: 8943,
	}}

	d.Spec.Template.Spec.Containers[0].LivenessProbe = &apiv1.Probe{
		ProbeHandler: apiv1.ProbeHandler{
			HTTPGet: &apiv1.HTTPGetAction{
				Path:   "/health-check",
				Port:   intstr.FromString("prometheus"),
				Scheme: apiv1.URISchemeHTTP,
			},
		},
		InitialDelaySeconds: 5,
		PeriodSeconds:       10,
		FailureThreshold:    3,
	}
	d.Spec.Template.Spec.Containers[0].ReadinessProbe = &apiv1.Probe{
		ProbeHandler: apiv1.ProbeHandler{
			HTTPGet: &apiv1.HTTPGetAction{
				Path:   "/health-check",
				Port:   intstr.FromString("prometheus"),
				Scheme: apiv1.URISchemeHTTP,
			},
		},
		PeriodSeconds:    10,
		FailureThreshold: 3,
	}

	return d
}
func NewNHamstersDeployment(f *framework.Framework, n int) *appsv1.Deployment { diff --git a/vertical-pod-autoscaler/e2e/v1/updater.go b/vertical-pod-autoscaler/e2e/v1/updater.go index 19327021c8e..7393c81058a 100644 --- a/vertical-pod-autoscaler/e2e/v1/updater.go +++ b/vertical-pod-autoscaler/e2e/v1/updater.go @@ -19,6 +19,8 @@ package autoscaling import ( "context" "fmt" + "strconv" + "strings" "time" autoscaling "k8s.io/api/autoscaling/v1" @@ -30,6 +32,7 @@ import ( "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test" "k8s.io/kubernetes/test/e2e/framework" podsecurity "k8s.io/pod-security-admission/api" + "k8s.io/utils/ptr" ginkgo "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -205,8 +208,156 @@ var _ = UpdaterE2eDescribe("Updater", func() { err := WaitForPodsUpdatedWithoutEviction(f, initialPods) gomega.Expect(err).NotTo(gomega.HaveOccurred()) }) + + framework.It("filters pods using the --pod-label-selectors flag", framework.WithSerial(), func() { + const ( + testLabelKey = "vpa-updater-test" + testLabelValueMatch = "enabled" + matchingReplicas = 3 + nonMatchingReplicas = 2 + ) + testNamespace := f.Namespace.Name + + ginkgo.By("Creating pods with non-matching labels") + nonMatchingDeployment := utils.NewNHamstersDeployment(f, 1) + nonMatchingDeployment.Name = "non-matching-hamster" + nonMatchingDeployment.Spec.Replicas = ptr.To(int32(nonMatchingReplicas)) + nonMatchingDeployment.Spec.Template.Labels[testLabelKey] = "disabled" + nonMatchingDeployment.Spec.Template.Labels["app"] = "non-matching" + nonMatchingDeployment.Spec.Selector.MatchLabels[testLabelKey] = "disabled" + nonMatchingDeployment.Spec.Selector.MatchLabels["app"] = "non-matching" + utils.StartDeploymentPods(f, nonMatchingDeployment) + + ginkgo.By("Creating pods with matching labels") + matchingDeployment := utils.NewNHamstersDeployment(f, 1) + matchingDeployment.Name = "matching-hamster" + matchingDeployment.Spec.Replicas = ptr.To(int32(matchingReplicas)) + 
matchingDeployment.Spec.Template.Labels[testLabelKey] = testLabelValueMatch + matchingDeployment.Spec.Template.Labels["app"] = "matching" + matchingDeployment.Spec.Selector.MatchLabels[testLabelKey] = testLabelValueMatch + matchingDeployment.Spec.Selector.MatchLabels["app"] = "matching" + utils.StartDeploymentPods(f, matchingDeployment) + + ginkgo.By("Creating VPAs for both deployments") + containerName := utils.GetHamsterContainerNameByIndex(0) + nonMatchingVPA := test.VerticalPodAutoscaler(). + WithName("non-matching-vpa"). + WithNamespace(testNamespace). + WithTargetRef(&autoscaling.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: nonMatchingDeployment.Name, + }). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeRecreate). + Get() + utils.InstallVPA(f, nonMatchingVPA) + + matchingVPA := test.VerticalPodAutoscaler(). + WithName("matching-vpa"). + WithNamespace(testNamespace). + WithTargetRef(&autoscaling.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: matchingDeployment.Name, + }). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeRecreate). 
+ Get() + utils.InstallVPA(f, matchingVPA) + + ginkgo.By("Setting up custom updater deployment with --pod-label-selectors flag") + // we swap the namespace to kube-system and then back to the test namespace + // so our custom updater deployment can use the deployed RBAC + originalNamespace := f.Namespace.Name + f.Namespace.Name = utils.UpdaterNamespace + deploymentName := "vpa-updater-with-pod-label-selectors" + updaterDeployment := utils.NewUpdaterDeployment(f, deploymentName, []string{ + "--updater-interval=10s", + "--use-admission-controller-status=false", + fmt.Sprintf("--pod-label-selectors=%s=%s", testLabelKey, testLabelValueMatch), + fmt.Sprintf("--vpa-object-namespace=%s", testNamespace), + }) + utils.StartDeploymentPods(f, updaterDeployment) + f.Namespace.Name = originalNamespace + + defer func() { + ginkgo.By("Cleaning up custom updater deployment") + f.ClientSet.AppsV1().Deployments(utils.UpdaterNamespace).Delete(context.TODO(), deploymentName, metav1.DeleteOptions{}) + }() + + ginkgo.By("Waiting for custom updater to report controlled pods count via metrics") + gomega.Eventually(func() (float64, error) { + return getMetricValue(f, utils.UpdaterNamespace, "vpa_updater_controlled_pods_total", map[string]string{ + "update_mode": string(vpa_types.UpdateModeRecreate), + }) + }, 2*time.Minute, 5*time.Second).Should(gomega.Equal(float64(matchingReplicas)), + "Custom updater should only see %d matching pods (not the %d non-matching pods)", + matchingReplicas, nonMatchingReplicas) + }) }) +func getMetricValue(f *framework.Framework, namespace, metricName string, labels map[string]string) (float64, error) { + // Port forward to the updater pod + pods, err := f.ClientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "app=custom-vpa-updater", + }) + if err != nil || len(pods.Items) == 0 { + return 0, fmt.Errorf("updater pod not found: %v", err) + } + + // Use kubectl port-forward via exec in the pod + req := 
f.ClientSet.CoreV1().RESTClient().Get(). + Namespace(namespace). + Resource("pods"). + Name(pods.Items[0].Name). + SubResource("proxy"). + Suffix("metrics") + + result := req.Do(context.TODO()) + body, err := result.Raw() + if err != nil { + return 0, fmt.Errorf("failed to get metrics: %v", err) + } + + // Parse Prometheus metrics format + lines := strings.Split(string(body), "\n") + for _, line := range lines { + if strings.HasPrefix(line, "#") { + continue + } + if !strings.HasPrefix(line, metricName) { + continue + } + + // Match labels + if len(labels) > 0 { + allLabelsMatch := true + for k, v := range labels { + labelPattern := fmt.Sprintf(`%s="%s"`, k, v) + if !strings.Contains(line, labelPattern) { + allLabelsMatch = false + break + } + } + if !allLabelsMatch { + continue + } + } + + // Extract value from end of line + parts := strings.Fields(line) + if len(parts) >= 2 { + value, err := strconv.ParseFloat(parts[len(parts)-1], 64) + if err == nil { + return value, nil + } + } + } + + return 0, fmt.Errorf("metric %s not found", metricName) +} + func setupPodsForUpscalingEviction(f *framework.Framework) *apiv1.PodList { return setupPodsForEviction(f, "100m", "100Mi", nil) } diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater.go b/vertical-pod-autoscaler/pkg/updater/logic/updater.go index ffe266d7cbd..8fc9960ab9a 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater.go @@ -24,8 +24,11 @@ import ( "golang.org/x/time/rate" apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" kube_client "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" corescheme "k8s.io/client-go/kubernetes/scheme" @@ -102,6 +105,7 @@ func NewUpdater( namespace string, ignoredNamespaces []string, patchCalculators []patch.Calculator, + 
podLabelSelector labels.Selector, ) (Updater, error) { evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst) // TODO: Create in-place rate limits for the in-place rate limiter @@ -118,7 +122,7 @@ func NewUpdater( return &updater{ vpaLister: vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), namespace), - podLister: newPodLister(kubeClient, namespace), + podLister: newPodLister(kubeClient, namespace, podLabelSelector), eventRecorder: newEventRecorder(kubeClient), restrictionFactory: factory, recommendationProcessor: recommendationProcessor, @@ -397,10 +401,30 @@ func filterDeletedPods(pods []*apiv1.Pod) []*apiv1.Pod { }) } -func newPodLister(kubeClient kube_client.Interface, namespace string) v1lister.PodLister { - selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + +func newPodLister(kubeClient kube_client.Interface, namespace string, labelSelector labels.Selector) v1lister.PodLister { + fieldSelector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) - podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector) + + listFunc := func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector.String() + if labelSelector != nil && !labelSelector.Empty() { + options.LabelSelector = labelSelector.String() + } + return kubeClient.CoreV1().Pods(namespace).List(context.TODO(), options) + } + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector.String() + if labelSelector != nil && !labelSelector.Empty() { + options.LabelSelector = labelSelector.String() + } + return kubeClient.CoreV1().Pods(namespace).Watch(context.TODO(), options) + } + + podListWatch := &cache.ListWatch{ + ListFunc: listFunc, + WatchFunc: watchFunc, + } + store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, 
// TestNewPodListerWithLabelSelector verifies that the pod lister built by
// newPodLister honors an optional label selector: a nil/empty selector lists
// every pod in the namespace, while a parsed selector restricts the backing
// cache to matching pods only (filtering happens server-side in List/Watch,
// so non-matching pods never enter the store).
func TestNewPodListerWithLabelSelector(t *testing.T) {
	tests := []struct {
		name             string
		labelSelectors   string // raw selector string; "" means no filtering
		pods             []*apiv1.Pod
		expectedPodCount int
		expectedPodNames []string
	}{
		{
			name:           "no selector returns all pods",
			labelSelectors: "",
			pods: []*apiv1.Pod{
				test.Pod().WithName("pod1").WithLabels(map[string]string{"app": "test1"}).Get(),
				test.Pod().WithName("pod2").WithLabels(map[string]string{"app": "test2"}).Get(),
				test.Pod().WithName("pod3").WithLabels(map[string]string{"env": "prod"}).Get(),
			},
			expectedPodCount: 3,
			expectedPodNames: []string{"pod1", "pod2", "pod3"},
		},
		{
			name:             "no pods returns empty list",
			labelSelectors:   "env=prod",
			pods:             []*apiv1.Pod{},
			expectedPodCount: 0,
			expectedPodNames: []string{},
		},
		{
			name:           "single label selector filters correctly",
			labelSelectors: "app=test1",
			pods: []*apiv1.Pod{
				test.Pod().WithName("pod1").WithLabels(map[string]string{"app": "test1"}).Get(),
				test.Pod().WithName("pod2").WithLabels(map[string]string{"app": "test2"}).Get(),
				test.Pod().WithName("pod3").WithLabels(map[string]string{"env": "prod"}).Get(),
			},
			expectedPodCount: 1,
			expectedPodNames: []string{"pod1"},
		},
		{
			// Comma-separated requirements must all hold (AND semantics).
			name:           "multiple label selector filters correctly",
			labelSelectors: "app=test1,env=prod",
			pods: []*apiv1.Pod{
				test.Pod().WithName("pod1").WithLabels(map[string]string{"app": "test1", "env": "prod"}).Get(),
				test.Pod().WithName("pod2").WithLabels(map[string]string{"app": "test1", "env": "dev"}).Get(),
				test.Pod().WithName("pod3").WithLabels(map[string]string{"app": "test2", "env": "prod"}).Get(),
				test.Pod().WithName("pod4").WithLabels(map[string]string{"app": "test1", "env": "prod"}).Get(),
			},
			expectedPodCount: 2,
			expectedPodNames: []string{"pod1", "pod4"},
		},
		{
			name:           "no matching pods returns empty",
			labelSelectors: "vpa-enabled=true",
			pods: []*apiv1.Pod{
				test.Pod().WithName("pod1").WithLabels(map[string]string{"app": "test1"}).Get(),
				test.Pod().WithName("pod2").WithLabels(map[string]string{"app": "test2"}).Get(),
			},
			expectedPodCount: 0,
			expectedPodNames: []string{},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// parse label selectors
			var selector labels.Selector
			if tc.labelSelectors != "" {
				var err error
				selector, err = labels.Parse(tc.labelSelectors)
				assert.NoError(t, err)
			}

			kubeClient := fake.NewSimpleClientset()
			podLister := newPodLister(kubeClient, "default", selector)

			// add pods
			for _, pod := range tc.pods {
				pod.Namespace = "default"
				// Running phase so the lister's field selector
				// (which excludes Succeeded/Failed) keeps the pod.
				pod.Status.Phase = apiv1.PodRunning
				_, err := kubeClient.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{})
				assert.NoError(t, err)
			}

			// wait for cache to pick up the pods — the reflector syncs
			// asynchronously, so poll until the expected count appears.
			assert.Eventually(t, func() bool {
				pods, err := podLister.Pods("default").List(labels.Everything())
				return err == nil && len(pods) == tc.expectedPodCount
			}, 5*time.Second, 100*time.Millisecond, "failed to sync pods to cache")

			// list and verify pods
			pods, err := podLister.Pods("default").List(labels.Everything())
			assert.NoError(t, err)
			assert.Equal(t, tc.expectedPodCount, len(pods), "expected %d pods but got %d", tc.expectedPodCount, len(pods))

			actualNames := make([]string, len(pods))
			for i, pod := range pods {
				actualNames[i] = pod.Name
			}

			for _, expectedName := range tc.expectedPodNames {
				assert.Contains(t, actualNames, expectedName, "expected pod %s to be listed", expectedName)
			}
		})
	}
}