Skip to content

Commit 70a04c4

Browse files
committed
Merge branch 'master' into bugfix/default-storage-class
2 parents 538ab10 + 5618606 commit 70a04c4

File tree

9 files changed

+286
-16
lines changed

9 files changed

+286
-16
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
//
24+
// Note: This code is added to the standard glog package.
25+
// It has to be here because it needs package level
26+
// access to some members.
27+
// Do not remove this when updating the vendored glog package!
28+
//
29+
30+
package glog
31+
32+
import "strings"
33+
34+
// LogLevel identifies the severity of a log message that is passed to a
// redirect callback.
type LogLevel int

const (
	// Make sure these constants end up having the same indexes as the severity constants

	// LogLevelInfo is the first level (value 0).
	LogLevelInfo LogLevel = iota
	// LogLevelWarning is the second level (value 1).
	LogLevelWarning
	// LogLevelError is the third level (value 2).
	LogLevelError
	// LogLevelFatal is the fourth level (value 3).
	LogLevelFatal
)

// redirectWriter wraps a callback that is called when data is written to it.
type redirectWriter struct {
	// cb receives every message written to this writer.
	cb func(level LogLevel, message string)
	// level is passed unchanged to cb for every message.
	level LogLevel
}

// Flush is a no-op; the writer does not buffer anything.
func (w *redirectWriter) Flush() error {
	return nil
}

// Sync is a no-op; the writer does not buffer anything.
func (w *redirectWriter) Sync() error {
	return nil
}

// Write converts p into a message and hands it to the callback together with
// the level of this writer.
//
// A single trailing newline is removed. When the message contains a ']' at an
// index greater than zero, everything up to and including the first ']' is
// dropped (presumably the glog line header - confirm against the glog format)
// and the remainder is trimmed of surrounding white space.
//
// Write always reports len(p) bytes written and a nil error. A zero-length
// write is safe and results in a callback with an empty message.
func (w *redirectWriter) Write(p []byte) (n int, err error) {
	// TrimSuffix is a no-op on an empty string, whereas indexing the last
	// byte directly panics when p is empty.
	msg := strings.TrimSuffix(string(p), "\n")
	if idx := strings.IndexByte(msg, ']'); idx > 0 {
		msg = strings.TrimSpace(msg[idx+1:])
	}
	w.cb(w.level, msg)
	return len(p), nil
}
69+
70+
// RedirectOutput redirects output of the given logging to the given callback.
71+
func (l *loggingT) RedirectOutput(cb func(level LogLevel, message string)) {
72+
l.mu.Lock()
73+
defer l.mu.Unlock()
74+
75+
l.toStderr = false
76+
l.alsoToStderr = false
77+
for i := range logging.file {
78+
logging.file[i] = &redirectWriter{
79+
cb: cb,
80+
level: LogLevel(i),
81+
}
82+
}
83+
return
84+
}
85+
86+
// RedirectOutput redirects output of the standard logging to the given callback.
// It delegates to the RedirectOutput method of the package-level logging value.
87+
func RedirectOutput(cb func(level LogLevel, message string)) {
88+
logging.RedirectOutput(cb)
89+
}

main.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@ import (
2929
"net/http"
3030
"os"
3131
"strconv"
32+
"strings"
3233
"time"
3334

3435
"github.com/pkg/errors"
3536
"github.com/prometheus/client_golang/prometheus"
3637
"github.com/rs/zerolog"
3738
"github.com/spf13/cobra"
3839
flag "github.com/spf13/pflag"
40+
appsv1beta2 "k8s.io/api/apps/v1beta2"
3941
"k8s.io/api/core/v1"
4042
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43+
"k8s.io/apimachinery/pkg/runtime"
4144
"k8s.io/client-go/kubernetes"
4245
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
4346
"k8s.io/client-go/tools/record"
@@ -116,31 +119,44 @@ func cmdUsage(cmd *cobra.Command, args []string) {
116119

117120
// Run the operator
118121
func cmdMainRun(cmd *cobra.Command, args []string) {
122+
// Get environment
123+
namespace := os.Getenv(constants.EnvOperatorPodNamespace)
124+
name := os.Getenv(constants.EnvOperatorPodName)
125+
ip := os.Getenv(constants.EnvOperatorPodIP)
126+
127+
// Prepare log service
119128
goflag.CommandLine.Parse([]string{"-logtostderr"})
120129
var err error
121130
logService, err = logging.NewService(logLevel)
122131
if err != nil {
123132
cliLog.Fatal().Err(err).Msg("Failed to initialize log service")
124133
}
134+
logService.ConfigureRootLogger(func(log zerolog.Logger) zerolog.Logger {
135+
podNameParts := strings.Split(name, "-")
136+
operatorID := podNameParts[len(podNameParts)-1]
137+
cliLog = cliLog.With().Str("operator-id", operatorID).Logger()
138+
return log.With().Str("operator-id", operatorID).Logger()
139+
})
140+
logService.CaptureGLog(logService.MustGetLogger("glog"))
125141

126142
// Check operating mode
127143
if !operatorOptions.enableDeployment && !operatorOptions.enableDeploymentReplication && !operatorOptions.enableStorage {
128144
cliLog.Fatal().Err(err).Msg("Turn on --operator.deployment, --operator.deployment-replication, --operator.storage or any combination of these")
129145
}
130146

131147
// Log version
132-
cliLog.Info().Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild)
148+
cliLog.Info().
149+
Str("pod-name", name).
150+
Str("pod-namespace", namespace).
151+
Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild)
133152

134-
// Get environment
135-
namespace := os.Getenv(constants.EnvOperatorPodNamespace)
153+
// Check environment
136154
if len(namespace) == 0 {
137155
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodNamespace)
138156
}
139-
name := os.Getenv(constants.EnvOperatorPodName)
140157
if len(name) == 0 {
141158
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodName)
142159
}
143-
ip := os.Getenv(constants.EnvOperatorPodIP)
144160
if len(ip) == 0 {
145161
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodIP)
146162
}
@@ -271,5 +287,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name
271287
log.Info().Msgf(format, args...)
272288
})
273289
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)})
274-
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
290+
combinedScheme := runtime.NewScheme()
291+
scheme.AddToScheme(combinedScheme)
292+
v1.AddToScheme(combinedScheme)
293+
appsv1beta2.AddToScheme(combinedScheme)
294+
return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name})
275295
}

manifests/templates/deployment-replication/rbac.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ rules:
3333
resources: ["nodes"]
3434
verbs: ["get"]
3535
- apiGroups: ["apps"]
36-
resources: ["deployments"]
37-
verbs: ["*"]
36+
resources: ["deployments", "replicasets"]
37+
verbs: ["get"]
3838

3939
---
4040

manifests/templates/deployment/rbac.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ rules:
3030
resources: ["nodes"]
3131
verbs: ["get"]
3232
- apiGroups: ["apps"]
33-
resources: ["deployments"]
34-
verbs: ["*"]
33+
resources: ["deployments", "replicasets"]
34+
verbs: ["get"]
3535
- apiGroups: ["storage.k8s.io"]
3636
resources: ["storageclasses"]
3737
verbs: ["get", "list"]

manifests/templates/storage/rbac.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ rules:
3333
- apiGroups: ["apps"]
3434
resources: ["daemonsets"]
3535
verbs: ["*"]
36+
- apiGroups: ["apps"]
37+
resources: ["deployments", "replicasets"]
38+
verbs: ["get"]
3639
- apiGroups: ["storage.k8s.io"]
3740
resources: ["storageclasses"]
3841
verbs: ["*"]

pkg/logging/logger.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"strings"
2929
"sync"
3030

31+
"github.com/golang/glog"
3132
"github.com/rs/zerolog"
3233
)
3334

@@ -47,6 +48,10 @@ type Service interface {
4748
MustGetLogger(name string) zerolog.Logger
4849
// MustSetLevel sets the log level for the component with given name to given level.
4950
MustSetLevel(name, level string)
51+
// ConfigureRootLogger calls the given callback to modify the root logger.
52+
ConfigureRootLogger(cb func(rootLog zerolog.Logger) zerolog.Logger)
53+
// CaptureGLog configures glog to write to the given logger
54+
CaptureGLog(log zerolog.Logger)
5055
}
5156

5257
// loggingService implements Service
@@ -83,6 +88,32 @@ func NewService(defaultLevel string) (Service, error) {
8388
return s, nil
8489
}
8590

91+
// ConfigureRootLogger calls the given callback to modify the root logger.
// The callback receives the current root logger and its return value becomes
// the new root logger. The service mutex is held while the callback runs.
92+
func (s *loggingService) ConfigureRootLogger(cb func(rootLog zerolog.Logger) zerolog.Logger) {
93+
s.mutex.Lock()
94+
defer s.mutex.Unlock()
95+
96+
// NOTE(review): only s.rootLog is replaced here; whether loggers handed out
// earlier pick up the change cannot be told from this function - confirm.
s.rootLog = cb(s.rootLog)
97+
}
98+
99+
// CaptureGLog configures glog to write to the given logger
100+
func (s *loggingService) CaptureGLog(log zerolog.Logger) {
101+
glog.RedirectOutput(func(level glog.LogLevel, msg string) {
102+
var e *zerolog.Event
103+
switch level {
104+
case glog.LogLevelWarning:
105+
e = log.WithLevel(zerolog.WarnLevel)
106+
case glog.LogLevelError:
107+
e = log.WithLevel(zerolog.ErrorLevel)
108+
case glog.LogLevelFatal:
109+
e = log.WithLevel(zerolog.FatalLevel)
110+
default:
111+
e = log.WithLevel(zerolog.InfoLevel)
112+
}
113+
e.Msg(msg)
114+
})
115+
}
116+
86117
// MustGetLogger creates a logger with given name
87118
func (s *loggingService) MustGetLogger(name string) zerolog.Logger {
88119
s.mutex.Lock()

pkg/operator/operator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,13 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) {
105105
// Run the operator
106106
func (o *Operator) Run() {
107107
if o.Config.EnableDeployment {
108-
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment)
108+
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment, o.Dependencies.DeploymentProbe)
109109
}
110110
if o.Config.EnableDeploymentReplication {
111-
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication)
111+
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication, o.Dependencies.DeploymentReplicationProbe)
112112
}
113113
if o.Config.EnableStorage {
114-
go o.runLeaderElection("arango-storage-operator", o.onStartStorage)
114+
go o.runLeaderElection("arango-storage-operator", o.onStartStorage, o.Dependencies.StorageProbe)
115115
}
116116
// Wait until process terminates
117117
<-context.TODO().Done()

pkg/operator/operator_leader.go

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,37 @@
2323
package operator
2424

2525
import (
26+
"fmt"
27+
"os"
2628
"time"
2729

30+
"github.com/rs/zerolog"
31+
"k8s.io/api/core/v1"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/runtime"
2834
"k8s.io/client-go/tools/leaderelection"
2935
"k8s.io/client-go/tools/leaderelection/resourcelock"
36+
37+
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
38+
"github.com/arangodb/kube-arangodb/pkg/util/probe"
3039
)
3140

32-
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
41+
// runLeaderElection performs a leader election on a lock with given name in
42+
// the namespace that the operator is deployed in.
43+
// When the leader election is won, the given callback is called.
44+
// When the leader election was won once, but then the leadership is lost, the process is killed.
45+
// The given ready probe is set, as soon as this process became the leader, or a new leader
46+
// is detected.
47+
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) {
3348
namespace := o.Config.Namespace
3449
kubecli := o.Dependencies.KubeCli
3550
log := o.log.With().Str("lock-name", lockName).Logger()
51+
eventTarget := o.getLeaderElectionEventTarget(log)
52+
recordEvent := func(reason, message string) {
53+
if eventTarget != nil {
54+
o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message)
55+
}
56+
}
3657
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
3758
namespace,
3859
lockName,
@@ -51,10 +72,54 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
5172
RenewDeadline: 10 * time.Second,
5273
RetryPeriod: 2 * time.Second,
5374
Callbacks: leaderelection.LeaderCallbacks{
54-
OnStartedLeading: onStart,
75+
OnStartedLeading: func(stop <-chan struct{}) {
76+
recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
77+
readyProbe.SetReady()
78+
onStart(stop)
79+
},
5580
OnStoppedLeading: func() {
56-
log.Info().Msg("Leader election lost")
81+
recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName))
82+
log.Info().Msg("Stop leading. Terminating process")
83+
os.Exit(1)
84+
},
85+
OnNewLeader: func(identity string) {
86+
log.Info().Str("identity", identity).Msg("New leader detected")
87+
readyProbe.SetReady()
5788
},
5889
},
5990
})
6091
}
92+
93+
// getLeaderElectionEventTarget returns the object that leader election related
94+
// events will be added to.
95+
func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object {
96+
ns := o.Config.Namespace
97+
kubecli := o.Dependencies.KubeCli
98+
pods := kubecli.CoreV1().Pods(ns)
99+
log = log.With().Str("pod-name", o.Config.PodName).Logger()
100+
pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{})
101+
if err != nil {
102+
log.Error().Err(err).Msg("Cannot find Pod containing this operator")
103+
return nil
104+
}
105+
rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
106+
if err != nil {
107+
log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator")
108+
return pod
109+
}
110+
if rSet == nil {
111+
log.Error().Msg("Pod containing this operator has no ReplicaSet owner")
112+
return pod
113+
}
114+
log = log.With().Str("replicaSet-name", rSet.Name).Logger()
115+
depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
116+
if err != nil {
117+
log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator")
118+
return rSet
119+
}
120+
if rSet == nil {
121+
log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner")
122+
return rSet
123+
}
124+
return depl
125+
}

0 commit comments

Comments
 (0)