Skip to content

Commit 68c2569

Browse files
committed
node: Use AsyncPgConnection in remaining places
1 parent b8a9c65 commit 68c2569

File tree

4 files changed

+22
-18
lines changed

4 files changed

+22
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ serde = { workspace = true }
3737
shellexpand = "3.1.1"
3838
termcolor = "1.4.1"
3939
diesel = { workspace = true }
40+
diesel-async = { workspace = true }
4041
prometheus = { version = "0.14.0", features = ["push"] }
4142
json-structural-diff = { version = "0.2", features = ["colorize"] }
4243
globset = "0.4.16"

node/src/manager/commands/txn_speed.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
use std::{collections::HashMap, thread::sleep, time::Duration};
22

3+
use diesel::dsl::sql;
34
use graph::prelude::anyhow;
4-
use graph_store_postgres::{ConnectionPool, PgConnection};
5+
use graph_store_postgres::{AsyncPgConnection, ConnectionPool};
56

67
use crate::manager::catalog;
78

89
pub async fn run(pool: ConnectionPool, delay: u64) -> Result<(), anyhow::Error> {
9-
fn query(conn: &mut PgConnection) -> Result<Vec<(String, i64, i64)>, anyhow::Error> {
10+
async fn query(conn: &mut AsyncPgConnection) -> Result<Vec<(String, i64, i64)>, anyhow::Error> {
1011
use catalog::pg_catalog::pg_stat_database as d;
11-
use diesel::dsl::*;
1212
use diesel::sql_types::BigInt;
13-
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
13+
use diesel::{ExpressionMethods, QueryDsl};
14+
use diesel_async::RunQueryDsl;
1415

1516
let rows = d::table
1617
.filter(d::datname.eq_any(vec!["explorer", "graph"]))
@@ -20,7 +21,8 @@ pub async fn run(pool: ConnectionPool, delay: u64) -> Result<(), anyhow::Error>
2021
sql::<BigInt>("txid_current()::bigint"),
2122
))
2223
//.select((d::datname))
23-
.load::<(Option<String>, i64, i64)>(conn)?;
24+
.load::<(Option<String>, i64, i64)>(conn)
25+
.await?;
2426
Ok(rows
2527
.into_iter()
2628
.map(|(datname, all_txn, write_txn)| {
@@ -31,7 +33,7 @@ pub async fn run(pool: ConnectionPool, delay: u64) -> Result<(), anyhow::Error>
3133

3234
let mut speeds = HashMap::new();
3335
let mut conn = pool.get_sync().await?;
34-
for (datname, all_txn, write_txn) in query(&mut conn)? {
36+
for (datname, all_txn, write_txn) in query(&mut conn).await? {
3537
speeds.insert(datname, (all_txn, write_txn));
3638
}
3739
println!(
@@ -41,7 +43,7 @@ pub async fn run(pool: ConnectionPool, delay: u64) -> Result<(), anyhow::Error>
4143
sleep(Duration::from_secs(delay));
4244
println!("Number of transactions/minute");
4345
println!("{:10} {:>7} write", "database", "all");
44-
for (datname, all_txn, write_txn) in query(&mut conn)? {
46+
for (datname, all_txn, write_txn) in query(&mut conn).await? {
4547
let (all_speed, write_speed) = speeds
4648
.get(&datname)
4749
.map(|(all_txn_old, write_txn_old)| {

node/src/manager/deployment.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@ use diesel::dsl::sql;
66
use diesel::sql_types::Text;
77
use diesel::{
88
ExpressionMethods, JoinOnDsl, NullableExpressionMethods, PgTextExpressionMethods, QueryDsl,
9-
RunQueryDsl,
109
};
10+
use diesel_async::RunQueryDsl;
1111

1212
use graph::components::store::DeploymentId;
1313
use graph::{
1414
components::store::DeploymentLocator,
1515
prelude::{anyhow, lazy_static, regex::Regex, DeploymentHash},
1616
};
1717
use graph_store_postgres::command_support::catalog as store_catalog;
18-
use graph_store_postgres::ConnectionPool;
19-
use graph_store_postgres::{unused, PgConnection};
18+
use graph_store_postgres::unused;
19+
use graph_store_postgres::{AsyncPgConnection, ConnectionPool};
2020

2121
lazy_static! {
2222
// `Qm...` optionally follow by `:$shard`
@@ -94,12 +94,12 @@ impl DeploymentSearch {
9494

9595
pub async fn lookup(&self, primary: &ConnectionPool) -> Result<Vec<Deployment>, anyhow::Error> {
9696
let mut conn = primary.get_sync().await?;
97-
self.lookup_with_conn(&mut conn)
97+
self.lookup_with_conn(&mut conn).await
9898
}
9999

100-
pub fn lookup_with_conn(
100+
pub async fn lookup_with_conn(
101101
&self,
102-
conn: &mut PgConnection,
102+
conn: &mut AsyncPgConnection,
103103
) -> Result<Vec<Deployment>, anyhow::Error> {
104104
use store_catalog::deployment_schemas as ds;
105105
use store_catalog::subgraph as s;
@@ -130,19 +130,19 @@ impl DeploymentSearch {
130130
let deployments: Vec<Deployment> = match self {
131131
DeploymentSearch::Name { name } => {
132132
let pattern = format!("%{}%", name);
133-
query.filter(s::name.ilike(&pattern)).load(conn)?
133+
query.filter(s::name.ilike(&pattern)).load(conn).await?
134134
}
135135
DeploymentSearch::Hash { hash, shard } => {
136136
let query = query.filter(ds::subgraph.eq(&hash));
137137
match shard {
138-
Some(shard) => query.filter(ds::shard.eq(shard)).load(conn)?,
139-
None => query.load(conn)?,
138+
Some(shard) => query.filter(ds::shard.eq(shard)).load(conn).await?,
139+
None => query.load(conn).await?,
140140
}
141141
}
142142
DeploymentSearch::Deployment { namespace } => {
143-
query.filter(ds::name.eq(&namespace)).load(conn)?
143+
query.filter(ds::name.eq(&namespace)).load(conn).await?
144144
}
145-
DeploymentSearch::All => query.load(conn)?,
145+
DeploymentSearch::All => query.load(conn).await?,
146146
};
147147
Ok(deployments)
148148
}

0 commit comments

Comments
 (0)