Skip to content

Commit c914e4c

Browse files
committed
Report leader event in owner resource
1 parent a924758 commit c914e4c

File tree

6 files changed

+135
-7
lines changed

6 files changed

+135
-7
lines changed

main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ import (
3636
"github.com/rs/zerolog"
3737
"github.com/spf13/cobra"
3838
flag "github.com/spf13/pflag"
39+
appsv1beta2 "k8s.io/api/apps/v1beta2"
3940
"k8s.io/api/core/v1"
4041
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42+
"k8s.io/apimachinery/pkg/runtime"
4143
"k8s.io/client-go/kubernetes"
4244
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
4345
"k8s.io/client-go/tools/record"
@@ -271,5 +273,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name
271273
log.Info().Msgf(format, args...)
272274
})
273275
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)})
274-
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
276+
combinedScheme := runtime.NewScheme()
277+
scheme.AddToScheme(combinedScheme)
278+
v1.AddToScheme(combinedScheme)
279+
appsv1beta2.AddToScheme(combinedScheme)
280+
return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name})
275281
}

manifests/templates/deployment-replication/rbac.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ rules:
3333
resources: ["nodes"]
3434
verbs: ["get"]
3535
- apiGroups: ["apps"]
36-
resources: ["deployments"]
37-
verbs: ["*"]
36+
resources: ["deployments", "replicasets"]
37+
verbs: ["get"]
3838

3939
---
4040

manifests/templates/deployment/rbac.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ rules:
3030
resources: ["nodes"]
3131
verbs: ["get"]
3232
- apiGroups: ["apps"]
33-
resources: ["deployments"]
34-
verbs: ["*"]
33+
resources: ["deployments", "replicasets"]
34+
verbs: ["get"]
3535
- apiGroups: ["storage.k8s.io"]
3636
resources: ["storageclasses"]
3737
verbs: ["get", "list"]

manifests/templates/storage/rbac.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ rules:
3333
- apiGroups: ["apps"]
3434
resources: ["daemonsets"]
3535
verbs: ["*"]
36+
- apiGroups: ["apps"]
37+
resources: ["deployments", "replicasets"]
38+
verbs: ["get"]
3639
- apiGroups: ["storage.k8s.io"]
3740
resources: ["storageclasses"]
3841
verbs: ["*"]

pkg/operator/operator_leader.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,34 @@
2323
package operator
2424

2525
import (
26+
"fmt"
27+
"os"
2628
"time"
2729

30+
"github.com/rs/zerolog"
31+
"k8s.io/api/core/v1"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/runtime"
2834
"k8s.io/client-go/tools/leaderelection"
2935
"k8s.io/client-go/tools/leaderelection/resourcelock"
36+
37+
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
3038
)
3139

40+
// runLeaderElection performs a leader election on a lock with given name in
41+
// the namespace that the operator is deployed in.
42+
// When the leader election is won, the given callback is called.
43+
// When the leader election is lost (even after it was won once), the process is killed.
3244
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
3345
namespace := o.Config.Namespace
3446
kubecli := o.Dependencies.KubeCli
3547
log := o.log.With().Str("lock-name", lockName).Logger()
48+
eventTarget := o.getLeaderElectionEventTarget(log)
49+
recordEvent := func(reason, message string) {
50+
if eventTarget != nil {
51+
o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message)
52+
}
53+
}
3654
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
3755
namespace,
3856
lockName,
@@ -51,10 +69,49 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
5169
RenewDeadline: 10 * time.Second,
5270
RetryPeriod: 2 * time.Second,
5371
Callbacks: leaderelection.LeaderCallbacks{
54-
OnStartedLeading: onStart,
72+
OnStartedLeading: func(stop <-chan struct{}) {
73+
recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
74+
onStart(stop)
75+
},
5576
OnStoppedLeading: func() {
56-
log.Info().Msg("Leader election lost")
77+
recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName))
78+
log.Info().Msg("Stop leading. Terminating process")
79+
os.Exit(1)
5780
},
5881
},
5982
})
6083
}
84+
85+
// getLeaderElectionEventTarget returns the object that leader election related
86+
// events will be added to.
87+
func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object {
88+
ns := o.Config.Namespace
89+
kubecli := o.Dependencies.KubeCli
90+
pods := kubecli.CoreV1().Pods(ns)
91+
log = log.With().Str("pod-name", o.Config.PodName).Logger()
92+
pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{})
93+
if err != nil {
94+
log.Error().Err(err).Msg("Cannot find Pod containing this operator")
95+
return nil
96+
}
97+
rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
98+
if err != nil {
99+
log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator")
100+
return pod
101+
}
102+
if rSet == nil {
103+
log.Error().Msg("Pod containing this operator has no ReplicaSet owner")
104+
return pod
105+
}
106+
log = log.With().Str("replicaSet-name", rSet.Name).Logger()
107+
depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
108+
if err != nil {
109+
log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator")
110+
return rSet
111+
}
112+
if rSet == nil {
113+
log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner")
114+
return rSet
115+
}
116+
return depl
117+
}

pkg/util/k8sutil/owner.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package k8sutil
24+
25+
import (
26+
"k8s.io/api/apps/v1beta2"
27+
"k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/client-go/kubernetes"
30+
)
31+
32+
// GetPodOwner returns the ReplicaSet that owns the given Pod.
33+
// If the Pod has no owner of the owner is not a ReplicaSet, nil is returned.
34+
func GetPodOwner(kubecli kubernetes.Interface, pod *v1.Pod, ns string) (*v1beta2.ReplicaSet, error) {
35+
for _, ref := range pod.GetOwnerReferences() {
36+
if ref.Kind == "ReplicaSet" {
37+
rSets := kubecli.AppsV1beta2().ReplicaSets(pod.GetNamespace())
38+
rSet, err := rSets.Get(ref.Name, metav1.GetOptions{})
39+
if err != nil {
40+
return nil, maskAny(err)
41+
}
42+
return rSet, nil
43+
}
44+
}
45+
return nil, nil
46+
}
47+
48+
// GetReplicaSetOwner returns the Deployment that owns the given ReplicaSet.
49+
// If the ReplicaSet has no owner of the owner is not a Deployment, nil is returned.
50+
func GetReplicaSetOwner(kubecli kubernetes.Interface, rSet *v1beta2.ReplicaSet, ns string) (*v1beta2.Deployment, error) {
51+
for _, ref := range rSet.GetOwnerReferences() {
52+
if ref.Kind == "Deployment" {
53+
depls := kubecli.AppsV1beta2().Deployments(rSet.GetNamespace())
54+
depl, err := depls.Get(ref.Name, metav1.GetOptions{})
55+
if err != nil {
56+
return nil, maskAny(err)
57+
}
58+
return depl, nil
59+
}
60+
}
61+
return nil, nil
62+
}

0 commit comments

Comments
 (0)