1- use std:: env;
2- use tikv_client:: { ColumnFamily , RawClient } ;
1+ mod ctl;
2+
3+ use futures_timer:: Delay ;
4+ use log:: { info, warn} ;
5+ use std:: { env, time:: Duration } ;
6+ use tikv_client:: { ColumnFamily , Key , RawClient , Result , TransactionClient } ;
7+
8+ const ENV_PD_ADDRS : & str = "PD_ADDRS" ;
9+ const ENV_ENABLE_MULIT_REGION : & str = "MULTI_REGION" ;
10+ const REGION_SPLIT_TIME_LIMIT : Duration = Duration :: from_secs ( 15 ) ;
311
412// Delete all entries in TiKV to leave a clean space for following tests.
513pub async fn clear_tikv ( ) {
@@ -14,7 +22,69 @@ pub async fn clear_tikv() {
1422 }
1523}
1624
17- const ENV_PD_ADDRS : & str = "PD_ADDRS" ;
25+ // To test with multiple regions, prewrite some data. Tests that hope to test
26+ // with multiple regions should use keys in the corresponding ranges.
27+ pub async fn init ( ) -> Result < ( ) > {
28+ // ignore SetLoggerError
29+ let _ = simple_logger:: SimpleLogger :: new ( )
30+ . with_level ( log:: LevelFilter :: Warn )
31+ . init ( ) ;
32+
33+ if env:: var ( ENV_ENABLE_MULIT_REGION ) . is_ok ( ) {
34+ // 1000 keys: 0..1000
35+ let keys_1 = std:: iter:: successors ( Some ( 0u32 ) , |x| Some ( x + 1 ) )
36+ . take ( 1000 )
37+ . map ( |x| x. to_be_bytes ( ) . to_vec ( ) ) ;
38+ // 1024 keys: 0..u32::MAX
39+ let count = 1024 ;
40+ let step = u32:: MAX / count;
41+ let keys_2 = std:: iter:: successors ( Some ( 0u32 ) , |x| Some ( x + step) )
42+ . take ( count as usize - 1 )
43+ . map ( |x| x. to_be_bytes ( ) . to_vec ( ) ) ;
44+
45+ ensure_region_split ( keys_1. chain ( keys_2) , 100 ) . await ?;
46+ }
47+
48+ clear_tikv ( ) . await ;
49+ Ok ( ( ) )
50+ }
51+
52+ async fn ensure_region_split (
53+ keys : impl IntoIterator < Item = impl Into < Key > > ,
54+ region_count : u32 ,
55+ ) -> Result < ( ) > {
56+ if ctl:: get_region_count ( ) . await ? as u32 >= region_count {
57+ return Ok ( ( ) ) ;
58+ }
59+
60+ // 1. write plenty transactional keys
61+ // 2. wait until regions split
62+
63+ let client = TransactionClient :: new ( pd_addrs ( ) ) . await ?;
64+ let mut txn = client. begin_optimistic ( ) . await ?;
65+ for key in keys. into_iter ( ) {
66+ txn. put ( key. into ( ) , vec ! [ 0 , 0 , 0 , 0 ] ) . await ?;
67+ }
68+ txn. commit ( ) . await ?;
69+ let mut txn = client. begin_optimistic ( ) . await ?;
70+ let _ = txn. scan ( vec ! [ ] .., 2048 ) . await ?;
71+ txn. commit ( ) . await ?;
72+
73+ info ! ( "splitting regions..." ) ;
74+ let start_time = std:: time:: Instant :: now ( ) ;
75+ loop {
76+ if ctl:: get_region_count ( ) . await ? as u32 >= region_count {
77+ break ;
78+ }
79+ if start_time. elapsed ( ) > REGION_SPLIT_TIME_LIMIT {
80+ warn ! ( "Stop splitting regions: time limit exceeded" ) ;
81+ break ;
82+ }
83+ Delay :: new ( Duration :: from_millis ( 200 ) ) . await ;
84+ }
85+
86+ Ok ( ( ) )
87+ }
1888
1989pub fn pd_addrs ( ) -> Vec < String > {
2090 env:: var ( ENV_PD_ADDRS )
0 commit comments