Skip to content

Commit bee06d0

Browse files
committed
store: Refactor TableState.load
We need to get VidBatcher.load out of the sync closure
1 parent caca1c1 commit bee06d0

File tree

1 file changed

+31
-38
lines changed

1 file changed

+31
-38
lines changed

store/postgres/src/copy.rs

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,8 @@ impl TableState {
379379
.map(|table| table.clone())
380380
}
381381

382-
cts::table
382+
let mut states = Vec::new();
383+
for (id, entity_type, current_vid, target_vid, size, duration_ms) in cts::table
383384
.filter(cts::dst.eq(dst_layout.site.id))
384385
.select((
385386
cts::id,
@@ -392,43 +393,35 @@ impl TableState {
392393
.order_by(cts::entity_type)
393394
.load::<(i32, String, i64, i64, i64, i64)>(conn)?
394395
.into_iter()
395-
.map(
396-
|(id, entity_type, current_vid, target_vid, size, duration_ms)| {
397-
let entity_type = src_layout.input_schema.entity_type(&entity_type)?;
398-
let src =
399-
resolve_entity(src_layout, "source", &entity_type, dst_layout.site.id, id);
400-
let dst = resolve_entity(
401-
dst_layout,
402-
"destination",
403-
&entity_type,
404-
dst_layout.site.id,
405-
id,
406-
);
407-
match (src, dst) {
408-
(Ok(src), Ok(dst)) => {
409-
let batcher = VidBatcher::load(
410-
conn,
411-
&src_layout.site.namespace,
412-
&src,
413-
VidRange::new(current_vid, target_vid),
414-
)?
415-
.with_batch_size(size as usize);
416-
417-
Ok(TableState {
418-
primary: primary.cheap_clone(),
419-
src,
420-
dst,
421-
dst_site: dst_layout.site.clone(),
422-
batcher,
423-
duration_ms,
424-
})
425-
}
426-
(Err(e), _) => Err(e),
427-
(_, Err(e)) => Err(e),
428-
}
429-
},
430-
)
431-
.collect()
396+
{
397+
let entity_type = src_layout.input_schema.entity_type(&entity_type)?;
398+
let src = resolve_entity(src_layout, "source", &entity_type, dst_layout.site.id, id)?;
399+
let dst = resolve_entity(
400+
dst_layout,
401+
"destination",
402+
&entity_type,
403+
dst_layout.site.id,
404+
id,
405+
)?;
406+
let batcher = VidBatcher::load(
407+
conn,
408+
&src_layout.site.namespace,
409+
&src,
410+
VidRange::new(current_vid, target_vid),
411+
)?
412+
.with_batch_size(size as usize);
413+
414+
let state = TableState {
415+
primary: primary.cheap_clone(),
416+
src,
417+
dst,
418+
dst_site: dst_layout.site.clone(),
419+
batcher,
420+
duration_ms,
421+
};
422+
states.push(state);
423+
}
424+
Ok(states)
432425
}
433426

434427
fn record_progress(

0 commit comments

Comments
 (0)