Skip to content

Commit d3f63fe

Browse files
committed
txn: add insert() to Transaction
Signed-off-by: Ziyi Yan <ziyi.yan@foxmail.com>
1 parent c59911a commit d3f63fe

File tree

3 files changed

+94
-0
lines changed

3 files changed

+94
-0
lines changed

src/transaction/buffer.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ impl Buffer {
4444
self.mutations.lock().await.get_primary_key_or(key).clone()
4545
}
4646

47+
/// Get a value from the buffer.
48+
/// If the returned value is None, it means the key doesn't exist in buffer yet.
49+
pub async fn get(&self, key: &Key) -> Option<Value> {
50+
match self.get_from_mutations(key).await {
51+
MutationValue::Determined(value) => value,
52+
MutationValue::Undetermined => None,
53+
}
54+
}
55+
4756
/// Get a value from the buffer. If the value is not present, run `f` to get
4857
/// the value.
4958
pub async fn get_or_else<F, Fut>(&self, key: Key, f: F) -> Result<Option<Value>>
@@ -192,6 +201,14 @@ impl Buffer {
192201
.insert(key, BufferEntry::Put(value));
193202
}
194203

204+
/// Mark a value as Insert mutation into the buffer (does not write through).
205+
pub async fn insert(&self, key: Key, value: Value) {
206+
self.mutations
207+
.lock()
208+
.await
209+
.insert(key, BufferEntry::Insert(value));
210+
}
211+
195212
/// Mark a value as deleted.
196213
pub async fn delete(&self, key: Key) {
197214
self.mutations.lock().await.insert(key, BufferEntry::Del);
@@ -237,6 +254,9 @@ impl Buffer {
237254
Some(BufferEntry::Del) => {
238255
assert!(value.is_none());
239256
}
257+
Some(BufferEntry::Insert(v)) => {
258+
assert!(value.as_ref() == Some(v))
259+
}
240260
}
241261
}
242262
}
@@ -247,6 +267,7 @@ impl Buffer {
247267
// Mutations:
248268
// - `Put`
249269
// - `Del`
270+
// - `Insert`
250271
// Cache of read requests:
251272
// - `Cached`, generated by normal read requests
252273
// - `ReadLockCached`, generated by lock commands (`lock_keys`, `get_for_update`) and optionally read requests
@@ -274,6 +295,8 @@ enum BufferEntry {
274295
Put(Value),
275296
// Value has been deleted.
276297
Del,
298+
// Key should be check not exists before.
299+
Insert(Value),
277300
}
278301

279302
impl BufferEntry {
@@ -287,6 +310,10 @@ impl BufferEntry {
287310
}
288311
BufferEntry::Del => pb.set_op(kvrpcpb::Op::Del),
289312
BufferEntry::Locked(_) => pb.set_op(kvrpcpb::Op::Lock),
313+
BufferEntry::Insert(v) => {
314+
pb.set_op(kvrpcpb::Op::Insert);
315+
pb.set_value(v.clone());
316+
}
290317
};
291318
pb.set_key(key.clone().into());
292319
Some(pb)
@@ -299,6 +326,7 @@ impl BufferEntry {
299326
BufferEntry::Del => MutationValue::Determined(None),
300327
BufferEntry::Locked(None) => MutationValue::Undetermined,
301328
BufferEntry::Locked(Some(value)) => MutationValue::Determined(value.clone()),
329+
BufferEntry::Insert(value) => MutationValue::Determined(Some(value.clone())),
302330
}
303331
}
304332
}
@@ -359,6 +387,38 @@ mod tests {
359387
);
360388
}
361389

390+
#[tokio::test]
391+
#[allow(unreachable_code)]
392+
async fn insert_and_get_from_buffer() {
393+
let buffer = Buffer::default();
394+
buffer
395+
.insert(b"key1".to_vec().into(), b"value1".to_vec())
396+
.await;
397+
buffer
398+
.insert(b"key2".to_vec().into(), b"value2".to_vec())
399+
.await;
400+
assert_eq!(
401+
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
402+
.unwrap()
403+
.unwrap(),
404+
b"value1".to_vec()
405+
);
406+
407+
buffer.delete(b"key2".to_vec().into()).await;
408+
buffer
409+
.insert(b"key1".to_vec().into(), b"value".to_vec())
410+
.await;
411+
assert_eq!(
412+
block_on(buffer.batch_get_or_else(
413+
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
414+
move |_| ready(Ok(vec![])),
415+
))
416+
.unwrap()
417+
.collect::<Vec<_>>(),
418+
vec![KvPair(Key::from(b"key1".to_vec()), b"value".to_vec()),]
419+
);
420+
}
421+
362422
#[test]
363423
#[allow(unreachable_code)]
364424
fn repeat_reads_are_cached() {

src/transaction/transaction.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,37 @@ impl Transaction {
362362
Ok(())
363363
}
364364

365+
/// Inserts the value associated with the given key.
366+
/// It has a constraint that key should not exist before.
367+
///
368+
/// # Examples
369+
/// ```rust,no_run
370+
/// # use tikv_client::{Key, Value, Config, TransactionClient};
371+
/// # use futures::prelude::*;
372+
/// # futures::executor::block_on(async {
373+
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
374+
/// let mut txn = client.begin_optimistic().await.unwrap();
375+
/// let key = "TiKV".to_owned();
376+
/// let val = "TiKV".to_owned();
377+
/// txn.insert(key, val);
378+
/// // Finish the transaction...
379+
/// txn.commit().await.unwrap();
380+
/// # });
381+
/// ```
382+
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
383+
self.check_allow_operation()?;
384+
let key = key.into();
385+
if self.buffer.get(&key).await.is_some() {
386+
return Err(Error::DuplicateKeyInsertion);
387+
}
388+
if self.is_pessimistic() {
389+
self.pessimistic_lock(iter::once(key.clone()), false)
390+
.await?;
391+
}
392+
self.buffer.insert(key, value.into()).await;
393+
Ok(())
394+
}
395+
365396
/// Deletes the given key.
366397
///
367398
/// Deleting a non-existent key will not result in an error.

tikv-client-common/src/errors.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ pub enum Error {
1010
/// Feature is not implemented.
1111
#[error("Unimplemented feature")]
1212
Unimplemented,
13+
/// Duplicate key insertion happens.
14+
#[error("Duplicate key insertion")]
15+
DuplicateKeyInsertion,
1316
/// Failed to resolve a lock
1417
#[error("Failed to resolve lock")]
1518
ResolveLockError,

0 commit comments

Comments
 (0)