Skip to content

Commit f071184

Browse files
authored
[Feature] Replace DEP management (#994)
1 parent 42a6816 commit f071184

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1101
-183
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method
1313
- (Feature) Add agency leader service
1414
- (Feature) Add HostPath and PVC Volume types and allow templating
15+
- (Feature) Replace mod
1516

1617
## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)
1718
- (Feature) Add CoreV1 Endpoints Inspector

pkg/deployment/access_package.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (d *Deployment) createAccessPackages(ctx context.Context) error {
6565
}
6666

6767
// Remove all access packages that we did build, but are no longer needed
68-
secretList := d.currentState.Secret().V1().ListSimple()
68+
secretList := d.acs.CurrentClusterCache().Secret().V1().ListSimple()
6969
for _, secret := range secretList {
7070
if d.isOwnerOf(secret) {
7171
if _, found := secret.Data[constants.SecretAccessPackageYaml]; found {
@@ -100,7 +100,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
100100
log := d.deps.Log
101101
spec := d.apiObject.Spec
102102

103-
_, err := d.currentState.Secret().V1().Read().Get(ctx, apSecretName, metav1.GetOptions{})
103+
_, err := d.acs.CurrentClusterCache().Secret().V1().Read().Get(ctx, apSecretName, metav1.GetOptions{})
104104
if err == nil {
105105
// Secret already exists
106106
return nil
@@ -111,15 +111,15 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
111111

112112
// Fetch client authentication CA
113113
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
114-
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(ctx, d.currentState.Secret().V1().Read(), clientAuthSecretName, nil)
114+
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(ctx, d.acs.CurrentClusterCache().Secret().V1().Read(), clientAuthSecretName, nil)
115115
if err != nil {
116116
log.Debug().Err(err).Msg("Failed to get client-auth CA secret")
117117
return errors.WithStack(err)
118118
}
119119

120120
// Fetch TLS CA public key
121121
tlsCASecretName := spec.Sync.TLS.GetCASecretName()
122-
tlsCACert, err := k8sutil.GetCACertficateSecret(ctx, d.currentState.Secret().V1().Read(), tlsCASecretName)
122+
tlsCACert, err := k8sutil.GetCACertficateSecret(ctx, d.acs.CurrentClusterCache().Secret().V1().Read(), tlsCASecretName)
123123
if err != nil {
124124
log.Debug().Err(err).Msg("Failed to get TLS CA secret")
125125
return errors.WithStack(err)

pkg/deployment/context_impl.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -624,37 +624,37 @@ func (d *Deployment) WithStatusUpdate(ctx context.Context, action reconciler.Dep
624624
}
625625

626626
func (d *Deployment) SecretsModInterface() secretv1.ModInterface {
627-
d.currentState.GetThrottles().Secret().Invalidate()
627+
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
628628
return kclient.NewModInterface(d.deps.Client, d.namespace).Secrets()
629629
}
630630

631631
func (d *Deployment) PodsModInterface() podv1.ModInterface {
632-
d.currentState.GetThrottles().Pod().Invalidate()
632+
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
633633
return kclient.NewModInterface(d.deps.Client, d.namespace).Pods()
634634
}
635635

636636
func (d *Deployment) ServiceAccountsModInterface() serviceaccountv1.ModInterface {
637-
d.currentState.GetThrottles().ServiceAccount().Invalidate()
637+
d.acs.CurrentClusterCache().GetThrottles().ServiceAccount().Invalidate()
638638
return kclient.NewModInterface(d.deps.Client, d.namespace).ServiceAccounts()
639639
}
640640

641641
func (d *Deployment) ServicesModInterface() servicev1.ModInterface {
642-
d.currentState.GetThrottles().Service().Invalidate()
642+
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
643643
return kclient.NewModInterface(d.deps.Client, d.namespace).Services()
644644
}
645645

646646
func (d *Deployment) PersistentVolumeClaimsModInterface() persistentvolumeclaimv1.ModInterface {
647-
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
647+
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
648648
return kclient.NewModInterface(d.deps.Client, d.namespace).PersistentVolumeClaims()
649649
}
650650

651651
func (d *Deployment) PodDisruptionBudgetsModInterface() poddisruptionbudgetv1beta1.ModInterface {
652-
d.currentState.GetThrottles().PodDisruptionBudget().Invalidate()
652+
d.acs.CurrentClusterCache().GetThrottles().PodDisruptionBudget().Invalidate()
653653
return kclient.NewModInterface(d.deps.Client, d.namespace).PodDisruptionBudgets()
654654
}
655655

656656
func (d *Deployment) ServiceMonitorsModInterface() servicemonitorv1.ModInterface {
657-
d.currentState.GetThrottles().ServiceMonitor().Invalidate()
657+
d.acs.CurrentClusterCache().GetThrottles().ServiceMonitor().Invalidate()
658658
return kclient.NewModInterface(d.deps.Client, d.namespace).ServiceMonitors()
659659
}
660660

@@ -679,7 +679,7 @@ func (d *Deployment) GetOwnedPods(ctx context.Context) ([]core.Pod, error) {
679679
}
680680

681681
func (d *Deployment) GetCachedStatus() inspectorInterface.Inspector {
682-
return d.currentState
682+
return d.acs.CurrentClusterCache()
683683
}
684684

685685
func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error {

pkg/deployment/deployment.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ type Deployment struct {
123123
inspectCRDTrigger trigger.Trigger
124124
updateDeploymentTrigger trigger.Trigger
125125
clientCache deploymentClient.Cache
126-
currentState inspectorInterface.Inspector
127126
agencyCache agency.Cache
128127
recentInspectionErrors int
129128
clusterScalingIntegration *clusterScalingIntegration
@@ -143,7 +142,7 @@ func (d *Deployment) WithArangoMember(cache inspectorInterface.Inspector, timeou
143142
}
144143

145144
func (d *Deployment) WithCurrentArangoMember(name string) reconciler.ArangoMemberModContext {
146-
return d.WithArangoMember(d.currentState, globals.GetGlobals().Timeouts().Kubernetes().Get(), name)
145+
return d.WithArangoMember(d.acs.CurrentClusterCache(), globals.GetGlobals().Timeouts().Kubernetes().Get(), name)
147146
}
148147

149148
func (d *Deployment) GetMembersState() memberState.StateInspector {
@@ -227,16 +226,15 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
227226
i := inspector.NewInspector(inspector.NewDefaultThrottle(), deps.Client, apiObject.GetNamespace(), apiObject.GetName())
228227

229228
d := &Deployment{
230-
apiObject: apiObject,
231-
name: apiObject.GetName(),
232-
namespace: apiObject.GetNamespace(),
233-
config: config,
234-
deps: deps,
235-
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
236-
stopCh: make(chan struct{}),
237-
agencyCache: agency.NewCache(apiObject.Spec.Mode),
238-
currentState: i,
239-
acs: acs.NewACS(apiObject.GetUID(), i),
229+
apiObject: apiObject,
230+
name: apiObject.GetName(),
231+
namespace: apiObject.GetNamespace(),
232+
config: config,
233+
deps: deps,
234+
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
235+
stopCh: make(chan struct{}),
236+
agencyCache: agency.NewCache(apiObject.Spec.Mode),
237+
acs: acs.NewACS(apiObject.GetUID(), i),
240238
}
241239

242240
d.memberState = memberState.NewStateInspector(d)
@@ -348,7 +346,7 @@ func (d *Deployment) run() {
348346
for {
349347
select {
350348
case <-d.stopCh:
351-
err := d.currentState.Refresh(context.Background())
349+
err := d.acs.CurrentClusterCache().Refresh(context.Background())
352350
if err != nil {
353351
log.Error().Err(err).Msg("Unable to get resources")
354352
}
@@ -596,7 +594,7 @@ func (d *Deployment) isOwnerOf(obj meta.Object) bool {
596594
func (d *Deployment) lookForServiceMonitorCRD() {
597595
var err error
598596
if d.GetScope().IsNamespaced() {
599-
_, err = d.currentState.ServiceMonitor().V1()
597+
_, err = d.acs.CurrentClusterCache().ServiceMonitor().V1()
600598
if k8sutil.IsForbiddenOrNotFound(err) {
601599
return
602600
}

pkg/deployment/deployment_inspector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
7272
deploymentName := d.GetName()
7373
defer metrics.SetDuration(inspectDeploymentDurationGauges.WithLabelValues(deploymentName), start)
7474

75-
err := d.currentState.Refresh(ctxReconciliation)
75+
err := d.acs.CurrentClusterCache().Refresh(ctxReconciliation)
7676
if err != nil {
7777
log.Error().Err(err).Msg("Unable to get resources")
7878
return minInspectionInterval // Retry ASAP
7979
}
8080

8181
// Check deployment still exists
82-
updated, err := d.currentState.GetCurrentArangoDeployment()
82+
updated, err := d.acs.CurrentClusterCache().GetCurrentArangoDeployment()
8383
if k8sutil.IsNotFound(err) {
8484
// Deployment is gone
8585
log.Info().Msg("Deployment is gone")

pkg/deployment/deployment_run_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
6767

6868
errs := 0
6969
for {
70-
require.NoError(t, d.currentState.Refresh(context.Background()))
70+
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
7171
err := d.resources.EnsureSecrets(context.Background(), log.Logger, d.GetCachedStatus())
7272
if err == nil {
7373
break
@@ -172,7 +172,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
172172
return err
173173
}
174174

175-
require.NoError(t, d.currentState.Refresh(context.Background()))
175+
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
176176

177177
groupSpec := d.apiObject.Spec.GetServerGroupSpec(group)
178178

@@ -217,7 +217,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
217217
}
218218

219219
// Act
220-
require.NoError(t, d.currentState.Refresh(context.Background()))
220+
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
221221
err = d.resources.EnsurePods(context.Background(), d.GetCachedStatus())
222222

223223
// Assert

pkg/deployment/deployment_suite_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -480,20 +480,21 @@ func createTestDeployment(t *testing.T, config Config, arangoDeployment *api.Ara
480480
Client: kclient.NewStaticClient(kubernetesClientSet, kubernetesExtClientSet, arangoClientSet, monitoringClientSet),
481481
}
482482

483+
i := inspector.NewInspector(throttle.NewAlwaysThrottleComponents(), deps.Client, arangoDeployment.GetNamespace(), arangoDeployment.GetName())
484+
483485
d := &Deployment{
484-
apiObject: arangoDeployment,
485-
name: arangoDeployment.GetName(),
486-
namespace: arangoDeployment.GetNamespace(),
487-
config: config,
488-
deps: deps,
489-
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
490-
stopCh: make(chan struct{}),
491-
currentState: inspector.NewInspector(throttle.NewAlwaysThrottleComponents(), deps.Client, arangoDeployment.GetNamespace(), arangoDeployment.GetName()),
486+
apiObject: arangoDeployment,
487+
name: arangoDeployment.GetName(),
488+
namespace: arangoDeployment.GetNamespace(),
489+
config: config,
490+
deps: deps,
491+
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
492+
stopCh: make(chan struct{}),
492493
}
493494
d.clientCache = client.NewClientCache(d, conn.NewFactory(d.getAuth, d.getConnConfig))
494-
d.acs = acs.NewACS("", d.currentState)
495+
d.acs = acs.NewACS("", i)
495496

496-
require.NoError(t, d.currentState.Refresh(context.Background()))
497+
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
497498

498499
arangoDeployment.Spec.SetDefaults(arangoDeployment.GetName())
499500
d.resources = resources.NewResources(deps.Log, d)

pkg/deployment/images.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, cac
140140
// Check if pod exists
141141
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
142142
defer cancel()
143-
pod, err := ib.Context.GetCachedStatus().Pod().V1().Read().Get(ctxChild, podName, metav1.GetOptions{})
143+
pod, err := ib.Context.ACS().CurrentClusterCache().Pod().V1().Read().Get(ctxChild, podName, metav1.GetOptions{})
144144
if err == nil {
145145
// Pod found
146146
if k8sutil.IsPodFailed(pod, utils.StringList{shared.ServerContainerName}) {
147147
// Wait some time before deleting the pod
148148
if time.Now().After(pod.GetCreationTimestamp().Add(30 * time.Second)) {
149149
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
150-
return ib.Context.PodsModInterface().Delete(ctxChild, podName, metav1.DeleteOptions{})
150+
return ib.Context.ACS().CurrentClusterCache().PodsModInterface().V1().Delete(ctxChild, podName, metav1.DeleteOptions{})
151151
})
152152
if err != nil && !k8sutil.IsNotFound(err) {
153153
log.Warn().Err(err).Msg("Failed to delete Image ID Pod")
@@ -189,7 +189,7 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, cac
189189

190190
// We have all the info we need now, kill the pod and store the image info.
191191
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
192-
return ib.Context.PodsModInterface().Delete(ctxChild, podName, metav1.DeleteOptions{})
192+
return ib.Context.ACS().CurrentClusterCache().PodsModInterface().V1().Delete(ctxChild, podName, metav1.DeleteOptions{})
193193
})
194194
if err != nil && !k8sutil.IsNotFound(err) {
195195
log.Warn().Err(err).Msg("Failed to delete Image ID Pod")
@@ -236,7 +236,7 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, cac
236236
}
237237

238238
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
239-
_, _, err := resources.CreateArangoPod(ctxChild, ib.Context.PodsModInterface(), ib.APIObject, ib.Spec, api.ServerGroupImageDiscovery, pod)
239+
_, _, err := resources.CreateArangoPod(ctxChild, ib.Context.ACS().CurrentClusterCache().PodsModInterface().V1(), ib.APIObject, ib.Spec, api.ServerGroupImageDiscovery, pod)
240240
return err
241241
})
242242
if err != nil {

pkg/deployment/images_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestEnsureImages(t *testing.T) {
325325
_, err := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(testNamespace).Create(context.Background(), d.apiObject, metav1.CreateOptions{})
326326
require.NoError(t, err)
327327

328-
require.NoError(t, d.currentState.Refresh(context.Background()))
328+
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
329329

330330
// Act
331331
retrySoon, _, err := d.ensureImages(context.Background(), d.apiObject, d.GetCachedStatus())

pkg/deployment/informers.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,19 @@ func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
5151
&v1.Pod{},
5252
cache.ResourceEventHandlerFuncs{
5353
AddFunc: func(obj interface{}) {
54-
d.currentState.GetThrottles().Pod().Invalidate()
54+
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
5555
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
5656
d.triggerInspection()
5757
}
5858
},
5959
UpdateFunc: func(oldObj, newObj interface{}) {
60-
d.currentState.GetThrottles().Pod().Invalidate()
60+
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
6161
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
6262
d.triggerInspection()
6363
}
6464
},
6565
DeleteFunc: func(obj interface{}) {
66-
d.currentState.GetThrottles().Pod().Invalidate()
66+
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
6767
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
6868
d.triggerInspection()
6969
}
@@ -96,19 +96,19 @@ func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
9696
&v1.PersistentVolumeClaim{},
9797
cache.ResourceEventHandlerFuncs{
9898
AddFunc: func(obj interface{}) {
99-
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
99+
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
100100
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
101101
d.triggerInspection()
102102
}
103103
},
104104
UpdateFunc: func(oldObj, newObj interface{}) {
105-
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
105+
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
106106
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
107107
d.triggerInspection()
108108
}
109109
},
110110
DeleteFunc: func(obj interface{}) {
111-
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
111+
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
112112
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
113113
d.triggerInspection()
114114
}
@@ -142,19 +142,19 @@ func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
142142
cache.ResourceEventHandlerFuncs{
143143
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
144144
AddFunc: func(obj interface{}) {
145-
d.currentState.GetThrottles().Secret().Invalidate()
145+
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
146146
if getSecret(obj) {
147147
d.triggerInspection()
148148
}
149149
},
150150
UpdateFunc: func(oldObj, newObj interface{}) {
151-
d.currentState.GetThrottles().Secret().Invalidate()
151+
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
152152
if getSecret(newObj) {
153153
d.triggerInspection()
154154
}
155155
},
156156
DeleteFunc: func(obj interface{}) {
157-
d.currentState.GetThrottles().Secret().Invalidate()
157+
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
158158
if getSecret(obj) {
159159
d.triggerInspection()
160160
}
@@ -187,19 +187,19 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
187187
&v1.Service{},
188188
cache.ResourceEventHandlerFuncs{
189189
AddFunc: func(obj interface{}) {
190-
d.currentState.GetThrottles().Service().Invalidate()
190+
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
191191
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
192192
d.triggerInspection()
193193
}
194194
},
195195
UpdateFunc: func(oldObj, newObj interface{}) {
196-
d.currentState.GetThrottles().Service().Invalidate()
196+
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
197197
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
198198
d.triggerInspection()
199199
}
200200
},
201201
DeleteFunc: func(obj interface{}) {
202-
d.currentState.GetThrottles().Service().Invalidate()
202+
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
203203
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
204204
d.triggerInspection()
205205
}

0 commit comments

Comments
 (0)