@@ -15,9 +15,13 @@ use crate::pd::PdClient;
1515use crate :: pd:: PdRpcClient ;
1616use crate :: proto:: metapb;
1717use crate :: raw:: lowering:: * ;
18+ use crate :: request:: APIVersion ;
1819use crate :: request:: Collect ;
1920use crate :: request:: CollectSingle ;
21+ use crate :: request:: EncodeVersion ;
22+ use crate :: request:: KeyMode ;
2023use crate :: request:: Plan ;
24+ use crate :: request:: TruncateVersion ;
2125use crate :: Backoff ;
2226use crate :: BoundRange ;
2327use crate :: ColumnFamily ;
@@ -41,6 +45,7 @@ pub struct Client<PdC: PdClient = PdRpcClient> {
4145 backoff : Backoff ,
4246 /// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
4347 atomic : bool ,
48+ api_version : APIVersion ,
4449}
4550
4651impl Clone for Client {
@@ -50,6 +55,7 @@ impl Clone for Client {
5055 cf : self . cf . clone ( ) ,
5156 backoff : self . backoff . clone ( ) ,
5257 atomic : self . atomic ,
58+ api_version : self . api_version ,
5359 }
5460 }
5561}
@@ -100,12 +106,13 @@ impl Client<PdRpcClient> {
100106 config : Config ,
101107 ) -> Result < Self > {
102108 let pd_endpoints: Vec < String > = pd_endpoints. into_iter ( ) . map ( Into :: into) . collect ( ) ;
103- let rpc = Arc :: new ( PdRpcClient :: connect ( & pd_endpoints, config, false ) . await ?) ;
109+ let rpc = Arc :: new ( PdRpcClient :: connect ( & pd_endpoints, config. clone ( ) , false ) . await ?) ;
104110 Ok ( Client {
105111 rpc,
106112 cf : None ,
107113 backoff : DEFAULT_REGION_BACKOFF ,
108114 atomic : false ,
115+ api_version : APIVersion :: V1 ,
109116 } )
110117 }
111118
@@ -140,6 +147,7 @@ impl Client<PdRpcClient> {
140147 cf : Some ( cf) ,
141148 backoff : self . backoff . clone ( ) ,
142149 atomic : self . atomic ,
150+ api_version : self . api_version ,
143151 }
144152 }
145153
@@ -168,6 +176,7 @@ impl Client<PdRpcClient> {
168176 cf : self . cf . clone ( ) ,
169177 backoff,
170178 atomic : self . atomic ,
179+ api_version : self . api_version ,
171180 }
172181 }
173182
@@ -185,6 +194,20 @@ impl Client<PdRpcClient> {
185194 cf : self . cf . clone ( ) ,
186195 backoff : self . backoff . clone ( ) ,
187196 atomic : true ,
197+ api_version : self . api_version ,
198+ }
199+ }
200+
201+ /// Set the API version to use.
202+ #[ must_use]
203+ pub fn with_keyspace ( & self , keyspace : String ) -> Self {
204+ // FIXME
205+ Client {
206+ rpc : self . rpc . clone ( ) ,
207+ cf : self . cf . clone ( ) ,
208+ backoff : self . backoff . clone ( ) ,
209+ atomic : self . atomic ,
210+ api_version : APIVersion :: V2 { keyspace_id : 0 } ,
188211 }
189212 }
190213}
@@ -210,8 +233,9 @@ impl<PdC: PdClient> Client<PdC> {
210233 /// ```
211234 pub async fn get ( & self , key : impl Into < Key > ) -> Result < Option < Value > > {
212235 debug ! ( "invoking raw get request" ) ;
213- let request = new_raw_get_request ( key. into ( ) , self . cf . clone ( ) ) ;
214- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
236+ let key = key. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ;
237+ let request = new_raw_get_request ( key, self . cf . clone ( ) ) ;
238+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
215239 . retry_multi_region ( self . backoff . clone ( ) )
216240 . merge ( CollectSingle )
217241 . post_process_default ( )
@@ -242,14 +266,19 @@ impl<PdC: PdClient> Client<PdC> {
242266 keys : impl IntoIterator < Item = impl Into < Key > > ,
243267 ) -> Result < Vec < KvPair > > {
244268 debug ! ( "invoking raw batch_get request" ) ;
245- let request = new_raw_batch_get_request ( keys. into_iter ( ) . map ( Into :: into) , self . cf . clone ( ) ) ;
246- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
269+ let keys = keys
270+ . into_iter ( )
271+ . map ( |k| k. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ) ;
272+ let request = new_raw_batch_get_request ( keys, self . cf . clone ( ) ) ;
273+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
247274 . retry_multi_region ( self . backoff . clone ( ) )
248275 . merge ( Collect )
249276 . plan ( ) ;
250- plan. execute ( )
251- . await
252- . map ( |r| r. into_iter ( ) . map ( Into :: into) . collect ( ) )
277+ plan. execute ( ) . await . map ( |r| {
278+ r. into_iter ( )
279+ . map ( |pair| pair. truncate_version ( self . api_version ) )
280+ . collect ( )
281+ } )
253282 }
254283
255284 /// Create a new 'put' request.
@@ -270,8 +299,9 @@ impl<PdC: PdClient> Client<PdC> {
270299 /// ```
271300 pub async fn put ( & self , key : impl Into < Key > , value : impl Into < Value > ) -> Result < ( ) > {
272301 debug ! ( "invoking raw put request" ) ;
273- let request = new_raw_put_request ( key. into ( ) , value. into ( ) , self . cf . clone ( ) , self . atomic ) ;
274- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
302+ let key = key. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ;
303+ let request = new_raw_put_request ( key, value. into ( ) , self . cf . clone ( ) , self . atomic ) ;
304+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
275305 . retry_multi_region ( self . backoff . clone ( ) )
276306 . merge ( CollectSingle )
277307 . extract_error ( )
@@ -302,12 +332,11 @@ impl<PdC: PdClient> Client<PdC> {
302332 pairs : impl IntoIterator < Item = impl Into < KvPair > > ,
303333 ) -> Result < ( ) > {
304334 debug ! ( "invoking raw batch_put request" ) ;
305- let request = new_raw_batch_put_request (
306- pairs. into_iter ( ) . map ( Into :: into) ,
307- self . cf . clone ( ) ,
308- self . atomic ,
309- ) ;
310- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
335+ let pairs = pairs
336+ . into_iter ( )
337+ . map ( |pair| pair. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ) ;
338+ let request = new_raw_batch_put_request ( pairs, self . cf . clone ( ) , self . atomic ) ;
339+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
311340 . retry_multi_region ( self . backoff . clone ( ) )
312341 . extract_error ( )
313342 . plan ( ) ;
@@ -334,8 +363,9 @@ impl<PdC: PdClient> Client<PdC> {
334363 /// ```
335364 pub async fn delete ( & self , key : impl Into < Key > ) -> Result < ( ) > {
336365 debug ! ( "invoking raw delete request" ) ;
337- let request = new_raw_delete_request ( key. into ( ) , self . cf . clone ( ) , self . atomic ) ;
338- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
366+ let key = key. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ;
367+ let request = new_raw_delete_request ( key, self . cf . clone ( ) , self . atomic ) ;
368+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
339369 . retry_multi_region ( self . backoff . clone ( ) )
340370 . merge ( CollectSingle )
341371 . extract_error ( )
@@ -364,9 +394,11 @@ impl<PdC: PdClient> Client<PdC> {
364394 pub async fn batch_delete ( & self , keys : impl IntoIterator < Item = impl Into < Key > > ) -> Result < ( ) > {
365395 debug ! ( "invoking raw batch_delete request" ) ;
366396 self . assert_non_atomic ( ) ?;
367- let request =
368- new_raw_batch_delete_request ( keys. into_iter ( ) . map ( Into :: into) , self . cf . clone ( ) ) ;
369- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
397+ let keys = keys
398+ . into_iter ( )
399+ . map ( |k| k. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ) ;
400+ let request = new_raw_batch_delete_request ( keys, self . cf . clone ( ) ) ;
401+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
370402 . retry_multi_region ( self . backoff . clone ( ) )
371403 . extract_error ( )
372404 . plan ( ) ;
@@ -392,8 +424,9 @@ impl<PdC: PdClient> Client<PdC> {
392424 pub async fn delete_range ( & self , range : impl Into < BoundRange > ) -> Result < ( ) > {
393425 debug ! ( "invoking raw delete_range request" ) ;
394426 self . assert_non_atomic ( ) ?;
395- let request = new_raw_delete_range_request ( range. into ( ) , self . cf . clone ( ) ) ;
396- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
427+ let range = range. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ;
428+ let request = new_raw_delete_range_request ( range, self . cf . clone ( ) ) ;
429+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
397430 . retry_multi_region ( self . backoff . clone ( ) )
398431 . extract_error ( )
399432 . plan ( ) ;
@@ -543,13 +576,14 @@ impl<PdC: PdClient> Client<PdC> {
543576 ) -> Result < ( Option < Value > , bool ) > {
544577 debug ! ( "invoking raw compare_and_swap request" ) ;
545578 self . assert_atomic ( ) ?;
579+ let key = key. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ;
546580 let req = new_cas_request (
547- key. into ( ) ,
581+ key,
548582 new_value. into ( ) ,
549583 previous_value. into ( ) ,
550584 self . cf . clone ( ) ,
551585 ) ;
552- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , req)
586+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , req)
553587 . retry_multi_region ( self . backoff . clone ( ) )
554588 . merge ( CollectSingle )
555589 . post_process_default ( )
@@ -563,16 +597,29 @@ impl<PdC: PdClient> Client<PdC> {
563597 copr_version_req : impl Into < String > ,
564598 ranges : impl IntoIterator < Item = impl Into < BoundRange > > ,
565599 request_builder : impl Fn ( metapb:: Region , Vec < Range < Key > > ) -> Vec < u8 > + Send + Sync + ' static ,
566- ) -> Result < Vec < ( Vec < u8 > , Vec < Range < Key > > ) > > {
600+ ) -> Result < Vec < ( Vec < Range < Key > > , Vec < u8 > ) > > {
567601 let copr_version_req = copr_version_req. into ( ) ;
568602 semver:: VersionReq :: from_str ( & copr_version_req) ?;
603+ let ranges = ranges
604+ . into_iter ( )
605+ . map ( |range| range. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ) ;
606+ let api_version = self . api_version ;
607+ let request_builder = move |region, ranges : Vec < Range < Key > > | {
608+ request_builder (
609+ region,
610+ ranges
611+ . into_iter ( )
612+ . map ( |range| range. truncate_version ( api_version) )
613+ . collect ( ) ,
614+ )
615+ } ;
569616 let req = new_raw_coprocessor_request (
570617 copr_name. into ( ) ,
571618 copr_version_req,
572- ranges. into_iter ( ) . map ( Into :: into ) ,
619+ ranges,
573620 request_builder,
574621 ) ;
575- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , req)
622+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , req)
576623 . preserve_shard ( )
577624 . retry_multi_region ( self . backoff . clone ( ) )
578625 . post_process_default ( )
@@ -592,8 +639,9 @@ impl<PdC: PdClient> Client<PdC> {
592639 max_limit : MAX_RAW_KV_SCAN_LIMIT ,
593640 } ) ;
594641 }
642+
643+ let mut cur_range = range. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ;
595644 let mut result = Vec :: new ( ) ;
596- let mut cur_range = range. into ( ) ;
597645 let mut scan_regions = self . rpc . clone ( ) . stores_for_range ( cur_range. clone ( ) ) . boxed ( ) ;
598646 let mut region_store =
599647 scan_regions
@@ -603,22 +651,25 @@ impl<PdC: PdClient> Client<PdC> {
603651 range : ( cur_range. clone ( ) ) ,
604652 } ) ??;
605653 let mut cur_limit = limit;
654+
606655 while cur_limit > 0 {
607656 let request =
608657 new_raw_scan_request ( cur_range. clone ( ) , cur_limit, key_only, self . cf . clone ( ) ) ;
609- let resp = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
610- . single_region_with_store ( region_store. clone ( ) )
611- . await ?
612- . plan ( )
613- . execute ( )
614- . await ?;
658+ let resp =
659+ crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
660+ . single_region_with_store ( region_store. clone ( ) )
661+ . await ?
662+ . plan ( )
663+ . execute ( )
664+ . await ?;
615665 let mut region_scan_res = resp
616666 . kvs
617667 . into_iter ( )
618668 . map ( Into :: into)
619669 . collect :: < Vec < KvPair > > ( ) ;
620670 let res_len = region_scan_res. len ( ) ;
621671 result. append ( & mut region_scan_res) ;
672+
622673 // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
623674 if res_len < cur_limit as usize {
624675 region_store = match scan_regions. next ( ) . await {
@@ -637,8 +688,13 @@ impl<PdC: PdClient> Client<PdC> {
637688 break ;
638689 }
639690 }
691+
640692 // limit is a soft limit, so we need check the number of results
641693 result. truncate ( limit as usize ) ;
694+
695+ // truncate the version of keys
696+ let result = result. truncate_version ( self . api_version ) ;
697+
642698 Ok ( result)
643699 }
644700
@@ -655,17 +711,20 @@ impl<PdC: PdClient> Client<PdC> {
655711 } ) ;
656712 }
657713
658- let request = new_raw_batch_scan_request (
659- ranges. into_iter ( ) . map ( Into :: into) ,
660- each_limit,
661- key_only,
662- self . cf . clone ( ) ,
663- ) ;
664- let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , request)
714+ let ranges = ranges
715+ . into_iter ( )
716+ . map ( |range| range. into ( ) . encode_version ( self . api_version , KeyMode :: Raw ) ) ;
717+
718+ let request = new_raw_batch_scan_request ( ranges, each_limit, key_only, self . cf . clone ( ) ) ;
719+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . api_version , request)
665720 . retry_multi_region ( self . backoff . clone ( ) )
666721 . merge ( Collect )
667722 . plan ( ) ;
668- plan. execute ( ) . await
723+ plan. execute ( ) . await . map ( |r| {
724+ r. into_iter ( )
725+ . map ( |pair| pair. truncate_version ( self . api_version ) )
726+ . collect ( )
727+ } )
669728 }
670729
671730 fn assert_non_atomic ( & self ) -> Result < ( ) > {
@@ -718,6 +777,7 @@ mod tests {
718777 cf : Some ( ColumnFamily :: Default ) ,
719778 backoff : DEFAULT_REGION_BACKOFF ,
720779 atomic : false ,
780+ api_version : APIVersion :: V1 ,
721781 } ;
722782 let resps = client
723783 . coprocessor (
@@ -729,25 +789,25 @@ mod tests {
729789 . await ?;
730790 let resps: Vec < _ > = resps
731791 . into_iter ( )
732- . map ( |( data , ranges ) | ( String :: from_utf8 ( data) . unwrap ( ) , ranges ) )
792+ . map ( |( ranges , data ) | ( ranges , String :: from_utf8 ( data) . unwrap ( ) ) )
733793 . collect ( ) ;
734794 assert_eq ! (
735795 resps,
736796 vec![
737797 (
798+ vec![ Key :: from( vec![ 5 ] ) ..Key :: from( vec![ 10 ] ) ] ,
738799 "1:[Key(05)..Key(0A)]" . to_string( ) ,
739- vec![ Key :: from( vec![ 5 ] ) ..Key :: from( vec![ 10 ] ) ]
740800 ) ,
741801 (
742- "2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]" . to_string( ) ,
743802 vec![
744803 Key :: from( vec![ 10 ] ) ..Key :: from( vec![ 15 ] ) ,
745804 Key :: from( vec![ 20 ] ) ..Key :: from( vec![ 250 , 250 ] )
746- ]
805+ ] ,
806+ "2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]" . to_string( ) ,
747807 ) ,
748808 (
809+ vec![ Key :: from( vec![ 250 , 250 ] ) ..Key :: from( vec![ ] ) ] ,
749810 "3:[Key(FAFA)..Key()]" . to_string( ) ,
750- vec![ Key :: from( vec![ 250 , 250 ] ) ..Key :: from( vec![ ] ) ]
751811 )
752812 ]
753813 ) ;
0 commit comments