Skip to content

Commit a3f814a

Browse files
committed
feat: fix batch_get_for_update
Signed-off-by: ekexium <ekexium@gmail.com>
1 parent 405535c commit a3f814a

File tree

9 files changed

+216
-30
lines changed

9 files changed

+216
-30
lines changed

src/kv/key.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ impl Key {
128128
}
129129
}
130130

131+
pub trait HasKeys {
132+
fn get_keys(&self) -> Vec<Key>;
133+
}
134+
131135
impl From<Vec<u8>> for Key {
132136
fn from(v: Vec<u8>) -> Self {
133137
Key(v)

src/kv/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ mod kvpair;
88
mod value;
99

1010
pub use bound_range::{BoundRange, IntoOwnedRange};
11-
pub use key::Key;
11+
pub use key::{HasKeys, Key};
1212
pub use kvpair::KvPair;
1313
pub use value::Value;
1414

src/request/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use tikv_client_store::{HasError, Request};
1010

1111
pub use self::{
1212
plan::{
13-
Collect, CollectError, DefaultProcessor, Dispatch, ExtractError, Merge, MergeResponse,
14-
MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion,
13+
Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError, Merge,
14+
MergeResponse, MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion,
1515
},
1616
plan_builder::{PlanBuilder, SingleKey},
1717
shard::Shardable,

src/request/plan.rs

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
use crate::{
44
backoff::Backoff,
5+
kv::HasKeys,
56
pd::PdClient,
67
request::{KvRequest, Shardable},
78
stats::tikv_stats,
89
transaction::{resolve_locks, HasLocks},
9-
Error, Result,
10+
util::iter::FlatMapOkIterExt,
11+
Error, Key, KvPair, Result, Value,
1012
};
1113
use async_trait::async_trait;
1214
use futures::{prelude::*, stream::StreamExt};
1315
use std::{marker::PhantomData, sync::Arc};
16+
use tikv_client_proto::kvrpcpb;
1417
use tikv_client_store::{HasError, HasRegionError, KvClient};
1518

1619
/// A plan for how to execute a request. A user builds up a plan with various
@@ -55,6 +58,12 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
5558
}
5659
}
5760

61+
impl<Req: KvRequest + HasKeys> HasKeys for Dispatch<Req> {
62+
fn get_keys(&self) -> Vec<Key> {
63+
self.request.get_keys()
64+
}
65+
}
66+
5867
pub struct MultiRegion<P: Plan, PdC: PdClient> {
5968
pub(super) inner: P,
6069
pub pd_client: Arc<PdC>,
@@ -123,6 +132,9 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
123132
#[derive(Clone, Copy)]
124133
pub struct Collect;
125134

135+
#[derive(Clone, Debug)]
136+
pub struct CollectAndMatchKey;
137+
126138
/// A merge strategy which returns an error if any response is an error and
127139
/// otherwise returns a Vec of the results.
128140
#[derive(Clone, Copy)]
@@ -256,6 +268,12 @@ where
256268
}
257269
}
258270

271+
impl<P: Plan + HasKeys, PdC: PdClient> HasKeys for ResolveLock<P, PdC> {
272+
fn get_keys(&self) -> Vec<Key> {
273+
self.inner.get_keys()
274+
}
275+
}
276+
259277
pub struct ExtractError<P: Plan> {
260278
pub inner: P,
261279
}
@@ -292,6 +310,34 @@ where
292310
}
293311
}
294312

313+
// Requires: len(inner.keys) == len(inner.result)
314+
pub struct PreserveKey<P: Plan + HasKeys> {
315+
pub inner: P,
316+
}
317+
318+
impl<P: Plan + HasKeys> Clone for PreserveKey<P> {
319+
fn clone(&self) -> Self {
320+
PreserveKey {
321+
inner: self.inner.clone(),
322+
}
323+
}
324+
}
325+
326+
#[async_trait]
327+
impl<P> Plan for PreserveKey<P>
328+
where
329+
P: Plan + HasKeys,
330+
{
331+
type Result = ResponseAndKeys<P::Result>;
332+
333+
async fn execute(&self) -> Result<Self::Result> {
334+
let keys = self.inner.get_keys();
335+
let res = self.inner.execute().await?;
336+
// TODO: should we check they have the same length?
337+
Ok(ResponseAndKeys(res, keys))
338+
}
339+
}
340+
295341
#[cfg(test)]
296342
mod test {
297343
use super::*;
@@ -349,3 +395,60 @@ mod test {
349395
.for_each(|r| assert!(r.is_err()));
350396
}
351397
}
398+
399+
// contains a response and the corresponding keys
400+
// currently only used for matching keys and values in pessimistic lock requests
401+
#[derive(Debug, Clone)]
402+
pub struct ResponseAndKeys<Resp>(Resp, Vec<Key>);
403+
404+
impl<Resp: HasError> HasError for ResponseAndKeys<Resp> {
405+
fn error(&mut self) -> Option<Error> {
406+
self.0.error()
407+
}
408+
}
409+
410+
impl<Resp: HasLocks> HasLocks for ResponseAndKeys<Resp> {
411+
fn take_locks(&mut self) -> Vec<tikv_client_proto::kvrpcpb::LockInfo> {
412+
self.0.take_locks()
413+
}
414+
}
415+
416+
impl<Resp: HasRegionError> HasRegionError for ResponseAndKeys<Resp> {
417+
fn region_error(&mut self) -> Option<Error> {
418+
self.0.region_error()
419+
}
420+
}
421+
422+
impl Merge<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>> for CollectAndMatchKey {
423+
type Out = Vec<KvPair>;
424+
425+
fn merge(
426+
&self,
427+
input: Vec<Result<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>>>,
428+
) -> Result<Self::Out> {
429+
input
430+
.into_iter()
431+
.flat_map_ok(|ResponseAndKeys(mut resp, keys)| {
432+
let values = resp.take_values();
433+
let not_founds = resp.take_not_founds();
434+
let v: Vec<_> = if not_founds.is_empty() {
435+
// Legacy TiKV does not distiguish not existing key and existing key
436+
// that with empty value. We assume that key does not exist if value
437+
// is empty.
438+
let values: Vec<Value> = values.into_iter().filter(|v| v.is_empty()).collect();
439+
keys.into_iter().zip(values).map(From::from).collect()
440+
} else {
441+
assert_eq!(values.len(), not_founds.len());
442+
let values: Vec<Value> = values
443+
.into_iter()
444+
.zip(not_founds.into_iter())
445+
.filter_map(|(v, not_found)| if not_found { None } else { Some(v) })
446+
.collect();
447+
keys.into_iter().zip(values).map(From::from).collect()
448+
};
449+
// FIXME sucks to collect and re-iterate, but the iterators have different types
450+
v.into_iter()
451+
})
452+
.collect()
453+
}
454+
}

src/request/plan_builder.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
use crate::{
44
backoff::Backoff,
5+
kv::HasKeys,
56
pd::PdClient,
67
request::{
78
DefaultProcessor, Dispatch, ExtractError, KvRequest, Merge, MergeResponse, MultiRegion,
@@ -14,6 +15,8 @@ use crate::{
1415
use std::{marker::PhantomData, sync::Arc};
1516
use tikv_client_store::HasError;
1617

18+
use super::plan::PreserveKey;
19+
1720
/// Builder type for plans (see that module for more).
1821
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
1922
pd_client: Arc<PdC>,
@@ -161,6 +164,19 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
161164
}
162165
}
163166

167+
impl<PdC: PdClient, P: Plan + HasKeys> PlanBuilder<PdC, P, NoTarget>
168+
where
169+
P::Result: HasError,
170+
{
171+
pub fn preserve_keys(self) -> PlanBuilder<PdC, PreserveKey<P>, NoTarget> {
172+
PlanBuilder {
173+
pd_client: self.pd_client.clone(),
174+
plan: PreserveKey { inner: self.plan },
175+
phantom: PhantomData,
176+
}
177+
}
178+
}
179+
164180
impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
165181
where
166182
P::Result: HasError,

src/request/shard.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use crate::{
4+
kv::HasKeys,
45
pd::PdClient,
56
request::{Dispatch, KvRequest, Plan, ResolveLock, RetryRegion},
67
store::Store,
@@ -9,6 +10,8 @@ use crate::{
910
use futures::stream::BoxStream;
1011
use std::sync::Arc;
1112

13+
use super::plan::PreserveKey;
14+
1215
pub trait Shardable {
1316
type Shard: Send;
1417

@@ -51,6 +54,21 @@ impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
5154
}
5255
}
5356

57+
impl<P: Plan + HasKeys + Shardable> Shardable for PreserveKey<P> {
58+
type Shard = P::Shard;
59+
60+
fn shards(
61+
&self,
62+
pd_client: &Arc<impl PdClient>,
63+
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
64+
self.inner.shards(pd_client)
65+
}
66+
67+
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
68+
self.inner.apply_shard(shard, store)
69+
}
70+
}
71+
5472
impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegion<P, PdC> {
5573
type Shard = P::Shard;
5674

src/transaction/requests.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use crate::{
4+
kv::HasKeys,
45
pd::PdClient,
56
request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey},
67
store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store},
@@ -359,6 +360,15 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
359360
}
360361
}
361362

363+
impl HasKeys for kvrpcpb::PessimisticLockRequest {
364+
fn get_keys(&self) -> Vec<Key> {
365+
self.mutations
366+
.iter()
367+
.map(|m| m.key.clone().into())
368+
.collect()
369+
}
370+
}
371+
362372
impl Merge<kvrpcpb::PessimisticLockResponse> for Collect {
363373
// FIXME: PessimisticLockResponse only contains values.
364374
// We need to pair keys and values returned somewhere.

src/transaction/transaction.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::{
44
backoff::Backoff,
55
pd::{PdClient, PdRpcClient},
6-
request::{Collect, CollectError, Plan, PlanBuilder, RetryOptions},
6+
request::{Collect, CollectAndMatchKey, CollectError, Plan, PlanBuilder, RetryOptions},
77
timestamp::TimestampExt,
88
transaction::{buffer::Buffer, lowering::*},
99
BoundRange, Error, Key, KvPair, Result, Value,
@@ -150,10 +150,12 @@ impl Transaction {
150150
if !self.is_pessimistic() {
151151
Err(Error::InvalidTransactionType)
152152
} else {
153-
let key = key.into();
154-
let mut values = self.pessimistic_lock(iter::once(key.clone()), true).await?;
155-
assert!(values.len() == 1);
156-
Ok(values.pop().unwrap())
153+
let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?;
154+
debug_assert!(pairs.len() <= 1);
155+
match pairs.pop() {
156+
Some(pair) => Ok(Some(pair.1)),
157+
None => Ok(None),
158+
}
157159
}
158160
}
159161

@@ -242,7 +244,7 @@ impl Transaction {
242244
/// It can only be used in pessimistic mode.
243245
///
244246
/// # Examples
245-
/// ```rust,no_run,compile_fail
247+
/// ```rust,no_run
246248
/// # use tikv_client::{Key, Value, Config, TransactionClient};
247249
/// # use futures::prelude::*;
248250
/// # use std::collections::HashMap;
@@ -263,19 +265,16 @@ impl Transaction {
263265
/// ```
264266
// This is temporarily disabled because we cannot correctly match the keys and values.
265267
// See `impl KvRequest for kvrpcpb::PessimisticLockRequest` for details.
266-
#[allow(dead_code)]
267-
async fn batch_get_for_update(
268+
pub async fn batch_get_for_update(
268269
&mut self,
269270
keys: impl IntoIterator<Item = impl Into<Key>>,
270271
) -> Result<impl Iterator<Item = KvPair>> {
271272
self.check_allow_operation()?;
272273
if !self.is_pessimistic() {
273-
Err(Error::InvalidTransactionType)
274-
} else {
275-
let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
276-
self.pessimistic_lock(keys.clone(), false).await?;
277-
self.batch_get(keys).await
274+
return Err(Error::InvalidTransactionType);
278275
}
276+
let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
277+
Ok(self.pessimistic_lock(keys, true).await?.into_iter())
279278
}
280279

281280
/// Create a new 'scan' request.
@@ -618,42 +617,43 @@ impl Transaction {
618617
&mut self,
619618
keys: impl IntoIterator<Item = Key>,
620619
need_value: bool,
621-
) -> Result<Vec<Option<Value>>> {
620+
) -> Result<Vec<KvPair>> {
622621
assert!(
623622
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
624623
"`pessimistic_lock` is only valid to use with pessimistic transactions"
625624
);
626625

627626
let keys: Vec<Key> = keys.into_iter().collect();
627+
if keys.is_empty() {
628+
return Ok(vec![]);
629+
}
630+
628631
let first_key = keys[0].clone();
629632
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
630-
let lock_ttl = DEFAULT_LOCK_TTL;
631633
let for_update_ts = self.rpc.clone().get_timestamp().await?;
632634
self.options.push_for_update_ts(for_update_ts.clone());
633635
let request = new_pessimistic_lock_request(
634636
keys.clone().into_iter(),
635637
primary_lock,
636638
self.timestamp.clone(),
637-
lock_ttl,
639+
DEFAULT_LOCK_TTL,
638640
for_update_ts,
639641
need_value,
640642
);
641643
let plan = PlanBuilder::new(self.rpc.clone(), request)
642644
.resolve_lock(self.options.retry_options.lock_backoff.clone())
645+
.preserve_keys()
643646
.multi_region()
644647
.retry_region(self.options.retry_options.region_backoff.clone())
645-
.merge(Collect)
648+
.merge(CollectAndMatchKey)
646649
.plan();
647-
let values = plan
648-
.execute()
649-
.await
650-
.map(|r| r.into_iter().map(Into::into).collect());
650+
let pairs = plan.execute().await;
651651

652652
for key in keys {
653653
self.buffer.lock(key).await;
654654
}
655655

656-
values
656+
pairs
657657
}
658658

659659
/// Checks if the transaction can perform arbitrary operations.

0 commit comments

Comments
 (0)