1+ use crate :: utils:: setup_tracing;
2+ use crate :: utils:: unique_keyspace_name;
3+ use crate :: utils:: PerformDDL ;
4+ use crate :: utils:: { create_new_session_builder, scylla_supports_tablets} ;
15use scylla:: batch:: Batch ;
26use scylla:: batch:: BatchType ;
37use scylla:: client:: session:: Session ;
@@ -9,13 +13,10 @@ use scylla::query::Query;
913use scylla:: value:: { Counter , CqlValue , MaybeUnset } ;
1014use std:: collections:: HashMap ;
1115use std:: string:: String ;
12-
13- use crate :: utils:: setup_tracing;
14- use crate :: utils:: unique_keyspace_name;
15- use crate :: utils:: PerformDDL ;
16- use crate :: utils:: { create_new_session_builder, scylla_supports_tablets} ;
16+ use std:: sync:: Arc ;
1717
1818use assert_matches:: assert_matches;
19+ use scylla:: response:: query_result:: { QueryResult , QueryRowsResult } ;
1920
2021const BATCH_COUNT : usize = 100 ;
2122
@@ -25,6 +26,8 @@ async fn create_test_session(table_name: &str, supports_tablets: bool) -> Sessio
2526
2627 let mut create_ks = format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ;
2728
29+ // Need to disable tablets in this test because they don't support counters yet.
30+ // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e).
2831 if !supports_tablets && scylla_supports_tablets ( & session) . await {
2932 create_ks += " AND TABLETS = {'enabled': false}"
3033 }
@@ -142,6 +145,145 @@ async fn batch_statements_and_values_mismatch_detected() {
142145 }
143146}
144147
148+ #[ tokio:: test]
149+ async fn test_batch ( ) {
150+ setup_tracing ( ) ;
151+ let session = Arc :: new ( create_new_session_builder ( ) . build ( ) . await . unwrap ( ) ) ;
152+ let ks = unique_keyspace_name ( ) ;
153+
154+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
155+ session
156+ . ddl ( format ! (
157+ "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))" ,
158+ ks
159+ ) )
160+ . await
161+ . unwrap ( ) ;
162+
163+ let prepared_statement = session
164+ . prepare ( format ! (
165+ "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)" ,
166+ ks
167+ ) )
168+ . await
169+ . unwrap ( ) ;
170+
171+ // TODO: Add API that supports binding values to statements in batch creation process,
172+ // to avoid problem of statements/values count mismatch
173+ use scylla:: batch:: Batch ;
174+ let mut batch: Batch = Default :: default ( ) ;
175+ batch. append_statement ( & format ! ( "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)" , ks) [ ..] ) ;
176+ batch. append_statement ( & format ! ( "INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')" , ks) [ ..] ) ;
177+ batch. append_statement ( prepared_statement. clone ( ) ) ;
178+
179+ let four_value: i32 = 4 ;
180+ let hello_value: String = String :: from ( "hello" ) ;
181+ let session_clone = session. clone ( ) ;
182+ // We're spawning to a separate task here to test that it works even in that case, because in some scenarios
183+ // (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries)
184+ // the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on
185+ // some lifetime for some unknown reason), so can't be spawned on tokio.
186+ // See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details
187+ tokio:: spawn ( async move {
188+ let values = (
189+ ( 1_i32 , 2_i32 , "abc" ) ,
190+ ( ) ,
191+ ( 1_i32 , & four_value, hello_value. as_str ( ) ) ,
192+ ) ;
193+ session_clone. batch ( & batch, values) . await . unwrap ( ) ;
194+ } )
195+ . await
196+ . unwrap ( ) ;
197+
198+ let mut results: Vec < ( i32 , i32 , String ) > = session
199+ . query_unpaged ( format ! ( "SELECT a, b, c FROM {}.t_batch" , ks) , & [ ] )
200+ . await
201+ . unwrap ( )
202+ . into_rows_result ( )
203+ . unwrap ( )
204+ . rows :: < ( i32 , i32 , String ) > ( )
205+ . unwrap ( )
206+ . collect :: < Result < _ , _ > > ( )
207+ . unwrap ( ) ;
208+
209+ results. sort ( ) ;
210+ assert_eq ! (
211+ results,
212+ vec![
213+ ( 1 , 2 , String :: from( "abc" ) ) ,
214+ ( 1 , 4 , String :: from( "hello" ) ) ,
215+ ( 7 , 11 , String :: from( "" ) )
216+ ]
217+ ) ;
218+
219+ // Test repreparing statement inside a batch
220+ let mut batch: Batch = Default :: default ( ) ;
221+ batch. append_statement ( prepared_statement) ;
222+ let values = ( ( 4_i32 , 20_i32 , "foobar" ) , ) ;
223+
224+ // This statement flushes the prepared statement cache
225+ session
226+ . ddl ( format ! (
227+ "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42" ,
228+ ks
229+ ) )
230+ . await
231+ . unwrap ( ) ;
232+ session. batch ( & batch, values) . await . unwrap ( ) ;
233+
234+ let results: Vec < ( i32 , i32 , String ) > = session
235+ . query_unpaged (
236+ format ! ( "SELECT a, b, c FROM {}.t_batch WHERE a = 4" , ks) ,
237+ & [ ] ,
238+ )
239+ . await
240+ . unwrap ( )
241+ . into_rows_result ( )
242+ . unwrap ( )
243+ . rows :: < ( i32 , i32 , String ) > ( )
244+ . unwrap ( )
245+ . collect :: < Result < _ , _ > > ( )
246+ . unwrap ( ) ;
247+
248+ assert_eq ! ( results, vec![ ( 4 , 20 , String :: from( "foobar" ) ) ] ) ;
249+ }
250+
251+ // This is a regression test for #1134.
252+ #[ tokio:: test]
253+ async fn test_batch_to_multiple_tables ( ) {
254+ setup_tracing ( ) ;
255+ let session = create_new_session_builder ( ) . build ( ) . await . unwrap ( ) ;
256+ let ks = unique_keyspace_name ( ) ;
257+
258+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
259+ session. use_keyspace ( & ks, true ) . await . unwrap ( ) ;
260+ session
261+ . ddl ( "CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))" )
262+ . await
263+ . unwrap ( ) ;
264+ session
265+ . ddl ( "CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))" )
266+ . await
267+ . unwrap ( ) ;
268+
269+ let prepared_statement = session
270+ . prepare (
271+ "
272+ BEGIN BATCH
273+ INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?);
274+ INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?);
275+ APPLY BATCH;
276+ " ,
277+ )
278+ . await
279+ . unwrap ( ) ;
280+
281+ session
282+ . execute_unpaged ( & prepared_statement, ( 1 , 2 , "ala" , 4 , 5 , "ma" ) )
283+ . await
284+ . unwrap ( ) ;
285+ }
286+
145287#[ tokio:: test]
146288async fn test_batch_of_simple_statements ( ) {
147289 setup_tracing ( ) ;
@@ -324,34 +466,144 @@ async fn test_batch_of_mixed_bound_and_simple_statements() {
324466 verify_batch_insert ( & session, & test_name, BATCH_COUNT ) . await ;
325467}
326468
469+ // Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied.
327470#[ tokio:: test]
328- async fn test_cas_batch ( ) {
329- setup_tracing ( ) ;
330- let test_name = String :: from ( "test_cas_batch" ) ;
471+ async fn test_batch_lwts ( ) {
472+ let test_name = String :: from ( "test_counter_batch" ) ;
331473 let session = create_test_session ( & test_name, false ) . await ;
332474
333- let query_str = format ! (
334- "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS" ,
335- & test_name
336- ) ;
337- let prepared = session. prepare ( Query :: new ( query_str) ) . await . unwrap ( ) ;
475+ session
476+ . query_unpaged (
477+ format ! ( "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)" , & test_name) ,
478+ ( & test_name, 0 , 0 ) ,
479+ )
480+ . await
481+ . unwrap ( ) ;
482+
338483 let mut batch = Batch :: new ( BatchType :: Unlogged ) ;
339- let mut batch_values = Vec :: with_capacity ( BATCH_COUNT ) ;
340- for i in 0 ..BATCH_COUNT as i32 {
341- batch. append_statement ( prepared. clone ( ) ) ;
342- batch_values. push ( ( & test_name, i, i + 1 ) ) ;
484+ batch. append_statement (
485+ format ! (
486+ "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0" ,
487+ & test_name, & test_name
488+ )
489+ . as_str ( ) ,
490+ ) ;
491+ batch. append_statement (
492+ format ! (
493+ "INSERT INTO {} (k0, k1, v) VALUES ('{}', 123, 321)" ,
494+ & test_name, & test_name
495+ )
496+ . as_str ( ) ,
497+ ) ;
498+ batch. append_statement (
499+ format ! (
500+ "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0" ,
501+ & test_name, & test_name
502+ )
503+ . as_str ( ) ,
504+ ) ;
505+
506+ let batch_res: QueryResult = session. batch ( & batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
507+ let batch_deserializer = batch_res. into_rows_result ( ) . unwrap ( ) ;
508+
509+ // Scylla returns 4 columns, but Cassandra returns only 1
510+ let is_scylla: bool = batch_deserializer. column_specs ( ) . len ( ) == 4 ;
511+
512+ if is_scylla {
513+ test_batch_lwts_for_scylla ( & session, & batch, & batch_deserializer, & test_name) . await ;
514+ } else {
515+ test_batch_lwts_for_cassandra ( & session, & batch, & batch_deserializer, & test_name) . await ;
343516 }
344- let result = session. batch ( & batch, batch_values. clone ( ) ) . await . unwrap ( ) ;
345- let row: ( bool , Option < String > , Option < i32 > , Option < i32 > ) =
346- result. into_rows_result ( ) . unwrap ( ) . first_row ( ) . unwrap ( ) ;
347- assert ! ( row. 0 , "First CAS batch should be applied" ) ;
517+ }
348518
349- verify_batch_insert ( & session, & test_name, BATCH_COUNT ) . await ;
519+ async fn test_batch_lwts_for_scylla (
520+ session : & Session ,
521+ batch : & Batch ,
522+ query_rows_result : & QueryRowsResult ,
523+ k0_value : & str ,
524+ ) {
525+ // Alias required by clippy
526+ type IntOrNull = Option < i32 > ;
527+ type StrOrNull = Option < String > ;
528+
529+ // Returned columns are:
530+ // [applied], k0, k1, v
531+ let batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = query_rows_result
532+ . rows ( )
533+ . unwrap ( )
534+ . collect :: < Result < _ , _ > > ( )
535+ . unwrap ( ) ;
536+
537+ let k0_value = k0_value. to_string ( ) ;
538+ let expected_batch_res_rows = vec ! [
539+ ( true , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 0 ) ) ,
540+ ( true , None , None , None ) ,
541+ ( true , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 0 ) ) ,
542+ ] ;
543+
544+ assert_eq ! ( batch_res_rows, expected_batch_res_rows) ;
545+
546+ let prepared_batch: Batch = session. prepare_batch ( batch) . await . unwrap ( ) ;
547+ let prepared_batch_res: QueryResult =
548+ session. batch ( & prepared_batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
549+
550+ let prepared_batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = prepared_batch_res
551+ . into_rows_result ( )
552+ . unwrap ( )
553+ . rows ( )
554+ . unwrap ( )
555+ . map ( |r| r. unwrap ( ) )
556+ . collect ( ) ;
557+
558+ let expected_prepared_batch_res_rows = vec ! [
559+ ( false , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 1 ) ) ,
560+ ( false , None , None , None ) ,
561+ ( false , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 1 ) ) ,
562+ ] ;
563+
564+ assert_eq ! ( prepared_batch_res_rows, expected_prepared_batch_res_rows) ;
565+ }
566+
567+ async fn test_batch_lwts_for_cassandra (
568+ session : & Session ,
569+ batch : & Batch ,
570+ query_rows_result : & QueryRowsResult ,
571+ k0_value : & str ,
572+ ) {
573+ // Alias required by clippy
574+ type IntOrNull = Option < i32 > ;
575+ type StrOrNull = Option < String > ;
576+
577+ // Returned columns are:
578+ // [applied]
579+ let batch_res_rows: Vec < ( bool , ) > = query_rows_result
580+ . rows ( )
581+ . unwrap ( )
582+ . map ( |r| r. unwrap ( ) )
583+ . collect ( ) ;
584+
585+ let expected_batch_res_rows = vec ! [ ( true , ) ] ;
586+
587+ assert_eq ! ( batch_res_rows, expected_batch_res_rows) ;
588+
589+ let prepared_batch: Batch = session. prepare_batch ( batch) . await . unwrap ( ) ;
590+ let prepared_batch_res: QueryResult =
591+ session. batch ( & prepared_batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
592+
593+ // Returned columns are:
594+ // [applied], p1, c1, r1, r2
595+ let prepared_batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = prepared_batch_res
596+ . into_rows_result ( )
597+ . unwrap ( )
598+ . rows ( )
599+ . unwrap ( )
600+ . map ( |r| r. unwrap ( ) )
601+ . collect ( ) ;
602+
603+ let expected_prepared_batch_res_rows =
604+ vec ! [ ( false , Some ( k0_value. to_string( ) ) , Some ( 0 ) , Some ( 1 ) ) ] ;
350605
351- let result2 = session. batch ( & batch, batch_values) . await . unwrap ( ) ;
352- let row: ( bool , Option < String > , Option < i32 > , Option < i32 > ) =
353- result2. into_rows_result ( ) . unwrap ( ) . first_row ( ) . unwrap ( ) ;
354- assert ! ( !row. 0 , "Second CAS batch should not be applied" ) ;
606+ assert_eq ! ( prepared_batch_res_rows, expected_prepared_batch_res_rows) ;
355607}
356608
357609#[ tokio:: test]
0 commit comments