11// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22
33use crate :: { BoundRange , Key , KvPair , Result , Value } ;
4- use derive_new:: new;
54use std:: {
65 collections:: { btree_map:: Entry , BTreeMap , HashMap } ,
76 future:: Future ,
87} ;
98use tikv_client_proto:: kvrpcpb;
10- use tokio:: sync:: { Mutex , MutexGuard } ;
119
12- #[ derive( new) ]
13- struct InnerBuffer {
14- #[ new( default ) ]
10+ /// A caching layer which buffers reads and writes in a transaction.
11+ pub struct Buffer {
1512 primary_key : Option < Key > ,
16- #[ new( default ) ]
1713 entry_map : BTreeMap < Key , BufferEntry > ,
1814 is_pessimistic : bool ,
1915}
2016
21- impl InnerBuffer {
22- fn insert ( & mut self , key : impl Into < Key > , entry : BufferEntry ) {
23- let key = key. into ( ) ;
24- if !matches ! ( entry, BufferEntry :: Cached ( _) | BufferEntry :: CheckNotExist ) {
25- self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
26- }
27- self . entry_map . insert ( key, entry) ;
28- }
29-
30- /// Set the primary key if it is not set
31- pub fn primary_key_or ( & mut self , key : & Key ) {
32- self . primary_key . get_or_insert ( key. clone ( ) ) ;
33- }
34- }
35-
36- /// A caching layer which buffers reads and writes in a transaction.
37- pub struct Buffer {
38- inner : Mutex < InnerBuffer > ,
39- }
40-
4117impl Buffer {
4218 pub fn new ( is_pessimistic : bool ) -> Buffer {
4319 Buffer {
44- inner : Mutex :: new ( InnerBuffer :: new ( is_pessimistic) ) ,
20+ primary_key : None ,
21+ entry_map : BTreeMap :: new ( ) ,
22+ is_pessimistic,
4523 }
4624 }
4725
4826 /// Get the primary key of the buffer.
4927 pub async fn get_primary_key ( & self ) -> Option < Key > {
50- self . inner . lock ( ) . await . primary_key . clone ( )
28+ self . primary_key . clone ( )
5129 }
5230
5331 /// Set the primary key if it is not set
54- pub async fn primary_key_or ( & self , key : & Key ) {
55- self . inner . lock ( ) . await . primary_key_or ( key ) ;
32+ pub async fn primary_key_or ( & mut self , key : & Key ) {
33+ self . primary_key . get_or_insert_with ( || key . clone ( ) ) ;
5634 }
5735
5836 /// Get a value from the buffer.
@@ -66,7 +44,7 @@ impl Buffer {
6644
6745 /// Get a value from the buffer. If the value is not present, run `f` to get
6846 /// the value.
69- pub async fn get_or_else < F , Fut > ( & self , key : Key , f : F ) -> Result < Option < Value > >
47+ pub async fn get_or_else < F , Fut > ( & mut self , key : Key , f : F ) -> Result < Option < Value > >
7048 where
7149 F : FnOnce ( Key ) -> Fut ,
7250 Fut : Future < Output = Result < Option < Value > > > ,
@@ -75,8 +53,7 @@ impl Buffer {
7553 MutationValue :: Determined ( value) => Ok ( value) ,
7654 MutationValue :: Undetermined => {
7755 let value = f ( key. clone ( ) ) . await ?;
78- let mut mutations = self . inner . lock ( ) . await ;
79- Self :: update_cache ( & mut mutations, key, value. clone ( ) ) ;
56+ self . update_cache ( key, value. clone ( ) ) ;
8057 Ok ( value)
8158 }
8259 }
@@ -87,7 +64,7 @@ impl Buffer {
8764 ///
8865 /// only used for snapshot read (i.e. not for `batch_get_for_update`)
8966 pub async fn batch_get_or_else < F , Fut > (
90- & self ,
67+ & mut self ,
9168 keys : impl Iterator < Item = Key > ,
9269 f : F ,
9370 ) -> Result < impl Iterator < Item = KvPair > >
@@ -96,15 +73,14 @@ impl Buffer {
9673 Fut : Future < Output = Result < Vec < KvPair > > > ,
9774 {
9875 let ( cached_results, undetermined_keys) = {
99- let mutations = self . inner . lock ( ) . await ;
10076 // Partition the keys into those we have buffered and those we have to
10177 // get from the store.
10278 let ( undetermined_keys, cached_results) : (
10379 Vec < ( Key , MutationValue ) > ,
10480 Vec < ( Key , MutationValue ) > ,
10581 ) = keys
10682 . map ( |key| {
107- let value = mutations
83+ let value = self
10884 . entry_map
10985 . get ( & key)
11086 . map ( BufferEntry :: get_value)
@@ -122,11 +98,10 @@ impl Buffer {
12298 } ;
12399
124100 let fetched_results = f ( Box :: new ( undetermined_keys) ) . await ?;
125- let mut mutations = self . inner . lock ( ) . await ;
126101 for kvpair in & fetched_results {
127102 let key = kvpair. 0 . clone ( ) ;
128103 let value = Some ( kvpair. 1 . clone ( ) ) ;
129- Self :: update_cache ( & mut mutations , key, value) ;
104+ self . update_cache ( key, value) ;
130105 }
131106
132107 let results = cached_results. chain ( fetched_results. into_iter ( ) ) ;
@@ -135,7 +110,7 @@ impl Buffer {
135110
136111 /// Run `f` to fetch entries in `range` from TiKV. Combine them with mutations in local buffer. Returns the results.
137112 pub async fn scan_and_fetch < F , Fut > (
138- & self ,
113+ & mut self ,
139114 range : BoundRange ,
140115 limit : u32 ,
141116 f : F ,
@@ -145,8 +120,7 @@ impl Buffer {
145120 Fut : Future < Output = Result < Vec < KvPair > > > ,
146121 {
147122 // read from local buffer
148- let mut mutations = self . inner . lock ( ) . await ;
149- let mutation_range = mutations. entry_map . range ( range. clone ( ) ) ;
123+ let mutation_range = self . entry_map . range ( range. clone ( ) ) ;
150124
151125 // fetch from TiKV
152126 // fetch more entries because some of them may be deleted.
@@ -177,7 +151,7 @@ impl Buffer {
177151
178152 // update local buffer
179153 for ( k, v) in & results {
180- Self :: update_cache ( & mut mutations , k. clone ( ) , Some ( v. clone ( ) ) ) ;
154+ self . update_cache ( k. clone ( ) , Some ( v. clone ( ) ) ) ;
181155 }
182156
183157 let mut res = results
@@ -190,10 +164,9 @@ impl Buffer {
190164 }
191165
192166 /// Lock the given key if necessary.
193- pub async fn lock ( & self , key : Key ) {
194- let mutations = & mut self . inner . lock ( ) . await ;
195- mutations. primary_key . get_or_insert_with ( || key. clone ( ) ) ;
196- let value = mutations
167+ pub async fn lock ( & mut self , key : Key ) {
168+ self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
169+ let value = self
197170 . entry_map
198171 . entry ( key)
199172 // Mutated keys don't need a lock.
@@ -205,68 +178,58 @@ impl Buffer {
205178 }
206179
207180 /// Insert a value into the buffer (does not write through).
208- pub async fn put ( & self , key : Key , value : Value ) {
209- self . inner . lock ( ) . await . insert ( key, BufferEntry :: Put ( value) ) ;
181+ pub async fn put ( & mut self , key : Key , value : Value ) {
182+ self . insert_entry ( key, BufferEntry :: Put ( value) ) ;
210183 }
211184
212185 /// Mark a value as Insert mutation into the buffer (does not write through).
213- pub async fn insert ( & self , key : Key , value : Value ) {
214- let mut mutations = self . inner . lock ( ) . await ;
215- let mut entry = mutations. entry_map . entry ( key. clone ( ) ) ;
186+ pub async fn insert ( & mut self , key : Key , value : Value ) {
187+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
216188 match entry {
217189 Entry :: Occupied ( ref mut o) if matches ! ( o. get( ) , BufferEntry :: Del ) => {
218190 o. insert ( BufferEntry :: Put ( value) ) ;
219191 }
220- _ => mutations . insert ( key, BufferEntry :: Insert ( value) ) ,
192+ _ => self . insert_entry ( key, BufferEntry :: Insert ( value) ) ,
221193 }
222194 }
223195
224196 /// Mark a value as deleted.
225- pub async fn delete ( & self , key : Key ) {
226- let mut mutations = self . inner . lock ( ) . await ;
227- let is_pessimistic = mutations. is_pessimistic ;
228- let mut entry = mutations. entry_map . entry ( key. clone ( ) ) ;
197+ pub async fn delete ( & mut self , key : Key ) {
198+ let is_pessimistic = self . is_pessimistic ;
199+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
229200
230201 match entry {
231202 Entry :: Occupied ( ref mut o)
232203 if matches ! ( o. get( ) , BufferEntry :: Insert ( _) ) && !is_pessimistic =>
233204 {
234205 o. insert ( BufferEntry :: CheckNotExist ) ;
235206 }
236- _ => mutations . insert ( key, BufferEntry :: Del ) ,
207+ _ => self . insert_entry ( key, BufferEntry :: Del ) ,
237208 }
238209 }
239210
240211 /// Converts the buffered mutations to the proto buffer version
241212 pub async fn to_proto_mutations ( & self ) -> Vec < kvrpcpb:: Mutation > {
242- self . inner
243- . lock ( )
244- . await
245- . entry_map
213+ self . entry_map
246214 . iter ( )
247215 . filter_map ( |( key, mutation) | mutation. to_proto_with_key ( key) )
248216 . collect ( )
249217 }
250218
251219 async fn get_from_mutations ( & self , key : & Key ) -> MutationValue {
252- self . inner
253- . lock ( )
254- . await
255- . entry_map
220+ self . entry_map
256221 . get ( & key)
257222 . map ( BufferEntry :: get_value)
258223 . unwrap_or ( MutationValue :: Undetermined )
259224 }
260225
261- fn update_cache ( buffer : & mut MutexGuard < InnerBuffer > , key : Key , value : Option < Value > ) {
262- match buffer . entry_map . get ( & key) {
226+ fn update_cache ( & mut self , key : Key , value : Option < Value > ) {
227+ match self . entry_map . get ( & key) {
263228 Some ( BufferEntry :: Locked ( None ) ) => {
264- buffer
265- . entry_map
266- . insert ( key, BufferEntry :: Locked ( Some ( value) ) ) ;
229+ self . entry_map . insert ( key, BufferEntry :: Locked ( Some ( value) ) ) ;
267230 }
268231 None => {
269- buffer . entry_map . insert ( key, BufferEntry :: Cached ( value) ) ;
232+ self . entry_map . insert ( key, BufferEntry :: Cached ( value) ) ;
270233 }
271234 Some ( BufferEntry :: Cached ( v) ) | Some ( BufferEntry :: Locked ( Some ( v) ) ) => {
272235 assert ! ( & value == v) ;
@@ -285,6 +248,14 @@ impl Buffer {
285248 }
286249 }
287250 }
251+
252+ fn insert_entry ( & mut self , key : impl Into < Key > , entry : BufferEntry ) {
253+ let key = key. into ( ) ;
254+ if !matches ! ( entry, BufferEntry :: Cached ( _) | BufferEntry :: CheckNotExist ) {
255+ self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
256+ }
257+ self . entry_map . insert ( key, entry) ;
258+ }
288259}
289260
290261// The state of a key-value pair in the buffer.
@@ -388,7 +359,7 @@ mod tests {
388359 #[ tokio:: test]
389360 #[ allow( unreachable_code) ]
390361 async fn set_and_get_from_buffer ( ) {
391- let buffer = Buffer :: new ( false ) ;
362+ let mut buffer = Buffer :: new ( false ) ;
392363 buffer
393364 . put ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
394365 . await ;
@@ -421,7 +392,7 @@ mod tests {
421392 #[ tokio:: test]
422393 #[ allow( unreachable_code) ]
423394 async fn insert_and_get_from_buffer ( ) {
424- let buffer = Buffer :: new ( false ) ;
395+ let mut buffer = Buffer :: new ( false ) ;
425396 buffer
426397 . insert ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
427398 . await ;
@@ -463,13 +434,13 @@ mod tests {
463434 let v2: Value = b"value2" . to_vec ( ) ;
464435 let v2_ = v2. clone ( ) ;
465436
466- let buffer = Buffer :: new ( false ) ;
437+ let mut buffer = Buffer :: new ( false ) ;
467438 let r1 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( Ok ( Some ( v1_) ) ) ) ) ;
468439 let r2 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( panic ! ( ) ) ) ) ;
469440 assert_eq ! ( r1. unwrap( ) . unwrap( ) , v1) ;
470441 assert_eq ! ( r2. unwrap( ) . unwrap( ) , v1) ;
471442
472- let buffer = Buffer :: new ( false ) ;
443+ let mut buffer = Buffer :: new ( false ) ;
473444 let r1 = block_on (
474445 buffer. batch_get_or_else ( vec ! [ k1. clone( ) , k2. clone( ) ] . into_iter ( ) , move |_| {
475446 ready ( Ok ( vec ! [ ( k1_, v1__) . into( ) , ( k2_, v2_) . into( ) ] ) )
0 commit comments