11// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22
3+ #![ feature( async_await, await_macro) ]
4+
35mod common;
46
57use crate :: common:: parse_args;
6- use futures:: { future, Future , Stream } ;
8+ use futures:: {
9+ future,
10+ prelude:: { StreamExt , TryStreamExt } ,
11+ stream, TryFutureExt ,
12+ } ;
713use std:: ops:: RangeBounds ;
814use tikv_client:: {
915 transaction:: { Client , IsolationLevel } ,
1016 Config , Key , KvPair , Value ,
1117} ;
1218
13- fn puts ( client : & Client , pairs : impl IntoIterator < Item = impl Into < KvPair > > ) {
19+ async fn puts ( client : & Client , pairs : impl IntoIterator < Item = impl Into < KvPair > > ) {
1420 let mut txn = client. begin ( ) ;
15- let _ : Vec < ( ) > = future:: join_all (
21+ await ! ( future:: join_all(
1622 pairs
1723 . into_iter( )
1824 . map( Into :: into)
19- . map ( |p| txn. set ( p. key ( ) . clone ( ) , p. value ( ) . clone ( ) ) ) ,
20- )
21- . wait ( )
25+ . map( |p| txn. set( p. key( ) . clone( ) , p. value( ) . clone( ) ) )
26+ ) )
27+ . into_iter ( )
28+ . collect :: < Result < Vec < ( ) > , _ > > ( )
2229 . expect ( "Could not set key value pairs" ) ;
23- txn. commit ( ) . wait ( ) . expect ( "Could not commit transaction" ) ;
30+ await ! ( txn. commit( ) ) . expect ( "Could not commit transaction" ) ;
2431}
2532
26- fn get ( client : & Client , key : Key ) -> Value {
33+ async fn get ( client : & Client , key : Key ) -> Value {
2734 let txn = client. begin ( ) ;
28- txn. get ( key) . wait ( ) . expect ( "Could not get value" )
35+ await ! ( txn. get( key) ) . expect ( "Could not get value" )
2936}
3037
31- fn scan ( client : & Client , range : impl RangeBounds < Key > , mut limit : usize ) {
32- client
38+ // Ignore a spurious warning from rustc (https://github.com/rust-lang/rust/issues/60566).
39+ #[ allow( unused_mut) ]
40+ async fn scan ( client : & Client , range : impl RangeBounds < Key > , mut limit : usize ) {
41+ await ! ( client
3342 . begin( )
3443 . scan( range)
35- . take_while ( move |_| {
36- Ok ( if limit == 0 {
44+ . into_stream( )
45+ . take_while( move |r| {
46+ assert!( r. is_ok( ) , "Could not scan keys" ) ;
47+ future:: ready( if limit == 0 {
3748 false
3849 } else {
3950 limit -= 1 ;
4051 true
4152 } )
4253 } )
43- . for_each ( |pair| {
44- println ! ( "{:?}" , pair) ;
45- Ok ( ( ) )
46- } )
47- . wait ( )
48- . expect ( "Could not scan keys" ) ;
54+ . for_each( |pair| { future:: ready( println!( "{:?}" , pair) ) } ) ) ;
4955}
5056
51- fn dels ( client : & Client , keys : impl IntoIterator < Item = Key > ) {
57+ async fn dels ( client : & Client , keys : impl IntoIterator < Item = Key > ) {
5258 let mut txn = client. begin ( ) ;
5359 txn. set_isolation_level ( IsolationLevel :: ReadCommitted ) ;
54- let _: Vec < ( ) > = keys
55- . into_iter ( )
56- . map ( |p| {
57- txn. delete ( p) . wait ( ) . expect ( "Could not delete key" ) ;
58- } )
59- . collect ( ) ;
60- txn. commit ( ) . wait ( ) . expect ( "Could not commit transaction" ) ;
60+ let _: Vec < ( ) > = await ! ( stream:: iter( keys. into_iter( ) )
61+ . then( |p| txn
62+ . delete( p)
63+ . unwrap_or_else( |e| panic!( "error in delete: {:?}" , e) ) )
64+ . collect( ) ) ;
65+ await ! ( txn. commit( ) ) . expect ( "Could not commit transaction" ) ;
6166}
6267
63- fn main ( ) {
68+ #[ runtime:: main( runtime_tokio:: Tokio ) ]
69+ async fn main ( ) {
6470 // You can try running this example by passing your pd endpoints
6571 // (and SSL options if necessary) through command line arguments.
6672 let args = parse_args ( "txn" ) ;
@@ -73,28 +79,26 @@ fn main() {
7379 Config :: new ( args. pd )
7480 } ;
7581
76- let txn = Client :: new ( config)
77- . wait ( )
78- . expect ( "Could not connect to tikv" ) ;
82+ let txn = await ! ( Client :: new( config) ) . expect ( "Could not connect to tikv" ) ;
7983
8084 // set
8185 let key1: Key = b"key1" . to_vec ( ) . into ( ) ;
8286 let value1: Value = b"value1" . to_vec ( ) . into ( ) ;
8387 let key2: Key = b"key2" . to_vec ( ) . into ( ) ;
8488 let value2: Value = b"value2" . to_vec ( ) . into ( ) ;
85- puts ( & txn, vec ! [ ( key1, value1) , ( key2, value2) ] ) ;
89+ await ! ( puts( & txn, vec![ ( key1, value1) , ( key2, value2) ] ) ) ;
8690
8791 // get
8892 let key1: Key = b"key1" . to_vec ( ) . into ( ) ;
89- let value1 = get ( & txn, key1. clone ( ) ) ;
93+ let value1 = await ! ( get( & txn, key1. clone( ) ) ) ;
9094 println ! ( "{:?}" , ( key1, value1) ) ;
9195
9296 // scan
9397 let key1: Key = b"key1" . to_vec ( ) . into ( ) ;
94- scan ( & txn, key1.., 10 ) ;
98+ await ! ( scan( & txn, key1.., 10 ) ) ;
9599
96100 // delete
97101 let key1: Key = b"key1" . to_vec ( ) . into ( ) ;
98102 let key2: Key = b"key2" . to_vec ( ) . into ( ) ;
99- dels ( & txn, vec ! [ key1, key2] ) ;
103+ await ! ( dels( & txn, vec![ key1, key2] ) ) ;
100104}
0 commit comments