From 4a078fa0d875c31b9735b14a517cf3fbce54102f Mon Sep 17 00:00:00 2001 From: Nam Nguyen Date: Tue, 2 Sep 2025 14:12:10 +0200 Subject: [PATCH 1/5] add memberid migration case --- .../mongodbshardedcluster_controller.go | 76 +++++- .../mongodbshardedcluster_controller_test.go | 219 +++++++++++++++++- .../multi_cluster_sharded_scaling.py | 170 +++++++++++++- 3 files changed, 458 insertions(+), 7 deletions(-) diff --git a/controllers/operator/mongodbshardedcluster_controller.go b/controllers/operator/mongodbshardedcluster_controller.go index 4b670d6fa..06a3bc8db 100644 --- a/controllers/operator/mongodbshardedcluster_controller.go +++ b/controllers/operator/mongodbshardedcluster_controller.go @@ -104,6 +104,10 @@ type ShardedClusterDeploymentState struct { CommonDeploymentState `json:",inline"` LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"` Status *mdbv1.MongoDbStatus `json:"status"` + // ProcessIds stores process IDs for replica sets to handle migration scenarios + // where process IDs have length 0. The key is the replica set name, and the value + // is a map of process name to process ID. + ProcessIds map[string]map[string]int `json:"processIds,omitempty"` } // updateStatusFromResourceStatus updates the status in the deployment state with values from the resource status with additional ensurance that no data is accidentally lost. @@ -126,6 +130,7 @@ func NewShardedClusterDeploymentState() *ShardedClusterDeploymentState { CommonDeploymentState: CommonDeploymentState{ClusterMapping: map[string]int{}}, LastAchievedSpec: &mdbv1.MongoDbSpec{}, Status: &mdbv1.MongoDbStatus{}, + ProcessIds: map[string]map[string]int{}, } } @@ -1923,7 +1928,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c } configSrvProcesses, configSrvMemberOptions := r.createDesiredConfigSrvProcessesAndMemberOptions(configSrvMemberCertPath) - configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment) + configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment, r.deploymentState) // Shards shards := make([]om.ReplicaSetWithProcesses, sc.Spec.ShardCount) @@ -1934,7 +1939,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c shardInternalClusterPaths = append(shardInternalClusterPaths, fmt.Sprintf("%s/%s", util.InternalClusterAuthMountPath, shardOptions.InternalClusterHash)) shardMemberCertPath := fmt.Sprintf("%s/%s", util.TLSCertMountPath, shardOptions.CertificateHash) desiredShardProcesses, desiredShardMemberOptions := r.createDesiredShardProcessesAndMemberOptions(shardIdx, shardMemberCertPath) - shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment) + shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment, r.deploymentState) } // updateOmAuthentication normally takes care of the certfile rotation code, but since sharded-cluster is special pertaining multiple clusterfiles, we code this part here for now. @@ -2039,9 +2044,41 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c return nil, shardsRemoving, workflow.Pending("Performing multi stage reconciliation") } + // Save final process IDs to deployment state for persistence across reconciliation cycles + if err := r.saveFinalProcessIdsToDeploymentState(conn, sc); err != nil { + log.Warnf("Failed to save process IDs to deployment state: %v", err) + // Don't fail the reconciliation for this, just log the warning + } + return finalProcesses, shardsRemoving, workflow.OK() } +// saveFinalProcessIdsToDeploymentState saves the final process IDs from the OM deployment +// to the deployment state for persistence across reconciliation cycles. +func (r *ShardedClusterReconcileHelper) saveFinalProcessIdsToDeploymentState(conn om.Connection, sc *mdbv1.MongoDB) error { + finalDeployment, err := conn.ReadDeployment() + if err != nil { + return err + } + + // Save config server process IDs + configRsProcessIds := getReplicaSetProcessIdsFromReplicaSets(sc.ConfigRsName(), finalDeployment) + if len(configRsProcessIds) > 0 { + saveReplicaSetProcessIdsToDeploymentState(sc.ConfigRsName(), configRsProcessIds, r.deploymentState) + } + + // Save shard process IDs + for shardIdx := 0; shardIdx < sc.Spec.ShardCount; shardIdx++ { + shardRsName := sc.ShardRsName(shardIdx) + shardProcessIds := getReplicaSetProcessIdsFromReplicaSets(shardRsName, finalDeployment) + if len(shardProcessIds) > 0 { + saveReplicaSetProcessIdsToDeploymentState(shardRsName, shardProcessIds, r.deploymentState) + } + } + + return nil +} + func logWarnIgnoredDueToRecovery(log *zap.SugaredLogger, err any) { log.Warnf("ignoring error due to automatic recovery process: %v", err) } @@ -2239,12 +2276,45 @@ func createMongodProcessForShardedCluster(mongoDBImage string, forceEnterprise b return processes } +// getReplicaSetProcessIdsFromDeploymentState retrieves process IDs from the deployment state +// when they are empty (length 0), indicating a cluster migration scenario. +func getReplicaSetProcessIdsFromDeploymentState(replicaSetName string, deploymentState *ShardedClusterDeploymentState) map[string]int { + if deploymentState.ProcessIds == nil { + return make(map[string]int) + } + + if processIds, ok := deploymentState.ProcessIds[replicaSetName]; ok { + return processIds + } + + return make(map[string]int) +} + +// saveReplicaSetProcessIdsToDeploymentState saves process IDs to the deployment state +// for persistence across reconciliation cycles. +func saveReplicaSetProcessIdsToDeploymentState(replicaSetName string, processIds map[string]int, deploymentState *ShardedClusterDeploymentState) { + if deploymentState.ProcessIds == nil { + deploymentState.ProcessIds = make(map[string]map[string]int) + } + + if len(processIds) > 0 { + deploymentState.ProcessIds[replicaSetName] = processIds + } +} + // buildReplicaSetFromProcesses creates the 'ReplicaSetWithProcesses' with specified processes. This is of use only // for sharded cluster (config server, shards) -func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment) (om.ReplicaSetWithProcesses, error) { +func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment, deploymentState *ShardedClusterDeploymentState) (om.ReplicaSetWithProcesses, error) { replicaSet := om.NewReplicaSet(name, mdb.Spec.GetMongoDBVersion()) existingProcessIds := getReplicaSetProcessIdsFromReplicaSets(replicaSet.Name(), deployment) + + // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in deployment state + // A project migration can happen if .spec.opsManager.configMapRef is changed, or the original configMap has been modified. + if len(existingProcessIds) == 0 { + existingProcessIds = getReplicaSetProcessIdsFromDeploymentState(replicaSet.Name(), deploymentState) + } + var rsWithProcesses om.ReplicaSetWithProcesses if mdb.Spec.IsMultiCluster() { // we're passing nil as connectivity argument as in sharded clusters horizons don't make much sense as we don't expose externally individual shards diff --git a/controllers/operator/mongodbshardedcluster_controller_test.go b/controllers/operator/mongodbshardedcluster_controller_test.go index 12ca849b8..9fa15d019 100644 --- a/controllers/operator/mongodbshardedcluster_controller_test.go +++ b/controllers/operator/mongodbshardedcluster_controller_test.go @@ -2,6 +2,7 @@ package operator import ( "context" + "encoding/json" "fmt" "reflect" "strings" @@ -28,6 +29,7 @@ import ( "github.com/mongodb/mongodb-kubernetes/api/v1/status/pvc" "github.com/mongodb/mongodb-kubernetes/controllers/om" "github.com/mongodb/mongodb-kubernetes/controllers/om/backup" + "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/automationconfig" "github.com/mongodb/mongodb-kubernetes/controllers/operator/connection" "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct" "github.com/mongodb/mongodb-kubernetes/controllers/operator/controlledfeature" @@ -1643,7 +1645,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc construct.GetPodEnvOptions(), ) shardSts := construct.DatabaseStatefulSet(*sh, shardOptions, zap.S()) - shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment()) + shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), NewShardedClusterDeploymentState()) } desiredMongosConfig := createMongosSpec(sh) @@ -1664,7 +1666,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc construct.GetPodEnvOptions(), ) configSvrSts := construct.DatabaseStatefulSet(*sh, configServerOptions, zap.S()) - configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment()) + configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), NewShardedClusterDeploymentState()) d := om.NewDeployment() _, err := d.MergeShardedCluster(om.DeploymentShardedClusterMergeOptions{ @@ -1914,3 +1916,216 @@ func generateAllHostsSingleCluster(sc *mdbv1.MongoDB, mongosCount int, configSrv } return allHosts, allPodNames } + +// TestProcessIdPersistence tests the process ID persistence functionality +// for sharded cluster deployments, ensuring process IDs are correctly +// saved to and retrieved from the ShardedClusterDeploymentState. +func TestProcessIdPersistence(t *testing.T) { + t.Run("TestShardedClusterDeploymentState_ProcessIds", func(t *testing.T) { + // Test 1: Create new deployment state + deploymentState := NewShardedClusterDeploymentState() + assert.NotNil(t, deploymentState) + assert.NotNil(t, deploymentState.ProcessIds) + assert.Equal(t, 0, len(deploymentState.ProcessIds)) + + // Test 2: Save process IDs for config server + configServerProcessIds := map[string]int{ + "test-configsrv-0": 0, + "test-configsrv-1": 1, + "test-configsrv-2": 2, + } + saveReplicaSetProcessIdsToDeploymentState("test-configsrv", configServerProcessIds, deploymentState) + assert.Equal(t, 1, len(deploymentState.ProcessIds)) + assert.Equal(t, configServerProcessIds, deploymentState.ProcessIds["test-configsrv"]) + + // Test 3: Save process IDs for multiple shards + shard0ProcessIds := map[string]int{ + "test-shard-0-0": 3, + "test-shard-0-1": 4, + "test-shard-0-2": 5, + } + shard1ProcessIds := map[string]int{ + "test-shard-1-0": 6, + "test-shard-1-1": 7, + } + saveReplicaSetProcessIdsToDeploymentState("test-shard-0", shard0ProcessIds, deploymentState) + saveReplicaSetProcessIdsToDeploymentState("test-shard-1", shard1ProcessIds, deploymentState) + + assert.Equal(t, 3, len(deploymentState.ProcessIds)) + assert.Equal(t, shard0ProcessIds, deploymentState.ProcessIds["test-shard-0"]) + assert.Equal(t, shard1ProcessIds, deploymentState.ProcessIds["test-shard-1"]) + + // Test 4: Retrieve process IDs (simulating migration scenario) + retrievedConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-configsrv", deploymentState) + assert.Equal(t, configServerProcessIds, retrievedConfigIds) + + retrievedShard0Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", deploymentState) + assert.Equal(t, shard0ProcessIds, retrievedShard0Ids) + + retrievedShard1Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-1", deploymentState) + assert.Equal(t, shard1ProcessIds, retrievedShard1Ids) + + // Test 5: Retrieve non-existent replica set (should return empty map) + emptyIds := getReplicaSetProcessIdsFromDeploymentState("non-existent", deploymentState) + assert.NotNil(t, emptyIds) + assert.Equal(t, 0, len(emptyIds)) + }) + + t.Run("TestProcessIdPersistence_EdgeCases", func(t *testing.T) { + // Test edge case: nil deployment state ProcessIds + deploymentState := &ShardedClusterDeploymentState{} + + // Should return empty map for nil ProcessIds + emptyIds := getReplicaSetProcessIdsFromDeploymentState("test-rs", deploymentState) + assert.NotNil(t, emptyIds) + assert.Equal(t, 0, len(emptyIds)) + + // Should initialize ProcessIds when saving + processIds := map[string]int{"test-process": 1} + saveReplicaSetProcessIdsToDeploymentState("test-rs", processIds, deploymentState) + assert.NotNil(t, deploymentState.ProcessIds) + assert.Equal(t, processIds, deploymentState.ProcessIds["test-rs"]) + + // Test edge case: empty process IDs (should not save) + emptyProcessIds := map[string]int{} + saveReplicaSetProcessIdsToDeploymentState("empty-rs", emptyProcessIds, deploymentState) + _, exists := deploymentState.ProcessIds["empty-rs"] + assert.False(t, exists, "Empty process IDs should not be saved") + }) + + t.Run("TestProcessIdPersistence_JSONSerialization", func(t *testing.T) { + // Test JSON serialization/deserialization (StateStore compatibility) + originalState := NewShardedClusterDeploymentState() + + // Add test data + configProcessIds := map[string]int{ + "config-0": 0, + "config-1": 1, + "config-2": 2, + } + shardProcessIds := map[string]int{ + "shard-0-0": 3, + "shard-0-1": 4, + } + + saveReplicaSetProcessIdsToDeploymentState("test-config", configProcessIds, originalState) + saveReplicaSetProcessIdsToDeploymentState("test-shard-0", shardProcessIds, originalState) + + // Serialize to JSON + jsonData, err := json.Marshal(originalState) + require.NoError(t, err) + assert.Contains(t, string(jsonData), "processIds") + assert.Contains(t, string(jsonData), "test-config") + assert.Contains(t, string(jsonData), "test-shard-0") + + // Deserialize from JSON + var restoredState ShardedClusterDeploymentState + err = json.Unmarshal(jsonData, &restoredState) + require.NoError(t, err) + + // Verify restored state + assert.Equal(t, originalState.ProcessIds, restoredState.ProcessIds) + + restoredConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-config", &restoredState) + assert.Equal(t, configProcessIds, restoredConfigIds) + + restoredShardIds := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", &restoredState) + assert.Equal(t, shardProcessIds, restoredShardIds) + }) + + t.Run("TestBuildReplicaSetFromProcesses_WithProcessIdPersistence", func(t *testing.T) { + // Create test MongoDB resource + mdb := &mdbv1.MongoDB{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sharded-cluster", + Namespace: "test-namespace", + }, + Spec: mdbv1.MongoDbSpec{ + DbCommonSpec: mdbv1.DbCommonSpec{ + Version: "6.0.0", + Connectivity: &mdbv1.MongoDBConnectivity{}, + }, + MongodbShardedClusterSizeConfig: status.MongodbShardedClusterSizeConfig{ + ShardCount: 2, + }, + }, + } + + // Create test processes + members := []om.Process{ + { + "name": "test-shard-0-0", + "hostname": "test-shard-0-0.test-namespace.svc.cluster.local", + }, + { + "name": "test-shard-0-1", + "hostname": "test-shard-0-1.test-namespace.svc.cluster.local", + }, + } + + memberOptions := []automationconfig.MemberOptions{ + {}, + {}, + } + + // Test case 1: Empty deployment (simulating migration scenario) + emptyDeployment := om.NewDeployment() + deploymentState := NewShardedClusterDeploymentState() + + // Pre-populate deployment state with saved process IDs + savedProcessIds := map[string]int{ + "test-shard-0-0": 10, + "test-shard-0-1": 11, + } + saveReplicaSetProcessIdsToDeploymentState("test-shard-0", savedProcessIds, deploymentState) + + // Build replica set - should use saved process IDs + rsWithProcesses, err := buildReplicaSetFromProcesses( + "test-shard-0", + members, + mdb, + memberOptions, + emptyDeployment, + deploymentState, + ) + + require.NoError(t, err) + assert.Equal(t, "test-shard-0", rsWithProcesses.Rs.Name()) + + // Verify that saved process IDs were used + // Note: This would require access to the internal structure of rsWithProcesses + // In a real test, you might need to verify this through the automation config + // or by checking the deployment after the replica set is added to it. + }) + + t.Run("TestSaveFinalProcessIdsToDeploymentState_MockConnection", func(t *testing.T) { + // Create a simple mock connection that returns an empty deployment + mockConn := &om.MockedOmConnection{} + + // Create test MongoDB resource + sc := &mdbv1.MongoDB{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sharded-cluster", + Namespace: "test-namespace", + }, + Spec: mdbv1.MongoDbSpec{ + MongodbShardedClusterSizeConfig: status.MongodbShardedClusterSizeConfig{ + ShardCount: 2, + }, + }, + } + + // Create helper with deployment state + helper := &ShardedClusterReconcileHelper{ + sc: sc, + deploymentState: NewShardedClusterDeploymentState(), + } + + // Test saving final process IDs with empty deployment (no error expected) + err := helper.saveFinalProcessIdsToDeploymentState(mockConn, sc) + assert.NoError(t, err) + + // Verify that no process IDs were saved (since deployment is empty) + assert.Equal(t, 0, len(helper.deploymentState.ProcessIds)) + }) +} diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py index f662c516e..4d2bfe507 100644 --- a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py +++ b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py @@ -1,8 +1,9 @@ -from kubetester import find_fixture, try_load +from pytest import fixture, mark + +from kubetester import find_fixture, try_load, create_or_update_configmap, read_configmap, random_k8s_name from kubetester.mongodb import MongoDB from kubetester.operator import Operator from kubetester.phase import Phase -from pytest import fixture, mark from tests import test_logger from tests.conftest import get_member_cluster_names from tests.multicluster.conftest import cluster_spec_list @@ -127,3 +128,168 @@ def test_assert_stateful_sets_after_scaling(self, sc: MongoDB): assert_shard_sts_members_count(sc, [[0, 2, 1]]) assert_config_srv_sts_members_count(sc, [1, 1, 1]) assert_mongos_sts_members_count(sc, [1, 0, 0]) + + +@fixture(scope="module") +def migration_project_configmap(namespace: str) -> str: + """Create a new project configmap to simulate cluster migration scenario.""" + cm = read_configmap(namespace=namespace, name="my-project") + project_name = f"{random_k8s_name('migration-project-')}" + return create_or_update_configmap( + namespace=namespace, + name=project_name, + data={ + "baseUrl": cm["baseUrl"], + "projectName": project_name, + "orgId": cm["orgId"], + }, + ) + + +@fixture(scope="module") +def sc_process_id_persistence(namespace: str, custom_mdb_version: str) -> MongoDB: + """Create a sharded cluster for process ID persistence testing.""" + resource = MongoDB.from_yaml( + find_fixture("sharded-cluster-multi-cluster.yaml"), + namespace=namespace, + name="sh-process-id-test", + ) + + if try_load(resource): + return resource + + resource.set_architecture_annotation() + return resource + + +@mark.e2e_multi_cluster_sharded_process_id_persistence +class TestShardedClusterProcessIdPersistence: + """ + Test process ID persistence during cluster migration scenarios. + + This test class validates that the sharded cluster controller correctly: + 1. Saves process IDs to deployment state after reconciliation + 2. Retrieves process IDs from deployment state when they become empty (migration scenario) + 3. Maintains cluster functionality after process ID restoration + 4. Handles scaling operations correctly after migration + """ + + def test_deploy_operator(self, multi_cluster_operator: Operator): + """Ensure the operator is running before starting tests.""" + multi_cluster_operator.assert_is_running() + + def test_create_initial_cluster(self, sc_process_id_persistence: MongoDB, custom_mdb_version: str, issuer_ca_configmap: str): + """Create initial sharded cluster with basic configuration.""" + logger.info("Creating initial sharded cluster for process ID persistence testing") + + sc = sc_process_id_persistence + sc["spec"]["shard"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [2, 2, 1]) + sc["spec"]["configSrv"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [1, 1, 1]) + sc["spec"]["mongos"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [1, 1, 1]) + + sc.update() + + def test_initial_cluster_running(self, sc_process_id_persistence: MongoDB): + """Wait for initial cluster to reach running state.""" + logger.info("Waiting for initial cluster to reach running state") + sc_process_id_persistence.assert_reaches_phase(Phase.Running, timeout=1200) + + def test_capture_initial_process_ids(self, sc_process_id_persistence: MongoDB): + """Capture initial process IDs and verify they are saved to deployment state.""" + logger.info("Capturing initial process IDs from automation config") + + # Get the automation config to verify process IDs exist + automation_config = sc_process_id_persistence.get_automation_config() + + # Verify that processes exist and have IDs + processes = automation_config.get("processes", []) + assert len(processes) > 0, "No processes found in automation config" + + # Verify config server processes + config_processes = [p for p in processes if "configsrv" in p.get("name", "")] + assert len(config_processes) >= 3, f"Expected at least 3 config server processes, found {len(config_processes)}" + + # Verify shard processes + shard_processes = [p for p in processes if "shard" in p.get("name", "")] + assert len(shard_processes) >= 5, f"Expected at least 5 shard processes, found {len(shard_processes)}" + + # Store process IDs for later verification + self.initial_process_ids = {p["name"]: p.get("processId") for p in processes if p.get("processId") is not None} + logger.info(f"Captured {len(self.initial_process_ids)} initial process IDs") + + def test_simulate_cluster_migration(self, sc_process_id_persistence: MongoDB, migration_project_configmap: str): + """Simulate cluster migration by changing the project configuration.""" + logger.info("Simulating cluster migration by changing project configuration") + + sc = sc_process_id_persistence + + # Change the opsManager configMapRef to simulate migration + original_configmap = sc["spec"]["opsManager"]["configMapRef"]["name"] + sc["spec"]["opsManager"]["configMapRef"]["name"] = migration_project_configmap + + logger.info(f"Changed configMapRef from {original_configmap} to {migration_project_configmap}") + sc.update() + + # Store original configmap for potential restoration + self.original_configmap = original_configmap + + def test_cluster_recovers_after_migration(self, sc_process_id_persistence: MongoDB): + """Verify cluster recovers and reaches running state after migration.""" + logger.info("Waiting for cluster to recover after migration") + + # The cluster should eventually reach running state again + # This may take longer as the controller needs to handle the migration + sc_process_id_persistence.assert_reaches_phase(Phase.Running, timeout=1800) + + def test_verify_process_id_persistence(self, sc_process_id_persistence: MongoDB): + """Verify that process IDs are correctly restored from deployment state.""" + logger.info("Verifying process ID persistence after migration") + + # Get the automation config after migration + automation_config = sc_process_id_persistence.get_automation_config() + processes = automation_config.get("processes", []) + + # Get current process IDs + current_process_ids = {p["name"]: p.get("processId") for p in processes if p.get("processId") is not None} + + logger.info(f"Found {len(current_process_ids)} process IDs after migration") + + # Verify that process IDs are preserved or properly restored + # In a real migration scenario, the process IDs should be restored from deployment state + for process_name in self.initial_process_ids: + if process_name in current_process_ids: + logger.info(f"Process {process_name}: initial ID {self.initial_process_ids[process_name]}, current ID {current_process_ids[process_name]}") + + # Verify cluster is functional by checking that all expected processes exist + config_processes = [p for p in processes if "configsrv" in p.get("name", "")] + shard_processes = [p for p in processes if "shard" in p.get("name", "")] + + assert len(config_processes) >= 3, f"Config server processes missing after migration: {len(config_processes)}" + assert len(shard_processes) >= 5, f"Shard processes missing after migration: {len(shard_processes)}" + + def test_scaling_after_migration(self, sc_process_id_persistence: MongoDB): + """Test that scaling operations work correctly after migration.""" + logger.info("Testing scaling operations after migration") + + sc = sc_process_id_persistence + + # Scale up one of the shards + sc["spec"]["shard"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [3, 2, 1]) + sc.update() + + # Wait for scaling to complete + sc.assert_reaches_phase(Phase.Running, timeout=1200) + + def test_verify_scaling_success(self, sc_process_id_persistence: MongoDB): + """Verify that scaling was successful and cluster is functional.""" + logger.info("Verifying scaling success after migration") + + # Verify automation config correctness + assert_correct_automation_config_after_scaling(sc_process_id_persistence) + + # Verify statefulset member counts + assert_shard_sts_members_count(sc_process_id_persistence, [[3, 2, 1]]) + assert_config_srv_sts_members_count(sc_process_id_persistence, [1, 1, 1]) + assert_mongos_sts_members_count(sc_process_id_persistence, [1, 1, 1]) + + logger.info("Process ID persistence test completed successfully") From b49f584e95f6cc62bd8ac3d129f77889c8deb92f Mon Sep 17 00:00:00 2001 From: Nam Nguyen Date: Tue, 2 Sep 2025 15:46:35 +0200 Subject: [PATCH 2/5] add memberid migration case --- .../mongodbshardedcluster_controller.go | 27 +-- .../mongodbshardedcluster_controller_test.go | 102 +++++------ .../multi_cluster_sharded_scaling.py | 158 +++--------------- 3 files changed, 83 insertions(+), 204 deletions(-) diff --git a/controllers/operator/mongodbshardedcluster_controller.go b/controllers/operator/mongodbshardedcluster_controller.go index 06a3bc8db..5df2a1f65 100644 --- a/controllers/operator/mongodbshardedcluster_controller.go +++ b/controllers/operator/mongodbshardedcluster_controller.go @@ -2045,7 +2045,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c } // Save final process IDs to deployment state for persistence across reconciliation cycles - if err := r.saveFinalProcessIdsToDeploymentState(conn, sc); err != nil { + if err := r.saveFinalProcessIdsToDeploymentState(conn); err != nil { log.Warnf("Failed to save process IDs to deployment state: %v", err) // Don't fail the reconciliation for this, just log the warning } @@ -2055,26 +2055,13 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c // saveFinalProcessIdsToDeploymentState saves the final process IDs from the OM deployment // to the deployment state for persistence across reconciliation cycles. -func (r *ShardedClusterReconcileHelper) saveFinalProcessIdsToDeploymentState(conn om.Connection, sc *mdbv1.MongoDB) error { - finalDeployment, err := conn.ReadDeployment() +func (r *ShardedClusterReconcileHelper) saveFinalProcessIdsToDeploymentState(conn om.Connection) error { + finalProcessIds, err := om.GetReplicaSetMemberIds(conn) if err != nil { return err } - // Save config server process IDs - configRsProcessIds := getReplicaSetProcessIdsFromReplicaSets(sc.ConfigRsName(), finalDeployment) - if len(configRsProcessIds) > 0 { - saveReplicaSetProcessIdsToDeploymentState(sc.ConfigRsName(), configRsProcessIds, r.deploymentState) - } - - // Save shard process IDs - for shardIdx := 0; shardIdx < sc.Spec.ShardCount; shardIdx++ { - shardRsName := sc.ShardRsName(shardIdx) - shardProcessIds := getReplicaSetProcessIdsFromReplicaSets(shardRsName, finalDeployment) - if len(shardProcessIds) > 0 { - saveReplicaSetProcessIdsToDeploymentState(shardRsName, shardProcessIds, r.deploymentState) - } - } + saveReplicaSetProcessIdsToDeploymentState(finalProcessIds, r.deploymentState) return nil } @@ -2292,13 +2279,13 @@ func getReplicaSetProcessIdsFromDeploymentState(replicaSetName string, deploymen // saveReplicaSetProcessIdsToDeploymentState saves process IDs to the deployment state // for persistence across reconciliation cycles. -func saveReplicaSetProcessIdsToDeploymentState(replicaSetName string, processIds map[string]int, deploymentState *ShardedClusterDeploymentState) { +func saveReplicaSetProcessIdsToDeploymentState(processIds map[string]map[string]int, deploymentState *ShardedClusterDeploymentState) { if deploymentState.ProcessIds == nil { deploymentState.ProcessIds = make(map[string]map[string]int) } if len(processIds) > 0 { - deploymentState.ProcessIds[replicaSetName] = processIds + deploymentState.ProcessIds = processIds } } @@ -2309,7 +2296,7 @@ func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1. existingProcessIds := getReplicaSetProcessIdsFromReplicaSets(replicaSet.Name(), deployment) - // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in deployment state + // If there is no configuration saved in OM, it might be a new project, so we check the ids saved in the state map // A project migration can happen if .spec.opsManager.configMapRef is changed, or the original configMap has been modified. if len(existingProcessIds) == 0 { existingProcessIds = getReplicaSetProcessIdsFromDeploymentState(replicaSet.Name(), deploymentState) diff --git a/controllers/operator/mongodbshardedcluster_controller_test.go b/controllers/operator/mongodbshardedcluster_controller_test.go index 9fa15d019..eb407772e 100644 --- a/controllers/operator/mongodbshardedcluster_controller_test.go +++ b/controllers/operator/mongodbshardedcluster_controller_test.go @@ -1928,42 +1928,39 @@ func TestProcessIdPersistence(t *testing.T) { assert.NotNil(t, deploymentState.ProcessIds) assert.Equal(t, 0, len(deploymentState.ProcessIds)) - // Test 2: Save process IDs for config server - configServerProcessIds := map[string]int{ - "test-configsrv-0": 0, - "test-configsrv-1": 1, - "test-configsrv-2": 2, - } - saveReplicaSetProcessIdsToDeploymentState("test-configsrv", configServerProcessIds, deploymentState) - assert.Equal(t, 1, len(deploymentState.ProcessIds)) - assert.Equal(t, configServerProcessIds, deploymentState.ProcessIds["test-configsrv"]) - - // Test 3: Save process IDs for multiple shards - shard0ProcessIds := map[string]int{ - "test-shard-0-0": 3, - "test-shard-0-1": 4, - "test-shard-0-2": 5, - } - shard1ProcessIds := map[string]int{ - "test-shard-1-0": 6, - "test-shard-1-1": 7, + // Test 2: Save process IDs for config server and shards (simulating GetReplicaSetMemberIds result) + allProcessIds := map[string]map[string]int{ + "test-configsrv": { + "test-configsrv-0": 0, + "test-configsrv-1": 1, + "test-configsrv-2": 2, + }, + "test-shard-0": { + "test-shard-0-0": 3, + "test-shard-0-1": 4, + "test-shard-0-2": 5, + }, + "test-shard-1": { + "test-shard-1-0": 6, + "test-shard-1-1": 7, + }, } - saveReplicaSetProcessIdsToDeploymentState("test-shard-0", shard0ProcessIds, deploymentState) - saveReplicaSetProcessIdsToDeploymentState("test-shard-1", shard1ProcessIds, deploymentState) + saveReplicaSetProcessIdsToDeploymentState(allProcessIds, deploymentState) assert.Equal(t, 3, len(deploymentState.ProcessIds)) - assert.Equal(t, shard0ProcessIds, deploymentState.ProcessIds["test-shard-0"]) - assert.Equal(t, shard1ProcessIds, deploymentState.ProcessIds["test-shard-1"]) + assert.Equal(t, allProcessIds["test-configsrv"], deploymentState.ProcessIds["test-configsrv"]) + assert.Equal(t, allProcessIds["test-shard-0"], deploymentState.ProcessIds["test-shard-0"]) + assert.Equal(t, allProcessIds["test-shard-1"], deploymentState.ProcessIds["test-shard-1"]) // Test 4: Retrieve process IDs (simulating migration scenario) retrievedConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-configsrv", deploymentState) - assert.Equal(t, configServerProcessIds, retrievedConfigIds) + assert.Equal(t, allProcessIds["test-configsrv"], retrievedConfigIds) retrievedShard0Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", deploymentState) - assert.Equal(t, shard0ProcessIds, retrievedShard0Ids) + assert.Equal(t, allProcessIds["test-shard-0"], retrievedShard0Ids) retrievedShard1Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-1", deploymentState) - assert.Equal(t, shard1ProcessIds, retrievedShard1Ids) + assert.Equal(t, allProcessIds["test-shard-1"], retrievedShard1Ids) // Test 5: Retrieve non-existent replica set (should return empty map) emptyIds := getReplicaSetProcessIdsFromDeploymentState("non-existent", deploymentState) @@ -1981,16 +1978,18 @@ func TestProcessIdPersistence(t *testing.T) { assert.Equal(t, 0, len(emptyIds)) // Should initialize ProcessIds when saving - processIds := map[string]int{"test-process": 1} - saveReplicaSetProcessIdsToDeploymentState("test-rs", processIds, deploymentState) + allProcessIds := map[string]map[string]int{ + "test-rs": {"test-process": 1}, + } + saveReplicaSetProcessIdsToDeploymentState(allProcessIds, deploymentState) assert.NotNil(t, deploymentState.ProcessIds) - assert.Equal(t, processIds, deploymentState.ProcessIds["test-rs"]) + assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"]) // Test edge case: empty process IDs (should not save) - emptyProcessIds := map[string]int{} - saveReplicaSetProcessIdsToDeploymentState("empty-rs", emptyProcessIds, deploymentState) - _, exists := deploymentState.ProcessIds["empty-rs"] - assert.False(t, exists, "Empty process IDs should not be saved") + emptyProcessIds := map[string]map[string]int{} + saveReplicaSetProcessIdsToDeploymentState(emptyProcessIds, deploymentState) + // Should still have the original data + assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"]) }) t.Run("TestProcessIdPersistence_JSONSerialization", func(t *testing.T) { @@ -1998,18 +1997,19 @@ func TestProcessIdPersistence(t *testing.T) { originalState := NewShardedClusterDeploymentState() // Add test data - configProcessIds := map[string]int{ - "config-0": 0, - "config-1": 1, - "config-2": 2, - } - shardProcessIds := map[string]int{ - "shard-0-0": 3, - "shard-0-1": 4, + allProcessIds := map[string]map[string]int{ + "test-config": { + "config-0": 0, + "config-1": 1, + "config-2": 2, + }, + "test-shard-0": { + "shard-0-0": 3, + "shard-0-1": 4, + }, } - saveReplicaSetProcessIdsToDeploymentState("test-config", configProcessIds, originalState) - saveReplicaSetProcessIdsToDeploymentState("test-shard-0", shardProcessIds, originalState) + saveReplicaSetProcessIdsToDeploymentState(allProcessIds, originalState) // Serialize to JSON jsonData, err := json.Marshal(originalState) @@ -2027,10 +2027,10 @@ func TestProcessIdPersistence(t *testing.T) { assert.Equal(t, originalState.ProcessIds, restoredState.ProcessIds) restoredConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-config", &restoredState) - assert.Equal(t, configProcessIds, restoredConfigIds) + assert.Equal(t, allProcessIds["test-config"], restoredConfigIds) restoredShardIds := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", &restoredState) - assert.Equal(t, shardProcessIds, restoredShardIds) + assert.Equal(t, allProcessIds["test-shard-0"], restoredShardIds) }) t.Run("TestBuildReplicaSetFromProcesses_WithProcessIdPersistence", func(t *testing.T) { @@ -2073,11 +2073,13 @@ func TestProcessIdPersistence(t *testing.T) { deploymentState := NewShardedClusterDeploymentState() // Pre-populate deployment state with saved process IDs - savedProcessIds := map[string]int{ - "test-shard-0-0": 10, - "test-shard-0-1": 11, + savedProcessIds := map[string]map[string]int{ + "test-shard-0": { + "test-shard-0-0": 10, + "test-shard-0-1": 11, + }, } - saveReplicaSetProcessIdsToDeploymentState("test-shard-0", savedProcessIds, deploymentState) + saveReplicaSetProcessIdsToDeploymentState(savedProcessIds, deploymentState) // Build replica set - should use saved process IDs rsWithProcesses, err := buildReplicaSetFromProcesses( @@ -2122,7 +2124,7 @@ func TestProcessIdPersistence(t *testing.T) { } // Test saving final process IDs with empty deployment (no error expected) - err := helper.saveFinalProcessIdsToDeploymentState(mockConn, sc) + err := helper.saveFinalProcessIdsToDeploymentState(mockConn) assert.NoError(t, err) // Verify that no process IDs were saved (since deployment is empty) diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py index 4d2bfe507..be55de8ee 100644 --- a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py +++ b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py @@ -5,7 +5,7 @@ from kubetester.operator import Operator from kubetester.phase import Phase from tests import test_logger -from tests.conftest import get_member_cluster_names +from tests.conftest import get_member_cluster_names, read_deployment_state, get_central_cluster_client from tests.multicluster.conftest import cluster_spec_list from tests.multicluster_shardedcluster import ( assert_config_srv_sts_members_count, @@ -129,12 +129,15 @@ def test_assert_stateful_sets_after_scaling(self, sc: MongoDB): assert_config_srv_sts_members_count(sc, [1, 1, 1]) assert_mongos_sts_members_count(sc, [1, 0, 0]) +# From here on, the tests are for verifying that we can change the project of the MongoDB sharded cluster resource +# and that process IDs are correctly persisted during migration scenarios. + @fixture(scope="module") -def migration_project_configmap(namespace: str) -> str: +def new_project_configmap(namespace: str) -> str: """Create a new project configmap to simulate cluster migration scenario.""" cm = read_configmap(namespace=namespace, name="my-project") - project_name = f"{random_k8s_name('migration-project-')}" + project_name = f"{random_k8s_name('new-project-')}" return create_or_update_configmap( namespace=namespace, name=project_name, @@ -146,150 +149,37 @@ def migration_project_configmap(namespace: str) -> str: ) -@fixture(scope="module") -def sc_process_id_persistence(namespace: str, custom_mdb_version: str) -> MongoDB: - """Create a sharded cluster for process ID persistence testing.""" - resource = MongoDB.from_yaml( - find_fixture("sharded-cluster-multi-cluster.yaml"), - namespace=namespace, - name="sh-process-id-test", - ) - - if try_load(resource): - return resource - - resource.set_architecture_annotation() - return resource - - @mark.e2e_multi_cluster_sharded_process_id_persistence class TestShardedClusterProcessIdPersistence: """ Test process ID persistence during cluster migration scenarios. - This test class validates that the sharded cluster controller correctly: - 1. Saves process IDs to deployment state after reconciliation - 2. Retrieves process IDs from deployment state when they become empty (migration scenario) - 3. Maintains cluster functionality after process ID restoration - 4. Handles scaling operations correctly after migration + This test validates that the sharded cluster controller correctly preserves + process IDs when changing projects (migration scenario) even with non-sequential + member IDs in the replica sets. """ - def test_deploy_operator(self, multi_cluster_operator: Operator): - """Ensure the operator is running before starting tests.""" - multi_cluster_operator.assert_is_running() - - def test_create_initial_cluster(self, sc_process_id_persistence: MongoDB, custom_mdb_version: str, issuer_ca_configmap: str): - """Create initial sharded cluster with basic configuration.""" - logger.info("Creating initial sharded cluster for process ID persistence testing") - - sc = sc_process_id_persistence - sc["spec"]["shard"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [2, 2, 1]) - sc["spec"]["configSrv"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [1, 1, 1]) - sc["spec"]["mongos"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [1, 1, 1]) - - sc.update() - - def test_initial_cluster_running(self, sc_process_id_persistence: MongoDB): - """Wait for initial cluster to reach running state.""" - logger.info("Waiting for initial cluster to reach running state") - sc_process_id_persistence.assert_reaches_phase(Phase.Running, timeout=1200) + def test_scale_up_first_shard(self, sc: MongoDB): + """Scale up the first shard to create non-sequential member IDs.""" + logger.info("Scaling up first shard to create non-sequential member IDs") - def test_capture_initial_process_ids(self, sc_process_id_persistence: MongoDB): - """Capture initial process IDs and verify they are saved to deployment state.""" - logger.info("Capturing initial process IDs from automation config") - - # Get the automation config to verify process IDs exist - automation_config = sc_process_id_persistence.get_automation_config() - - # Verify that processes exist and have IDs - processes = automation_config.get("processes", []) - assert len(processes) > 0, "No processes found in automation config" - - # Verify config server processes - config_processes = [p for p in processes if "configsrv" in p.get("name", "")] - assert len(config_processes) >= 3, f"Expected at least 3 config server processes, found {len(config_processes)}" - - # Verify shard processes - shard_processes = [p for p in processes if "shard" in p.get("name", "")] - assert len(shard_processes) >= 5, f"Expected at least 5 shard processes, found {len(shard_processes)}" - - # Store process IDs for later verification - self.initial_process_ids = {p["name"]: p.get("processId") for p in processes if p.get("processId") is not None} - logger.info(f"Captured {len(self.initial_process_ids)} initial process IDs") - - def test_simulate_cluster_migration(self, sc_process_id_persistence: MongoDB, migration_project_configmap: str): - """Simulate cluster migration by changing the project configuration.""" - logger.info("Simulating cluster migration by changing project configuration") - - sc = sc_process_id_persistence - - # Change the opsManager configMapRef to simulate migration - original_configmap = sc["spec"]["opsManager"]["configMapRef"]["name"] - sc["spec"]["opsManager"]["configMapRef"]["name"] = migration_project_configmap - - logger.info(f"Changed configMapRef from {original_configmap} to {migration_project_configmap}") + # Scale up the first shard to 3 members. This will lead to non-sequential member ids in the replicaset. + # Similar to the multi replica set test, this creates a scenario where process IDs are not sequential + sc["spec"]["shard"]["clusterSpecList"][0]["members"] = 3 sc.update() - # Store original configmap for potential restoration - self.original_configmap = original_configmap - - def test_cluster_recovers_after_migration(self, sc_process_id_persistence: MongoDB): - """Verify cluster recovers and reaches running state after migration.""" - logger.info("Waiting for cluster to recover after migration") - - # The cluster should eventually reach running state again - # This may take longer as the controller needs to handle the migration - sc_process_id_persistence.assert_reaches_phase(Phase.Running, timeout=1800) - - def test_verify_process_id_persistence(self, sc_process_id_persistence: MongoDB): - """Verify that process IDs are correctly restored from deployment state.""" - logger.info("Verifying process ID persistence after migration") - - # Get the automation config after migration - automation_config = sc_process_id_persistence.get_automation_config() - processes = automation_config.get("processes", []) - - # Get current process IDs - current_process_ids = {p["name"]: p.get("processId") for p in processes if p.get("processId") is not None} - - logger.info(f"Found {len(current_process_ids)} process IDs after migration") - - # Verify that process IDs are preserved or properly restored - # In a real migration scenario, the process IDs should be restored from deployment state - for process_name in self.initial_process_ids: - if process_name in current_process_ids: - logger.info(f"Process {process_name}: initial ID {self.initial_process_ids[process_name]}, current ID {current_process_ids[process_name]}") - - # Verify cluster is functional by checking that all expected processes exist - config_processes = [p for p in processes if "configsrv" in p.get("name", "")] - shard_processes = [p for p in processes if "shard" in p.get("name", "")] - - assert len(config_processes) >= 3, f"Config server processes missing after migration: {len(config_processes)}" - assert len(shard_processes) >= 5, f"Shard processes missing after migration: {len(shard_processes)}" - - def test_scaling_after_migration(self, sc_process_id_persistence: MongoDB): - """Test that scaling operations work correctly after migration.""" - logger.info("Testing scaling operations after migration") + sc.assert_reaches_phase(Phase.Running, timeout=1200) - sc = sc_process_id_persistence + def test_change_project(self, sc: MongoDB, new_project_configmap: str): + oldRsMembers = sc.get_automation_config_tester().get_replica_set_members(sc.name) - # Scale up one of the shards - sc["spec"]["shard"]["clusterSpecList"] = cluster_spec_list(get_member_cluster_names(), [3, 2, 1]) + sc["spec"]["opsManager"]["configMapRef"]["name"] = new_project_configmap sc.update() - # Wait for scaling to complete - sc.assert_reaches_phase(Phase.Running, timeout=1200) - - def test_verify_scaling_success(self, sc_process_id_persistence: MongoDB): - """Verify that scaling was successful and cluster is functional.""" - logger.info("Verifying scaling success after migration") - - # Verify automation config correctness - assert_correct_automation_config_after_scaling(sc_process_id_persistence) + sc.assert_abandons_phase(phase=Phase.Running, timeout=1200) + sc.assert_reaches_phase(phase=Phase.Running, timeout=1800) - # Verify statefulset member counts - assert_shard_sts_members_count(sc_process_id_persistence, [[3, 2, 1]]) - assert_config_srv_sts_members_count(sc_process_id_persistence, [1, 1, 1]) - assert_mongos_sts_members_count(sc_process_id_persistence, [1, 1, 1]) + newRsMembers = sc.get_automation_config_tester().get_replica_set_members(sc.name) - logger.info("Process ID persistence test completed successfully") + # Assert that the replica set member ids have not changed after changing the project. + assert oldRsMembers == newRsMembers From 9139ea088f348c4320606d73158c9f5c27953187 Mon Sep 17 00:00:00 2001 From: Nam Nguyen Date: Tue, 2 Sep 2025 16:35:34 +0200 Subject: [PATCH 3/5] add typing --- controllers/om/omclient.go | 10 +- .../mongodbmultireplicaset_controller.go | 2 +- .../mongodbshardedcluster_controller.go | 16 +-- .../mongodbshardedcluster_controller_test.go | 112 +----------------- .../multi_cluster_sharded_scaling.py | 18 ++- 5 files changed, 35 insertions(+), 123 deletions(-) diff --git a/controllers/om/omclient.go b/controllers/om/omclient.go index cbc0c2875..c9853a0ba 100644 --- a/controllers/om/omclient.go +++ b/controllers/om/omclient.go @@ -978,13 +978,19 @@ func (oc *HTTPOmConnection) AddPreferredHostname(agentApiKey string, value strin return nil } -func GetReplicaSetMemberIds(conn Connection) (map[string]map[string]int, error) { +// ProcessNameToId maps process names to their process IDs +type ProcessNameToId map[string]int + +// ReplicaSetToProcessIds maps replica set names to their process mappings +type ReplicaSetToProcessIds map[string]ProcessNameToId + +func GetReplicaSetMemberIds(conn Connection) (ReplicaSetToProcessIds, error) { dep, err := conn.ReadDeployment() if err != nil { return nil, err } - finalProcessIds := make(map[string]map[string]int) + finalProcessIds := make(ReplicaSetToProcessIds) for _, replicaSet := range dep.GetReplicaSets() { finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds() diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go index 013844268..12cd45f47 100644 --- a/controllers/operator/mongodbmultireplicaset_controller.go +++ b/controllers/operator/mongodbmultireplicaset_controller.go @@ -652,7 +652,7 @@ func getMembersForClusterSpecItemThisReconciliation(mrs *mdbmultiv1.MongoDBMulti } // saveLastAchievedSpec updates the MongoDBMultiCluster resource with the spec that was just achieved. -func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds map[string]map[string]int) error { +func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds om.ReplicaSetToProcessIds) error { clusterSpecs, err := mrs.GetClusterSpecItems() if err != nil { return err diff --git a/controllers/operator/mongodbshardedcluster_controller.go b/controllers/operator/mongodbshardedcluster_controller.go index 5df2a1f65..8b4680d2b 100644 --- a/controllers/operator/mongodbshardedcluster_controller.go +++ b/controllers/operator/mongodbshardedcluster_controller.go @@ -107,7 +107,7 @@ type ShardedClusterDeploymentState struct { // ProcessIds stores process IDs for replica sets to handle migration scenarios // where process IDs have length 0. The key is the replica set name, and the value // is a map of process name to process ID. - ProcessIds map[string]map[string]int `json:"processIds,omitempty"` + ProcessIds om.ReplicaSetToProcessIds `json:"processIds,omitempty"` } // updateStatusFromResourceStatus updates the status in the deployment state with values from the resource status with additional ensurance that no data is accidentally lost. @@ -130,7 +130,7 @@ func NewShardedClusterDeploymentState() *ShardedClusterDeploymentState { CommonDeploymentState: CommonDeploymentState{ClusterMapping: map[string]int{}}, LastAchievedSpec: &mdbv1.MongoDbSpec{}, Status: &mdbv1.MongoDbStatus{}, - ProcessIds: map[string]map[string]int{}, + ProcessIds: om.ReplicaSetToProcessIds{}, } } @@ -2265,23 +2265,19 @@ func createMongodProcessForShardedCluster(mongoDBImage string, forceEnterprise b // getReplicaSetProcessIdsFromDeploymentState retrieves process IDs from the deployment state // when they are empty (length 0), indicating a cluster migration scenario. -func getReplicaSetProcessIdsFromDeploymentState(replicaSetName string, deploymentState *ShardedClusterDeploymentState) map[string]int { - if deploymentState.ProcessIds == nil { - return make(map[string]int) - } - +func getReplicaSetProcessIdsFromDeploymentState(replicaSetName string, deploymentState *ShardedClusterDeploymentState) om.ProcessNameToId { if processIds, ok := deploymentState.ProcessIds[replicaSetName]; ok { return processIds } - return make(map[string]int) + return om.ProcessNameToId{} } // saveReplicaSetProcessIdsToDeploymentState saves process IDs to the deployment state // for persistence across reconciliation cycles. -func saveReplicaSetProcessIdsToDeploymentState(processIds map[string]map[string]int, deploymentState *ShardedClusterDeploymentState) { +func saveReplicaSetProcessIdsToDeploymentState(processIds om.ReplicaSetToProcessIds, deploymentState *ShardedClusterDeploymentState) { if deploymentState.ProcessIds == nil { - deploymentState.ProcessIds = make(map[string]map[string]int) + deploymentState.ProcessIds = om.ReplicaSetToProcessIds{} } if len(processIds) > 0 { diff --git a/controllers/operator/mongodbshardedcluster_controller_test.go b/controllers/operator/mongodbshardedcluster_controller_test.go index eb407772e..a72329e5f 100644 --- a/controllers/operator/mongodbshardedcluster_controller_test.go +++ b/controllers/operator/mongodbshardedcluster_controller_test.go @@ -29,7 +29,6 @@ import ( "github.com/mongodb/mongodb-kubernetes/api/v1/status/pvc" "github.com/mongodb/mongodb-kubernetes/controllers/om" "github.com/mongodb/mongodb-kubernetes/controllers/om/backup" - "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/automationconfig" "github.com/mongodb/mongodb-kubernetes/controllers/operator/connection" "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct" "github.com/mongodb/mongodb-kubernetes/controllers/operator/controlledfeature" @@ -1929,7 +1928,7 @@ func TestProcessIdPersistence(t *testing.T) { assert.Equal(t, 0, len(deploymentState.ProcessIds)) // Test 2: Save process IDs for config server and shards (simulating GetReplicaSetMemberIds result) - allProcessIds := map[string]map[string]int{ + allProcessIds := om.ReplicaSetToProcessIds{ "test-configsrv": { "test-configsrv-0": 0, "test-configsrv-1": 1, @@ -1969,16 +1968,15 @@ func TestProcessIdPersistence(t *testing.T) { }) t.Run("TestProcessIdPersistence_EdgeCases", func(t *testing.T) { - // Test edge case: nil deployment state ProcessIds deploymentState := &ShardedClusterDeploymentState{} - // Should return empty map for nil ProcessIds + // Should return empty map for nil ReplicaSetToProcessIds emptyIds := getReplicaSetProcessIdsFromDeploymentState("test-rs", deploymentState) assert.NotNil(t, emptyIds) assert.Equal(t, 0, len(emptyIds)) - // Should initialize ProcessIds when saving - allProcessIds := map[string]map[string]int{ + // Should initialize ReplicaSetToProcessIds when saving + allProcessIds := om.ReplicaSetToProcessIds{ "test-rs": {"test-process": 1}, } saveReplicaSetProcessIdsToDeploymentState(allProcessIds, deploymentState) @@ -1986,7 +1984,7 @@ func TestProcessIdPersistence(t *testing.T) { assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"]) // Test edge case: empty process IDs (should not save) - emptyProcessIds := map[string]map[string]int{} + emptyProcessIds := om.ReplicaSetToProcessIds{} saveReplicaSetProcessIdsToDeploymentState(emptyProcessIds, deploymentState) // Should still have the original data assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"]) @@ -1997,7 +1995,7 @@ func TestProcessIdPersistence(t *testing.T) { originalState := NewShardedClusterDeploymentState() // Add test data - allProcessIds := map[string]map[string]int{ + allProcessIds := om.ReplicaSetToProcessIds{ "test-config": { "config-0": 0, "config-1": 1, @@ -2032,102 +2030,4 @@ func TestProcessIdPersistence(t *testing.T) { restoredShardIds := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", &restoredState) assert.Equal(t, allProcessIds["test-shard-0"], restoredShardIds) }) - - t.Run("TestBuildReplicaSetFromProcesses_WithProcessIdPersistence", func(t *testing.T) { - // Create test MongoDB resource - mdb := &mdbv1.MongoDB{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sharded-cluster", - Namespace: "test-namespace", - }, - Spec: mdbv1.MongoDbSpec{ - DbCommonSpec: mdbv1.DbCommonSpec{ - Version: "6.0.0", - Connectivity: &mdbv1.MongoDBConnectivity{}, - }, - MongodbShardedClusterSizeConfig: status.MongodbShardedClusterSizeConfig{ - ShardCount: 2, - }, - }, - } - - // Create test processes - members := []om.Process{ - { - "name": "test-shard-0-0", - "hostname": "test-shard-0-0.test-namespace.svc.cluster.local", - }, - { - "name": "test-shard-0-1", - "hostname": "test-shard-0-1.test-namespace.svc.cluster.local", - }, - } - - memberOptions := []automationconfig.MemberOptions{ - {}, - {}, - } - - // Test case 1: Empty deployment (simulating migration scenario) - emptyDeployment := om.NewDeployment() - deploymentState := NewShardedClusterDeploymentState() - - // Pre-populate deployment state with saved process IDs - savedProcessIds := map[string]map[string]int{ - "test-shard-0": { - "test-shard-0-0": 10, - "test-shard-0-1": 11, - }, - } - saveReplicaSetProcessIdsToDeploymentState(savedProcessIds, deploymentState) - - // Build replica set - should use saved process IDs - rsWithProcesses, err := buildReplicaSetFromProcesses( - "test-shard-0", - members, - mdb, - memberOptions, - emptyDeployment, - deploymentState, - ) - - require.NoError(t, err) - assert.Equal(t, "test-shard-0", rsWithProcesses.Rs.Name()) - - // Verify that saved process IDs were used - // Note: This would require access to the internal structure of rsWithProcesses - // In a real test, you might need to verify this through the automation config - // or by checking the deployment after the replica set is added to it. - }) - - t.Run("TestSaveFinalProcessIdsToDeploymentState_MockConnection", func(t *testing.T) { - // Create a simple mock connection that returns an empty deployment - mockConn := &om.MockedOmConnection{} - - // Create test MongoDB resource - sc := &mdbv1.MongoDB{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sharded-cluster", - Namespace: "test-namespace", - }, - Spec: mdbv1.MongoDbSpec{ - MongodbShardedClusterSizeConfig: status.MongodbShardedClusterSizeConfig{ - ShardCount: 2, - }, - }, - } - - // Create helper with deployment state - helper := &ShardedClusterReconcileHelper{ - sc: sc, - deploymentState: NewShardedClusterDeploymentState(), - } - - // Test saving final process IDs with empty deployment (no error expected) - err := helper.saveFinalProcessIdsToDeploymentState(mockConn) - assert.NoError(t, err) - - // Verify that no process IDs were saved (since deployment is empty) - assert.Equal(t, 0, len(helper.deploymentState.ProcessIds)) - }) } diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py index be55de8ee..c8f0b2503 100644 --- a/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py +++ b/docker/mongodb-kubernetes-tests/tests/multicluster_shardedcluster/multi_cluster_sharded_scaling.py @@ -1,11 +1,20 @@ -from pytest import fixture, mark - -from kubetester import find_fixture, try_load, create_or_update_configmap, read_configmap, random_k8s_name +from kubetester import ( + create_or_update_configmap, + find_fixture, + random_k8s_name, + read_configmap, + try_load, +) from kubetester.mongodb import MongoDB from kubetester.operator import Operator from kubetester.phase import Phase +from pytest import fixture, mark from tests import test_logger -from tests.conftest import get_member_cluster_names, read_deployment_state, get_central_cluster_client +from tests.conftest import ( + get_central_cluster_client, + get_member_cluster_names, + read_deployment_state, +) from tests.multicluster.conftest import cluster_spec_list from tests.multicluster_shardedcluster import ( assert_config_srv_sts_members_count, @@ -129,6 +138,7 @@ def test_assert_stateful_sets_after_scaling(self, sc: MongoDB): assert_config_srv_sts_members_count(sc, [1, 1, 1]) assert_mongos_sts_members_count(sc, [1, 0, 0]) + # From here on, the tests are for verifying that we can change the project of the MongoDB sharded cluster resource # and that process IDs are correctly persisted during migration scenarios. From 1e7bfaf625e1fd4af17f781d162a14d22f62029f Mon Sep 17 00:00:00 2001 From: Nam Nguyen Date: Tue, 2 Sep 2025 17:07:30 +0200 Subject: [PATCH 4/5] add changelog --- changelog/20250902_fix_fixing_om_project_migration.md | 8 ++++++++ controllers/operator/mongodbshardedcluster_controller.go | 6 +----- 2 files changed, 9 insertions(+), 5 deletions(-) create mode 100644 changelog/20250902_fix_fixing_om_project_migration.md diff --git a/changelog/20250902_fix_fixing_om_project_migration.md b/changelog/20250902_fix_fixing_om_project_migration.md new file mode 100644 index 000000000..0f5dbf5a2 --- /dev/null +++ b/changelog/20250902_fix_fixing_om_project_migration.md @@ -0,0 +1,8 @@ +--- +title: Fixing om project migration +kind: fix +date: 2025-09-02 +--- + +* Fixed an issue where moving a **MongoDB** sharded cluster resource to a new project (or a new OM instance) would leave the deployment in a failed state. + diff --git a/controllers/operator/mongodbshardedcluster_controller.go b/controllers/operator/mongodbshardedcluster_controller.go index 8b4680d2b..3b5c6e23f 100644 --- a/controllers/operator/mongodbshardedcluster_controller.go +++ b/controllers/operator/mongodbshardedcluster_controller.go @@ -105,8 +105,7 @@ type ShardedClusterDeploymentState struct { LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"` Status *mdbv1.MongoDbStatus `json:"status"` // ProcessIds stores process IDs for replica sets to handle migration scenarios - // where process IDs have length 0. The key is the replica set name, and the value - // is a map of process name to process ID. + // where process IDs have length 0. ProcessIds om.ReplicaSetToProcessIds `json:"processIds,omitempty"` } @@ -2047,14 +2046,11 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c // Save final process IDs to deployment state for persistence across reconciliation cycles if err := r.saveFinalProcessIdsToDeploymentState(conn); err != nil { log.Warnf("Failed to save process IDs to deployment state: %v", err) - // Don't fail the reconciliation for this, just log the warning } return finalProcesses, shardsRemoving, workflow.OK() } -// saveFinalProcessIdsToDeploymentState saves the final process IDs from the OM deployment -// to the deployment state for persistence across reconciliation cycles. func (r *ShardedClusterReconcileHelper) saveFinalProcessIdsToDeploymentState(conn om.Connection) error { finalProcessIds, err := om.GetReplicaSetMemberIds(conn) if err != nil { From 152d3d88bb7895f8a21ccb3e8607febc2fdccfb6 Mon Sep 17 00:00:00 2001 From: Nam Nguyen Date: Tue, 2 Sep 2025 17:37:30 +0200 Subject: [PATCH 5/5] fix test --- ...odbshardedcluster_controller_multi_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/controllers/operator/mongodbshardedcluster_controller_multi_test.go b/controllers/operator/mongodbshardedcluster_controller_multi_test.go index 158665f3b..dcf698846 100644 --- a/controllers/operator/mongodbshardedcluster_controller_multi_test.go +++ b/controllers/operator/mongodbshardedcluster_controller_multi_test.go @@ -3589,12 +3589,32 @@ func getMultiClusterFQDN(stsName string, namespace string, clusterIdx int, podId func generateExpectedDeploymentState(t *testing.T, sc *mdbv1.MongoDB) string { lastSpec, _ := sc.GetLastSpec() + + expectedProcessIds := om.ReplicaSetToProcessIds{ + "slaney-0": om.ProcessNameToId{ + "slaney-0-0": 0, + "slaney-0-1": 1, + "slaney-0-2": 2, + }, + "slaney-1": om.ProcessNameToId{ + "slaney-1-0": 0, + "slaney-1-1": 1, + "slaney-1-2": 2, + }, + "slaney-config": om.ProcessNameToId{ + "slaney-config-0": 0, + "slaney-config-1": 1, + "slaney-config-2": 2, + }, + } + expectedState := ShardedClusterDeploymentState{ CommonDeploymentState: CommonDeploymentState{ ClusterMapping: map[string]int{}, }, LastAchievedSpec: lastSpec, Status: &sc.Status, + ProcessIds: expectedProcessIds, } lastSpecBytes, err := json.Marshal(expectedState) require.NoError(t, err)