Skip to content

Commit 1a6b2a5

Browse files
authored
fix(epoxy): handle incomplete ranges in epoxy recover key (#3296)
1 parent d3a9d93 commit 1a6b2a5

File tree

1 file changed

+54
-20
lines changed
  • engine/packages/epoxy/src/workflows/replica

1 file changed

+54
-20
lines changed

engine/packages/epoxy/src/workflows/replica/setup.rs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,14 @@ pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Resu
8484
#[derive(Serialize, Deserialize)]
8585
struct RecoverState {
8686
after_key: Option<Vec<u8>>,
87+
incomplete_key_range_start: Option<Vec<u8>>,
8788
total_recovered_keys: u64,
8889
}
8990

9091
ctx.loope(
9192
RecoverState {
9293
after_key: None,
94+
incomplete_key_range_start: None,
9395
total_recovered_keys: 0,
9496
},
9597
|ctx, state| {
@@ -100,12 +102,16 @@ pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Resu
100102
.activity(RecoverKeysChunkInput {
101103
learning_config,
102104
after_key: state.after_key.clone(),
105+
incomplete_key_range_start: state.incomplete_key_range_start.clone(),
103106
count: crate::consts::RECOVER_KEY_CHUNK_SIZE,
104107
total_recovered_keys: state.total_recovered_keys,
105108
})
106109
.await?;
107110

108111
// Update state for next iteration
112+
// Store incomplete_key_range_start for next iteration if present
113+
state.incomplete_key_range_start = output.incomplete_key_range_start.clone();
114+
109115
if let Some(last_key) = output.last_key {
110116
state.after_key = Some(last_key);
111117
state.total_recovered_keys += output.recovered_count;
@@ -343,6 +349,9 @@ pub struct RecoverKeysChunkInput {
343349
pub learning_config: types::ClusterConfig,
344350
/// The last key value from the previous chunk, used for pagination
345351
pub after_key: Option<Vec<u8>>,
352+
/// A key that wasn't fully processed in the previous chunk due to too many instances.
353+
/// When set, the range will start FROM this key instead of skipping past after_key.
354+
pub incomplete_key_range_start: Option<Vec<u8>>,
346355
pub count: u64,
347356
pub total_recovered_keys: u64,
348357
}
@@ -357,6 +366,10 @@ pub struct RecoverKeysChunkOutput {
357366
pub last_key: Option<Vec<u8>>,
358367
/// Number of keys recovered in this chunk
359368
pub recovered_count: u64,
369+
/// If set, indicates we didn't finish this chunk due to encountering a key with too many instances.
370+
///
371+
/// The next iteration should start FROM this key instead of skipping past last_key.
372+
pub incomplete_key_range_start: Option<Vec<u8>>,
360373
}
361374

362375
/// Recovers committed values for a chunk of keys after downloading all log entries.
@@ -387,10 +400,11 @@ pub async fn recover_keys_chunk(
387400
"recovering keys chunk"
388401
);
389402

390-
let (last_key, recovered_count) = ctx
403+
let (last_key, recovered_count, incomplete_key_range_start) = ctx
391404
.udb()?
392405
.run(|tx| {
393406
let after_key = input.after_key.clone();
407+
let incomplete_key_range_start = input.incomplete_key_range_start.clone();
394408
let count = input.count;
395409

396410
async move {
@@ -401,18 +415,26 @@ pub async fn recover_keys_chunk(
401415
crate::keys::replica::KeyInstanceKey::subspace_for_all_keys();
402416
let prefix = subspace.pack(&key_instance_all);
403417

404-
// Build range start key - either from after_key or from the beginning
405-
let begin_key = if let Some(after_key) = &after_key {
406-
// Skip past all KeyInstance entries for the last processed key.
407-
let key_instance_subspace =
408-
crate::keys::replica::KeyInstanceKey::subspace(after_key.clone());
409-
let mut key_after_all_instances = subspace.pack(&key_instance_subspace);
410-
// Append 0xFF to get past all instances for this key
411-
key_after_all_instances.push(0xFF);
412-
KeySelector::first_greater_or_equal(key_after_all_instances)
413-
} else {
414-
KeySelector::first_greater_or_equal(prefix.clone())
415-
};
418+
// Build range start key
419+
let begin_key =
420+
if let Some(incomplete_key_range_start) = &incomplete_key_range_start {
421+
// Start FROM this incomplete key (don't skip past it)
422+
let key_instance_subspace = crate::keys::replica::KeyInstanceKey::subspace(
423+
incomplete_key_range_start.clone(),
424+
);
425+
let key_start = subspace.pack(&key_instance_subspace);
426+
KeySelector::first_greater_or_equal(key_start)
427+
} else if let Some(after_key) = &after_key {
428+
// Skip past all KeyInstance entries for the last processed key
429+
let key_instance_subspace =
430+
crate::keys::replica::KeyInstanceKey::subspace(after_key.clone());
431+
let mut key_after_all_instances = subspace.pack(&key_instance_subspace);
432+
// Append 0xFF to get past all instances for this key
433+
key_after_all_instances.push(0xFF);
434+
KeySelector::first_greater_or_equal(key_after_all_instances)
435+
} else {
436+
KeySelector::first_greater_or_equal(prefix.clone())
437+
};
416438

417439
// Build range end key - after all KEY_INSTANCE entries
418440
let mut end_prefix = prefix.clone();
@@ -498,23 +520,34 @@ pub async fn recover_keys_chunk(
498520

499521
// If no keys were processed despite scanning {count} instances,
500522
// it means a single key has too many instances (i.e. larger than
501-
// the range limit)
502-
if recovered_count == 0 && scanned_count >= count {
503-
bail!(
504-
"single key has more than {count} instances, cannot process in one chunk. key: {:?}",
505-
current_key,
523+
// the range limit). Return incomplete_key_range_start so the workflow can retry from this position.
524+
let incomplete_key_range_start = if recovered_count == 0 && scanned_count >= count {
525+
tracing::warn!(
526+
?replica_id,
527+
?current_key,
528+
scanned_count,
529+
count,
530+
"encountered key with more than {count} instances, will retry from this position"
506531
);
507-
}
532+
current_key
533+
} else {
534+
None
535+
};
508536

509537
tracing::info!(
510538
?replica_id,
511539
recovered_count,
512540
scanned_count,
541+
?incomplete_key_range_start,
513542
"recovered keys in chunk"
514543
);
515544

516545
// Return the last key value for pagination
517-
Result::Ok((last_processed_key, recovered_count))
546+
Result::Ok((
547+
last_processed_key,
548+
recovered_count,
549+
incomplete_key_range_start,
550+
))
518551
}
519552
})
520553
.custom_instrument(tracing::info_span!("recover_keys_chunk_tx"))
@@ -523,6 +556,7 @@ pub async fn recover_keys_chunk(
523556
Ok(RecoverKeysChunkOutput {
524557
last_key,
525558
recovered_count,
559+
incomplete_key_range_start,
526560
})
527561
}
528562

0 commit comments

Comments
 (0)