Skip to content

Commit 2595e31

Browse files
goffrieConvex, Inc.
authored andcommitted
Keep the _session_requests deletion cursor across runs of the cleanup worker (#39094)
GitOrigin-RevId: 14326de623c4072a119fd260a77ef89b7e671f1b
1 parent 4f7ebe3 commit 2595e31

File tree

1 file changed

+19
-13
lines changed
  • crates/application/src/system_table_cleanup

1 file changed

+19
-13
lines changed

crates/application/src/system_table_cleanup/mod.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
122122
self.runtime.clone(),
123123
Quota::per_second(*SYSTEM_TABLE_ROWS_PER_SECOND),
124124
);
125+
let mut session_requests_delete_cursor = None;
125126
loop {
126127
// Jitter the wait between deletion runs to even out load.
127128
let delay = SYSTEM_TABLE_CLEANUP_FREQUENCY.mul_f32(self.runtime.rng().random());
@@ -140,15 +141,19 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
140141
},
141142
None => None,
142143
};
143-
self.cleanup_system_table(
144-
TableNamespace::Global,
145-
&SESSION_REQUESTS_TABLE,
146-
session_requests_cutoff
147-
.map_or(CreationTimeInterval::None, CreationTimeInterval::Before),
148-
&rate_limiter,
149-
*SESSION_CLEANUP_DELETE_CONCURRENCY,
150-
)
151-
.await?;
144+
// Preserve the deletion cursor between runs. This helps skip index tombstones.
145+
// Note that we only update the cursor after a successful run.
146+
(_, session_requests_delete_cursor) = self
147+
.cleanup_system_table(
148+
TableNamespace::Global,
149+
&SESSION_REQUESTS_TABLE,
150+
session_requests_cutoff
151+
.map_or(CreationTimeInterval::None, CreationTimeInterval::Before),
152+
&rate_limiter,
153+
*SESSION_CLEANUP_DELETE_CONCURRENCY,
154+
session_requests_delete_cursor,
155+
)
156+
.await?;
152157
}
153158
}
154159

@@ -281,9 +286,9 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
281286
to_delete: CreationTimeInterval,
282287
rate_limiter: &RateLimiter<RT>,
283288
num_deleters: usize,
284-
) -> anyhow::Result<usize> {
289+
mut cursor: Option<(CreationTime, ResolvedDocumentId)>,
290+
) -> anyhow::Result<(usize, Option<(CreationTime, ResolvedDocumentId)>)> {
285291
let _timer = system_table_cleanup_timer();
286-
let mut cursor = None;
287292

288293
let (tx, rx) = tokio::sync::mpsc::channel(1);
289294
let deleter = |chunk: Vec<ResolvedDocumentId>| async {
@@ -320,7 +325,7 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
320325
};
321326

322327
let ((), deleted) = futures::try_join!(reader, deleters)?;
323-
Ok(deleted)
328+
Ok((deleted, cursor))
324329
}
325330

326331
async fn cleanup_system_table_read_chunk(
@@ -521,13 +526,14 @@ mod tests {
521526
let rate_limiter =
522527
new_rate_limiter(rt.clone(), Quota::per_second(NonZeroU32::new(10).unwrap()));
523528

524-
let deleted = worker
529+
let (deleted, _cursor) = worker
525530
.cleanup_system_table(
526531
TableNamespace::Global,
527532
&SESSION_REQUESTS_TABLE,
528533
CreationTimeInterval::Before(cutoff),
529534
&rate_limiter,
530535
num_deleters,
536+
None,
531537
)
532538
.await?;
533539
assert_eq!(deleted, 3);

0 commit comments

Comments
 (0)