diff --git a/changelog/20250902_fix_fixing_om_project_migration.md b/changelog/20250902_fix_fixing_om_project_migration.md
new file mode 100644
index 000000000..0f5dbf5a2
--- /dev/null
+++ b/changelog/20250902_fix_fixing_om_project_migration.md
@@ -0,0 +1,8 @@
+---
+title: Fixing OM project migration
+kind: fix
+date: 2025-09-02
+---
+
+* Fixed an issue where moving a **MongoDB** sharded cluster resource to a new project (or a new OM instance) would leave the deployment in a failed state.
+
diff --git a/controllers/om/omclient.go b/controllers/om/omclient.go
index cbc0c2875..c9853a0ba 100644
--- a/controllers/om/omclient.go
+++ b/controllers/om/omclient.go
@@ -978,13 +978,19 @@ func (oc *HTTPOmConnection) AddPreferredHostname(agentApiKey string, value strin
 	return nil
 }
 
-func GetReplicaSetMemberIds(conn Connection) (map[string]map[string]int, error) {
+// ProcessNameToId maps process names to their process IDs.
+type ProcessNameToId map[string]int
+
+// ReplicaSetToProcessIds maps replica set names to their process ID mappings.
+type ReplicaSetToProcessIds map[string]ProcessNameToId
+
+func GetReplicaSetMemberIds(conn Connection) (ReplicaSetToProcessIds, error) {
 	dep, err := conn.ReadDeployment()
 	if err != nil {
 		return nil, err
 	}
 
-	finalProcessIds := make(map[string]map[string]int)
+	finalProcessIds := make(ReplicaSetToProcessIds)
 
 	for _, replicaSet := range dep.GetReplicaSets() {
 		finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go
index 013844268..12cd45f47 100644
--- a/controllers/operator/mongodbmultireplicaset_controller.go
+++ b/controllers/operator/mongodbmultireplicaset_controller.go
@@ -652,7 +652,7 @@ func getMembersForClusterSpecItemThisReconciliation(mrs *mdbmultiv1.MongoDBMulti
 }
 
 // saveLastAchievedSpec updates the MongoDBMultiCluster resource with the spec that was just achieved.
-func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds map[string]map[string]int) error {
+func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds om.ReplicaSetToProcessIds) error {
 	clusterSpecs, err := mrs.GetClusterSpecItems()
 	if err != nil {
 		return err
diff --git a/controllers/operator/mongodbshardedcluster_controller.go b/controllers/operator/mongodbshardedcluster_controller.go
index 4b670d6fa..3b5c6e23f 100644
--- a/controllers/operator/mongodbshardedcluster_controller.go
+++ b/controllers/operator/mongodbshardedcluster_controller.go
@@ -104,6 +104,9 @@ type ShardedClusterDeploymentState struct {
 	CommonDeploymentState `json:",inline"`
 	LastAchievedSpec      *mdbv1.MongoDbSpec   `json:"lastAchievedSpec"`
 	Status                *mdbv1.MongoDbStatus `json:"status"`
+	// ProcessIds persists the process IDs of each replica set so they can be restored
+	// when OM returns no process IDs for the deployment (i.e. after a project migration).
+	ProcessIds om.ReplicaSetToProcessIds `json:"processIds,omitempty"`
 }
 
 // updateStatusFromResourceStatus updates the status in the deployment state with values from the resource status with additional assurance that no data is accidentally lost.
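Reviewer note (not part of the diff): a self-contained sketch of what the new field serializes to under the `processIds` tag, with made-up resource names — handy when inspecting the persisted state ConfigMap by hand. The stand-in struct below only mirrors the real `ShardedClusterDeploymentState`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stateSketch mirrors just the new field of ShardedClusterDeploymentState so
// this example compiles on its own; the real struct lives in the controller.
type stateSketch struct {
	ProcessIds map[string]map[string]int `json:"processIds,omitempty"`
}

func main() {
	s := stateSketch{ProcessIds: map[string]map[string]int{
		"my-sc-config": {"my-sc-config-0": 0, "my-sc-config-1": 1},
	}}
	out, _ := json.Marshal(s)
	// Prints: {"processIds":{"my-sc-config":{"my-sc-config-0":0,"my-sc-config-1":1}}}
	fmt.Println(string(out))
}
```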
@@ -126,6 +129,7 @@ func NewShardedClusterDeploymentState() *ShardedClusterDeploymentState {
 		CommonDeploymentState: CommonDeploymentState{ClusterMapping: map[string]int{}},
 		LastAchievedSpec:      &mdbv1.MongoDbSpec{},
 		Status:                &mdbv1.MongoDbStatus{},
+		ProcessIds:            om.ReplicaSetToProcessIds{},
 	}
 }
 
@@ -1923,7 +1927,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
 	}
 
 	configSrvProcesses, configSrvMemberOptions := r.createDesiredConfigSrvProcessesAndMemberOptions(configSrvMemberCertPath)
-	configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment)
+	configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment, r.deploymentState)
 
 	// Shards
 	shards := make([]om.ReplicaSetWithProcesses, sc.Spec.ShardCount)
@@ -1934,7 +1938,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
 		shardInternalClusterPaths = append(shardInternalClusterPaths, fmt.Sprintf("%s/%s", util.InternalClusterAuthMountPath, shardOptions.InternalClusterHash))
 		shardMemberCertPath := fmt.Sprintf("%s/%s", util.TLSCertMountPath, shardOptions.CertificateHash)
 		desiredShardProcesses, desiredShardMemberOptions := r.createDesiredShardProcessesAndMemberOptions(shardIdx, shardMemberCertPath)
-		shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment)
+		shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment, r.deploymentState)
 	}
 
 	// updateOmAuthentication normally takes care of the certfile rotation code, but since sharded-cluster is special pertaining multiple clusterfiles, we code this part here for now.
@@ -2039,9 +2043,25 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
 		return nil, shardsRemoving, workflow.Pending("Performing multi stage reconciliation")
 	}
 
+	// Save final process IDs to deployment state for persistence across reconciliation cycles
+	if err := r.saveFinalProcessIdsToDeploymentState(conn); err != nil {
+		log.Warnf("Failed to save process IDs to deployment state: %v", err)
+	}
+
 	return finalProcesses, shardsRemoving, workflow.OK()
 }
 
+func (r *ShardedClusterReconcileHelper) saveFinalProcessIdsToDeploymentState(conn om.Connection) error {
+	finalProcessIds, err := om.GetReplicaSetMemberIds(conn)
+	if err != nil {
+		return err
+	}
+
+	saveReplicaSetProcessIdsToDeploymentState(finalProcessIds, r.deploymentState)
+
+	return nil
+}
+
 func logWarnIgnoredDueToRecovery(log *zap.SugaredLogger, err any) {
 	log.Warnf("ignoring error due to automatic recovery process: %v", err)
 }
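Reviewer note (not part of the diff): `om.GetReplicaSetMemberIds` returns one `ProcessNameToId` map per replica set, and the state lookup introduced below falls back to an empty, non-nil map for unknown replica sets. A minimal sketch of those shapes, with the aliases re-declared so it runs standalone:

```go
package main

import "fmt"

// Re-declared from controllers/om/omclient.go for illustration only.
type ProcessNameToId map[string]int
type ReplicaSetToProcessIds map[string]ProcessNameToId

// lookup mirrors getReplicaSetProcessIdsFromDeploymentState: a replica set
// that was never saved yields an empty map rather than nil.
func lookup(ids ReplicaSetToProcessIds, rsName string) ProcessNameToId {
	if processIds, ok := ids[rsName]; ok {
		return processIds
	}
	return ProcessNameToId{}
}

func main() {
	ids := ReplicaSetToProcessIds{
		"demo-0": {"demo-0-0": 0, "demo-0-1": 1, "demo-0-2": 2},
	}
	fmt.Println(lookup(ids, "demo-0"))  // map[demo-0-0:0 demo-0-1:1 demo-0-2:2]
	fmt.Println(lookup(ids, "missing")) // map[]
}
```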
@@ -2239,12 +2259,41 @@ func createMongodProcessForShardedCluster(mongoDBImage string, forceEnterprise b
 	return processes
 }
 
+// getReplicaSetProcessIdsFromDeploymentState returns the process IDs persisted in the deployment
+// state for a replica set; callers fall back to it when OM returns none (a project migration).
+func getReplicaSetProcessIdsFromDeploymentState(replicaSetName string, deploymentState *ShardedClusterDeploymentState) om.ProcessNameToId {
+	if processIds, ok := deploymentState.ProcessIds[replicaSetName]; ok {
+		return processIds
+	}
+
+	return om.ProcessNameToId{}
+}
+
+// saveReplicaSetProcessIdsToDeploymentState saves process IDs to the deployment state
+// for persistence across reconciliation cycles.
+func saveReplicaSetProcessIdsToDeploymentState(processIds om.ReplicaSetToProcessIds, deploymentState *ShardedClusterDeploymentState) {
+	if deploymentState.ProcessIds == nil {
+		deploymentState.ProcessIds = om.ReplicaSetToProcessIds{}
+	}
+
+	if len(processIds) > 0 {
+		deploymentState.ProcessIds = processIds
+	}
+}
+
 // buildReplicaSetFromProcesses creates the 'ReplicaSetWithProcesses' with specified processes. This is of use only
 // for sharded cluster (config server, shards)
-func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment) (om.ReplicaSetWithProcesses, error) {
+func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment, deploymentState *ShardedClusterDeploymentState) (om.ReplicaSetWithProcesses, error) {
 	replicaSet := om.NewReplicaSet(name, mdb.Spec.GetMongoDBVersion())
 	existingProcessIds := getReplicaSetProcessIdsFromReplicaSets(replicaSet.Name(), deployment)
+
+	// If there is no configuration saved in OM, this might be a new project, so we check the IDs saved in the state map.
+	// A project migration can happen if .spec.opsManager.configMapRef is changed, or if the original ConfigMap has been modified.
+	if len(existingProcessIds) == 0 {
+		existingProcessIds = getReplicaSetProcessIdsFromDeploymentState(replicaSet.Name(), deploymentState)
+	}
+
 	var rsWithProcesses om.ReplicaSetWithProcesses
 	if mdb.Spec.IsMultiCluster() {
 		// we're passing nil as connectivity argument as in sharded clusters horizons don't make much sense as we don't expose externally individual shards
diff --git a/controllers/operator/mongodbshardedcluster_controller_multi_test.go b/controllers/operator/mongodbshardedcluster_controller_multi_test.go
index 158665f3b..dcf698846 100644
--- a/controllers/operator/mongodbshardedcluster_controller_multi_test.go
+++ b/controllers/operator/mongodbshardedcluster_controller_multi_test.go
@@ -3589,12 +3589,32 @@ func getMultiClusterFQDN(stsName string, namespace string, clusterIdx int, podId
 
 func generateExpectedDeploymentState(t *testing.T, sc *mdbv1.MongoDB) string {
 	lastSpec, _ := sc.GetLastSpec()
+
+	expectedProcessIds := om.ReplicaSetToProcessIds{
+		"slaney-0": om.ProcessNameToId{
+			"slaney-0-0": 0,
+			"slaney-0-1": 1,
+			"slaney-0-2": 2,
+		},
+		"slaney-1": om.ProcessNameToId{
+			"slaney-1-0": 0,
+			"slaney-1-1": 1,
+			"slaney-1-2": 2,
+		},
+		"slaney-config": om.ProcessNameToId{
+			"slaney-config-0": 0,
+			"slaney-config-1": 1,
+			"slaney-config-2": 2,
+		},
+	}
+
 	expectedState := ShardedClusterDeploymentState{
 		CommonDeploymentState: CommonDeploymentState{
 			ClusterMapping: map[string]int{},
 		},
 		LastAchievedSpec: lastSpec,
 		Status:           &sc.Status,
+		ProcessIds:       expectedProcessIds,
 	}
 	lastSpecBytes, err := json.Marshal(expectedState)
 	require.NoError(t, err)
diff --git a/controllers/operator/mongodbshardedcluster_controller_test.go b/controllers/operator/mongodbshardedcluster_controller_test.go
index 12ca849b8..a72329e5f 100644
--- a/controllers/operator/mongodbshardedcluster_controller_test.go
+++ b/controllers/operator/mongodbshardedcluster_controller_test.go
@@ -2,6 +2,7 @@ package operator
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"strings"
@@ -1643,7 +1644,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
 			construct.GetPodEnvOptions(),
 		)
 		shardSts := construct.DatabaseStatefulSet(*sh, shardOptions, zap.S())
-		shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
+		shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), NewShardedClusterDeploymentState())
 	}
 
 	desiredMongosConfig := createMongosSpec(sh)
@@ -1664,7 +1665,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
 		construct.GetPodEnvOptions(),
 	)
 	configSvrSts := construct.DatabaseStatefulSet(*sh, configServerOptions, zap.S())
-	configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
+	configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), NewShardedClusterDeploymentState())
 
 	d := om.NewDeployment()
 	_, err := d.MergeShardedCluster(om.DeploymentShardedClusterMergeOptions{
@@ -1914,3 +1915,119 @@ func generateAllHostsSingleCluster(sc *mdbv1.MongoDB, mongosCount int, configSrv
 	}
 	return allHosts, allPodNames
 }
+
+// TestProcessIdPersistence tests the process ID persistence functionality
+// for sharded cluster deployments, ensuring process IDs are correctly
+// saved to and retrieved from the ShardedClusterDeploymentState.
+func TestProcessIdPersistence(t *testing.T) {
+	t.Run("TestShardedClusterDeploymentState_ProcessIds", func(t *testing.T) {
+		// Step 1: Create a new deployment state
+		deploymentState := NewShardedClusterDeploymentState()
+		assert.NotNil(t, deploymentState)
+		assert.NotNil(t, deploymentState.ProcessIds)
+		assert.Equal(t, 0, len(deploymentState.ProcessIds))
+
+		// Step 2: Save process IDs for config server and shards (simulating a GetReplicaSetMemberIds result)
+		allProcessIds := om.ReplicaSetToProcessIds{
+			"test-configsrv": {
+				"test-configsrv-0": 0,
+				"test-configsrv-1": 1,
+				"test-configsrv-2": 2,
+			},
+			"test-shard-0": {
+				"test-shard-0-0": 3,
+				"test-shard-0-1": 4,
+				"test-shard-0-2": 5,
+			},
+			"test-shard-1": {
+				"test-shard-1-0": 6,
+				"test-shard-1-1": 7,
+			},
+		}
+		saveReplicaSetProcessIdsToDeploymentState(allProcessIds, deploymentState)
+
+		assert.Equal(t, 3, len(deploymentState.ProcessIds))
+		assert.Equal(t, allProcessIds["test-configsrv"], deploymentState.ProcessIds["test-configsrv"])
+		assert.Equal(t, allProcessIds["test-shard-0"], deploymentState.ProcessIds["test-shard-0"])
+		assert.Equal(t, allProcessIds["test-shard-1"], deploymentState.ProcessIds["test-shard-1"])
+
+		// Step 3: Retrieve process IDs (simulating a migration scenario)
+		retrievedConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-configsrv", deploymentState)
+		assert.Equal(t, allProcessIds["test-configsrv"], retrievedConfigIds)
+
+		retrievedShard0Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", deploymentState)
+		assert.Equal(t, allProcessIds["test-shard-0"], retrievedShard0Ids)
+
+		retrievedShard1Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-1", deploymentState)
+		assert.Equal(t, allProcessIds["test-shard-1"], retrievedShard1Ids)
+
+		// Step 4: Retrieve a non-existent replica set (should return an empty map)
+		emptyIds := getReplicaSetProcessIdsFromDeploymentState("non-existent", deploymentState)
+		assert.NotNil(t, emptyIds)
+		assert.Equal(t, 0, len(emptyIds))
+	})
+
t.Run("TestProcessIdPersistence_EdgeCases", func(t *testing.T) { + deploymentState := &ShardedClusterDeploymentState{} + + // Should return empty map for nil ReplicaSetToProcessIds + emptyIds := getReplicaSetProcessIdsFromDeploymentState("test-rs", deploymentState) + assert.NotNil(t, emptyIds) + assert.Equal(t, 0, len(emptyIds)) + + // Should initialize ReplicaSetToProcessIds when saving + allProcessIds := om.ReplicaSetToProcessIds{ + "test-rs": {"test-process": 1}, + } + saveReplicaSetProcessIdsToDeploymentState(allProcessIds, deploymentState) + assert.NotNil(t, deploymentState.ProcessIds) + assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"]) + + // Test edge case: empty process IDs (should not save) + emptyProcessIds := om.ReplicaSetToProcessIds{} + saveReplicaSetProcessIdsToDeploymentState(emptyProcessIds, deploymentState) + // Should still have the original data + assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"]) + }) + + t.Run("TestProcessIdPersistence_JSONSerialization", func(t *testing.T) { + // Test JSON serialization/deserialization (StateStore compatibility) + originalState := NewShardedClusterDeploymentState() + + // Add test data + allProcessIds := om.ReplicaSetToProcessIds{ + "test-config": { + "config-0": 0, + "config-1": 1, + "config-2": 2, + }, + "test-shard-0": { + "shard-0-0": 3, + "shard-0-1": 4, + }, + } + + saveReplicaSetProcessIdsToDeploymentState(allProcessIds, originalState) + + // Serialize to JSON + jsonData, err := json.Marshal(originalState) + require.NoError(t, err) + assert.Contains(t, string(jsonData), "processIds") + assert.Contains(t, string(jsonData), "test-config") + assert.Contains(t, string(jsonData), "test-shard-0") + + // Deserialize from JSON + var restoredState ShardedClusterDeploymentState + err = json.Unmarshal(jsonData, &restoredState) + require.NoError(t, err) + + // Verify restored state + assert.Equal(t, originalState.ProcessIds, restoredState.ProcessIds) + + restoredConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-config", &restoredState) + assert.Equal(t, allProcessIds["test-config"], restoredConfigIds) + + restoredShardIds := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", &restoredState) + assert.Equal(t, allProcessIds["test-shard-0"], restoredShardIds) + }) +} diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py index f662c516e..c8f0b2503 100644 --- a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py +++ b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py @@ -1,10 +1,20 @@ -from kubetester import find_fixture, try_load +from kubetester import ( + create_or_update_configmap, + find_fixture, + random_k8s_name, + read_configmap, + try_load, +) from kubetester.mongodb import MongoDB from kubetester.operator import Operator from kubetester.phase import Phase from pytest import fixture, mark from tests import test_logger -from tests.conftest import get_member_cluster_names +from tests.conftest import ( + get_central_cluster_client, + get_member_cluster_names, + read_deployment_state, +) from tests.multicluster.conftest import cluster_spec_list from tests.multicluster_shardedcluster import ( assert_config_srv_sts_members_count, @@ -127,3 +137,59 @@ def test_assert_stateful_sets_after_scaling(self, sc: 
diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py
index f662c516e..c8f0b2503 100644
--- a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py
+++ b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py
@@ -1,10 +1,20 @@
-from kubetester import find_fixture, try_load
+from kubetester import (
+    create_or_update_configmap,
+    find_fixture,
+    random_k8s_name,
+    read_configmap,
+    try_load,
+)
 from kubetester.mongodb import MongoDB
 from kubetester.operator import Operator
 from kubetester.phase import Phase
 from pytest import fixture, mark
 from tests import test_logger
-from tests.conftest import get_member_cluster_names
+from tests.conftest import (
+    get_central_cluster_client,
+    get_member_cluster_names,
+    read_deployment_state,
+)
 from tests.multicluster.conftest import cluster_spec_list
 from tests.multicluster_shardedcluster import (
     assert_config_srv_sts_members_count,
@@ -127,3 +137,59 @@ def test_assert_stateful_sets_after_scaling(self, sc: MongoDB):
         assert_shard_sts_members_count(sc, [[0, 2, 1]])
         assert_config_srv_sts_members_count(sc, [1, 1, 1])
         assert_mongos_sts_members_count(sc, [1, 0, 0])
+
+
+# From here on, the tests verify that we can change the project of the MongoDB sharded cluster resource
+# and that process IDs are correctly persisted during migration scenarios.
+
+
+@fixture(scope="module")
+def new_project_configmap(namespace: str) -> str:
+    """Create a new project ConfigMap to simulate a cluster migration scenario."""
+    cm = read_configmap(namespace=namespace, name="my-project")
+    project_name = random_k8s_name("new-project-")
+    return create_or_update_configmap(
+        namespace=namespace,
+        name=project_name,
+        data={
+            "baseUrl": cm["baseUrl"],
+            "projectName": project_name,
+            "orgId": cm["orgId"],
+        },
+    )
+
+
+@mark.e2e_multi_cluster_sharded_process_id_persistence
+class TestShardedClusterProcessIdPersistence:
+    """
+    Test process ID persistence during cluster migration scenarios.
+
+    This test validates that the sharded cluster controller correctly preserves
+    process IDs when changing projects (a migration scenario), even with
+    non-sequential member IDs in the replica sets.
+    """
+
+    def test_scale_up_first_shard(self, sc: MongoDB):
+        """Scale up the first shard to create non-sequential member IDs."""
+        logger.info("Scaling up first shard to create non-sequential member IDs")
+
+        # Scale up the first shard to 3 members. This leads to non-sequential member IDs in the replica
+        # set; as in the multi replica set test, it creates a scenario where process IDs are not sequential.
+        sc["spec"]["shard"]["clusterSpecList"][0]["members"] = 3
+        sc.update()
+
+        sc.assert_reaches_phase(Phase.Running, timeout=1200)
+
+    def test_change_project(self, sc: MongoDB, new_project_configmap: str):
+        old_rs_members = sc.get_automation_config_tester().get_replica_set_members(sc.name)
+
+        sc["spec"]["opsManager"]["configMapRef"]["name"] = new_project_configmap
+        sc.update()
+
+        sc.assert_abandons_phase(phase=Phase.Running, timeout=1200)
+        sc.assert_reaches_phase(phase=Phase.Running, timeout=1800)
+
+        new_rs_members = sc.get_automation_config_tester().get_replica_set_members(sc.name)
+
+        # Assert that the replica set member IDs have not changed after changing the project.
+        assert old_rs_members == new_rs_members
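Reviewer note: condensed, the controller now resolves member IDs in this order — the OM deployment first, then the persisted deployment state, then fresh assignment. A simplified sketch (illustrative only, not the literal code in `buildReplicaSetFromProcesses`):

```go
package main

import "fmt"

// resolveProcessIds illustrates the lookup order this change introduces:
// fromOM stands in for getReplicaSetProcessIdsFromReplicaSets, fromState for
// the new deployment-state fallback.
func resolveProcessIds(fromOM, fromState map[string]int) map[string]int {
	if len(fromOM) > 0 {
		return fromOM // existing project: OM stays the source of truth
	}
	if len(fromState) > 0 {
		return fromState // empty/new project: reuse the persisted IDs
	}
	return map[string]int{} // genuinely new deployment: IDs assigned fresh
}

func main() {
	persisted := map[string]int{"demo-0-0": 0, "demo-0-1": 1}
	fmt.Println(resolveProcessIds(nil, persisted)) // map[demo-0-0:0 demo-0-1:1]
}
```

This is also why the e2e test scales a shard before switching projects: scaling yields non-sequential member IDs, so the final `old_rs_members == new_rs_members` assertion would fail if the controller re-assigned IDs from scratch.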