|
3 | 3 | package etcd |
4 | 4 |
|
5 | 5 | import ( |
| 6 | + "bytes" |
6 | 7 | "context" |
| 8 | + "errors" |
7 | 9 | "fmt" |
8 | 10 |
|
9 | 11 | etcd "go.etcd.io/etcd/client/v3" |
10 | 12 |
|
11 | 13 | "github.com/tarantool/go-storage/driver" |
| 14 | + "github.com/tarantool/go-storage/kv" |
12 | 15 | "github.com/tarantool/go-storage/operation" |
13 | 16 | "github.com/tarantool/go-storage/predicate" |
14 | 17 | "github.com/tarantool/go-storage/tx" |
15 | 18 | "github.com/tarantool/go-storage/watch" |
16 | 19 | ) |
17 | 20 |
|
| 21 | +// Client defines the minimal interface needed for etcd operations. |
| 22 | +// This allows for easier testing and mock implementations. |
| 23 | +type Client interface { |
| 24 | + // Txn creates a new transaction. |
| 25 | + Txn(ctx context.Context) etcd.Txn |
| 26 | +} |
| 27 | + |
| 28 | +// Watcher defines the interface for watching etcd changes. |
| 29 | +// This extends the etcd.Watcher interface to match our usage pattern. |
| 30 | +type Watcher interface { |
| 31 | + // Watch watches for changes on a key (using etcd's signature). |
| 32 | + Watch(ctx context.Context, key string, opts ...etcd.OpOption) etcd.WatchChan |
| 33 | + // Close closes the watcher. |
| 34 | + Close() error |
| 35 | +} |
| 36 | + |
| 37 | +// WatcherFactory creates new watchers from a client. |
| 38 | +type WatcherFactory interface { |
| 39 | + // NewWatcher creates a new watcher. |
| 40 | + NewWatcher(client Client) Watcher |
| 41 | +} |
| 42 | + |
18 | 43 | // Driver is an etcd implementation of the storage driver interface. |
19 | 44 | // It uses etcd as the underlying key-value storage backend. |
20 | 45 | type Driver struct { |
21 | | - client *etcd.Client // etcd client instance.. |
| 46 | + client Client // etcd client interface. |
| 47 | + watcherFactory WatcherFactory // factory for creating watchers. |
22 | 48 | } |
23 | 49 |
|
24 | 50 | var ( |
25 | 51 | _ driver.Driver = &Driver{} //nolint:exhaustruct |
| 52 | + |
| 53 | + // Static error definitions to avoid dynamic errors. |
| 54 | + errUnsupportedPredicateTarget = errors.New("unsupported predicate target") |
| 55 | + errValuePredicateRequiresBytes = errors.New("value predicate requires []byte value") |
| 56 | + errUnsupportedValueOperation = errors.New("unsupported operation for value predicate") |
| 57 | + errVersionPredicateRequiresInt = errors.New("version predicate requires int64 value") |
| 58 | + errUnsupportedVersionOperation = errors.New("unsupported operation for version predicate") |
| 59 | + errUnsupportedOperationType = errors.New("unsupported operation type") |
26 | 60 | ) |
27 | 61 |
|
28 | | -// New creates a new etcd driver instance. |
29 | | -// It establishes a connection to the etcd cluster using the provided endpoints. |
30 | | -func New(ctx context.Context, endpoints []string) (*Driver, error) { |
31 | | - client, err := etcd.New(etcd.Config{ |
32 | | - Context: ctx, |
33 | | - Endpoints: endpoints, |
34 | | - AutoSyncInterval: 0, |
35 | | - DialTimeout: 0, |
36 | | - DialKeepAliveTime: 0, |
37 | | - DialKeepAliveTimeout: 0, |
38 | | - MaxCallSendMsgSize: 0, |
39 | | - MaxCallRecvMsgSize: 0, |
40 | | - TLS: nil, |
41 | | - Username: "", |
42 | | - Password: "", |
43 | | - RejectOldCluster: false, |
44 | | - DialOptions: nil, |
45 | | - Logger: nil, |
46 | | - LogConfig: nil, |
47 | | - PermitWithoutStream: false, |
48 | | - MaxUnaryRetries: 0, |
49 | | - BackoffWaitBetween: 0, |
50 | | - BackoffJitterFraction: 0, |
51 | | - }) |
52 | | - if err != nil { |
53 | | - return nil, fmt.Errorf("failed to create etcd client: %w", err) |
| 62 | +// etcdClientAdapter wraps etcd.Client to implement our Client interface. |
| 63 | +type etcdClientAdapter struct { |
| 64 | + client *etcd.Client |
| 65 | +} |
| 66 | + |
| 67 | +func (a *etcdClientAdapter) Txn(ctx context.Context) etcd.Txn { |
| 68 | + return a.client.Txn(ctx) |
| 69 | +} |
| 70 | + |
| 71 | +// etcdWatcherAdapter wraps etcd.Watcher to implement our Watcher interface. |
| 72 | +type etcdWatcherAdapter struct { |
| 73 | + watcher etcd.Watcher |
| 74 | +} |
| 75 | + |
| 76 | +func (a *etcdWatcherAdapter) Watch(ctx context.Context, key string, opts ...etcd.OpOption) etcd.WatchChan { |
| 77 | + return a.watcher.Watch(ctx, key, opts...) |
| 78 | +} |
| 79 | + |
| 80 | +func (a *etcdWatcherAdapter) Close() error { |
| 81 | + return fmt.Errorf("failed to close: %w", a.watcher.Close()) |
| 82 | +} |
| 83 | + |
| 84 | +// etcdWatcherFactory implements WatcherFactory for etcd clients. |
| 85 | +type etcdWatcherFactory struct{} |
| 86 | + |
| 87 | +func (f *etcdWatcherFactory) NewWatcher(client Client) Watcher { |
| 88 | + // For etcd clients, we need access to the underlying client. |
| 89 | + if adapter, ok := client.(*etcdClientAdapter); ok { |
| 90 | + return &etcdWatcherAdapter{ |
| 91 | + watcher: etcd.NewWatcher(adapter.client), |
| 92 | + } |
54 | 93 | } |
| 94 | + // For other implementations, return a no-op watcher. |
| 95 | + return &noopWatcher{} |
| 96 | +} |
| 97 | + |
| 98 | +// noopWatcher is a no-op implementation of Watcher for non-etcd clients. |
| 99 | +type noopWatcher struct{} |
| 100 | + |
| 101 | +func (w *noopWatcher) Watch(_ context.Context, _ string, _ ...etcd.OpOption) etcd.WatchChan { |
| 102 | + ch := make(chan etcd.WatchResponse) |
| 103 | + close(ch) |
55 | 104 |
|
56 | | - return &Driver{client: client}, nil |
| 105 | + return ch |
| 106 | +} |
| 107 | + |
| 108 | +func (w *noopWatcher) Close() error { |
| 109 | + return nil |
| 110 | +} |
| 111 | + |
| 112 | +// New creates a new etcd driver instance using an existing etcd client. |
| 113 | +// The client should be properly configured and connected to an etcd cluster. |
| 114 | +func New(client *etcd.Client) *Driver { |
| 115 | + return &Driver{ |
| 116 | + client: &etcdClientAdapter{client: client}, |
| 117 | + watcherFactory: &etcdWatcherFactory{}, |
| 118 | + } |
| 119 | +} |
| 120 | + |
| 121 | +// NewWithInterfaces creates a new etcd driver instance using interface abstractions. |
| 122 | +// This is useful for testing with mock clients. |
| 123 | +func NewWithInterfaces(client Client, watcherFactory WatcherFactory) *Driver { |
| 124 | + if watcherFactory == nil { |
| 125 | + watcherFactory = &etcdWatcherFactory{} |
| 126 | + } |
| 127 | + |
| 128 | + return &Driver{ |
| 129 | + client: client, |
| 130 | + watcherFactory: watcherFactory, |
| 131 | + } |
57 | 132 | } |
58 | 133 |
|
59 | 134 | // Execute executes a transactional operation with conditional logic. |
60 | 135 | // It processes predicates to determine whether to execute thenOps or elseOps. |
61 | 136 | func (d Driver) Execute( |
62 | | - _ context.Context, |
63 | | - _ []predicate.Predicate, |
64 | | - _ []operation.Operation, |
65 | | - _ []operation.Operation, |
| 137 | + ctx context.Context, |
| 138 | + predicates []predicate.Predicate, |
| 139 | + thenOps []operation.Operation, |
| 140 | + elseOps []operation.Operation, |
66 | 141 | ) (tx.Response, error) { |
67 | | - panic("implement me") |
| 142 | + txn := d.client.Txn(ctx) |
| 143 | + |
| 144 | + convertedPredicates, err := predicatesToCmps(predicates) |
| 145 | + if err != nil { |
| 146 | + return tx.Response{}, fmt.Errorf("failed to convert predicates: %w", err) |
| 147 | + } |
| 148 | + txn.If(convertedPredicates...) |
| 149 | + |
| 150 | + thenEtcdOps, err := operationsToEtcdOps(thenOps) |
| 151 | + if err != nil { |
| 152 | + return tx.Response{}, fmt.Errorf("failed to convert then operations: %w", err) |
| 153 | + } |
| 154 | + |
| 155 | + txn = txn.Then(thenEtcdOps...) |
| 156 | + |
| 157 | + elseEtcdOps, err := operationsToEtcdOps(elseOps) |
| 158 | + if err != nil { |
| 159 | + return tx.Response{}, fmt.Errorf("failed to convert else operations: %w", err) |
| 160 | + } |
| 161 | + |
| 162 | + txn = txn.Else(elseEtcdOps...) |
| 163 | + |
| 164 | + resp, err := txn.Commit() |
| 165 | + if err != nil { |
| 166 | + return tx.Response{}, fmt.Errorf("transaction failed: %w", err) |
| 167 | + } |
| 168 | + |
| 169 | + return etcdResponseToTxResponse(resp), nil |
68 | 170 | } |
69 | 171 |
|
| 172 | +const ( |
| 173 | + eventChannelSize = 100 |
| 174 | +) |
| 175 | + |
70 | 176 | // Watch monitors changes to a specific key and returns a stream of events. |
71 | 177 | // It supports optional watch configuration through the opts parameter. |
72 | | -func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) { |
73 | | - panic("implement me") |
| 178 | +func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) { |
| 179 | + eventCh := make(chan watch.Event, eventChannelSize) |
| 180 | + |
| 181 | + parentWatcher := d.watcherFactory.NewWatcher(d.client) |
| 182 | + |
| 183 | + go func() { |
| 184 | + defer close(eventCh) |
| 185 | + |
| 186 | + var opts []etcd.OpOption |
| 187 | + if bytes.HasSuffix(key, []byte("/")) { |
| 188 | + opts = append(opts, etcd.WithPrefix()) |
| 189 | + } |
| 190 | + |
| 191 | + watchChan := parentWatcher.Watch(ctx, string(key), opts...) |
| 192 | + |
| 193 | + for { |
| 194 | + select { |
| 195 | + case <-ctx.Done(): |
| 196 | + return |
| 197 | + case watchResp, ok := <-watchChan: |
| 198 | + if !ok { |
| 199 | + return |
| 200 | + } |
| 201 | + |
| 202 | + if watchResp.Err() != nil { |
| 203 | + continue |
| 204 | + } |
| 205 | + |
| 206 | + for range watchResp.Events { |
| 207 | + select { |
| 208 | + case eventCh <- watch.Event{ |
| 209 | + Prefix: key, |
| 210 | + }: |
| 211 | + case <-ctx.Done(): |
| 212 | + return |
| 213 | + } |
| 214 | + } |
| 215 | + } |
| 216 | + } |
| 217 | + }() |
| 218 | + |
| 219 | + return eventCh, func() { |
| 220 | + _ = parentWatcher.Close() |
| 221 | + }, nil |
| 222 | +} |
| 223 | + |
| 224 | +// etcdResponseToTxResponse converts an etcd transaction response to tx.Response. |
| 225 | +func etcdResponseToTxResponse(resp *etcd.TxnResponse) tx.Response { |
| 226 | + results := make([]tx.RequestResponse, 0, len(resp.Responses)) |
| 227 | + |
| 228 | + for _, etcdResp := range resp.Responses { |
| 229 | + var values []kv.KeyValue |
| 230 | + |
| 231 | + switch { |
| 232 | + case etcdResp.GetResponseRange() != nil: |
| 233 | + getResp := etcdResp.GetResponseRange() |
| 234 | + for _, etcdKv := range getResp.Kvs { |
| 235 | + values = append(values, kv.KeyValue{ |
| 236 | + Key: etcdKv.Key, |
| 237 | + Value: etcdKv.Value, |
| 238 | + ModRevision: etcdKv.ModRevision, |
| 239 | + }) |
| 240 | + } |
| 241 | + case etcdResp.GetResponsePut() != nil: |
| 242 | + // Put operations don't return data. |
| 243 | + case etcdResp.GetResponseDeleteRange() != nil: |
| 244 | + deleteResp := etcdResp.GetResponseDeleteRange() |
| 245 | + for _, etcdKv := range deleteResp.PrevKvs { |
| 246 | + values = append(values, kv.KeyValue{ |
| 247 | + Key: etcdKv.Key, |
| 248 | + Value: etcdKv.Value, |
| 249 | + ModRevision: etcdKv.ModRevision, |
| 250 | + }) |
| 251 | + } |
| 252 | + } |
| 253 | + |
| 254 | + results = append(results, tx.RequestResponse{ |
| 255 | + Values: values, |
| 256 | + }) |
| 257 | + } |
| 258 | + |
| 259 | + return tx.Response{ |
| 260 | + Succeeded: resp.Succeeded, |
| 261 | + Results: results, |
| 262 | + } |
| 263 | +} |
| 264 | + |
| 265 | +// etcdEventToWatchEvent converts an etcd event to a watch event. |
| 266 | +func etcdEventToWatchEvent(etcdEvent *etcd.Event) watch.Event { |
| 267 | + return watch.Event{ |
| 268 | + Prefix: etcdEvent.Kv.Key, |
| 269 | + } |
74 | 270 | } |
0 commit comments