2323package operator
2424
2525import (
26+ "fmt"
27+ "os"
2628 "time"
2729
30+ "github.com/rs/zerolog"
31+ "k8s.io/api/core/v1"
32+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+ "k8s.io/apimachinery/pkg/runtime"
2834 "k8s.io/client-go/tools/leaderelection"
2935 "k8s.io/client-go/tools/leaderelection/resourcelock"
36+
37+ "github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
38+ "github.com/arangodb/kube-arangodb/pkg/util/probe"
3039)
3140
32- func (o * Operator ) runLeaderElection (lockName string , onStart func (stop <- chan struct {})) {
41+ // runLeaderElection performs a leader election on a lock with given name in
42+ // the namespace that the operator is deployed in.
43+ // When the leader election is won, the given callback is called.
44+ // When the leader election was won once, but the leadership is then lost, the process is killed.
45+ // The given ready probe is set as soon as this process becomes the leader, or a new leader
46+ // is detected.
47+ func (o * Operator ) runLeaderElection (lockName string , onStart func (stop <- chan struct {}), readyProbe * probe.ReadyProbe ) {
3348 namespace := o .Config .Namespace
3449 kubecli := o .Dependencies .KubeCli
3550 log := o .log .With ().Str ("lock-name" , lockName ).Logger ()
51+ eventTarget := o .getLeaderElectionEventTarget (log )
52+ recordEvent := func (reason , message string ) {
53+ if eventTarget != nil {
54+ o .Dependencies .EventRecorder .Event (eventTarget , v1 .EventTypeNormal , reason , message )
55+ }
56+ }
3657 rl , err := resourcelock .New (resourcelock .EndpointsResourceLock ,
3758 namespace ,
3859 lockName ,
@@ -51,10 +72,54 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
5172 RenewDeadline : 10 * time .Second ,
5273 RetryPeriod : 2 * time .Second ,
5374 Callbacks : leaderelection.LeaderCallbacks {
54- OnStartedLeading : onStart ,
75+ OnStartedLeading : func (stop <- chan struct {}) {
76+ recordEvent ("Leader Election Won" , fmt .Sprintf ("Pod %s is running as leader" , o .Config .PodName ))
77+ readyProbe .SetReady ()
78+ onStart (stop )
79+ },
5580 OnStoppedLeading : func () {
56- log .Info ().Msg ("Leader election lost" )
81+ recordEvent ("Stop Leading" , fmt .Sprintf ("Pod %s is stopping to run as leader" , o .Config .PodName ))
82+ log .Info ().Msg ("Stop leading. Terminating process" )
83+ os .Exit (1 )
84+ },
85+ OnNewLeader : func (identity string ) {
86+ log .Info ().Str ("identity" , identity ).Msg ("New leader detected" )
87+ readyProbe .SetReady ()
5788 },
5889 },
5990 })
6091}
92+
93+ // getLeaderElectionEventTarget returns the object that leader election related
94+ // events will be added to.
95+ func (o * Operator ) getLeaderElectionEventTarget (log zerolog.Logger ) runtime.Object {
96+ ns := o .Config .Namespace
97+ kubecli := o .Dependencies .KubeCli
98+ pods := kubecli .CoreV1 ().Pods (ns )
99+ log = log .With ().Str ("pod-name" , o .Config .PodName ).Logger ()
100+ pod , err := pods .Get (o .Config .PodName , metav1.GetOptions {})
101+ if err != nil {
102+ log .Error ().Err (err ).Msg ("Cannot find Pod containing this operator" )
103+ return nil
104+ }
105+ rSet , err := k8sutil .GetPodOwner (kubecli , pod , ns )
106+ if err != nil {
107+ log .Error ().Err (err ).Msg ("Cannot find ReplicaSet owning the Pod containing this operator" )
108+ return pod
109+ }
110+ if rSet == nil {
111+ log .Error ().Msg ("Pod containing this operator has no ReplicaSet owner" )
112+ return pod
113+ }
114+ log = log .With ().Str ("replicaSet-name" , rSet .Name ).Logger ()
115+ depl , err := k8sutil .GetReplicaSetOwner (kubecli , rSet , ns )
116+ if err != nil {
117+ log .Error ().Err (err ).Msg ("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator" )
118+ return rSet
119+ }
120+ if rSet == nil {
121+ log .Error ().Msg ("ReplicaSet that owns the Pod containing this operator has no Deployment owner" )
122+ return rSet
123+ }
124+ return depl
125+ }
0 commit comments