Skip to content

Commit c24580f

Browse files
committed
store: Use AsyncPgConnection for fdw pool
1 parent b4e6a5b commit c24580f

File tree

3 files changed

+37
-57
lines changed

3 files changed

+37
-57
lines changed

store/postgres/src/copy.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ use std::{
2323
};
2424

2525
use diesel::{
26-
connection::SimpleConnection as _, dsl::sql, insert_into, select, sql_query, update,
27-
ExpressionMethods, OptionalExtension, QueryDsl,
26+
dsl::sql, insert_into, select, sql_query, update, ExpressionMethods, OptionalExtension,
27+
QueryDsl,
2828
};
29-
use diesel_async::scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
30-
use diesel_async::RunQueryDsl;
29+
use diesel_async::{
30+
scoped_futures::{ScopedBoxFuture, ScopedFutureExt},
31+
AsyncConnection,
32+
};
33+
use diesel_async::{RunQueryDsl, SimpleAsyncConnection};
3134
use graph::{
3235
futures03::{
3336
future::{select_all, BoxFuture},
@@ -46,12 +49,11 @@ use itertools::Itertools;
4649
use crate::{
4750
advisory_lock, catalog, deployment,
4851
dynds::DataSourcesTable,
49-
pool::PgConnection,
5052
primary::{DeploymentId, Primary, Site},
5153
relational::{index::IndexList, Layout, Table},
5254
relational_queries as rq,
5355
vid_batcher::{VidBatcher, VidRange},
54-
AsyncConnection, AsyncPgConnection, ConnectionPool,
56+
AsyncPgConnection, ConnectionPool,
5557
};
5658

5759
const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);
@@ -690,12 +692,12 @@ impl From<Result<CopyTableWorker, StoreError>> for WorkerResult {
690692
/// This struct helps us with that. It wraps a connection and tracks whether
691693
/// the connection was used to acquire the copy lock
692694
struct LockTrackingConnection {
693-
inner: PgConnection,
695+
inner: AsyncPgConnection,
694696
has_lock: bool,
695697
}
696698

697699
impl LockTrackingConnection {
698-
fn new(inner: PgConnection) -> Self {
700+
fn new(inner: AsyncPgConnection) -> Self {
699701
Self {
700702
inner,
701703
has_lock: false,
@@ -804,10 +806,10 @@ impl CopyTableWorker {
804806
}
805807

806808
match conn
807-
.transaction_async(|conn| {
809+
.transaction(|conn| {
808810
async {
809811
if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
810-
conn.batch_execute(timeout)?;
812+
conn.batch_execute(timeout).await?;
811813
}
812814
self.table.copy_batch(conn).await
813815
}
@@ -850,7 +852,7 @@ impl CopyTableWorker {
850852
// that is hard to predict. This mechanism ensures
851853
// that if our estimation is wrong, the consequences
852854
// aren't too severe.
853-
conn.transaction_async(|conn| self.table.set_batch_size(conn, 1).scope_boxed())
855+
conn.transaction(|conn| self.table.set_batch_size(conn, 1).scope_boxed())
854856
.await?;
855857
}
856858
};
@@ -972,7 +974,7 @@ impl Connection {
972974

973975
let mut last_log = Instant::now();
974976
let conn = pool
975-
.get_fdw_async(&logger, || {
977+
.get_fdw(&logger, || {
976978
if last_log.elapsed() > LOG_INTERVAL {
977979
info!(&logger, "waiting for other copy operations to finish");
978980
last_log = Instant::now();
@@ -1005,7 +1007,9 @@ impl Connection {
10051007
callback: F,
10061008
) -> Result<BoxFuture<'conn, Result<R, StoreError>>, StoreError>
10071009
where
1008-
F: for<'r> FnOnce(&'r mut PgConnection) -> ScopedBoxFuture<'a, 'r, Result<R, StoreError>>
1010+
F: for<'r> FnOnce(
1011+
&'r mut AsyncPgConnection,
1012+
) -> ScopedBoxFuture<'a, 'r, Result<R, StoreError>>
10091013
+ Send
10101014
+ 'a,
10111015
R: Send + 'a,
@@ -1017,7 +1021,7 @@ impl Connection {
10171021
));
10181022
};
10191023
let conn = &mut conn.inner;
1020-
Ok(conn.transaction_async(|conn| async { callback(conn).await }.scope_boxed()))
1024+
Ok(conn.transaction(|conn| callback(conn).scope_boxed()))
10211025
}
10221026

10231027
/// Copy private data sources if the source uses a schema version that
@@ -1080,7 +1084,7 @@ impl Connection {
10801084
// we remove the table from the state and could drop it otherwise
10811085
let Some(conn) = self
10821086
.pool
1083-
.try_get_fdw_async(&self.logger, ENV_VARS.store.batch_worker_wait)
1087+
.try_get_fdw(&self.logger, ENV_VARS.store.batch_worker_wait)
10841088
.await
10851089
else {
10861090
return None;

store/postgres/src/pool/mod.rs

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -334,30 +334,24 @@ impl ConnectionPool {
334334
/// The `timeout` is called every time we time out waiting for a
335335
/// connection. If `timeout` returns `true`, `get_fdw` returns with that
336336
/// error, otherwise we try again to get a connection.
337-
pub fn get_fdw<F>(&self, logger: &Logger, timeout: F) -> Result<PgConnection, StoreError>
338-
where
339-
F: FnMut() -> bool,
340-
{
341-
self.get_ready()?.get_fdw(logger, timeout)
342-
}
343-
344-
/// An async version of `get_fdw`. For now, this calls `get_fdw`
345-
/// synchronously. Once `get_fdw` is not used anymore, we can make it
346-
/// truly async.
347-
pub async fn get_fdw_async<F>(
337+
pub async fn get_fdw<F>(
348338
&self,
349339
logger: &Logger,
350340
timeout: F,
351-
) -> Result<PgConnection, StoreError>
341+
) -> Result<AsyncPgConnection, StoreError>
352342
where
353343
F: FnMut() -> bool,
354344
{
355-
self.get_fdw(logger, timeout)
345+
self.get_ready()?.get_fdw(logger, timeout).await
356346
}
357347

358348
/// Get a connection from the pool for foreign data wrapper access if
359349
/// one is available
360-
async fn try_get_fdw(&self, logger: &Logger, timeout: Duration) -> Option<PgConnection> {
350+
pub async fn try_get_fdw(
351+
&self,
352+
logger: &Logger,
353+
timeout: Duration,
354+
) -> Option<AsyncPgConnection> {
361355
let Ok(inner) = self.get_ready() else {
362356
return None;
363357
};
@@ -366,17 +360,6 @@ impl ConnectionPool {
366360
.await
367361
}
368362

369-
/// An async version of `try_get_fdw`. For now, this calls `try_get_fdw`
370-
/// synchronously. Once `try_get_fdw` is not used anymore, we can make it
371-
/// truly async.
372-
pub async fn try_get_fdw_async(
373-
&self,
374-
logger: &Logger,
375-
timeout: Duration,
376-
) -> Option<PgConnection> {
377-
self.try_get_fdw(logger, timeout).await
378-
}
379-
380363
pub(crate) async fn query_permit(&self) -> QueryPermit {
381364
let pool = self.inner.get_unready();
382365
let start = Instant::now();
@@ -565,14 +548,18 @@ impl PoolInner {
565548
/// The `timeout` is called every time we time out waiting for a
566549
/// connection. If `timeout` returns `true`, `get_fdw` returns with that
567550
/// error, otherwise we try again to get a connection.
568-
fn get_fdw<F>(&self, logger: &Logger, mut timeout: F) -> Result<PgConnection, StoreError>
551+
async fn get_fdw<F>(
552+
&self,
553+
logger: &Logger,
554+
mut timeout: F,
555+
) -> Result<AsyncPgConnection, StoreError>
569556
where
570557
F: FnMut() -> bool,
571558
{
572559
let pool = self.fdw_pool(logger)?;
573560
loop {
574-
match graph::block_on(pool.get()) {
575-
Ok(conn) => return Ok(AsyncConnectionWrapper::from(conn)),
561+
match pool.get().await {
562+
Ok(conn) => return Ok(conn),
576563
Err(e) => {
577564
if timeout() {
578565
return Err(anyhow!("timeout in get_fdw: {e}").into());
@@ -585,18 +572,18 @@ impl PoolInner {
585572
/// Get a connection from the fdw pool if one is available. We wait for
586573
/// `timeout` for a connection which should be set just big enough to
587574
/// allow establishing a connection
588-
fn try_get_fdw(&self, logger: &Logger, timeout: Duration) -> Option<PgConnection> {
575+
async fn try_get_fdw(&self, logger: &Logger, timeout: Duration) -> Option<AsyncPgConnection> {
589576
// Any error trying to get a connection is treated as "couldn't get
590577
// a connection in time". If there is a serious error with the
591578
// database, e.g., because it's not available, the next database
592579
// operation will run into it and report it.
593580
let Ok(fdw_pool) = self.fdw_pool(logger) else {
594581
return None;
595582
};
596-
let Ok(conn) = graph::block_on(fdw_pool.get_timeout(timeout)) else {
583+
let Ok(conn) = fdw_pool.get_timeout(timeout).await else {
597584
return None;
598585
};
599-
Some(AsyncConnectionWrapper::from(conn))
586+
Some(conn)
600587
}
601588

602589
pub fn connection_detail(&self) -> Result<ForeignServer, StoreError> {

store/postgres/src/pool/state_tracker.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,6 @@ impl StateTracker {
5050
}
5151

5252
pub(super) async fn ignore_timeout<F, R>(&self, f: F) -> R
53-
where
54-
F: FnOnce() -> R,
55-
{
56-
self.ignore_timeout.store(true, Ordering::Relaxed);
57-
let res = f();
58-
self.ignore_timeout.store(false, Ordering::Relaxed);
59-
res
60-
}
61-
62-
#[allow(dead_code)]
63-
pub(super) async fn ignore_timeout_async<F, R>(&self, f: F) -> R
6453
where
6554
F: AsyncFnOnce() -> R,
6655
{

0 commit comments

Comments
 (0)