@@ -23,11 +23,14 @@ use std::{
2323} ;
2424
2525use diesel:: {
26- connection :: SimpleConnection as _ , dsl:: sql, insert_into, select, sql_query, update,
27- ExpressionMethods , OptionalExtension , QueryDsl ,
26+ dsl:: sql, insert_into, select, sql_query, update, ExpressionMethods , OptionalExtension ,
27+ QueryDsl ,
2828} ;
29- use diesel_async:: scoped_futures:: { ScopedBoxFuture , ScopedFutureExt } ;
30- use diesel_async:: RunQueryDsl ;
29+ use diesel_async:: {
30+ scoped_futures:: { ScopedBoxFuture , ScopedFutureExt } ,
31+ AsyncConnection ,
32+ } ;
33+ use diesel_async:: { RunQueryDsl , SimpleAsyncConnection } ;
3134use graph:: {
3235 futures03:: {
3336 future:: { select_all, BoxFuture } ,
@@ -46,12 +49,11 @@ use itertools::Itertools;
4649use crate :: {
4750 advisory_lock, catalog, deployment,
4851 dynds:: DataSourcesTable ,
49- pool:: PgConnection ,
5052 primary:: { DeploymentId , Primary , Site } ,
5153 relational:: { index:: IndexList , Layout , Table } ,
5254 relational_queries as rq,
5355 vid_batcher:: { VidBatcher , VidRange } ,
54- AsyncConnection , AsyncPgConnection , ConnectionPool ,
56+ AsyncPgConnection , ConnectionPool ,
5557} ;
5658
5759const LOG_INTERVAL : Duration = Duration :: from_secs ( 3 * 60 ) ;
@@ -690,12 +692,12 @@ impl From<Result<CopyTableWorker, StoreError>> for WorkerResult {
690692/// This struct helps us with that. It wraps a connection and tracks whether
691693/// the connection was used to acquire the copy lock
692694struct LockTrackingConnection {
693- inner : PgConnection ,
695+ inner : AsyncPgConnection ,
694696 has_lock : bool ,
695697}
696698
697699impl LockTrackingConnection {
698- fn new ( inner : PgConnection ) -> Self {
700+ fn new ( inner : AsyncPgConnection ) -> Self {
699701 Self {
700702 inner,
701703 has_lock : false ,
@@ -804,10 +806,10 @@ impl CopyTableWorker {
804806 }
805807
806808 match conn
807- . transaction_async ( |conn| {
809+ . transaction ( |conn| {
808810 async {
809811 if let Some ( timeout) = BATCH_STATEMENT_TIMEOUT . as_ref ( ) {
810- conn. batch_execute ( timeout) ?;
812+ conn. batch_execute ( timeout) . await ?;
811813 }
812814 self . table . copy_batch ( conn) . await
813815 }
@@ -850,7 +852,7 @@ impl CopyTableWorker {
850852 // that is hard to predict. This mechanism ensures
851853 // that if our estimation is wrong, the consequences
852854 // aren't too severe.
853- conn. transaction_async ( |conn| self . table . set_batch_size ( conn, 1 ) . scope_boxed ( ) )
855+ conn. transaction ( |conn| self . table . set_batch_size ( conn, 1 ) . scope_boxed ( ) )
854856 . await ?;
855857 }
856858 } ;
@@ -972,7 +974,7 @@ impl Connection {
972974
973975 let mut last_log = Instant :: now ( ) ;
974976 let conn = pool
975- . get_fdw_async ( & logger, || {
977+ . get_fdw ( & logger, || {
976978 if last_log. elapsed ( ) > LOG_INTERVAL {
977979 info ! ( & logger, "waiting for other copy operations to finish" ) ;
978980 last_log = Instant :: now ( ) ;
@@ -1005,7 +1007,9 @@ impl Connection {
10051007 callback : F ,
10061008 ) -> Result < BoxFuture < ' conn , Result < R , StoreError > > , StoreError >
10071009 where
1008- F : for < ' r > FnOnce ( & ' r mut PgConnection ) -> ScopedBoxFuture < ' a , ' r , Result < R , StoreError > >
1010+ F : for < ' r > FnOnce (
1011+ & ' r mut AsyncPgConnection ,
1012+ ) -> ScopedBoxFuture < ' a , ' r , Result < R , StoreError > >
10091013 + Send
10101014 + ' a ,
10111015 R : Send + ' a ,
@@ -1017,7 +1021,7 @@ impl Connection {
10171021 ) ) ;
10181022 } ;
10191023 let conn = & mut conn. inner ;
1020- Ok ( conn. transaction_async ( |conn| async { callback ( conn) . await } . scope_boxed ( ) ) )
1024+ Ok ( conn. transaction ( |conn| callback ( conn) . scope_boxed ( ) ) )
10211025 }
10221026
10231027 /// Copy private data sources if the source uses a schema version that
@@ -1080,7 +1084,7 @@ impl Connection {
10801084 // we remove the table from the state and could drop it otherwise
10811085 let Some ( conn) = self
10821086 . pool
1083- . try_get_fdw_async ( & self . logger , ENV_VARS . store . batch_worker_wait )
1087+ . try_get_fdw ( & self . logger , ENV_VARS . store . batch_worker_wait )
10841088 . await
10851089 else {
10861090 return None ;
0 commit comments