Skip to content

Commit 7c76d2e

Browse files
committed
etcd: add basic support
* implemented etcd driver (backend) for basic storage * used etcd `testing.LazyCluster` for integration testing * modified .golangci.yaml to add new library exclusion Part of #TNTP-4189
1 parent 4e1445d commit 7c76d2e

File tree

17 files changed

+3475
-45
lines changed

17 files changed

+3475
-45
lines changed

.golangci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,6 @@ linters:
7676
- "github.com/tarantool/go-iproto"
7777
- "github.com/vmihailenco/msgpack/v5"
7878
- "github.com/gojuno/minimock/v3"
79+
- "go.etcd.io/etcd/client/v3"
80+
- "go.etcd.io/etcd/tests/v3"
81+
- "go.etcd.io/etcd/api/v3"

driver/etcd/etcd.go

Lines changed: 225 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,72 +3,262 @@
33
package etcd
44

55
import (
6+
"bytes"
67
"context"
8+
"errors"
79
"fmt"
810

911
etcd "go.etcd.io/etcd/client/v3"
1012

1113
"github.com/tarantool/go-storage/driver"
14+
"github.com/tarantool/go-storage/kv"
1215
"github.com/tarantool/go-storage/operation"
1316
"github.com/tarantool/go-storage/predicate"
1417
"github.com/tarantool/go-storage/tx"
1518
"github.com/tarantool/go-storage/watch"
1619
)
1720

21+
// Client defines the minimal interface needed for etcd operations.
22+
// This allows for easier testing and mock implementations.
23+
type Client interface {
24+
// Txn creates a new transaction.
25+
Txn(ctx context.Context) etcd.Txn
26+
}
27+
28+
// Watcher defines the interface for watching etcd changes.
29+
// This extends the etcd.Watcher interface to match our usage pattern.
30+
type Watcher interface {
31+
// Watch watches for changes on a key (using etcd's signature).
32+
Watch(ctx context.Context, key string, opts ...etcd.OpOption) etcd.WatchChan
33+
// Close closes the watcher.
34+
Close() error
35+
}
36+
37+
// WatcherFactory creates new watchers from a client.
38+
type WatcherFactory interface {
39+
// NewWatcher creates a new watcher.
40+
NewWatcher(client Client) Watcher
41+
}
42+
1843
// Driver is an etcd implementation of the storage driver interface.
1944
// It uses etcd as the underlying key-value storage backend.
2045
type Driver struct {
21-
client *etcd.Client // etcd client instance..
46+
client Client // etcd client interface.
47+
watcherFactory WatcherFactory // factory for creating watchers.
2248
}
2349

2450
var (
2551
_ driver.Driver = &Driver{} //nolint:exhaustruct
52+
53+
// Static error definitions to avoid dynamic errors.
54+
errUnsupportedPredicateTarget = errors.New("unsupported predicate target")
55+
errValuePredicateRequiresBytes = errors.New("value predicate requires []byte value")
56+
errUnsupportedValueOperation = errors.New("unsupported operation for value predicate")
57+
errVersionPredicateRequiresInt = errors.New("version predicate requires int64 value")
58+
errUnsupportedVersionOperation = errors.New("unsupported operation for version predicate")
59+
errUnsupportedOperationType = errors.New("unsupported operation type")
2660
)
2761

28-
// New creates a new etcd driver instance.
29-
// It establishes a connection to the etcd cluster using the provided endpoints.
30-
func New(ctx context.Context, endpoints []string) (*Driver, error) {
31-
client, err := etcd.New(etcd.Config{
32-
Context: ctx,
33-
Endpoints: endpoints,
34-
AutoSyncInterval: 0,
35-
DialTimeout: 0,
36-
DialKeepAliveTime: 0,
37-
DialKeepAliveTimeout: 0,
38-
MaxCallSendMsgSize: 0,
39-
MaxCallRecvMsgSize: 0,
40-
TLS: nil,
41-
Username: "",
42-
Password: "",
43-
RejectOldCluster: false,
44-
DialOptions: nil,
45-
Logger: nil,
46-
LogConfig: nil,
47-
PermitWithoutStream: false,
48-
MaxUnaryRetries: 0,
49-
BackoffWaitBetween: 0,
50-
BackoffJitterFraction: 0,
51-
})
52-
if err != nil {
53-
return nil, fmt.Errorf("failed to create etcd client: %w", err)
62+
// etcdClientAdapter wraps etcd.Client to implement our Client interface.
63+
type etcdClientAdapter struct {
64+
client *etcd.Client
65+
}
66+
67+
func (a *etcdClientAdapter) Txn(ctx context.Context) etcd.Txn {
68+
return a.client.Txn(ctx)
69+
}
70+
71+
// etcdWatcherAdapter wraps etcd.Watcher to implement our Watcher interface.
72+
type etcdWatcherAdapter struct {
73+
watcher etcd.Watcher
74+
}
75+
76+
func (a *etcdWatcherAdapter) Watch(ctx context.Context, key string, opts ...etcd.OpOption) etcd.WatchChan {
77+
return a.watcher.Watch(ctx, key, opts...)
78+
}
79+
80+
func (a *etcdWatcherAdapter) Close() error {
81+
return fmt.Errorf("failed to close: %w", a.watcher.Close())
82+
}
83+
84+
// etcdWatcherFactory implements WatcherFactory for etcd clients.
85+
type etcdWatcherFactory struct{}
86+
87+
func (f *etcdWatcherFactory) NewWatcher(client Client) Watcher {
88+
// For etcd clients, we need access to the underlying client.
89+
if adapter, ok := client.(*etcdClientAdapter); ok {
90+
return &etcdWatcherAdapter{
91+
watcher: etcd.NewWatcher(adapter.client),
92+
}
93+
}
94+
// For other implementations, return a no-op watcher.
95+
return &noopWatcher{}
96+
}
97+
98+
// noopWatcher is a no-op implementation of Watcher for non-etcd clients.
99+
type noopWatcher struct{}
100+
101+
func (w *noopWatcher) Watch(_ context.Context, _ string, _ ...etcd.OpOption) etcd.WatchChan {
102+
ch := make(chan etcd.WatchResponse)
103+
close(ch)
104+
105+
return ch
106+
}
107+
108+
func (w *noopWatcher) Close() error {
109+
return nil
110+
}
111+
112+
// New creates a new etcd driver instance using an existing etcd client.
113+
// The client should be properly configured and connected to an etcd cluster.
114+
func New(client *etcd.Client) *Driver {
115+
return &Driver{
116+
client: &etcdClientAdapter{client: client},
117+
watcherFactory: &etcdWatcherFactory{},
118+
}
119+
}
120+
121+
// NewWithInterfaces creates a new etcd driver instance using interface abstractions.
122+
// This is useful for testing with mock clients.
123+
func NewWithInterfaces(client Client, watcherFactory WatcherFactory) *Driver {
124+
if watcherFactory == nil {
125+
watcherFactory = &etcdWatcherFactory{}
54126
}
55127

56-
return &Driver{client: client}, nil
128+
return &Driver{
129+
client: client,
130+
watcherFactory: watcherFactory,
131+
}
57132
}
58133

59134
// Execute executes a transactional operation with conditional logic.
60135
// It processes predicates to determine whether to execute thenOps or elseOps.
61136
func (d Driver) Execute(
62-
_ context.Context,
63-
_ []predicate.Predicate,
64-
_ []operation.Operation,
65-
_ []operation.Operation,
137+
ctx context.Context,
138+
predicates []predicate.Predicate,
139+
thenOps []operation.Operation,
140+
elseOps []operation.Operation,
66141
) (tx.Response, error) {
67-
panic("implement me")
142+
txn := d.client.Txn(ctx)
143+
144+
convertedPredicates, err := predicatesToCmps(predicates)
145+
if err != nil {
146+
return tx.Response{}, fmt.Errorf("failed to convert predicates: %w", err)
147+
}
148+
149+
txn.If(convertedPredicates...)
150+
151+
thenEtcdOps, err := operationsToEtcdOps(thenOps)
152+
if err != nil {
153+
return tx.Response{}, fmt.Errorf("failed to convert then operations: %w", err)
154+
}
155+
156+
txn = txn.Then(thenEtcdOps...)
157+
158+
elseEtcdOps, err := operationsToEtcdOps(elseOps)
159+
if err != nil {
160+
return tx.Response{}, fmt.Errorf("failed to convert else operations: %w", err)
161+
}
162+
163+
txn = txn.Else(elseEtcdOps...)
164+
165+
resp, err := txn.Commit()
166+
if err != nil {
167+
return tx.Response{}, fmt.Errorf("transaction failed: %w", err)
168+
}
169+
170+
return etcdResponseToTxResponse(resp), nil
68171
}
69172

173+
const (
174+
eventChannelSize = 100
175+
)
176+
70177
// Watch monitors changes to a specific key and returns a stream of events.
71178
// It supports optional watch configuration through the opts parameter.
72-
func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) {
73-
panic("implement me")
179+
func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) {
180+
eventCh := make(chan watch.Event, eventChannelSize)
181+
182+
parentWatcher := d.watcherFactory.NewWatcher(d.client)
183+
184+
go func() {
185+
defer close(eventCh)
186+
187+
var opts []etcd.OpOption
188+
if bytes.HasSuffix(key, []byte("/")) {
189+
opts = append(opts, etcd.WithPrefix())
190+
}
191+
192+
watchChan := parentWatcher.Watch(ctx, string(key), opts...)
193+
194+
for {
195+
select {
196+
case <-ctx.Done():
197+
return
198+
case watchResp, ok := <-watchChan:
199+
if !ok {
200+
return
201+
}
202+
203+
if watchResp.Err() != nil {
204+
continue
205+
}
206+
207+
for range watchResp.Events {
208+
select {
209+
case eventCh <- watch.Event{
210+
Prefix: key,
211+
}:
212+
case <-ctx.Done():
213+
return
214+
}
215+
}
216+
}
217+
}
218+
}()
219+
220+
return eventCh, func() {
221+
_ = parentWatcher.Close()
222+
}, nil
223+
}
224+
225+
// etcdResponseToTxResponse converts an etcd transaction response to tx.Response.
226+
func etcdResponseToTxResponse(resp *etcd.TxnResponse) tx.Response {
227+
results := make([]tx.RequestResponse, 0, len(resp.Responses))
228+
229+
for _, etcdResp := range resp.Responses {
230+
var values []kv.KeyValue
231+
232+
switch {
233+
case etcdResp.GetResponseRange() != nil:
234+
getResp := etcdResp.GetResponseRange()
235+
for _, etcdKv := range getResp.Kvs {
236+
values = append(values, kv.KeyValue{
237+
Key: etcdKv.Key,
238+
Value: etcdKv.Value,
239+
ModRevision: etcdKv.ModRevision,
240+
})
241+
}
242+
case etcdResp.GetResponsePut() != nil:
243+
// Put operations don't return data.
244+
case etcdResp.GetResponseDeleteRange() != nil:
245+
deleteResp := etcdResp.GetResponseDeleteRange()
246+
for _, etcdKv := range deleteResp.PrevKvs {
247+
values = append(values, kv.KeyValue{
248+
Key: etcdKv.Key,
249+
Value: etcdKv.Value,
250+
ModRevision: etcdKv.ModRevision,
251+
})
252+
}
253+
}
254+
255+
results = append(results, tx.RequestResponse{
256+
Values: values,
257+
})
258+
}
259+
260+
return tx.Response{
261+
Succeeded: resp.Succeeded,
262+
Results: results,
263+
}
74264
}

0 commit comments

Comments
 (0)