Skip to content

Commit ff2bf71

Browse files
authored
cluster reconcile: set incoming replicasets weight to 1 (#8)
1 parent 30efc51 commit ff2bf71

File tree

3 files changed

+73
-3
lines changed

3 files changed

+73
-3
lines changed

pkg/controller/cluster/cluster_controller.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,50 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
321321
}
322322
}
323323

324+
for _, sts := range stsList.Items {
325+
stsAnnotations := sts.GetAnnotations()
326+
weight, ok := stsAnnotations["tarantool.io/replicaset-weight"]
327+
if ok {
328+
continue
329+
}
330+
331+
if weight == "1" {
332+
continue
333+
}
334+
335+
for i := 0; i < int(*sts.Spec.Replicas); i++ {
336+
pod := &corev1.Pod{}
337+
name := types.NamespacedName{
338+
Namespace: request.Namespace,
339+
Name: fmt.Sprintf("%s-%d", sts.GetName(), i),
340+
}
341+
if err := r.client.Get(context.TODO(), name, pod); err != nil {
342+
if errors.IsNotFound(err) {
343+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
344+
}
345+
346+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
347+
}
348+
if tarantool.IsJoined(pod) == false {
349+
reqLogger.Info("Not all instances joined, skip weight change", "StatefulSet.Name", sts.GetName())
350+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
351+
}
352+
}
353+
354+
if err := topologyClient.SetWeight(sts.GetLabels()["tarantool.io/replicaset-uuid"]); err != nil {
355+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
356+
}
357+
if stsAnnotations == nil {
358+
stsAnnotations = make(map[string]string)
359+
}
360+
361+
stsAnnotations["tarantool.io/replicaset-weight"] = "1"
362+
sts.SetAnnotations(stsAnnotations)
363+
if err := r.client.Update(context.TODO(), &sts); err != nil {
364+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
365+
}
366+
}
367+
324368
if err := topologyClient.BootstrapVshard(); err != nil {
325369
if topology.IsAlreadyBootstrapped(err) {
326370
cluster.Status.State = "Ready"

pkg/controller/role/role_controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (r *ReconcileRole) Reconcile(request reconcile.Request) (reconcile.Result,
155155
return reconcile.Result{}, err
156156
}
157157

158+
// ensure num of statefulsets matches user expectations
158159
if len(stsList.Items) > int(*role.Spec.Replicas) {
159160
reqLogger.Info("Role", "more instances", *role.Spec.Replicas)
160161
for i := len(stsList.Items); i > int(*role.Spec.Replicas); i-- {

pkg/topology/builtin.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type ResponseError struct {
1717
Message string `json:"message"`
1818
}
1919
type JoinResponseData struct {
20-
JoinInstance bool `json:"join_instance"`
20+
JoinInstance bool `json:"joinInstanceResponse"`
2121
}
2222
type JoinResponse struct {
2323
Errors []*ResponseError `json:"errors,omitempty"`
@@ -44,14 +44,21 @@ type BuiltInTopologyService struct {
4444
serviceHost string
4545
}
4646

47+
type EditReplicasetResponse struct {
48+
Response bool `json:"editReplicasetResponse"`
49+
}
50+
4751
var (
4852
topologyIsDown = errors.New("topology service is down")
4953
alreadyJoined = errors.New("already joined")
5054
alreadyBootstrapped = errors.New("already bootstrapped")
5155
)
5256

5357
var join_mutation = `mutation do_join_server($uri: String!, $instance_uuid: String!, $replicaset_uuid: String!, $roles: [String!]) {
54-
join_instance: join_server(uri: $uri, instance_uuid: $instance_uuid, replicaset_uuid: $replicaset_uuid, roles: $roles, timeout: 10)
58+
joinInstanceResponse: join_server(uri: $uri, instance_uuid: $instance_uuid, replicaset_uuid: $replicaset_uuid, roles: $roles, timeout: 10)
59+
}`
60+
var edit_rs_mutation = `mutation editReplicaset($uuid: String!, $weight: Float) {
61+
editReplicasetResponse: edit_replicaset(uuid: $uuid, weight: $weight)
5562
}`
5663

5764
func (s *BuiltInTopologyService) Join(pod *corev1.Pod) error {
@@ -113,12 +120,30 @@ func (s *BuiltInTopologyService) Expel(pod *corev1.Pod) error {
113120
}
114121

115122
if resp.Data.ExpelInstance == false && (resp.Errors == nil || len(resp.Errors) == 0) {
116-
return errors.New("Shit happened")
123+
return errors.New("something really bad happened")
117124
}
118125

119126
return nil
120127
}
121128

129+
func (s *BuiltInTopologyService) SetWeight(replicasetUUID string) error {
130+
client := graphql.NewClient(s.serviceHost, graphql.WithHTTPClient(&http.Client{Timeout: time.Duration(time.Second * 5)}))
131+
req := graphql.NewRequest(edit_rs_mutation)
132+
133+
req.Var("uuid", replicasetUUID)
134+
req.Var("weight", 1)
135+
136+
resp := &EditReplicasetResponse{}
137+
if err := client.Run(context.TODO(), req, resp); err != nil {
138+
return err
139+
}
140+
if resp.Response == true {
141+
return nil
142+
}
143+
144+
return errors.New("something really bad happened")
145+
}
146+
122147
func (s *BuiltInTopologyService) BootstrapVshard() error {
123148
req := fmt.Sprint("mutation bootstrap {bootstrapVshardResponse: bootstrap_vshard}")
124149
j := fmt.Sprintf("{\"query\": \"%s\"}", req)

0 commit comments

Comments
 (0)