Skip to content

Commit f3a6c2c

Browse files
committed
store: Asyncify catalog::histogram_bounds
1 parent bee06d0 commit f3a6c2c

File tree

4 files changed

+32
-26
lines changed

4 files changed

+32
-26
lines changed

store/postgres/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,7 @@ fn pg_stats_has_range_bounds_histogram(conn: &mut PgConnection) -> Result<bool,
10611061
.map_err(StoreError::from)
10621062
}
10631063

1064-
pub(crate) fn histogram_bounds(
1064+
pub(crate) async fn histogram_bounds(
10651065
conn: &mut PgConnection,
10661066
namespace: &Namespace,
10671067
table: &SqlName,

store/postgres/src/copy.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ struct CopyState {
121121
}
122122

123123
impl CopyState {
124-
fn new(
124+
async fn new(
125125
conn: &mut PgConnection,
126126
primary: Primary,
127127
src: Arc<Layout>,
@@ -162,22 +162,22 @@ impl CopyState {
162162
src.site.id
163163
));
164164
}
165-
Self::load(conn, primary, src, dst, target_block)
165+
Self::load(conn, primary, src, dst, target_block).await
166166
}
167-
None => Self::create(conn, primary.cheap_clone(), src, dst, target_block),
167+
None => Self::create(conn, primary.cheap_clone(), src, dst, target_block).await,
168168
}?;
169169

170170
Ok(state)
171171
}
172172

173-
fn load(
173+
async fn load(
174174
conn: &mut PgConnection,
175175
primary: Primary,
176176
src: Arc<Layout>,
177177
dst: Arc<Layout>,
178178
target_block: BlockPtr,
179179
) -> Result<CopyState, StoreError> {
180-
let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref())?;
180+
let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref()).await?;
181181
let (finished, mut unfinished): (Vec<_>, Vec<_>) =
182182
tables.into_iter().partition(|table| table.finished());
183183
unfinished.sort_by_key(|table| table.dst.object.to_string());
@@ -190,7 +190,7 @@ impl CopyState {
190190
})
191191
}
192192

193-
fn create(
193+
async fn create(
194194
conn: &mut PgConnection,
195195
primary: Primary,
196196
src: Arc<Layout>,
@@ -212,15 +212,18 @@ impl CopyState {
212212
let mut unfinished = Vec::new();
213213
for dst_table in dst.tables.values() {
214214
if let Some(src_table) = src.table_for_entity(&dst_table.object).ok() {
215-
unfinished.push(TableState::init(
216-
conn,
217-
primary.cheap_clone(),
218-
dst.site.clone(),
219-
&src,
220-
src_table.clone(),
221-
dst_table.clone(),
222-
&target_block,
223-
)?);
215+
unfinished.push(
216+
TableState::init(
217+
conn,
218+
primary.cheap_clone(),
219+
dst.site.clone(),
220+
&src,
221+
src_table.clone(),
222+
dst_table.clone(),
223+
&target_block,
224+
)
225+
.await?,
226+
);
224227
}
225228
}
226229
unfinished.sort_by_key(|table| table.dst.object.to_string());
@@ -324,7 +327,7 @@ struct TableState {
324327
}
325328

326329
impl TableState {
327-
fn init(
330+
async fn init(
328331
conn: &mut PgConnection,
329332
primary: Primary,
330333
dst_site: Arc<Site>,
@@ -334,7 +337,8 @@ impl TableState {
334337
target_block: &BlockPtr,
335338
) -> Result<Self, StoreError> {
336339
let vid_range = VidRange::for_copy(conn, &src, target_block)?;
337-
let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?;
340+
let batcher =
341+
VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range).await?;
338342
Ok(Self {
339343
primary,
340344
src,
@@ -349,7 +353,7 @@ impl TableState {
349353
self.batcher.finished()
350354
}
351355

352-
fn load(
356+
async fn load(
353357
conn: &mut PgConnection,
354358
primary: Primary,
355359
src_layout: &Layout,
@@ -408,7 +412,8 @@ impl TableState {
408412
&src_layout.site.namespace,
409413
&src,
410414
VidRange::new(current_vid, target_vid),
411-
)?
415+
)
416+
.await?
412417
.with_batch_size(size as usize);
413418

414419
let state = TableState {
@@ -1128,7 +1133,7 @@ impl Connection {
11281133
let primary = self.primary.cheap_clone();
11291134
let mut state = self
11301135
.transaction(|conn| {
1131-
async { CopyState::new(conn, primary, src, dst, target_block) }.scope_boxed()
1136+
CopyState::new(conn, primary, src, dst, target_block).scope_boxed()
11321137
})?
11331138
.await?;
11341139

store/postgres/src/relational/prune.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl TablePair {
106106

107107
// Determine the last vid that we need to copy
108108
let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?;
109-
let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range)?;
109+
let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range).await?;
110110
tracker.start_copy_final(conn, &self.src, range).await?;
111111

112112
while !batcher.finished() {
@@ -174,7 +174,7 @@ impl TablePair {
174174

175175
// Determine the last vid that we need to copy
176176
let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?;
177-
let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range)?;
177+
let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range).await?;
178178
tracker.start_copy_nonfinal(conn, &self.src, range).await?;
179179

180180
while !batcher.finished() {
@@ -479,7 +479,8 @@ impl Layout {
479479
// Delete all entity versions whose range was closed
480480
// before `req.earliest_block`
481481
let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?;
482-
let mut batcher = VidBatcher::load(conn, &self.site.namespace, &table, range)?;
482+
let mut batcher =
483+
VidBatcher::load(conn, &self.site.namespace, &table, range).await?;
483484

484485
tracker.start_delete(conn, table, range, &batcher).await?;
485486
while !batcher.finished() {

store/postgres/src/vid_batcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,13 @@ impl VidBatcher {
118118
/// The `vid_range` is inclusive, i.e., the batcher will iterate over
119119
/// all vids `vid_range.0 <= vid <= vid_range.1`; for an empty table,
120120
/// the `vid_range` must be set to `(-1, 0)`
121-
pub fn load(
121+
pub async fn load(
122122
conn: &mut PgConnection,
123123
nsp: &Namespace,
124124
table: &Table,
125125
vid_range: VidRange,
126126
) -> Result<Self, StoreError> {
127-
let bounds = catalog::histogram_bounds(conn, nsp, &table.name, VID_COLUMN)?;
127+
let bounds = catalog::histogram_bounds(conn, nsp, &table.name, VID_COLUMN).await?;
128128
let batch_size = AdaptiveBatchSize::new(table);
129129
Self::new(bounds, vid_range, batch_size)
130130
}

0 commit comments

Comments
 (0)