@@ -24,19 +24,19 @@ impl Buffer {
2424 }
2525
2626 /// Get the primary key of the buffer.
27- pub async fn get_primary_key ( & self ) -> Option < Key > {
27+ pub fn get_primary_key ( & self ) -> Option < Key > {
2828 self . primary_key . clone ( )
2929 }
3030
3131 /// Set the primary key if it is not set
32- pub async fn primary_key_or ( & mut self , key : & Key ) {
32+ pub fn primary_key_or ( & mut self , key : & Key ) {
3333 self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
3434 }
3535
3636 /// Get a value from the buffer.
3737 /// If the returned value is None, it means the key doesn't exist in buffer yet.
38- pub async fn get ( & self , key : & Key ) -> Option < Value > {
39- match self . get_from_mutations ( key) . await {
38+ pub fn get ( & self , key : & Key ) -> Option < Value > {
39+ match self . get_from_mutations ( key) {
4040 MutationValue :: Determined ( value) => value,
4141 MutationValue :: Undetermined => None ,
4242 }
@@ -49,7 +49,7 @@ impl Buffer {
4949 F : FnOnce ( Key ) -> Fut ,
5050 Fut : Future < Output = Result < Option < Value > > > ,
5151 {
52- match self . get_from_mutations ( & key) . await {
52+ match self . get_from_mutations ( & key) {
5353 MutationValue :: Determined ( value) => Ok ( value) ,
5454 MutationValue :: Undetermined => {
5555 let value = f ( key. clone ( ) ) . await ?;
@@ -161,7 +161,7 @@ impl Buffer {
161161 }
162162
163163 /// Lock the given key if necessary.
164- pub async fn lock ( & mut self , key : Key ) {
164+ pub fn lock ( & mut self , key : Key ) {
165165 self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
166166 let value = self
167167 . entry_map
@@ -174,13 +174,22 @@ impl Buffer {
174174 }
175175 }
176176
177- /// Insert a value into the buffer (does not write through).
178- pub async fn put ( & mut self , key : Key , value : Value ) {
179- self . insert_entry ( key, BufferEntry :: Put ( value) ) ;
177+ /// Put a value into the buffer (does not write through).
178+ pub fn put ( & mut self , key : Key , value : Value ) {
179+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
180+ match entry {
181+ Entry :: Occupied ( ref mut o)
182+ if matches ! ( o. get( ) , BufferEntry :: Insert ( _) )
183+ || matches ! ( o. get( ) , BufferEntry :: CheckNotExist ) =>
184+ {
185+ o. insert ( BufferEntry :: Insert ( value) ) ;
186+ }
187+ _ => self . insert_entry ( key, BufferEntry :: Put ( value) ) ,
188+ }
180189 }
181190
182191 /// Mark a value as Insert mutation into the buffer (does not write through).
183- pub async fn insert ( & mut self , key : Key , value : Value ) {
192+ pub fn insert ( & mut self , key : Key , value : Value ) {
184193 let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
185194 match entry {
186195 Entry :: Occupied ( ref mut o) if matches ! ( o. get( ) , BufferEntry :: Del ) => {
@@ -191,13 +200,15 @@ impl Buffer {
191200 }
192201
193202 /// Mark a value as deleted.
194- pub async fn delete ( & mut self , key : Key ) {
203+ pub fn delete ( & mut self , key : Key ) {
195204 let is_pessimistic = self . is_pessimistic ;
196205 let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
197206
198207 match entry {
199208 Entry :: Occupied ( ref mut o)
200- if matches ! ( o. get( ) , BufferEntry :: Insert ( _) ) && !is_pessimistic =>
209+ if !is_pessimistic
210+ && ( matches ! ( o. get( ) , BufferEntry :: Insert ( _) )
211+ || matches ! ( o. get( ) , BufferEntry :: CheckNotExist ) ) =>
201212 {
202213 o. insert ( BufferEntry :: CheckNotExist ) ;
203214 }
@@ -206,14 +217,14 @@ impl Buffer {
206217 }
207218
208219 /// Converts the buffered mutations to the proto buffer version
209- pub async fn to_proto_mutations ( & self ) -> Vec < kvrpcpb:: Mutation > {
220+ pub fn to_proto_mutations ( & self ) -> Vec < kvrpcpb:: Mutation > {
210221 self . entry_map
211222 . iter ( )
212223 . filter_map ( |( key, mutation) | mutation. to_proto_with_key ( key) )
213224 . collect ( )
214225 }
215226
216- async fn get_from_mutations ( & self , key : & Key ) -> MutationValue {
227+ fn get_from_mutations ( & self , key : & Key ) -> MutationValue {
217228 self . entry_map
218229 . get ( & key)
219230 . map ( BufferEntry :: get_value)
@@ -352,26 +363,26 @@ impl MutationValue {
352363mod tests {
353364 use super :: * ;
354365 use futures:: { executor:: block_on, future:: ready} ;
366+ use tikv_client_common:: internal_err;
355367
356- #[ tokio:: test]
357- #[ allow( unreachable_code) ]
358- async fn set_and_get_from_buffer ( ) {
368+ #[ test]
369+ fn set_and_get_from_buffer ( ) {
359370 let mut buffer = Buffer :: new ( false ) ;
360- buffer
361- . put ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
362- . await ;
363- buffer
364- . put ( b"key2" . to_vec ( ) . into ( ) , b"value2" . to_vec ( ) )
365- . await ;
371+ buffer. put ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) ) ;
372+ buffer. put ( b"key2" . to_vec ( ) . into ( ) , b"value2" . to_vec ( ) ) ;
366373 assert_eq ! (
367- block_on( buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( panic!( ) ) ) )
368- . unwrap( )
369- . unwrap( ) ,
374+ block_on(
375+ buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( Err ( internal_err!(
376+ ""
377+ ) ) ) )
378+ )
379+ . unwrap( )
380+ . unwrap( ) ,
370381 b"value1" . to_vec( )
371382 ) ;
372383
373- buffer. delete ( b"key2" . to_vec ( ) . into ( ) ) . await ;
374- buffer. put ( b"key1" . to_vec ( ) . into ( ) , b"value" . to_vec ( ) ) . await ;
384+ buffer. delete ( b"key2" . to_vec ( ) . into ( ) ) ;
385+ buffer. put ( b"key1" . to_vec ( ) . into ( ) , b"value" . to_vec ( ) ) ;
375386 assert_eq ! (
376387 block_on( buffer. batch_get_or_else(
377388 vec![ b"key2" . to_vec( ) . into( ) , b"key1" . to_vec( ) . into( ) ] . into_iter( ) ,
@@ -386,27 +397,24 @@ mod tests {
386397 ) ;
387398 }
388399
389- #[ tokio:: test]
390- #[ allow( unreachable_code) ]
391- async fn insert_and_get_from_buffer ( ) {
400+ #[ test]
401+ fn insert_and_get_from_buffer ( ) {
392402 let mut buffer = Buffer :: new ( false ) ;
393- buffer
394- . insert ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
395- . await ;
396- buffer
397- . insert ( b"key2" . to_vec ( ) . into ( ) , b"value2" . to_vec ( ) )
398- . await ;
403+ buffer. insert ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) ) ;
404+ buffer. insert ( b"key2" . to_vec ( ) . into ( ) , b"value2" . to_vec ( ) ) ;
399405 assert_eq ! (
400- block_on( buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( panic!( ) ) ) )
401- . unwrap( )
402- . unwrap( ) ,
406+ block_on(
407+ buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( Err ( internal_err!(
408+ ""
409+ ) ) ) )
410+ )
411+ . unwrap( )
412+ . unwrap( ) ,
403413 b"value1" . to_vec( )
404414 ) ;
405415
406- buffer. delete ( b"key2" . to_vec ( ) . into ( ) ) . await ;
407- buffer
408- . insert ( b"key1" . to_vec ( ) . into ( ) , b"value" . to_vec ( ) )
409- . await ;
416+ buffer. delete ( b"key2" . to_vec ( ) . into ( ) ) ;
417+ buffer. insert ( b"key1" . to_vec ( ) . into ( ) , b"value" . to_vec ( ) ) ;
410418 assert_eq ! (
411419 block_on( buffer. batch_get_or_else(
412420 vec![ b"key2" . to_vec( ) . into( ) , b"key1" . to_vec( ) . into( ) ] . into_iter( ) ,
@@ -419,7 +427,6 @@ mod tests {
419427 }
420428
421429 #[ test]
422- #[ allow( unreachable_code) ]
423430 fn repeat_reads_are_cached ( ) {
424431 let k1: Key = b"key1" . to_vec ( ) . into ( ) ;
425432 let k1_ = k1. clone ( ) ;
@@ -433,7 +440,7 @@ mod tests {
433440
434441 let mut buffer = Buffer :: new ( false ) ;
435442 let r1 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( Ok ( Some ( v1_) ) ) ) ) ;
436- let r2 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( panic ! ( ) ) ) ) ;
443+ let r2 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( Err ( internal_err ! ( "" ) ) ) ) ) ;
437444 assert_eq ! ( r1. unwrap( ) . unwrap( ) , v1) ;
438445 assert_eq ! ( r2. unwrap( ) . unwrap( ) , v1) ;
439446
@@ -443,7 +450,7 @@ mod tests {
443450 ready ( Ok ( vec ! [ ( k1_, v1__) . into( ) , ( k2_, v2_) . into( ) ] ) )
444451 } ) ,
445452 ) ;
446- let r2 = block_on ( buffer. get_or_else ( k2. clone ( ) , move |_| ready ( panic ! ( ) ) ) ) ;
453+ let r2 = block_on ( buffer. get_or_else ( k2. clone ( ) , move |_| ready ( Err ( internal_err ! ( "" ) ) ) ) ) ;
447454 let r3 = block_on (
448455 buffer. batch_get_or_else ( vec ! [ k1. clone( ) , k2. clone( ) ] . into_iter ( ) , move |_| {
449456 ready ( Ok ( vec ! [ ] ) )
@@ -462,4 +469,42 @@ mod tests {
462469 vec![ KvPair ( k1, v1) , KvPair ( k2, v2) ]
463470 ) ;
464471 }
472+
473+ // Check that multiple writes to the same key combine in the correct way.
474+ #[ test]
475+ fn state_machine ( ) {
476+ let mut buffer = Buffer :: new ( false ) ;
477+
478+ macro_rules! assert_entry {
479+ ( $key: ident, $p: pat) => {
480+ assert!( matches!( buffer. entry_map. get( & $key) , Some ( & $p) , ) )
481+ } ;
482+ }
483+
484+ // Insert + Delete = CheckNotExists
485+ let key: Key = b"key1" . to_vec ( ) . into ( ) ;
486+ buffer. insert ( key. clone ( ) , b"value1" . to_vec ( ) ) ;
487+ buffer. delete ( key. clone ( ) ) ;
488+ assert_entry ! ( key, BufferEntry :: CheckNotExist ) ;
489+
490+ // CheckNotExists + Delete = CheckNotExists
491+ buffer. delete ( key. clone ( ) ) ;
492+ assert_entry ! ( key, BufferEntry :: CheckNotExist ) ;
493+
494+ // CheckNotExists + Put = Insert
495+ buffer. put ( key. clone ( ) , b"value2" . to_vec ( ) ) ;
496+ assert_entry ! ( key, BufferEntry :: Insert ( _) ) ;
497+
498+ // Insert + Put = Insert
499+ let key: Key = b"key2" . to_vec ( ) . into ( ) ;
500+ buffer. insert ( key. clone ( ) , b"value1" . to_vec ( ) ) ;
501+ buffer. put ( key. clone ( ) , b"value2" . to_vec ( ) ) ;
502+ assert_entry ! ( key, BufferEntry :: Insert ( _) ) ;
503+
504+ // Delete + Insert = Put
505+ let key: Key = b"key3" . to_vec ( ) . into ( ) ;
506+ buffer. delete ( key. clone ( ) ) ;
507+ buffer. insert ( key. clone ( ) , b"value1" . to_vec ( ) ) ;
508+ assert_entry ! ( key, BufferEntry :: Put ( _) ) ;
509+ }
465510}
0 commit comments