11// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22
33use crate :: { BoundRange , Key , KvPair , Result , Value } ;
4- use derive_new:: new;
54use std:: {
65 collections:: { btree_map:: Entry , BTreeMap , HashMap } ,
76 future:: Future ,
87} ;
98use tikv_client_proto:: kvrpcpb;
109
11- #[ derive( new) ]
12- struct InnerBuffer {
13- #[ new( default ) ]
10+ /// A caching layer which buffers reads and writes in a transaction.
11+ pub struct Buffer {
1412 primary_key : Option < Key > ,
15- #[ new( default ) ]
1613 entry_map : BTreeMap < Key , BufferEntry > ,
1714 is_pessimistic : bool ,
1815}
1916
20- impl InnerBuffer {
21- fn insert ( & mut self , key : impl Into < Key > , entry : BufferEntry ) {
22- let key = key. into ( ) ;
23- if !matches ! ( entry, BufferEntry :: Cached ( _) | BufferEntry :: CheckNotExist ) {
24- self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
25- }
26- self . entry_map . insert ( key, entry) ;
27- }
28-
29- /// Set the primary key if it is not set
30- pub fn primary_key_or ( & mut self , key : & Key ) {
31- self . primary_key . get_or_insert ( key. clone ( ) ) ;
32- }
33- }
34-
35- /// A caching layer which buffers reads and writes in a transaction.
36- pub struct Buffer {
37- inner : InnerBuffer ,
38- }
39-
4017impl Buffer {
4118 pub fn new ( is_pessimistic : bool ) -> Buffer {
4219 Buffer {
43- inner : InnerBuffer :: new ( is_pessimistic) ,
20+ primary_key : None ,
21+ entry_map : BTreeMap :: new ( ) ,
22+ is_pessimistic,
4423 }
4524 }
4625
4726 /// Get the primary key of the buffer.
4827 pub async fn get_primary_key ( & self ) -> Option < Key > {
49- self . inner . primary_key . clone ( )
28+ self . primary_key . clone ( )
5029 }
5130
5231 /// Set the primary key if it is not set
5332 pub async fn primary_key_or ( & mut self , key : & Key ) {
54- self . inner . primary_key_or ( key) ;
33+ self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
5534 }
5635
5736 /// Get a value from the buffer.
@@ -74,7 +53,7 @@ impl Buffer {
7453 MutationValue :: Determined ( value) => Ok ( value) ,
7554 MutationValue :: Undetermined => {
7655 let value = f ( key. clone ( ) ) . await ?;
77- Self :: update_cache ( & mut self . inner , key, value. clone ( ) ) ;
56+ self . update_cache ( key, value. clone ( ) ) ;
7857 Ok ( value)
7958 }
8059 }
@@ -102,7 +81,6 @@ impl Buffer {
10281 ) = keys
10382 . map ( |key| {
10483 let value = self
105- . inner
10684 . entry_map
10785 . get ( & key)
10886 . map ( BufferEntry :: get_value)
@@ -123,7 +101,7 @@ impl Buffer {
123101 for kvpair in & fetched_results {
124102 let key = kvpair. 0 . clone ( ) ;
125103 let value = Some ( kvpair. 1 . clone ( ) ) ;
126- Self :: update_cache ( & mut self . inner , key, value) ;
104+ self . update_cache ( key, value) ;
127105 }
128106
129107 let results = cached_results. chain ( fetched_results. into_iter ( ) ) ;
@@ -142,7 +120,7 @@ impl Buffer {
142120 Fut : Future < Output = Result < Vec < KvPair > > > ,
143121 {
144122 // read from local buffer
145- let mutation_range = self . inner . entry_map . range ( range. clone ( ) ) ;
123+ let mutation_range = self . entry_map . range ( range. clone ( ) ) ;
146124
147125 // fetch from TiKV
148126 // fetch more entries because some of them may be deleted.
@@ -173,7 +151,7 @@ impl Buffer {
173151
174152 // update local buffer
175153 for ( k, v) in & results {
176- Self :: update_cache ( & mut self . inner , k. clone ( ) , Some ( v. clone ( ) ) ) ;
154+ self . update_cache ( k. clone ( ) , Some ( v. clone ( ) ) ) ;
177155 }
178156
179157 let mut res = results
@@ -187,9 +165,8 @@ impl Buffer {
187165
188166 /// Lock the given key if necessary.
189167 pub async fn lock ( & mut self , key : Key ) {
190- self . inner . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
168+ self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
191169 let value = self
192- . inner
193170 . entry_map
194171 . entry ( key)
195172 // Mutated keys don't need a lock.
@@ -202,61 +179,57 @@ impl Buffer {
202179
203180 /// Insert a value into the buffer (does not write through).
204181 pub async fn put ( & mut self , key : Key , value : Value ) {
205- self . inner . insert ( key, BufferEntry :: Put ( value) ) ;
182+ self . insert_entry ( key, BufferEntry :: Put ( value) ) ;
206183 }
207184
208185 /// Mark a value as Insert mutation into the buffer (does not write through).
209186 pub async fn insert ( & mut self , key : Key , value : Value ) {
210- let mut entry = self . inner . entry_map . entry ( key. clone ( ) ) ;
187+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
211188 match entry {
212189 Entry :: Occupied ( ref mut o) if matches ! ( o. get( ) , BufferEntry :: Del ) => {
213190 o. insert ( BufferEntry :: Put ( value) ) ;
214191 }
215- _ => self . inner . insert ( key, BufferEntry :: Insert ( value) ) ,
192+ _ => self . insert_entry ( key, BufferEntry :: Insert ( value) ) ,
216193 }
217194 }
218195
219196 /// Mark a value as deleted.
220197 pub async fn delete ( & mut self , key : Key ) {
221- let is_pessimistic = self . inner . is_pessimistic ;
222- let mut entry = self . inner . entry_map . entry ( key. clone ( ) ) ;
198+ let is_pessimistic = self . is_pessimistic ;
199+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
223200
224201 match entry {
225202 Entry :: Occupied ( ref mut o)
226203 if matches ! ( o. get( ) , BufferEntry :: Insert ( _) ) && !is_pessimistic =>
227204 {
228205 o. insert ( BufferEntry :: CheckNotExist ) ;
229206 }
230- _ => self . inner . insert ( key, BufferEntry :: Del ) ,
207+ _ => self . insert_entry ( key, BufferEntry :: Del ) ,
231208 }
232209 }
233210
234211 /// Converts the buffered mutations to the proto buffer version
235212 pub async fn to_proto_mutations ( & self ) -> Vec < kvrpcpb:: Mutation > {
236- self . inner
237- . entry_map
213+ self . entry_map
238214 . iter ( )
239215 . filter_map ( |( key, mutation) | mutation. to_proto_with_key ( key) )
240216 . collect ( )
241217 }
242218
243219 async fn get_from_mutations ( & self , key : & Key ) -> MutationValue {
244- self . inner
245- . entry_map
220+ self . entry_map
246221 . get ( & key)
247222 . map ( BufferEntry :: get_value)
248223 . unwrap_or ( MutationValue :: Undetermined )
249224 }
250225
251- fn update_cache ( buffer : & mut InnerBuffer , key : Key , value : Option < Value > ) {
252- match buffer . entry_map . get ( & key) {
226+ fn update_cache ( & mut self , key : Key , value : Option < Value > ) {
227+ match self . entry_map . get ( & key) {
253228 Some ( BufferEntry :: Locked ( None ) ) => {
254- buffer
255- . entry_map
256- . insert ( key, BufferEntry :: Locked ( Some ( value) ) ) ;
229+ self . entry_map . insert ( key, BufferEntry :: Locked ( Some ( value) ) ) ;
257230 }
258231 None => {
259- buffer . entry_map . insert ( key, BufferEntry :: Cached ( value) ) ;
232+ self . entry_map . insert ( key, BufferEntry :: Cached ( value) ) ;
260233 }
261234 Some ( BufferEntry :: Cached ( v) ) | Some ( BufferEntry :: Locked ( Some ( v) ) ) => {
262235 assert ! ( & value == v) ;
@@ -275,6 +248,14 @@ impl Buffer {
275248 }
276249 }
277250 }
251+
252+ fn insert_entry ( & mut self , key : impl Into < Key > , entry : BufferEntry ) {
253+ let key = key. into ( ) ;
254+ if !matches ! ( entry, BufferEntry :: Cached ( _) | BufferEntry :: CheckNotExist ) {
255+ self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
256+ }
257+ self . entry_map . insert ( key, entry) ;
258+ }
278259}
279260
280261// The state of a key-value pair in the buffer.
0 commit comments