Skip to content

Commit 4236e3b

Browse files
committed
address review comments and fix tests
Signed-off-by: haojinming <jinming.hao@pingcap.com>
1 parent 3ec9bb8 commit 4236e3b

File tree

5 files changed

+61
-40
lines changed

5 files changed

+61
-40
lines changed

src/request/plan.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,13 +574,15 @@ where
574574
"CleanupLocks::execute, meet locks:{}",
575575
locks.len()
576576
);
577-
result.meet_locks += locks.len();
578577

578+
let lock_size = locks.len();
579579
match lock_resolver
580580
.cleanup_locks(self.store.clone().unwrap(), locks, self.pd_client.clone())
581581
.await
582582
{
583-
Ok(()) => {}
583+
Ok(()) => {
584+
result.meet_locks += lock_size;
585+
}
584586
Err(Error::ExtractedErrors(mut errors)) => {
585587
// Propagate errors to `retry_multi_region` for retry.
586588
if let Error::RegionError(e) = errors.pop().unwrap() {

src/transaction/client.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use super::requests::new_scan_lock_request;
43
use crate::{
54
backoff::DEFAULT_REGION_BACKOFF,
65
config::Config,
@@ -10,7 +9,8 @@ use crate::{
109
transaction::{
1110
lock::ResolveLocksOptions, ResolveLocksContext, Snapshot, Transaction, TransactionOptions,
1211
},
13-
Backoff, Result,
12+
transaction_lowering::new_scan_lock_request,
13+
Backoff, BoundRange, Result,
1414
};
1515
use slog::{Drain, Logger};
1616
use std::sync::Arc;
@@ -242,7 +242,7 @@ impl Client {
242242
batch_size: SCAN_LOCK_BATCH_SIZE,
243243
..Default::default()
244244
};
245-
self.cleanup_locks(&safepoint, options).await?;
245+
self.cleanup_locks(&safepoint, vec![].., options).await?;
246246

247247
// update safepoint to PD
248248
let res: bool = self
@@ -259,18 +259,14 @@ impl Client {
259259
pub async fn cleanup_locks(
260260
&self,
261261
safepoint: &Timestamp,
262+
range: impl Into<BoundRange>,
262263
options: ResolveLocksOptions,
263264
) -> Result<CleanupLocksResult> {
264265
debug!(self.logger, "invoking cleanup async commit locks");
265266
// scan all locks with ts <= safepoint
266267
let ctx = ResolveLocksContext::default();
267268
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
268-
let req = new_scan_lock_request(
269-
options.start_key.clone(),
270-
options.end_key.clone(),
271-
safepoint.version(),
272-
options.batch_size,
273-
);
269+
let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
274270
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
275271
.cleanup_locks(self.logger.clone(), ctx.clone(), options, backoff)
276272
.retry_multi_region(DEFAULT_REGION_BACKOFF)
@@ -286,16 +282,10 @@ impl Client {
286282
pub async fn scan_locks(
287283
&self,
288284
safepoint: &Timestamp,
289-
mut start_key: Vec<u8>,
290-
mut end_key: Vec<u8>,
285+
range: impl Into<BoundRange>,
291286
batch_size: u32,
292287
) -> Result<Vec<tikv_client_proto::kvrpcpb::LockInfo>> {
293-
let req = new_scan_lock_request(
294-
std::mem::take(&mut start_key),
295-
std::mem::take(&mut end_key),
296-
safepoint.version(),
297-
batch_size,
298-
);
288+
let req = new_scan_lock_request(range.into(), safepoint, batch_size);
299289
let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
300290
.retry_multi_region(DEFAULT_REGION_BACKOFF)
301291
.merge(crate::request::Collect)

src/transaction/lock.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,13 @@ pub struct ResolveLocksContext {
152152
#[derive(Clone, Debug)]
153153
pub struct ResolveLocksOptions {
154154
pub async_commit_only: bool,
155-
pub start_key: Vec<u8>,
156-
pub end_key: Vec<u8>,
157155
pub batch_size: u32,
158156
}
159157

160158
impl Default for ResolveLocksOptions {
161159
fn default() -> Self {
162160
Self {
163161
async_commit_only: false,
164-
start_key: vec![],
165-
end_key: vec![],
166162
batch_size: 1024,
167163
}
168164
}

src/transaction/lowering.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,17 @@ pub fn new_pessimistic_lock_request(
161161
}
162162

163163
pub fn new_scan_lock_request(
164-
start_key: Key,
165-
end_key: Key,
166-
safepoint: Timestamp,
164+
range: BoundRange,
165+
safepoint: &Timestamp,
167166
limit: u32,
168167
) -> kvrpcpb::ScanLockRequest {
169-
requests::new_scan_lock_request(start_key.into(), end_key.into(), safepoint.version(), limit)
168+
let (start_key, end_key) = range.into_keys();
169+
requests::new_scan_lock_request(
170+
start_key.into(),
171+
end_key.unwrap_or_default().into(),
172+
safepoint.version(),
173+
limit,
174+
)
170175
}
171176

172177
pub fn new_heart_beat_request(

tests/failpoint_tests.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,19 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
8383

8484
init().await?;
8585
let scenario = FailScenario::setup();
86+
let full_range = vec![]..;
87+
88+
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
89+
// Clean all locks at the beginning to avoid other cases' side effect.
90+
let safepoint = client.current_timestamp().await?;
91+
let options = ResolveLocksOptions {
92+
async_commit_only: false,
93+
..Default::default()
94+
};
95+
client
96+
.cleanup_locks(&safepoint, full_range.clone(), options)
97+
.await
98+
.unwrap();
8699

87100
fail::cfg("after-prewrite", "return").unwrap();
88101
fail::cfg("before-cleanup-locks", "return").unwrap();
@@ -91,7 +104,6 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
91104
fail::cfg("before-cleanup-locks", "off").unwrap();
92105
}}
93106

94-
let client = TransactionClient::new(pd_addrs(), Some(logger.clone())).await?;
95107
let keys = write_data(&client, true, true).await?;
96108
assert_eq!(count_locks(&client).await?, keys.len());
97109

@@ -101,7 +113,9 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> {
101113
batch_size: 4,
102114
..Default::default()
103115
};
104-
let res = client.cleanup_locks(&safepoint, options).await?;
116+
let res = client
117+
.cleanup_locks(&safepoint, full_range, options)
118+
.await?;
105119

106120
assert_eq!(res.meet_locks, keys.len());
107121
assert_eq!(count_locks(&client).await?, keys.len());
@@ -117,6 +131,7 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
117131

118132
init().await?;
119133
let scenario = FailScenario::setup();
134+
let full_range = vec![]..;
120135

121136
// no commit
122137
{
@@ -135,7 +150,9 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
135150
async_commit_only: true,
136151
..Default::default()
137152
};
138-
client.cleanup_locks(&safepoint, options).await?;
153+
client
154+
.cleanup_locks(&safepoint, full_range.clone(), options)
155+
.await?;
139156

140157
must_committed(&client, keys).await;
141158
assert_eq!(count_locks(&client).await?, 0);
@@ -160,7 +177,9 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
160177
async_commit_only: true,
161178
..Default::default()
162179
};
163-
client.cleanup_locks(&safepoint, options).await?;
180+
client
181+
.cleanup_locks(&safepoint, full_range.clone(), options)
182+
.await?;
164183

165184
must_committed(&client, keys).await;
166185
assert_eq!(count_locks(&client).await?, 0);
@@ -177,7 +196,9 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> {
177196
async_commit_only: true,
178197
..Default::default()
179198
};
180-
client.cleanup_locks(&safepoint, options).await?;
199+
client
200+
.cleanup_locks(&safepoint, full_range, options)
201+
.await?;
181202

182203
must_committed(&client, keys).await;
183204
assert_eq!(count_locks(&client).await?, 0);
@@ -217,11 +238,11 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
217238
let safepoint = client.current_timestamp().await?;
218239
let options = ResolveLocksOptions {
219240
async_commit_only: true,
220-
start_key,
221-
end_key,
222241
..Default::default()
223242
};
224-
let res = client.cleanup_locks(&safepoint, options).await?;
243+
let res = client
244+
.cleanup_locks(&safepoint, start_key..end_key, options)
245+
.await?;
225246

226247
assert_eq!(res.meet_locks, keys.len() - 3);
227248

@@ -230,7 +251,7 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> {
230251
async_commit_only: false,
231252
..Default::default()
232253
};
233-
client.cleanup_locks(&safepoint, options).await?;
254+
client.cleanup_locks(&safepoint, vec![].., options).await?;
234255
must_committed(&client, keys).await;
235256
assert_eq!(count_locks(&client).await?, 0);
236257

@@ -245,6 +266,7 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
245266

246267
init().await?;
247268
let scenario = FailScenario::setup();
269+
let full_range = vec![]..;
248270

249271
// no commit
250272
{
@@ -264,14 +286,18 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
264286
async_commit_only: true, // Skip 2pc locks.
265287
..Default::default()
266288
};
267-
client.cleanup_locks(&safepoint, options).await?;
289+
client
290+
.cleanup_locks(&safepoint, full_range.clone(), options)
291+
.await?;
268292
assert_eq!(count_locks(&client).await?, keys.len());
269293
}
270294
let options = ResolveLocksOptions {
271295
async_commit_only: false,
272296
..Default::default()
273297
};
274-
client.cleanup_locks(&safepoint, options).await?;
298+
client
299+
.cleanup_locks(&safepoint, full_range.clone(), options)
300+
.await?;
275301

276302
must_rollbacked(&client, keys).await;
277303
assert_eq!(count_locks(&client).await?, 0);
@@ -289,7 +315,9 @@ async fn txn_cleanup_2pc_locks() -> Result<()> {
289315
async_commit_only: false,
290316
..Default::default()
291317
};
292-
client.cleanup_locks(&safepoint, options).await?;
318+
client
319+
.cleanup_locks(&safepoint, full_range, options)
320+
.await?;
293321

294322
must_committed(&client, keys).await;
295323
assert_eq!(count_locks(&client).await?, 0);
@@ -319,7 +347,7 @@ async fn must_rollbacked(client: &TransactionClient, keys: HashSet<Vec<u8>>) {
319347

320348
async fn count_locks(client: &TransactionClient) -> Result<usize> {
321349
let ts = client.current_timestamp().await.unwrap();
322-
let locks = client.scan_locks(&ts, vec![], vec![], 1024).await?;
350+
let locks = client.scan_locks(&ts, vec![].., 1024).await?;
323351
// De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes.
324352
let locks_set: HashSet<Vec<u8>> =
325353
HashSet::from_iter(locks.into_iter().map(|mut l| l.take_key()));

0 commit comments

Comments
 (0)