@@ -13,6 +13,47 @@ use futures::stream::BoxStream;
1313use std:: { collections:: HashMap , iter, sync:: Arc } ;
1414use tikv_client_proto:: { kvrpcpb, pdpb:: Timestamp } ;
1515
16+ // implement HasLocks for a response type that has a `pairs` field,
17+ // where locks can be extracted from both the `pairs` and `error` fields
18+ macro_rules! pair_locks {
19+ ( $response_type: ty) => {
20+ impl HasLocks for $response_type {
21+ fn take_locks( & mut self ) -> Vec <kvrpcpb:: LockInfo > {
22+ if self . pairs. is_empty( ) {
23+ self . error
24+ . as_mut( )
25+ . and_then( |error| error. locked. take( ) )
26+ . into_iter( )
27+ . collect( )
28+ } else {
29+ self . pairs
30+ . iter_mut( )
31+ . filter_map( |pair| {
32+ pair. error. as_mut( ) . and_then( |error| error. locked. take( ) )
33+ } )
34+ . collect( )
35+ }
36+ }
37+ }
38+ } ;
39+ }
40+
41+ // implement HasLocks for a response type that does not have a `pairs` field,
42+ // where locks are only extracted from the `error` field
43+ macro_rules! error_locks {
44+ ( $response_type: ty) => {
45+ impl HasLocks for $response_type {
46+ fn take_locks( & mut self ) -> Vec <kvrpcpb:: LockInfo > {
47+ self . error
48+ . as_mut( )
49+ . and_then( |error| error. locked. take( ) )
50+ . into_iter( )
51+ . collect( )
52+ }
53+ }
54+ } ;
55+ }
56+
1657pub fn new_get_request ( key : Vec < u8 > , timestamp : u64 ) -> kvrpcpb:: GetRequest {
1758 let mut req = kvrpcpb:: GetRequest :: default ( ) ;
1859 req. set_key ( key) ;
@@ -43,16 +84,6 @@ impl Process<kvrpcpb::GetResponse> for DefaultProcessor {
4384 }
4485}
4586
46- impl HasLocks for kvrpcpb:: GetResponse {
47- fn take_locks ( & mut self ) -> Vec < kvrpcpb:: LockInfo > {
48- self . error
49- . as_mut ( )
50- . and_then ( |error| error. locked . take ( ) )
51- . into_iter ( )
52- . collect ( )
53- }
54- }
55-
5687pub fn new_batch_get_request ( keys : Vec < Vec < u8 > > , timestamp : u64 ) -> kvrpcpb:: BatchGetRequest {
5788 let mut req = kvrpcpb:: BatchGetRequest :: default ( ) ;
5889 req. set_keys ( keys) ;
@@ -77,15 +108,6 @@ impl Merge<kvrpcpb::BatchGetResponse> for Collect {
77108 }
78109}
79110
80- impl HasLocks for kvrpcpb:: BatchGetResponse {
81- fn take_locks ( & mut self ) -> Vec < kvrpcpb:: LockInfo > {
82- self . pairs
83- . iter_mut ( )
84- . filter_map ( |pair| pair. error . as_mut ( ) . and_then ( |error| error. locked . take ( ) ) )
85- . collect ( )
86- }
87- }
88-
89111pub fn new_scan_request (
90112 start_key : Vec < u8 > ,
91113 end_key : Vec < u8 > ,
@@ -119,15 +141,6 @@ impl Merge<kvrpcpb::ScanResponse> for Collect {
119141 }
120142}
121143
122- impl HasLocks for kvrpcpb:: ScanResponse {
123- fn take_locks ( & mut self ) -> Vec < kvrpcpb:: LockInfo > {
124- self . pairs
125- . iter_mut ( )
126- . filter_map ( |pair| pair. error . as_mut ( ) . and_then ( |error| error. locked . take ( ) ) )
127- . collect ( )
128- }
129- }
130-
131144pub fn new_resolve_lock_request (
132145 start_version : u64 ,
133146 commit_version : u64 ,
@@ -547,13 +560,32 @@ pub struct SecondaryLocksStatus {
547560 pub commit_ts : Option < Timestamp > ,
548561}
549562
550- impl HasLocks for kvrpcpb:: CommitResponse { }
563+ impl HasLocks for kvrpcpb:: PessimisticRollbackResponse {
564+ fn take_locks ( & mut self ) -> Vec < kvrpcpb:: LockInfo > {
565+ self . errors
566+ . iter_mut ( )
567+ . filter_map ( |error| error. locked . take ( ) )
568+ . collect ( )
569+ }
570+ }
571+
572+ impl HasLocks for kvrpcpb:: PessimisticLockResponse {
573+ fn take_locks ( & mut self ) -> Vec < kvrpcpb:: LockInfo > {
574+ self . errors
575+ . iter_mut ( )
576+ . filter_map ( |error| error. locked . take ( ) )
577+ . collect ( )
578+ }
579+ }
580+
581+ pair_locks ! ( kvrpcpb:: BatchGetResponse ) ;
582+ pair_locks ! ( kvrpcpb:: ScanResponse ) ;
583+ error_locks ! ( kvrpcpb:: GetResponse ) ;
584+ error_locks ! ( kvrpcpb:: ResolveLockResponse ) ;
585+ error_locks ! ( kvrpcpb:: CommitResponse ) ;
586+ error_locks ! ( kvrpcpb:: BatchRollbackResponse ) ;
587+ error_locks ! ( kvrpcpb:: TxnHeartBeatResponse ) ;
588+ error_locks ! ( kvrpcpb:: CheckTxnStatusResponse ) ;
589+ error_locks ! ( kvrpcpb:: CheckSecondaryLocksResponse ) ;
551590impl HasLocks for kvrpcpb:: CleanupResponse { }
552- impl HasLocks for kvrpcpb:: BatchRollbackResponse { }
553- impl HasLocks for kvrpcpb:: PessimisticRollbackResponse { }
554- impl HasLocks for kvrpcpb:: ResolveLockResponse { }
555591impl HasLocks for kvrpcpb:: ScanLockResponse { }
556- impl HasLocks for kvrpcpb:: PessimisticLockResponse { }
557- impl HasLocks for kvrpcpb:: TxnHeartBeatResponse { }
558- impl HasLocks for kvrpcpb:: CheckTxnStatusResponse { }
559- impl HasLocks for kvrpcpb:: CheckSecondaryLocksResponse { }
0 commit comments