@@ -44,6 +44,15 @@ impl Buffer {
4444 self . mutations . lock ( ) . await . get_primary_key_or ( key) . clone ( )
4545 }
4646
47+ /// Get a value from the buffer.
48+ /// If the returned value is None, it means the key doesn't exist in buffer yet.
49+ pub async fn get ( & self , key : & Key ) -> Option < Value > {
50+ match self . get_from_mutations ( key) . await {
51+ MutationValue :: Determined ( value) => value,
52+ MutationValue :: Undetermined => None ,
53+ }
54+ }
55+
4756 /// Get a value from the buffer. If the value is not present, run `f` to get
4857 /// the value.
4958 pub async fn get_or_else < F , Fut > ( & self , key : Key , f : F ) -> Result < Option < Value > >
@@ -192,9 +201,30 @@ impl Buffer {
192201 . insert ( key, BufferEntry :: Put ( value) ) ;
193202 }
194203
204+ /// Mark a value as Insert mutation into the buffer (does not write through).
205+ pub async fn insert ( & self , key : Key , value : Value ) {
206+ self . mutations
207+ . lock ( )
208+ . await
209+ . insert ( key, BufferEntry :: Insert ( value) ) ;
210+ }
211+
195212 /// Mark a value as deleted.
196213 pub async fn delete ( & self , key : Key ) {
197- self . mutations . lock ( ) . await . insert ( key, BufferEntry :: Del ) ;
214+ let mut mutations = self . mutations . lock ( ) . await ;
215+ let value = mutations
216+ . entry_map
217+ . entry ( key. clone ( ) )
218+ . or_insert ( BufferEntry :: Del ) ;
219+
220+ let new_value: BufferEntry ;
221+ if let BufferEntry :: Insert ( _) = value {
222+ new_value = BufferEntry :: CheckNotExist
223+ } else {
224+ new_value = BufferEntry :: Del
225+ }
226+
227+ mutations. insert ( key, new_value) ;
198228 }
199229
200230 /// Converts the buffered mutations to the proto buffer version
@@ -237,6 +267,12 @@ impl Buffer {
237267 Some ( BufferEntry :: Del ) => {
238268 assert ! ( value. is_none( ) ) ;
239269 }
270+ Some ( BufferEntry :: Insert ( v) ) => {
271+ assert ! ( value. as_ref( ) == Some ( v) )
272+ }
273+ Some ( BufferEntry :: CheckNotExist ) => {
274+ assert ! ( value. is_none( ) ) ;
275+ }
240276 }
241277 }
242278}
@@ -247,6 +283,8 @@ impl Buffer {
247283// Mutations:
248284// - `Put`
249285// - `Del`
286+ // - `Insert`
287+ // - `CheckNotExist`, a constraint to ensure the key doesn't exist. See https://github.com/pingcap/tidb/pull/14968.
250288// Cache of read requests:
251289// - `Cached`, generated by normal read requests
252290// - `ReadLockCached`, generated by lock commands (`lock_keys`, `get_for_update`) and optionally read requests
@@ -274,6 +312,10 @@ enum BufferEntry {
274312 Put ( Value ) ,
275313 // Value has been deleted.
276314 Del ,
315+ // Key should be check not exists before.
316+ Insert ( Value ) ,
317+ // Key should be check not exists before.
318+ CheckNotExist ,
277319}
278320
279321impl BufferEntry {
@@ -287,6 +329,11 @@ impl BufferEntry {
287329 }
288330 BufferEntry :: Del => pb. set_op ( kvrpcpb:: Op :: Del ) ,
289331 BufferEntry :: Locked ( _) => pb. set_op ( kvrpcpb:: Op :: Lock ) ,
332+ BufferEntry :: Insert ( v) => {
333+ pb. set_op ( kvrpcpb:: Op :: Insert ) ;
334+ pb. set_value ( v. clone ( ) ) ;
335+ }
336+ BufferEntry :: CheckNotExist => pb. set_op ( kvrpcpb:: Op :: CheckNotExists ) ,
290337 } ;
291338 pb. set_key ( key. clone ( ) . into ( ) ) ;
292339 Some ( pb)
@@ -299,6 +346,8 @@ impl BufferEntry {
299346 BufferEntry :: Del => MutationValue :: Determined ( None ) ,
300347 BufferEntry :: Locked ( None ) => MutationValue :: Undetermined ,
301348 BufferEntry :: Locked ( Some ( value) ) => MutationValue :: Determined ( value. clone ( ) ) ,
349+ BufferEntry :: Insert ( value) => MutationValue :: Determined ( Some ( value. clone ( ) ) ) ,
350+ BufferEntry :: CheckNotExist => MutationValue :: Determined ( None ) ,
302351 }
303352 }
304353}
@@ -359,6 +408,38 @@ mod tests {
359408 ) ;
360409 }
361410
411+ #[ tokio:: test]
412+ #[ allow( unreachable_code) ]
413+ async fn insert_and_get_from_buffer ( ) {
414+ let buffer = Buffer :: default ( ) ;
415+ buffer
416+ . insert ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
417+ . await ;
418+ buffer
419+ . insert ( b"key2" . to_vec ( ) . into ( ) , b"value2" . to_vec ( ) )
420+ . await ;
421+ assert_eq ! (
422+ block_on( buffer. get_or_else( b"key1" . to_vec( ) . into( ) , move |_| ready( panic!( ) ) ) )
423+ . unwrap( )
424+ . unwrap( ) ,
425+ b"value1" . to_vec( )
426+ ) ;
427+
428+ buffer. delete ( b"key2" . to_vec ( ) . into ( ) ) . await ;
429+ buffer
430+ . insert ( b"key1" . to_vec ( ) . into ( ) , b"value" . to_vec ( ) )
431+ . await ;
432+ assert_eq ! (
433+ block_on( buffer. batch_get_or_else(
434+ vec![ b"key2" . to_vec( ) . into( ) , b"key1" . to_vec( ) . into( ) ] . into_iter( ) ,
435+ move |_| ready( Ok ( vec![ ] ) ) ,
436+ ) )
437+ . unwrap( )
438+ . collect:: <Vec <_>>( ) ,
439+ vec![ KvPair ( Key :: from( b"key1" . to_vec( ) ) , b"value" . to_vec( ) ) , ]
440+ ) ;
441+ }
442+
362443 #[ test]
363444 #[ allow( unreachable_code) ]
364445 fn repeat_reads_are_cached ( ) {
0 commit comments