Skip to content

Commit bfd3da9

Browse files
committed
all: Rename graph::task_spawn to tokio and move TEST_RUNTIME there
1 parent c0b4450 commit bfd3da9

File tree

5 files changed

+48
-24
lines changed

5 files changed

+48
-24
lines changed

graph/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ pub mod env;
3838
pub mod ipfs;
3939

4040
/// Wrapper for spawning tasks that abort on panic, which is our default.
41-
mod task_spawn;
42-
pub use task_spawn::{
41+
mod tokio;
42+
#[cfg(debug_assertions)]
43+
pub use tokio::TEST_RUNTIME;
44+
pub use tokio::{
4345
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, spawn_thread,
4446
};
4547

graph/src/log/elastic.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl ElasticDrain {
202202
let mut interval = tokio::time::interval(self.config.flush_interval);
203203
let max_retries = self.config.max_retries;
204204

205-
crate::task_spawn::spawn(async move {
205+
crate::tokio::spawn(async move {
206206
loop {
207207
interval.tick().await;
208208

graph/src/task_spawn.rs renamed to graph/src/tokio.rs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,49 @@
1-
//! The functions in this module should be used to execute futures, serving as a facade to the
2-
//! underlying executor implementation which currently is tokio. This serves a few purposes:
3-
//! - Avoid depending directly on tokio APIs, making upgrades or a potential switch easier.
4-
//! - Reflect our chosen default semantics of aborting on task panic, offering `*_allow_panic`
5-
//! functions to opt out of that.
6-
//! - Reflect that historically we've used blocking futures due to making DB calls directly within
7-
//! futures. This point should go away once https://github.com/graphprotocol/graph-node/issues/905
8-
//! is resolved. Then the blocking flavors should no longer accept futures but closures.
1+
//! Helpers for dealing with certain aspects of tokio.
92
//!
10-
//! These should not be called from within executors other than tokio, particularly the blocking
11-
//! functions will panic in that case. We should generally avoid mixing executors whenever possible.
3+
//! This module sets up a runtime on which all tests should run, as well as
4+
//! providing some functions for spawning tasks with our desired semantics.
5+
//!
6+
//! The functions in this module should be used to execute futures, serving
7+
//! as a facade to the underlying executor implementation which currently is
8+
//! tokio. This serves a few purposes:
9+
//! - Avoid depending directly on tokio APIs, making upgrades or a potential
10+
//! switch easier.
11+
//! - Reflect our chosen default semantics of aborting on task panic,
12+
//! offering `*_allow_panic` functions to opt out of that.
13+
//! - Reflect that historically we've used blocking futures due to making DB
14+
//! calls directly within futures. This point should go away once
15+
//! https://github.com/graphprotocol/graph-node/issues/905 is resolved.
16+
//! Then the blocking flavors should no longer accept futures but
17+
//! closures.
18+
//!
19+
//! These should not be called from within executors other than tokio,
20+
//! particularly the blocking functions will panic in that case. We should
21+
//! generally avoid mixing executors whenever possible.
1222
1323
use futures03::future::{FutureExt, TryFutureExt};
1424
use std::future::Future as Future03;
1525
use std::panic::AssertUnwindSafe;
1626
use tokio::task::JoinHandle;
1727

28+
#[cfg(debug_assertions)]
29+
use tokio::runtime::{Builder, Runtime};
30+
31+
#[cfg(debug_assertions)]
32+
lazy_static::lazy_static! {
33+
/// The one true runtime for all tests. Tests should use the
34+
/// `graph::test` macro to make sure they are using this runtime, the
35+
/// same way they would use `#[tokio::test]`.
36+
///
37+
/// We need to make sure we use a single runtime because if there are
38+
/// multiple runtimes involved, the task that diesel_async spawns to
39+
/// drive database connections (see `drive_connection` in the
40+
/// `diesel_async` crate) may end up on a different runtime than the one
41+
/// the test is using, leading to that task getting dropped, and the
42+
/// test using a connection receiving a `Connection closed` error.
43+
pub static ref TEST_RUNTIME: Runtime =
44+
Builder::new_multi_thread().enable_all().build().unwrap();
45+
}
46+
1847
fn abort_on_panic<T: Send + 'static>(
1948
f: impl Future03<Output = T> + Send + 'static,
2049
) -> impl Future03<Output = T> {

server/graphman/tests/util/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,14 @@ pub mod server;
44
use std::future::Future;
55
use std::sync::Mutex;
66

7+
use graph::TEST_RUNTIME;
78
use lazy_static::lazy_static;
89
use test_store::store::remove_subgraphs;
910
use test_store::store::PRIMARY_POOL;
10-
use tokio::runtime::Builder;
11-
use tokio::runtime::Runtime;
1211

1312
lazy_static! {
1413
// Used to make sure tests will run sequentially.
1514
static ref SEQ_MUX: Mutex<()> = Mutex::new(());
16-
17-
// One runtime helps share the same server between the tests.
18-
static ref RUNTIME: Runtime = Builder::new_current_thread().enable_all().build().unwrap();
1915
}
2016

2117
pub fn run_test<T, F>(test: T)
@@ -25,7 +21,7 @@ where
2521
{
2622
let _lock = SEQ_MUX.lock().unwrap_or_else(|err| err.into_inner());
2723

28-
RUNTIME.block_on(async {
24+
TEST_RUNTIME.block_on(async {
2925
cleanup_graphman_command_executions_table().await;
3026
remove_subgraphs().await;
3127

store/test-store/src/store.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use std::collections::BTreeSet;
3737
use std::collections::HashMap;
3838
use std::time::Instant;
3939
use std::{marker::PhantomData, sync::Mutex};
40-
use tokio::runtime::{Builder, Runtime};
4140
use web3::types::H256;
4241

4342
pub const NETWORK_NAME: &str = "fake_network";
@@ -54,8 +53,6 @@ lazy_static! {
5453
None => Logger::root(slog::Discard, o!()),
5554
};
5655
static ref SEQ_LOCK: Mutex<()> = Mutex::new(());
57-
pub static ref STORE_RUNTIME: Runtime =
58-
Builder::new_multi_thread().enable_all().build().unwrap();
5956
pub static ref METRICS_REGISTRY: Arc<MetricsRegistry> = Arc::new(MetricsRegistry::mock());
6057
pub static ref LOAD_MANAGER: Arc<LoadManager> = Arc::new(LoadManager::new(
6158
&LOGGER,
@@ -119,7 +116,7 @@ where
119116
Err(err) => err.into_inner(),
120117
};
121118

122-
STORE_RUNTIME.handle().block_on(async {
119+
graph::TEST_RUNTIME.handle().block_on(async {
123120
let store = STORE.clone();
124121
test(store).await
125122
})
@@ -630,7 +627,7 @@ fn build_store() -> (Arc<Store>, ConnectionPool, Config, Arc<SubscriptionManager
630627
.unwrap_or_else(|_| panic!("config is not valid (file={:?})", &opt.config));
631628
let registry = Arc::new(MetricsRegistry::mock());
632629
std::thread::spawn(move || {
633-
STORE_RUNTIME.handle().block_on(async {
630+
graph::TEST_RUNTIME.handle().block_on(async {
634631
let builder = StoreBuilder::new(&LOGGER, &NODE_ID, &config, None, registry).await;
635632
let subscription_manager = builder.subscription_manager();
636633
let primary_pool = builder.primary_pool();

0 commit comments

Comments
 (0)