Skip to content

Commit c7a49a9

Browse files
committed
store: Switch from mobc to deadpool
deadpool seems to get much more use and is udated more frequently. It also provides better insights on various size etc. metrics
1 parent 9f0ce51 commit c7a49a9

File tree

5 files changed

+91
-155
lines changed

5 files changed

+91
-155
lines changed

Cargo.lock

Lines changed: 5 additions & 120 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ diesel = { version = "2.2.7", features = [
5656
"chrono",
5757
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
5858
] }
59-
diesel-async = { version = "0.7.3", features = ["mobc", "async-connection-wrapper", "tokio", "postgres"] }
59+
diesel-async = { version = "0.7.3", features = ["deadpool", "async-connection-wrapper", "tokio", "postgres"] }
6060
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
6161
diesel-dynamic-schema = { version = "0.2.3", features = ["postgres"] }
6262
diesel_derives = "2.2.7"
@@ -92,6 +92,7 @@ strum = { version = "0.26", features = ["derive"] }
9292
syn = { version = "2.0.106", features = ["full"] }
9393
test-store = { path = "./store/test-store" }
9494
thiserror = "2.0.16"
95+
deadpool = { version = "0.12", features = ["rt_tokio_1", "managed"] }
9596
tokio = { version = "1.45.1", features = ["full"] }
9697
tokio-stream = { version = "0.1.15", features = ["sync"] }
9798
tokio-retry = "0.3.0"

store/postgres/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ edition.workspace = true
77
async-trait = { workspace = true }
88
blake3 = "1.8"
99
chrono = { workspace = true }
10+
deadpool = { workspace = true }
1011
derive_more = { version = "2.0.1", features = ["full"] }
1112
diesel = { workspace = true }
1213
diesel-async = { workspace = true }

store/postgres/src/pool/mod.rs

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
use deadpool::managed::Timeouts;
2+
use deadpool::Runtime;
13
use diesel::sql_query;
24
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
3-
use diesel_async::pooled_connection::{mobc, AsyncDieselConnectionManager};
5+
use diesel_async::pooled_connection::deadpool::Pool as DeadpoolPool;
6+
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
47
use diesel_async::{AsyncConnection as _, RunQueryDsl, SimpleAsyncConnection};
58
use diesel_migrations::{EmbeddedMigrations, HarnessWithOutput};
69

@@ -36,9 +39,10 @@ pub use coordinator::PoolCoordinator;
3639
pub use foreign_server::ForeignServer;
3740
use state_tracker::{ErrorHandler, EventHandler, StateTracker};
3841

39-
type AsyncPool = mobc::Pool<diesel_async::AsyncPgConnection>;
42+
type AsyncPool = DeadpoolPool<diesel_async::AsyncPgConnection>;
4043
/// A database connection for asynchronous diesel operations
41-
pub type AsyncPgConnection = mobc::PooledConnection<diesel_async::AsyncPgConnection>;
44+
pub type AsyncPgConnection =
45+
diesel_async::pooled_connection::deadpool::Object<diesel_async::AsyncPgConnection>;
4246

4347
/// The namespace under which the `PRIMARY_TABLES` are mapped into each
4448
/// shard
@@ -180,6 +184,7 @@ impl PoolState {
180184
// We just tried to set up the pool; if it is still not set up and
181185
// we didn't have an error, it means the database is not available
182186
if self.needs_setup() {
187+
error!(self.logger, "Database is not available, setup did not work");
183188
return Err(StoreError::DatabaseUnavailable);
184189
} else {
185190
Ok(pool)
@@ -424,6 +429,10 @@ impl PoolInner {
424429
map.insert("shard".to_string(), shard.to_string());
425430
map
426431
};
432+
// Note: deadpool provides built-in metrics via pool.status()
433+
// The r2d2-style ErrorHandler and EventHandler are not needed with deadpool.
434+
// Metrics can be obtained from pool.status() and custom hooks can be added
435+
// to the pool builder if needed.
427436
let error_counter = registry
428437
.global_counter(
429438
"store_connection_error_count",
@@ -450,40 +459,59 @@ impl PoolInner {
450459
registry.cheap_clone(),
451460
wait_stats.clone(),
452461
const_labels.clone(),
453-
state_tracker,
462+
state_tracker.clone(),
454463
));
455464

456465
// Connect to Postgres
457-
let conn_manager = AsyncDieselConnectionManager::new(postgres_url.clone());
458-
crit!(
459-
logger_pool,
460-
"Unfinished: need to replicate min_idle with mobc"
461-
);
462-
let _min_idle = ENV_VARS.store.connection_min_idle.filter(|min_idle| {
463-
if *min_idle <= pool_size {
464-
true
465-
} else {
466-
warn!(
467-
logger_pool,
468-
"Configuration error: min idle {} exceeds pool size {}, ignoring min idle",
469-
min_idle,
470-
pool_size
471-
);
472-
false
473-
}
474-
});
475-
let builder: mobc::Builder<_> = mobc::Pool::builder()
476-
.get_timeout(Some(ENV_VARS.store.connection_timeout))
477-
.max_open(pool_size as u64)
478-
.max_idle_lifetime(Some(ENV_VARS.store.connection_idle_timeout));
479-
let pool = builder.build(conn_manager);
466+
let conn_manager = {
467+
let mut config = diesel_async::pooled_connection::ManagerConfig::default();
468+
config.recycling_method = diesel_async::pooled_connection::RecyclingMethod::Verified;
469+
AsyncDieselConnectionManager::new_with_config(postgres_url.clone(), config)
470+
};
471+
472+
// Note: deadpool does not support min_idle configuration
473+
if let Some(min_idle) = ENV_VARS.store.connection_min_idle {
474+
warn!(
475+
logger_pool,
476+
"min_idle configuration ({}) is not supported by deadpool and will be ignored",
477+
min_idle
478+
);
479+
}
480+
481+
let timeouts = Timeouts {
482+
wait: Some(ENV_VARS.store.connection_timeout),
483+
create: Some(ENV_VARS.store.connection_timeout),
484+
recycle: Some(ENV_VARS.store.connection_timeout),
485+
};
486+
487+
// The post_create and post_recycle hooks are only called when
488+
// create and recycle succeed; we can therefore mark the pool
489+
// available
490+
let pool = AsyncPool::builder(conn_manager)
491+
.max_size(pool_size as usize)
492+
.timeouts(timeouts)
493+
.runtime(Runtime::Tokio1)
494+
.post_create(state_tracker.mark_available_hook())
495+
.post_recycle(state_tracker.mark_available_hook())
496+
.build()
497+
.expect("failed to create connection pool");
498+
480499
let fdw_pool = fdw_pool_size.map(|pool_size| {
481500
let conn_manager = AsyncDieselConnectionManager::new(postgres_url.clone());
482-
let builder = mobc::Pool::builder()
483-
.get_timeout(Some(ENV_VARS.store.connection_timeout))
484-
.max_open(pool_size as u64)
485-
.max_idle_lifetime(Some(FDW_IDLE_TIMEOUT));
486-
builder.build(conn_manager)
501+
let fdw_timeouts = Timeouts {
502+
wait: Some(ENV_VARS.store.connection_timeout),
503+
create: None,
504+
recycle: Some(FDW_IDLE_TIMEOUT),
505+
};
506+
507+
AsyncPool::builder(conn_manager)
508+
.max_size(pool_size as usize)
509+
.timeouts(fdw_timeouts)
510+
.runtime(Runtime::Tokio1)
511+
.post_create(state_tracker.mark_available_hook())
512+
.post_recycle(state_tracker.mark_available_hook())
513+
.build()
514+
.expect("failed to create fdw connection pool")
487515
});
488516

489517
info!(logger_store, "Pool successfully connected to Postgres");
@@ -570,7 +598,12 @@ impl PoolInner {
570598
let Ok(fdw_pool) = self.fdw_pool(logger) else {
571599
return None;
572600
};
573-
let Ok(conn) = fdw_pool.get_timeout(timeout).await else {
601+
let timeouts = Timeouts {
602+
wait: Some(timeout),
603+
create: None,
604+
recycle: None,
605+
};
606+
let Ok(conn) = fdw_pool.timeout_get(&timeouts).await else {
574607
return None;
575608
};
576609
Some(conn)

0 commit comments

Comments
 (0)