1414package cluster
1515
1616import (
17+ "context"
1718 "errors"
19+ "fmt"
20+
21+ ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
22+ ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
23+ "strconv"
24+
25+ svcapitypes "github.com/aws-controllers-k8s/memorydb-controller/apis/v1alpha1"
26+ svcsdk "github.com/aws/aws-sdk-go/service/memorydb"
1827
1928 ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
2029)
@@ -59,3 +68,197 @@ func isUpdating(r *resource) bool {
5968 status := * r .ko .Status .Status
6069 return status == StatusUpdating
6170}
71+
72+ func (rm * resourceManager ) setShardDetails (
73+ ctx context.Context ,
74+ r * resource ,
75+ ko * svcapitypes.Cluster ,
76+ ) (* svcapitypes.Cluster , error ) {
77+
78+ resp , err := rm .sdkFind (ctx , r )
79+
80+ if err != nil {
81+ return nil , err
82+ }
83+
84+ ko .Status = resp .ko .Status
85+ ko .Spec .NumReplicasPerShard = resp .ko .Spec .NumReplicasPerShard
86+ ko .Spec .NumShards = resp .ko .Spec .NumShards
87+
88+ return ko , nil
89+ }
90+
91+ func (rm * resourceManager ) setAllowedNodeTypeUpdates (
92+ ctx context.Context ,
93+ ko * svcapitypes.Cluster ,
94+ ) error {
95+ if * ko .Status .Status != StatusAvailable {
96+ return nil
97+ }
98+
99+ response , respErr := rm .sdkapi .ListAllowedNodeTypeUpdatesWithContext (ctx , & svcsdk.ListAllowedNodeTypeUpdatesInput {
100+ ClusterName : ko .Spec .Name ,
101+ })
102+ rm .metrics .RecordAPICall ("GET" , "ListAllowedNodeTypeUpdates" , respErr )
103+ if respErr == nil {
104+ ko .Status .AllowedScaleDownNodeTypes = response .ScaleDownNodeTypes
105+ ko .Status .AllowedScaleUpNodeTypes = response .ScaleUpNodeTypes
106+ }
107+
108+ return respErr
109+ }
110+
111+ // validateClusterNeedsUpdate this function's purpose is to requeue if the resource is currently unavailable and
112+ // to validate if resource update is required.
113+ func (rm * resourceManager ) validateClusterNeedsUpdate (
114+ desired * resource ,
115+ latest * resource ,
116+ delta * ackcompare.Delta ,
117+ ) (* resource , error ) {
118+ // requeue if necessary
119+ latestStatus := latest .ko .Status .Status
120+ if latestStatus == nil || * latestStatus != StatusAvailable {
121+ return nil , ackrequeue .NeededAfter (
122+ fmt .Errorf ("cluster cannot be updated as its status is not '%s'" , StatusAvailable ),
123+ ackrequeue .DefaultRequeueAfterDuration )
124+ }
125+
126+ // Set terminal condition when cluster is in create-failed state
127+ if * latestStatus == StatusCreateFailed {
128+ return nil , ackerr .NewTerminalError (fmt .Errorf ("cluster is in '%s' state, cannot be updated" , StatusCreateFailed ))
129+ }
130+
131+ annotations := desired .ko .ObjectMeta .GetAnnotations ()
132+
133+ // Handle asynchronous rollback of NodeType. This can happen due to ICE, InsufficientMemoryException or other
134+ // errors TODO update the error message once we add describe events support
135+ if val , ok := annotations [AnnotationLastRequestedNodeType ]; ok && desired .ko .Spec .NodeType != nil {
136+ if val == * desired .ko .Spec .NodeType && delta .DifferentAt ("Spec.NodeType" ) {
137+ return nil , ackerr .NewTerminalError (errors .New ("cannot update NodeType" ))
138+ }
139+ }
140+
141+ // Handle asynchronous rollback of NumShards. This can happen due to ICE, InsufficientMemoryException or other
142+ // errors TODO update the error message once we add describe events support
143+ if val , ok := annotations [AnnotationLastRequestedNumShards ]; ok && desired .ko .Spec .NumShards != nil {
144+ numShards , err := strconv .ParseInt (val , 10 , 64 )
145+ if err == nil && numShards == * desired .ko .Spec .NumShards && delta .DifferentAt ("Spec.NumShards" ) {
146+ return nil , ackerr .NewTerminalError (errors .New ("cannot update NumShards" ))
147+ }
148+ }
149+
150+ // Handle asynchronous rollback of NumShards. This can happen due to ICE, InsufficientMemoryException or other
151+ // errors TODO update the error message once we add describe events support
152+ if val , ok := annotations [AnnotationLastRequestedNumReplicasPerShard ]; ok && desired .ko .Spec .NumReplicasPerShard != nil {
153+ numReplicasPerShard , err := strconv .ParseInt (val , 10 , 64 )
154+ if err == nil && numReplicasPerShard == * desired .ko .Spec .NumReplicasPerShard && delta .DifferentAt ("Spec.NumReplicasPerShard" ) {
155+ return nil , ackerr .NewTerminalError (errors .New ("cannot update NumReplicasPerShard" ))
156+ }
157+ }
158+
159+ return nil , nil
160+ }
161+
162+ func (rm * resourceManager ) newMemoryDBClusterUploadPayload (
163+ desired * resource ,
164+ latest * resource ,
165+ delta * ackcompare.Delta ,
166+ ) * svcsdk.UpdateClusterInput {
167+ res := & svcsdk.UpdateClusterInput {}
168+
169+ if delta .DifferentAt ("Spec.ACLName" ) && desired .ko .Spec .ACLName != nil {
170+ res .SetACLName (* desired .ko .Spec .ACLName )
171+ }
172+ if desired .ko .Spec .Name != nil {
173+ res .SetClusterName (* desired .ko .Spec .Name )
174+ }
175+ if delta .DifferentAt ("Spec.Description" ) && desired .ko .Spec .Description != nil {
176+ res .SetDescription (* desired .ko .Spec .Description )
177+ }
178+ if delta .DifferentAt ("Spec.MaintenanceWindow" ) && desired .ko .Spec .MaintenanceWindow != nil {
179+ res .SetMaintenanceWindow (* desired .ko .Spec .MaintenanceWindow )
180+ }
181+ if delta .DifferentAt ("Spec.ParameterGroupName" ) && desired .ko .Spec .ParameterGroupName != nil {
182+ res .SetParameterGroupName (* desired .ko .Spec .ParameterGroupName )
183+ }
184+ if delta .DifferentAt ("Spec.SecurityGroupIDs" ) && desired .ko .Spec .SecurityGroupIDs != nil {
185+ f8 := []* string {}
186+ for _ , f8iter := range desired .ko .Spec .SecurityGroupIDs {
187+ var f8elem string
188+ f8elem = * f8iter
189+ f8 = append (f8 , & f8elem )
190+ }
191+ res .SetSecurityGroupIds (f8 )
192+ }
193+ if delta .DifferentAt ("Spec.SnapshotRetentionLimit" ) && desired .ko .Spec .SnapshotRetentionLimit != nil {
194+ res .SetSnapshotRetentionLimit (* desired .ko .Spec .SnapshotRetentionLimit )
195+ }
196+ if delta .DifferentAt ("Spec.SnapshotWindow" ) && desired .ko .Spec .SnapshotWindow != nil {
197+ res .SetSnapshotWindow (* desired .ko .Spec .SnapshotWindow )
198+ }
199+ if delta .DifferentAt ("Spec.SNSTopicARN" ) && desired .ko .Spec .SNSTopicARN != nil {
200+ res .SetSnsTopicArn (* desired .ko .Spec .SNSTopicARN )
201+ }
202+ if delta .DifferentAt ("Spec.SNSTopicStatus" ) && desired .ko .Status .SNSTopicStatus != nil {
203+ res .SetSnsTopicStatus (* desired .ko .Status .SNSTopicStatus )
204+ }
205+
206+ if delta .DifferentAt ("Spec.EngineVersion" ) && desired .ko .Spec .EngineVersion != nil {
207+ res .SetEngineVersion (* desired .ko .Spec .EngineVersion )
208+ }
209+
210+ // Determine if we are trying to scale up an instance
211+ scaleUpUpdate := false
212+ if delta .DifferentAt ("Spec.NodeType" ) && desired .ko .Spec .NodeType != nil {
213+ for _ , instance := range desired .ko .Status .AllowedScaleUpNodeTypes {
214+ if * instance == * desired .ko .Spec .NodeType {
215+ scaleUpUpdate = true
216+ break
217+ }
218+ }
219+ }
220+
221+ // Determine if we are doing scaling up/down along with resharding
222+ if delta .DifferentAt ("Spec.NodeType" ) && desired .ko .Spec .NodeType != nil &&
223+ delta .DifferentAt ("Spec.NumShards" ) {
224+ if latest .ko .Spec .NumShards != nil && desired .ko .Spec .NumShards != nil {
225+ // If we are scaling in, then perform scale up or down it does not matter.
226+ if * latest .ko .Spec .NumShards > * desired .ko .Spec .NumShards {
227+ scaleUpUpdate = true
228+ }
229+ }
230+ }
231+
232+ // This means we are not scaling out, so we can perform scale up/down. Reason we give preference to scale down
233+ // instead of scale in is we perform scale down and update engine version together.
234+ if scaleUpUpdate {
235+ res .SetNodeType (* desired .ko .Spec .NodeType )
236+ }
237+
238+ engineUpgradeOrScaling := delta .DifferentAt ("Spec.EngineVersion" ) || scaleUpUpdate
239+
240+ if ! engineUpgradeOrScaling && delta .DifferentAt ("Spec.NumShards" ) && desired .ko .Spec .NumShards != nil {
241+ shardConfig := & svcsdk.ShardConfigurationRequest {}
242+ shardConfig .SetShardCount (* desired .ko .Spec .NumShards )
243+ res .SetShardConfiguration (shardConfig )
244+ }
245+
246+ reSharding := delta .DifferentAt ("Spec.NumShards" )
247+
248+ // Ensure no resharding would be done in API call
249+ if ! reSharding && delta .DifferentAt ("Spec.NodeType" ) && desired .ko .Spec .NodeType != nil {
250+ res .SetNodeType (* desired .ko .Spec .NodeType )
251+ }
252+
253+ // If no scaling or engine upgrade then perform replica scaling.
254+ engineUpgradeOrScaling = delta .DifferentAt ("Spec.EngineVersion" ) || delta .DifferentAt ("Spec.NodeType" )
255+
256+ if ! engineUpgradeOrScaling && ! reSharding &&
257+ delta .DifferentAt ("Spec.NumReplicasPerShard" ) && desired .ko .Spec .NumReplicasPerShard != nil {
258+ replicaConfig := & svcsdk.ReplicaConfigurationRequest {}
259+ replicaConfig .SetReplicaCount (* desired .ko .Spec .NumReplicasPerShard )
260+ res .SetReplicaConfiguration (replicaConfig )
261+ }
262+
263+ return res
264+ }
0 commit comments