Skip to content

Commit 33e28e6

Browse files
committed
Asyncify remaining method in catalog
1 parent f3a6c2c commit 33e28e6

File tree

11 files changed

+123
-103
lines changed

11 files changed

+123
-103
lines changed

node/src/manager/commands/stats.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,11 @@ pub async fn show(
9999
) -> Result<(), anyhow::Error> {
100100
let (site, mut conn) = site_and_conn(pools, search).await?;
101101

102-
let catalog = store_catalog::Catalog::load(&mut conn, site.cheap_clone(), false, vec![])?;
103-
let stats = catalog.stats(&mut conn)?;
102+
let catalog =
103+
store_catalog::Catalog::load(&mut conn, site.cheap_clone(), false, vec![]).await?;
104+
let stats = catalog.stats(&mut conn).await?;
104105

105-
let account_like = store_catalog::account_like(&mut conn, &site)?;
106+
let account_like = store_catalog::account_like(&mut conn, &site).await?;
106107

107108
show_stats(stats.as_slice(), account_like)
108109
}

store/postgres/src/catalog.rs

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub struct Locale {
127127

128128
impl Locale {
129129
/// Load locale information for current database
130-
pub fn load(conn: &mut PgConnection) -> Result<Locale, StoreError> {
130+
pub async fn load(conn: &mut PgConnection) -> Result<Locale, StoreError> {
131131
use diesel::dsl::sql;
132132
use pg_database as db;
133133

@@ -196,16 +196,16 @@ pub struct Catalog {
196196

197197
impl Catalog {
198198
/// Load the catalog for an existing subgraph
199-
pub fn load(
199+
pub async fn load(
200200
conn: &mut PgConnection,
201201
site: Arc<Site>,
202202
use_bytea_prefix: bool,
203203
entities_with_causality_region: Vec<EntityType>,
204204
) -> Result<Self, StoreError> {
205-
let text_columns = get_text_columns(conn, &site.namespace)?;
206-
let use_poi = supports_proof_of_indexing(conn, &site.namespace)?;
207-
let has_minmax_multi_ops = has_minmax_multi_ops(conn)?;
208-
let pg_stats_has_range_bounds_histogram = pg_stats_has_range_bounds_histogram(conn)?;
205+
let text_columns = get_text_columns(conn, &site.namespace).await?;
206+
let use_poi = supports_proof_of_indexing(conn, &site.namespace).await?;
207+
let has_minmax_multi_ops = has_minmax_multi_ops(conn).await?;
208+
let pg_stats_has_range_bounds_histogram = pg_stats_has_range_bounds_histogram(conn).await?;
209209

210210
Ok(Catalog {
211211
site,
@@ -219,13 +219,13 @@ impl Catalog {
219219
}
220220

221221
/// Return a new catalog suitable for creating a new subgraph
222-
pub fn for_creation(
222+
pub async fn for_creation(
223223
conn: &mut PgConnection,
224224
site: Arc<Site>,
225225
entities_with_causality_region: BTreeSet<EntityType>,
226226
) -> Result<Self, StoreError> {
227-
let has_minmax_multi_ops = has_minmax_multi_ops(conn)?;
228-
let pg_stats_has_range_bounds_histogram = pg_stats_has_range_bounds_histogram(conn)?;
227+
let has_minmax_multi_ops = has_minmax_multi_ops(conn).await?;
228+
let pg_stats_has_range_bounds_histogram = pg_stats_has_range_bounds_histogram(conn).await?;
229229

230230
Ok(Catalog {
231231
site,
@@ -281,7 +281,7 @@ impl Catalog {
281281
}
282282
}
283283

284-
pub fn stats(&self, conn: &mut PgConnection) -> Result<Vec<VersionStats>, StoreError> {
284+
pub async fn stats(&self, conn: &mut PgConnection) -> Result<Vec<VersionStats>, StoreError> {
285285
#[derive(Queryable, QueryableByName)]
286286
pub struct DbStats {
287287
#[diesel(sql_type = BigInt)]
@@ -318,7 +318,7 @@ impl Catalog {
318318
upper: Vec<i32>,
319319
}
320320

321-
fn block_range_histogram(
321+
async fn block_range_histogram(
322322
conn: &mut PgConnection,
323323
namespace: &Namespace,
324324
) -> Result<Vec<RangeHistogram>, StoreError> {
@@ -371,7 +371,7 @@ impl Catalog {
371371
.map_err(StoreError::from)?;
372372

373373
let mut range_histogram = if self.pg_stats_has_range_bounds_histogram {
374-
block_range_histogram(conn, &self.site.namespace)?
374+
block_range_histogram(conn, &self.site.namespace).await?
375375
} else {
376376
vec![]
377377
};
@@ -399,7 +399,7 @@ impl Catalog {
399399
}
400400
}
401401

402-
fn get_text_columns(
402+
async fn get_text_columns(
403403
conn: &mut PgConnection,
404404
namespace: &Namespace,
405405
) -> Result<HashMap<String, HashSet<String>>, StoreError> {
@@ -429,7 +429,7 @@ fn get_text_columns(
429429
Ok(map)
430430
}
431431

432-
pub fn table_exists(
432+
pub async fn table_exists(
433433
conn: &mut PgConnection,
434434
namespace: &str,
435435
table: &SqlName,
@@ -449,17 +449,17 @@ pub fn table_exists(
449449
Ok(!result.is_empty())
450450
}
451451

452-
pub fn supports_proof_of_indexing(
452+
pub async fn supports_proof_of_indexing(
453453
conn: &mut PgConnection,
454454
namespace: &Namespace,
455455
) -> Result<bool, StoreError> {
456456
lazy_static! {
457457
static ref POI_TABLE_NAME: SqlName = SqlName::verbatim(POI_TABLE.to_owned());
458458
}
459-
table_exists(conn, namespace.as_str(), &POI_TABLE_NAME)
459+
table_exists(conn, namespace.as_str(), &POI_TABLE_NAME).await
460460
}
461461

462-
pub fn current_servers(conn: &mut PgConnection) -> Result<Vec<String>, StoreError> {
462+
pub async fn current_servers(conn: &mut PgConnection) -> Result<Vec<String>, StoreError> {
463463
#[derive(QueryableByName)]
464464
struct Srv {
465465
#[diesel(sql_type = Text)]
@@ -474,7 +474,7 @@ pub fn current_servers(conn: &mut PgConnection) -> Result<Vec<String>, StoreErro
474474

475475
/// Return the options for the foreign server `name` as a map of option
476476
/// names to values
477-
pub fn server_options(
477+
pub async fn server_options(
478478
conn: &mut PgConnection,
479479
name: &str,
480480
) -> Result<HashMap<String, Option<String>>, StoreError> {
@@ -498,7 +498,10 @@ pub fn server_options(
498498
Ok(HashMap::from_iter(entries))
499499
}
500500

501-
pub fn has_namespace(conn: &mut PgConnection, namespace: &Namespace) -> Result<bool, StoreError> {
501+
pub async fn has_namespace(
502+
conn: &mut PgConnection,
503+
namespace: &Namespace,
504+
) -> Result<bool, StoreError> {
502505
use pg_namespace as nsp;
503506

504507
Ok(select(diesel::dsl::exists(
@@ -511,7 +514,7 @@ pub fn has_namespace(conn: &mut PgConnection, namespace: &Namespace) -> Result<b
511514
/// another database. If the schema does not exist, or is not a foreign
512515
/// schema, do nothing. This crucially depends on the fact that we never mix
513516
/// foreign and local tables in the same schema.
514-
pub fn drop_foreign_schema(conn: &mut PgConnection, src: &Site) -> Result<(), StoreError> {
517+
pub async fn drop_foreign_schema(conn: &mut PgConnection, src: &Site) -> Result<(), StoreError> {
515518
use foreign_tables as ft;
516519

517520
let is_foreign = select(diesel::dsl::exists(
@@ -526,7 +529,7 @@ pub fn drop_foreign_schema(conn: &mut PgConnection, src: &Site) -> Result<(), St
526529
Ok(())
527530
}
528531

529-
pub fn foreign_tables(conn: &mut PgConnection, nsp: &str) -> Result<Vec<String>, StoreError> {
532+
pub async fn foreign_tables(conn: &mut PgConnection, nsp: &str) -> Result<Vec<String>, StoreError> {
530533
use foreign_tables as ft;
531534

532535
ft::table
@@ -538,7 +541,7 @@ pub fn foreign_tables(conn: &mut PgConnection, nsp: &str) -> Result<Vec<String>,
538541

539542
/// Drop the schema `nsp` and all its contents if it exists, and create it
540543
/// again so that `nsp` is an empty schema
541-
pub fn recreate_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {
544+
pub async fn recreate_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {
542545
let query = format!(
543546
"drop schema if exists {nsp} cascade;\
544547
create schema {nsp};",
@@ -548,15 +551,15 @@ pub fn recreate_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreEr
548551
}
549552

550553
/// Drop the schema `nsp` and all its contents if it exists
551-
pub fn drop_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {
554+
pub async fn drop_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {
552555
let query = format!("drop schema if exists {nsp} cascade;", nsp = nsp);
553556
Ok(conn.batch_execute(&query)?)
554557
}
555558

556-
pub fn migration_count(conn: &mut PgConnection) -> Result<usize, StoreError> {
559+
pub async fn migration_count(conn: &mut PgConnection) -> Result<usize, StoreError> {
557560
use __diesel_schema_migrations as m;
558561

559-
if !table_exists(conn, NAMESPACE_PUBLIC, &MIGRATIONS_TABLE)? {
562+
if !table_exists(conn, NAMESPACE_PUBLIC, &MIGRATIONS_TABLE).await? {
560563
return Ok(0);
561564
}
562565

@@ -567,7 +570,10 @@ pub fn migration_count(conn: &mut PgConnection) -> Result<usize, StoreError> {
567570
.map_err(StoreError::from)
568571
}
569572

570-
pub fn account_like(conn: &mut PgConnection, site: &Site) -> Result<HashSet<String>, StoreError> {
573+
pub async fn account_like(
574+
conn: &mut PgConnection,
575+
site: &Site,
576+
) -> Result<HashSet<String>, StoreError> {
571577
use table_stats as ts;
572578
let names = ts::table
573579
.filter(ts::deployment.eq(site.id))
@@ -587,7 +593,7 @@ pub fn account_like(conn: &mut PgConnection, site: &Site) -> Result<HashSet<Stri
587593
Ok(names)
588594
}
589595

590-
pub fn set_account_like(
596+
pub async fn set_account_like(
591597
conn: &mut PgConnection,
592598
site: &Site,
593599
table_name: &SqlName,
@@ -607,7 +613,7 @@ pub fn set_account_like(
607613
Ok(())
608614
}
609615

610-
pub fn copy_account_like(
616+
pub async fn copy_account_like(
611617
conn: &mut PgConnection,
612618
src: &Site,
613619
dst: &Site,
@@ -626,7 +632,7 @@ pub fn copy_account_like(
626632
.execute(conn)?)
627633
}
628634

629-
pub fn set_last_pruned_block(
635+
pub async fn set_last_pruned_block(
630636
conn: &mut PgConnection,
631637
site: &Site,
632638
table_name: &SqlName,
@@ -690,7 +696,7 @@ pub(crate) mod table_schema {
690696
}
691697
}
692698

693-
pub fn columns(
699+
pub async fn columns(
694700
conn: &mut PgConnection,
695701
nsp: &str,
696702
table_name: &str,
@@ -719,7 +725,7 @@ pub(crate) mod table_schema {
719725
/// Return a SQL statement to create the foreign table
720726
/// `{dst_nsp}.{table_name}` for the server `server` which has the same
721727
/// schema as the (local) table `{src_nsp}.{table_name}`
722-
pub fn create_foreign_table(
728+
pub async fn create_foreign_table(
723729
conn: &mut PgConnection,
724730
src_nsp: &str,
725731
table_name: &str,
@@ -753,7 +759,7 @@ pub fn create_foreign_table(
753759
Ok(query)
754760
}
755761

756-
let columns = table_schema::columns(conn, src_nsp, table_name)?;
762+
let columns = table_schema::columns(conn, src_nsp, table_name).await?;
757763
let query = build_query(columns, src_nsp, table_name, dst_nsp, server).map_err(|_| {
758764
anyhow!(
759765
"failed to generate 'create foreign table' query for {}.{}",
@@ -777,7 +783,7 @@ pub fn create_foreign_table(
777783
/// The list `shard_nsps` consists of pairs `(name, namespace)` where `name`
778784
/// is the name of the shard and `namespace` is the namespace where the
779785
/// `src_table` is mapped
780-
pub fn create_cross_shard_view(
786+
pub async fn create_cross_shard_view(
781787
conn: &mut PgConnection,
782788
src_nsp: &str,
783789
src_table: &str,
@@ -805,7 +811,7 @@ pub fn create_cross_shard_view(
805811
Ok(query)
806812
}
807813

808-
let columns = table_schema::columns(conn, src_nsp, src_table)?;
814+
let columns = table_schema::columns(conn, src_nsp, src_table).await?;
809815
let query = build_query(&columns, src_table, dst_nsp, shard_nsps).map_err(|_| {
810816
anyhow!(
811817
"failed to generate 'create foreign table' query for {}.{}",
@@ -817,7 +823,7 @@ pub fn create_cross_shard_view(
817823
}
818824

819825
/// Checks in the database if a given index is valid.
820-
pub(crate) fn check_index_is_valid(
826+
pub(crate) async fn check_index_is_valid(
821827
conn: &mut PgConnection,
822828
schema_name: &str,
823829
index_name: &str,
@@ -848,7 +854,7 @@ pub(crate) fn check_index_is_valid(
848854
Ok(matches!(result, Some(true)))
849855
}
850856

851-
pub(crate) fn indexes_for_table(
857+
pub(crate) async fn indexes_for_table(
852858
conn: &mut PgConnection,
853859
schema_name: &str,
854860
table_name: &str,
@@ -878,7 +884,7 @@ pub(crate) fn indexes_for_table(
878884
Ok(results.into_iter().map(|i| i.def).collect())
879885
}
880886

881-
pub(crate) fn drop_index(
887+
pub(crate) async fn drop_index(
882888
conn: &mut PgConnection,
883889
schema_name: &str,
884890
index_name: &str,
@@ -895,7 +901,7 @@ pub(crate) fn drop_index(
895901
/// Return by how much the slowest replica connected to the database `conn`
896902
/// is lagging. The returned value has millisecond precision. If the
897903
/// database has no replicas, return `0`
898-
pub(crate) fn replication_lag(conn: &mut PgConnection) -> Result<Duration, StoreError> {
904+
pub(crate) async fn replication_lag(conn: &mut PgConnection) -> Result<Duration, StoreError> {
899905
#[derive(Queryable, QueryableByName)]
900906
struct Lag {
901907
#[diesel(sql_type = Nullable<Integer>)]
@@ -916,7 +922,7 @@ pub(crate) fn replication_lag(conn: &mut PgConnection) -> Result<Duration, Store
916922
Ok(Duration::from_millis(lag))
917923
}
918924

919-
pub(crate) fn cancel_vacuum(
925+
pub(crate) async fn cancel_vacuum(
920926
conn: &mut PgConnection,
921927
namespace: &Namespace,
922928
) -> Result<(), StoreError> {
@@ -934,7 +940,7 @@ pub(crate) fn cancel_vacuum(
934940
Ok(())
935941
}
936942

937-
pub(crate) fn default_stats_target(conn: &mut PgConnection) -> Result<i32, StoreError> {
943+
pub(crate) async fn default_stats_target(conn: &mut PgConnection) -> Result<i32, StoreError> {
938944
#[derive(Queryable, QueryableByName)]
939945
struct Target {
940946
#[diesel(sql_type = Integer)]
@@ -947,7 +953,7 @@ pub(crate) fn default_stats_target(conn: &mut PgConnection) -> Result<i32, Store
947953
Ok(target.setting)
948954
}
949955

950-
pub(crate) fn stats_targets(
956+
pub(crate) async fn stats_targets(
951957
conn: &mut PgConnection,
952958
namespace: &Namespace,
953959
) -> Result<BTreeMap<SqlName, BTreeMap<SqlName, i32>>, StoreError> {
@@ -976,7 +982,7 @@ pub(crate) fn stats_targets(
976982
Ok(map)
977983
}
978984

979-
pub(crate) fn set_stats_target(
985+
pub(crate) async fn set_stats_target(
980986
conn: &mut PgConnection,
981987
namespace: &Namespace,
982988
table: &SqlName,
@@ -997,7 +1003,7 @@ pub(crate) fn set_stats_target(
9971003
/// same logic that Postgres' [autovacuum
9981004
/// daemon](https://www.postgresql.org/docs/current/routine-vacuuming.html#AUTOVACUUM)
9991005
/// uses
1000-
pub(crate) fn needs_autoanalyze(
1006+
pub(crate) async fn needs_autoanalyze(
10011007
conn: &mut PgConnection,
10021008
namespace: &Namespace,
10031009
) -> Result<Vec<SqlName>, StoreError> {
@@ -1025,7 +1031,7 @@ pub(crate) fn needs_autoanalyze(
10251031

10261032
/// Check whether the database for `conn` supports the `minmax_multi_ops`
10271033
/// introduced in Postgres 14
1028-
fn has_minmax_multi_ops(conn: &mut PgConnection) -> Result<bool, StoreError> {
1034+
async fn has_minmax_multi_ops(conn: &mut PgConnection) -> Result<bool, StoreError> {
10291035
const QUERY: &str = "select count(*) = 2 as has_ops \
10301036
from pg_opclass \
10311037
where opcname in('int8_minmax_multi_ops', 'int4_minmax_multi_ops')";
@@ -1041,7 +1047,7 @@ fn has_minmax_multi_ops(conn: &mut PgConnection) -> Result<bool, StoreError> {
10411047

10421048
/// Check whether the database for `conn` has the column
10431049
/// `pg_stats.range_bounds_histogram` introduced in Postgres 17
1044-
fn pg_stats_has_range_bounds_histogram(conn: &mut PgConnection) -> Result<bool, StoreError> {
1050+
async fn pg_stats_has_range_bounds_histogram(conn: &mut PgConnection) -> Result<bool, StoreError> {
10451051
#[derive(Queryable, QueryableByName)]
10461052
struct HasIt {
10471053
#[diesel(sql_type = Bool)]

0 commit comments

Comments (0)