Skip to content

Commit 2bab832

Browse files
committed
operator: fix problem with non-existent cluster leader (#82)
1 parent 1475bc9 commit 2bab832

File tree

4 files changed

+350
-9
lines changed

4 files changed

+350
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1717
### Fixed
1818
- Not working update of replicaset roles
1919
- Not working update of container env vars
20+
- Problem with a non-existent cluster leader
2021

2122
## [0.0.8] - 2020-12-16
2223

pkg/controller/cluster/cluster_controller.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,21 @@ func SetInstanceUUID(o *corev1.Pod) *corev1.Pod {
8080
return o
8181
}
8282

83+
// Checking for a leader in the cluster Endpoint annotation
84+
func IsLeaderExists(ep *corev1.Endpoints) bool {
85+
leader, ok := ep.Annotations["tarantool.io/leader"]
86+
if !ok || leader == "" {
87+
return false
88+
}
89+
90+
for _, addr := range ep.Subsets[0].Addresses {
91+
if leader == fmt.Sprintf("%s:%s", addr.IP, "8081") {
92+
return true
93+
}
94+
}
95+
return false
96+
}
97+
8398
// Add creates a new Cluster Controller and adds it to the Manager. The Manager will set fields on the Controller
8499
// and Start it when the Manager is Started.
85100
func Add(mgr manager.Manager) error {
@@ -234,14 +249,8 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
234249
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
235250
}
236251

237-
leader, ok := ep.Annotations["tarantool.io/leader"]
238-
if !ok {
239-
if leader == "" {
240-
reqLogger.Info("leader is not elected")
241-
// return reconcile.Result{RequeueAfter: time.Duration(5000 * time.Millisecond)}, nil
242-
}
243-
244-
leader = fmt.Sprintf("%s:%s", ep.Subsets[0].Addresses[0].IP, "8081")
252+
if !IsLeaderExists(ep) {
253+
leader := fmt.Sprintf("%s:%s", ep.Subsets[0].Addresses[0].IP, "8081")
245254

246255
if ep.Annotations == nil {
247256
ep.Annotations = make(map[string]string)
@@ -262,7 +271,11 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
262271
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
263272
}
264273

265-
topologyClient := topology.NewBuiltInTopologyService(topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", leader)), topology.WithClusterID(cluster.GetName()))
274+
topologyClient := topology.NewBuiltInTopologyService(
275+
topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", ep.Annotations["tarantool.io/leader"])),
276+
topology.WithClusterID(cluster.GetName()),
277+
)
278+
266279
for _, sts := range stsList.Items {
267280
for i := 0; i < int(*sts.Spec.Replicas); i++ {
268281
pod := &corev1.Pod{}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package cluster
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
. "github.com/onsi/ginkgo"
9+
. "github.com/onsi/gomega"
10+
11+
helpers "github.com/tarantool/tarantool-operator/test/helpers"
12+
13+
corev1 "k8s.io/api/core/v1"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
16+
tarantoolv1alpha1 "github.com/tarantool/tarantool-operator/pkg/apis/tarantool/v1alpha1"
17+
18+
"sigs.k8s.io/controller-runtime/pkg/client"
19+
)
20+
21+
// Behavioral tests for the cluster controller, written in Ginkgo BDD style.
// They cover (a) leader management against the envtest control plane started
// by the suite setup, and (b) pure unit tests for the IsLeaderExists helper.
var _ = Describe("cluster_controller unit testing", func() {
	var (
		namespace = "default"
		ctx       = context.TODO()

		roleName       = "" // setup for every spec in hook
		rsTemplateName = ""

		clusterName = "test"
		clusterId   = clusterName

		// JSON-encoded list of cluster roles assigned to the test Role.
		defaultRolesToAssign = "[\"A\",\"B\"]"
	)

	Describe("cluster_controller manage cluster resources", func() {
		BeforeEach(func() {
			// Randomize per-spec names so specs do not collide on leftovers.
			roleName = fmt.Sprintf("test-role-%s", RandStringRunes(4))
			rsTemplateName = fmt.Sprintf("test-rs-%s", RandStringRunes(4))

			By("create new Role " + roleName)
			role := helpers.NewRole(helpers.RoleParams{
				Name:           roleName,
				Namespace:      namespace,
				RolesToAssign:  defaultRolesToAssign,
				RsNum:          int32(1),
				RsTemplateName: rsTemplateName,
				ClusterId:      clusterId,
			})
			// Mock owner reference so the Role looks owned and is not
			// garbage-collected / re-parented by the controller under test.
			role.SetOwnerReferences([]metav1.OwnerReference{
				{
					APIVersion: "v0",
					Kind:       "mockRef",
					Name:       "mockRef",
					UID:        "-",
				},
			})
			Expect(k8sClient.Create(ctx, &role)).NotTo(HaveOccurred(), "failed to create Role")

			By("create new Cluster " + clusterName)
			cluster := helpers.NewCluster(helpers.ClusterParams{
				Name:      clusterName,
				Namespace: namespace,
				Id:        clusterId,
			})
			Expect(k8sClient.Create(ctx, &cluster)).NotTo(HaveOccurred(), "failed to create Cluster")
		})

		AfterEach(func() {
			// Tear down in reverse: fetch-then-delete so a missing object
			// fails the spec instead of passing silently.
			By("remove role object " + roleName)
			role := &tarantoolv1alpha1.Role{}
			Expect(
				k8sClient.Get(ctx, client.ObjectKey{Name: roleName, Namespace: namespace}, role),
			).NotTo(HaveOccurred(), "failed to get Role")

			Expect(k8sClient.Delete(ctx, role)).NotTo(HaveOccurred(), "failed to delete Role")

			By("remove Cluster object " + clusterName)
			cluster := &tarantoolv1alpha1.Cluster{}
			Expect(
				k8sClient.Get(ctx, client.ObjectKey{Name: clusterName, Namespace: namespace}, cluster),
			).NotTo(HaveOccurred(), "failed to get Cluster")

			Expect(k8sClient.Delete(ctx, cluster)).NotTo(HaveOccurred(), "failed to delete Cluster")
		})

		Context("manage cluster leader: tarantool instance accepting admin requests", func() {
			BeforeEach(func() {
				// Endpoints named after the cluster ID is what the
				// controller inspects when electing a leader.
				By("create cluster endpoints")
				ep := corev1.Endpoints{
					ObjectMeta: metav1.ObjectMeta{
						Name:      clusterId,
						Namespace: namespace,
					},
					Subsets: []corev1.EndpointSubset{
						{
							Addresses: []corev1.EndpointAddress{
								{IP: "1.1.1.1"},
								{IP: "2.2.2.2"},
								{IP: "3.3.3.3"},
							},
						},
					},
				}
				Expect(k8sClient.Create(ctx, &ep)).NotTo(HaveOccurred(), "failed to create cluster endpoints")
			})

			AfterEach(func() {
				ep := corev1.Endpoints{}
				Expect(
					k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep),
				).NotTo(HaveOccurred(), "failed to get cluster endpoints")

				Expect(k8sClient.Delete(ctx, &ep)).NotTo(HaveOccurred(), "failed to delete endpoints")
			})

			It("change the leader if the previous one does not exist", func() {
				// Wait until the controller has elected some leader and
				// recorded it in the "tarantool.io/leader" annotation.
				By("get the chosen leader")
				ep := corev1.Endpoints{}
				Eventually(
					func() bool {
						err := k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep)
						if err != nil {
							return false
						}

						if ep.GetAnnotations()["tarantool.io/leader"] != "" {
							return true
						}

						return false
					},
					time.Second*10, time.Millisecond*500,
				).Should(BeTrue())

				By("save old leader")
				oldLeader := ep.GetAnnotations()["tarantool.io/leader"]

				// Replace every address so the recorded leader's IP no
				// longer exists; the controller must then elect a new one.
				By("set all new IP addresses")
				ep.Subsets = []corev1.EndpointSubset{
					{
						Addresses: []corev1.EndpointAddress{
							{IP: "4.4.4.4"},
							{IP: "5.5.5.5"},
							{IP: "6.6.6.6"},
						},
					},
				}
				Expect(k8sClient.Update(ctx, &ep)).NotTo(HaveOccurred(), "failed to update cluster endpoints")

				By("check that the leader has changed")
				Eventually(
					func() bool {
						err := k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep)
						if err != nil {
							return false
						}

						if ep.GetAnnotations()["tarantool.io/leader"] != oldLeader {
							return true
						}
						return false
					},
					time.Second*10, time.Millisecond*500,
				).Should(BeTrue())
			})
		})
	})

	// NOTE(review): "cluster_contriller" below is a typo for
	// "cluster_controller"; it is a spec-description string (affects test
	// report output only), left unchanged here to keep code byte-identical.
	Describe("cluster_contriller unit testing functions", func() {
		Describe("function IsLeaderExists must check for existence of leader in annotation of cluster Endpoints", func() {
			Context("positive cases (leader exist)", func() {
				It("should return True if leader assigned and exist", func() {
					leaderIP := "1.1.1.1"

					ep := &corev1.Endpoints{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "name",
							Namespace: "namespace",
							Annotations: map[string]string{
								"tarantool.io/leader": fmt.Sprintf("%s:8081", leaderIP),
							},
						},
						Subsets: []corev1.EndpointSubset{
							{
								Addresses: []corev1.EndpointAddress{
									{IP: leaderIP},
								},
							},
						},
					}
					Expect(IsLeaderExists(ep)).To(BeTrue())
				})
			})

			Context("negative cases (leader does not exist)", func() {
				It("should return False if leader not assigned", func() {
					// No annotations at all: helper must report "no leader".
					ep := &corev1.Endpoints{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "name",
							Namespace: "namespace",
						},
					}
					Expect(IsLeaderExists(ep)).To(BeFalse())
				})

				It("should return False if leader assigned, but IP not exists", func() {
					// Annotation points at 6.6.6.6 while the only endpoint
					// address is 0.0.0.0 — the stale leader must be rejected.
					ep := &corev1.Endpoints{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "name",
							Namespace: "namespace",
							Annotations: map[string]string{
								"tarantool.io/leader": "6.6.6.6:8081",
							},
						},
						Subsets: []corev1.EndpointSubset{
							{
								Addresses: []corev1.EndpointAddress{
									{IP: "0.0.0.0"},
								},
							},
						},
					}
					Expect(IsLeaderExists(ep)).To(BeFalse())
				})
			})
		})
	})
})
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package cluster
2+
3+
import (
4+
"math/rand"
5+
"path/filepath"
6+
"testing"
7+
"time"
8+
9+
. "github.com/onsi/ginkgo"
10+
. "github.com/onsi/gomega"
11+
"github.com/operator-framework/operator-sdk/pkg/log/zap"
12+
13+
"k8s.io/client-go/kubernetes/scheme"
14+
"k8s.io/client-go/rest"
15+
16+
ctrl "sigs.k8s.io/controller-runtime"
17+
"sigs.k8s.io/controller-runtime/pkg/client"
18+
"sigs.k8s.io/controller-runtime/pkg/envtest"
19+
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
20+
// +kubebuilder:scaffold:imports
21+
"github.com/tarantool/tarantool-operator/pkg/apis"
22+
)
23+
24+
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
25+
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
26+
27+
// Shared suite state, initialized in BeforeSuite and torn down in AfterSuite.
var cfg *rest.Config             // REST config for the envtest API server
var k8sClient client.Client      // client used by specs to manipulate objects
var testEnv *envtest.Environment // local control-plane test environment
var stopCh chan struct{}         // closed in AfterSuite to stop the manager
31+
32+
// TestRoleController is the `go test` entry point that hands control to
// Ginkgo and runs the cluster-controller spec suite.
// NOTE(review): the function name says "RoleController" while the suite is
// "Cluster Controller Suite" — looks like a copy-paste; confirm the intended
// name (renaming changes the reported test name, so it is not done here).
func TestRoleController(t *testing.T) {
	RegisterFailHandler(Fail)

	RunSpecsWithDefaultAndCustomReporters(t,
		"Cluster Controller Suite",
		[]Reporter{envtest.NewlineReporter{}})
}
39+
40+
// Suite bootstrap: starts a local envtest control plane with the operator
// CRDs, registers the API scheme, builds a client, and runs the controller
// manager in a goroutine. `done` is closed on success; 60 is the timeout in
// seconds for this setup.
var _ = BeforeSuite(func(done Done) {
	logf.SetLogger(zap.LoggerTo(GinkgoWriter))

	By("Bootstrapping test environment")
	testEnv = &envtest.Environment{
		// CRDs are loaded from the repo's helm chart definitions.
		CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "ci", "helm-chart", "crds")},
		UseExistingCluster: false,
	}

	var err error
	cfg, err = testEnv.Start()
	Expect(err).ToNot(HaveOccurred())
	Expect(cfg).ToNot(BeNil())

	// Register tarantool API types so the client can decode them.
	err = apis.AddToScheme(scheme.Scheme)
	Expect(err).NotTo(HaveOccurred())

	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).ToNot(HaveOccurred())
	Expect(k8sClient).ToNot(BeNil())

	// create channel for stopping manager
	stopCh = make(chan struct{})

	mgr, err := ctrl.NewManager(cfg, ctrl.Options{})
	Expect(err).NotTo(HaveOccurred(), "failed to create manager")

	// Wire the cluster controller under test into the manager.
	err = Add(mgr)
	Expect(err).NotTo(HaveOccurred(), "failed to setup controller")

	// Run the manager until stopCh is closed in AfterSuite.
	go func() {
		err = mgr.Start(stopCh)
		Expect(err).NotTo(HaveOccurred(), "failed to start manager")
	}()

	close(done)
}, 60)
77+
78+
// Suite teardown: stops the controller manager, then shuts down the envtest
// control plane started in BeforeSuite.
var _ = AfterSuite(func() {
	close(stopCh)
	By("Tearing down the test environment")
	err := testEnv.Stop()
	Expect(err).ToNot(HaveOccurred())
})
84+
85+
// Seed the global math/rand source so RandStringRunes produces different
// resource-name suffixes on every test run.
func init() {
	rand.Seed(time.Now().UnixNano())
}
88+
89+
// letterRunes is the alphabet used for generated resource-name suffixes.
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz")

// RandStringRunes returns a random string of length n whose characters are
// drawn uniformly from letterRunes, using the package-level math/rand source
// (seeded in init).
func RandStringRunes(n int) string {
	out := make([]rune, n)
	for i := 0; i < n; i++ {
		out[i] = letterRunes[rand.Intn(len(letterRunes))]
	}
	return string(out)
}

0 commit comments

Comments
 (0)