@@ -30,6 +30,7 @@ use diesel::{
3030 Connection as _,
3131} ;
3232use graph:: {
33+ cheap_clone:: CheapClone ,
3334 components:: store:: DeploymentLocator ,
3435 data:: {
3536 store:: scalar:: ToPrimitive ,
@@ -1886,8 +1887,9 @@ pub fn is_empty(conn: &mut PgConnection) -> Result<bool, StoreError> {
18861887/// a query returns either success or anything but a
18871888/// `Err(StoreError::DatabaseUnavailable)`. This only works for tables that
18881889/// are mirrored through `refresh_tables`
1890+ #[ derive( Clone , CheapClone ) ]
18891891pub struct Mirror {
1890- pools : Vec < ConnectionPool > ,
1892+ pools : Arc < Vec < ConnectionPool > > ,
18911893}
18921894
18931895impl Mirror {
@@ -1917,6 +1919,7 @@ impl Mirror {
19171919 pools. push ( pool. clone ( ) ) ;
19181920 pools
19191921 } ) ;
1922+ let pools = Arc :: new ( pools) ;
19201923 Mirror { pools }
19211924 }
19221925
@@ -1925,7 +1928,7 @@ impl Mirror {
19251928 /// used for non-critical uses like command line tools
19261929 pub fn primary_only ( primary : ConnectionPool ) -> Mirror {
19271930 Mirror {
1928- pools : vec ! [ primary] ,
1931+ pools : Arc :: new ( vec ! [ primary] ) ,
19291932 }
19301933 }
19311934
@@ -1940,7 +1943,7 @@ impl Mirror {
19401943 mut f : impl ' a
19411944 + FnMut ( & mut PooledConnection < ConnectionManager < PgConnection > > ) -> Result < T , StoreError > ,
19421945 ) -> Result < T , StoreError > {
1943- for pool in & self . pools {
1946+ for pool in self . pools . as_ref ( ) {
19441947 let mut conn = match pool. get ( ) {
19451948 Ok ( conn) => conn,
19461949 Err ( StoreError :: DatabaseUnavailable ) => continue ,
@@ -1955,6 +1958,27 @@ impl Mirror {
19551958 Err ( StoreError :: DatabaseUnavailable )
19561959 }
19571960
1961+ /// An async version of `read` that spawns a blocking task to do the
1962+ /// actual work. This is useful when you want to call `read` from an
1963+ /// async context
1964+ pub ( crate ) async fn read_async < T , F > ( & self , mut f : F ) -> Result < T , StoreError >
1965+ where
1966+ T : ' static + Send ,
1967+ F : ' static
1968+ + Send
1969+ + FnMut ( & mut PooledConnection < ConnectionManager < PgConnection > > ) -> Result < T , StoreError > ,
1970+ {
1971+ let this = self . cheap_clone ( ) ;
1972+ let res = graph:: spawn_blocking ( async move { this. read ( |conn| f ( conn) ) } ) . await ;
1973+ match res {
1974+ Ok ( v) => v,
1975+ Err ( e) => Err ( internal_error ! (
1976+ "spawn_blocking in read_async failed: {}" ,
1977+ e
1978+ ) ) ,
1979+ }
1980+ }
1981+
19581982 /// Refresh the contents of mirrored tables from the primary (through
19591983 /// the fdw mapping that `ForeignServer` establishes)
19601984 pub ( crate ) fn refresh_tables (
@@ -2050,8 +2074,10 @@ impl Mirror {
20502074 self . read ( |conn| queries:: assignments ( conn, node) )
20512075 }
20522076
2053- pub fn active_assignments ( & self , node : & NodeId ) -> Result < Vec < Site > , StoreError > {
2054- self . read ( |conn| queries:: active_assignments ( conn, node) )
2077+ pub async fn active_assignments ( & self , node : & NodeId ) -> Result < Vec < Site > , StoreError > {
2078+ let node = node. clone ( ) ;
2079+ self . read_async ( move |conn| queries:: active_assignments ( conn, & node) )
2080+ . await
20552081 }
20562082
20572083 pub fn assigned_node ( & self , site : & Site ) -> Result < Option < NodeId > , StoreError > {
0 commit comments