
Commit f31fa9f

node: Replace uses of Pool.get_sync with Pool.get
1 parent: 662560b

5 files changed: +26 −20 lines
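The change is mechanical across all five files: call sites that obtained a connection through the pool's synchronous accessor now use the async accessor, and the diesel queries run on that connection switch from diesel's blocking RunQueryDsl to diesel_async::RunQueryDsl, so every executor call gains an .await. A minimal before/after sketch of the pattern, assuming only what the diff itself shows about graph_store_postgres::ConnectionPool (the ping helper is hypothetical):

use diesel::sql_query;
use diesel_async::RunQueryDsl; // async execute/load/get_result return futures
use graph_store_postgres::ConnectionPool;

async fn ping(pool: &ConnectionPool) -> Result<(), anyhow::Error> {
    // Before this commit: `let mut conn = pool.get_sync().await?;`
    let mut conn = pool.get().await?;
    // With diesel_async the query itself must also be awaited.
    sql_query("select 1").execute(&mut conn).await?;
    Ok(())
}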

node/src/manager/commands/chain.rs

Lines changed: 8 additions & 8 deletions
@@ -1,7 +1,8 @@
 use std::sync::Arc;
 
 use diesel::sql_query;
-use diesel::RunQueryDsl;
+use diesel_async::AsyncConnection;
+use diesel_async::RunQueryDsl;
 use graph::blockchain::BlockHash;
 use graph::blockchain::BlockPtr;
 use graph::blockchain::ChainIdentifier;
@@ -24,7 +25,6 @@ use graph_chain_ethereum::EthereumAdapterTrait as _;
 use graph_store_postgres::add_chain;
 use graph_store_postgres::find_chain;
 use graph_store_postgres::update_chain_name;
-use graph_store_postgres::AsyncConnection;
 use graph_store_postgres::BlockStore;
 use graph_store_postgres::ChainStatus;
 use graph_store_postgres::ChainStore;
@@ -37,7 +37,7 @@ use crate::network_setup::Networks;
 
 pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> {
     let mut chains = {
-        let mut conn = primary.get_sync().await?;
+        let mut conn = primary.get().await?;
         block_store::load_chains(&mut conn).await?
     };
     chains.sort_by_key(|chain| chain.name.clone());
@@ -122,7 +122,7 @@ pub async fn info(
         }
     }
 
-    let mut conn = primary.get_sync().await?;
+    let mut conn = primary.get().await?;
 
     let chain = block_store::find_chain(&mut conn, &name)
         .await?
@@ -231,7 +231,7 @@ pub async fn change_block_cache_shard(
 ) -> Result<(), Error> {
     println!("Changing block cache shard for {} to {}", chain_name, shard);
 
-    let mut conn = primary_store.get_sync().await?;
+    let mut conn = primary_store.get().await?;
 
     let chain = find_chain(&mut conn, &chain_name)
         .await?
@@ -247,7 +247,7 @@ pub async fn change_block_cache_shard(
     let new_name = format!("{}-old", &chain_name);
     let ident = chain_store.chain_identifier().await?;
 
-    conn.transaction_async::<(), StoreError, _>(|conn| {
+    conn.transaction::<(), StoreError, _>(|conn| {
         async {
             let shard = Shard::new(shard.to_string())?;
 
@@ -259,7 +259,7 @@ pub async fn change_block_cache_shard(
             sql_query(
                 "alter table deployment_schemas drop constraint deployment_schemas_network_fkey;",
             )
-            .execute(conn)?;
+            .execute(conn).await?;
 
             // Update the current chain name to chain-old
             update_chain_name(conn, &chain_name, &new_name).await?;
@@ -271,7 +271,7 @@ pub async fn change_block_cache_shard(
             sql_query(
                 "alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);",
             )
-            .execute(conn)?;
+            .execute(conn).await?;
             Ok(())
         }.scope_boxed()
     }).await?;
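The last three hunks in this file drop the transaction_async helper that came with the removed graph_store_postgres::AsyncConnection import and use diesel_async's own AsyncConnection::transaction instead. Its closure must return a scoped boxed future, which the existing .scope_boxed() call (from the scoped_futures crate that diesel_async re-exports) provides, and the raw sql_query(...).execute(conn) statements inside the closure become futures that need .await. A self-contained sketch of the same shape, assuming only diesel_async's documented API; the table and column names below are invented for illustration:

use diesel::sql_query;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};

// Run two statements atomically: either both apply or neither does.
async fn rename_network(conn: &mut AsyncPgConnection) -> Result<(), diesel::result::Error> {
    conn.transaction::<(), diesel::result::Error, _>(|conn| {
        async move {
            sql_query("update chains set name = 'mainnet-old' where name = 'mainnet'")
                .execute(conn)
                .await?;
            sql_query("update deployment_schemas set network = 'mainnet-old' where network = 'mainnet'")
                .execute(conn)
                .await?;
            Ok(())
        }
        .scope_boxed()
    })
    .await
}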

node/src/manager/commands/copy.rs

Lines changed: 13 additions & 7 deletions
@@ -1,4 +1,5 @@
-use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQueryDsl};
+use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl};
+use diesel_async::RunQueryDsl;
 use std::{collections::HashMap, sync::Arc};
 
 use graph::{
@@ -67,18 +68,20 @@ impl CopyState {
             .get(shard)
             .ok_or_else(|| anyhow!("can not find pool for shard {}", shard))?;
 
-        let mut dconn = dpool.get_sync().await?;
+        let mut dconn = dpool.get().await?;
 
         let tables = cts::table
            .filter(cts::dst.eq(dst))
            .order_by(cts::entity_type)
-           .load::<CopyTableState>(&mut dconn)?;
+           .load::<CopyTableState>(&mut dconn)
+           .await?;
 
        let on_sync = on_sync(&mut dconn, DeploymentId(dst)).await?;
 
        Ok(cs::table
            .filter(cs::dst.eq(dst))
            .get_result::<CopyState>(&mut dconn)
+           .await
            .optional()?
            .map(|state| (state, tables, on_sync)))
    }
@@ -209,7 +212,7 @@ pub async fn list(pools: HashMap<Shard, ConnectionPool>) -> Result<(), Error> {
     use catalog::deployment_schemas as ds;
 
     let primary = pools.get(&*PRIMARY_SHARD).expect("there is a primary pool");
-    let mut conn = primary.get_sync().await?;
+    let mut conn = primary.get().await?;
 
     let copies = ac::table
         .inner_join(ds::table.on(ds::id.eq(ac::dst)))
@@ -221,7 +224,8 @@ pub async fn list(pools: HashMap<Shard, ConnectionPool>) -> Result<(), Error> {
             ds::subgraph,
             ds::shard,
         ))
-        .load::<(i32, i32, Option<UtcDateTime>, UtcDateTime, String, Shard)>(&mut conn)?;
+        .load::<(i32, i32, Option<UtcDateTime>, UtcDateTime, String, Shard)>(&mut conn)
+        .await?;
     if copies.is_empty() {
         println!("no active copies");
     } else {
@@ -274,18 +278,20 @@ pub async fn status(
     let primary = pools
         .get(&*PRIMARY_SHARD)
         .ok_or_else(|| anyhow!("can not find deployment with id {}", dst))?;
-    let mut pconn = primary.get_sync().await?;
+    let mut pconn = primary.get().await?;
     let dst = dst.locate_unique(primary).await?.id.0;
 
     let (shard, deployment) = ds::table
         .filter(ds::id.eq(dst))
         .select((ds::shard, ds::subgraph))
-        .get_result::<(Shard, String)>(&mut pconn)?;
+        .get_result::<(Shard, String)>(&mut pconn)
+        .await?;
 
     let (active, cancelled_at) = ac::table
         .filter(ac::dst.eq(dst))
         .select((ac::src, ac::cancelled_at))
         .get_result::<(i32, Option<UtcDateTime>)>(&mut pconn)
+        .await
         .optional()?
         .map(|(_, cancelled_at)| (true, cancelled_at))
         .unwrap_or((false, None));
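With diesel_async::RunQueryDsl, load and get_result return futures, so OptionalExtension::optional now applies to the awaited Result rather than to the query call itself; that is why the hunks above insert .await between get_result(...) and .optional()?. A minimal sketch of that ordering (the copy_state table is declared here only so the example stands alone; it is not the repository's schema):

use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
use diesel_async::{AsyncPgConnection, RunQueryDsl};

diesel::table! {
    copy_state (dst) {
        dst -> Integer,
        target_block_number -> Integer,
    }
}

// Returns Ok(None) when no row matches instead of Err(NotFound).
async fn target_block(
    conn: &mut AsyncPgConnection,
    dst: i32,
) -> Result<Option<i32>, diesel::result::Error> {
    copy_state::table
        .filter(copy_state::dst.eq(dst))
        .select(copy_state::target_block_number)
        .get_result::<i32>(conn)
        .await      // run the query first ...
        .optional() // ... then map NotFound to Ok(None) on the Result
}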

node/src/manager/commands/stats.rs

Lines changed: 3 additions & 3 deletions
@@ -10,16 +10,16 @@ use graph::prelude::anyhow;
 use graph::prelude::CheapClone as _;
 use graph_store_postgres::command_support::catalog as store_catalog;
 use graph_store_postgres::command_support::catalog::Site;
+use graph_store_postgres::AsyncPgConnection;
 use graph_store_postgres::ConnectionPool;
-use graph_store_postgres::PgConnection;
 use graph_store_postgres::Shard;
 use graph_store_postgres::SubgraphStore;
 use graph_store_postgres::PRIMARY_SHARD;
 
 async fn site_and_conn(
     pools: HashMap<Shard, ConnectionPool>,
     search: &DeploymentSearch,
-) -> Result<(Arc<Site>, PgConnection), anyhow::Error> {
+) -> Result<(Arc<Site>, AsyncPgConnection), anyhow::Error> {
     let primary_pool = pools.get(&*PRIMARY_SHARD).unwrap();
     let locator = search.locate_unique(primary_pool).await?;
 
@@ -32,7 +32,7 @@ async fn site_and_conn(
         .ok_or_else(|| anyhow!("deployment `{}` does not exist", search))?;
     let site = Arc::new(site);
 
-    let conn = pools.get(&site.shard).unwrap().get_sync().await?;
+    let conn = pools.get(&site.shard).unwrap().get().await?;
 
     Ok((site, conn))
 }
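Because site_and_conn now hands back an AsyncPgConnection, the stats queries downstream of it must go through diesel_async as well. A hedged sketch of a consumer written against that connection type; the function and its query are illustrative, not taken from the repository:

use diesel::sql_query;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

// Illustrative consumer: refresh planner statistics for one table.
// Real code would validate or quote the identifier rather than formatting it in.
async fn analyze_table(conn: &mut AsyncPgConnection, qualified_name: &str) -> Result<(), anyhow::Error> {
    sql_query(format!("analyze {}", qualified_name))
        .execute(conn)
        .await?;
    Ok(())
}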

node/src/manager/commands/txn_speed.rs

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ pub async fn run(pool: ConnectionPool, delay: u64) -> Result<(), anyhow::Error>
     }
 
     let mut speeds = HashMap::new();
-    let mut conn = pool.get_sync().await?;
+    let mut conn = pool.get().await?;
     for (datname, all_txn, write_txn) in query(&mut conn).await? {
         speeds.insert(datname, (all_txn, write_txn));
     }

node/src/manager/deployment.rs

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ impl DeploymentSearch {
     }
 
     pub async fn lookup(&self, primary: &ConnectionPool) -> Result<Vec<Deployment>, anyhow::Error> {
-        let mut conn = primary.get_sync().await?;
+        let mut conn = primary.get().await?;
         self.lookup_with_conn(&mut conn).await
     }
 
