Skip to content

Commit 0a703fd

Browse files
authored
cluster bridge: instance join ordering guarantees (#5)
1 parent eadbf8e commit 0a703fd

File tree

1 file changed

+65
-36
lines changed

1 file changed

+65
-36
lines changed

pkg/controller/cluster/cluster_controller.go

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
tarantoolv1alpha1 "github.com/tarantool/tarantool-operator/pkg/apis/tarantool/v1alpha1"
1010
"github.com/tarantool/tarantool-operator/pkg/tarantool"
1111
"github.com/tarantool/tarantool-operator/pkg/topology"
12+
appsv1 "k8s.io/api/apps/v1"
1213
corev1 "k8s.io/api/core/v1"
1314
"k8s.io/apimachinery/pkg/api/errors"
1415
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -234,62 +235,90 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
234235
}
235236
}
236237

237-
podList := &corev1.PodList{}
238-
if err := r.client.List(context.TODO(), &client.ListOptions{LabelSelector: clusterSelector}, podList); err != nil {
238+
stsList := &appsv1.StatefulSetList{}
239+
if err := r.client.List(context.TODO(), &client.ListOptions{LabelSelector: clusterSelector}, stsList); err != nil {
239240
if errors.IsNotFound(err) {
240241
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
241242
}
242243

243244
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
244245
}
245246

246-
for _, pod := range podList.Items {
247-
podLogger := reqLogger.WithValues("Pod.Name", pod.GetName())
248-
if HasInstanceUUID(&pod) {
249-
continue
250-
}
251-
podLogger.Info("starting: set instance uuid")
252-
pod = *SetInstanceUUID(&pod)
247+
topologyClient := topology.NewBuiltInTopologyService(topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", leader)))
248+
for _, sts := range stsList.Items {
249+
for i := 0; i < int(*sts.Spec.Replicas); i++ {
250+
pod := &corev1.Pod{}
251+
name := types.NamespacedName{
252+
Namespace: request.Namespace,
253+
Name: fmt.Sprintf("%s-%d", sts.GetName(), i),
254+
}
255+
if err := r.client.Get(context.TODO(), name, pod); err != nil {
256+
if errors.IsNotFound(err) {
257+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
258+
}
253259

254-
if err := r.client.Update(context.TODO(), &pod); err != nil {
255-
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
256-
}
260+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
261+
}
257262

258-
podLogger.Info("success: set instance uuid", "UUID", pod.GetLabels()["tarantool.io/instance-uuid"])
259-
return reconcile.Result{Requeue: true}, nil
260-
}
263+
podLogger := reqLogger.WithValues("Pod.Name", pod.GetName())
264+
if HasInstanceUUID(pod) {
265+
continue
266+
}
267+
podLogger.Info("starting: set instance uuid")
268+
pod = SetInstanceUUID(pod)
261269

262-
topologyClient := topology.NewBuiltInTopologyService(topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", leader)))
263-
for _, pod := range podList.Items {
264-
if tarantool.IsJoined(&pod) {
265-
continue
270+
if err := r.client.Update(context.TODO(), pod); err != nil {
271+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
272+
}
273+
274+
podLogger.Info("success: set instance uuid", "UUID", pod.GetLabels()["tarantool.io/instance-uuid"])
275+
return reconcile.Result{Requeue: true}, nil
266276
}
267277

268-
if err := topologyClient.Join(&pod); err != nil {
269-
if topology.IsAlreadyJoined(err) {
270-
tarantool.MarkJoined(&pod)
271-
if err := r.client.Update(context.TODO(), &pod); err != nil {
272-
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
278+
for i := 0; i < int(*sts.Spec.Replicas); i++ {
279+
pod := &corev1.Pod{}
280+
name := types.NamespacedName{
281+
Namespace: request.Namespace,
282+
Name: fmt.Sprintf("%s-%d", sts.GetName(), i),
283+
}
284+
if err := r.client.Get(context.TODO(), name, pod); err != nil {
285+
if errors.IsNotFound(err) {
286+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
273287
}
274-
reqLogger.Info("Already joined", "Pod.Name", pod.Name)
275-
continue
288+
289+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
276290
}
277291

278-
if topology.IsTopologyDown(err) {
279-
reqLogger.Info("Topology is down", "Pod.Name", pod.Name)
292+
if tarantool.IsJoined(pod) {
280293
continue
281294
}
282295

283-
reqLogger.Error(err, "Join error")
284-
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
285-
} else {
286-
tarantool.MarkJoined(&pod)
287-
if err := r.client.Update(context.TODO(), &pod); err != nil {
288-
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
296+
if err := topologyClient.Join(pod); err != nil {
297+
if topology.IsAlreadyJoined(err) {
298+
tarantool.MarkJoined(pod)
299+
if err := r.client.Update(context.TODO(), pod); err != nil {
300+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
301+
}
302+
reqLogger.Info("Already joined", "Pod.Name", pod.Name)
303+
continue
304+
}
305+
306+
if topology.IsTopologyDown(err) {
307+
reqLogger.Info("Topology is down", "Pod.Name", pod.Name)
308+
continue
309+
}
310+
311+
reqLogger.Error(err, "Join error")
312+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
313+
} else {
314+
tarantool.MarkJoined(pod)
315+
if err := r.client.Update(context.TODO(), pod); err != nil {
316+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
317+
}
289318
}
290-
}
291319

292-
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
320+
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
321+
}
293322
}
294323

295324
if err := topologyClient.BootstrapVshard(); err != nil {

0 commit comments

Comments
 (0)