Skip to content

Commit b1d7de8

Browse files
committed
Extract reading datafeed state
1 parent 1444c71 commit b1d7de8

File tree

3 files changed

+56
-32
lines changed

3 files changed

+56
-32
lines changed

internal/elasticsearch/ml/datafeed/delete.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package datafeed
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
87
"github.com/hashicorp/terraform-plugin-framework/diag"
@@ -44,22 +43,18 @@ func (r *datafeedResource) delete(ctx context.Context, req resource.DeleteReques
4443
}
4544

4645
func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId string) (bool, diag.Diagnostics) {
47-
var diags diag.Diagnostics
48-
4946
// Check current state
50-
currentState, err := r.getDatafeedState(ctx, datafeedId)
51-
if err != nil {
52-
// If we can't get the state, try to extract the error details
53-
if err.Error() == fmt.Sprintf("datafeed %s not found", datafeedId) {
54-
// Datafeed does not exist, nothing to stop
55-
return false, diags
56-
}
57-
diags.AddError("Failed to get datafeed state", err.Error())
47+
currentState, diags := GetDatafeedState(ctx, r.client, datafeedId)
48+
if diags.HasError() {
5849
return false, diags
5950
}
6051

52+
if currentState == nil {
53+
return false, nil
54+
}
55+
6156
// If the datafeed is not running, nothing to stop
62-
if currentState != "started" && currentState != "starting" {
57+
if *currentState != "started" && *currentState != "starting" {
6358
return false, diags
6459
}
6560

@@ -71,9 +66,9 @@ func (r *datafeedResource) maybeStopDatafeed(ctx context.Context, datafeedId str
7166
}
7267

7368
// Wait for the datafeed to reach stopped state
74-
err = r.waitForDatafeedState(ctx, datafeedId, "stopped")
75-
if err != nil {
76-
diags.AddError("Failed to wait for datafeed to stop", fmt.Sprintf("Datafeed %s did not stop within timeout: %s", datafeedId, err.Error()))
69+
_, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, "stopped")
70+
diags.Append(waitDiags...)
71+
if diags.HasError() {
7772
return true, diags
7873
}
7974

internal/elasticsearch/ml/datafeed/state_utils.go

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,65 @@ package datafeed
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

78
"github.com/elastic/terraform-provider-elasticstack/internal/asyncutils"
9+
"github.com/elastic/terraform-provider-elasticstack/internal/clients"
810
"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
11+
"github.com/elastic/terraform-provider-elasticstack/internal/diagutil"
12+
"github.com/hashicorp/terraform-plugin-framework/diag"
913
)
1014

11-
// getDatafeedState returns the current state of a datafeed
12-
func (r *datafeedResource) getDatafeedState(ctx context.Context, datafeedId string) (string, error) {
13-
statsResponse, diags := elasticsearch.GetDatafeedStats(ctx, r.client, datafeedId)
15+
// GetDatafeedState returns the current state of a datafeed
16+
func GetDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId string) (*string, diag.Diagnostics) {
17+
statsResponse, diags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId)
1418
if diags.HasError() {
15-
return "", fmt.Errorf("failed to get datafeed stats: %v", diags)
19+
return nil, diags
1620
}
1721

1822
if statsResponse == nil {
19-
return "", fmt.Errorf("datafeed %s not found", datafeedId)
23+
return nil, nil
2024
}
2125

22-
return statsResponse.State, nil
26+
return &statsResponse.State, nil
2327
}
2428

25-
// waitForDatafeedState waits for a datafeed to reach the desired state
26-
func (r *datafeedResource) waitForDatafeedState(ctx context.Context, datafeedId, desiredState string) error {
29+
var terminalDatafeedStates = map[string]struct{}{
30+
"stopped": {},
31+
"started": {},
32+
}
33+
34+
var errDatafeedInUndesiredState = errors.New("datafeed stuck in undesired state")
35+
36+
// WaitForDatafeedState waits for a datafeed to reach the desired state
37+
func WaitForDatafeedState(ctx context.Context, client *clients.ApiClient, datafeedId, desiredState string) (bool, diag.Diagnostics) {
2738
stateChecker := func(ctx context.Context) (bool, error) {
28-
currentState, err := r.getDatafeedState(ctx, datafeedId)
29-
if err != nil {
30-
return false, err
39+
currentState, diags := GetDatafeedState(ctx, client, datafeedId)
40+
if diags.HasError() {
41+
return false, diagutil.FwDiagsAsError(diags)
42+
}
43+
44+
if currentState == nil {
45+
return false, fmt.Errorf("datafeed %s not found", datafeedId)
46+
}
47+
48+
if *currentState == desiredState {
49+
return true, nil
50+
}
51+
52+
_, isInTerminalState := terminalDatafeedStates[*currentState]
53+
if isInTerminalState {
54+
return false, fmt.Errorf("%w: datafeed is in state [%s] but desired state is [%s]", errDatafeedInUndesiredState, *currentState, desiredState)
3155
}
32-
return currentState == desiredState, nil
56+
57+
return false, nil
58+
}
59+
60+
err := asyncutils.WaitForStateTransition(ctx, "datafeed", datafeedId, stateChecker)
61+
if errors.Is(err, errDatafeedInUndesiredState) {
62+
return false, nil
3363
}
3464

35-
return asyncutils.WaitForStateTransition(ctx, "datafeed", datafeedId, stateChecker)
65+
return err == nil, diagutil.FrameworkDiagFromError(err)
3666
}

internal/elasticsearch/ml/datafeed/update.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package datafeed
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
87
"github.com/elastic/terraform-provider-elasticstack/internal/diagutil"
@@ -62,9 +61,9 @@ func (r *datafeedResource) update(ctx context.Context, req resource.UpdateReques
6261
}
6362

6463
// Wait for the datafeed to reach started state
65-
err := r.waitForDatafeedState(ctx, datafeedId, "started")
66-
if err != nil {
67-
resp.Diagnostics.AddError("Failed to wait for datafeed to start", fmt.Sprintf("Datafeed %s did not start within timeout: %s", datafeedId, err.Error()))
64+
_, waitDiags := WaitForDatafeedState(ctx, r.client, datafeedId, "started")
65+
resp.Diagnostics.Append(waitDiags...)
66+
if resp.Diagnostics.HasError() {
6867
return
6968
}
7069
}

0 commit comments

Comments
 (0)