11use std:: sync:: Arc ;
2+ use std:: time:: Duration ;
23
34use crate :: ccm:: cluster:: { Cluster , ClusterOptions } ;
45use crate :: ccm:: { run_ccm_test, CLUSTER_VERSION } ;
56use crate :: common:: utils:: { setup_tracing, unique_keyspace_name} ;
67
78use scylla:: client:: execution_profile:: ExecutionProfile ;
9+ use scylla:: client:: session:: Session ;
810use scylla:: cluster:: { ClusterState , Node , NodeRef } ;
911use scylla:: policies:: load_balancing:: { FallbackPlan , LoadBalancingPolicy , RoutingInfo } ;
1012use scylla:: query:: Query ;
1113use tokio:: sync:: Mutex ;
14+ use tracing:: info;
15+ use uuid:: Uuid ;
1216
17+ /// Creates a cluster configuration with 3 nodes for schema agreement tests.
1318fn cluster_3_nodes ( ) -> ClusterOptions {
1419 ClusterOptions {
1520 name : "schema_agreement_test" . to_string ( ) ,
@@ -19,6 +24,7 @@ fn cluster_3_nodes() -> ClusterOptions {
1924 }
2025}
2126
27+ /// A load balancing policy that targets a single node.
2228#[ derive( Debug ) ]
2329struct SingleTargetLBP {
2430 target : ( Arc < Node > , Option < u32 > ) ,
@@ -46,145 +52,220 @@ impl LoadBalancingPolicy for SingleTargetLBP {
4652 }
4753}
4854
55+ /// Waits for schema agreement with a timeout and retries.
56+ async fn wait_for_schema_agreement (
57+ session : & Session ,
58+ timeout : Duration ,
59+ retries : u32 ,
60+ ) -> Result < Option < Uuid > , anyhow:: Error > {
61+ let retry_interval = Duration :: from_millis ( 500 ) ;
62+ let mut attempts = 0 ;
63+
64+ tokio:: time:: timeout ( timeout, async {
65+ loop {
66+ match session. check_schema_agreement ( ) . await {
67+ Ok ( Some ( agreement) ) => return Ok ( Some ( agreement) ) ,
68+ Ok ( None ) => {
69+ attempts += 1 ;
70+ if attempts > retries {
71+ return Err ( anyhow:: anyhow!(
72+ "Schema agreement not reached after {} retries" ,
73+ retries
74+ ) ) ;
75+ }
76+ info ! ( "Schema agreement not yet reached, retrying ({}/{})" , attempts, retries) ;
77+ tokio:: time:: sleep ( retry_interval) . await ;
78+ }
79+ Err ( e) => return Err ( anyhow:: anyhow!( "Failed to check schema agreement: {}" , e) ) ,
80+ }
81+ }
82+ } )
83+ . await
84+ . map_err ( |_| anyhow:: anyhow!( "Schema agreement timed out after {:?}" , timeout) ) ?
85+ }
86+
87+ /// Sets up a keyspace with a given replication factor.
88+ async fn setup_keyspace (
89+ session : & Session ,
90+ keyspace : & str ,
91+ replication_factor : u32 ,
92+ ) -> Result < ( ) , anyhow:: Error > {
93+ let query = format ! (
94+ "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : {}}}" ,
95+ keyspace, replication_factor
96+ ) ;
97+ session. query_unpaged ( query, & [ ] ) . await ?;
98+ session. use_keyspace ( keyspace, true ) . await ?;
99+ Ok ( ( ) )
100+ }
101+
49102#[ tokio:: test]
50103#[ cfg_attr( not( ccm_tests) , ignore) ]
51104async fn test_schema_agreement ( ) {
52105 setup_tracing ( ) ;
53106 run_ccm_test ( cluster_3_nodes, test_schema_agreement_all_nodes) . await ;
54107 run_ccm_test ( cluster_3_nodes, test_schema_agreement_with_stopped_node) . await ;
55108 run_ccm_test ( cluster_3_nodes, test_schema_agreement_with_paused_node) . await ;
109+ // TODO - multidc cases
56110}
57111
112+ /// Tests schema agreement with all nodes running.
58113async fn test_schema_agreement_all_nodes ( cluster : Arc < Mutex < Cluster > > ) {
59114 let cluster = cluster. lock ( ) . await ;
60- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
115+ let session = cluster
116+ . make_session_builder ( )
117+ . await
118+ . build ( )
119+ . await
120+ . expect ( "Failed to create session" ) ;
61121
62- // Create keyspace
63122 let keyspace = unique_keyspace_name ( ) ;
123+ setup_keyspace ( & session, & keyspace, 3 )
124+ . await
125+ . expect ( "Failed to setup keyspace" ) ;
126+
127+ info ! ( "Creating table in test_schema_agreement_all_nodes" ) ;
64128 session
65- . query_unpaged (
66- format ! (
67- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
68- keyspace
69- ) ,
70- & [ ] ,
71- )
72- . await
73- . unwrap ( ) ;
74-
75- // Use keyspace
76- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
77-
78- // Create a table and check schema agreement
79- let _result = session
80- . query_unpaged (
81- "CREATE TABLE test_schema_agreement_all (k int primary key, v int)" ,
82- & [ ] ,
83- )
84- . await
85- . unwrap ( ) ;
86-
87- // Check if schema is in agreement
88- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
89- assert ! ( schema_agreement. is_some( ) ) ;
129+ . query_unpaged ( "CREATE TABLE test_table (k int primary key, v int)" , & [ ] )
130+ . await
131+ . expect ( "Failed to create table" ) ;
132+
133+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
134+ . await
135+ . expect ( "Schema agreement failed" ) ;
136+ assert ! ( agreement. is_some( ) , "Schema agreement should be reached" ) ;
137+ info ! ( "Schema agreement achieved with all nodes" ) ;
90138}
91139
140+ /// Tests schema agreement with one node stopped.
92141async fn test_schema_agreement_with_stopped_node ( cluster : Arc < Mutex < Cluster > > ) {
93142 let cluster = cluster. lock ( ) . await ;
94-
95- // Create keyspace
96- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
143+ let session = cluster
144+ . make_session_builder ( )
145+ . await
146+ . build ( )
147+ . await
148+ . expect ( "Failed to create session" ) ;
97149
98150 let keyspace = unique_keyspace_name ( ) ;
151+ setup_keyspace ( & session, & keyspace, 3 )
152+ . await
153+ . expect ( "Failed to setup keyspace" ) ;
154+
155+ let node = cluster
156+ . nodes ( )
157+ . get_by_id ( 2 )
158+ . await
159+ . expect ( "Failed to get node 2" ) ;
160+ info ! ( "Stopping node 2" ) ;
161+ node. write ( ) . await . stop ( None ) . await . expect ( "Failed to stop node" ) ;
162+
163+ info ! ( "Creating table with one node stopped" ) ;
99164 session
100- . query_unpaged (
101- format ! (
102- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
103- keyspace
104- ) ,
105- & [ ] ,
106- )
107- . await
108- . unwrap ( ) ;
109-
110- // Use keyspace
111- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
112-
113- // Stop node 2
114- let node = cluster. nodes ( ) . get_by_id ( 2 ) . await . unwrap ( ) ;
115- node. write ( ) . await . stop ( None ) . await . unwrap ( ) ;
116-
117- // Create a table while one node is stopped
118- let _result = session
119- . query_unpaged (
120- "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)" ,
121- & [ ] ,
122- )
123- . await
124- . unwrap ( ) ;
125-
126- // Schema agreement should succeed with remaining up nodes
127- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
128- assert ! ( schema_agreement. is_some( ) ) ;
129-
130- // Start the node back
131- node. write ( ) . await . start ( None ) . await . unwrap ( ) ;
132- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
133- assert ! ( schema_agreement. is_some( ) ) ;
165+ . query_unpaged ( "CREATE TABLE test_table (k int primary key, v int)" , & [ ] )
166+ . await
167+ . expect ( "Failed to create table" ) ;
168+
169+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
170+ . await
171+ . expect ( "Schema agreement failed with stopped node" ) ;
172+ assert ! (
173+ agreement. is_some( ) ,
174+ "Schema agreement should be reached with remaining nodes"
175+ ) ;
176+
177+ info ! ( "Restarting node 2" ) ;
178+ node. write ( )
179+ . await
180+ . start ( None )
181+ . await
182+ . expect ( "Failed to restart node" ) ;
183+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
184+ . await
185+ . expect ( "Schema agreement failed after restart" ) ;
186+ assert ! (
187+ agreement. is_some( ) ,
188+ "Schema agreement should be reached after node restart"
189+ ) ;
190+ info ! ( "Schema agreement achieved after node restart" ) ;
134191}
135192
193+ /// Tests schema agreement with one node paused.
136194async fn test_schema_agreement_with_paused_node ( cluster : Arc < Mutex < Cluster > > ) {
137195 let cluster = cluster. lock ( ) . await ;
138-
139- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
196+ let session = cluster
197+ . make_session_builder ( )
198+ . await
199+ . build ( )
200+ . await
201+ . expect ( "Failed to create session" ) ;
140202
141203 let keyspace = unique_keyspace_name ( ) ;
142- session
143- . query_unpaged (
144- format ! (
145- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
146- keyspace
147- ) ,
148- & [ ] ,
149- )
204+ setup_keyspace ( & session, & keyspace, 3 )
150205 . await
151- . unwrap ( ) ;
152-
153- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
206+ . expect ( "Failed to setup keyspace" ) ;
154207
155- // Stop node 2
156208 let node_id = 2 ;
157- let ccm_node = cluster. nodes ( ) . get_by_id ( node_id) . await . unwrap ( ) ;
209+ let ccm_node = cluster
210+ . nodes ( )
211+ . get_by_id ( node_id)
212+ . await
213+ . expect ( "Failed to get node 2" ) ;
158214 let ccm_node_addr = ccm_node. read ( ) . await . broadcast_rpc_address ( ) . clone ( ) ;
159- ccm_node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
215+ info ! ( "Pausing node 2" ) ;
216+ ccm_node
217+ . write ( )
218+ . await
219+ . pause ( )
220+ . await
221+ . expect ( "Failed to pause node" ) ;
160222
161- // Find the corresponding Scylla node from the session to avoid querying it directly
162223 let cluster_state = session. get_cluster_state ( ) ;
163- let scylla_node = cluster_state
224+ let running_scylla_node = cluster_state
164225 . get_nodes_info ( )
165226 . iter ( )
166227 . find ( |n| n. address . ip ( ) != ccm_node_addr)
167- . expect ( "Could not find unpaused Scylla node for querying " ) ;
228+ . expect ( "Could not find unpaused Scylla node" ) ;
168229
169230 let policy = SingleTargetLBP {
170- target : ( scylla_node . clone ( ) , Some ( 0 ) ) ,
231+ target : ( running_scylla_node . clone ( ) , Some ( 0 ) ) ,
171232 } ;
172233 let execution_profile = ExecutionProfile :: builder ( )
173234 . load_balancing_policy ( Arc :: new ( policy) )
174235 . build ( ) ;
175- let mut stmt =
176- Query :: new ( "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ) ;
236+ let mut stmt = Query :: new ( "CREATE TABLE test_table (k int primary key, v int)" ) ;
177237 stmt. set_execution_profile_handle ( Some ( execution_profile. into_handle ( ) ) ) ;
178- // Create a table while one node is paused
179- let _result = session. query_unpaged ( stmt, & [ ] ) . await . unwrap ( ) ;
180238
181- // Schema agreement should succeed with remaining up nodes
182- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
183- assert ! ( schema_agreement. is_some( ) ) ;
239+ info ! ( "Creating table with one node paused" ) ;
240+ session
241+ . query_unpaged ( stmt, & [ ] )
242+ . await
243+ . expect ( "Failed to create table" ) ;
244+
245+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
246+ . await
247+ . expect ( "Schema agreement failed with paused node" ) ;
248+ assert ! (
249+ agreement. is_some( ) ,
250+ "Schema agreement should be reached with remaining nodes"
251+ ) ;
184252
185- // Start the node back
186- ccm_node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
253+ info ! ( "Resuming node 2" ) ;
254+ ccm_node
255+ . write ( )
256+ . await
257+ . resume ( )
258+ . await
259+ . expect ( "Failed to resume node" ) ;
187260
188- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
189- assert ! ( schema_agreement. is_some( ) ) ;
261+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
262+ . await
263+ . expect ( "Schema agreement failed after resume" ) ;
264+ assert ! (
265+ agreement. is_some( ) ,
266+ "Schema agreement should be reached after node resume"
267+ ) ;
268+ info ! ( "Schema agreement achieved after node resume" ) ;
190269}
270+
271+
0 commit comments