Skip to content

Commit bd58d19

Browse files
committed
store: Make sure migrations run completely on a blocking thread (CI PASSES AGAIN)
1 parent 70b39df commit bd58d19

File tree

1 file changed

+21
-29
lines changed

1 file changed

+21
-29
lines changed

store/postgres/src/pool/mod.rs

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -589,14 +589,11 @@ impl PoolInner {
589589
sql_query("select 1").execute(&mut conn).await.is_ok()
590590
}
591591

592-
async fn locale_check(
593-
&self,
594-
logger: &Logger,
595-
conn: &mut AsyncPgConnection,
596-
) -> Result<(), StoreError> {
592+
async fn locale_check(&self, logger: &Logger) -> Result<(), StoreError> {
593+
let mut conn = self.get().await?;
597594
Ok(
598-
if let Err(msg) = catalog::Locale::load(conn).await?.suitable() {
599-
if &self.shard == &*PRIMARY_SHARD && primary::is_empty(conn).await? {
595+
if let Err(msg) = catalog::Locale::load(&mut conn).await?.suitable() {
596+
if &self.shard == &*PRIMARY_SHARD && primary::is_empty(&mut conn).await? {
600597
const MSG: &str =
601598
"Database does not use C locale. \
602599
Please check the graph-node documentation for how to set up the database locale";
@@ -652,25 +649,24 @@ impl PoolInner {
652649
self: Arc<Self>,
653650
servers: &[ForeignServer],
654651
) -> Result<MigrationCount, StoreError> {
652+
self.locale_check(&self.logger).await?;
653+
655654
self.configure_fdw(servers).await?;
655+
656656
// We use AsyncConnectionWrapper here since diesel_async doesn't
657-
// offer an async analog to `HarnessWithOutput` yet, and we
658-
// therefore use sync infrastructure to run migrations. That's not a
659-
// big deal, since this happens at server startup only.
657+
// offer a truly async way to run migrations, and we need to be very
658+
// careful that block_on only gets called on a blocking thread to
659+
// avoid errors from the tokio runtime
660+
let logger = self.logger.cheap_clone();
660661
let mut conn = self.get().await.map(AsyncConnectionWrapper::from)?;
661-
let (this, count) = conn
662-
.transaction::<_, StoreError, _>(|conn| {
663-
async {
664-
let count = migrate_schema(&self.logger, conn).await?;
665-
Ok((self, count))
666-
}
667-
.scope_boxed()
668-
})
669-
.await?;
670-
671-
this.locale_check(&this.logger, &mut conn).await?;
672662

673-
Ok(count)
663+
tokio::task::spawn_blocking(move || {
664+
diesel::Connection::transaction::<_, StoreError, _>(&mut conn, |conn| {
665+
migrate_schema(&logger, conn)
666+
})
667+
})
668+
.await
669+
.expect("migration task panicked")
674670
}
675671

676672
/// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP`
@@ -810,11 +806,7 @@ impl MigrationCount {
810806
}
811807

812808
/// Run all schema migrations.
813-
///
814-
/// When multiple `graph-node` processes start up at the same time, we ensure
815-
/// that they do not run migrations in parallel by using `blocking_conn` to
816-
/// serialize them. The `conn` is used to run the actual migration.
817-
async fn migrate_schema(
809+
fn migrate_schema(
818810
logger: &Logger,
819811
conn: &mut AsyncConnectionWrapper<AsyncPgConnection>,
820812
) -> Result<MigrationCount, StoreError> {
@@ -823,7 +815,7 @@ async fn migrate_schema(
823815
// Collect migration logging output
824816
let mut output = vec![];
825817

826-
let old_count = catalog::migration_count(conn).await?;
818+
let old_count = graph::block_on(catalog::migration_count(conn))?;
827819
let mut harness = HarnessWithOutput::new(conn, &mut output);
828820

829821
info!(logger, "Running migrations");
@@ -848,7 +840,7 @@ async fn migrate_schema(
848840
debug!(logger, "Postgres migration output"; "output" => msg);
849841
}
850842

851-
let migrations = catalog::migration_count(conn).await?;
843+
let migrations = graph::block_on(catalog::migration_count(conn))?;
852844

853845
Ok(MigrationCount {
854846
new: migrations,

0 commit comments

Comments
 (0)