Skip to content

Commit e4ce294

Browse files
authored
Merge pull request #1473 from Lorak-mmk/schema-agreement-dont-fail
Dont fail schema agreement on first error
2 parents 0a1a76f + fb034d5 commit e4ce294

File tree

2 files changed

+110
-38
lines changed

2 files changed

+110
-38
lines changed

scylla/src/client/session.rs

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2200,42 +2200,13 @@ impl Session {
22002200
last_error.map(Result::Err)
22012201
}
22022202

2203-
/// Awaits schema agreement among all reachable nodes.
2204-
///
2205-
/// Issues an agreement check each `Session::schema_agreement_interval`.
2206-
/// Loops indefinitely until the agreement is reached.
2207-
///
2208-
/// If `required_node` is Some, only returns Ok if this node successfully
2209-
/// returned its schema version during the agreement process.
2210-
async fn await_schema_agreement_indefinitely(
2211-
&self,
2212-
required_node: Option<Uuid>,
2213-
) -> Result<Uuid, SchemaAgreementError> {
2214-
loop {
2215-
tokio::time::sleep(self.schema_agreement_interval).await;
2216-
if let Some(agreed_version) = self
2217-
.check_schema_agreement_with_required_node(required_node)
2218-
.await?
2219-
{
2220-
return Ok(agreed_version);
2221-
}
2222-
}
2223-
}
2224-
22252203
/// Awaits schema agreement among all reachable nodes.
22262204
///
22272205
/// Issues an agreement check each `Session::schema_agreement_interval`.
22282206
/// If agreement is not reached in `Session::schema_agreement_timeout`,
22292207
/// `SchemaAgreementError::Timeout` is returned.
22302208
pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
2231-
timeout(
2232-
self.schema_agreement_timeout,
2233-
self.await_schema_agreement_indefinitely(None),
2234-
)
2235-
.await
2236-
.unwrap_or(Err(SchemaAgreementError::Timeout(
2237-
self.schema_agreement_timeout,
2238-
)))
2209+
self.await_schema_agreement_with_required_node(None).await
22392210
}
22402211

22412212
/// Awaits schema agreement among all reachable nodes.
@@ -2250,14 +2221,34 @@ impl Session {
22502221
&self,
22512222
required_node: Option<Uuid>,
22522223
) -> Result<Uuid, SchemaAgreementError> {
2253-
timeout(
2254-
self.schema_agreement_timeout,
2255-
self.await_schema_agreement_indefinitely(required_node),
2256-
)
2224+
// None: no finished attempt recorded
2225+
// Some(Ok(())): Last attempt successful, without agreement
2226+
// Some(Err(_)): Last attempt failed
2227+
let mut last_agreement_failure: Option<Result<(), SchemaAgreementError>> = None;
2228+
timeout(self.schema_agreement_timeout, async {
2229+
loop {
2230+
let result = self
2231+
.check_schema_agreement_with_required_node(required_node)
2232+
.await;
2233+
match result {
2234+
Ok(Some(agreed_version)) => return agreed_version,
2235+
Ok(None) => last_agreement_failure = Some(Ok(())),
2236+
Err(err) => last_agreement_failure = Some(Err(err)),
2237+
}
2238+
tokio::time::sleep(self.schema_agreement_interval).await;
2239+
}
2240+
})
22572241
.await
2258-
.unwrap_or(Err(SchemaAgreementError::Timeout(
2259-
self.schema_agreement_timeout,
2260-
)))
2242+
.map_err(|_| {
2243+
match last_agreement_failure {
2244+
// There were no finished attempts - the only error we can return is Timeout.
2245+
None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2246+
// If the last finished attempt resulted in an error, this error will be more informative than Timeout.
2247+
Some(Err(err)) => err,
2248+
// This is the canonical case for timeout - last attempt finished successfully, but without agreement.
2249+
Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2250+
}
2251+
})
22612252
}
22622253

22632254
/// Checks if all reachable nodes have the same schema version.

scylla/tests/integration/session/schema_agreement.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
use std::sync::Arc;
22

33
use assert_matches::assert_matches;
4+
use scylla::client::PoolSize;
45
use scylla::client::session::Session;
56
use scylla::client::session_builder::SessionBuilder;
6-
use scylla::errors::{ExecutionError, RequestAttemptError, SchemaAgreementError};
7+
use scylla::errors::{DbError, ExecutionError, RequestAttemptError, SchemaAgreementError};
78
use scylla::policies::load_balancing::{NodeIdentifier, SingleTargetLoadBalancingPolicy};
89
use scylla::response::query_result::QueryResult;
910
use scylla::statement::Statement;
1011
use scylla_proxy::{
1112
Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, RunningProxy,
1213
ShardAwareness, WorkerError,
1314
};
15+
use tracing::info;
1416

1517
use crate::utils::{
1618
calculate_proxy_host_ids, setup_tracing, test_with_3_node_cluster, unique_keyspace_name,
@@ -157,3 +159,82 @@ async fn test_schema_await_with_unreachable_node() {
157159
Err(err) => panic!("{}", err),
158160
}
159161
}
162+
163+
// Verifies that schema agreement process works correctly even if the first check fails.
164+
#[tokio::test]
165+
#[cfg_attr(scylla_cloud_tests, ignore)]
166+
async fn test_schema_await_with_transient_failure() {
167+
setup_tracing();
168+
169+
let res = test_with_3_node_cluster(
170+
ShardAwareness::QueryNode,
171+
|proxy_uris, translation_map, mut running_proxy| async move {
172+
// DB preparation phase
173+
let session: Session = SessionBuilder::new()
174+
.known_node(proxy_uris[0].as_str())
175+
.address_translator(Arc::new(translation_map.clone()))
176+
// Important in order to have a predictable amount of connections after session creation.
177+
// Shard connections are created asynchronously, so it's hard to predict how many will be opened
178+
// already when we check schema agreement.
179+
.pool_size(PoolSize::PerHost(1.try_into().unwrap()))
180+
.build()
181+
.await
182+
.unwrap();
183+
184+
let node_rules = Some(vec![RequestRule(
185+
Condition::not(Condition::ConnectionRegisteredAnyEvent)
186+
.and(Condition::RequestOpcode(RequestOpcode::Query))
187+
.and(Condition::BodyContainsCaseSensitive(Box::new(
188+
*b"system.local",
189+
)))
190+
.and(Condition::TrueForLimitedTimes(1)),
191+
// Use error that would prevent DefaultRetryPolicy from retrying.
192+
// I don't think it is used for those queries, but it's additional future-proofing
193+
// for the test.
194+
RequestReaction::forge_with_error(DbError::SyntaxError),
195+
)]);
196+
197+
// First, a sanity check for proxy rules.
198+
// If for each node first request fails (and subsequent requests succeed),
199+
// then first schema agreement check should return error, and second should succeed.
200+
// Note that this is only true because we configured the session with 1-connection-per-node.
201+
info!("Starting phase 1 - sanity check");
202+
{
203+
running_proxy
204+
.running_nodes
205+
.iter_mut()
206+
.for_each(|node| node.change_request_rules(node_rules.clone()));
207+
208+
// The important check: first call should error out, second one should succeed.
209+
session.check_schema_agreement().await.unwrap_err();
210+
session.check_schema_agreement().await.unwrap();
211+
212+
running_proxy
213+
.running_nodes
214+
.iter_mut()
215+
.for_each(|node| node.change_request_rules(Some(vec![])));
216+
}
217+
218+
// Now let's check that awaiting schema agreement doesn't bail on error.
219+
// I'll use the same proxy rules as before, so first check will error out.
220+
info!("Starting phase 2 - main test");
221+
{
222+
running_proxy
223+
.running_nodes
224+
.iter_mut()
225+
.for_each(|node| node.change_request_rules(node_rules.clone()));
226+
227+
session.await_schema_agreement().await.unwrap();
228+
}
229+
230+
running_proxy
231+
},
232+
)
233+
.await;
234+
235+
match res {
236+
Ok(()) => (),
237+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
238+
Err(err) => panic!("{}", err),
239+
}
240+
}

0 commit comments

Comments
 (0)