Skip to content

Commit b209095

Browse files
authored
[Feature] Async restore (#982)
1 parent 3191f5e commit b209095

File tree

8 files changed

+169
-15
lines changed

8 files changed

+169
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- (Maintenance) Add check make targets
99
- (Feature) Create support for local variables in actions.
1010
- (Feature) Support for asynchronous ArangoD requests.
11+
- (Feature) Change Restore in Cluster mode to Async Request
1112

1213
## [1.2.11](https://github.com/arangodb/kube-arangodb/tree/1.2.11) (2022-04-30)
1314
- (Bugfix) Orphan PVC are not removed

pkg/deployment/client/client_cache.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,16 @@ import (
3636
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
3737
)
3838

39+
type ConnectionWrap func(c driver.Connection) driver.Connection
40+
3941
type Cache interface {
4042
GetAuth() conn.Auth
4143

4244
Connection(ctx context.Context, host string) (driver.Connection, error)
4345

4446
Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
4547
GetDatabase(ctx context.Context) (driver.Client, error)
48+
GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error)
4649
GetAgency(ctx context.Context) (agency.Agency, error)
4750
}
4851

@@ -144,6 +147,25 @@ func (cc *cache) GetDatabase(ctx context.Context) (driver.Client, error) {
144147
}
145148
}
146149

150+
func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error) {
151+
c, err := cc.getDatabaseClient()
152+
if err != nil {
153+
return nil, err
154+
}
155+
156+
conn := c.Connection()
157+
158+
for _, w := range wraps {
159+
if w != nil {
160+
conn = w(conn)
161+
}
162+
}
163+
164+
return driver.NewClient(driver.ClientConfig{
165+
Connection: conn,
166+
})
167+
}
168+
147169
// GetAgency returns a cached client for the agency
148170
func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) {
149171
cc.mutex.Lock()

pkg/deployment/context_impl.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,14 @@ func (d *Deployment) GetDatabaseClient(ctx context.Context) (driver.Client, erro
187187
return c, nil
188188
}
189189

190+
func (d *Deployment) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) {
191+
c, err := d.clientCache.GetDatabaseWithWrap(ctx, conn.NewAsyncConnection)
192+
if err != nil {
193+
return nil, errors.WithStack(err)
194+
}
195+
return c, nil
196+
}
197+
190198
// GetServerClient returns a cached client for a specific server.
191199
func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {
192200
c, err := d.clientCache.Get(ctx, group, id)

pkg/deployment/reconcile/action_backup_restore.go

Lines changed: 118 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,21 @@ import (
2828
"github.com/arangodb/go-driver"
2929
"github.com/rs/zerolog"
3030

31+
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
3132
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
33+
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
34+
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3235
)
3336

3437
func init() {
3538
registerAction(api.ActionTypeBackupRestore, newBackupRestoreAction, backupRestoreTimeout)
3639
}
3740

41+
const (
42+
actionBackupRestoreLocalJobID api.PlanLocalKey = "jobID"
43+
actionBackupRestoreLocalBackupName api.PlanLocalKey = "backupName"
44+
)
45+
3846
func newBackupRestoreAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
3947
a := &actionBackupRestore{}
4048

@@ -47,8 +55,6 @@ func newBackupRestoreAction(log zerolog.Logger, action api.Action, actionCtx Act
4755
type actionBackupRestore struct {
4856
// actionImpl implement timeout and member id functions
4957
actionImpl
50-
51-
actionEmptyCheckProgress
5258
}
5359

5460
func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
@@ -64,13 +70,6 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
6470
return true, nil
6571
}
6672

67-
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
68-
defer cancel()
69-
dbc, err := a.actionCtx.GetDatabaseClient(ctxChild)
70-
if err != nil {
71-
return false, err
72-
}
73-
7473
backupResource, err := a.actionCtx.GetBackup(ctx, *spec.RestoreFrom)
7574
if err != nil {
7675
a.log.Error().Err(err).Msg("Unable to find backup")
@@ -96,15 +95,61 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
9695
return false, err
9796
}
9897

98+
switch mode := a.actionCtx.GetSpec().Mode.Get(); mode {
99+
case api.DeploymentModeActiveFailover, api.DeploymentModeSingle:
100+
return a.restoreSync(ctx, backupResource)
101+
case api.DeploymentModeCluster:
102+
return a.restoreAsync(ctx, backupResource)
103+
default:
104+
return false, errors.Newf("Unknown mode %s", mode)
105+
}
106+
}
107+
108+
func (a actionBackupRestore) restoreAsync(ctx context.Context, backup *backupApi.ArangoBackup) (bool, error) {
109+
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
110+
defer cancel()
111+
112+
dbc, err := a.actionCtx.GetDatabaseAsyncClient(ctxChild)
113+
if err != nil {
114+
return false, errors.Wrapf(err, "Unable to create client")
115+
}
116+
117+
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
118+
defer cancel()
119+
120+
if err := dbc.Backup().Restore(ctxChild, driver.BackupID(backup.Status.Backup.ID), nil); err != nil {
121+
if id, ok := conn.IsAsyncJobInProgress(err); ok {
122+
a.actionCtx.Add(actionBackupRestoreLocalJobID, id, true)
123+
a.actionCtx.Add(actionBackupRestoreLocalBackupName, backup.GetName(), true)
124+
125+
// Async request has been send
126+
return false, nil
127+
} else {
128+
return false, errors.Wrapf(err, "Unknown restore error")
129+
}
130+
}
131+
132+
return false, errors.Newf("Async response not received")
133+
}
134+
135+
func (a actionBackupRestore) restoreSync(ctx context.Context, backup *backupApi.ArangoBackup) (bool, error) {
136+
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
137+
defer cancel()
138+
dbc, err := a.actionCtx.GetDatabaseClient(ctxChild)
139+
if err != nil {
140+
a.log.Debug().Err(err).Msg("Failed to create database client")
141+
return false, nil
142+
}
143+
99144
// The below action can take a while so the full parent timeout context is used.
100-
restoreError := dbc.Backup().Restore(ctx, driver.BackupID(backupResource.Status.Backup.ID), nil)
145+
restoreError := dbc.Backup().Restore(ctx, driver.BackupID(backup.Status.Backup.ID), nil)
101146
if restoreError != nil {
102147
a.log.Error().Err(restoreError).Msg("Restore failed")
103148
}
104149

105150
if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
106151
result := &api.DeploymentRestoreResult{
107-
RequestedFrom: spec.GetRestoreFrom(),
152+
RequestedFrom: backup.GetName(),
108153
}
109154

110155
if restoreError != nil {
@@ -118,9 +163,70 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
118163

119164
return true
120165
}); err != nil {
121-
a.log.Error().Err(err).Msg("Unable to ser restored state")
166+
a.log.Error().Err(err).Msg("Unable to set restored state")
122167
return false, err
123168
}
124169

125170
return true, nil
126171
}
172+
173+
func (a actionBackupRestore) CheckProgress(ctx context.Context) (bool, bool, error) {
174+
backup, ok := a.actionCtx.Get(a.action, actionBackupRestoreLocalBackupName)
175+
if !ok {
176+
return false, false, errors.Newf("Local Key is missing in action: %s", actionBackupRestoreLocalBackupName)
177+
}
178+
179+
job, ok := a.actionCtx.Get(a.action, actionBackupRestoreLocalJobID)
180+
if !ok {
181+
return false, false, errors.Newf("Local Key is missing in action: %s", actionBackupRestoreLocalJobID)
182+
}
183+
184+
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
185+
defer cancel()
186+
187+
dbc, err := a.actionCtx.GetDatabaseAsyncClient(ctxChild)
188+
if err != nil {
189+
a.log.Debug().Err(err).Msg("Failed to create database client")
190+
return false, false, nil
191+
}
192+
193+
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
194+
defer cancel()
195+
196+
// Params does not matter in async fetch
197+
restoreError := dbc.Backup().Restore(conn.WithAsyncID(ctxChild, job), "", nil)
198+
if restoreError != nil {
199+
if _, ok := conn.IsAsyncJobInProgress(restoreError); ok {
200+
// Job still in progress
201+
return false, false, nil
202+
}
203+
204+
if errors.IsTemporary(restoreError) {
205+
// Retry
206+
return false, false, nil
207+
}
208+
}
209+
210+
// Restore is done
211+
212+
if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
213+
result := &api.DeploymentRestoreResult{
214+
RequestedFrom: backup,
215+
State: api.DeploymentRestoreStateRestored,
216+
}
217+
218+
if restoreError != nil {
219+
result.State = api.DeploymentRestoreStateRestoreFailed
220+
result.Message = restoreError.Error()
221+
}
222+
223+
s.Restore = result
224+
225+
return true
226+
}); err != nil {
227+
a.log.Error().Err(err).Msg("Unable to set restored state")
228+
return false, false, err
229+
}
230+
231+
return true, false, nil
232+
}

pkg/deployment/reconcile/action_context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ type actionContext struct {
155155
locals api.PlanLocals
156156
}
157157

158+
func (ac *actionContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) {
159+
return ac.context.GetDatabaseAsyncClient(ctx)
160+
}
161+
158162
func (ac *actionContext) CurrentLocals() api.PlanLocals {
159163
return ac.locals
160164
}

pkg/deployment/reconcile/plan_builder_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ type testContext struct {
8585
Inspector inspectorInterface.Inspector
8686
}
8787

88+
func (c *testContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) {
89+
//TODO implement me
90+
panic("implement me")
91+
}
92+
8893
func (c *testContext) WithArangoMember(cache inspectorInterface.Inspector, timeout time.Duration, name string) reconciler.ArangoMemberModContext {
8994
return reconciler.NewArangoMemberModContext(cache, timeout, name)
9095
}

pkg/deployment/reconciler/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ type DeploymentDatabaseClient interface {
165165
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
166166
// creating one if needed.
167167
GetDatabaseClient(ctx context.Context) (driver.Client, error)
168+
169+
// GetDatabaseAsyncClient returns a cached client for the entire database (cluster coordinators or single server),
170+
// creating one if needed. Only in AsyncMode
171+
GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error)
168172
}
169173

170174
type DeploymentMemberClient interface {

pkg/util/arangod/conn/async_errors.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020

2121
package conn
2222

23-
import "fmt"
23+
import (
24+
"fmt"
25+
26+
"github.com/arangodb/kube-arangodb/pkg/util/errors"
27+
)
2428

2529
func IsAsyncErrorNotFound(err error) bool {
2630
if err == nil {
@@ -31,7 +35,7 @@ func IsAsyncErrorNotFound(err error) bool {
3135
return true
3236
}
3337

34-
return false
38+
return IsAsyncErrorNotFound(errors.Cause(err))
3539
}
3640

3741
func newAsyncErrorNotFound(id string) error {
@@ -57,7 +61,7 @@ func IsAsyncJobInProgress(err error) (string, bool) {
5761
return v.jobID, true
5862
}
5963

60-
return "", false
64+
return IsAsyncJobInProgress(errors.Cause(err))
6165
}
6266

6367
func newAsyncJobInProgress(id string) error {

0 commit comments

Comments
 (0)