11// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22
33use crate :: { BoundRange , Key , KvPair , Result , Value } ;
4+ use derive_new:: new;
45use std:: {
5- collections:: { BTreeMap , HashMap } ,
6+ collections:: { btree_map :: Entry , BTreeMap , HashMap } ,
67 future:: Future ,
78} ;
89use tikv_client_proto:: kvrpcpb;
910use tokio:: sync:: { Mutex , MutexGuard } ;
1011
11- #[ derive( Default ) ]
12+ #[ derive( new ) ]
1213struct InnerBuffer {
14+ #[ new( default ) ]
1315 primary_key : Option < Key > ,
16+ #[ new( default ) ]
1417 entry_map : BTreeMap < Key , BufferEntry > ,
18+ is_pessimistic : bool ,
1519}
1620
1721impl InnerBuffer {
@@ -29,16 +33,22 @@ impl InnerBuffer {
2933}
3034
3135/// A caching layer which buffers reads and writes in a transaction.
32- #[ derive( Default ) ]
3336pub struct Buffer {
3437 mutations : Mutex < InnerBuffer > ,
3538}
3639
3740impl Buffer {
41+ pub fn new ( is_pessimistic : bool ) -> Buffer {
42+ Buffer {
43+ mutations : Mutex :: new ( InnerBuffer :: new ( is_pessimistic) ) ,
44+ }
45+ }
46+
3847 /// Get the primary key of the buffer.
3948 pub async fn get_primary_key ( & self ) -> Option < Key > {
4049 self . mutations . lock ( ) . await . primary_key . clone ( )
4150 }
51+
4252 /// Get the primary key of the buffer, if not exists, use `key` as the primary key.
4353 pub async fn get_primary_key_or ( & self , key : & Key ) -> Key {
4454 self . mutations . lock ( ) . await . get_primary_key_or ( key) . clone ( )
@@ -203,28 +213,30 @@ impl Buffer {
203213
204214 /// Mark a value as Insert mutation into the buffer (does not write through).
205215 pub async fn insert ( & self , key : Key , value : Value ) {
206- self . mutations
207- . lock ( )
208- . await
209- . insert ( key, BufferEntry :: Insert ( value) ) ;
216+ let mut mutations = self . mutations . lock ( ) . await ;
217+ let mut entry = mutations. entry_map . entry ( key. clone ( ) ) ;
218+ match entry {
219+ Entry :: Occupied ( ref mut o) if matches ! ( o. get( ) , BufferEntry :: Del ) => {
220+ o. insert ( BufferEntry :: Put ( value) ) ;
221+ }
222+ _ => mutations. insert ( key, BufferEntry :: Insert ( value) ) ,
223+ }
210224 }
211225
212226 /// Mark a value as deleted.
213227 pub async fn delete ( & self , key : Key ) {
214228 let mut mutations = self . mutations . lock ( ) . await ;
215- let value = mutations
216- . entry_map
217- . entry ( key . clone ( ) )
218- . or_insert ( BufferEntry :: Del ) ;
219-
220- let new_value : BufferEntry ;
221- if let BufferEntry :: Insert ( _ ) = value {
222- new_value = BufferEntry :: CheckNotExist
223- } else {
224- new_value = BufferEntry :: Del
229+ let is_pessimistic = mutations. is_pessimistic ;
230+ let mut entry = mutations . entry_map . entry ( key . clone ( ) ) ;
231+
232+ match entry {
233+ Entry :: Occupied ( ref mut o )
234+ if matches ! ( o . get ( ) , BufferEntry :: Insert ( _ ) ) && !is_pessimistic =>
235+ {
236+ o . insert ( BufferEntry :: CheckNotExist ) ;
237+ }
238+ _ => mutations . insert ( key , BufferEntry :: Del ) ,
225239 }
226-
227- mutations. insert ( key, new_value) ;
228240 }
229241
230242 /// Converts the buffered mutations to the proto buffer version
@@ -378,7 +390,7 @@ mod tests {
378390 #[ tokio:: test]
379391 #[ allow( unreachable_code) ]
380392 async fn set_and_get_from_buffer ( ) {
381- let buffer = Buffer :: default ( ) ;
393+ let buffer = Buffer :: new ( false ) ;
382394 buffer
383395 . put ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
384396 . await ;
@@ -411,7 +423,7 @@ mod tests {
411423 #[ tokio:: test]
412424 #[ allow( unreachable_code) ]
413425 async fn insert_and_get_from_buffer ( ) {
414- let buffer = Buffer :: default ( ) ;
426+ let buffer = Buffer :: new ( false ) ;
415427 buffer
416428 . insert ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
417429 . await ;
@@ -453,13 +465,13 @@ mod tests {
453465 let v2: Value = b"value2" . to_vec ( ) ;
454466 let v2_ = v2. clone ( ) ;
455467
456- let buffer = Buffer :: default ( ) ;
468+ let buffer = Buffer :: new ( false ) ;
457469 let r1 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( Ok ( Some ( v1_) ) ) ) ) ;
458470 let r2 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( panic ! ( ) ) ) ) ;
459471 assert_eq ! ( r1. unwrap( ) . unwrap( ) , v1) ;
460472 assert_eq ! ( r2. unwrap( ) . unwrap( ) , v1) ;
461473
462- let buffer = Buffer :: default ( ) ;
474+ let buffer = Buffer :: new ( false ) ;
463475 let r1 = block_on (
464476 buffer. batch_get_or_else ( vec ! [ k1. clone( ) , k2. clone( ) ] . into_iter ( ) , move |_| {
465477 ready ( Ok ( vec ! [ ( k1_, v1__) . into( ) , ( k2_, v2_) . into( ) ] ) )
0 commit comments