Skip to content

Commit d3a9d93

Browse files
authored
fix(epoxy): fix replica recovery to skip old ballots (#3295)
* chore(guard): add metadata to rate limit error * chore(tracing-utils): add tracing utils for custom instrument * feat(epoxy): add ability to get & set coordinator state * fix(epoxy): fix replica recovery to skip old ballots
1 parent 4cdf8f9 commit d3a9d93

File tree

6 files changed

+68
-17
lines changed

6 files changed

+68
-17
lines changed

engine/packages/epoxy/src/consts.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
2828
///
2929
/// Our current impl is incredibly slow since we do not parallelize downloads.
3030
///
31-
/// Assume we have 10M keys with a 50k chunk size. This results in 200 round trips.
31+
/// Assume we have 10M keys with a 5k chunk size. This results in 2000 round trips.
3232
///
33-
/// If we have a latency of 200 ms (worst case), this takes 40 seconds total to propagate.
34-
pub const DOWNLOAD_INSTANCE_COUNT: u64 = 50_000;
33+
/// If we have a latency of 200 ms (worst case), this takes 400 seconds total to propagate.
34+
pub const DOWNLOAD_INSTANCE_COUNT: u64 = 5_000;
3535

3636
/// Number of keys to recover in a single chunk during the recovery process.
3737
///

engine/packages/epoxy/src/replica/ballot.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,26 @@ pub fn compare_ballots(
6565
))
6666
}
6767

68+
/// Result of ballot validation with detailed context for error reporting
69+
#[derive(Debug)]
70+
pub struct BallotValidationResult {
71+
pub is_valid: bool,
72+
pub incoming_ballot: protocol::Ballot,
73+
pub stored_ballot: protocol::Ballot,
74+
pub comparison: std::cmp::Ordering,
75+
}
76+
6877
/// Validates that a ballot is the highest seen for the given instance and updates the highest stored
6978
/// ballot if needed.
7079
///
71-
/// Returns true if the ballot is valid (higher than previously seen).
80+
/// Returns detailed validation result including comparison information.
7281
#[tracing::instrument(skip_all)]
7382
pub async fn validate_and_update_ballot_for_instance(
7483
tx: &Transaction,
7584
replica_id: protocol::ReplicaId,
7685
ballot: &protocol::Ballot,
7786
instance: &protocol::Instance,
78-
) -> Result<bool> {
87+
) -> Result<BallotValidationResult> {
7988
let instance_ballot_key =
8089
keys::replica::InstanceBallotKey::new(instance.replica_id, instance.slot_id);
8190
let subspace = keys::subspace(replica_id);
@@ -98,18 +107,24 @@ pub async fn validate_and_update_ballot_for_instance(
98107
};
99108

100109
// Compare incoming ballot with highest seen - only accept if strictly greater
101-
let is_valid = match compare_ballots(ballot, &highest_ballot) {
110+
let comparison = compare_ballots(ballot, &highest_ballot);
111+
let is_valid = match comparison {
102112
std::cmp::Ordering::Greater => true,
103113
std::cmp::Ordering::Equal | std::cmp::Ordering::Less => false,
104114
};
105115

106116
// If the incoming ballot is higher, update our stored highest
107-
if compare_ballots(ballot, &highest_ballot) == std::cmp::Ordering::Greater {
117+
if comparison == std::cmp::Ordering::Greater {
108118
let serialized = instance_ballot_key.serialize(ballot.clone())?;
109119
tx.set(&packed_key, &serialized);
110120

111121
tracing::debug!(?ballot, ?instance, "updated highest ballot for instance");
112122
}
113123

114-
Ok(is_valid)
124+
Ok(BallotValidationResult {
125+
is_valid,
126+
incoming_ballot: ballot.clone(),
127+
stored_ballot: highest_ballot,
128+
comparison,
129+
})
115130
}

engine/packages/epoxy/src/replica/messages/accept.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,17 @@ pub async fn accept(
2121

2222
// Validate ballot
2323
let current_ballot = ballot::get_ballot(tx, replica_id).await?;
24-
let is_valid =
24+
let validation =
2525
ballot::validate_and_update_ballot_for_instance(tx, replica_id, &current_ballot, &instance)
2626
.await?;
27-
ensure!(is_valid, "ballot validation failed for pre_accept");
27+
ensure!(
28+
validation.is_valid,
29+
"ballot validation failed for accept: incoming ballot {:?} is not greater than stored ballot {:?} for instance {:?} (comparison: {:?})",
30+
validation.incoming_ballot,
31+
validation.stored_ballot,
32+
instance,
33+
validation.comparison
34+
);
2835

2936
// EPaxos Step 18
3037
let log_entry = protocol::LogEntry {

engine/packages/epoxy/src/replica/messages/pre_accept.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,17 @@ pub async fn pre_accept(
2424

2525
// Validate ballot
2626
let current_ballot = ballot::get_ballot(tx, replica_id).await?;
27-
let is_valid =
27+
let validation =
2828
ballot::validate_and_update_ballot_for_instance(tx, replica_id, &current_ballot, &instance)
2929
.await?;
30-
ensure!(is_valid, "ballot validation failed for pre_accept");
30+
ensure!(
31+
validation.is_valid,
32+
"ballot validation failed for pre_accept: incoming ballot {:?} is not greater than stored ballot {:?} for instance {:?} (comparison: {:?})",
33+
validation.incoming_ballot,
34+
validation.stored_ballot,
35+
instance,
36+
validation.comparison
37+
);
3138

3239
// Find interference for this key
3340
let interf = utils::find_interference(tx, replica_id, &proposal.commands).await?;

engine/packages/epoxy/src/replica/messages/prepare.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,9 @@ pub async fn prepare(
3030
};
3131

3232
// EPaxos Step 38: Validate ballot for this instance
33-
let is_valid =
33+
let validation =
3434
ballot::validate_and_update_ballot_for_instance(tx, replica_id, &ballot, &instance).await?;
35-
36-
let response = if is_valid {
35+
let response = if validation.is_valid {
3736
// EPaxos Step 39: Reply PrepareOK with current log entry data
3837
match current_entry {
3938
Some(entry) => protocol::PrepareResponse::PrepareOk(protocol::PrepareOk {

engine/packages/epoxy/src/workflows/replica/setup.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ pub struct DownloadInstancesChunkOutput {
157157
}
158158

159159
#[activity(DownloadInstancesChunk)]
160+
#[timeout = 300]
161+
#[max_retries = 100]
160162
pub async fn download_instances_chunk(
161163
ctx: &ActivityCtx,
162164
input: &DownloadInstancesChunkInput,
@@ -197,6 +199,8 @@ pub async fn download_instances_chunk(
197199

198200
// Apply each log entry from the downloaded instances
199201
let total_entries = instances.len();
202+
let mut applied_count = 0;
203+
let mut skipped_count = 0;
200204
for (idx, entry) in instances.iter().enumerate() {
201205
tracing::debug!(
202206
progress = format!("{}/{}", idx + 1, total_entries),
@@ -206,9 +210,27 @@ pub async fn download_instances_chunk(
206210
);
207211

208212
// Apply the log entry to replay any uncommitted operations
209-
apply_log_entry(ctx, &entry.log_entry, &entry.instance).await?;
213+
// Skip entries that fail to apply (e.g., ballot validation failures during recovery)
214+
if let Err(err) = apply_log_entry(ctx, &entry.log_entry, &entry.instance).await {
215+
tracing::debug!(
216+
?err,
217+
?entry.instance,
218+
state = ?entry.log_entry.state,
219+
"skipping log entry that failed to apply during recovery"
220+
);
221+
skipped_count += 1;
222+
continue;
223+
}
224+
applied_count += 1;
210225
}
211226

227+
tracing::info!(
228+
total_entries,
229+
applied_count,
230+
skipped_count,
231+
"finished applying log entries for chunk"
232+
);
233+
212234
// Return whether we should continue downloading chunks and the last instance
213235
Ok(DownloadInstancesChunkOutput {
214236
last_instance: instances.last().map(|entry| entry.instance.clone().into()),
@@ -479,7 +501,8 @@ pub async fn recover_keys_chunk(
479501
// the range limit)
480502
if recovered_count == 0 && scanned_count >= count {
481503
bail!(
482-
"single key has more than {count} instances, cannot process in one chunk",
504+
"single key has more than {count} instances, cannot process in one chunk. key: {:?}",
505+
current_key,
483506
);
484507
}
485508

0 commit comments

Comments
 (0)