diff --git a/.evergreen-tasks.yml b/.evergreen-tasks.yml index 2d0803097..1c3e65f27 100644 --- a/.evergreen-tasks.yml +++ b/.evergreen-tasks.yml @@ -942,6 +942,11 @@ tasks: commands: - func: e2e_test + - name: e2e_multi_cluster_new_replica_set_scale_up + tags: [ "patch-run" ] + commands: + - func: e2e_test + - name: e2e_multi_cluster_scale_up_cluster tags: [ "patch-run" ] commands: diff --git a/.evergreen.yml b/.evergreen.yml index 1732b36b6..819c795c7 100644 --- a/.evergreen.yml +++ b/.evergreen.yml @@ -838,6 +838,7 @@ task_groups: - e2e_multi_cluster_mtls_test - e2e_multi_cluster_replica_set_deletion - e2e_multi_cluster_replica_set_scale_up + - e2e_multi_cluster_new_replica_set_scale_up - e2e_multi_cluster_scale_up_cluster - e2e_multi_cluster_scale_up_cluster_new_cluster - e2e_multi_cluster_replica_set_scale_down diff --git a/api/v1/mdb/mongodb_types.go b/api/v1/mdb/mongodb_types.go index 199146431..3ae61d881 100644 --- a/api/v1/mdb/mongodb_types.go +++ b/api/v1/mdb/mongodb_types.go @@ -443,6 +443,8 @@ type MongoDbSpec struct { // +kubebuilder:pruning:PreserveUnknownFields // +optional MemberConfig []automationconfig.MemberOptions `json:"memberConfig,omitempty"` + + ClusterSpecList ClusterSpecList `json:"clusterSpecList,omitempty"` } func (m *MongoDbSpec) GetExternalDomain() *string { @@ -452,6 +454,17 @@ func (m *MongoDbSpec) GetExternalDomain() *string { return nil } +// GetExternalDomainForMemberCluster returns the external domain for a specific member cluster. Falls back to the global +// external domain if not found. +func (m *MongoDbSpec) GetExternalDomainForMemberCluster(clusterName string) *string { + if cfg := m.ClusterSpecList.GetExternalAccessConfigurationForMemberCluster(clusterName); cfg != nil { + if externalDomain := cfg.ExternalDomain; externalDomain != nil { + return externalDomain + } + } + return m.GetExternalDomain() +} + func (m *MongoDbSpec) GetHorizonConfig() []MongoDBHorizonConfig { return m.Connectivity.ReplicaSetHorizons } diff --git a/api/v1/mdb/mongodb_validation.go b/api/v1/mdb/mongodb_validation.go index 651daa644..32290e175 100644 --- a/api/v1/mdb/mongodb_validation.go +++ b/api/v1/mdb/mongodb_validation.go @@ -327,7 +327,7 @@ func additionalMongodConfig(ms MongoDbSpec) v1.ValidationResult { } func replicasetMemberIsSpecified(ms MongoDbSpec) v1.ValidationResult { - if ms.ResourceType == ReplicaSet && ms.Members == 0 { + if ms.ResourceType == ReplicaSet && !ms.IsMultiCluster() && ms.Members == 0 { return v1.ValidationError("'spec.members' must be specified if type of MongoDB is %s", ms.ResourceType) } return v1.ValidationSuccess() diff --git a/api/v1/mdb/mongodbbuilder.go b/api/v1/mdb/mongodbbuilder.go index 737be1ba5..81dace55f 100644 --- a/api/v1/mdb/mongodbbuilder.go +++ b/api/v1/mdb/mongodbbuilder.go @@ -23,6 +23,19 @@ func NewDefaultReplicaSetBuilder() *MongoDBBuilder { return defaultMongoDB(ReplicaSet) } +func NewDefaultMultiReplicaSetBuilder() *MongoDBBuilder { + b := defaultMongoDB(ReplicaSet). + SetMultiClusterTopology() + + // Set test OpsManager config and credentials (matching multi-cluster test fixtures) + b.mdb.Spec.OpsManagerConfig = &PrivateCloudConfig{ + ConfigMapRef: ConfigMapRef{Name: "my-project"}, + } + b.mdb.Spec.Credentials = "my-credentials" + + return b +} + func NewDefaultShardedClusterBuilder() *MongoDBBuilder { return defaultMongoDB(ShardedCluster). SetShardCountSpec(3). 
@@ -264,6 +277,29 @@ func (b *MongoDBBuilder) AddDummyOpsManagerConfig() *MongoDBBuilder { return b } +func (b *MongoDBBuilder) SetDefaultClusterSpecList() *MongoDBBuilder { + b.mdb.Spec.ClusterSpecList = ClusterSpecList{ + { + ClusterName: "test-cluster-0", + Members: 1, + }, + { + ClusterName: "test-cluster-1", + Members: 1, + }, + { + ClusterName: "test-cluster-2", + Members: 1, + }, + } + return b +} + +func (b *MongoDBBuilder) SetClusterSpecList(clusterSpecList ClusterSpecList) *MongoDBBuilder { + b.mdb.Spec.ClusterSpecList = clusterSpecList + return b +} + func (b *MongoDBBuilder) SetAllClusterSpecLists(clusterSpecList ClusterSpecList) *MongoDBBuilder { b.mdb.Spec.ShardSpec.ClusterSpecList = clusterSpecList b.mdb.Spec.ConfigSrvSpec.ClusterSpecList = clusterSpecList diff --git a/api/v1/mdb/zz_generated.deepcopy.go b/api/v1/mdb/zz_generated.deepcopy.go index f1e25ee2c..b37e5683a 100644 --- a/api/v1/mdb/zz_generated.deepcopy.go +++ b/api/v1/mdb/zz_generated.deepcopy.go @@ -862,6 +862,13 @@ func (in *MongoDbSpec) DeepCopyInto(out *MongoDbSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ClusterSpecList != nil { + in, out := &in.ClusterSpecList, &out.ClusterSpecList + *out = make(ClusterSpecList, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MongoDbSpec. diff --git a/config/crd/bases/mongodb.com_mongodb.yaml b/config/crd/bases/mongodb.com_mongodb.yaml index d421d8837..7c275aef0 100644 --- a/config/crd/bases/mongodb.com_mongodb.yaml +++ b/config/crd/bases/mongodb.com_mongodb.yaml @@ -396,6 +396,146 @@ spec: clusterDomain: format: hostname type: string + clusterSpecList: + items: + description: |- + ClusterSpecItem is the mongodb multi-cluster spec that is specific to a + particular Kubernetes cluster, this maps to the statefulset created in each cluster + properties: + clusterName: + description: |- + ClusterName is name of the cluster where the MongoDB Statefulset will be scheduled, the + name should have a one on one mapping with the service-account created in the central cluster + to talk to the workload clusters. + type: string + externalAccess: + description: ExternalAccessConfiguration provides external access + configuration for Multi-Cluster. + properties: + externalDomain: + description: An external domain that is used for exposing + MongoDB to the outside world. + type: string + externalService: + description: Provides a way to override the default (NodePort) + Service + properties: + annotations: + additionalProperties: + type: string + description: A map of annotations that shall be added + to the externally available Service. + type: object + spec: + description: A wrapper for the Service spec object. + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + type: object + memberConfig: + description: MemberConfig allows to specify votes, priorities + and tags for each of the mongodb process. 
+ items: + properties: + priority: + type: string + tags: + additionalProperties: + type: string + type: object + votes: + type: integer + type: object + type: array + members: + description: Amount of members for this MongoDB Replica Set + type: integer + podSpec: + properties: + persistence: + description: Note, that this field is used by MongoDB resources + only, let's keep it here for simplicity + properties: + multiple: + properties: + data: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + journal: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + logs: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + type: object + single: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + type: object + podTemplate: + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + service: + description: this is an optional service, it will get the name + "-service" in case not provided + type: string + statefulSet: + description: |- + StatefulSetConfiguration holds the optional custom StatefulSet + that should be merged into the operator created one. + properties: + metadata: + description: StatefulSetMetadataWrapper is a wrapper around + Labels and Annotations + properties: + annotations: + additionalProperties: + type: string + type: object + labels: + additionalProperties: + type: string + type: object + type: object + spec: + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - spec + type: object + required: + - members + type: object + type: array configServerCount: type: integer configSrv: diff --git a/controllers/om/agent.go b/controllers/om/agent.go index 858b9fc33..c0e03ffe4 100644 --- a/controllers/om/agent.go +++ b/controllers/om/agent.go @@ -47,7 +47,7 @@ func (agent AgentStatus) IsRegistered(hostnamePrefix string, log *zap.SugaredLog return false } -// Results are needed to fulfil the Paginated interface +// Results are needed to fulfil the Paginated interface. 
func (aar AutomationAgentStatusResponse) Results() []interface{} { ans := make([]interface{}, len(aar.AutomationAgents)) for i, aa := range aar.AutomationAgents { diff --git a/controllers/operator/authentication_test.go b/controllers/operator/authentication_test.go index d467b4dac..ffbabb019 100644 --- a/controllers/operator/authentication_test.go +++ b/controllers/operator/authentication_test.go @@ -47,7 +47,7 @@ func TestX509CanBeEnabled_WhenThereAreOnlyTlsDeployments_ReplicaSet(t *testing.T addKubernetesTlsResources(ctx, kubeClient, rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, kubeClient) } @@ -57,7 +57,7 @@ func TestX509ClusterAuthentication_CanBeEnabled_IfX509AuthenticationIsEnabled_Re kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) addKubernetesTlsResources(ctx, kubeClient, rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, kubeClient) } @@ -90,7 +90,7 @@ func TestUpdateOmAuthentication_NoAuthenticationEnabled(t *testing.T) { processNames := []string{"my-rs-0", "my-rs-1", "my-rs-2"} kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) r.updateOmAuthentication(ctx, conn, processNames, rs, "", "", "", false, zap.S()) ac, _ := conn.ReadAutomationConfig() @@ -111,7 +111,7 @@ func TestUpdateOmAuthentication_EnableX509_TlsNotEnabled(t *testing.T) { rs.Spec.Security.TLSConfig.Enabled = true kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) status, isMultiStageReconciliation := r.updateOmAuthentication(ctx, conn, []string{"my-rs-0", "my-rs-1", "my-rs-2"}, rs, "", "", "", false, zap.S()) assert.True(t, status.IsOK(), "configuring both options at once should not result in a failed status") @@ -123,7 +123,7 @@ func TestUpdateOmAuthentication_EnableX509_WithTlsAlreadyEnabled(t *testing.T) { rs := DefaultReplicaSetBuilder().SetName("my-rs").SetMembers(3).EnableTLS().Build() omConnectionFactory := om.NewCachedOMConnectionFactoryWithInitializedConnection(om.NewMockedOmConnection(deployment.CreateFromReplicaSet("fake-mongoDBImage", false, rs))) kubeClient := mock.NewDefaultFakeClientWithOMConnectionFactory(omConnectionFactory, rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) status, isMultiStageReconciliation := r.updateOmAuthentication(ctx, omConnectionFactory.GetConnection(), []string{"my-rs-0", "my-rs-1", "my-rs-2"}, rs, "", "", "", false, zap.S()) assert.True(t, status.IsOK(), "configuring x509 when tls has 
already been enabled should not result in a failed status") @@ -138,7 +138,7 @@ func TestUpdateOmAuthentication_AuthenticationIsNotConfigured_IfAuthIsNotSet(t * omConnectionFactory := om.NewCachedOMConnectionFactoryWithInitializedConnection(om.NewMockedOmConnection(deployment.CreateFromReplicaSet("fake-mongoDBImage", false, rs))) kubeClient := mock.NewDefaultFakeClientWithOMConnectionFactory(omConnectionFactory, rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) status, _ := r.updateOmAuthentication(ctx, omConnectionFactory.GetConnection(), []string{"my-rs-0", "my-rs-1", "my-rs-2"}, rs, "", "", "", false, zap.S()) assert.True(t, status.IsOK(), "no authentication should have been configured") @@ -161,7 +161,7 @@ func TestUpdateOmAuthentication_DoesNotDisableAuth_IfAuthIsNotSet(t *testing.T) Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) addKubernetesTlsResources(ctx, kubeClient, rs) @@ -174,7 +174,7 @@ func TestUpdateOmAuthentication_DoesNotDisableAuth_IfAuthIsNotSet(t *testing.T) rs.Spec.Security.Authentication = nil - reconciler = newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler = newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, kubeClient) @@ -196,7 +196,7 @@ func TestCanConfigureAuthenticationDisabled_WithNoModes(t *testing.T) { Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) addKubernetesTlsResources(ctx, kubeClient, rs) @@ -208,7 +208,7 @@ func TestUpdateOmAuthentication_EnableX509_FromEmptyDeployment(t *testing.T) { rs := DefaultReplicaSetBuilder().SetName("my-rs").SetMembers(3).EnableTLS().EnableAuth().EnableX509().Build() omConnectionFactory := om.NewCachedOMConnectionFactoryWithInitializedConnection(om.NewMockedOmConnection(om.NewDeployment())) kubeClient := mock.NewDefaultFakeClientWithOMConnectionFactory(omConnectionFactory, rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) secretName := util.AgentSecretName createAgentCSRs(t, ctx, r.client, secretName, certsv1.CertificateApproved) @@ -229,7 +229,7 @@ func TestX509AgentUserIsCorrectlyConfigured(t *testing.T) { // configure x509/tls resources addKubernetesTlsResources(ctx, kubeClient, rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, kubeClient) @@ -265,7 +265,7 @@ func 
TestScramAgentUserIsCorrectlyConfigured(t *testing.T) { assert.NoError(t, err) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, kubeClient) @@ -295,7 +295,7 @@ func TestScramAgentUser_IsNotOverridden(t *testing.T) { } }) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, kubeClient) @@ -314,7 +314,7 @@ func TestX509InternalClusterAuthentication_CanBeEnabledWithScram_ReplicaSet(t *t Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) addKubernetesTlsResources(ctx, r.client, rs) checkReconcileSuccessful(ctx, t, r, rs, kubeClient) @@ -367,7 +367,7 @@ func TestConfigureLdapDeploymentAuthentication_WithScramAgentAuthentication(t *t Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) data := map[string]string{ "password": "LITZTOd6YiCV8j", } @@ -424,7 +424,7 @@ func TestConfigureLdapDeploymentAuthentication_WithCustomRole(t *testing.T) { Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) data := map[string]string{ "password": "LITZTOd6YiCV8j", } @@ -478,7 +478,7 @@ func TestConfigureLdapDeploymentAuthentication_WithAuthzQueryTemplate_AndUserToD Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + r := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) data := map[string]string{ "password": "LITZTOd6YiCV8j", } @@ -741,7 +741,7 @@ func TestInvalidPEM_SecretDoesNotContainKey(t *testing.T) { Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) addKubernetesTlsResources(ctx, kubeClient, rs) // Replace the secret with an empty one @@ -771,7 +771,7 @@ func Test_NoAdditionalDomainsPresent(t *testing.T) { rs.Spec.Security.TLSConfig.AdditionalCertificateDomains = []string{"foo"} kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, 
kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) addKubernetesTlsResources(ctx, kubeClient, rs) certSecret := &corev1.Secret{} @@ -797,7 +797,7 @@ func Test_NoExternalDomainPresent(t *testing.T) { rs.Spec.ExternalAccessConfiguration = &mdbv1.ExternalAccessConfiguration{ExternalDomain: ptr.To("foo")} kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) addKubernetesTlsResources(ctx, kubeClient, rs) secret := &corev1.Secret{} diff --git a/controllers/operator/common_controller.go b/controllers/operator/common_controller.go index e76cf4e81..88317ce50 100644 --- a/controllers/operator/common_controller.go +++ b/controllers/operator/common_controller.go @@ -299,6 +299,25 @@ func checkIfHasExcessProcesses(conn om.Connection, resourceName string, log *zap return workflow.Pending("cannot have more than 1 MongoDB Cluster per project (see https://docs.mongodb.com/kubernetes-operator/stable/tutorial/migrate-to-single-resource/)") } +// getReplicaSetProcessIdsFromDeployment extracts replica set member IDs from the OM deployment. +// This is used to preserve stable member IDs across reconciliations in multi-cluster deployments, +// preventing ID conflicts when clusters scale independently. +// Returns a map of process name to replica set member ID (e.g., {"my-rs-0-0": 0, "my-rs-1-0": 1}). +func getReplicaSetProcessIdsFromDeployment(replicaSetName string, deployment om.Deployment) map[string]int { + processIds := map[string]int{} + + replicaSet := deployment.GetReplicaSetByName(replicaSetName) + if replicaSet == nil { + return map[string]int{} + } + + for _, m := range replicaSet.Members() { + processIds[m.Name()] = m.Id() + } + + return processIds +} + // validateInternalClusterCertsAndCheckTLSType verifies that all the x509 internal cluster certs exist and return whether they are built following the kubernetes.io/tls secret type (tls.crt/tls.key entries). // TODO: this is almost the same as certs.EnsureSSLCertsForStatefulSet, we should centralize the functionality func (r *ReconcileCommonController) validateInternalClusterCertsAndCheckTLSType(ctx context.Context, configurator certs.X509CertConfigurator, opts certs.Options, log *zap.SugaredLogger) error { diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go index 386ce8a6b..953b82be6 100644 --- a/controllers/operator/mongodbmultireplicaset_controller.go +++ b/controllers/operator/mongodbmultireplicaset_controller.go @@ -728,7 +728,7 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte return err } - processIds := getReplicaSetProcessIdsFromReplicaSets(mrs.Name, existingDeployment) + processIds := getReplicaSetProcessIdsFromDeployment(mrs.Name, existingDeployment) // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in annotation // A project migration can happen if .spec.opsManager.configMapRef is changed, or the original configMap has been modified. 
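As a quick illustration of the relocated helper above, a minimal test sketch that could sit next to the existing tests in controllers/operator. The expected member IDs 0..2 and the deployment package import path are assumptions based on the fixtures used in this diff, not part of the change itself:

package operator

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/mongodb/mongodb-kubernetes/controllers/om/deployment" // import path assumed from the surrounding test files
)

func TestGetReplicaSetProcessIdsFromDeployment(t *testing.T) {
	// Build a 3-member replica set and the matching mocked OM deployment,
	// reusing the fixtures the authentication tests above already rely on.
	rs := DefaultReplicaSetBuilder().SetName("my-rs").SetMembers(3).Build()
	dep := deployment.CreateFromReplicaSet("fake-mongoDBImage", false, rs)

	// Member IDs come back keyed by process name (assuming the mocked deployment
	// assigns IDs 0..n-1); an unknown replica set yields an empty map.
	assert.Equal(t, map[string]int{"my-rs-0": 0, "my-rs-1": 1, "my-rs-2": 2},
		getReplicaSetProcessIdsFromDeployment("my-rs", dep))
	assert.Empty(t, getReplicaSetProcessIdsFromDeployment("other-rs", dep))
}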
@@ -795,21 +795,6 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte return nil } -func getReplicaSetProcessIdsFromReplicaSets(replicaSetName string, deployment om.Deployment) map[string]int { - processIds := map[string]int{} - - replicaSet := deployment.GetReplicaSetByName(replicaSetName) - if replicaSet == nil { - return map[string]int{} - } - - for _, m := range replicaSet.Members() { - processIds[m.Name()] = m.Id() - } - - return processIds -} - func getReplicaSetProcessIdsFromAnnotation(mrs mdbmultiv1.MongoDBMultiCluster) (map[string]int, error) { if processIdsStr, ok := mrs.Annotations[util.LastAchievedRsMemberIds]; ok { processIds := make(map[string]map[string]int) diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index afac5a143..a5777f942 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -2,6 +2,7 @@ package operator import ( "context" + "encoding/json" "fmt" "go.uber.org/zap" @@ -11,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" @@ -36,11 +38,14 @@ import ( "github.com/mongodb/mongodb-kubernetes/controllers/operator/certs" "github.com/mongodb/mongodb-kubernetes/controllers/operator/connection" "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct" + "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/scalers" + "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/scalers/interfaces" "github.com/mongodb/mongodb-kubernetes/controllers/operator/controlledfeature" "github.com/mongodb/mongodb-kubernetes/controllers/operator/create" enterprisepem "github.com/mongodb/mongodb-kubernetes/controllers/operator/pem" "github.com/mongodb/mongodb-kubernetes/controllers/operator/project" "github.com/mongodb/mongodb-kubernetes/controllers/operator/recovery" + "github.com/mongodb/mongodb-kubernetes/controllers/operator/secrets" "github.com/mongodb/mongodb-kubernetes/controllers/operator/watch" "github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow" "github.com/mongodb/mongodb-kubernetes/controllers/searchcontroller" @@ -54,6 +59,7 @@ import ( "github.com/mongodb/mongodb-kubernetes/pkg/dns" "github.com/mongodb/mongodb-kubernetes/pkg/images" "github.com/mongodb/mongodb-kubernetes/pkg/kube" + "github.com/mongodb/mongodb-kubernetes/pkg/multicluster" "github.com/mongodb/mongodb-kubernetes/pkg/statefulset" "github.com/mongodb/mongodb-kubernetes/pkg/util" "github.com/mongodb/mongodb-kubernetes/pkg/util/architectures" @@ -70,6 +76,7 @@ import ( type ReconcileMongoDbReplicaSet struct { *ReconcileCommonController omConnectionFactory om.ConnectionFactory + memberClustersMap map[string]client.Client imageUrls images.ImageUrls forceEnterprise bool enableClusterMongoDBRoles bool @@ -78,9 +85,18 @@ type ReconcileMongoDbReplicaSet struct { databaseNonStaticImageVersion string } -type replicaSetDeploymentState struct { - LastAchievedSpec *mdbv1.MongoDbSpec - LastReconcileMemberCount int +// ReplicaSetDeploymentState represents the state that is persisted between reconciliations. 
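+// Illustrative example only (cluster names and counts are hypothetical): after reconciling a
+// two-cluster deployment the persisted state could look like
+//   ClusterMapping:        {"cluster-a": 0, "cluster-b": 1}
+//   LastAppliedMemberSpec: {"cluster-a": 3, "cluster-b": 2}
+// with LastAchievedSpec holding the spec from the last reconciliation that reached Running.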
+type ReplicaSetDeploymentState struct { + // What the spec looked like when we last reached Running state + LastAchievedSpec *mdbv1.MongoDbSpec + + // Each cluster gets a stable index for StatefulSet naming (e.g. {"cluster-a": 0, "cluster-b": 1}). + // These indexes stick around forever, even when clusters come and go. + ClusterMapping map[string]int + + // Tracks replica count per cluster from last reconciliation (e.g. {"cluster-a": 3, "cluster-b": 5}). + // We compare this to the current desired state to detect scale-downs and trigger proper MongoDB cleanup. + LastAppliedMemberSpec map[string]int } var _ reconcile.Reconciler = &ReconcileMongoDbReplicaSet{} @@ -89,9 +105,10 @@ var _ reconcile.Reconciler = &ReconcileMongoDbReplicaSet{} // This object is NOT shared between reconcile invocations. type ReplicaSetReconcilerHelper struct { resource *mdbv1.MongoDB - deploymentState *replicaSetDeploymentState + deploymentState *ReplicaSetDeploymentState reconciler *ReconcileMongoDbReplicaSet log *zap.SugaredLogger + MemberClusters []multicluster.MemberCluster } func (r *ReconcileMongoDbReplicaSet) newReconcilerHelper( @@ -113,21 +130,93 @@ func (r *ReconcileMongoDbReplicaSet) newReconcilerHelper( } // readState abstract reading the state of the resource that we store on the cluster between reconciliations. -func (r *ReplicaSetReconcilerHelper) readState() (*replicaSetDeploymentState, error) { +func (r *ReplicaSetReconcilerHelper) readState() (*ReplicaSetDeploymentState, error) { // Try to get the last achieved spec from annotations and store it in state lastAchievedSpec, err := r.resource.GetLastSpec() if err != nil { return nil, err } - // Read current member count from Status once at initialization. This provides a stable view throughout - // reconciliation and prepares for eventually storing this in ConfigMap state instead of ephemeral status. - lastReconcileMemberCount := r.resource.Status.Members + state := &ReplicaSetDeploymentState{ + LastAchievedSpec: lastAchievedSpec, + ClusterMapping: map[string]int{}, + LastAppliedMemberSpec: map[string]int{}, + } + + // Try to read ClusterMapping from annotation + if clusterMappingStr := annotations.GetAnnotation(r.resource, util.ClusterMappingAnnotation); clusterMappingStr != "" { + if err := json.Unmarshal([]byte(clusterMappingStr), &state.ClusterMapping); err != nil { + r.log.Warnf("Failed to unmarshal ClusterMapping annotation: %v", err) + } + } + + // Try to read LastAppliedMemberSpec from annotation + if lastAppliedMemberSpecStr := annotations.GetAnnotation(r.resource, util.LastAppliedMemberSpecAnnotation); lastAppliedMemberSpecStr != "" { + if err := json.Unmarshal([]byte(lastAppliedMemberSpecStr), &state.LastAppliedMemberSpec); err != nil { + r.log.Warnf("Failed to unmarshal LastAppliedMemberSpec annotation: %v", err) + } + } + + // MIGRATION: If LastAppliedMemberSpec is empty, initialize from Status.Members + // This ensures backward compatibility with existing single-cluster deployments + // For multi-cluster, leave empty - it will be initialized during initializeMemberClusters + if len(state.LastAppliedMemberSpec) == 0 && !r.resource.Spec.IsMultiCluster() { + state.LastAppliedMemberSpec[multicluster.LegacyCentralClusterName] = r.resource.Status.Members + r.log.Debugf("Initialized LastAppliedMemberSpec from Status.Members for single-cluster: %d", r.resource.Status.Members) + } + + return state, nil +} + +// writeClusterMapping writes the ClusterMapping and LastAppliedMemberSpec annotations. 
+// This should be called on EVERY reconciliation (Pending, Failed, Running) to maintain accurate state. +func (r *ReplicaSetReconcilerHelper) writeClusterMapping(ctx context.Context) error { + clusterMappingBytes, err := json.Marshal(r.deploymentState.ClusterMapping) + if err != nil { + return xerrors.Errorf("failed to marshal ClusterMapping: %w", err) + } + + lastAppliedMemberSpecBytes, err := json.Marshal(r.deploymentState.LastAppliedMemberSpec) + if err != nil { + return xerrors.Errorf("failed to marshal LastAppliedMemberSpec: %w", err) + } + + annotationsToAdd := map[string]string{ + util.ClusterMappingAnnotation: string(clusterMappingBytes), + util.LastAppliedMemberSpecAnnotation: string(lastAppliedMemberSpecBytes), + } + + if err := annotations.SetAnnotations(ctx, r.resource, annotationsToAdd, r.reconciler.client); err != nil { + return err + } + + r.log.Debugf("Successfully wrote ClusterMapping=%v and LastAppliedMemberSpec=%v for ReplicaSet %s/%s", + r.deploymentState.ClusterMapping, r.deploymentState.LastAppliedMemberSpec, r.resource.Namespace, r.resource.Name) + return nil +} + +// writeLastAchievedSpec writes the lastAchievedSpec and vault annotations to the resource. +// This should ONLY be called on successful reconciliation when the deployment reaches Running state. +// To avoid posting twice to the API server, we include the vault annotations here. +func (r *ReplicaSetReconcilerHelper) writeLastAchievedSpec(ctx context.Context, vaultAnnotations map[string]string) error { + // Get lastAchievedSpec annotation + annotationsToAdd, err := getAnnotationsForResource(r.resource) + if err != nil { + return err + } - return &replicaSetDeploymentState{ - LastAchievedSpec: lastAchievedSpec, - LastReconcileMemberCount: lastReconcileMemberCount, - }, nil + // Merge vault annotations + for k, val := range vaultAnnotations { + annotationsToAdd[k] = val + } + + // Write to CR + if err := annotations.SetAnnotations(ctx, r.resource, annotationsToAdd, r.reconciler.client); err != nil { + return err + } + + r.log.Debugf("Successfully wrote lastAchievedSpec and vault annotations for ReplicaSet %s/%s", r.resource.Namespace, r.resource.Name) + return nil } // getVaultAnnotations gets vault secret version annotations to write to the CR. @@ -158,14 +247,123 @@ func (r *ReplicaSetReconcilerHelper) initialize(ctx context.Context) error { return xerrors.Errorf("failed to initialize replica set state: %w", err) } r.deploymentState = state + + // Initialize member clusters for multi-cluster support + if err := r.initializeMemberClusters(r.reconciler.memberClustersMap); err != nil { + return xerrors.Errorf("failed to initialize member clusters: %w", err) + } + return nil } -// updateStatus is a pass-through method that calls the reconciler updateStatus. -// In the future (multi-cluster epic), this will be enhanced to write deployment state to ConfigMap after every status -// update (similar to sharded cluster pattern), but for now it just delegates to maintain the same architecture. +// initializeMemberClusters initializes the MemberClusters field with an ordered list +// of member clusters to iterate over during reconciliation. 
+// +// For single-cluster mode: +// - Creates a single "legacy" member cluster using __default cluster name +// - Uses ClusterMapping[__default] (or Status.Members as fallback) for replica count +// - Sets Legacy=true to use old naming conventions (no cluster index in names) +// +// For multi-cluster mode: +// - Updates ClusterMapping to assign stable indexes for new clusters +// - Creates member clusters from ClusterSpecList using createMemberClusterListFromClusterSpecList +// - Includes removed clusters (in ClusterMapping but not in spec) with replicas > 0 +// - Returns Active=true for current clusters, Active=false for removed clusters +// - Returns Healthy=true for reachable clusters, Healthy=false for unreachable clusters +func (r *ReplicaSetReconcilerHelper) initializeMemberClusters( + globalMemberClustersMap map[string]client.Client, +) error { + rs := r.resource + + if rs.Spec.IsMultiCluster() { + // === Multi-Cluster Mode === + + // Validation + if !multicluster.IsMemberClusterMapInitializedForMultiCluster(globalMemberClustersMap) { + return xerrors.Errorf("member clusters must be initialized for MultiCluster topology") + } + if len(rs.Spec.ClusterSpecList) == 0 { + return xerrors.Errorf("clusterSpecList must be non-empty for MultiCluster topology") + } + + // 1. Update ClusterMapping to assign stable indexes + clusterNames := []string{} + for _, item := range rs.Spec.ClusterSpecList { + clusterNames = append(clusterNames, item.ClusterName) + } + r.deploymentState.ClusterMapping = multicluster.AssignIndexesForMemberClusterNames( + r.deploymentState.ClusterMapping, + clusterNames, + ) + + // 2. Define callback to get last applied member count from LastAppliedMemberSpec + getLastAppliedMemberCountFunc := func(memberClusterName string) int { + if count, ok := r.deploymentState.LastAppliedMemberSpec[memberClusterName]; ok { + return count + } + return 0 + } + + // 3. 
Create member cluster list using existing utility function + // This function handles: + // - Creating MemberCluster objects with proper clients and indexes + // - Including removed clusters (not in spec but in ClusterMapping) if replicas > 0 + // - Marking unhealthy clusters (no client available) as Healthy=false + // - Sorting by index for deterministic ordering + // TODO: all our multi cluster controllers rely on createMemberClusterListFromClusterSpecList, we should unit test it + r.MemberClusters = createMemberClusterListFromClusterSpecList( + rs.Spec.ClusterSpecList, + globalMemberClustersMap, + r.log, + r.deploymentState.ClusterMapping, + getLastAppliedMemberCountFunc, + false, // legacyMemberCluster - use new naming with cluster index + ) + + } else { + // === Single-Cluster Mode === + + // Get last applied member count from LastAppliedMemberSpec with fallback to Status.Members + // This ensures backward compatibility with deployments created before LastAppliedMemberSpec + memberCount, ok := r.deploymentState.LastAppliedMemberSpec[multicluster.LegacyCentralClusterName] + if !ok || memberCount == 0 { + memberCount = rs.Status.Members + } + + // Create a single legacy member cluster that reuses the central cluster clients and the legacy (index-free) naming + r.MemberClusters = []multicluster.MemberCluster{ + multicluster.GetLegacyCentralMemberCluster( + memberCount, + 0, // index always 0 for single cluster + r.reconciler.client, + r.reconciler.SecretClient, + ), + } + } + + r.log.Debugf("Initialized member cluster list: %+v", util.Transform(r.MemberClusters, func(m multicluster.MemberCluster) string { + return fmt.Sprintf("{Name: %s, Index: %d, Replicas: %d, Active: %t, Healthy: %t}", m.Name, m.Index, m.Replicas, m.Active, m.Healthy) + })) + + return nil +} + +// updateStatus updates the status and writes the deployment state annotations (ClusterMapping and LastAppliedMemberSpec) on every reconciliation. +// This state must be written on every status change (Pending, Failed, Running) so that scale operations and multi-cluster coordination always see accurate cluster indexes and member counts. func (r *ReplicaSetReconcilerHelper) updateStatus(ctx context.Context, status workflow.Status, statusOptions ...mdbstatus.Option) (reconcile.Result, error) { - return r.reconciler.updateStatus(ctx, r.resource, status, r.log, statusOptions...) + // First update the status + result, err := r.reconciler.updateStatus(ctx, r.resource, status, r.log, statusOptions...) + if err != nil { + return result, err + } + + // Persist the deployment state annotations after every status update to track the current deployment state + if err := r.writeClusterMapping(ctx); err != nil { + return result, err + } + + return result, nil } // Reconcile performs the full reconciliation logic for a replica set.
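As a side note on the stable index assignment used by initializeMemberClusters above, a minimal sketch of the intended behaviour; the cluster names and the next-free-index rule are illustrative assumptions, not taken from this change:

// Mapping persisted from earlier reconciliations; "cluster-a" has since been removed
// from spec.clusterSpecList and "cluster-c" is newly added.
previous := map[string]int{"cluster-a": 0, "cluster-b": 1}
current := []string{"cluster-b", "cluster-c"}

mapping := multicluster.AssignIndexesForMemberClusterNames(previous, current)
// Expected shape, assuming new clusters receive the next unused index:
//   "cluster-a": 0  // retained, so its StatefulSet name stays stable if the cluster returns
//   "cluster-b": 1  // unchanged
//   "cluster-c": 2  // newly assigned
_ = mapping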
@@ -185,10 +383,13 @@ func (r *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R log.Infow("ReplicaSet.Spec", "spec", rs.Spec, "desiredReplicas", scale.ReplicasThisReconciliation(rs), "isScaling", scale.IsStillScaling(rs)) log.Infow("ReplicaSet.Status", "status", rs.Status) + // TODO: adapt validations to multi cluster if err := rs.ProcessValidationsOnReconcile(nil); err != nil { return r.updateStatus(ctx, workflow.Invalid("%s", err.Error())) } + // TODO: add something similar to blockNonEmptyClusterSpecItemRemoval + projectConfig, credsConfig, err := project.ReadConfigAndCredentials(ctx, reconciler.client, reconciler.SecretClient, rs, log) if err != nil { return r.updateStatus(ctx, workflow.Failed(err)) @@ -250,8 +451,14 @@ func (r *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R return r.updateStatus(ctx, workflow.Failed(xerrors.Errorf("failed to get agent auth mode: %w", err))) } - // Check if we need to prepare for scale-down - if scale.ReplicasThisReconciliation(rs) < r.deploymentState.LastReconcileMemberCount { + // Check if we need to prepare for scale-down by comparing total current vs previous member count + previousTotalMembers := 0 + for _, count := range r.deploymentState.LastAppliedMemberSpec { + previousTotalMembers += count + } + currentTotalMembers := r.calculateTotalMembers() + + if currentTotalMembers < previousTotalMembers { if err := replicaset.PrepareScaleDownFromMongoDB(conn, rs, log); err != nil { return r.updateStatus(ctx, workflow.Failed(xerrors.Errorf("failed to prepare Replica Set for scaling down using Ops Manager: %w", err))) } @@ -274,7 +481,7 @@ func (r *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R // See CLOUDP-189433 and CLOUDP-229222 for more details. if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) { log.Warnf("Triggering Automatic Recovery. 
The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition) - automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, r.deploymentState.LastReconcileMemberCount, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, true).OnErrorPrepend("failed to create/update (Ops Manager reconciliation phase):") + automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, previousTotalMembers, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, true).OnErrorPrepend("failed to create/update (Ops Manager reconciliation phase):") reconcileStatus := r.reconcileMemberResources(ctx, conn, projectConfig, deploymentOpts) if !reconcileStatus.IsOK() { log.Errorf("Recovery failed because of reconcile errors, %v", reconcileStatus) @@ -288,7 +495,7 @@ func (r *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, reconciler.client, *rs, r.deploymentState.LastAchievedSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) status := workflow.RunInGivenOrder(publishAutomationConfigFirst, func() workflow.Status { - return r.updateOmDeploymentRs(ctx, conn, r.deploymentState.LastReconcileMemberCount, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, false).OnErrorPrepend("failed to create/update (Ops Manager reconciliation phase):") + return r.updateOmDeploymentRs(ctx, conn, previousTotalMembers, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, false).OnErrorPrepend("failed to create/update (Ops Manager reconciliation phase):") }, func() workflow.Status { return r.reconcileMemberResources(ctx, conn, projectConfig, deploymentOpts) @@ -299,35 +506,37 @@ func (r *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R } // === 6. Final steps - if scale.IsStillScaling(rs) { - return r.updateStatus(ctx, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), mdbstatus.MembersOption(rs)) - } - - // Get lastspec, vault annotations when needed and write them to the resource. - // These operations should only be performed on successful reconciliations. - // The state of replica sets is currently split between the annotations and the member count in status. 
Both should - // be migrated to config maps - annotationsToAdd, err := getAnnotationsForResource(r.resource) - if err != nil { - return r.updateStatus(ctx, workflow.Failed(xerrors.Errorf("could not get resource annotations: %w", err))) - } - - for k, val := range r.getVaultAnnotations() { - annotationsToAdd[k] = val + // Check if any cluster is still scaling + if r.shouldContinueScaling() { + // Calculate total target members across all clusters + totalTargetMembers := 0 + for _, memberCluster := range r.MemberClusters { + totalTargetMembers += r.GetReplicaSetScaler(memberCluster).TargetReplicas() + } + currentTotalMembers := r.calculateTotalMembers() + return r.updateStatus(ctx, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), totalTargetMembers, currentTotalMembers), mdbstatus.MembersOption(rs)) } - if err := annotations.SetAnnotations(ctx, r.resource, annotationsToAdd, r.reconciler.client); err != nil { - return r.updateStatus(ctx, workflow.Failed(xerrors.Errorf("could not update resource annotations: %w", err))) + // Write lastAchievedSpec and vault annotations ONLY on successful reconciliation when reaching Running state. + // ClusterMapping is already written in updateStatus() for every reconciliation. + if err := r.writeLastAchievedSpec(ctx, r.getVaultAnnotations()); err != nil { + return r.updateStatus(ctx, workflow.Failed(xerrors.Errorf("failed to write lastAchievedSpec and vault annotations: %w", err))) } log.Infof("Finished reconciliation for MongoDbReplicaSet! %s", completionMessage(conn.BaseURL(), conn.GroupID())) return r.updateStatus(ctx, workflow.OK(), mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus()) } -func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { +func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, memberClusterMap map[string]client.Client, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { + // Initialize member cluster map for single-cluster mode (like ShardedCluster does) + // This ensures that even in single-cluster deployments, we have a __default member cluster + // This allows the same reconciliation logic to work for both single and multi-cluster topologies + memberClusterMap = multicluster.InitializeGlobalMemberClusterMapForSingleCluster(memberClusterMap, kubeClient) + return &ReconcileMongoDbReplicaSet{ ReconcileCommonController: NewReconcileCommonController(ctx, kubeClient), omConnectionFactory: omFunc, + memberClustersMap: memberClusterMap, imageUrls: imageUrls, forceEnterprise: forceEnterprise, enableClusterMongoDBRoles: enableClusterMongoDBRoles, @@ -469,6 +678,31 @@ func (r *ReplicaSetReconcilerHelper) reconcileHostnameOverrideConfigMap(ctx cont return nil } +// replicateAgentKeySecret ensures the agent API key secret exists in all healthy member clusters. +// This is required for multi-cluster deployments where agents in member clusters need to authenticate with Ops Manager. 
+func (r *ReplicaSetReconcilerHelper) replicateAgentKeySecret(ctx context.Context, conn om.Connection, log *zap.SugaredLogger) error { + rs := r.resource + reconciler := r.reconciler + + for _, memberCluster := range r.MemberClusters { + // Skip legacy (single-cluster) and unhealthy clusters + if memberCluster.Legacy || !memberCluster.Healthy { + continue + } + + var databaseSecretPath string + if reconciler.VaultClient != nil { + databaseSecretPath = reconciler.VaultClient.DatabaseSecretPath() + } + + if _, err := agents.EnsureAgentKeySecretExists(ctx, memberCluster.SecretClient, conn, rs.Namespace, "", conn.GroupID(), databaseSecretPath, log); err != nil { + return xerrors.Errorf("failed to ensure agent key secret in member cluster %s: %w", memberCluster.Name, err) + } + log.Debugf("Successfully synced agent API key secret to member cluster %s", memberCluster.Name) + } + return nil +} + // reconcileMemberResources handles the synchronization of kubernetes resources, which can be statefulsets, services etc. // All the resources required in the k8s cluster (as opposed to the automation config) for creating the replicaset // should be reconciled in this method. @@ -487,27 +721,67 @@ func (r *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Contex return status } - return r.reconcileStatefulSet(ctx, conn, projectConfig, deploymentOptions) + // Replicate agent API key to all healthy member clusters upfront + if err := r.replicateAgentKeySecret(ctx, conn, log); err != nil { + return workflow.Failed(xerrors.Errorf("failed to replicate agent key secret: %w", err)) + } + + return r.reconcileStatefulSets(ctx, conn, projectConfig, deploymentOptions) +} + +func (r *ReplicaSetReconcilerHelper) reconcileStatefulSets(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status { + for _, memberCluster := range r.MemberClusters { + if status := r.reconcileStatefulSet(ctx, conn, memberCluster.Client, memberCluster.SecretClient, projectConfig, deploymentOptions, memberCluster); !status.IsOK() { + return status + } + + // Update LastAppliedMemberSpec with current replica count for this cluster + // This will be persisted to annotations and used in the next reconciliation + scaler := r.GetReplicaSetScaler(memberCluster) + currentReplicas := scale.ReplicasThisReconciliation(scaler) + if memberCluster.Legacy { + r.deploymentState.LastAppliedMemberSpec[multicluster.LegacyCentralClusterName] = currentReplicas + } else { + r.deploymentState.LastAppliedMemberSpec[memberCluster.Name] = currentReplicas + } + } + return workflow.OK() } -func (r *ReplicaSetReconcilerHelper) reconcileStatefulSet(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status { +func (r *ReplicaSetReconcilerHelper) reconcileStatefulSet(ctx context.Context, conn om.Connection, client kubernetesClient.Client, secretClient secrets.SecretClient, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS, memberCluster multicluster.MemberCluster) workflow.Status { rs := r.resource reconciler := r.reconciler log := r.log - certConfigurator := certs.ReplicaSetX509CertConfigurator{MongoDB: rs, SecretClient: reconciler.SecretClient} + certConfigurator := certs.ReplicaSetX509CertConfigurator{MongoDB: rs, SecretClient: secretClient} status := reconciler.ensureX509SecretAndCheckTLSType(ctx, certConfigurator, deploymentOptions.currentAgentAuthMode, log) if !status.IsOK() { return 
status } - status = certs.EnsureSSLCertsForStatefulSet(ctx, reconciler.SecretClient, reconciler.SecretClient, *rs.Spec.Security, certs.ReplicaSetConfig(*rs), log) + status = certs.EnsureSSLCertsForStatefulSet(ctx, reconciler.SecretClient, secretClient, *rs.Spec.Security, certs.ReplicaSetConfig(*rs), log) if !status.IsOK() { return status } + // Copy CA ConfigMap from central cluster to member cluster if specified + // Only needed in multi-cluster mode; in Legacy mode, member cluster == central cluster + caConfigMapName := rs.Spec.Security.TLSConfig.CA + if caConfigMapName != "" && !memberCluster.Legacy { + cm, err := reconciler.client.GetConfigMap(ctx, kube.ObjectKey(rs.Namespace, caConfigMapName)) + if err != nil { + return workflow.Failed(xerrors.Errorf("expected CA ConfigMap not found on central cluster: %s", caConfigMapName)) + } + memberCm := configmap.Builder().SetName(caConfigMapName).SetNamespace(rs.Namespace).SetData(cm.Data).Build() + err = configmap.CreateOrUpdate(ctx, client, memberCm) + if err != nil && !errors.IsAlreadyExists(err) { + return workflow.Failed(xerrors.Errorf("failed to sync CA ConfigMap in member cluster, err: %w", err)) + } + log.Debugf("Successfully synced CA ConfigMap %s to member cluster", caConfigMapName) + } + // Build the replica set config - rsConfig, err := r.buildStatefulSetOptions(ctx, conn, projectConfig, deploymentOptions) + rsConfig, err := r.buildStatefulSetOptions(ctx, conn, projectConfig, deploymentOptions, memberCluster) if err != nil { return workflow.Failed(xerrors.Errorf("failed to build StatefulSet options: %w", err)) } @@ -515,17 +789,18 @@ func (r *ReplicaSetReconcilerHelper) reconcileStatefulSet(ctx context.Context, c sts := construct.DatabaseStatefulSet(*rs, rsConfig, log) // Handle PVC resize if needed - if workflowStatus := r.handlePVCResize(ctx, &sts); !workflowStatus.IsOK() { + if workflowStatus := r.handlePVCResize(ctx, client, &sts); !workflowStatus.IsOK() { return workflowStatus } // Create or update the StatefulSet in Kubernetes - if err := create.DatabaseInKubernetes(ctx, reconciler.client, *rs, sts, rsConfig, log); err != nil { + if err := create.DatabaseInKubernetes(ctx, client, *rs, sts, rsConfig, log); err != nil { return workflow.Failed(xerrors.Errorf("failed to create/update (Kubernetes reconciliation phase): %w", err)) } // Check StatefulSet status - if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, rs.Name, reconciler.client); !status.IsOK() { + stsName := r.GetReplicaSetStsName(memberCluster) + if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, stsName, client); !status.IsOK() { return status } @@ -533,8 +808,8 @@ func (r *ReplicaSetReconcilerHelper) reconcileStatefulSet(ctx context.Context, c return workflow.OK() } -func (r *ReplicaSetReconcilerHelper) handlePVCResize(ctx context.Context, sts *appsv1.StatefulSet) workflow.Status { - workflowStatus := create.HandlePVCResize(ctx, r.reconciler.client, sts, r.log) +func (r *ReplicaSetReconcilerHelper) handlePVCResize(ctx context.Context, client kubernetesClient.Client, sts *appsv1.StatefulSet) workflow.Status { + workflowStatus := create.HandlePVCResize(ctx, client, sts, r.log) if !workflowStatus.IsOK() { return workflowStatus } @@ -548,7 +823,7 @@ func (r *ReplicaSetReconcilerHelper) handlePVCResize(ctx context.Context, sts *a } // buildStatefulSetOptions creates the options needed for constructing the StatefulSet -func (r *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context, conn om.Connection, projectConfig 
mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, error) { +func (r *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS, memberCluster multicluster.MemberCluster) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, error) { rs := r.resource reconciler := r.reconciler log := r.log @@ -593,16 +868,203 @@ func (r *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context WithDatabaseNonStaticImage(images.ContainerImage(reconciler.imageUrls, util.NonStaticDatabaseEnterpriseImage, reconciler.databaseNonStaticImageVersion)), WithAgentImage(images.ContainerImage(reconciler.imageUrls, architectures.MdbAgentImageRepo, automationAgentVersion)), WithMongodbImage(images.GetOfficialImage(reconciler.imageUrls, rs.Spec.Version, rs.GetAnnotations())), + // Multi-cluster support: cluster-specific naming and replica count + StatefulSetNameOverride(r.GetReplicaSetStsName(memberCluster)), + ServiceName(r.GetReplicaSetServiceName(memberCluster)), + Replicas(scale.ReplicasThisReconciliation(r.GetReplicaSetScaler(memberCluster))), ) return rsConfig, nil } +// getClusterSpecList returns the ClusterSpecList or creates a synthetic one for single-cluster mode. +// This is critical for backward compatibility - without it, the scaler would return TargetReplicas=0 +// for existing single-cluster deployments that don't have ClusterSpecList configured. +func (r *ReplicaSetReconcilerHelper) getClusterSpecList() mdbv1.ClusterSpecList { + rs := r.resource + if rs.Spec.IsMultiCluster() { + return rs.Spec.ClusterSpecList + } + + // Single-cluster mode: Create synthetic ClusterSpecList + // This ensures the scaler returns Spec.Members as the target replica count + return mdbv1.ClusterSpecList{ + { + ClusterName: multicluster.LegacyCentralClusterName, + Members: rs.Spec.Members, + }, + } +} + +// GetReplicaSetStsName returns the StatefulSet name for a member cluster. +// For Legacy mode (single-cluster), it returns the resource name without cluster index. +// For multi-cluster mode, it returns the name with cluster index suffix. +func (r *ReplicaSetReconcilerHelper) GetReplicaSetStsName(memberCluster multicluster.MemberCluster) string { + if memberCluster.Legacy { + return r.resource.Name + } + return dns.GetMultiStatefulSetName(r.resource.Name, memberCluster.Index) +} + +// GetReplicaSetServiceName returns the service name for a member cluster. +// For Legacy mode (single-cluster), it uses the existing ServiceName() method. +// For multi-cluster mode, it returns the name with cluster index suffix. +func (r *ReplicaSetReconcilerHelper) GetReplicaSetServiceName(memberCluster multicluster.MemberCluster) string { + if memberCluster.Legacy { + return r.resource.ServiceName() + } + return dns.GetMultiHeadlessServiceName(r.resource.Name, memberCluster.Index) +} + +// GetReplicaSetScaler returns a scaler for calculating replicas in a member cluster. +// Uses synthetic ClusterSpecList for single-cluster mode to ensure backward compatibility. +func (r *ReplicaSetReconcilerHelper) GetReplicaSetScaler(memberCluster multicluster.MemberCluster) interfaces.MultiClusterReplicaSetScaler { + return scalers.NewMultiClusterReplicaSetScaler( + "replicaset", + r.getClusterSpecList(), + memberCluster.Name, + memberCluster.Index, + r.MemberClusters) +} + +// calculateTotalMembers returns the total member count across all clusters. 
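+// For example (illustrative values): three member clusters reconciling at 1, 2 and 2 replicas
+// give a total of 5; during scaling this reflects the per-reconciliation counts, not the
+// final target member count.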
+func (r *ReplicaSetReconcilerHelper) calculateTotalMembers() int { + total := 0 + for _, memberCluster := range r.MemberClusters { + scaler := r.GetReplicaSetScaler(memberCluster) + total += scale.ReplicasThisReconciliation(scaler) + } + return total +} + +// shouldContinueScaling checks if any cluster is still in the process of scaling. +func (r *ReplicaSetReconcilerHelper) shouldContinueScaling() bool { + for _, memberCluster := range r.MemberClusters { + scaler := r.GetReplicaSetScaler(memberCluster) + if scale.ReplicasThisReconciliation(scaler) != scaler.TargetReplicas() { + return true + } + } + return false +} + +// ============================================================================ +// Multi-Cluster OM Registration Helpers +// ============================================================================ + +// buildReachableHostnames is used for agent registration and goal state checking +func (r *ReplicaSetReconcilerHelper) buildReachableHostnames() []string { + reachable := []string{} + for _, mc := range multicluster.GetHealthyMemberClusters(r.MemberClusters) { + memberCount := r.GetReplicaSetScaler(mc).DesiredReplicas() + if memberCount == 0 { + r.log.Debugf("Skipping cluster %s (0 members)", mc.Name) + continue + } + + hostnames := dns.GetMultiClusterProcessHostnames( + r.resource.Name, + r.resource.Namespace, + mc.Index, + memberCount, + r.resource.Spec.GetClusterDomain(), + r.resource.Spec.GetExternalDomainForMemberCluster(mc.Name), + ) + reachable = append(reachable, hostnames...) + } + return reachable +} + +// filterReachableProcessNames returns only process names from healthy clusters. Used when waiting for OM goal state +func (r *ReplicaSetReconcilerHelper) filterReachableProcessNames(allProcesses []om.Process) []string { + healthyProcessNames := make(map[string]bool) + for _, mc := range multicluster.GetHealthyMemberClusters(r.MemberClusters) { + memberCount := r.GetReplicaSetScaler(mc).DesiredReplicas() + for podNum := 0; podNum < memberCount; podNum++ { + processName := fmt.Sprintf("%s-%d-%d", r.resource.Name, mc.Index, podNum) + healthyProcessNames[processName] = true + } + } + + // Filter allProcesses to only include healthy ones + reachable := []string{} + for _, proc := range allProcesses { + if healthyProcessNames[proc.Name()] { + reachable = append(reachable, proc.Name()) + } + } + return reachable +} + +// buildMultiClusterProcesses creates OM processes for multi-cluster deployment. Returns processes with multi-cluster +// hostnames (e.g., my-rs-0-0, my-rs-1-0). 
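+// Illustrative example (member counts are hypothetical): a replica set "my-rs" spread over two
+// clusters with 2 and 1 desired members produces processes my-rs-0-0, my-rs-0-1 and my-rs-1-0,
+// each bound to the hostname returned by dns.GetMultiClusterProcessHostnames for its cluster.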
+func (r *ReplicaSetReconcilerHelper) buildMultiClusterProcesses( + mongoDBImage string, + tlsCertPath string, +) ([]om.Process, error) { + processes := []om.Process{} + + for _, mc := range r.MemberClusters { + memberCount := r.GetReplicaSetScaler(mc).DesiredReplicas() + if memberCount == 0 { + r.log.Debugf("Skipping process creation for cluster %s (0 members)", mc.Name) + continue + } + + // Get hostnames for this cluster + hostnames := dns.GetMultiClusterProcessHostnames( + r.resource.Name, + r.resource.Namespace, + mc.Index, + memberCount, + r.resource.Spec.GetClusterDomain(), + r.resource.Spec.GetExternalDomainForMemberCluster(mc.Name), + ) + + // Create process for each hostname + // Process names follow pattern: -- + for podNum, hostname := range hostnames { + processName := fmt.Sprintf("%s-%d-%d", r.resource.Name, mc.Index, podNum) + proc, err := r.createProcessFromHostname(processName, hostname, mongoDBImage, tlsCertPath) + if err != nil { + return nil, xerrors.Errorf("failed to create process %s for hostname %s: %w", processName, hostname, err) + } + processes = append(processes, proc) + } + } + + return processes, nil +} + +// createProcessFromHostname creates a single OM process with correct configuration. +func (r *ReplicaSetReconcilerHelper) createProcessFromHostname( + name string, + hostname string, + mongoDBImage string, + tlsCertPath string, +) (om.Process, error) { + rs := r.resource + + proc := om.NewMongodProcess( + name, // process name (e.g., "my-rs-0-0") + hostname, // hostname for the process + mongoDBImage, + r.reconciler.forceEnterprise, + rs.Spec.GetAdditionalMongodConfig(), + &rs.Spec, + tlsCertPath, + rs.Annotations, + rs.CalculateFeatureCompatibilityVersion(), + ) + + return proc, nil +} + // AddReplicaSetController creates a new MongoDbReplicaset Controller and adds it to the Manager. The Manager will set fields on the Controller // and Start it when the Manager is Started. 
-func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool) error { +func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, memberClustersMap map[string]cluster.Cluster) error { // Create a new controller - reconciler := newReplicaSetReconciler(ctx, mgr.GetClient(), imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, forceEnterprise, enableClusterMongoDBRoles, om.NewOpsManagerConnection) + reconciler := newReplicaSetReconciler(ctx, mgr.GetClient(), imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, forceEnterprise, enableClusterMongoDBRoles, multicluster.ClustersMapToClientMap(memberClustersMap), om.NewOpsManagerConnection) c, err := controller.New(util.MongoDbReplicaSetController, mgr, controller.Options{Reconciler: reconciler, MaxConcurrentReconciles: env.ReadIntOrDefault(util.MaxConcurrentReconcilesEnv, 1)}) // nolint:forbidigo if err != nil { return err @@ -672,11 +1134,49 @@ func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls return err } + if err := reconciler.configureMultiCluster(ctx, mgr, c, memberClustersMap); err != nil { + return xerrors.Errorf("failed to configure replica set controller in multi cluster mode: %w", err) + } + zap.S().Infof("Registered controller %s", util.MongoDbReplicaSetController) return nil } +func (r *ReconcileMongoDbReplicaSet) configureMultiCluster(ctx context.Context, mgr manager.Manager, c controller.Controller, memberClustersMap map[string]cluster.Cluster) error { + // TODO: Add cross-cluster StatefulSet watches for drift detection (like MongoDBMultiReplicaSet) + // This will enable automatic reconciliation when users manually modify StatefulSets in member clusters, based on the MongoDBMultiResourceAnnotation annotation + // for k, v := range memberClustersMap { + // err := c.Watch(source.Kind[client.Object](v.GetCache(), &appsv1.StatefulSet{}, &khandler.EnqueueRequestForOwnerMultiCluster{}, watch.PredicatesForMultiStatefulSet())) + // if err != nil { + // return xerrors.Errorf("failed to set Watch on member cluster: %s, err: %w", k, err) + // } + // } + + // TODO: Add member cluster health monitoring for automatic failover (like MongoDBMultiReplicaSet) + // Need to: + // - Start WatchMemberClusterHealth goroutine + // - Watch event channel for health changes + // - Modify memberwatch.WatchMemberClusterHealth to handle MongoDBReplicaSet (currently only handles MongoDBMultiCluster) + // + // eventChannel := make(chan event.GenericEvent) + // memberClusterHealthChecker := memberwatch.MemberClusterHealthChecker{Cache: make(map[string]*memberwatch.MemberHeathCheck)} + // go memberClusterHealthChecker.WatchMemberClusterHealth(ctx, zap.S(), eventChannel, r.client, memberClustersMap) + // err := c.Watch(source.Channel[client.Object](eventChannel, &handler.EnqueueRequestForObject{})) + + // TODO: Add ConfigMap watch for dynamic member list changes (like MongoDBMultiReplicaSet) + // This enables runtime updates to which clusters are part of the deployment + // err := c.Watch(source.Kind[client.Object](mgr.GetCache(), &corev1.ConfigMap{}, + // watch.ConfigMapEventHandler{ + // ConfigMapName: util.MemberListConfigMapName, + // ConfigMapNamespace: 
env.ReadOrPanic(util.CurrentNamespace), + // }, + // predicate.ResourceVersionChangedPredicate{}, + // )) + + return nil +} + // updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated // to automation agents in containers func (r *ReplicaSetReconcilerHelper) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, tlsCertPath, internalClusterCertPath string, deploymentOptions deploymentOptionsRS, shouldMirrorKeyfileForMongot bool, isRecovering bool) workflow.Status { @@ -684,19 +1184,60 @@ func (r *ReplicaSetReconcilerHelper) updateOmDeploymentRs(ctx context.Context, c log := r.log reconciler := r.reconciler log.Debug("Entering UpdateOMDeployments") - // Only "concrete" RS members should be observed - // - if scaling down, let's observe only members that will remain after scale-down operation - // - if scaling up, observe only current members, because new ones might not exist yet - replicasTarget := scale.ReplicasThisReconciliation(rs) - err := agents.WaitForRsAgentsToRegisterByResource(rs, util_int.Min(membersNumberBefore, replicasTarget), conn, log) - if err != nil && !isRecovering { - return workflow.Failed(err) - } - caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath) - replicaSet := replicaset.BuildFromMongoDBWithReplicas(reconciler.imageUrls[mcoConstruct.MongodbImageEnv], reconciler.forceEnterprise, rs, replicasTarget, rs.CalculateFeatureCompatibilityVersion(), tlsCertPath) - processNames := replicaSet.GetProcessNames() + var replicaSet om.ReplicaSetWithProcesses + var processNames []string + + if rs.Spec.IsMultiCluster() { + reachableHostnames := r.buildReachableHostnames() + + err := agents.WaitForRsAgentsToRegisterSpecifiedHostnames(conn, reachableHostnames, log) + if err != nil && !isRecovering { + return workflow.Failed(err) + } + + existingDeployment, err := conn.ReadDeployment() + if err != nil && !isRecovering { + return workflow.Failed(err) + } + + // We get the IDs from the deployment for stability + processIds := getReplicaSetProcessIdsFromDeployment(rs.Name, existingDeployment) + log.Debugf("Existing process IDs: %+v", processIds) + + processes, err := r.buildMultiClusterProcesses( + reconciler.imageUrls[mcoConstruct.MongodbImageEnv], + tlsCertPath, + ) + if err != nil && !isRecovering { + return workflow.Failed(xerrors.Errorf("failed to build multi-cluster processes: %w", err)) + } + + replicaSet = om.NewMultiClusterReplicaSetWithProcesses( + om.NewReplicaSet(rs.Name, rs.Spec.Version), + processes, + rs.Spec.GetMemberOptions(), + processIds, + rs.Spec.Connectivity, + ) + processNames = replicaSet.GetProcessNames() + + } else { + // Single cluster path + + // Only "concrete" RS members should be observed + // - if scaling down, let's observe only members that will remain after scale-down operation + // - if scaling up, observe only current members, because new ones might not exist yet + replicasTarget := r.calculateTotalMembers() + err := agents.WaitForRsAgentsToRegisterByResource(rs, util_int.Min(membersNumberBefore, replicasTarget), conn, log) + if err != nil && !isRecovering { + return workflow.Failed(err) + } + + replicaSet = replicaset.BuildFromMongoDBWithReplicas(reconciler.imageUrls[mcoConstruct.MongodbImageEnv], reconciler.forceEnterprise, rs, replicasTarget, rs.CalculateFeatureCompatibilityVersion(), tlsCertPath) + processNames = replicaSet.GetProcessNames() + } status, additionalReconciliationRequired := reconciler.updateOmAuthentication(ctx, conn, processNames, 
rs, deploymentOptions.agentCertPath, caFilePath, internalClusterCertPath, isRecovering, log) if !status.IsOK() && !isRecovering { @@ -732,7 +1273,14 @@ func (r *ReplicaSetReconcilerHelper) updateOmDeploymentRs(ctx context.Context, c return workflow.Failed(err) } - if err := om.WaitForReadyState(conn, processNames, isRecovering, log); err != nil { + // For multi-cluster, filter to only reachable processes (skip failed clusters) + processNamesToWaitFor := processNames + if rs.Spec.IsMultiCluster() { + processNamesToWaitFor = r.filterReachableProcessNames(replicaSet.Processes) + log.Debugf("Waiting for reachable processes: %+v", processNamesToWaitFor) + } + + if err := om.WaitForReadyState(conn, processNamesToWaitFor, isRecovering, log); err != nil { return workflow.Failed(err) } @@ -752,6 +1300,7 @@ func (r *ReplicaSetReconcilerHelper) updateOmDeploymentRs(ctx context.Context, c return workflow.Failed(err) } + // TODO: check if updateStatus usage is correct here if status := reconciler.ensureBackupConfigurationAndUpdateStatus(ctx, conn, rs, reconciler.SecretClient, log); !status.IsOK() && !isRecovering { return status } diff --git a/controllers/operator/mongodbreplicaset_controller_multi_test.go b/controllers/operator/mongodbreplicaset_controller_multi_test.go new file mode 100644 index 000000000..f97b6a861 --- /dev/null +++ b/controllers/operator/mongodbreplicaset_controller_multi_test.go @@ -0,0 +1,633 @@ +package operator + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb" + "github.com/mongodb/mongodb-kubernetes/api/v1/status" + "github.com/mongodb/mongodb-kubernetes/controllers/om" + "github.com/mongodb/mongodb-kubernetes/controllers/om/host" + "github.com/mongodb/mongodb-kubernetes/controllers/operator/mock" + kubernetesClient "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/client" + "github.com/mongodb/mongodb-kubernetes/pkg/kube" + "github.com/mongodb/mongodb-kubernetes/pkg/multicluster" + "github.com/mongodb/mongodb-kubernetes/pkg/util" +) + +func init() { + logger, _ := zap.NewDevelopment() + zap.ReplaceGlobals(logger) +} + +// multiClusters defines the cluster names used in multi-cluster tests +var multiClusters = []string{"cluster-0", "cluster-1", "cluster-2"} + +func TestCreateMultiClusterReplicaSet(t *testing.T) { + ctx := context.Background() + + clusterSpecList := mdbv1.ClusterSpecList{ + {ClusterName: "cluster-0", Members: 1}, + {ClusterName: "cluster-1", Members: 1}, + {ClusterName: "cluster-2", Members: 1}, + } + + rs := mdbv1.NewDefaultMultiReplicaSetBuilder(). + SetClusterSpecList(clusterSpecList). 
+	Build()
+
+	reconciler, kubeClient, memberClients, omConnectionFactory := defaultReplicaSetMultiClusterReconciler(ctx, rs)
+	checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, kubeClient, false)
+
+	// Verify StatefulSets exist in each member cluster
+	for i, clusterName := range multiClusters {
+		memberClient := memberClients[clusterName]
+		sts := appsv1.StatefulSet{}
+		stsName := fmt.Sprintf("%s-%d", rs.Name, i)
+		err := memberClient.Get(ctx, kube.ObjectKey(rs.Namespace, stsName), &sts)
+		require.NoError(t, err, "StatefulSet should exist in cluster %s", clusterName)
+		assert.Equal(t, int32(1), *sts.Spec.Replicas, "Replicas in %s", clusterName)
+	}
+
+	// Verify OM automation config has all processes
+	processes := omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses()
+	assert.Len(t, processes, 3)
+}
+
+// Helper functions below
+
+// checkReplicaSetReconcileSuccessful reconciles a ReplicaSet and verifies it completes without error.
+// Use shouldRequeue=true when expecting the reconciler to requeue (e.g., during scaling operations).
+// Use shouldRequeue=false when expecting successful completion with the 24-hour requeue.
+func checkReplicaSetReconcileSuccessful(
+	ctx context.Context,
+	t *testing.T,
+	reconciler reconcile.Reconciler,
+	rs *mdbv1.MongoDB,
+	client kubernetesClient.Client,
+	shouldRequeue bool,
+) {
+	err := client.Update(ctx, rs)
+	assert.NoError(t, err)
+
+	result, e := reconciler.Reconcile(ctx, requestFromObject(rs))
+	assert.NoError(t, e)
+
+	if shouldRequeue {
+		assert.True(t, result.Requeue || result.RequeueAfter > 0)
+	} else {
+		assert.Equal(t, reconcile.Result{RequeueAfter: util.TWENTY_FOUR_HOURS}, result)
+	}
+
+	// Fetch the latest updates as the reconciliation loop can update the resource
+	err = client.Get(ctx, rs.ObjectKey(), rs)
+	assert.NoError(t, err)
+}
+
+// getReplicaSetMultiClusterMap simulates multiple K8s clusters using fake clients
+func getReplicaSetMultiClusterMap(omConnectionFactory *om.CachedOMConnectionFactory) map[string]client.Client {
+	clientMap := make(map[string]client.Client)
+
+	for _, clusterName := range multiClusters {
+		fakeClientBuilder := mock.NewEmptyFakeClientBuilder()
+		fakeClientBuilder.WithInterceptorFuncs(interceptor.Funcs{
+			Get: mock.GetFakeClientInterceptorGetFunc(omConnectionFactory, true, true),
+		})
+
+		clientMap[clusterName] = kubernetesClient.NewClient(fakeClientBuilder.Build())
+	}
+
+	return clientMap
+}
+
+// TestReplicaSetMultiClusterScaling tests that multi-cluster ReplicaSets scale one member at a time
+// across all clusters, similar to single-cluster behavior.
+//
+// This test verifies:
+// 1. StatefulSets are created correctly in each member cluster with proper naming (<name>-<cluster-index>)
+// 2. Scaling happens one member at a time across all clusters
+// 3. State management (ClusterMapping, LastAppliedMemberSpec) is tracked correctly in annotations
+// 4. Clusters scale independently based on their ClusterSpecList configuration
+//
+// Note: OM agent registration is simulated by the hostname pre-registration in
+// defaultReplicaSetMultiClusterReconciler, so the OM process count assertions below validate the
+// automation config produced by the reconciler rather than real multi-cluster agent registration.
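As a quick orientation for the test that follows, a sketch of the per-cluster StatefulSet layout it expects for the [3, 1, 2] spec. exampleExpectedLayout is a hypothetical helper; GetMultiStatefulSetName is the naming helper used by the controller changes earlier in this diff.

// Hypothetical illustration, not part of the patch.
func exampleExpectedLayout() map[string]int32 {
	members := []int32{3, 1, 2} // mirrors the ClusterSpecList used in the test below
	layout := map[string]int32{}
	for clusterIdx, m := range members {
		// "multi-rs-0" in cluster-0, "multi-rs-1" in cluster-1, "multi-rs-2" in cluster-2
		layout[dns.GetMultiStatefulSetName("multi-rs", clusterIdx)] = m
	}
	return layout // {"multi-rs-0": 3, "multi-rs-1": 1, "multi-rs-2": 2}
}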
+func TestReplicaSetMultiClusterScaling(t *testing.T) { + ctx := context.Background() + + t.Run("Create multi-cluster deployment", func(t *testing.T) { + // Setup: Create ReplicaSet with 3 clusters with different member counts + clusterSpecList := mdbv1.ClusterSpecList{ + {ClusterName: "cluster-0", Members: 3}, + {ClusterName: "cluster-1", Members: 1}, + {ClusterName: "cluster-2", Members: 2}, + } + + rs := mdbv1.NewDefaultMultiReplicaSetBuilder(). + SetName("multi-rs"). + SetClusterSpecList(clusterSpecList). + Build() + + reconciler, client, memberClusters, omConnectionFactory := defaultReplicaSetMultiClusterReconciler(ctx, rs) + + // Initial reconciliation - should create StatefulSets in all 3 clusters + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + + // Verify status is Running after successful reconciliation + assert.Equal(t, status.PhaseRunning, rs.Status.Phase, + "Expected Running phase after successful reconciliation") + + // Verify StatefulSets created with correct replica counts in each cluster + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 3, 1, 2) + + // Verify state annotations track cluster assignments and member counts + assertReplicaSetStateAnnotations(ctx, t, rs, client, + map[string]int{"cluster-0": 0, "cluster-1": 1, "cluster-2": 2}, // ClusterMapping (stable indexes) + map[string]int{"cluster-0": 3, "cluster-1": 1, "cluster-2": 2}) // LastAppliedMemberSpec (member counts) + + // Verify OM has correct number of processes (3 + 1 + 2 = 6) + processes := omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses() + assert.Len(t, processes, 6, "OM should have 6 processes across all clusters") + }) +} + +// TestReplicaSetMultiClusterOneByOneScaling verifies that multi-cluster scaling happens one member at a time across +// clusters, preventing simultaneous scaling in multiple clusters. +func TestReplicaSetMultiClusterOneByOneScaling(t *testing.T) { + ctx := context.Background() + + // Setup: Create ReplicaSet with 3 clusters, each with 1 member + clusterSpecList := mdbv1.ClusterSpecList{ + {ClusterName: "cluster-0", Members: 1}, + {ClusterName: "cluster-1", Members: 1}, + {ClusterName: "cluster-2", Members: 1}, + } + + rs := mdbv1.NewDefaultMultiReplicaSetBuilder(). + SetName("scaling-rs"). + SetClusterSpecList(clusterSpecList). 
+ Build() + + reconciler, client, memberClusters, omConnectionFactory := defaultReplicaSetMultiClusterReconciler(ctx, rs) + + // Initial reconciliation - create with [1,1,1] + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 1, 1, 1) + + processes := omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses() + assert.Len(t, processes, 3, "Should have 3 processes initially") + + t.Log("=== Scaling from [1,1,1] to [2,1,2] ===") + + // Change spec to [2,1,2] + rs.Spec.ClusterSpecList[0].Members = 2 + rs.Spec.ClusterSpecList[2].Members = 2 + + // First reconciliation: Only cluster-0 should scale (first cluster needing change) + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, true) + + // Verify intermediate state: cluster-0 scaled to 2, others still at 1 + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 2, 1, 1) + + // Verify state tracking updated for cluster-0 + assertReplicaSetStateAnnotations(ctx, t, rs, client, + map[string]int{"cluster-0": 0, "cluster-1": 1, "cluster-2": 2}, // ClusterMapping unchanged + map[string]int{"cluster-0": 2, "cluster-1": 1, "cluster-2": 1}) // LastAppliedMemberSpec: cluster-0 updated + + // Verify OM processes updated (4 total now) + processes = omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses() + assert.Len(t, processes, 4, "Should have 4 processes after cluster-0 scales") + + // Second reconciliation: Now cluster-2 should scale + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, true) + + // Verify final state: all clusters at target + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 2, 1, 2) + t.Log("✓ After reconcile 2: [2,1,2] - cluster-2 scaled") + + // Verify state tracking updated for cluster-2 + assertReplicaSetStateAnnotations(ctx, t, rs, client, + map[string]int{"cluster-0": 0, "cluster-1": 1, "cluster-2": 2}, // ClusterMapping unchanged + map[string]int{"cluster-0": 2, "cluster-1": 1, "cluster-2": 2}) // LastAppliedMemberSpec: all at target + + processes = omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses() + assert.Len(t, processes, 5, "Should have 5 processes after all scaling complete") + + // Third reconciliation: All done, should return OK with 24h requeue + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + + // Verify state unchanged + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 2, 1, 2) + t.Log("✓ After reconcile 3: [2,1,2] - scaling complete, stable state") +} + +// assertReplicaSetStateAnnotations verifies that ClusterMapping and LastAppliedMemberSpec annotations +// are set correctly. This validates the state management implementation. 
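For reference before that helper, a sketch of the serialized shape the two state annotations take once the scale-up above settles at [2, 1, 2]. exampleStateAnnotations is a hypothetical helper; the annotation keys are the constants added to pkg/util/constants.go in this diff, and the JSON encoding matches how the tests unmarshal them.

// Hypothetical sketch, not part of the patch.
func exampleStateAnnotations() map[string]string {
	clusterMapping, _ := json.Marshal(map[string]int{"cluster-0": 0, "cluster-1": 1, "cluster-2": 2})
	lastApplied, _ := json.Marshal(map[string]int{"cluster-0": 2, "cluster-1": 1, "cluster-2": 2})
	return map[string]string{
		util.ClusterMappingAnnotation:        string(clusterMapping), // "mongodb.com/v1.clusterMapping"
		util.LastAppliedMemberSpecAnnotation: string(lastApplied),    // "mongodb.com/v1.lastAppliedMemberSpec"
	}
}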
+func assertReplicaSetStateAnnotations(ctx context.Context, t *testing.T, rs *mdbv1.MongoDB, client kubernetesClient.Client, + expectedClusterMapping map[string]int, expectedLastAppliedMemberSpec map[string]int, +) { + // Fetch latest resource to get annotations + err := client.Get(ctx, rs.ObjectKey(), rs) + require.NoError(t, err) + + // Verify ClusterMapping annotation + clusterMappingStr := rs.Annotations[util.ClusterMappingAnnotation] + require.NotEmpty(t, clusterMappingStr, "ClusterMapping annotation should be present") + + var clusterMapping map[string]int + err = json.Unmarshal([]byte(clusterMappingStr), &clusterMapping) + require.NoError(t, err) + assert.Equal(t, expectedClusterMapping, clusterMapping, + "ClusterMapping should track stable cluster indexes") + + // Verify LastAppliedMemberSpec annotation + lastAppliedMemberSpecStr := rs.Annotations[util.LastAppliedMemberSpecAnnotation] + require.NotEmpty(t, lastAppliedMemberSpecStr, "LastAppliedMemberSpec annotation should be present") + + var lastAppliedMemberSpec map[string]int + err = json.Unmarshal([]byte(lastAppliedMemberSpecStr), &lastAppliedMemberSpec) + require.NoError(t, err) + assert.Equal(t, expectedLastAppliedMemberSpec, lastAppliedMemberSpec, + "LastAppliedMemberSpec should track current member counts for scale detection") +} + +// readReplicaSetStatefulSets fetches all StatefulSets from member clusters for a multi-cluster ReplicaSet. +// Returns a map of cluster name to StatefulSet. +func readReplicaSetStatefulSets(ctx context.Context, rs *mdbv1.MongoDB, memberClusters map[string]kubernetesClient.Client) map[string]appsv1.StatefulSet { + allStatefulSets := map[string]appsv1.StatefulSet{} + + for i, clusterSpec := range rs.Spec.ClusterSpecList { + memberClient := memberClusters[clusterSpec.ClusterName] + if memberClient == nil { + continue + } + + // StatefulSet name pattern for multi-cluster: - + stsName := fmt.Sprintf("%s-%d", rs.Name, i) + sts := appsv1.StatefulSet{} + err := memberClient.Get(ctx, types.NamespacedName{Name: stsName, Namespace: rs.Namespace}, &sts) + if err == nil { + allStatefulSets[clusterSpec.ClusterName] = sts + } + } + + return allStatefulSets +} + +// assertReplicaSetStatefulSetReplicas verifies the replica count for each cluster's StatefulSet. +// Takes variadic expectedReplicas matching the order of rs.Spec.ClusterSpecList. +func assertReplicaSetStatefulSetReplicas(ctx context.Context, t *testing.T, rs *mdbv1.MongoDB, memberClusters map[string]kubernetesClient.Client, expectedReplicas ...int) { + statefulSets := readReplicaSetStatefulSets(ctx, rs, memberClusters) + + for i, clusterSpec := range rs.Spec.ClusterSpecList { + if i >= len(expectedReplicas) { + break + } + + sts, ok := statefulSets[clusterSpec.ClusterName] + if ok { + require.Equal(t, int32(expectedReplicas[i]), *sts.Spec.Replicas, + "StatefulSet for cluster %s should have %d replicas", clusterSpec.ClusterName, expectedReplicas[i]) + } + } +} + +// replicaSetMultiClusterReconciler creates a ReplicaSet reconciler configured for multi-cluster mode. +// This is the base setup without OM mocking - use defaultReplicaSetMultiClusterReconciler for standard tests. 
+func replicaSetMultiClusterReconciler(ctx context.Context, rs *mdbv1.MongoDB) (*ReconcileMongoDbReplicaSet, kubernetesClient.Client, map[string]kubernetesClient.Client, *om.CachedOMConnectionFactory) { + // Create central cluster client and OM connection factory + centralClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) + + // Create RAW member cluster clients with interceptors + // These are controller-runtime clients, not wrapped in kubernetesClient.Client yet + // The reconciler's createMemberClusterListFromClusterSpecList will wrap them + memberClusterMapRaw := map[string]client.Client{} + memberClusterMapWrapped := map[string]kubernetesClient.Client{} + + for _, clusterName := range multiClusters { + fakeClientBuilder := mock.NewEmptyFakeClientBuilder() + fakeClientBuilder.WithObjects(mock.GetDefaultResources()...) + fakeClientBuilder.WithInterceptorFuncs(interceptor.Funcs{ + Get: mock.GetFakeClientInterceptorGetFunc(omConnectionFactory, true, true), + }) + + rawClient := fakeClientBuilder.Build() + memberClusterMapRaw[clusterName] = rawClient + memberClusterMapWrapped[clusterName] = kubernetesClient.NewClient(rawClient) + } + + // Create reconciler with multi-cluster support + reconciler := newReplicaSetReconciler(ctx, centralClient, nil, "", "", false, false, memberClusterMapRaw, omConnectionFactory.GetConnectionFunc) + + return reconciler, centralClient, memberClusterMapWrapped, omConnectionFactory +} + +// defaultReplicaSetMultiClusterReconciler creates a ReplicaSet reconciler with standard OM hostname mocking. +// Most tests should use this function. +func defaultReplicaSetMultiClusterReconciler(ctx context.Context, rs *mdbv1.MongoDB) (*ReconcileMongoDbReplicaSet, kubernetesClient.Client, map[string]kubernetesClient.Client, *om.CachedOMConnectionFactory) { + reconciler, client, clusterMap, omConnectionFactory := replicaSetMultiClusterReconciler(ctx, rs) + + omConnectionFactory.SetPostCreateHook(func(connection om.Connection) { + mockedConn := connection.(*om.MockedOmConnection) + mockedConn.Hostnames = nil + + // Pre-register agents for multi-cluster tests + // This simulates agents being already registered with OM + // Register enough agents to handle scaling operations (up to 10 members per cluster) + if rs.Spec.IsMultiCluster() { + hostResult, _ := mockedConn.GetHosts() + // Register agents for up to 10 clusters with up to 10 members each + // This handles scaling and cluster addition scenarios in tests + for clusterIdx := 0; clusterIdx < 10; clusterIdx++ { + for podNum := 0; podNum < 10; podNum++ { + hostname := fmt.Sprintf("%s-%d-%d-svc.%s.svc.cluster.local", rs.Name, clusterIdx, podNum, rs.Namespace) + // Register as a host in OM to simulate agent registration + hostResult.Results = append(hostResult.Results, host.Host{ + Id: fmt.Sprintf("%d", len(hostResult.Results)), + Hostname: hostname, + }) + } + } + } + }) + + return reconciler, client, clusterMap, omConnectionFactory +} + +// ============================================================================ +// State Management Tests +// ============================================================================ + +func TestReplicaSetDeploymentState_Serialization(t *testing.T) { + state := &ReplicaSetDeploymentState{ + LastAchievedSpec: &mdbv1.MongoDbSpec{}, + ClusterMapping: map[string]int{ + multicluster.LegacyCentralClusterName: 0, + "cluster-1": 1, + "cluster-2": 2, + }, + LastAppliedMemberSpec: map[string]int{ + multicluster.LegacyCentralClusterName: 3, + "cluster-1": 5, + "cluster-2": 7, + }, + } + + // 
Marshal to JSON + bytes, err := json.Marshal(state) + assert.NoError(t, err) + + // Unmarshal back + var decoded ReplicaSetDeploymentState + err = json.Unmarshal(bytes, &decoded) + assert.NoError(t, err) + + // Verify ClusterMapping + assert.Equal(t, 0, decoded.ClusterMapping[multicluster.LegacyCentralClusterName]) + assert.Equal(t, 1, decoded.ClusterMapping["cluster-1"]) + assert.Equal(t, 2, decoded.ClusterMapping["cluster-2"]) + + // Verify LastAppliedMemberSpec + assert.Equal(t, 3, decoded.LastAppliedMemberSpec[multicluster.LegacyCentralClusterName]) + assert.Equal(t, 5, decoded.LastAppliedMemberSpec["cluster-1"]) + assert.Equal(t, 7, decoded.LastAppliedMemberSpec["cluster-2"]) +} + +func TestReadState_ClusterMapping_ReadsFromAnnotation(t *testing.T) { + clusterMapping := map[string]int{multicluster.LegacyCentralClusterName: 7} + clusterMappingJSON, _ := json.Marshal(clusterMapping) + + rs := &mdbv1.MongoDB{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rs", + Namespace: "default", + Annotations: map[string]string{ + util.ClusterMappingAnnotation: string(clusterMappingJSON), + }, + }, + Status: mdbv1.MongoDbStatus{ + Members: 3, // Different from annotation (annotations should be used) + }, + } + + helper := &ReplicaSetReconcilerHelper{ + resource: rs, + log: zap.S(), + } + + state, err := helper.readState() + + assert.NoError(t, err) + assert.NotNil(t, state) + assert.Equal(t, 7, state.ClusterMapping[multicluster.LegacyCentralClusterName], + "Should read from ClusterMapping annotation, not Status.Members") +} + +func TestReadState_ClusterMapping_FallbackToStatusMembers(t *testing.T) { + rs := &mdbv1.MongoDB{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rs", + Namespace: "default", + Annotations: map[string]string{ + // No ClusterMapping annotation + }, + }, + Status: mdbv1.MongoDbStatus{ + Members: 5, // Existing deployment has 5 members + }, + } + + helper := &ReplicaSetReconcilerHelper{ + resource: rs, + log: zap.S(), + } + + state, err := helper.readState() + + assert.NoError(t, err) + assert.NotNil(t, state) + // Migration logic initializes LastAppliedMemberSpec, not ClusterMapping + assert.Equal(t, 5, state.LastAppliedMemberSpec[multicluster.LegacyCentralClusterName], + "Should fallback to Status.Members when annotation missing") +} + +func TestReadState_ClusterMapping_SkipsMigrationForMultiCluster(t *testing.T) { + rs := &mdbv1.MongoDB{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rs", + Namespace: "default", + Annotations: map[string]string{ + // No state annotations + }, + }, + Spec: mdbv1.MongoDbSpec{ + DbCommonSpec: mdbv1.DbCommonSpec{ + Topology: mdbv1.ClusterTopologyMultiCluster, + }, + ClusterSpecList: mdbv1.ClusterSpecList{ + {ClusterName: "cluster-0", Members: 3}, + }, + }, + Status: mdbv1.MongoDbStatus{ + Members: 5, // Should be ignored for multi-cluster + }, + } + + helper := &ReplicaSetReconcilerHelper{ + resource: rs, + log: zap.S(), + } + + state, err := helper.readState() + + assert.NoError(t, err) + assert.NotNil(t, state) + assert.Empty(t, state.LastAppliedMemberSpec, + "Multi-cluster should not migrate from Status.Members") +} + +func TestReadState_LastAppliedMemberSpec_FallbackToStatusMembers(t *testing.T) { + rs := &mdbv1.MongoDB{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rs", + Namespace: "default", + Annotations: map[string]string{ + // No LastAppliedMemberSpec annotation + }, + }, + Status: mdbv1.MongoDbStatus{ + Members: 7, + }, + } + + helper := &ReplicaSetReconcilerHelper{ + resource: rs, + log: zap.S(), + } + + state, err := 
helper.readState() + + assert.NoError(t, err) + assert.NotNil(t, state) + assert.Equal(t, 7, state.LastAppliedMemberSpec[multicluster.LegacyCentralClusterName], + "Should migrate from Status.Members for single-cluster without annotation") +} + +// TestStateLifecycle_SingleClusterMigration verifies that existing single-cluster +// deployments without state annotations properly migrate from Status.Members. +func TestStateLifecycle_SingleClusterMigration(t *testing.T) { + ctx := context.Background() + + // Simulate existing deployment: has Status.Members but no state annotations + rs := DefaultReplicaSetBuilder(). + SetName("legacy-rs"). + SetMembers(3). + Build() + rs.Status.Members = 3 // Existing deployment + + reconciler, client, _, _ := defaultReplicaSetMultiClusterReconciler(ctx, rs) + + // First reconciliation should migrate state from Status.Members + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + + // Verify state was written to annotations + err := client.Get(ctx, rs.ObjectKey(), rs) + require.NoError(t, err) + + // Verify LastAppliedMemberSpec was migrated from Status.Members + var lastAppliedMemberSpec map[string]int + err = json.Unmarshal([]byte(rs.Annotations[util.LastAppliedMemberSpecAnnotation]), &lastAppliedMemberSpec) + require.NoError(t, err) + assert.Equal(t, 3, lastAppliedMemberSpec[multicluster.LegacyCentralClusterName], + "Should migrate from Status.Members on first reconciliation") + + // Second reconciliation should read from annotation (not Status.Members) + rs.Status.Members = 999 // Change Status.Members to verify annotation is used + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + + // Verify state still shows 3 (from annotation, not Status.Members=999) + err = client.Get(ctx, rs.ObjectKey(), rs) + require.NoError(t, err) + err = json.Unmarshal([]byte(rs.Annotations[util.LastAppliedMemberSpecAnnotation]), &lastAppliedMemberSpec) + require.NoError(t, err) + assert.Equal(t, 3, lastAppliedMemberSpec[multicluster.LegacyCentralClusterName], + "Should read from annotation, not Status.Members after migration") +} + +// TestStateLifecycle_MultiClusterStatePreservation verifies that state is correctly maintained across multiple +// reconciliations in multi-cluster mode. +func TestStateLifecycle_MultiClusterStatePreservation(t *testing.T) { + ctx := context.Background() + + // Create multi-cluster ReplicaSet with initial configuration + clusterSpecList := mdbv1.ClusterSpecList{ + {ClusterName: "cluster-0", Members: 3}, + {ClusterName: "cluster-1", Members: 1}, + } + + rs := mdbv1.NewDefaultMultiReplicaSetBuilder(). + SetName("multi-rs"). + SetClusterSpecList(clusterSpecList). 
+ Build() + + reconciler, client, memberClusters, omConnectionFactory := defaultReplicaSetMultiClusterReconciler(ctx, rs) + + // Initial reconciliation + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + + // Verify initial state + assertReplicaSetStateAnnotations(ctx, t, rs, client, + map[string]int{"cluster-0": 0, "cluster-1": 1}, // ClusterMapping + map[string]int{"cluster-0": 3, "cluster-1": 1}) // LastAppliedMemberSpec + + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 3, 1) + + // Verify OM has correct number of processes (3 + 1 = 4) + processes := omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses() + assert.Len(t, processes, 4, "OM should have 4 processes after initial reconciliation") + + // Scale cluster-1 from 1 to 2 members + rs.Spec.ClusterSpecList[1].Members = 2 + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + + // Verify state updated to reflect scaling + assertReplicaSetStateAnnotations(ctx, t, rs, client, + map[string]int{"cluster-0": 0, "cluster-1": 1}, // ClusterMapping unchanged + map[string]int{"cluster-0": 3, "cluster-1": 2}) // LastAppliedMemberSpec updated + + // Verify StatefulSet scaled + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 3, 2) + + // Verify OM has correct number of processes after scaling (3 + 2 = 5) + processes = omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses() + assert.Len(t, processes, 5, "OM should have 5 processes after scaling cluster-1") + + // Add a third cluster + rs.Spec.ClusterSpecList = append(rs.Spec.ClusterSpecList, + mdbv1.ClusterSpecItem{ClusterName: "cluster-2", Members: 1}) + checkReplicaSetReconcileSuccessful(ctx, t, reconciler, rs, client, false) + + // Verify state includes new cluster with next available index + assertReplicaSetStateAnnotations(ctx, t, rs, client, + map[string]int{"cluster-0": 0, "cluster-1": 1, "cluster-2": 2}, // ClusterMapping with new cluster + map[string]int{"cluster-0": 3, "cluster-1": 2, "cluster-2": 1}) // LastAppliedMemberSpec with new cluster + + // Verify all three StatefulSets exist + assertReplicaSetStatefulSetReplicas(ctx, t, rs, memberClusters, 3, 2, 1) + + // Verify OM has correct number of processes after adding cluster-2 (3 + 2 + 1 = 6) + processes = omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses() + assert.Len(t, processes, 6, "OM should have 6 processes after adding cluster-2") +} diff --git a/controllers/operator/mongodbreplicaset_controller_test.go b/controllers/operator/mongodbreplicaset_controller_test.go index 803de4f3b..c4f3abd8c 100644 --- a/controllers/operator/mongodbreplicaset_controller_test.go +++ b/controllers/operator/mongodbreplicaset_controller_test.go @@ -93,7 +93,7 @@ func TestReplicaSetRace(t *testing.T) { Get: mock.GetFakeClientInterceptorGetFunc(omConnectionFactory, true, true), }).Build() - reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, nil, omConnectionFactory.GetConnectionFunc) testConcurrentReconciles(ctx, t, fakeClient, reconciler, rs, rs2, rs3) } @@ -394,7 +394,7 @@ func TestCreateDeleteReplicaSet(t *testing.T) { omConnectionFactory := om.NewCachedOMConnectionFactory(omConnectionFactoryFuncSettingVersion()) 
fakeClient := mock.NewDefaultFakeClientWithOMConnectionFactory(omConnectionFactory, rs) - reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, fakeClient) omConn := omConnectionFactory.GetConnection() @@ -533,7 +533,7 @@ func TestFeatureControlPolicyAndTagAddedWithNewerOpsManager(t *testing.T) { omConnectionFactory := om.NewCachedOMConnectionFactory(omConnectionFactoryFuncSettingVersion()) fakeClient := mock.NewDefaultFakeClientWithOMConnectionFactory(omConnectionFactory, rs) - reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, fakeClient) @@ -557,7 +557,7 @@ func TestFeatureControlPolicyNoAuthNewerOpsManager(t *testing.T) { omConnectionFactory := om.NewCachedOMConnectionFactory(omConnectionFactoryFuncSettingVersion()) fakeClient := mock.NewDefaultFakeClientWithOMConnectionFactory(omConnectionFactory, rs) - reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, fakeClient, nil, "fake-initDatabaseNonStaticImageVersion", "fake-databaseNonStaticImageVersion", false, false, nil, omConnectionFactory.GetConnectionFunc) checkReconcileSuccessful(ctx, t, reconciler, rs, fakeClient) @@ -896,7 +896,7 @@ func TestReplicaSetAnnotations_NotWrittenOnFailure(t *testing.T) { WithObjects(mock.GetProjectConfigMap(mock.TestProjectConfigMapName, "testProject", "testOrg")). 
Build() - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, nil) _, err := reconciler.Reconcile(ctx, requestFromObject(rs)) require.NoError(t, err, "Reconcile should not return error (error captured in status)") @@ -917,7 +917,7 @@ func TestReplicaSetAnnotations_PreservedOnSubsequentFailure(t *testing.T) { rs := DefaultReplicaSetBuilder().Build() kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, omConnectionFactory.GetConnectionFunc) + reconciler := newReplicaSetReconciler(ctx, kubeClient, nil, "", "", false, false, nil, omConnectionFactory.GetConnectionFunc) _, err := reconciler.Reconcile(ctx, requestFromObject(rs)) require.NoError(t, err) @@ -1091,7 +1091,7 @@ func assertCorrectNumberOfMembersAndProcesses(ctx context.Context, t *testing.T, func defaultReplicaSetReconciler(ctx context.Context, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, rs *mdbv1.MongoDB) (*ReconcileMongoDbReplicaSet, kubernetesClient.Client, *om.CachedOMConnectionFactory) { kubeClient, omConnectionFactory := mock.NewDefaultFakeClient(rs) - return newReplicaSetReconciler(ctx, kubeClient, imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, false, false, omConnectionFactory.GetConnectionFunc), kubeClient, omConnectionFactory + return newReplicaSetReconciler(ctx, kubeClient, imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, false, false, nil, omConnectionFactory.GetConnectionFunc), kubeClient, omConnectionFactory } // newDefaultPodSpec creates pod spec with default values,sets only the topology key and persistence sizes, diff --git a/controllers/operator/mongodbshardedcluster_controller.go b/controllers/operator/mongodbshardedcluster_controller.go index 3e66f1786..40a69363e 100644 --- a/controllers/operator/mongodbshardedcluster_controller.go +++ b/controllers/operator/mongodbshardedcluster_controller.go @@ -2213,7 +2213,7 @@ func createMongodProcessForShardedCluster(mongoDBImage string, forceEnterprise b func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment) (om.ReplicaSetWithProcesses, error) { replicaSet := om.NewReplicaSet(name, mdb.Spec.GetMongoDBVersion()) - existingProcessIds := getReplicaSetProcessIdsFromReplicaSets(replicaSet.Name(), deployment) + existingProcessIds := getReplicaSetProcessIdsFromDeployment(replicaSet.Name(), deployment) var rsWithProcesses om.ReplicaSetWithProcesses if mdb.Spec.IsMultiCluster() { // we're passing nil as connectivity argument as in sharded clusters horizons don't make much sense as we don't expose externally individual shards diff --git a/docker/mongodb-kubernetes-tests/kubetester/mongodb.py b/docker/mongodb-kubernetes-tests/kubetester/mongodb.py index a523f9975..e808023d1 100644 --- a/docker/mongodb-kubernetes-tests/kubetester/mongodb.py +++ b/docker/mongodb-kubernetes-tests/kubetester/mongodb.py @@ -29,6 +29,7 @@ ShardedClusterTester, StandaloneTester, ) +from .multicluster_client import MultiClusterClient from .opsmanager import MongoDBOpsManager from .phase import Phase @@ -199,6 +200,14 @@ def tester( cluster_domain=self.get_cluster_domain(), ) + def read_statefulsets(self, clients: List[MultiClusterClient]) -> Dict[str, client.V1StatefulSet]: + 
statefulsets = {} + for mcc in clients: + statefulsets[mcc.cluster_name] = mcc.read_namespaced_stateful_set( + f"{self.name}-{mcc.cluster_index}", self.namespace + ) + return statefulsets + def assert_connectivity(self, ca_path: Optional[str] = None, cluster_domain: str = "cluster.local"): return self.tester(ca_path=ca_path).assert_connectivity() diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster/fixtures/mongodb-multi-new.yaml b/docker/mongodb-kubernetes-tests/tests/multicluster/fixtures/mongodb-multi-new.yaml new file mode 100644 index 000000000..84b865bcb --- /dev/null +++ b/docker/mongodb-kubernetes-tests/tests/multicluster/fixtures/mongodb-multi-new.yaml @@ -0,0 +1,21 @@ +--- +apiVersion: mongodb.com/v1 +kind: MongoDB +metadata: + name: multi-replica-set +spec: + version: 4.4.0-ent + type: ReplicaSet + topology: MultiCluster + duplicateServiceObjects: false + credentials: my-credentials + opsManager: + configMapRef: + name: my-project + clusterSpecList: + - clusterName: kind-e2e-cluster-1 + members: 2 + - clusterName: kind-e2e-cluster-2 + members: 1 + - clusterName: kind-e2e-cluster-3 + members: 2 diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_new_replica_set_scale_up.py b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_new_replica_set_scale_up.py new file mode 100644 index 000000000..7992199f9 --- /dev/null +++ b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_new_replica_set_scale_up.py @@ -0,0 +1,132 @@ +from typing import List + +import kubernetes +import kubetester +import pytest +from kubetester.automation_config_tester import AutomationConfigTester +from kubetester.certs_mongodb_multi import create_multi_cluster_mongodb_tls_certs +from kubetester.kubetester import fixture as yaml_fixture +from kubetester.kubetester import skip_if_local +from kubetester.mongodb import MongoDB +from kubetester.mongodb_multi import MongoDBMulti +from kubetester.mongotester import with_tls +from kubetester.multicluster_client import MultiClusterClient +from kubetester.operator import Operator +from kubetester.phase import Phase +from tests.multicluster.conftest import cluster_spec_list + +RESOURCE_NAME = "multi-replica-set" + + +@pytest.fixture(scope="module") +def mongodb_multi_unmarshalled( + namespace: str, + multi_cluster_issuer_ca_configmap: str, + central_cluster_client: kubernetes.client.ApiClient, + member_cluster_names: List[str], + custom_mdb_version: str, +) -> MongoDB: + resource = MongoDB.from_yaml(yaml_fixture("mongodb-multi-new.yaml"), RESOURCE_NAME, namespace) + resource.set_version(custom_mdb_version) + resource["spec"]["clusterSpecList"] = cluster_spec_list(member_cluster_names, [2, 1, 2]) + + resource.api = kubernetes.client.CustomObjectsApi(central_cluster_client) + return resource + + +@pytest.fixture(scope="module") +def mongodb_multi(mongodb_multi_unmarshalled: MongoDB) -> MongoDB: + mongodb_multi_unmarshalled["spec"]["clusterSpecList"][0]["members"] = 1 + mongodb_multi_unmarshalled["spec"]["clusterSpecList"][1]["members"] = 1 + mongodb_multi_unmarshalled["spec"]["clusterSpecList"][2]["members"] = 1 + return mongodb_multi_unmarshalled.create() + + +@pytest.mark.e2e_multi_cluster_new_replica_set_scale_up +def test_deploy_operator(multi_cluster_operator: Operator): + multi_cluster_operator.assert_is_running() + + +@pytest.mark.e2e_multi_cluster_new_replica_set_scale_up +def test_create_mongodb_multi(mongodb_multi: MongoDB): + mongodb_multi.assert_reaches_phase(Phase.Running, timeout=600) + + 
+@pytest.mark.e2e_multi_cluster_new_replica_set_scale_up +def test_statefulsets_have_been_created_correctly( + mongodb_multi: MongoDB, + member_cluster_clients: List[MultiClusterClient], +): + # Even though we already verified, in previous test, that the MongoDBMultiCluster resource's phase is running (that would mean all STSs are ready); + # checking the expected number of replicas for STS makes the test flaky because of an issue mentioned in detail in this ticket https://jira.mongodb.org/browse/CLOUDP-329231. + # That's why we are waiting for STS to have expected number of replicas. This change can be reverted when we make the proper fix as + # mentioned in the above ticket. + def fn(): + cluster_one_client = member_cluster_clients[0] + cluster_one_statefulsets = mongodb_multi.read_statefulsets([cluster_one_client]) + return cluster_one_statefulsets[cluster_one_client.cluster_name].status.ready_replicas == 1 + + kubetester.wait_until(fn, timeout=60, message="Verifying sts has correct number of replicas in cluster one") + + def fn(): + cluster_two_client = member_cluster_clients[1] + cluster_two_statefulsets = mongodb_multi.read_statefulsets([cluster_two_client]) + return cluster_two_statefulsets[cluster_two_client.cluster_name].status.ready_replicas == 1 + + kubetester.wait_until(fn, timeout=60, message="Verifying sts has correct number of replicas in cluster two") + + def fn(): + cluster_three_client = member_cluster_clients[2] + cluster_three_statefulsets = mongodb_multi.read_statefulsets([cluster_three_client]) + return cluster_three_statefulsets[cluster_three_client.cluster_name].status.ready_replicas == 1 + + kubetester.wait_until(fn, timeout=60, message="Verifying sts has correct number of replicas in cluster three") + + +# TODO: uncomment when scaling is fixed +# @pytest.mark.e2e_multi_cluster_new_replica_set_scale_up +# def test_scale_mongodb_multi(mongodb_multi: MongoDB): +# mongodb_multi.load() +# mongodb_multi["spec"]["clusterSpecList"][0]["members"] = 2 +# mongodb_multi["spec"]["clusterSpecList"][1]["members"] = 1 +# mongodb_multi["spec"]["clusterSpecList"][2]["members"] = 2 +# mongodb_multi.update() +# +# mongodb_multi.assert_reaches_phase(Phase.Running, timeout=1800) +# +# +# @pytest.mark.e2e_multi_cluster_new_replica_set_scale_u +# def test_statefulsets_have_been_scaled_up_correctly( +# mongodb_multi: MongoDB, +# member_cluster_clients: List[MultiClusterClient], +# ): +# # Even though we already verified, in previous test, that the MongoDBMultiCluster resource's phase is running (that would mean all STSs are ready); +# # checking the expected number of replicas for STS makes the test flaky because of an issue mentioned in detail in this ticket https://jira.mongodb.org/browse/CLOUDP-329231. +# # That's why we are waiting for STS to have expected number of replicas. This change can be reverted when we make the proper fix as +# # mentioned in the above ticket. 
+# def fn(): +# cluster_one_client = member_cluster_clients[0] +# cluster_one_statefulsets = mongodb_multi.read_statefulsets([cluster_one_client]) +# return cluster_one_statefulsets[cluster_one_client.cluster_name].status.ready_replicas == 2 +# +# kubetester.wait_until( +# fn, timeout=60, message="Verifying sts has correct number of replicas after scale up in cluster one" +# ) +# +# def fn(): +# cluster_two_client = member_cluster_clients[1] +# cluster_two_statefulsets = mongodb_multi.read_statefulsets([cluster_two_client]) +# return cluster_two_statefulsets[cluster_two_client.cluster_name].status.ready_replicas == 1 +# +# kubetester.wait_until( +# fn, timeout=60, message="Verifying sts has correct number of replicas after scale up in cluster two" +# ) +# +# def fn(): +# cluster_three_client = member_cluster_clients[2] +# cluster_three_statefulsets = mongodb_multi.read_statefulsets([cluster_three_client]) +# return cluster_three_statefulsets[cluster_three_client.cluster_name].status.ready_replicas == 2 +# +# kubetester.wait_until( +# fn, timeout=60, message="Verifying sts has correct number of replicas after scale up in cluster three" +# ) diff --git a/helm_chart/crds/mongodb.com_mongodb.yaml b/helm_chart/crds/mongodb.com_mongodb.yaml index d421d8837..7c275aef0 100644 --- a/helm_chart/crds/mongodb.com_mongodb.yaml +++ b/helm_chart/crds/mongodb.com_mongodb.yaml @@ -396,6 +396,146 @@ spec: clusterDomain: format: hostname type: string + clusterSpecList: + items: + description: |- + ClusterSpecItem is the mongodb multi-cluster spec that is specific to a + particular Kubernetes cluster, this maps to the statefulset created in each cluster + properties: + clusterName: + description: |- + ClusterName is name of the cluster where the MongoDB Statefulset will be scheduled, the + name should have a one on one mapping with the service-account created in the central cluster + to talk to the workload clusters. + type: string + externalAccess: + description: ExternalAccessConfiguration provides external access + configuration for Multi-Cluster. + properties: + externalDomain: + description: An external domain that is used for exposing + MongoDB to the outside world. + type: string + externalService: + description: Provides a way to override the default (NodePort) + Service + properties: + annotations: + additionalProperties: + type: string + description: A map of annotations that shall be added + to the externally available Service. + type: object + spec: + description: A wrapper for the Service spec object. + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + type: object + memberConfig: + description: MemberConfig allows to specify votes, priorities + and tags for each of the mongodb process. 
+ items: + properties: + priority: + type: string + tags: + additionalProperties: + type: string + type: object + votes: + type: integer + type: object + type: array + members: + description: Amount of members for this MongoDB Replica Set + type: integer + podSpec: + properties: + persistence: + description: Note, that this field is used by MongoDB resources + only, let's keep it here for simplicity + properties: + multiple: + properties: + data: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + journal: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + logs: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + type: object + single: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + type: object + podTemplate: + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + service: + description: this is an optional service, it will get the name + "-service" in case not provided + type: string + statefulSet: + description: |- + StatefulSetConfiguration holds the optional custom StatefulSet + that should be merged into the operator created one. + properties: + metadata: + description: StatefulSetMetadataWrapper is a wrapper around + Labels and Annotations + properties: + annotations: + additionalProperties: + type: string + type: object + labels: + additionalProperties: + type: string + type: object + type: object + spec: + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - spec + type: object + required: + - members + type: object + type: array configServerCount: type: integer configSrv: diff --git a/main.go b/main.go index 5e5185825..b5c2d9b2f 100644 --- a/main.go +++ b/main.go @@ -362,7 +362,7 @@ func setupMongoDBCRD(ctx context.Context, mgr manager.Manager, imageUrls images. if err := operator.AddStandaloneController(ctx, mgr, imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, forceEnterprise, enableClusterMongoDBRoles); err != nil { return err } - if err := operator.AddReplicaSetController(ctx, mgr, imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, forceEnterprise, enableClusterMongoDBRoles); err != nil { + if err := operator.AddReplicaSetController(ctx, mgr, imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, forceEnterprise, enableClusterMongoDBRoles, memberClusterObjectsMap); err != nil { return err } if err := operator.AddShardedClusterController(ctx, mgr, imageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion, forceEnterprise, enableClusterMongoDBRoles, memberClusterObjectsMap); err != nil { diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 189a5a983..8872eff11 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -37,7 +37,10 @@ func GetMultiServiceFQDN(stsName string, namespace string, clusterNum int, podNu domain = strings.TrimPrefix(clusterDomain, ".") } - return fmt.Sprintf("%s.%s.svc.%s", GetMultiServiceName(stsName, clusterNum, podNum), namespace, domain) + // For StatefulSet pods, DNS format is: ...svc. 
+ podName := GetMultiPodName(stsName, clusterNum, podNum) + headlessService := GetMultiHeadlessServiceName(stsName, clusterNum) + return fmt.Sprintf("%s.%s.%s.svc.%s", podName, headlessService, namespace, domain) } func GetMultiServiceExternalDomain(stsName, externalDomain string, clusterNum, podNum int) string { diff --git a/pkg/multicluster/multicluster.go b/pkg/multicluster/multicluster.go index 2d214130d..26917c68f 100644 --- a/pkg/multicluster/multicluster.go +++ b/pkg/multicluster/multicluster.go @@ -279,3 +279,14 @@ func InitializeGlobalMemberClusterMapForSingleCluster(globalMemberClustersMap ma return globalMemberClustersMap } + +// GetHealthyMemberClusters filters and returns only healthy member clusters. +func GetHealthyMemberClusters(memberClusters []MemberCluster) []MemberCluster { + var result []MemberCluster + for i := range memberClusters { + if memberClusters[i].Healthy { + result = append(result, memberClusters[i]) + } + } + return result +} diff --git a/pkg/util/constants.go b/pkg/util/constants.go index ee51115ce..fbde214d5 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -285,6 +285,10 @@ const ( LastAchievedSpec = "mongodb.com/v1.lastSuccessfulConfiguration" LastAchievedRsMemberIds = "mongodb.com/v1.lastAchievedRsMemberIds" + // Used by MongoDB replica set until we transition to config map + ClusterMappingAnnotation = "mongodb.com/v1.clusterMapping" + LastAppliedMemberSpecAnnotation = "mongodb.com/v1.lastAppliedMemberSpec" + // SecretVolumeName is the name of the volume resource. SecretVolumeName = "secret-certs" diff --git a/public/crds.yaml b/public/crds.yaml index beeaf741d..84bc6f8bf 100644 --- a/public/crds.yaml +++ b/public/crds.yaml @@ -504,6 +504,146 @@ spec: clusterDomain: format: hostname type: string + clusterSpecList: + items: + description: |- + ClusterSpecItem is the mongodb multi-cluster spec that is specific to a + particular Kubernetes cluster, this maps to the statefulset created in each cluster + properties: + clusterName: + description: |- + ClusterName is name of the cluster where the MongoDB Statefulset will be scheduled, the + name should have a one on one mapping with the service-account created in the central cluster + to talk to the workload clusters. + type: string + externalAccess: + description: ExternalAccessConfiguration provides external access + configuration for Multi-Cluster. + properties: + externalDomain: + description: An external domain that is used for exposing + MongoDB to the outside world. + type: string + externalService: + description: Provides a way to override the default (NodePort) + Service + properties: + annotations: + additionalProperties: + type: string + description: A map of annotations that shall be added + to the externally available Service. + type: object + spec: + description: A wrapper for the Service spec object. + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + type: object + memberConfig: + description: MemberConfig allows to specify votes, priorities + and tags for each of the mongodb process. 
+ items: + properties: + priority: + type: string + tags: + additionalProperties: + type: string + type: object + votes: + type: integer + type: object + type: array + members: + description: Amount of members for this MongoDB Replica Set + type: integer + podSpec: + properties: + persistence: + description: Note, that this field is used by MongoDB resources + only, let's keep it here for simplicity + properties: + multiple: + properties: + data: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + journal: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + logs: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + type: object + single: + properties: + labelSelector: + type: object + x-kubernetes-preserve-unknown-fields: true + storage: + type: string + storageClass: + type: string + type: object + type: object + podTemplate: + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + service: + description: this is an optional service, it will get the name + "-service" in case not provided + type: string + statefulSet: + description: |- + StatefulSetConfiguration holds the optional custom StatefulSet + that should be merged into the operator created one. + properties: + metadata: + description: StatefulSetMetadataWrapper is a wrapper around + Labels and Annotations + properties: + annotations: + additionalProperties: + type: string + type: object + labels: + additionalProperties: + type: string + type: object + type: object + spec: + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - spec + type: object + required: + - members + type: object + type: array configServerCount: type: integer configSrv: