Skip to content

Commit ed4f9b9

Browse files
committed
store: Use AsyncPgConnection in remaining places in copy
1 parent c915a0b commit ed4f9b9

File tree

2 files changed

+15
-15
lines changed

2 files changed

+15
-15
lines changed

store/postgres/src/copy.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ struct CopyState {
120120

121121
impl CopyState {
122122
async fn new(
123-
conn: &mut PgConnection,
123+
conn: &mut AsyncPgConnection,
124124
primary: Primary,
125125
src: Arc<Layout>,
126126
dst: Arc<Layout>,
@@ -170,7 +170,7 @@ impl CopyState {
170170
}
171171

172172
async fn load(
173-
conn: &mut PgConnection,
173+
conn: &mut AsyncPgConnection,
174174
primary: Primary,
175175
src: Arc<Layout>,
176176
dst: Arc<Layout>,
@@ -190,7 +190,7 @@ impl CopyState {
190190
}
191191

192192
async fn create(
193-
conn: &mut PgConnection,
193+
conn: &mut AsyncPgConnection,
194194
primary: Primary,
195195
src: Arc<Layout>,
196196
dst: Arc<Layout>,
@@ -255,7 +255,7 @@ impl CopyState {
255255
self.dst.site.shard != self.src.site.shard
256256
}
257257

258-
async fn finished(&self, conn: &mut PgConnection) -> Result<(), StoreError> {
258+
async fn finished(&self, conn: &mut AsyncPgConnection) -> Result<(), StoreError> {
259259
use copy_state as cs;
260260

261261
update(cs::table.filter(cs::dst.eq(self.dst.site.id)))
@@ -331,7 +331,7 @@ struct TableState {
331331

332332
impl TableState {
333333
async fn init(
334-
conn: &mut PgConnection,
334+
conn: &mut AsyncPgConnection,
335335
primary: Primary,
336336
dst_site: Arc<Site>,
337337
src_layout: &Layout,
@@ -357,7 +357,7 @@ impl TableState {
357357
}
358358

359359
async fn load(
360-
conn: &mut PgConnection,
360+
conn: &mut AsyncPgConnection,
361361
primary: Primary,
362362
src_layout: &Layout,
363363
dst_layout: &Layout,
@@ -435,7 +435,7 @@ impl TableState {
435435

436436
async fn record_progress(
437437
&mut self,
438-
conn: &mut PgConnection,
438+
conn: &mut AsyncPgConnection,
439439
elapsed: Duration,
440440
) -> Result<(), StoreError> {
441441
use copy_table_state as cts;
@@ -472,7 +472,7 @@ impl TableState {
472472
Ok(())
473473
}
474474

475-
async fn record_finished(&self, conn: &mut PgConnection) -> Result<(), StoreError> {
475+
async fn record_finished(&self, conn: &mut AsyncPgConnection) -> Result<(), StoreError> {
476476
use copy_table_state as cts;
477477

478478
update(
@@ -486,7 +486,7 @@ impl TableState {
486486
Ok(())
487487
}
488488

489-
async fn is_cancelled(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
489+
async fn is_cancelled(&self, conn: &mut AsyncPgConnection) -> Result<bool, StoreError> {
490490
let dst = self.dst_site.as_ref();
491491
let canceled = self.primary.is_copy_cancelled(dst).await?;
492492
if canceled {
@@ -500,7 +500,7 @@ impl TableState {
500500
Ok(canceled)
501501
}
502502

503-
async fn copy_batch(&mut self, conn: &mut PgConnection) -> Result<Status, StoreError> {
503+
async fn copy_batch(&mut self, conn: &mut AsyncPgConnection) -> Result<Status, StoreError> {
504504
let (duration, count) = self
505505
.batcher
506506
.step(async |start, end| {
@@ -529,7 +529,7 @@ impl TableState {
529529

530530
async fn set_batch_size(
531531
&mut self,
532-
conn: &mut PgConnection,
532+
conn: &mut AsyncPgConnection,
533533
size: usize,
534534
) -> Result<(), StoreError> {
535535
use copy_table_state as cts;

store/postgres/src/vid_batcher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use graph::{
1313

1414
use crate::{
1515
catalog,
16-
pool::PgConnection,
1716
primary::Namespace,
1817
relational::{Table, VID_COLUMN},
1918
AsyncPgConnection,
@@ -277,7 +276,7 @@ impl VidRange {
277276

278277
/// Return the full range of `vid` values in the table `src`
279278
pub async fn for_copy(
280-
conn: &mut PgConnection,
279+
conn: &mut AsyncPgConnection,
281280
src: &Table,
282281
target_block: &BlockPtr,
283282
) -> Result<Self, StoreError> {
@@ -286,7 +285,7 @@ impl VidRange {
286285
} else {
287286
"lower(block_range) <= $1"
288287
};
289-
let vid_range = diesel::RunQueryDsl::load::<VidRange>(
288+
let vid_range = diesel_async::RunQueryDsl::load::<VidRange>(
290289
sql_query(format!(
291290
"/* controller=copy,target={target_number} */ \
292291
select coalesce(min(vid), 0) as min_vid, \
@@ -298,7 +297,8 @@ impl VidRange {
298297
))
299298
.bind::<Integer, _>(&target_block.number),
300299
conn,
301-
)?
300+
)
301+
.await?
302302
.pop()
303303
.unwrap_or(EMPTY_VID_RANGE);
304304
Ok(vid_range)

0 commit comments

Comments
 (0)