Skip to content

Commit abc6081

Browse files
committed
store: Asyncify relational::prune::status::Tracker
1 parent 2d03de8 commit abc6081

File tree

1 file changed

+30
-23
lines changed

1 file changed

+30
-23
lines changed

store/postgres/src/relational/prune.rs

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl TablePair {
107107
// Determine the last vid that we need to copy
108108
let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?;
109109
let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range)?;
110-
tracker.start_copy_final(conn, &self.src, range)?;
110+
tracker.start_copy_final(conn, &self.src, range).await?;
111111

112112
while !batcher.finished() {
113113
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
@@ -145,7 +145,9 @@ impl TablePair {
145145
})
146146
.await?;
147147
let rows = rows.unwrap_or(0);
148-
tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?;
148+
tracker
149+
.finish_batch(conn, &self.src, rows as i64, &batcher)
150+
.await?;
149151
cancel.check_cancel()?;
150152

151153
reporter.prune_batch(
@@ -173,7 +175,7 @@ impl TablePair {
173175
// Determine the last vid that we need to copy
174176
let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?;
175177
let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range)?;
176-
tracker.start_copy_nonfinal(conn, &self.src, range)?;
178+
tracker.start_copy_nonfinal(conn, &self.src, range).await?;
177179

178180
while !batcher.finished() {
179181
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
@@ -204,7 +206,9 @@ impl TablePair {
204206
}.scope_boxed()}).await?;
205207
let rows = rows.unwrap_or(0);
206208

207-
tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?;
209+
tracker
210+
.finish_batch(conn, &self.src, rows as i64, &batcher)
211+
.await?;
208212

209213
reporter.prune_batch(
210214
self.src.name.as_str(),
@@ -385,7 +389,7 @@ impl Layout {
385389
req: &PruneRequest,
386390
cancel: &CancelHandle,
387391
) -> Result<(), CancelableError<StoreError>> {
388-
let tracker = status::Tracker::new(conn, self.clone())?;
392+
let tracker = status::Tracker::new(conn, self.clone()).await?;
389393

390394
let res = self
391395
.prune_inner(logger, reporter, conn, req, cancel, &tracker)
@@ -423,7 +427,7 @@ impl Layout {
423427
let mut recreate_dst_nsp = true;
424428
for (table, strat) in &prunable_tables {
425429
reporter.start_table(table.name.as_str());
426-
tracker.start_table(conn, table)?;
430+
tracker.start_table(conn, table).await?;
427431
match strat {
428432
PruningStrategy::Rebuild => {
429433
if recreate_dst_nsp {
@@ -477,7 +481,7 @@ impl Layout {
477481
let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?;
478482
let mut batcher = VidBatcher::load(conn, &self.site.namespace, &table, range)?;
479483

480-
tracker.start_delete(conn, table, range, &batcher)?;
484+
tracker.start_delete(conn, table, range, &batcher).await?;
481485
while !batcher.finished() {
482486
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
483487
async move {
@@ -496,7 +500,9 @@ impl Layout {
496500
}.scope_boxed()}).await?;
497501
let rows = rows.unwrap_or(0);
498502

499-
tracker.finish_batch(conn, table, -(rows as i64), &batcher)?;
503+
tracker
504+
.finish_batch(conn, table, -(rows as i64), &batcher)
505+
.await?;
500506

501507
reporter.prune_batch(
502508
table.name.as_str(),
@@ -508,7 +514,7 @@ impl Layout {
508514
}
509515
}
510516
reporter.finish_table(table.name.as_str());
511-
tracker.finish_table(conn, table)?;
517+
tracker.finish_table(conn, table).await?;
512518
}
513519
if !recreate_dst_nsp {
514520
catalog::drop_schema(conn, dst_nsp.as_str())?;
@@ -750,7 +756,7 @@ mod status {
750756
}
751757

752758
impl Tracker {
753-
pub(super) fn new(conn: &mut PgConnection, layout: Arc<Layout>) -> StoreResult<Self> {
759+
pub(super) async fn new(conn: &mut PgConnection, layout: Arc<Layout>) -> StoreResult<Self> {
754760
use prune_state as ps;
755761
let run = ps::table
756762
.filter(ps::id.eq(layout.site.id))
@@ -819,7 +825,7 @@ mod status {
819825
.await
820826
}
821827

822-
pub(crate) fn start_table(
828+
pub(crate) async fn start_table(
823829
&self,
824830
conn: &mut PgConnection,
825831
table: &Table,
@@ -833,12 +839,13 @@ mod status {
833839
pts::started_at.eq(diesel::dsl::now),
834840
pts::phase.eq(Phase::Started),
835841
),
836-
)?;
842+
)
843+
.await?;
837844

838845
Ok(())
839846
}
840847

841-
pub(crate) fn start_copy_final(
848+
pub(crate) async fn start_copy_final(
842849
&self,
843850
conn: &mut PgConnection,
844851
table: &Table,
@@ -854,10 +861,10 @@ mod status {
854861
pts::rows.eq(0),
855862
);
856863

857-
self.update_table_state(conn, table, values)
864+
self.update_table_state(conn, table, values).await
858865
}
859866

860-
pub(crate) fn start_copy_nonfinal(
867+
pub(crate) async fn start_copy_nonfinal(
861868
&self,
862869
conn: &mut PgConnection,
863870
table: &Table,
@@ -871,10 +878,10 @@ mod status {
871878
pts::next_vid.eq(range.min),
872879
pts::nonfinal_vid.eq(range.max),
873880
);
874-
self.update_table_state(conn, table, values)
881+
self.update_table_state(conn, table, values).await
875882
}
876883

877-
pub(crate) fn finish_batch(
884+
pub(crate) async fn finish_batch(
878885
&self,
879886
conn: &mut PgConnection,
880887
src: &Table,
@@ -889,10 +896,10 @@ mod status {
889896
pts::rows.eq(pts::rows + rows),
890897
);
891898

892-
self.update_table_state(conn, src, values)
899+
self.update_table_state(conn, src, values).await
893900
}
894901

895-
pub(crate) fn finish_table(
902+
pub(crate) async fn finish_table(
896903
&self,
897904
conn: &mut PgConnection,
898905
table: &Table,
@@ -904,10 +911,10 @@ mod status {
904911
pts::phase.eq(Phase::Done),
905912
);
906913

907-
self.update_table_state(conn, table, values)
914+
self.update_table_state(conn, table, values).await
908915
}
909916

910-
pub(crate) fn start_delete(
917+
pub(crate) async fn start_delete(
911918
&self,
912919
conn: &mut PgConnection,
913920
table: &Table,
@@ -926,10 +933,10 @@ mod status {
926933
pts::batch_size.eq(batcher.batch_size() as i64),
927934
);
928935

929-
self.update_table_state(conn, table, values)
936+
self.update_table_state(conn, table, values).await
930937
}
931938

932-
fn update_table_state<V, C>(
939+
async fn update_table_state<V, C>(
933940
&self,
934941
conn: &mut PgConnection,
935942
table: &Table,

0 commit comments

Comments
 (0)