Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog/20250902_fix_fixing_om_project_migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
title: Fixing om project migration
kind: fix
date: 2025-09-02
---

* Fixed an issue where moving a **MongoDB** sharded cluster resource to a new project (or a new OM instance) would leave the deployment in a failed state.

10 changes: 8 additions & 2 deletions controllers/om/omclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,13 +978,19 @@ func (oc *HTTPOmConnection) AddPreferredHostname(agentApiKey string, value strin
return nil
}

func GetReplicaSetMemberIds(conn Connection) (map[string]map[string]int, error) {
// ProcessNameToId maps process names to their process IDs
type ProcessNameToId map[string]int

// ReplicaSetToProcessIds maps replica set names to their process mappings
type ReplicaSetToProcessIds map[string]ProcessNameToId

func GetReplicaSetMemberIds(conn Connection) (ReplicaSetToProcessIds, error) {
dep, err := conn.ReadDeployment()
if err != nil {
return nil, err
}

finalProcessIds := make(map[string]map[string]int)
finalProcessIds := make(ReplicaSetToProcessIds)

for _, replicaSet := range dep.GetReplicaSets() {
finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func getMembersForClusterSpecItemThisReconciliation(mrs *mdbmultiv1.MongoDBMulti
}

// saveLastAchievedSpec updates the MongoDBMultiCluster resource with the spec that was just achieved.
func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds map[string]map[string]int) error {
func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds om.ReplicaSetToProcessIds) error {
clusterSpecs, err := mrs.GetClusterSpecItems()
if err != nil {
return err
Expand Down
55 changes: 52 additions & 3 deletions controllers/operator/mongodbshardedcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type ShardedClusterDeploymentState struct {
CommonDeploymentState `json:",inline"`
LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"`
Status *mdbv1.MongoDbStatus `json:"status"`
// ProcessIds stores process IDs for replica sets to handle migration scenarios
// where process IDs have length 0.
ProcessIds om.ReplicaSetToProcessIds `json:"processIds,omitempty"`
}

// updateStatusFromResourceStatus updates the status in the deployment state with values from the resource status with additional ensurance that no data is accidentally lost.
Expand All @@ -126,6 +129,7 @@ func NewShardedClusterDeploymentState() *ShardedClusterDeploymentState {
CommonDeploymentState: CommonDeploymentState{ClusterMapping: map[string]int{}},
LastAchievedSpec: &mdbv1.MongoDbSpec{},
Status: &mdbv1.MongoDbStatus{},
ProcessIds: om.ReplicaSetToProcessIds{},
}
}

Expand Down Expand Up @@ -1923,7 +1927,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
}

configSrvProcesses, configSrvMemberOptions := r.createDesiredConfigSrvProcessesAndMemberOptions(configSrvMemberCertPath)
configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment)
configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment, r.deploymentState)

// Shards
shards := make([]om.ReplicaSetWithProcesses, sc.Spec.ShardCount)
Expand All @@ -1934,7 +1938,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
shardInternalClusterPaths = append(shardInternalClusterPaths, fmt.Sprintf("%s/%s", util.InternalClusterAuthMountPath, shardOptions.InternalClusterHash))
shardMemberCertPath := fmt.Sprintf("%s/%s", util.TLSCertMountPath, shardOptions.CertificateHash)
desiredShardProcesses, desiredShardMemberOptions := r.createDesiredShardProcessesAndMemberOptions(shardIdx, shardMemberCertPath)
shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment)
shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment, r.deploymentState)
}

// updateOmAuthentication normally takes care of the certfile rotation code, but since sharded-cluster is special pertaining multiple clusterfiles, we code this part here for now.
Expand Down Expand Up @@ -2039,9 +2043,25 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
return nil, shardsRemoving, workflow.Pending("Performing multi stage reconciliation")
}

// Save final process IDs to deployment state for persistence across reconciliation cycles
if err := r.saveFinalProcessIdsToDeploymentState(conn); err != nil {
log.Warnf("Failed to save process IDs to deployment state: %v", err)
}

return finalProcesses, shardsRemoving, workflow.OK()
}

// saveFinalProcessIdsToDeploymentState reads the replica set member IDs from the OM
// deployment via the given connection and persists them into the deployment state,
// so they remain available on later reconciliations (e.g. after a project migration).
func (r *ShardedClusterReconcileHelper) saveFinalProcessIdsToDeploymentState(conn om.Connection) error {
	processIds, err := om.GetReplicaSetMemberIds(conn)
	if err != nil {
		return err
	}

	saveReplicaSetProcessIdsToDeploymentState(processIds, r.deploymentState)
	return nil
}

// logWarnIgnoredDueToRecovery logs, at warn level, an error that is deliberately
// being ignored because the automatic recovery process is handling the failure.
func logWarnIgnoredDueToRecovery(log *zap.SugaredLogger, err any) {
	log.Warnf("ignoring error due to automatic recovery process: %v", err)
}
Expand Down Expand Up @@ -2239,12 +2259,41 @@ func createMongodProcessForShardedCluster(mongoDBImage string, forceEnterprise b
return processes
}

// getReplicaSetProcessIdsFromDeploymentState returns the process name -> ID mapping
// stored in the deployment state for the given replica set. When no mapping was saved,
// it returns an empty (non-nil) mapping. Callers use this as a fallback when OM has no
// existing process IDs for the replica set (a cluster/project migration scenario).
func getReplicaSetProcessIdsFromDeploymentState(replicaSetName string, deploymentState *ShardedClusterDeploymentState) om.ProcessNameToId {
	processIds, found := deploymentState.ProcessIds[replicaSetName]
	if !found {
		return om.ProcessNameToId{}
	}

	return processIds
}

// saveReplicaSetProcessIdsToDeploymentState stores the given replica set -> process ID
// mappings in the deployment state for persistence across reconciliation cycles.
// An empty input is ignored so that previously saved IDs are never wiped out; in that
// case a nil state map is still initialized to an empty one so it stays usable.
func saveReplicaSetProcessIdsToDeploymentState(processIds om.ReplicaSetToProcessIds, deploymentState *ShardedClusterDeploymentState) {
	switch {
	case len(processIds) > 0:
		// A non-empty mapping replaces whatever was stored before, wholesale.
		deploymentState.ProcessIds = processIds
	case deploymentState.ProcessIds == nil:
		deploymentState.ProcessIds = om.ReplicaSetToProcessIds{}
	}
}

// buildReplicaSetFromProcesses creates the 'ReplicaSetWithProcesses' with specified processes. This is of use only
// for sharded cluster (config server, shards)
func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment) (om.ReplicaSetWithProcesses, error) {
func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment, deploymentState *ShardedClusterDeploymentState) (om.ReplicaSetWithProcesses, error) {
replicaSet := om.NewReplicaSet(name, mdb.Spec.GetMongoDBVersion())

existingProcessIds := getReplicaSetProcessIdsFromReplicaSets(replicaSet.Name(), deployment)

// If there is no configuration saved in OM, it might be a new project, so we check the ids saved in the state map
// A project migration can happen if .spec.opsManager.configMapRef is changed, or the original configMap has been modified.
if len(existingProcessIds) == 0 {
existingProcessIds = getReplicaSetProcessIdsFromDeploymentState(replicaSet.Name(), deploymentState)
}

var rsWithProcesses om.ReplicaSetWithProcesses
if mdb.Spec.IsMultiCluster() {
// we're passing nil as connectivity argument as in sharded clusters horizons don't make much sense as we don't expose externally individual shards
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3589,12 +3589,32 @@ func getMultiClusterFQDN(stsName string, namespace string, clusterIdx int, podId

func generateExpectedDeploymentState(t *testing.T, sc *mdbv1.MongoDB) string {
lastSpec, _ := sc.GetLastSpec()

expectedProcessIds := om.ReplicaSetToProcessIds{
"slaney-0": om.ProcessNameToId{
"slaney-0-0": 0,
"slaney-0-1": 1,
"slaney-0-2": 2,
},
"slaney-1": om.ProcessNameToId{
"slaney-1-0": 0,
"slaney-1-1": 1,
"slaney-1-2": 2,
},
"slaney-config": om.ProcessNameToId{
"slaney-config-0": 0,
"slaney-config-1": 1,
"slaney-config-2": 2,
},
}

expectedState := ShardedClusterDeploymentState{
CommonDeploymentState: CommonDeploymentState{
ClusterMapping: map[string]int{},
},
LastAchievedSpec: lastSpec,
Status: &sc.Status,
ProcessIds: expectedProcessIds,
}
lastSpecBytes, err := json.Marshal(expectedState)
require.NoError(t, err)
Expand Down
121 changes: 119 additions & 2 deletions controllers/operator/mongodbshardedcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package operator

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -1643,7 +1644,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
construct.GetPodEnvOptions(),
)
shardSts := construct.DatabaseStatefulSet(*sh, shardOptions, zap.S())
shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), NewShardedClusterDeploymentState())
}

desiredMongosConfig := createMongosSpec(sh)
Expand All @@ -1664,7 +1665,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
construct.GetPodEnvOptions(),
)
configSvrSts := construct.DatabaseStatefulSet(*sh, configServerOptions, zap.S())
configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), NewShardedClusterDeploymentState())

d := om.NewDeployment()
_, err := d.MergeShardedCluster(om.DeploymentShardedClusterMergeOptions{
Expand Down Expand Up @@ -1914,3 +1915,119 @@ func generateAllHostsSingleCluster(sc *mdbv1.MongoDB, mongosCount int, configSrv
}
return allHosts, allPodNames
}

// TestProcessIdPersistence tests the process ID persistence functionality
// for sharded cluster deployments, ensuring process IDs are correctly
// saved to and retrieved from the ShardedClusterDeploymentState.
func TestProcessIdPersistence(t *testing.T) {
t.Run("TestShardedClusterDeploymentState_ProcessIds", func(t *testing.T) {
// Test 1: Create new deployment state
deploymentState := NewShardedClusterDeploymentState()
assert.NotNil(t, deploymentState)
assert.NotNil(t, deploymentState.ProcessIds)
assert.Equal(t, 0, len(deploymentState.ProcessIds))

// Test 2: Save process IDs for config server and shards (simulating GetReplicaSetMemberIds result)
allProcessIds := om.ReplicaSetToProcessIds{
"test-configsrv": {
"test-configsrv-0": 0,
"test-configsrv-1": 1,
"test-configsrv-2": 2,
},
"test-shard-0": {
"test-shard-0-0": 3,
"test-shard-0-1": 4,
"test-shard-0-2": 5,
},
"test-shard-1": {
"test-shard-1-0": 6,
"test-shard-1-1": 7,
},
}
saveReplicaSetProcessIdsToDeploymentState(allProcessIds, deploymentState)

assert.Equal(t, 3, len(deploymentState.ProcessIds))
assert.Equal(t, allProcessIds["test-configsrv"], deploymentState.ProcessIds["test-configsrv"])
assert.Equal(t, allProcessIds["test-shard-0"], deploymentState.ProcessIds["test-shard-0"])
assert.Equal(t, allProcessIds["test-shard-1"], deploymentState.ProcessIds["test-shard-1"])

// Test 4: Retrieve process IDs (simulating migration scenario)
retrievedConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-configsrv", deploymentState)
assert.Equal(t, allProcessIds["test-configsrv"], retrievedConfigIds)

retrievedShard0Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", deploymentState)
assert.Equal(t, allProcessIds["test-shard-0"], retrievedShard0Ids)

retrievedShard1Ids := getReplicaSetProcessIdsFromDeploymentState("test-shard-1", deploymentState)
assert.Equal(t, allProcessIds["test-shard-1"], retrievedShard1Ids)

// Test 5: Retrieve non-existent replica set (should return empty map)
emptyIds := getReplicaSetProcessIdsFromDeploymentState("non-existent", deploymentState)
assert.NotNil(t, emptyIds)
assert.Equal(t, 0, len(emptyIds))
})

t.Run("TestProcessIdPersistence_EdgeCases", func(t *testing.T) {
deploymentState := &ShardedClusterDeploymentState{}

// Should return empty map for nil ReplicaSetToProcessIds
emptyIds := getReplicaSetProcessIdsFromDeploymentState("test-rs", deploymentState)
assert.NotNil(t, emptyIds)
assert.Equal(t, 0, len(emptyIds))

// Should initialize ReplicaSetToProcessIds when saving
allProcessIds := om.ReplicaSetToProcessIds{
"test-rs": {"test-process": 1},
}
saveReplicaSetProcessIdsToDeploymentState(allProcessIds, deploymentState)
assert.NotNil(t, deploymentState.ProcessIds)
assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"])

// Test edge case: empty process IDs (should not save)
emptyProcessIds := om.ReplicaSetToProcessIds{}
saveReplicaSetProcessIdsToDeploymentState(emptyProcessIds, deploymentState)
// Should still have the original data
assert.Equal(t, allProcessIds["test-rs"], deploymentState.ProcessIds["test-rs"])
})

t.Run("TestProcessIdPersistence_JSONSerialization", func(t *testing.T) {
// Test JSON serialization/deserialization (StateStore compatibility)
originalState := NewShardedClusterDeploymentState()

// Add test data
allProcessIds := om.ReplicaSetToProcessIds{
"test-config": {
"config-0": 0,
"config-1": 1,
"config-2": 2,
},
"test-shard-0": {
"shard-0-0": 3,
"shard-0-1": 4,
},
}

saveReplicaSetProcessIdsToDeploymentState(allProcessIds, originalState)

// Serialize to JSON
jsonData, err := json.Marshal(originalState)
require.NoError(t, err)
assert.Contains(t, string(jsonData), "processIds")
assert.Contains(t, string(jsonData), "test-config")
assert.Contains(t, string(jsonData), "test-shard-0")

// Deserialize from JSON
var restoredState ShardedClusterDeploymentState
err = json.Unmarshal(jsonData, &restoredState)
require.NoError(t, err)

// Verify restored state
assert.Equal(t, originalState.ProcessIds, restoredState.ProcessIds)

restoredConfigIds := getReplicaSetProcessIdsFromDeploymentState("test-config", &restoredState)
assert.Equal(t, allProcessIds["test-config"], restoredConfigIds)

restoredShardIds := getReplicaSetProcessIdsFromDeploymentState("test-shard-0", &restoredState)
assert.Equal(t, allProcessIds["test-shard-0"], restoredShardIds)
})
}
Loading