@@ -24,6 +24,8 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
2424pub struct Client {
2525 rpc : Arc < PdRpcClient > ,
2626 cf : Option < ColumnFamily > ,
27+ /// Whether to use the [`atomic mode`](Client::with_atomic).
28+ atomic : bool ,
2729}
2830
2931impl Client {
@@ -63,7 +65,11 @@ impl Client {
6365 ) -> Result < Client > {
6466 let pd_endpoints: Vec < String > = pd_endpoints. into_iter ( ) . map ( Into :: into) . collect ( ) ;
6567 let rpc = Arc :: new ( PdRpcClient :: connect ( & pd_endpoints, & config, false ) . await ?) ;
66- Ok ( Client { rpc, cf : None } )
68+ Ok ( Client {
69+ rpc,
70+ cf : None ,
71+ atomic : false ,
72+ } )
6773 }
6874
6975 /// Set the column family of requests.
@@ -89,6 +95,22 @@ impl Client {
8995 Client {
9096 rpc : self . rpc . clone ( ) ,
9197 cf : Some ( cf) ,
98+ atomic : self . atomic ,
99+ }
100+ }
101+
102+ /// Set to use the atomic mode.
103+ ///
104+ /// The only reason of using atomic mode is the
105+ /// [`compare_and_swap`](Client::compare_and_swap) operation. To guarantee
106+ /// the atomicity of CAS, write operations like [`put`](Client::put) or
107+ /// [`delete`](Client::delete) in atomic mode are more expensive. Some
108+ /// operations are not supported in the mode.
109+ pub fn with_atomic ( & self ) -> Client {
110+ Client {
111+ rpc : self . rpc . clone ( ) ,
112+ cf : self . cf . clone ( ) ,
113+ atomic : true ,
92114 }
93115 }
94116
@@ -171,15 +193,15 @@ impl Client {
171193 /// # });
172194 /// ```
173195 pub async fn put ( & self , key : impl Into < Key > , value : impl Into < Value > ) -> Result < ( ) > {
174- self . put_inner ( key, value, false ) . await
175- }
176-
177- /// Create a new *atomic* 'put' request.
178- /// Atomic operations can block each other on the same key.
179- ///
180- /// Once resolved this request will result in the setting of the value associated with the given key.
181- pub async fn atomic_put ( & self , key : impl Into < Key > , value : impl Into < Value > ) -> Result < ( ) > {
182- self . put_inner ( key , value , true ) . await
196+ let request = new_raw_put_request ( key. into ( ) , value. into ( ) , self . cf . clone ( ) , self . atomic ) ;
197+ let plan = crate :: request :: PlanBuilder :: new ( self . rpc . clone ( ) , request )
198+ . single_region ( )
199+ . await ?
200+ . retry_region ( DEFAULT_REGION_BACKOFF )
201+ . extract_error ( )
202+ . plan ( ) ;
203+ plan . execute ( ) . await ? ;
204+ Ok ( ( ) )
183205 }
184206
185207 /// Create a new 'batch put' request.
@@ -203,19 +225,18 @@ impl Client {
203225 & self ,
204226 pairs : impl IntoIterator < Item = impl Into < KvPair > > ,
205227 ) -> Result < ( ) > {
206- self . batch_put_inner ( pairs, false ) . await
207- }
208-
209- /// Create a new *atomic* 'batch put' request.
210- /// Atomic operations can block each other on the same key.
211- ///
212- /// Once resolved this request will result in the setting of the values
213- /// associated with the given keys.
214- pub async fn atomic_batch_put (
215- & self ,
216- pairs : impl IntoIterator < Item = impl Into < KvPair > > ,
217- ) -> Result < ( ) > {
218- self . batch_put_inner ( pairs, true ) . await
228+ let request = new_raw_batch_put_request (
229+ pairs. into_iter ( ) . map ( Into :: into) ,
230+ self . cf . clone ( ) ,
231+ self . atomic ,
232+ ) ;
233+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
234+ . multi_region ( )
235+ . retry_region ( DEFAULT_REGION_BACKOFF )
236+ . extract_error ( )
237+ . plan ( ) ;
238+ plan. execute ( ) . await ?;
239+ Ok ( ( ) )
219240 }
220241
221242 /// Create a new 'delete' request.
@@ -236,29 +257,15 @@ impl Client {
236257 /// # });
237258 /// ```
238259 pub async fn delete ( & self , key : impl Into < Key > ) -> Result < ( ) > {
239- self . delete_inner ( key, false ) . await
240- }
241-
242- /// Create a new *atomic* 'delete' request.
243- /// Atomic operations can block each other on the same key.
244- ///
245- /// Once resolved this request will result in the deletion of the given key.
246- ///
247- /// It does not return an error if the key does not exist in TiKV.
248- ///
249- /// # Examples
250- /// ```rust,no_run
251- /// # use tikv_client::{Key, Config, RawClient};
252- /// # use futures::prelude::*;
253- /// # futures::executor::block_on(async {
254- /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
255- /// let key = "TiKV".to_owned();
256- /// let req = client.delete(key);
257- /// let result: () = req.await.unwrap();
258- /// # });
259- /// ```
260- pub async fn atomic_delete ( & self , key : impl Into < Key > ) -> Result < ( ) > {
261- self . delete_inner ( key, true ) . await
260+ let request = new_raw_delete_request ( key. into ( ) , self . cf . clone ( ) , self . atomic ) ;
261+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
262+ . single_region ( )
263+ . await ?
264+ . retry_region ( DEFAULT_REGION_BACKOFF )
265+ . extract_error ( )
266+ . plan ( ) ;
267+ plan. execute ( ) . await ?;
268+ Ok ( ( ) )
262269 }
263270
264271 /// Create a new 'batch delete' request.
@@ -279,6 +286,7 @@ impl Client {
279286 /// # });
280287 /// ```
281288 pub async fn batch_delete ( & self , keys : impl IntoIterator < Item = impl Into < Key > > ) -> Result < ( ) > {
289+ self . assert_non_atomic ( ) ?;
282290 let request =
283291 new_raw_batch_delete_request ( keys. into_iter ( ) . map ( Into :: into) , self . cf . clone ( ) ) ;
284292 let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
@@ -306,6 +314,7 @@ impl Client {
306314 /// # });
307315 /// ```
308316 pub async fn delete_range ( & self , range : impl Into < BoundRange > ) -> Result < ( ) > {
317+ self . assert_non_atomic ( ) ?;
309318 let request = new_raw_delete_range_request ( range. into ( ) , self . cf . clone ( ) ) ;
310319 let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
311320 . multi_region ( )
@@ -436,18 +445,23 @@ impl Client {
436445
437446 /// Create a new *atomic* 'compare and set' request.
438447 ///
439- /// Once resolved this request will result in an atomic `compare and set' operation for the given key.
448+ /// Once resolved this request will result in an atomic `compare and set'
449+ /// operation for the given key.
440450 ///
441- /// If the value retrived is equal to `current_value`, `new_value` is written.
451+ /// If the value retrived is equal to `current_value`, `new_value` is
452+ /// written.
442453 ///
443454 /// # Return Value
444- /// A tuple is returned if successful: the previous value and whether the value is swapped
445- pub async fn atomic_compare_and_swap (
455+ ///
456+ /// A tuple is returned if successful: the previous value and whether the
457+ /// value is swapped
458+ pub async fn compare_and_swap (
446459 & self ,
447460 key : impl Into < Key > ,
448461 previous_value : impl Into < Option < Value > > ,
449462 new_value : impl Into < Value > ,
450463 ) -> Result < ( Option < Value > , bool ) > {
464+ self . assert_atomic ( ) ?;
451465 let req = new_cas_request (
452466 key. into ( ) ,
453467 new_value. into ( ) ,
@@ -516,48 +530,15 @@ impl Client {
516530 plan. execute ( ) . await
517531 }
518532
519- async fn put_inner (
520- & self ,
521- key : impl Into < Key > ,
522- value : impl Into < Value > ,
523- atomic : bool ,
524- ) -> Result < ( ) > {
525- let request = new_raw_put_request ( key. into ( ) , value. into ( ) , self . cf . clone ( ) , atomic) ;
526- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
527- . single_region ( )
528- . await ?
529- . retry_region ( DEFAULT_REGION_BACKOFF )
530- . extract_error ( )
531- . plan ( ) ;
532- plan. execute ( ) . await ?;
533- Ok ( ( ) )
534- }
535-
536- async fn batch_put_inner (
537- & self ,
538- pairs : impl IntoIterator < Item = impl Into < KvPair > > ,
539- atomic : bool ,
540- ) -> Result < ( ) > {
541- let request =
542- new_raw_batch_put_request ( pairs. into_iter ( ) . map ( Into :: into) , self . cf . clone ( ) , atomic) ;
543- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
544- . multi_region ( )
545- . retry_region ( DEFAULT_REGION_BACKOFF )
546- . extract_error ( )
547- . plan ( ) ;
548- plan. execute ( ) . await ?;
549- Ok ( ( ) )
533+ fn assert_non_atomic ( & self ) -> Result < ( ) > {
534+ ( !self . atomic )
535+ . then ( || ( ) )
536+ . ok_or ( Error :: UnsupportedInAtomicMode )
550537 }
551538
552- async fn delete_inner ( & self , key : impl Into < Key > , atomic : bool ) -> Result < ( ) > {
553- let request = new_raw_delete_request ( key. into ( ) , self . cf . clone ( ) , atomic) ;
554- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
555- . single_region ( )
556- . await ?
557- . retry_region ( DEFAULT_REGION_BACKOFF )
558- . extract_error ( )
559- . plan ( ) ;
560- plan. execute ( ) . await ?;
561- Ok ( ( ) )
539+ fn assert_atomic ( & self ) -> Result < ( ) > {
540+ self . atomic
541+ . then ( || ( ) )
542+ . ok_or ( Error :: UnsupportedInNonAtomicMode )
562543 }
563544}
0 commit comments