1+ use crate :: utils:: setup_tracing;
2+ use crate :: utils:: unique_keyspace_name;
3+ use crate :: utils:: PerformDDL ;
4+ use crate :: utils:: { create_new_session_builder, scylla_supports_tablets} ;
15use scylla:: batch:: Batch ;
26use scylla:: batch:: BatchType ;
37use scylla:: client:: session:: Session ;
@@ -9,13 +13,10 @@ use scylla::query::Query;
913use scylla:: value:: { Counter , CqlValue , MaybeUnset } ;
1014use std:: collections:: HashMap ;
1115use std:: string:: String ;
12-
13- use crate :: utils:: setup_tracing;
14- use crate :: utils:: unique_keyspace_name;
15- use crate :: utils:: PerformDDL ;
16- use crate :: utils:: { create_new_session_builder, scylla_supports_tablets} ;
16+ use std:: sync:: Arc ;
1717
1818use assert_matches:: assert_matches;
19+ use scylla:: response:: query_result:: { QueryResult , QueryRowsResult } ;
1920
2021const BATCH_COUNT : usize = 100 ;
2122
@@ -25,6 +26,8 @@ async fn create_test_session(table_name: &str, supports_tablets: bool) -> Sessio
2526
2627 let mut create_ks = format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ;
2728
29+ // Need to disable tablets in this test because they don't support counters yet.
30+ // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e).
2831 if !supports_tablets && scylla_supports_tablets ( & session) . await {
2932 create_ks += " AND TABLETS = {'enabled': false}"
3033 }
@@ -142,6 +145,145 @@ async fn batch_statements_and_values_mismatch_detected() {
142145 }
143146}
144147
148+ #[ tokio:: test]
149+ async fn test_batch ( ) {
150+ setup_tracing ( ) ;
151+ let session = Arc :: new ( create_new_session_builder ( ) . build ( ) . await . unwrap ( ) ) ;
152+ let ks = unique_keyspace_name ( ) ;
153+
154+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
155+ session
156+ . ddl ( format ! (
157+ "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))" ,
158+ ks
159+ ) )
160+ . await
161+ . unwrap ( ) ;
162+
163+ let prepared_statement = session
164+ . prepare ( format ! (
165+ "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)" ,
166+ ks
167+ ) )
168+ . await
169+ . unwrap ( ) ;
170+
171+ // TODO: Add API that supports binding values to statements in batch creation process,
172+ // to avoid problem of statements/values count mismatch
173+ use scylla:: batch:: Batch ;
174+ let mut batch: Batch = Default :: default ( ) ;
175+ batch. append_statement ( & format ! ( "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)" , ks) [ ..] ) ;
176+ batch. append_statement ( & format ! ( "INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')" , ks) [ ..] ) ;
177+ batch. append_statement ( prepared_statement. clone ( ) ) ;
178+
179+ let four_value: i32 = 4 ;
180+ let hello_value: String = String :: from ( "hello" ) ;
181+ let session_clone = session. clone ( ) ;
182+ // We're spawning to a separate task here to test that it works even in that case, because in some scenarios
183+ // (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries)
184+ // the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on
185+ // some lifetime for some unknown reason), so can't be spawned on tokio.
186+ // See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details
187+ tokio:: spawn ( async move {
188+ let values = (
189+ ( 1_i32 , 2_i32 , "abc" ) ,
190+ ( ) ,
191+ ( 1_i32 , & four_value, hello_value. as_str ( ) ) ,
192+ ) ;
193+ session_clone. batch ( & batch, values) . await . unwrap ( ) ;
194+ } )
195+ . await
196+ . unwrap ( ) ;
197+
198+ let mut results: Vec < ( i32 , i32 , String ) > = session
199+ . query_unpaged ( format ! ( "SELECT a, b, c FROM {}.t_batch" , ks) , & [ ] )
200+ . await
201+ . unwrap ( )
202+ . into_rows_result ( )
203+ . unwrap ( )
204+ . rows :: < ( i32 , i32 , String ) > ( )
205+ . unwrap ( )
206+ . collect :: < Result < _ , _ > > ( )
207+ . unwrap ( ) ;
208+
209+ results. sort ( ) ;
210+ assert_eq ! (
211+ results,
212+ vec![
213+ ( 1 , 2 , String :: from( "abc" ) ) ,
214+ ( 1 , 4 , String :: from( "hello" ) ) ,
215+ ( 7 , 11 , String :: from( "" ) )
216+ ]
217+ ) ;
218+
219+ // Test repreparing statement inside a batch
220+ let mut batch: Batch = Default :: default ( ) ;
221+ batch. append_statement ( prepared_statement) ;
222+ let values = ( ( 4_i32 , 20_i32 , "foobar" ) , ) ;
223+
224+ // This statement flushes the prepared statement cache
225+ session
226+ . ddl ( format ! (
227+ "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42" ,
228+ ks
229+ ) )
230+ . await
231+ . unwrap ( ) ;
232+ session. batch ( & batch, values) . await . unwrap ( ) ;
233+
234+ let results: Vec < ( i32 , i32 , String ) > = session
235+ . query_unpaged (
236+ format ! ( "SELECT a, b, c FROM {}.t_batch WHERE a = 4" , ks) ,
237+ & [ ] ,
238+ )
239+ . await
240+ . unwrap ( )
241+ . into_rows_result ( )
242+ . unwrap ( )
243+ . rows :: < ( i32 , i32 , String ) > ( )
244+ . unwrap ( )
245+ . collect :: < Result < _ , _ > > ( )
246+ . unwrap ( ) ;
247+
248+ assert_eq ! ( results, vec![ ( 4 , 20 , String :: from( "foobar" ) ) ] ) ;
249+ }
250+
251+ // This is a regression test for #1134.
252+ #[ tokio:: test]
253+ async fn test_batch_to_multiple_tables ( ) {
254+ setup_tracing ( ) ;
255+ let session = create_new_session_builder ( ) . build ( ) . await . unwrap ( ) ;
256+ let ks = unique_keyspace_name ( ) ;
257+
258+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
259+ session. use_keyspace ( & ks, true ) . await . unwrap ( ) ;
260+ session
261+ . ddl ( "CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))" )
262+ . await
263+ . unwrap ( ) ;
264+ session
265+ . ddl ( "CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))" )
266+ . await
267+ . unwrap ( ) ;
268+
269+ let prepared_statement = session
270+ . prepare (
271+ "
272+ BEGIN BATCH
273+ INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?);
274+ INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?);
275+ APPLY BATCH;
276+ " ,
277+ )
278+ . await
279+ . unwrap ( ) ;
280+
281+ session
282+ . execute_unpaged ( & prepared_statement, ( 1 , 2 , "ala" , 4 , 5 , "ma" ) )
283+ . await
284+ . unwrap ( ) ;
285+ }
286+
145287#[ tokio:: test]
146288async fn test_batch_of_simple_statements ( ) {
147289 setup_tracing ( ) ;
@@ -324,34 +466,148 @@ async fn test_batch_of_mixed_prepared_and_simple_statements() {
324466 verify_batch_insert ( & session, & test_name, BATCH_COUNT ) . await ;
325467}
326468
469+ // Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied.
327470#[ tokio:: test]
328- async fn test_cas_batch ( ) {
329- setup_tracing ( ) ;
330- let test_name = String :: from ( "test_cas_batch" ) ;
471+ async fn test_batch_lwts ( ) {
472+ let test_name = String :: from ( "test_counter_batch" ) ;
331473 let session = create_test_session ( & test_name, false ) . await ;
332474
333- let query_str = format ! (
334- "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS" ,
335- & test_name
336- ) ;
337- let prepared = session. prepare ( Query :: new ( query_str) ) . await . unwrap ( ) ;
475+ session
476+ . query_unpaged (
477+ format ! ( "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)" , & test_name) ,
478+ ( & test_name, 0 , 0 ) ,
479+ )
480+ . await
481+ . unwrap ( ) ;
482+
338483 let mut batch = Batch :: new ( BatchType :: Unlogged ) ;
339- let mut batch_values = Vec :: with_capacity ( BATCH_COUNT ) ;
340- for i in 0 ..BATCH_COUNT as i32 {
341- batch. append_statement ( prepared. clone ( ) ) ;
342- batch_values. push ( ( & test_name, i, i + 1 ) ) ;
484+ batch. append_statement (
485+ format ! (
486+ "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0" ,
487+ & test_name, & test_name
488+ )
489+ . as_str ( ) ,
490+ ) ;
491+ batch. append_statement (
492+ format ! (
493+ "INSERT INTO {} (k0, k1, v) VALUES ('{}', 123, 321)" ,
494+ & test_name, & test_name
495+ )
496+ . as_str ( ) ,
497+ ) ;
498+ batch. append_statement (
499+ format ! (
500+ "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0" ,
501+ & test_name, & test_name
502+ )
503+ . as_str ( ) ,
504+ ) ;
505+
506+ let batch_res: QueryResult = session. batch ( & batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
507+ let batch_deserializer = batch_res. into_rows_result ( ) . unwrap ( ) ;
508+
509+ // Scylla always returns a result set with additional 'applied' column, which indicates whether the statement was applied.
510+ // In contrast, Cassandra returns only 'applied' column for applied transactions; and 'applied' column
511+ // with result set for rejected ones.
512+ // This is described in https://opensource.docs.scylladb.com/stable/features/lwt.html and
513+ // https://opensource.docs.scylladb.com/stable/kb/lwt-differences.html).
514+ let is_scylla: bool = batch_deserializer. column_specs ( ) . len ( ) == 4 ;
515+
516+ if is_scylla {
517+ test_batch_lwts_for_scylla ( & session, & batch, & batch_deserializer, & test_name) . await ;
518+ } else {
519+ test_batch_lwts_for_cassandra ( & session, & batch, & batch_deserializer, & test_name) . await ;
343520 }
344- let result = session. batch ( & batch, batch_values. clone ( ) ) . await . unwrap ( ) ;
345- let row: ( bool , Option < String > , Option < i32 > , Option < i32 > ) =
346- result. into_rows_result ( ) . unwrap ( ) . first_row ( ) . unwrap ( ) ;
347- assert ! ( row. 0 , "First CAS batch should be applied" ) ;
521+ }
348522
349- verify_batch_insert ( & session, & test_name, BATCH_COUNT ) . await ;
523+ async fn test_batch_lwts_for_scylla (
524+ session : & Session ,
525+ batch : & Batch ,
526+ query_rows_result : & QueryRowsResult ,
527+ k0_value : & str ,
528+ ) {
529+ // Alias required by clippy
530+ type IntOrNull = Option < i32 > ;
531+ type StrOrNull = Option < String > ;
532+
533+ // Returned columns are:
534+ // [applied], k0, k1, v
535+ let batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = query_rows_result
536+ . rows ( )
537+ . unwrap ( )
538+ . collect :: < Result < _ , _ > > ( )
539+ . unwrap ( ) ;
540+
541+ let k0_value = k0_value. to_string ( ) ;
542+ let expected_batch_res_rows = vec ! [
543+ ( true , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 0 ) ) ,
544+ ( true , None , None , None ) ,
545+ ( true , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 0 ) ) ,
546+ ] ;
547+
548+ assert_eq ! ( batch_res_rows, expected_batch_res_rows) ;
549+
550+ let prepared_batch: Batch = session. prepare_batch ( batch) . await . unwrap ( ) ;
551+ let prepared_batch_res: QueryResult =
552+ session. batch ( & prepared_batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
553+
554+ let prepared_batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = prepared_batch_res
555+ . into_rows_result ( )
556+ . unwrap ( )
557+ . rows ( )
558+ . unwrap ( )
559+ . map ( |r| r. unwrap ( ) )
560+ . collect ( ) ;
561+
562+ let expected_prepared_batch_res_rows = vec ! [
563+ ( false , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 1 ) ) ,
564+ ( false , None , None , None ) ,
565+ ( false , Some ( k0_value. clone( ) ) , Some ( 0 ) , Some ( 1 ) ) ,
566+ ] ;
567+
568+ assert_eq ! ( prepared_batch_res_rows, expected_prepared_batch_res_rows) ;
569+ }
570+
571+ async fn test_batch_lwts_for_cassandra (
572+ session : & Session ,
573+ batch : & Batch ,
574+ query_rows_result : & QueryRowsResult ,
575+ k0_value : & str ,
576+ ) {
577+ // Alias required by clippy
578+ type IntOrNull = Option < i32 > ;
579+ type StrOrNull = Option < String > ;
580+
581+ // Returned columns are:
582+ // [applied]
583+ let batch_res_rows: Vec < ( bool , ) > = query_rows_result
584+ . rows ( )
585+ . unwrap ( )
586+ . map ( |r| r. unwrap ( ) )
587+ . collect ( ) ;
588+
589+ let expected_batch_res_rows = vec ! [ ( true , ) ] ;
590+
591+ assert_eq ! ( batch_res_rows, expected_batch_res_rows) ;
592+
593+ let prepared_batch: Batch = session. prepare_batch ( batch) . await . unwrap ( ) ;
594+ let prepared_batch_res: QueryResult =
595+ session. batch ( & prepared_batch, ( ( ) , ( ) , ( ) ) ) . await . unwrap ( ) ;
596+
597+ // Returned columns are:
598+ // [applied], k0, k1, v
599+ let prepared_batch_res_rows: Vec < ( bool , StrOrNull , IntOrNull , IntOrNull ) > = prepared_batch_res
600+ . into_rows_result ( )
601+ . unwrap ( )
602+ . rows ( )
603+ . unwrap ( )
604+ . map ( |r| r. unwrap ( ) )
605+ . collect ( ) ;
606+
607+ let expected_prepared_batch_res_rows =
608+ vec ! [ ( false , Some ( k0_value. to_string( ) ) , Some ( 0 ) , Some ( 1 ) ) ] ;
350609
351- let result2 = session. batch ( & batch, batch_values) . await . unwrap ( ) ;
352- let row: ( bool , Option < String > , Option < i32 > , Option < i32 > ) =
353- result2. into_rows_result ( ) . unwrap ( ) . first_row ( ) . unwrap ( ) ;
354- assert ! ( !row. 0 , "Second CAS batch should not be applied" ) ;
610+ assert_eq ! ( prepared_batch_res_rows, expected_prepared_batch_res_rows) ;
355611}
356612
357613#[ tokio:: test]
0 commit comments