11// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2-
32use core:: ops:: Range ;
3+
44use std:: str:: FromStr ;
55use std:: sync:: Arc ;
66
7- use futures:: StreamExt ;
87use log:: debug;
8+ use tokio:: time:: sleep;
99
10- use crate :: backoff:: DEFAULT_REGION_BACKOFF ;
10+ use crate :: backoff:: { DEFAULT_REGION_BACKOFF , DEFAULT_STORE_BACKOFF } ;
1111use crate :: common:: Error ;
1212use crate :: config:: Config ;
1313use crate :: pd:: PdClient ;
1414use crate :: pd:: PdRpcClient ;
15+ use crate :: proto:: kvrpcpb:: { RawScanRequest , RawScanResponse } ;
1516use crate :: proto:: metapb;
1617use crate :: raw:: lowering:: * ;
17- use crate :: request:: Collect ;
1818use crate :: request:: CollectSingle ;
1919use crate :: request:: EncodeKeyspace ;
2020use crate :: request:: KeyMode ;
2121use crate :: request:: Keyspace ;
2222use crate :: request:: Plan ;
2323use crate :: request:: TruncateKeyspace ;
24+ use crate :: request:: { plan, Collect } ;
25+ use crate :: store:: { HasRegionError , RegionStore } ;
2426use crate :: Backoff ;
2527use crate :: BoundRange ;
2628use crate :: ColumnFamily ;
29+ use crate :: Error :: RegionError ;
2730use crate :: Key ;
2831use crate :: KvPair ;
2932use crate :: Result ;
@@ -755,57 +758,37 @@ impl<PdC: PdClient> Client<PdC> {
755758 max_limit : MAX_RAW_KV_SCAN_LIMIT ,
756759 } ) ;
757760 }
758-
759- let mut cur_range = range. into ( ) . encode_keyspace ( self . keyspace , KeyMode :: Raw ) ;
761+ let backoff = DEFAULT_STORE_BACKOFF ;
762+ let mut range = range. into ( ) . encode_keyspace ( self . keyspace , KeyMode :: Raw ) ;
760763 let mut result = Vec :: new ( ) ;
761- let mut scan_regions = self . rpc . clone ( ) . stores_for_range ( cur_range. clone ( ) ) . boxed ( ) ;
762- let mut region_store =
763- scan_regions
764- . next ( )
765- . await
766- . ok_or ( Error :: RegionForRangeNotFound {
767- range : ( cur_range. clone ( ) ) ,
768- } ) ??;
769- let mut cur_limit = limit;
770-
771- while cur_limit > 0 {
772- let request = new_raw_scan_request (
773- cur_range. clone ( ) ,
774- cur_limit,
764+ let mut current_limit = limit;
765+ let ( start_key, end_key) = range. clone ( ) . into_keys ( ) ;
766+ let mut current_key: Key = start_key;
767+
768+ while current_limit > 0 {
769+ let scan_args = ScanInnerArgs {
770+ start_key : current_key. clone ( ) ,
771+ end_key : end_key. clone ( ) ,
772+ limit : current_limit,
775773 key_only,
776774 reverse,
777- self . cf . clone ( ) ,
778- ) ;
779- let resp = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . keyspace , request)
780- . single_region_with_store ( region_store. clone ( ) )
781- . await ?
782- . plan ( )
783- . execute ( )
784- . await ?;
785- let mut region_scan_res = resp
786- . kvs
787- . into_iter ( )
788- . map ( Into :: into)
789- . collect :: < Vec < KvPair > > ( ) ;
790- let res_len = region_scan_res. len ( ) ;
791- result. append ( & mut region_scan_res) ;
792-
793- // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
794- if res_len < cur_limit as usize {
795- region_store = match scan_regions. next ( ) . await {
796- Some ( Ok ( rs) ) => {
797- cur_range = BoundRange :: new (
798- std:: ops:: Bound :: Included ( region_store. region_with_leader . range ( ) . 1 ) ,
799- cur_range. to ,
800- ) ;
801- rs
802- }
803- Some ( Err ( e) ) => return Err ( e) ,
804- None => break ,
805- } ;
806- cur_limit -= res_len as u32 ;
807- } else {
775+ backoff : backoff. clone ( ) ,
776+ } ;
777+ let ( res, next_key) = self . retryable_scan ( scan_args) . await ?;
778+
779+ let mut kvs = res
780+ . map ( |r| r. kvs . into_iter ( ) . map ( Into :: into) . collect :: < Vec < KvPair > > ( ) )
781+ . unwrap_or ( Vec :: new ( ) ) ;
782+
783+ if !kvs. is_empty ( ) {
784+ current_limit -= kvs. len ( ) as u32 ;
785+ result. append ( & mut kvs) ;
786+ }
787+ if end_key. clone ( ) . is_some_and ( |ek| ek <= next_key) {
808788 break ;
789+ } else {
790+ current_key = next_key;
791+ range = BoundRange :: new ( std:: ops:: Bound :: Included ( current_key. clone ( ) ) , range. to ) ;
809792 }
810793 }
811794
@@ -818,6 +801,58 @@ impl<PdC: PdClient> Client<PdC> {
818801 Ok ( result)
819802 }
820803
804+ async fn retryable_scan (
805+ & self ,
806+ mut scan_args : ScanInnerArgs ,
807+ ) -> Result < ( Option < RawScanResponse > , Key ) > {
808+ let start_key = scan_args. start_key ;
809+ let end_key = scan_args. end_key ;
810+ loop {
811+ let region = self . rpc . clone ( ) . region_for_key ( & start_key) . await ?;
812+ let store = self . rpc . clone ( ) . store_for_id ( region. id ( ) ) . await ?;
813+ let request = new_raw_scan_request (
814+ ( start_key. clone ( ) , end_key. clone ( ) ) . into ( ) ,
815+ scan_args. limit ,
816+ scan_args. key_only ,
817+ scan_args. reverse ,
818+ self . cf . clone ( ) ,
819+ ) ;
820+ let resp = self . do_store_scan ( store. clone ( ) , request. clone ( ) ) . await ;
821+ return match resp {
822+ Ok ( mut r) => {
823+ if let Some ( err) = r. region_error ( ) {
824+ let status =
825+ plan:: handle_region_error ( self . rpc . clone ( ) , err. clone ( ) , store. clone ( ) )
826+ . await ?;
827+ if status {
828+ continue ;
829+ } else if let Some ( duration) = scan_args. backoff . next_delay_duration ( ) {
830+ sleep ( duration) . await ;
831+ continue ;
832+ } else {
833+ return Err ( RegionError ( Box :: new ( err) ) ) ;
834+ }
835+ }
836+ Ok ( ( Some ( r) , region. end_key ( ) ) )
837+ }
838+ Err ( err) => Err ( err) ,
839+ } ;
840+ }
841+ }
842+
843+ async fn do_store_scan (
844+ & self ,
845+ store : RegionStore ,
846+ scan_request : RawScanRequest ,
847+ ) -> Result < RawScanResponse > {
848+ crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . keyspace , scan_request)
849+ . single_region_with_store ( store. clone ( ) )
850+ . await ?
851+ . plan ( )
852+ . execute ( )
853+ . await
854+ }
855+
821856 async fn batch_scan_inner (
822857 & self ,
823858 ranges : impl IntoIterator < Item = impl Into < BoundRange > > ,
@@ -864,6 +899,16 @@ impl<PdC: PdClient> Client<PdC> {
864899 }
865900}
866901
902+ #[ derive( Clone ) ]
903+ struct ScanInnerArgs {
904+ start_key : Key ,
905+ end_key : Option < Key > ,
906+ limit : u32 ,
907+ key_only : bool ,
908+ reverse : bool ,
909+ backoff : Backoff ,
910+ }
911+
867912#[ cfg( test) ]
868913mod tests {
869914 use std:: any:: Any ;
0 commit comments