Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,15 @@ func main() {
// CreateServiceMonitors will automatically create the prometheus-operator ServiceMonitor resources
// necessary to configure Prometheus to scrape metrics from this operator.
services := []*v1.Service{service}
_, err = metrics.CreateServiceMonitors(cfg, namespace, services)
ns := namespace
if namespace == "" {
ns, err = k8sutil.GetOperatorNamespace()
if err != nil {
log.Info("Cannot find operator namespace", "error", err.Error() )
os.Exit(1)
}
}
_, err = metrics.CreateServiceMonitors(cfg, ns, services)
if err != nil {
log.Info("Could not create ServiceMonitor object", "error", err.Error())
// If this operator is deployed to a cluster without the prometheus-operator running, it will return
Expand Down
61 changes: 60 additions & 1 deletion pkg/controller/manager/ensurer.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package manager

import (
"os"
"strings"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/operator-framework/operator-sdk/pkg/metrics"
redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1"
"github.com/ucloud/redis-cluster-operator/pkg/k8sutil"
"github.com/ucloud/redis-cluster-operator/pkg/osm"
"github.com/ucloud/redis-cluster-operator/pkg/resources/configmaps"
"github.com/ucloud/redis-cluster-operator/pkg/resources/poddisruptionbudgets"
"github.com/ucloud/redis-cluster-operator/pkg/resources/services"
"github.com/ucloud/redis-cluster-operator/pkg/resources/statefulsets"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

type IEnsureResource interface {
Expand All @@ -31,6 +35,7 @@ type realEnsureResource struct {
svcClient k8sutil.IServiceControl
configMapClient k8sutil.IConfigMapControl
pdbClient k8sutil.IPodDisruptionBudgetControl
pvcClient k8sutil.IPvcControl
crClient k8sutil.ICustomResource
client client.Client
logger logr.Logger
Expand All @@ -42,6 +47,7 @@ func NewEnsureResource(client client.Client, logger logr.Logger) IEnsureResource
svcClient: k8sutil.NewServiceController(client),
configMapClient: k8sutil.NewConfigMapController(client),
pdbClient: k8sutil.NewPodDisruptionBudgetController(client),
pvcClient: k8sutil.NewPvcController(client),
crClient: k8sutil.NewCRControl(client),
client: client,
logger: logger,
Expand Down Expand Up @@ -70,6 +76,18 @@ func (r *realEnsureResource) ensureRedisStatefulset(cluster *redisv1alpha1.Distr
return false, err
}

foundPvcs, err := r.pvcClient.ListPvcByLabels(cluster.Namespace, labels)
if err != nil {
return false, err
}
for _, pvc := range foundPvcs.Items {
if pvc.Spec.Resources.Requests["ResourceStorage"] != cluster.Spec.Storage.Size {
if err = r.pvcClient.UpdatPvcByLabels(cluster, &pvc); err != nil {
return false, err
}
}
}

ss, err := r.statefulSetClient.GetStatefulSet(cluster.Namespace, ssName)
if err == nil {
if shouldUpdateRedis(cluster, ss) {
Expand All @@ -79,6 +97,7 @@ func (r *realEnsureResource) ensureRedisStatefulset(cluster *redisv1alpha1.Distr
if err != nil {
return false, err
}

return true, r.statefulSetClient.UpdateStatefulSet(newSS)
}
} else if err != nil && errors.IsNotFound(err) {
Expand Down Expand Up @@ -183,8 +202,35 @@ func (r *realEnsureResource) EnsureRedisSvc(cluster *redisv1alpha1.DistributedRe
r.logger.WithValues("Service.Namespace", cluster.Namespace, "Service.Name", cluster.Spec.ServiceName).
Info("creating a new service")
svc := services.NewSvcForCR(cluster, name, labels)
return r.svcClient.CreateService(svc)
err := r.svcClient.CreateService(svc)

if err == nil {
r.logger.WithValues("Service.Namespace", cluster.Namespace, "Service.Name", cluster.Spec.ServiceName).
Info("creating a new servicemonitor")
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
r.logger.Error(err, "")
os.Exit(1)
}

// CreateServiceMonitors will automatically create the prometheus-operator ServiceMonitor resources
// necessary to configure Prometheus to scrape metrics from this operator.
services := []*v1.Service{svc}

_, err = metrics.CreateServiceMonitors(cfg, cluster.Namespace, services)
if err != nil {
r.logger.Info("Could not create ServiceMonitor object", "error", err.Error())
// If this operator is deployed to a cluster without the prometheus-operator running, it will return
// ErrServiceMonitorNotPresent, which can be used to safely skip ServiceMonitor creation.
if err == metrics.ErrServiceMonitorNotPresent {
r.logger.Info("Install prometheus-operator in your cluster to create ServiceMonitor objects", "error", err.Error())
}
}
}

}

return err
}

Expand Down Expand Up @@ -290,5 +336,18 @@ func (r *realEnsureResource) updateRedisStatefulset(cluster *redisv1alpha1.Distr
if err != nil {
return err
}

foundPvcs, err := r.pvcClient.ListPvcByLabels(cluster.Namespace, labels)
if err != nil {
return err
}
for _, pvc := range foundPvcs.Items {
if pvc.Spec.Resources.Requests["ResourceStorage"] != cluster.Spec.Storage.Size {
if err = r.pvcClient.UpdatPvcByLabels(cluster, &pvc); err != nil {
return err
}
}
}

return r.statefulSetClient.UpdateStatefulSet(newSS)
}
34 changes: 34 additions & 0 deletions pkg/k8sutil/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8sutil
import (
"context"

redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -13,6 +14,8 @@ type IPvcControl interface {
DeletePvc(claim *corev1.PersistentVolumeClaim) error
DeletePvcByLabels(namespace string, labels map[string]string) error
GetPvc(namespace, name string) (*corev1.PersistentVolumeClaim, error)
UpdatPvcByLabels(cluster *redisv1alpha1.DistributedRedisCluster, pvc *corev1.PersistentVolumeClaim) error
ListPvcByLabels(namespace string, labels map[string]string) (*corev1.PersistentVolumeClaimList, error)
}

type pvcController struct {
Expand Down Expand Up @@ -45,6 +48,37 @@ func (s *pvcController) DeletePvcByLabels(namespace string, labels map[string]st
return nil
}

func (s *pvcController) ListPvcByLabels(namespace string, labels map[string]string) (*corev1.PersistentVolumeClaimList, error) {
foundPvcs := &corev1.PersistentVolumeClaimList{}
err := s.client.List(context.TODO(), foundPvcs, client.InNamespace(namespace), client.MatchingLabels(labels))
if err != nil {
return nil, err
}
return foundPvcs, nil
}

func (s *pvcController) UpdatPvcByLabels(cluster *redisv1alpha1.DistributedRedisCluster, pvc *corev1.PersistentVolumeClaim) error {

mode := corev1.PersistentVolumeFilesystem

pvcSpec := &corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: cluster.Spec.Storage.Size,
},
},
StorageClassName: &cluster.Spec.Storage.Class,
VolumeMode: &mode,
}
pvcSpec.Resources.DeepCopyInto(&pvc.Spec.Resources)
if err := s.client.Update(context.TODO(), pvc); err != nil {
return err
}

return nil
}

// GetPvc implement the IPvcControl.Interface.
func (s *pvcController) GetPvc(namespace, name string) (*corev1.PersistentVolumeClaim, error) {
pvc := &corev1.PersistentVolumeClaim{}
Expand Down
8 changes: 7 additions & 1 deletion pkg/k8sutil/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ func (s *stateFulSetController) CreateStatefulSet(ss *appsv1.StatefulSet) error

// UpdateStatefulSet implement the IStatefulSetControl.Interface.
func (s *stateFulSetController) UpdateStatefulSet(ss *appsv1.StatefulSet) error {
return s.client.Update(context.TODO(), ss)
origss, err := s.GetStatefulSet(ss.Namespace, ss.Name)
if err != nil {
return err
}
patch := client.MergeFrom(origss.DeepCopy())
ss.Spec.Template.DeepCopyInto(&origss.Spec.Template)
return s.client.Patch(context.TODO(), origss, patch)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @debbyku,
With this change I was able to restart pods during resource updates, but the change of the number of replicas is not working anymore. Can you confirm it?

Thanks!!!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed the replicas update here as the "replicas" is not in ss.Spec.Template.

I've modified the codes and and push again.

Thanks.

}

// DeleteStatefulSet implement the IStatefulSetControl.Interface.
Expand Down
4 changes: 4 additions & 0 deletions pkg/redisutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func NewClient(addr, password string, cnxTimeout time.Duration, commandsMapping
if password != "" {
err = c.client.Cmd("AUTH", password).Err
}
if err != nil {
panic(err)
}

return c, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewSvcForCR(cluster *redisv1alpha1.DistributedRedisCluster, name string, la
ports = append(ports, clientPort, gossipPort)
} else {
ports = append(ports, clientPort, gossipPort,
corev1.ServicePort{Name: "prom-http", Port: cluster.Spec.Monitor.Prometheus.Port})
corev1.ServicePort{Name: "http-metrics", Port: cluster.Spec.Monitor.Prometheus.Port})
}

svc := &corev1.Service{
Expand Down
7 changes: 4 additions & 3 deletions pkg/resources/statefulsets/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,11 @@ func getRedisCommand(cluster *redisv1alpha1.DistributedRedisCluster, password *c
"/conf/redis.conf",
"--cluster-enabled yes",
"--cluster-config-file /data/nodes.conf",
"--cluster-announce-ip $(POD_IP)",
}
if password != nil {
cmd = append(cmd, fmt.Sprintf("--requirepass '$(%s)'", redisv1alpha1.PasswordENV),
fmt.Sprintf("--masterauth '$(%s)'", redisv1alpha1.PasswordENV))
cmd = append(cmd, fmt.Sprintf("--requirepass \"$(%s)\"", redisv1alpha1.PasswordENV),
fmt.Sprintf("--masterauth \"$(%s)\"", redisv1alpha1.PasswordENV))
}

renameCmdMap := utils.BuildCommandReplaceMapping(config.RedisConf().GetRenameCommandsFile(), log)
Expand Down Expand Up @@ -319,7 +320,7 @@ func redisExporterContainer(cluster *redisv1alpha1.DistributedRedisCluster, pass
ImagePullPolicy: corev1.PullAlways,
Ports: []corev1.ContainerPort{
{
Name: "prom-http",
Name: "http-metrics",
Protocol: corev1.ProtocolTCP,
ContainerPort: cluster.Spec.Monitor.Prometheus.Port,
},
Expand Down