@@ -4,11 +4,13 @@ package etcd
44
55import (
66 "context"
7+ "errors"
78 "fmt"
89
910 etcd "go.etcd.io/etcd/client/v3"
1011
1112 "github.com/tarantool/go-storage/driver"
13+ "github.com/tarantool/go-storage/kv"
1214 "github.com/tarantool/go-storage/operation"
1315 "github.com/tarantool/go-storage/predicate"
1416 "github.com/tarantool/go-storage/tx"
@@ -23,6 +25,14 @@ type Driver struct {
2325
2426var (
2527 _ driver.Driver = & Driver {} //nolint:exhaustruct
28+
29+ // Static error definitions to avoid dynamic errors.
30+ errUnsupportedPredicateTarget = errors .New ("unsupported predicate target" )
31+ errValuePredicateRequiresBytes = errors .New ("value predicate requires []byte value" )
32+ errUnsupportedValueOperation = errors .New ("unsupported operation for value predicate" )
33+ errVersionPredicateRequiresInt = errors .New ("version predicate requires int64 value" )
34+ errUnsupportedVersionOperation = errors .New ("unsupported operation for version predicate" )
35+ errUnsupportedOperationType = errors .New ("unsupported operation type" )
2636)
2737
2838// New creates a new etcd driver instance.
@@ -59,16 +69,229 @@ func New(ctx context.Context, endpoints []string) (*Driver, error) {
5969// Execute executes a transactional operation with conditional logic.
6070// It processes predicates to determine whether to execute thenOps or elseOps.
6171func (d Driver ) Execute (
62- _ context.Context ,
63- _ []predicate.Predicate ,
64- _ []operation.Operation ,
65- _ []operation.Operation ,
72+ ctx context.Context ,
73+ predicates []predicate.Predicate ,
74+ thenOps []operation.Operation ,
75+ elseOps []operation.Operation ,
6676) (tx.Response , error ) {
67- panic ("implement me" )
77+ txn := d .client .Txn (ctx )
78+
79+ for _ , p := range predicates {
80+ cmp , err := predicateToCmp (p )
81+ if err != nil {
82+ return tx.Response {}, fmt .Errorf ("failed to convert predicate: %w" , err )
83+ }
84+
85+ txn = txn .If (cmp )
86+ }
87+
88+ thenEtcdOps , err := operationsToEtcdOps (thenOps )
89+ if err != nil {
90+ return tx.Response {}, fmt .Errorf ("failed to convert then operations: %w" , err )
91+ }
92+
93+ txn = txn .Then (thenEtcdOps ... )
94+
95+ elseEtcdOps , err := operationsToEtcdOps (elseOps )
96+ if err != nil {
97+ return tx.Response {}, fmt .Errorf ("failed to convert else operations: %w" , err )
98+ }
99+
100+ txn = txn .Else (elseEtcdOps ... )
101+
102+ resp , err := txn .Commit ()
103+ if err != nil {
104+ return tx.Response {}, fmt .Errorf ("transaction failed: %w" , err )
105+ }
106+
107+ return etcdResponseToTxResponse (resp ), nil
68108}
69109
70110// Watch monitors changes to a specific key and returns a stream of events.
71111// It supports optional watch configuration through the opts parameter.
72- func (d Driver ) Watch (_ context.Context , _ []byte , _ ... watch.Option ) <- chan watch.Event {
73- panic ("implement me" )
112+ func (d Driver ) Watch (ctx context.Context , key []byte , _ ... watch.Option ) <- chan watch.Event {
113+ // TODO: Apply watch options when implemented.
114+ eventCh := make (chan watch.Event , 100 ) //nolint:mnd
115+
116+ go func () {
117+ defer close (eventCh )
118+
119+ watchChan := d .client .Watch (ctx , string (key ))
120+
121+ for {
122+ select {
123+ case <- ctx .Done ():
124+ return
125+ case watchResp , ok := <- watchChan :
126+ if ! ok {
127+ return
128+ }
129+
130+ if watchResp .Err () != nil {
131+ continue
132+ }
133+
134+ for _ , etcdEvent := range watchResp .Events {
135+ event := etcdEventToWatchEvent (etcdEvent )
136+ select {
137+ case eventCh <- event :
138+ case <- ctx .Done ():
139+ return
140+ }
141+ }
142+ }
143+ }
144+ }()
145+
146+ return eventCh
147+ }
148+
149+ // predicateToCmp converts a predicate to an etcd comparison.
150+ func predicateToCmp (pred predicate.Predicate ) (etcd.Cmp , error ) {
151+ switch pred .Target () {
152+ case predicate .TargetValue :
153+ return valuePredicateToCmp (pred )
154+ case predicate .TargetVersion :
155+ return versionPredicateToCmp (pred )
156+ default :
157+ return etcd.Cmp {}, fmt .Errorf ("%w: %v" , errUnsupportedPredicateTarget , pred .Target ())
158+ }
159+ }
160+
161+ // valuePredicateToCmp converts a value predicate to an etcd comparison.
162+ func valuePredicateToCmp (pred predicate.Predicate ) (etcd.Cmp , error ) {
163+ key := string (pred .Key ())
164+ value , ok := pred .Value ().([]byte )
165+
166+ if ! ok {
167+ return etcd.Cmp {}, errValuePredicateRequiresBytes
168+ }
169+
170+ switch pred .Operation () {
171+ case predicate .OpEqual :
172+ return etcd .Compare (etcd .Value (key ), "=" , value ), nil
173+ case predicate .OpNotEqual :
174+ return etcd .Compare (etcd .Value (key ), "!=" , value ), nil
175+ case predicate .OpGreater :
176+ return etcd.Cmp {}, fmt .Errorf ("%w: %v" , errUnsupportedValueOperation , pred .Operation ())
177+ case predicate .OpLess :
178+ return etcd.Cmp {}, fmt .Errorf ("%w: %v" , errUnsupportedValueOperation , pred .Operation ())
179+ default :
180+ return etcd.Cmp {}, fmt .Errorf ("%w: %v" , errUnsupportedValueOperation , pred .Operation ())
181+ }
182+ }
183+
184+ // versionPredicateToCmp converts a version predicate to an etcd comparison.
185+ func versionPredicateToCmp (pred predicate.Predicate ) (etcd.Cmp , error ) {
186+ key := string (pred .Key ())
187+ version , ok := pred .Value ().(int64 )
188+
189+ if ! ok {
190+ return etcd.Cmp {}, errVersionPredicateRequiresInt
191+ }
192+
193+ switch pred .Operation () {
194+ case predicate .OpEqual :
195+ return etcd .Compare (etcd .Version (key ), "=" , version ), nil
196+ case predicate .OpNotEqual :
197+ return etcd .Compare (etcd .Version (key ), "!=" , version ), nil
198+ case predicate .OpGreater :
199+ return etcd .Compare (etcd .Version (key ), ">" , version ), nil
200+ case predicate .OpLess :
201+ return etcd .Compare (etcd .Version (key ), "<" , version ), nil
202+ default :
203+ return etcd.Cmp {}, fmt .Errorf ("%w: %v" , errUnsupportedVersionOperation , pred .Operation ())
204+ }
205+ }
206+
207+ // operationsToEtcdOps converts operations to etcd operations.
208+ func operationsToEtcdOps (ops []operation.Operation ) ([]etcd.Op , error ) {
209+ etcdOps := make ([]etcd.Op , 0 , len (ops ))
210+ for _ , op := range ops {
211+ etcdOp , err := operationToEtcdOp (op )
212+ if err != nil {
213+ return nil , err
214+ }
215+
216+ etcdOps = append (etcdOps , etcdOp )
217+ }
218+
219+ return etcdOps , nil
220+ }
221+
222+ // operationToEtcdOp converts an operation to an etcd operation.
223+ func operationToEtcdOp (storageOperation operation.Operation ) (etcd.Op , error ) {
224+ key := string (storageOperation .Key ())
225+
226+ switch storageOperation .Type () {
227+ case operation .TypeGet :
228+ return etcd .OpGet (key ), nil
229+ case operation .TypePut :
230+ return etcd .OpPut (key , string (storageOperation .Value ())), nil
231+ case operation .TypeDelete :
232+ return etcd .OpDelete (key ), nil
233+ default :
234+ return etcd.Op {}, fmt .Errorf ("%w: %v" , errUnsupportedOperationType , storageOperation .Type ())
235+ }
236+ }
237+
238+ // etcdResponseToTxResponse converts an etcd transaction response to tx.Response.
239+ func etcdResponseToTxResponse (resp * etcd.TxnResponse ) tx.Response {
240+ results := make ([]tx.RequestResponse , 0 , len (resp .Responses ))
241+
242+ for _ , etcdResp := range resp .Responses {
243+ reqResp := tx.RequestResponse {
244+ Success : true ,
245+ KeyValue : nil ,
246+ Error : nil ,
247+ }
248+
249+ switch {
250+ case etcdResp .GetResponseRange () != nil :
251+ getResp := etcdResp .GetResponseRange ()
252+ if len (getResp .Kvs ) > 0 {
253+ etcdKv := getResp .Kvs [0 ]
254+
255+ reqResp .KeyValue = & kv.KeyValue {
256+ Key : etcdKv .Key ,
257+ Value : etcdKv .Value ,
258+ CreateRevision : etcdKv .CreateRevision ,
259+ ModRevision : etcdKv .ModRevision ,
260+ Version : etcdKv .Version ,
261+ }
262+ }
263+ case etcdResp .GetResponsePut () != nil :
264+ // Put operations don't return data.
265+ case etcdResp .GetResponseDeleteRange () != nil :
266+ // Delete operations don't return data.
267+ }
268+
269+ results = append (results , reqResp )
270+ }
271+
272+ return tx.Response {
273+ Succeeded : resp .Succeeded ,
274+ Results : results ,
275+ }
276+ }
277+
278+ // etcdEventToWatchEvent converts an etcd event to a watch event.
279+ func etcdEventToWatchEvent (etcdEvent * etcd.Event ) watch.Event {
280+ event := watch.Event {
281+ Type : watch .EventType (0 ), // Will be set below.
282+ Key : etcdEvent .Kv .Key ,
283+ Value : nil , // Will be set below.
284+ Rev : etcdEvent .Kv .ModRevision ,
285+ }
286+
287+ switch etcdEvent .Type {
288+ case etcd .EventTypePut :
289+ event .Type = watch .EventPut
290+ event .Value = etcdEvent .Kv .Value
291+ case etcd .EventTypeDelete :
292+ event .Type = watch .EventDelete
293+ event .Value = nil
294+ }
295+
296+ return event
74297}
0 commit comments