Skip to content

Commit fcb9b35

Browse files
authored
[Feature] Remove forgotten jobs (#1060)
1 parent 1f93c9c commit fcb9b35

File tree

10 files changed

+156
-10
lines changed

10 files changed

+156
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
- (Bugfix) Ensure pod names not too long
55
- (Refactor) Use cached member's clients
66
- (Feature) Move PVC resize action to high-priority plan
7+
- (Feature) Remove forgotten ArangoDB jobs during restart
78

89
## [1.2.14](https://github.com/arangodb/kube-arangodb/tree/1.2.14) (2022-07-14)
910
- (Feature) Add ArangoSync TLS based rotation

pkg/apis/deployment/v1/deployment_mode.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ func (m DeploymentMode) IsCluster() bool {
9696
return m == DeploymentModeCluster
9797
}
9898

99+
// ServingGroup returns mode serving group
100+
func (m DeploymentMode) ServingGroup() ServerGroup {
101+
switch m {
102+
case DeploymentModeCluster:
103+
return ServerGroupCoordinators
104+
case DeploymentModeSingle, DeploymentModeActiveFailover:
105+
return ServerGroupSingle
106+
default:
107+
return ServerGroupUnknown
108+
}
109+
}
110+
99111
// NewMode returns a reference to a string with given value.
100112
func NewMode(input DeploymentMode) *DeploymentMode {
101113
return &input

pkg/apis/deployment/v1/plan.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (p Plan) IsEmpty() bool {
364364
return len(p) == 0
365365
}
366366

367-
// Add add action at the end of plan
367+
// After add action at the end of plan
368368
func (p Plan) After(action ...Action) Plan {
369369
n := Plan{}
370370

@@ -375,7 +375,7 @@ func (p Plan) After(action ...Action) Plan {
375375
return n
376376
}
377377

378-
// Prefix add action at the beginning of plan
378+
// Before add action at the beginning of plan
379379
func (p Plan) Before(action ...Action) Plan {
380380
n := Plan{}
381381

@@ -386,7 +386,7 @@ func (p Plan) Before(action ...Action) Plan {
386386
return n
387387
}
388388

389-
// Prefix add action at the beginning of plan
389+
// Wrap wraps plan with actions
390390
func (p Plan) Wrap(before, after Action) Plan {
391391
n := Plan{}
392392

pkg/apis/deployment/v2alpha1/deployment_mode.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ func (m DeploymentMode) IsCluster() bool {
9696
return m == DeploymentModeCluster
9797
}
9898

99+
// ServingGroup returns mode serving group
100+
func (m DeploymentMode) ServingGroup() ServerGroup {
101+
switch m {
102+
case DeploymentModeCluster:
103+
return ServerGroupCoordinators
104+
case DeploymentModeSingle, DeploymentModeActiveFailover:
105+
return ServerGroupSingle
106+
default:
107+
return ServerGroupUnknown
108+
}
109+
}
110+
99111
// NewMode returns a reference to a string with given value.
100112
func NewMode(input DeploymentMode) *DeploymentMode {
101113
return &input

pkg/apis/deployment/v2alpha1/plan.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (p Plan) IsEmpty() bool {
364364
return len(p) == 0
365365
}
366366

367-
// Add add action at the end of plan
367+
// After add action at the end of plan
368368
func (p Plan) After(action ...Action) Plan {
369369
n := Plan{}
370370

@@ -375,7 +375,7 @@ func (p Plan) After(action ...Action) Plan {
375375
return n
376376
}
377377

378-
// Prefix add action at the beginning of plan
378+
// Before add action at the beginning of plan
379379
func (p Plan) Before(action ...Action) Plan {
380380
n := Plan{}
381381

@@ -386,7 +386,7 @@ func (p Plan) Before(action ...Action) Plan {
386386
return n
387387
}
388388

389-
// Prefix add action at the beginning of plan
389+
// Wrap wraps plan with actions
390390
func (p Plan) Wrap(before, after Action) Plan {
391391
n := Plan{}
392392

pkg/deployment/client/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package client
2323
import (
2424
"context"
2525
"net/http"
26+
"time"
2627

2728
"github.com/arangodb/go-driver"
2829
)
@@ -44,6 +45,8 @@ type Client interface {
4445

4546
GetJWT(ctx context.Context) (JWTDetails, error)
4647
RefreshJWT(ctx context.Context) (JWTDetails, error)
48+
49+
DeleteExpiredJobs(ctx context.Context, timeout time.Duration) error
4750
}
4851

4952
type client struct {

pkg/deployment/client/jobs.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package client
22+
23+
import (
24+
"context"
25+
"fmt"
26+
"net/http"
27+
"time"
28+
)
29+
30+
const DeleteExpiredJobsURL = "/_api/job/expired"
31+
32+
func (c *client) DeleteExpiredJobs(ctx context.Context, timeout time.Duration) error {
33+
req, err := c.c.NewRequest(http.MethodDelete, DeleteExpiredJobsURL)
34+
if err != nil {
35+
return err
36+
}
37+
38+
req.SetQuery("stamp", fmt.Sprintf("%d", time.Now().UTC().Add(-1*timeout).Unix()))
39+
40+
resp, err := c.c.Do(ctx, req)
41+
if err != nil {
42+
return err
43+
}
44+
45+
if err := resp.CheckStatus(http.StatusOK); err != nil {
46+
return err
47+
}
48+
49+
return nil
50+
}

pkg/deployment/reconcile/action_context.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/arangodb/kube-arangodb/pkg/deployment/member"
3636
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
3737
"github.com/arangodb/kube-arangodb/pkg/logging"
38+
"github.com/arangodb/kube-arangodb/pkg/util"
3839
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3940
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
4041
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
@@ -104,6 +105,11 @@ type ActionLocalsContext interface {
104105

105106
Get(action api.Action, key api.PlanLocalKey) (string, bool)
106107
Add(key api.PlanLocalKey, value string, override bool) bool
108+
109+
SetTime(key api.PlanLocalKey, t time.Time) bool
110+
GetTime(action api.Action, key api.PlanLocalKey) (time.Time, bool)
111+
112+
BackoffExecution(action api.Action, key api.PlanLocalKey, duration time.Duration) bool
107113
}
108114

109115
// newActionContext creates a new ActionContext implementation.
@@ -144,6 +150,39 @@ func (ac *actionContext) Get(action api.Action, key api.PlanLocalKey) (string, b
144150
return ac.locals.GetWithParent(action.Locals, key)
145151
}
146152

153+
func (ac *actionContext) BackoffExecution(action api.Action, key api.PlanLocalKey, duration time.Duration) bool {
154+
t, ok := ac.GetTime(action, key)
155+
if !ok {
156+
// Reset as zero time
157+
t = time.Time{}
158+
}
159+
160+
if t.IsZero() || time.Since(t) > duration {
161+
// Execution is needed
162+
ac.SetTime(key, time.Now())
163+
return true
164+
}
165+
166+
return false
167+
}
168+
169+
func (ac *actionContext) SetTime(key api.PlanLocalKey, t time.Time) bool {
170+
return ac.Add(key, t.Format(util.TimeLayout), true)
171+
}
172+
173+
func (ac *actionContext) GetTime(action api.Action, key api.PlanLocalKey) (time.Time, bool) {
174+
s, ok := ac.locals.GetWithParent(action.Locals, key)
175+
if !ok {
176+
return time.Time{}, false
177+
}
178+
179+
if t, err := time.Parse(util.TimeLayout, s); err != nil {
180+
return time.Time{}, false
181+
} else {
182+
return t, true
183+
}
184+
}
185+
147186
func (ac *actionContext) Add(key api.PlanLocalKey, value string, override bool) bool {
148187
return ac.locals.Add(key, value, override)
149188
}

pkg/deployment/reconcile/helper_shutdown.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,26 @@ package reconcile
2222

2323
import (
2424
"context"
25+
"time"
2526

2627
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
2728

2829
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
2930
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
31+
"github.com/arangodb/kube-arangodb/pkg/deployment/client"
3032
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
3133
"github.com/arangodb/kube-arangodb/pkg/util"
3234
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3335
"github.com/arangodb/kube-arangodb/pkg/util/globals"
3436
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
3537
)
3638

39+
const (
40+
actionShutdownJobExpiredTermination api.PlanLocalKey = "expiredJobTerminationCheck"
41+
actionShutdownJobExpiredTerminationDelay = 10 * time.Second
42+
ActionShutdownJobExpiredTerminationTimeout = time.Minute
43+
)
44+
3745
// getShutdownHelper returns an action to shut down a pod according to the settings.
3846
// Returns true when member status exists.
3947
// There are 3 possibilities to shut down the pod: immediately, gracefully, standard kubernetes delete API.
@@ -150,9 +158,28 @@ func (s shutdownHelperAPI) Start(ctx context.Context) (bool, error) {
150158
}
151159

152160
// CheckProgress returns true when pod is terminated.
153-
func (s shutdownHelperAPI) CheckProgress(_ context.Context) (bool, bool, error) {
154-
terminated := s.memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated)
155-
return terminated, false, nil
161+
func (s shutdownHelperAPI) CheckProgress(ctx context.Context) (bool, bool, error) {
162+
if s.memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated) {
163+
return true, false, nil
164+
}
165+
166+
if s.action.Group == s.actionCtx.GetMode().ServingGroup() {
167+
if s.actionCtx.BackoffExecution(s.action, actionShutdownJobExpiredTermination, actionShutdownJobExpiredTerminationDelay) {
168+
// Lets try to run termination
169+
c, err := s.actionCtx.GetMembersState().GetMemberClient(s.action.MemberID)
170+
if err != nil {
171+
s.log.Err(err).Warn("Failed to create member client")
172+
} else {
173+
internal := client.NewClient(c.Connection())
174+
175+
if err := internal.DeleteExpiredJobs(ctx, ActionShutdownJobExpiredTerminationTimeout); err != nil {
176+
s.log.Err(err).Warn("Unable to kill async jobs on member")
177+
}
178+
}
179+
}
180+
}
181+
182+
return false, false, nil
156183
}
157184

158185
type shutdownHelperDelete struct {

pkg/util/times.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
)
2929

30+
const TimeLayout = time.RFC3339
31+
3032
// TimeCompareEqual compares two times, allowing an error of 1s
3133
func TimeCompareEqual(a, b meta.Time) bool {
3234
return math.Abs(a.Time.Sub(b.Time).Seconds()) <= 1
@@ -45,7 +47,7 @@ func TimeCompareEqualPointer(a, b *meta.Time) bool {
4547

4648
func TimeAgencyLayouts() []string {
4749
return []string{
48-
time.RFC3339,
50+
TimeLayout,
4951
}
5052
}
5153

0 commit comments

Comments
 (0)