Skip to content

Commit 564472c

Browse files
committed
store: Asyncify DeploymentStore::get_conn
1 parent 6ce0ff2 commit 564472c

File tree

1 file changed

+41
-39
lines changed

1 file changed

+41
-39
lines changed

store/postgres/src/deployment_store.rs

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl DeploymentStore {
192192
on_sync: OnSync,
193193
index_def: Option<IndexList>,
194194
) -> Result<(), StoreError> {
195-
let mut conn = self.get_conn()?;
195+
let mut conn = self.get_conn().await?;
196196
conn.transaction_async::<_, StoreError, _>(|conn| {
197197
async {
198198
let exists = deployment::exists(conn, &site).await?;
@@ -263,7 +263,7 @@ impl DeploymentStore {
263263
&self,
264264
site: Arc<Site>,
265265
) -> Result<SubgraphDeploymentEntity, StoreError> {
266-
let mut conn = self.get_conn()?;
266+
let mut conn = self.get_conn().await?;
267267
let layout = self.layout(&mut conn, site.clone()).await?;
268268
Ok(
269269
detail::deployment_entity(&mut conn, &site, &layout.input_schema)
@@ -274,7 +274,7 @@ impl DeploymentStore {
274274
// Remove the data and metadata for the deployment `site`. This operation
275275
// is not reversible
276276
pub(crate) async fn drop_deployment(&self, site: &Site) -> Result<(), StoreError> {
277-
let mut conn = self.get_conn()?;
277+
let mut conn = self.get_conn().await?;
278278
conn.transaction_async(|conn| {
279279
async {
280280
crate::deployment::drop_schema(conn, &site.namespace).await?;
@@ -409,8 +409,10 @@ impl DeploymentStore {
409409
}
410410

411411
/// Deprecated. Use `with_conn` instead.
412-
fn get_conn(&self) -> Result<PooledConnection<ConnectionManager<PgConnection>>, StoreError> {
413-
self.pool.get()
412+
async fn get_conn(
413+
&self,
414+
) -> Result<PooledConnection<ConnectionManager<PgConnection>>, StoreError> {
415+
self.pool.get_async().await
414416
}
415417

416418
/// Panics if `idx` is not a valid index for a read only pool.
@@ -426,7 +428,7 @@ impl DeploymentStore {
426428
replica: ReplicaId,
427429
) -> Result<PooledConnection<ConnectionManager<PgConnection>>, Error> {
428430
let conn = match replica {
429-
ReplicaId::Main => self.get_conn()?,
431+
ReplicaId::Main => self.get_conn().await?,
430432
ReplicaId::ReadOnly(idx) => self.read_only_conn(idx).await?,
431433
};
432434
Ok(conn)
@@ -469,7 +471,7 @@ impl DeploymentStore {
469471
return Ok(layout);
470472
}
471473

472-
let mut conn = self.get_conn()?;
474+
let mut conn = self.get_conn().await?;
473475
self.layout(&mut conn, site).await
474476
}
475477

@@ -533,7 +535,7 @@ impl DeploymentStore {
533535
return Ok(info.clone());
534536
}
535537

536-
let mut conn = self.get_conn()?;
538+
let mut conn = self.get_conn().await?;
537539
self.subgraph_info_with_conn(&mut conn, site).await
538540
}
539541

@@ -548,7 +550,7 @@ impl DeploymentStore {
548550
&self,
549551
ids: Vec<String>,
550552
) -> Result<Vec<DeploymentDetail>, StoreError> {
551-
let conn = &mut *self.get_conn()?;
553+
let conn = &mut *self.get_conn().await?;
552554
detail::deployment_details(conn, ids)
553555
}
554556

@@ -557,23 +559,23 @@ impl DeploymentStore {
557559
locator: &DeploymentLocator,
558560
) -> Result<DeploymentDetail, StoreError> {
559561
let id = DeploymentId::from(locator.clone());
560-
let conn = &mut *self.get_conn()?;
562+
let conn = &mut *self.get_conn().await?;
561563
detail::deployment_details_for_id(conn, &id)
562564
}
563565

564566
pub(crate) async fn deployment_statuses(
565567
&self,
566568
sites: &[Arc<Site>],
567569
) -> Result<Vec<status::Info>, StoreError> {
568-
let conn = &mut *self.get_conn()?;
570+
let conn = &mut *self.get_conn().await?;
569571
detail::deployment_statuses(conn, sites)
570572
}
571573

572574
pub(crate) async fn deployment_exists_and_synced(
573575
&self,
574576
id: &DeploymentHash,
575577
) -> Result<bool, StoreError> {
576-
let mut conn = self.get_conn()?;
578+
let mut conn = self.get_conn().await?;
577579
deployment::exists_and_synced(&mut conn, id.as_str()).await
578580
}
579581

@@ -582,14 +584,14 @@ impl DeploymentStore {
582584
id: &DeploymentHash,
583585
block_ptr: BlockPtr,
584586
) -> Result<(), StoreError> {
585-
let mut conn = self.get_conn()?;
587+
let mut conn = self.get_conn().await?;
586588
conn.transaction_async(|conn| deployment::set_synced(conn, id, block_ptr).scope_boxed())
587589
.await
588590
}
589591

590592
/// Look up the on_sync action for this deployment
591593
pub(crate) async fn on_sync(&self, site: &Site) -> Result<OnSync, StoreError> {
592-
let mut conn = self.get_conn()?;
594+
let mut conn = self.get_conn().await?;
593595
deployment::on_sync(&mut conn, site.id).await
594596
}
595597

@@ -599,7 +601,7 @@ impl DeploymentStore {
599601
&self,
600602
site: &Site,
601603
) -> Result<Option<DeploymentId>, StoreError> {
602-
let mut conn = self.get_conn()?;
604+
let mut conn = self.get_conn().await?;
603605
crate::copy::source(&mut conn, site).await
604606
}
605607

@@ -609,7 +611,7 @@ impl DeploymentStore {
609611
&self,
610612
namespace: &crate::primary::Namespace,
611613
) -> Result<(), StoreError> {
612-
let mut conn = self.get_conn()?;
614+
let mut conn = self.get_conn().await?;
613615
deployment::drop_schema(&mut conn, namespace).await
614616
}
615617

@@ -631,7 +633,7 @@ impl DeploymentStore {
631633
delete from active_copies;
632634
";
633635

634-
let mut conn = self.get_conn()?;
636+
let mut conn = self.get_conn().await?;
635637
conn.batch_execute(QUERY)?;
636638
conn.batch_execute("delete from deployment_schemas;")?;
637639
Ok(())
@@ -651,7 +653,7 @@ impl DeploymentStore {
651653
site: Arc<Site>,
652654
entity: Option<&str>,
653655
) -> Result<(), StoreError> {
654-
let mut conn = self.get_conn()?;
656+
let mut conn = self.get_conn().await?;
655657
let layout = self.layout(&mut conn, site).await?;
656658
let tables = entity
657659
.map(|entity| resolve_table_name(&layout, entity))
@@ -668,7 +670,7 @@ impl DeploymentStore {
668670
&self,
669671
site: Arc<Site>,
670672
) -> Result<(i32, BTreeMap<SqlName, BTreeMap<SqlName, i32>>), StoreError> {
671-
let mut conn = self.get_conn()?;
673+
let mut conn = self.get_conn().await?;
672674
let default = catalog::default_stats_target(&mut conn).await?;
673675
let targets = catalog::stats_targets(&mut conn, &site.namespace).await?;
674676

@@ -682,7 +684,7 @@ impl DeploymentStore {
682684
columns: Vec<String>,
683685
target: i32,
684686
) -> Result<(), StoreError> {
685-
let mut conn = self.get_conn()?;
687+
let mut conn = self.get_conn().await?;
686688
let layout = self.layout(&mut conn, site.clone()).await?;
687689

688690
let tables = entity
@@ -787,7 +789,7 @@ impl DeploymentStore {
787789

788790
pub(crate) async fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
789791
let store = self.clone();
790-
let mut binding = self.get_conn()?;
792+
let mut binding = self.get_conn().await?;
791793
let conn = binding.deref_mut();
792794
IndexList::load(conn, site, store).await
793795
}
@@ -845,7 +847,7 @@ impl DeploymentStore {
845847
// will use the updated value
846848
self.layout_cache.remove(site);
847849

848-
let mut conn = self.get_conn()?;
850+
let mut conn = self.get_conn().await?;
849851
deployment::set_history_blocks(&mut conn, site, history_blocks).await
850852
}
851853

@@ -959,7 +961,7 @@ impl DeploymentStore {
959961
) -> Result<Option<BlockTime>, StoreError> {
960962
let store = self.cheap_clone();
961963

962-
let mut conn = self.get_conn()?;
964+
let mut conn = self.get_conn().await?;
963965
let layout = store.layout(&mut conn, site.cheap_clone()).await?;
964966
layout.last_rollup(&mut conn).await
965967
}
@@ -1069,7 +1071,7 @@ impl DeploymentStore {
10691071
key: &EntityKey,
10701072
block: BlockNumber,
10711073
) -> Result<Option<Entity>, StoreError> {
1072-
let mut conn = self.get_conn()?;
1074+
let mut conn = self.get_conn().await?;
10731075
let layout = self.layout(&mut conn, site).await?;
10741076
layout.find(&mut conn, key, block)
10751077
}
@@ -1085,7 +1087,7 @@ impl DeploymentStore {
10851087
if ids_for_type.is_empty() {
10861088
return Ok(BTreeMap::new());
10871089
}
1088-
let mut conn = self.get_conn()?;
1090+
let mut conn = self.get_conn().await?;
10891091
let layout = self.layout(&mut conn, site).await?;
10901092

10911093
layout.find_many(&mut conn, ids_for_type, block)
@@ -1098,7 +1100,7 @@ impl DeploymentStore {
10981100
causality_region: CausalityRegion,
10991101
block_range: Range<BlockNumber>,
11001102
) -> Result<BTreeMap<BlockNumber, Vec<EntitySourceOperation>>, StoreError> {
1101-
let mut conn = self.get_conn()?;
1103+
let mut conn = self.get_conn().await?;
11021104
let layout = self.layout(&mut conn, site).await?;
11031105
layout.find_range(&mut conn, entity_types, causality_region, block_range)
11041106
}
@@ -1110,7 +1112,7 @@ impl DeploymentStore {
11101112
block: BlockNumber,
11111113
excluded_keys: &Vec<EntityKey>,
11121114
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
1113-
let mut conn = self.get_conn()?;
1115+
let mut conn = self.get_conn().await?;
11141116
let layout = self.layout(&mut conn, site).await?;
11151117
layout.find_derived(&mut conn, derived_query, block, excluded_keys)
11161118
}
@@ -1120,7 +1122,7 @@ impl DeploymentStore {
11201122
site: Arc<Site>,
11211123
block: BlockNumber,
11221124
) -> Result<Vec<EntityOperation>, StoreError> {
1123-
let mut conn = self.get_conn()?;
1125+
let mut conn = self.get_conn().await?;
11241126
let layout = self.layout(&mut conn, site).await?;
11251127
let changes = layout.find_changes(&mut conn, block)?;
11261128

@@ -1134,7 +1136,7 @@ impl DeploymentStore {
11341136
site: Arc<Site>,
11351137
query: EntityQuery,
11361138
) -> Result<Vec<Entity>, QueryExecutionError> {
1137-
let mut conn = self.get_conn()?;
1139+
let mut conn = self.get_conn().await?;
11381140
self.execute_query(&mut conn, site, query)
11391141
.await
11401142
.map(|(entities, _)| entities)
@@ -1151,7 +1153,7 @@ impl DeploymentStore {
11511153
) -> Result<(), StoreError> {
11521154
let mut conn = {
11531155
let _section = stopwatch.start_section("transact_blocks_get_conn");
1154-
self.get_conn()?
1156+
self.get_conn().await?
11551157
};
11561158

11571159
let (layout, earliest_block) = deployment::with_lock(&mut conn, &site, async |conn| {
@@ -1384,7 +1386,7 @@ impl DeploymentStore {
13841386
site: Arc<Site>,
13851387
block_ptr_to: BlockPtr,
13861388
) -> Result<(), StoreError> {
1387-
let mut conn = self.get_conn()?;
1389+
let mut conn = self.get_conn().await?;
13881390

13891391
let block_ptr_from = Self::block_ptr_with_conn(&mut conn, site.cheap_clone()).await?;
13901392

@@ -1415,7 +1417,7 @@ impl DeploymentStore {
14151417
site: Arc<Site>,
14161418
block_ptr_to: BlockPtr,
14171419
) -> Result<(), StoreError> {
1418-
let mut conn = self.get_conn()?;
1420+
let mut conn = self.get_conn().await?;
14191421

14201422
let block_ptr_from = Self::block_ptr_with_conn(&mut conn, site.cheap_clone()).await?;
14211423

@@ -1447,7 +1449,7 @@ impl DeploymentStore {
14471449
block_ptr_to: BlockPtr,
14481450
firehose_cursor: &FirehoseCursor,
14491451
) -> Result<(), StoreError> {
1450-
let mut conn = self.get_conn()?;
1452+
let mut conn = self.get_conn().await?;
14511453
// Unwrap: If we are reverting then the block ptr is not `None`.
14521454
let deployment_head = Self::block_ptr_with_conn(&mut conn, site.cheap_clone())
14531455
.await?
@@ -1555,7 +1557,7 @@ impl DeploymentStore {
15551557
&self,
15561558
id: &DeploymentHash,
15571559
) -> Result<Option<(DeploymentHash, BlockPtr)>, StoreError> {
1558-
let mut conn = self.get_conn()?;
1560+
let mut conn = self.get_conn().await?;
15591561
deployment::graft_pending(&mut conn, id).await
15601562
}
15611563

@@ -1614,7 +1616,7 @@ impl DeploymentStore {
16141616
return Err(StoreError::Canceled);
16151617
}
16161618

1617-
let mut conn = self.get_conn()?;
1619+
let mut conn = self.get_conn().await?;
16181620
conn.transaction_async::<(), StoreError, _>(|conn| {
16191621
async {
16201622
// Copy shared dynamic data sources and adjust their ID; if
@@ -1688,7 +1690,7 @@ impl DeploymentStore {
16881690
.await?;
16891691
}
16901692

1691-
let mut conn = self.get_conn()?;
1693+
let mut conn = self.get_conn().await?;
16921694
if ENV_VARS.postpone_attribute_index_creation {
16931695
// check if all indexes are valid and recreate them if they aren't
16941696
self.load_indexes(site.clone())
@@ -1723,7 +1725,7 @@ impl DeploymentStore {
17231725
current_ptr: &BlockPtr,
17241726
parent_ptr: &BlockPtr,
17251727
) -> Result<UnfailOutcome, StoreError> {
1726-
let mut conn = self.get_conn()?;
1728+
let mut conn = self.get_conn().await?;
17271729
let deployment_id = &site.deployment;
17281730

17291731
conn.transaction_async(|conn| {
@@ -1820,7 +1822,7 @@ impl DeploymentStore {
18201822
site: Arc<Site>,
18211823
current_ptr: &BlockPtr,
18221824
) -> Result<UnfailOutcome, StoreError> {
1823-
let mut conn = self.get_conn()?;
1825+
let mut conn = self.get_conn().await?;
18241826
let deployment_id = &site.deployment;
18251827

18261828
conn.transaction_async(|conn| async {
@@ -1883,7 +1885,7 @@ impl DeploymentStore {
18831885

18841886
#[cfg(debug_assertions)]
18851887
pub async fn error_count(&self, id: &DeploymentHash) -> Result<usize, StoreError> {
1886-
let mut conn = self.get_conn()?;
1888+
let mut conn = self.get_conn().await?;
18871889
deployment::error_count(&mut conn, id).await
18881890
}
18891891

0 commit comments

Comments
 (0)