@@ -10,7 +10,6 @@ use crate::proto::tikvpb::tikv_client::TikvClient;
1010use crate :: range_request;
1111use crate :: region:: RegionWithLeader ;
1212use crate :: request:: plan:: ResponseWithShard ;
13- use crate :: request:: Collect ;
1413use crate :: request:: CollectSingle ;
1514use crate :: request:: DefaultProcessor ;
1615use crate :: request:: KvRequest ;
@@ -19,6 +18,7 @@ use crate::request::Process;
1918use crate :: request:: RangeRequest ;
2019use crate :: request:: Shardable ;
2120use crate :: request:: SingleKey ;
21+ use crate :: request:: { Batchable , Collect } ;
2222use crate :: shardable_key;
2323use crate :: shardable_keys;
2424use crate :: shardable_range;
@@ -35,12 +35,15 @@ use crate::Result;
3535use crate :: Value ;
3636use async_trait:: async_trait;
3737use futures:: stream:: BoxStream ;
38+ use futures:: { stream, StreamExt } ;
3839use std:: any:: Any ;
3940use std:: ops:: Range ;
4041use std:: sync:: Arc ;
4142use std:: time:: Duration ;
4243use tonic:: transport:: Channel ;
4344
45+ const RAW_KV_REQUEST_BATCH_SIZE : u64 = 16 * 1024 ; // 16 KB
46+
4447pub fn new_raw_get_request ( key : Vec < u8 > , cf : Option < ColumnFamily > ) -> kvrpcpb:: RawGetRequest {
4548 let mut req = kvrpcpb:: RawGetRequest :: default ( ) ;
4649 req. key = key;
@@ -188,6 +191,14 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
188191 type Response = kvrpcpb:: RawBatchPutResponse ;
189192}
190193
194+ impl Batchable for kvrpcpb:: RawBatchPutRequest {
195+ type Item = ( kvrpcpb:: KvPair , u64 ) ;
196+
197+ fn item_size ( item : & Self :: Item ) -> u64 {
198+ ( item. 0 . key . len ( ) + item. 0 . value . len ( ) ) as u64
199+ }
200+ }
201+
191202impl Shardable for kvrpcpb:: RawBatchPutRequest {
192203 type Shard = Vec < ( kvrpcpb:: KvPair , u64 ) > ;
193204
@@ -204,6 +215,16 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
204215 . collect ( ) ;
205216 kv_ttl. sort_by ( |a, b| a. 0 . key . cmp ( & b. 0 . key ) ) ;
206217 region_stream_for_keys ( kv_ttl. into_iter ( ) , pd_client. clone ( ) )
218+ . flat_map ( |result| match result {
219+ Ok ( ( keys, region) ) => stream:: iter ( kvrpcpb:: RawBatchPutRequest :: batches (
220+ keys,
221+ RAW_KV_REQUEST_BATCH_SIZE ,
222+ ) )
223+ . map ( move |batch| Ok ( ( batch, region. clone ( ) ) ) )
224+ . boxed ( ) ,
225+ Err ( e) => stream:: iter ( Err ( e) ) . boxed ( ) ,
226+ } )
227+ . boxed ( )
207228 }
208229
209230 fn apply_shard ( & mut self , shard : Self :: Shard ) {
@@ -212,6 +233,18 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
212233 self . ttls = ttls;
213234 }
214235
236+ fn clone_then_apply_shard ( & self , shard : Self :: Shard ) -> Self
237+ where
238+ Self : Sized + Clone ,
239+ {
240+ let mut cloned = Self :: default ( ) ;
241+ cloned. context = self . context . clone ( ) ;
242+ cloned. cf = self . cf . clone ( ) ;
243+ cloned. for_cas = self . for_cas ;
244+ cloned. apply_shard ( shard) ;
245+ cloned
246+ }
247+
215248 fn apply_store ( & mut self , store : & RegionStore ) -> Result < ( ) > {
216249 self . set_leader ( & store. region_with_leader )
217250 }
@@ -257,7 +290,56 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
257290 type Response = kvrpcpb:: RawBatchDeleteResponse ;
258291}
259292
260- shardable_keys ! ( kvrpcpb:: RawBatchDeleteRequest ) ;
293+ impl Batchable for kvrpcpb:: RawBatchDeleteRequest {
294+ type Item = Vec < u8 > ;
295+
296+ fn item_size ( item : & Self :: Item ) -> u64 {
297+ item. len ( ) as u64
298+ }
299+ }
300+
301+ impl Shardable for kvrpcpb:: RawBatchDeleteRequest {
302+ type Shard = Vec < Vec < u8 > > ;
303+
304+ fn shards (
305+ & self ,
306+ pd_client : & Arc < impl PdClient > ,
307+ ) -> BoxStream < ' static , Result < ( Self :: Shard , RegionWithLeader ) > > {
308+ let mut keys = self . keys . clone ( ) ;
309+ keys. sort ( ) ;
310+ region_stream_for_keys ( keys. into_iter ( ) , pd_client. clone ( ) )
311+ . flat_map ( |result| match result {
312+ Ok ( ( keys, region) ) => stream:: iter ( kvrpcpb:: RawBatchDeleteRequest :: batches (
313+ keys,
314+ RAW_KV_REQUEST_BATCH_SIZE ,
315+ ) )
316+ . map ( move |batch| Ok ( ( batch, region. clone ( ) ) ) )
317+ . boxed ( ) ,
318+ Err ( e) => stream:: iter ( Err ( e) ) . boxed ( ) ,
319+ } )
320+ . boxed ( )
321+ }
322+
323+ fn apply_shard ( & mut self , shard : Self :: Shard ) {
324+ self . keys = shard;
325+ }
326+
327+ fn clone_then_apply_shard ( & self , shard : Self :: Shard ) -> Self
328+ where
329+ Self : Sized + Clone ,
330+ {
331+ let mut cloned = Self :: default ( ) ;
332+ cloned. context = self . context . clone ( ) ;
333+ cloned. cf = self . cf . clone ( ) ;
334+ cloned. for_cas = self . for_cas ;
335+ cloned. apply_shard ( shard) ;
336+ cloned
337+ }
338+
339+ fn apply_store ( & mut self , store : & RegionStore ) -> Result < ( ) > {
340+ self . set_leader ( & store. region_with_leader )
341+ }
342+ }
261343
262344pub fn new_raw_delete_range_request (
263345 start_key : Vec < u8 > ,
0 commit comments