@@ -116,18 +116,30 @@ impl<PdC: PdClient> Transaction<PdC> {
116116 }
117117
118118 /// Create a `get for udpate` request.
119- /// Once resolved this request will pessimistically lock and fetch the latest
120- /// value associated with the given key at **current timestamp**.
121119 ///
122- /// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD.
120+ /// The request reads and "locks" a key. It is similar to `SELECT ... FOR
121+ /// UPDATE` in TiDB, and has different behavior in optimistic and
122+ /// pessimistic transactions.
123123 ///
124- /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
125- /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
126- /// and the value is not cached in the local buffer.
127- /// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
124+ /// # Optimistic transaction
128125 ///
126+ /// It reads at the "start timestamp" and caches the value, just like normal
127+ /// get requests. The lock is written in prewrite and commit, so it cannot
128+ /// prevent concurrent transactions from writing the same key, but can only
129+ /// prevent itself from committing.
129130 ///
130- /// It can only be used in pessimistic mode.
131+ /// # Pessimistic transaction
132+ ///
133+ /// It reads at the "current timestamp" and thus does not cache the value.
134+ /// So following read requests won't be affected by the `get_for_udpate`.
135+ /// A lock will be acquired immediately with this request, which prevents
136+ /// concurrent transactions from mutating the keys.
137+ ///
138+ /// The "current timestamp" (also called `for_update_ts` of the request) is
139+ /// fetched immediately from the timestamp oracle.
140+ ///
141+ /// Note: The behavior of the request under pessimistic transaction does not
142+ /// follow snapshot isolation.
131143 ///
132144 /// # Examples
133145 /// ```rust,no_run
@@ -146,7 +158,9 @@ impl<PdC: PdClient> Transaction<PdC> {
146158 pub async fn get_for_update ( & mut self , key : impl Into < Key > ) -> Result < Option < Value > > {
147159 self . check_allow_operation ( ) . await ?;
148160 if !self . is_pessimistic ( ) {
149- Err ( Error :: InvalidTransactionType )
161+ let key = key. into ( ) ;
162+ self . lock_keys ( iter:: once ( key. clone ( ) ) ) . await ?;
163+ self . get ( key) . await
150164 } else {
151165 let mut pairs = self . pessimistic_lock ( iter:: once ( key. into ( ) ) , true ) . await ?;
152166 debug_assert ! ( pairs. len( ) <= 1 ) ;
@@ -228,33 +242,25 @@ impl<PdC: PdClient> Transaction<PdC> {
228242
229243 /// Create a new 'batch get for update' request.
230244 ///
231- /// Once resolved this request will pessimistically lock the keys and
232- /// fetch the values associated with the given keys.
233- ///
234- /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
235- /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
236- /// and the value is not cached in the local buffer.
237- /// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`.
238- ///
239- /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
245+ /// Similar [`get_for_update`](Transaction::get_for_update), but it works
246+ /// for a batch of keys.
240247 ///
241- /// It can only be used in pessimistic mode.
248+ /// Non-existent entries will not appear in the result. The order of the
249+ /// keys is not retained in the result.
242250 ///
243251 /// # Examples
244252 /// ```rust,no_run
245- /// # use tikv_client::{Key, Value, Config, TransactionClient};
253+ /// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair };
246254 /// # use futures::prelude::*;
247255 /// # use std::collections::HashMap;
248256 /// # futures::executor::block_on(async {
249257 /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
250258 /// let mut txn = client.begin_pessimistic().await.unwrap();
251259 /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
252- /// let result: HashMap<Key, Value > = txn
260+ /// let result: Vec<KvPair > = txn
253261 /// .batch_get_for_update(keys)
254262 /// .await
255- /// .unwrap()
256- /// .map(|pair| (pair.0, pair.1))
257- /// .collect();
263+ /// .unwrap();
258264 /// // now "TiKV" and "TiDB" are both locked
259265 /// // Finish the transaction...
260266 /// txn.commit().await.unwrap();
@@ -263,13 +269,15 @@ impl<PdC: PdClient> Transaction<PdC> {
263269 pub async fn batch_get_for_update (
264270 & mut self ,
265271 keys : impl IntoIterator < Item = impl Into < Key > > ,
266- ) -> Result < impl Iterator < Item = KvPair > > {
272+ ) -> Result < Vec < KvPair > > {
267273 self . check_allow_operation ( ) . await ?;
274+ let keys: Vec < Key > = keys. into_iter ( ) . map ( |k| k. into ( ) ) . collect ( ) ;
268275 if !self . is_pessimistic ( ) {
269- return Err ( Error :: InvalidTransactionType ) ;
276+ self . lock_keys ( keys. clone ( ) ) . await ?;
277+ Ok ( self . batch_get ( keys) . await ?. collect ( ) )
278+ } else {
279+ self . pessimistic_lock ( keys, true ) . await
270280 }
271- let keys: Vec < Key > = keys. into_iter ( ) . map ( |it| it. into ( ) ) . collect ( ) ;
272- Ok ( self . pessimistic_lock ( keys, true ) . await ?. into_iter ( ) )
273281 }
274282
275283 /// Create a new 'scan' request.
@@ -473,8 +481,8 @@ impl<PdC: PdClient> Transaction<PdC> {
473481 }
474482 }
475483 TransactionKind :: Pessimistic ( _) => {
476- self . pessimistic_lock ( keys. into_iter ( ) . map ( |k| k. into ( ) ) , false )
477- . await ?;
484+ let keys: Vec < Key > = keys . into_iter ( ) . map ( |k| k. into ( ) ) . collect ( ) ;
485+ self . pessimistic_lock ( keys . into_iter ( ) , false ) . await ?;
478486 }
479487 }
480488 Ok ( ( ) )
@@ -649,7 +657,13 @@ impl<PdC: PdClient> Transaction<PdC> {
649657 }
650658
651659 let first_key = keys[ 0 ] . clone ( ) . key ( ) ;
652- let primary_lock = self . buffer . get_primary_key_or ( & first_key) . await ;
660+ // we do not set the primary key here, because pessimistic lock request
661+ // can fail, in which case the keys may not be part of the transaction.
662+ let primary_lock = self
663+ . buffer
664+ . get_primary_key ( )
665+ . await
666+ . unwrap_or_else ( || first_key. clone ( ) ) ;
653667 let for_update_ts = self . rpc . clone ( ) . get_timestamp ( ) . await ?;
654668 self . options . push_for_update_ts ( for_update_ts. clone ( ) ) ;
655669 let request = new_pessimistic_lock_request (
@@ -669,6 +683,9 @@ impl<PdC: PdClient> Transaction<PdC> {
669683 . plan ( ) ;
670684 let pairs = plan. execute ( ) . await ;
671685
686+ // primary key will be set here if needed
687+ self . buffer . primary_key_or ( & first_key) . await ;
688+
672689 self . start_auto_heartbeat ( ) . await ;
673690
674691 for key in keys {
0 commit comments