From 6ca01999b58844f1352eac8d023de0b3ad06cc72 Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Tue, 26 Aug 2025 14:35:12 +0530 Subject: [PATCH] add support for "bench warp" command Signed-off-by: Utkarsh Srivastava --- deploy/warp/warp-job.yaml | 17 ++ deploy/warp/warp-svc.yaml | 14 ++ deploy/warp/warp.yaml | 39 ++++ pkg/admission/server.go | 18 +- pkg/bench/warp.go | 447 ++++++++++++++++++++++++++++++++++++++ pkg/bundle/deploy.go | 81 +++++++ pkg/cli/cli.go | 6 + pkg/util/util.go | 18 ++ 8 files changed, 630 insertions(+), 10 deletions(-) create mode 100644 deploy/warp/warp-job.yaml create mode 100644 deploy/warp/warp-svc.yaml create mode 100644 deploy/warp/warp.yaml create mode 100644 pkg/bench/warp.go diff --git a/deploy/warp/warp-job.yaml b/deploy/warp/warp-job.yaml new file mode 100644 index 0000000000..730b7b4585 --- /dev/null +++ b/deploy/warp/warp-job.yaml @@ -0,0 +1,17 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: warp-job +spec: + template: + spec: + containers: + - name: warp-job + env: + - name: WARP_ACCESS_KEY + - name: WARP_SECRET_KEY + image: "minio/warp:latest" + imagePullPolicy: Always + restartPolicy: Never + backoffLimit: 0 + diff --git a/deploy/warp/warp-svc.yaml b/deploy/warp/warp-svc.yaml new file mode 100644 index 0000000000..a1c5948393 --- /dev/null +++ b/deploy/warp/warp-svc.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: warp + labels: + app: warp +spec: + publishNotReadyAddresses: true + clusterIP: None + ports: + - port: 7761 + name: warp + selector: + app: warp \ No newline at end of file diff --git a/deploy/warp/warp.yaml b/deploy/warp/warp.yaml new file mode 100644 index 0000000000..54d2f36cde --- /dev/null +++ b/deploy/warp/warp.yaml @@ -0,0 +1,39 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: warp + labels: + app: warp +spec: + serviceName: warp + podManagementPolicy: Parallel + replicas: 1 + selector: + matchLabels: + app: warp + template: + metadata: + name: warp + labels: + app: warp + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app + operator: In + values: + - warp + topologyKey: "kubernetes.io/hostname" + containers: + - name: warp + image: "minio/warp:latest" + imagePullPolicy: Always + args: + - client + ports: + - name: http + containerPort: 7761 + diff --git a/pkg/admission/server.go b/pkg/admission/server.go index bba764677a..ba5cb26181 100644 --- a/pkg/admission/server.go +++ b/pkg/admission/server.go @@ -6,10 +6,10 @@ import ( "fmt" "net/http" "os" - "os/signal" "syscall" "github.com/noobaa/noobaa-operator/v5/pkg/options" + "github.com/noobaa/noobaa-operator/v5/pkg/util" "github.com/sirupsen/logrus" ) @@ -67,13 +67,11 @@ func RunAdmissionServer() { log.Infof("Server running and listening in port: %s", port) // listening shutdown singal - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) - <-signalChan - - log.Info("Got shutdown signal, shutting down webhook server gracefully...") - err = server.Shutdown(context.Background()) - if err != nil { - log.Info("Failed to Shutdown admission server") - } + util.OnSignal(func() { + log.Info("Got shutdown signal, shutting down webhook server gracefully...") + err = server.Shutdown(context.Background()) + if err != nil { + log.Info("Failed to Shutdown admission server") + } + }, syscall.SIGINT, syscall.SIGTERM) } diff --git a/pkg/bench/warp.go b/pkg/bench/warp.go new file mode 100644 index 0000000000..75097bfc4e --- 
/dev/null
+++ b/pkg/bench/warp.go
@@ -0,0 +1,447 @@
+package bench
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"slices"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/noobaa/noobaa-operator/v5/pkg/bundle"
+	"github.com/noobaa/noobaa-operator/v5/pkg/options"
+	"github.com/noobaa/noobaa-operator/v5/pkg/util"
+	"github.com/spf13/cobra"
+	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type EndpointType = string
+
+const (
+	EndpointInternal     EndpointType = "internal"
+	EndpointPodIP        EndpointType = "podip"
+	EndpointNodeport     EndpointType = "nodeport"
+	EndpointLoadbalancer EndpointType = "loadbalancer"
+	EndpointManual       EndpointType = "manual"
+)
+
+func Cmd() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "bench",
+		Short: "Run benchmark",
+	}
+	cmd.AddCommand(
+		CmdWarp(),
+	)
+
+	return cmd
+}
+
+func CmdWarp() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "warp",
+		Short: "Run warp benchmark",
+		Run:   RunBenchWarp,
+		Args:  cobra.ExactArgs(1),
+	}
+	cmd.Flags().String(
+		"bucket", "first.bucket",
+		"Bucket to use for benchmark data. ALL DATA WILL BE DELETED IN BUCKET!",
+	)
+	cmd.Flags().Bool(
+		"use-https", false,
+		"Use HTTPS endpoints for benchmark",
+	)
+	cmd.Flags().Int32(
+		"clients", 0,
+		"Number of warp client instances (0 runs one client per node)",
+	)
+	cmd.Flags().String(
+		"image", "minio/warp:latest",
+		"Warp image",
+	)
+	cmd.Flags().String(
+		"endpoint-type", EndpointInternal,
+		fmt.Sprintf(
+			"Endpoint type; one of: %s, %s, %s, %s, %s",
+			EndpointInternal, EndpointPodIP, EndpointNodeport,
+			EndpointLoadbalancer, EndpointManual,
+		),
+	)
+	cmd.Flags().String(
+		"access-key", "",
+		"Access Key to access the S3 bucket",
+	)
+	cmd.Flags().String(
+		"secret-key", "",
+		"Secret Key to access the S3 bucket",
+	)
+	cmd.Flags().String(
+		"warp-args", "",
+		"Arguments to be passed directly to the warp CLI",
+	)
+
+	return cmd
+}
+
+func RunBenchWarp(cmd *cobra.Command, args []string) {
+	log := util.Logger()
+	log.Info("Starting warp benchmark")
+
+	bench := args[0]
+	bucket := util.GetFlagStringOrPrompt(cmd, "bucket")
+	image := util.GetFlagStringOrPrompt(cmd, "image")
+	endpointType := util.GetFlagStringOrPrompt(cmd, "endpoint-type")
+
+	useHTTPs, _ := cmd.Flags().GetBool("use-https")
+	clients, _ := cmd.Flags().GetInt32("clients")
+	accessKey, _ := cmd.Flags().GetString("access-key")
+	secretKey, _ := cmd.Flags().GetString("secret-key")
+	warpArgs, _ := cmd.Flags().GetString("warp-args")
+
+	if accessKey == "" || secretKey == "" {
+		noobaaAdminSecret := util.KubeObject(bundle.File_deploy_internal_secret_empty_yaml).(*corev1.Secret)
+		noobaaAdminSecret.Name = "noobaa-admin"
+		noobaaAdminSecret.Namespace = options.Namespace
+
+		if !util.KubeCheck(noobaaAdminSecret) {
+			log.Fatal("❌ Access Key and/or Secret Key not provided and failed to fetch \"noobaa-admin\" secret")
+		}
+
+		accessKey = noobaaAdminSecret.StringData["AWS_ACCESS_KEY_ID"]
+		secretKey = noobaaAdminSecret.StringData["AWS_SECRET_ACCESS_KEY"]
+
+		if accessKey == "" || secretKey == "" {
+			log.Fatal("❌ Access Key and/or Secret Key not provided and failed to find credentials in \"noobaa-admin\" secret")
+		}
+	}
+
+	clients = getClientNums(clients)
+	if clients == 0 {
+		log.Fatal("❌ Number of clients cannot be 0")
+	}
+
+	// Validate "bench" type
+	allowedBench := []string{
+		"mixed", "get", "put",
+		"delete", "list", "stat",
+		"versioned", "multipart",
+		"multipart-put",
+	}
+	if !slices.Contains(allowedBench, bench) {
+		log.Fatalf("❌ Invalid bench provided - supported %+v", allowedBench)
+	}
+
+	// Validate endpoint type
+	allowedEndpointType := []EndpointType{
+		EndpointInternal, EndpointLoadbalancer, EndpointNodeport,
+		EndpointPodIP, EndpointManual,
+	}
+	if !slices.Contains(allowedEndpointType, endpointType) {
+		log.Fatalf("❌ Invalid endpoint-type provided - supported %+v", allowedEndpointType)
+	}
+
+	go util.OnSignal(func() {
+		cleanupWarp()
+	}, syscall.SIGINT, syscall.SIGTERM)
+
+	provisionWarp(clients, image)
+	startWarpJob(
+		clients,
+		bench, bucket, accessKey, secretKey, image,
+		endpointType, useHTTPs, warpArgs,
+	)
+	pollWarp()
+	cleanupWarp()
+}
+
+func provisionWarp(clients int32, image string) {
+	log := util.Logger()
+
+	warpSts := util.KubeObject(bundle.File_deploy_warp_warp_yaml).(*appsv1.StatefulSet)
+	containers := warpSts.Spec.Template.Spec.Containers
+	if len(containers) != 1 {
+		log.Fatal("❌ Unexpected number of containers in the Warp STS")
+	}
+
+	// Provision in the same namespace where NooBaa is running
+	warpSts.Namespace = options.Namespace
+	warpSts.Spec.Replicas = &clients
+	if image != "" {
+		containers[0].Image = image
+	}
+	util.KubeApply(warpSts)
+
+	// Wait for the warp clients to get ready
+	if err := wait.PollUntilContextCancel(context.TODO(), 2*time.Second, true, func(ctx context.Context) (done bool, err error) {
+		util.KubeCheck(warpSts)
+		return warpSts.Status.ReadyReplicas == *warpSts.Spec.Replicas, nil
+	}); err != nil {
+		log.Fatalf("❌ error while waiting for warp sts to be ready: %s", err)
+	}
+
+	warpSvc := util.KubeObject(bundle.File_deploy_warp_warp_svc_yaml).(*corev1.Service)
+	warpSvc.Namespace = options.Namespace
+	util.KubeApply(warpSvc)
+}
+
+func startWarpJob(
+	clients int32,
+	bench, bucket, accessKey, secretKey, image string,
+	endpointType EndpointType,
+	useHttps bool,
+	warpArgs string,
+) {
+	warpJob := util.KubeObject(bundle.File_deploy_warp_warp_job_yaml).(*batchv1.Job)
+	warpJob.Namespace = options.Namespace
+
+	for idx, container := range warpJob.Spec.Template.Spec.Containers {
+		if container.Name == "warp-job" {
+			warpJob.Spec.Template.Spec.Containers[idx].Env = []corev1.EnvVar{
+				{
+					Name:  "WARP_ACCESS_KEY",
+					Value: accessKey,
+				},
+				{
+					Name:  "WARP_SECRET_KEY",
+					Value: secretKey,
+				},
+			}
+
+			args := append(
+				container.Args,
+				bench,
+				"--bucket", bucket,
+			)
+			if endpointType != EndpointManual {
+				args = append(
+					args,
+					"--host", prepareWarpHostList(endpointType, useHttps),
+				)
+			}
+			if clients == 1 {
+				args = append(
+					args,
+					"--warp-client", fmt.Sprintf("warp-0.warp.%s.svc.cluster.local:7761", options.Namespace),
+				)
+			} else {
+				args = append(
+					args,
+					"--warp-client", fmt.Sprintf("warp-{0..%d}.warp.%s.svc.cluster.local:7761", clients-1, options.Namespace),
+				)
+			}
+
+			if useHttps {
+				args = append(args, "--insecure", "--tls")
+			}
+			if warpArgs != "" {
+				args = append(args, strings.Split(warpArgs, " ")...)
+			}
+
+			warpJob.Spec.Template.Spec.Containers[idx].Args = args
+			warpJob.Spec.Template.Spec.Containers[idx].Image = image
+			break
+		}
+	}
+
+	util.KubeApply(warpJob)
+}
+
+func pollWarp() {
+	log := util.Logger()
+
+	log.Infof(
+		"Benchmark started - you can check the warp client logs with \"kubectl logs -f -n %s -l app=warp\"\n",
+		options.Namespace,
+	)
+
+	warpJob := util.KubeObject(bundle.File_deploy_warp_warp_job_yaml).(*batchv1.Job)
+	warpJob.Namespace = options.Namespace
+	for {
+		if !util.KubeCheckQuiet(warpJob) {
+			log.Error("❌ No Warp Job found to poll")
+			break
+		}
+
+		if warpJob.Status.Succeeded == 0 && warpJob.Status.Failed == 0 {
+			log.Info("Benchmark still running")
+		} else {
+			break
+		}
+
+		time.Sleep(5 * time.Second)
+	}
+
+	if warpJob.Status.Failed != 0 {
+		log.Error("❌ Warp Job Failed")
+	}
+
+	warpJobPodList := &corev1.PodList{}
+	util.KubeList(warpJobPodList, client.InNamespace(options.Namespace), client.MatchingLabels{
+		"job-name": "warp-job",
+	})
+
+	for _, pod := range warpJobPodList.Items {
+		logs, err := util.GetPodLogs(pod)
+		if err != nil {
+			log.Errorf("❌ Failed to get logs for pod %q - %s", pod.Name, err)
+			continue
+		}
+
+		for _, container := range logs {
+			if _, err := io.Copy(os.Stdout, container); err != nil {
+				log.Warnf("encountered error while copying logs - %s", err)
+			}
+		}
+	}
+}
+
+func cleanupWarp() {
+	util.Logger().Info("Cleaning up Warp")
+
+	warpJob := util.KubeObject(bundle.File_deploy_warp_warp_job_yaml).(*batchv1.Job)
+	warpJob.Namespace = options.Namespace
+	deletePolicy := metav1.DeletePropagationForeground
+	util.KubeDelete(warpJob, &client.DeleteOptions{
+		PropagationPolicy: &deletePolicy,
+	})
+
+	warpSvc := util.KubeObject(bundle.File_deploy_warp_warp_svc_yaml).(*corev1.Service)
+	warpSvc.Namespace = options.Namespace
+	util.KubeDelete(warpSvc)
+
+	warpSts := util.KubeObject(bundle.File_deploy_warp_warp_yaml).(*appsv1.StatefulSet)
+	warpSts.Namespace = options.Namespace
+	util.KubeDelete(warpSts)
+}
+
+func prepareWarpHostList(endpointType EndpointType, https bool) string {
+	log := util.Logger()
+	ips := []string{}
+	port := int32(0)
+
+	portname := "s3"
+	if https {
+		portname = "s3-https"
+	}
+
+	getPort := func(svc *corev1.Service, portname string, fn func(corev1.ServicePort) int32) int32 {
+		for _, port := range svc.Spec.Ports {
+			if port.Name == portname {
+				return fn(port)
+			}
+		}
+
+		return 0
+	}
+
+	s3svc := util.KubeObject(bundle.File_deploy_internal_service_s3_yaml).(*corev1.Service)
+	s3svc.Namespace = options.Namespace
+	s3svc.Spec = corev1.ServiceSpec{}
+	if !util.KubeCheck(s3svc) {
+		log.Fatalf("❌ Failed to find S3 service in namespace %s", options.Namespace)
+	}
+
+	if endpointType == EndpointInternal {
+		ips = append(ips, fmt.Sprintf("s3.%s.svc", options.Namespace))
+		port = getPort(s3svc, portname, func(sp corev1.ServicePort) int32 {
+			return sp.Port
+		})
+	}
+	if endpointType == EndpointPodIP {
+		endpoints := corev1.PodList{}
+		if !util.KubeList(&endpoints, client.InNamespace(options.Namespace), client.MatchingLabels{"noobaa-s3": "noobaa"}) {
+			log.Fatalf("❌ Failed to find endpoint pods in namespace: %s", options.Namespace)
+		}
+
+		for _, ep := range endpoints.Items {
+			ips = append(ips, ep.Status.PodIP)
+		}
+		port = getPort(s3svc, portname, func(sp corev1.ServicePort) int32 {
+			return sp.TargetPort.IntVal
+		})
+	}
+	if endpointType == EndpointNodeport {
+		nodes := corev1.NodeList{}
+		if !util.KubeList(&nodes) || len(nodes.Items) == 0 {
+			log.Fatal("❌ Failed to find node IPs - no nodes found")
+		}
+
+		for _, node := range nodes.Items {
+			for _, address := range node.Status.Addresses {
+				if address.Type == corev1.NodeExternalIP {
+					ips = append(ips, address.Address)
+				}
+			}
+
+			// Use the first external IP we find on the node
+			if len(ips) > 0 {
+				break
+			}
+		}
+		// Fall back to the internal IP if no external IP was found
+		if len(ips) == 0 {
+			for _, node := range nodes.Items {
+				for _, a := range node.Status.Addresses {
+					if a.Type == corev1.NodeInternalIP {
+						ips = append(ips, a.Address)
+					}
+				}
+
+				if len(ips) > 0 {
+					break
+				}
+			}
+		}
+		port = getPort(s3svc, portname, func(sp corev1.ServicePort) int32 {
+			return sp.NodePort
+		})
+	}
+	if endpointType == EndpointLoadbalancer {
+		if len(s3svc.Status.LoadBalancer.Ingress) == 0 {
+			log.Fatal("❌ Failed to find loadbalancer ingress")
+		}
+
+		if s3svc.Status.LoadBalancer.Ingress[0].IP != "" {
+			ips = append(ips, s3svc.Status.LoadBalancer.Ingress[0].IP)
+		} else if s3svc.Status.LoadBalancer.Ingress[0].Hostname != "" {
+			ips = append(ips, s3svc.Status.LoadBalancer.Ingress[0].Hostname)
+		} else {
+			log.Fatal("❌ Failed to find loadbalancer IP/Hostname")
+		}
+		port = getPort(s3svc, portname, func(sp corev1.ServicePort) int32 {
+			return sp.Port
+		})
+	}
+
+	return strings.Join(util.Map(ips, func(ip string) string {
+		return fmt.Sprintf("%s:%d", ip, port)
+	}), ",")
+}
+
+func getClientNums(clients int32) int32 {
+	log := util.Logger()
+
+	nodelist := corev1.NodeList{}
+	util.KubeList(&nodelist)
+	nodeCount := len(nodelist.Items)
+	if nodeCount == 0 {
+		log.Fatal("❌ No nodes found to run warp clients")
+	}
+
+	if clients > int32(nodeCount) {
+		log.Warn("Number of clients cannot exceed number of nodes")
+	}
+	if clients > int32(nodeCount) || clients == 0 {
+		clients = int32(nodeCount)
+	}
+
+	return clients
+}
diff --git a/pkg/bundle/deploy.go b/pkg/bundle/deploy.go
index 24fca381cc..7a4286dabf 100644
--- a/pkg/bundle/deploy.go
+++ b/pkg/bundle/deploy.go
@@ -6924,3 +6924,84 @@ metadata:
   name: custom-metrics-prometheus-adapter
 `
 
+const Sha256_deploy_warp_warp_job_yaml = "dab54e11a74caf45f5cd0f5b54c88c3937a1837e00744164f670f2dad58cefc4"
+
+const File_deploy_warp_warp_job_yaml = `apiVersion: batch/v1
+kind: Job
+metadata:
+  name: warp-job
+spec:
+  template:
+    spec:
+      containers:
+        - name: warp-job
+          env:
+            - name: WARP_ACCESS_KEY
+            - name: WARP_SECRET_KEY
+          image: "minio/warp:latest"
+          imagePullPolicy: Always
+      restartPolicy: Never
+  backoffLimit: 0
+
+`
+
+const Sha256_deploy_warp_warp_svc_yaml = "b9d0fff93a030ab846817d6bbdbe85a52b0277312f5f1c8933a7cc4a44a0f6c5"
+
+const File_deploy_warp_warp_svc_yaml = `apiVersion: v1
+kind: Service
+metadata:
+  name: warp
+  labels:
+    app: warp
+spec:
+  publishNotReadyAddresses: true
+  clusterIP: None
+  ports:
+    - port: 7761
+      name: warp
+  selector:
+    app: warp`
+
+const Sha256_deploy_warp_warp_yaml = "869bfc23eaf9c79ef03d8315257c0e63136cda2772c00353bff54e4f699be471"
+
+const File_deploy_warp_warp_yaml = `apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: warp
+  labels:
+    app: warp
+spec:
+  serviceName: warp
+  podManagementPolicy: Parallel
+  replicas: 1
+  selector:
+    matchLabels:
+      app: warp
+  template:
+    metadata:
+      name: warp
+      labels:
+        app: warp
+    spec:
+      affinity:
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            - labelSelector:
+                matchExpressions:
+                  - key: app
+                    operator: In
+                    values:
+                      - warp
+              topologyKey: "kubernetes.io/hostname"
+      containers:
+        - name: warp
+          image: "minio/warp:latest"
+          imagePullPolicy: Always
+          args:
+            - client
+          ports:
+            - name: http
+              containerPort: 7761
+
+`
+
diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go
index ae94c56faf..bb30f507b0 100644
--- a/pkg/cli/cli.go
+++ b/pkg/cli/cli.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/noobaa/noobaa-operator/v5/pkg/backingstore"
+	"github.com/noobaa/noobaa-operator/v5/pkg/bench"
 	"github.com/noobaa/noobaa-operator/v5/pkg/bucket"
 	"github.com/noobaa/noobaa-operator/v5/pkg/bucketclass"
 	"github.com/noobaa/noobaa-operator/v5/pkg/cnpg"
@@ -27,6 +28,8 @@ import (
 	"github.com/noobaa/noobaa-operator/v5/pkg/util"
 	"github.com/noobaa/noobaa-operator/v5/pkg/version"
 	"github.com/sirupsen/logrus"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
@@ -59,6 +62,8 @@ const ASCIILogo2 = `
 
 // Run runs
 func Run() {
+	log.SetLogger(zap.New())
+
 	err := Cmd().Execute()
 	if err != nil {
 		os.Exit(1)
@@ -147,6 +152,7 @@ Load noobaa completion to bash:
 			pvstore.Cmd(),
 			crd.Cmd(),
 			olm.Cmd(),
+			bench.Cmd(),
 		},
 	}}
 
diff --git a/pkg/util/util.go b/pkg/util/util.go
index a7cd469c94..4f4068ada1 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -18,6 +18,7 @@ import (
 	"net/url"
 	"os"
 	"os/exec"
+	"os/signal"
 	"path/filepath"
 	"regexp"
 	"sort"
@@ -2408,3 +2409,22 @@ func MakeAuthToken(payload map[string]any, secret []byte) (string, error) {
 
 	return jwt.NewWithClaims(jwt.SigningMethodHS256, filtered).SignedString(secret)
 }
+
+// Map applies fn to every element of slice and returns a new slice of the results.
+func Map[T any, U any](slice []T, fn func(T) U) []U {
+	result := make([]U, len(slice))
+	for idx, item := range slice {
+		result[idx] = fn(item)
+	}
+
+	return result
+}
+
+// OnSignal blocks until one of the given OS signals is received, then invokes cb.
+func OnSignal(cb func(), signals ...os.Signal) {
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, signals...)
+	<-signalChan
+
+	cb()
+}
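
For reviewers: a minimal, self-contained sketch of the two new pkg/util helpers this patch introduces. The standalone main package below is illustrative only and not part of the patch; util.Map and util.OnSignal are called with the same signatures pkg/bench/warp.go uses.

package main

import (
	"fmt"
	"syscall"

	"github.com/noobaa/noobaa-operator/v5/pkg/util"
)

func main() {
	// util.Map transforms a slice element-wise; prepareWarpHostList uses it
	// to turn a list of IPs into the comma-joined "ip:port" --host argument.
	hosts := util.Map([]string{"10.0.0.1", "10.0.0.2"}, func(ip string) string {
		return fmt.Sprintf("%s:7761", ip)
	})
	fmt.Println(hosts) // [10.0.0.1:7761 10.0.0.2:7761]

	// util.OnSignal blocks until one of the given signals arrives, then runs
	// the callback; RunBenchWarp launches it in a goroutine so cleanupWarp
	// fires when the benchmark is interrupted with Ctrl-C.
	util.OnSignal(func() {
		fmt.Println("cleaning up")
	}, syscall.SIGINT, syscall.SIGTERM)
}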