66//!
77//! * using a sync Connection implementation in async context
88//! * using the same code base for async crates needing multiple backends
9- use std:: error:: Error ;
109use futures_util:: future:: BoxFuture ;
10+ use std:: error:: Error ;
1111
1212#[ cfg( feature = "sqlite" ) ]
1313mod sqlite;
@@ -71,7 +71,8 @@ pub trait SpawnBlocking {
7171/// # }
7272/// ```
7373#[ cfg( feature = "tokio" ) ]
74- pub type SyncConnectionWrapper < C , B = self :: implementation:: Tokio > = self :: implementation:: SyncConnectionWrapper < C , B > ;
74+ pub type SyncConnectionWrapper < C , B = self :: implementation:: Tokio > =
75+ self :: implementation:: SyncConnectionWrapper < C , B > ;
7576
7677/// A wrapper of a [`diesel::connection::Connection`] usable in async context.
7778///
@@ -106,7 +107,9 @@ mod implementation {
106107
107108 use super :: * ;
108109
109- fn from_spawn_blocking_error ( error : Box < dyn Error + Send + Sync + ' static > ) -> diesel:: result:: Error {
110+ fn from_spawn_blocking_error (
111+ error : Box < dyn Error + Send + Sync + ' static > ,
112+ ) -> diesel:: result:: Error {
110113 diesel:: result:: Error :: DatabaseError (
111114 diesel:: result:: DatabaseErrorKind :: UnableToSendCommand ,
112115 Box :: new ( error. to_string ( ) ) ,
@@ -149,18 +152,21 @@ mod implementation {
149152 // SpawnBlocking bounds
150153 S : SpawnBlocking + Send ,
151154 {
152- type LoadFuture < ' conn , ' query > = BoxFuture < ' query , QueryResult < Self :: Stream < ' conn , ' query > > > ;
155+ type LoadFuture < ' conn , ' query > =
156+ BoxFuture < ' query , QueryResult < Self :: Stream < ' conn , ' query > > > ;
153157 type ExecuteFuture < ' conn , ' query > = BoxFuture < ' query , QueryResult < usize > > ;
154158 type Stream < ' conn , ' query > = BoxStream < ' static , QueryResult < Self :: Row < ' conn , ' query > > > ;
155159 type Row < ' conn , ' query > = O ;
156160 type Backend = <C as Connection >:: Backend ;
157- type TransactionManager = SyncTransactionManagerWrapper < <C as Connection >:: TransactionManager > ;
161+ type TransactionManager =
162+ SyncTransactionManagerWrapper < <C as Connection >:: TransactionManager > ;
158163
159164 async fn establish ( database_url : & str ) -> ConnectionResult < Self > {
160165 let database_url = database_url. to_string ( ) ;
161166 let mut runtime = S :: get_runtime ( ) ;
162167
163- runtime. spawn_blocking ( move || C :: establish ( & database_url) )
168+ runtime
169+ . spawn_blocking ( move || C :: establish ( & database_url) )
164170 . await
165171 . unwrap_or_else ( |e| Err ( diesel:: ConnectionError :: BadConnection ( e. to_string ( ) ) ) )
166172 . map ( move |c| SyncConnectionWrapper :: with_runtime ( c, runtime) )
@@ -192,16 +198,22 @@ mod implementation {
192198 . boxed ( )
193199 }
194200
195- fn execute_returning_count < ' query , T > ( & mut self , source : T ) -> Self :: ExecuteFuture < ' _ , ' query >
201+ fn execute_returning_count < ' query , T > (
202+ & mut self ,
203+ source : T ,
204+ ) -> Self :: ExecuteFuture < ' _ , ' query >
196205 where
197206 T : QueryFragment < Self :: Backend > + QueryId ,
198207 {
199- self . execute_with_prepared_query ( source, |conn, query| conn. execute_returning_count ( & query) )
208+ self . execute_with_prepared_query ( source, |conn, query| {
209+ conn. execute_returning_count ( & query)
210+ } )
200211 }
201212
202213 fn transaction_state (
203214 & mut self ,
204- ) -> & mut <Self :: TransactionManager as TransactionManager < Self > >:: TransactionStateData {
215+ ) -> & mut <Self :: TransactionManager as TransactionManager < Self > >:: TransactionStateData
216+ {
205217 self . exclusive_connection ( ) . transaction_state ( )
206218 }
207219
@@ -344,16 +356,17 @@ mod implementation {
344356 S : SpawnBlocking ,
345357 {
346358 let inner = self . inner . clone ( ) ;
347- self . runtime . spawn_blocking ( move || {
348- let mut inner = inner. lock ( ) . unwrap_or_else ( |poison| {
349- // try to be resilient by providing the guard
350- inner. clear_poison ( ) ;
351- poison. into_inner ( )
352- } ) ;
353- task ( & mut inner)
354- } )
355- . unwrap_or_else ( |err| QueryResult :: Err ( from_spawn_blocking_error ( err) ) )
356- . boxed ( )
359+ self . runtime
360+ . spawn_blocking ( move || {
361+ let mut inner = inner. lock ( ) . unwrap_or_else ( |poison| {
362+ // try to be resilient by providing the guard
363+ inner. clear_poison ( ) ;
364+ poison. into_inner ( )
365+ } ) ;
366+ task ( & mut inner)
367+ } )
368+ . unwrap_or_else ( |err| QueryResult :: Err ( from_spawn_blocking_error ( err) ) )
369+ . boxed ( )
357370 }
358371
359372 fn execute_with_prepared_query < ' a , MD , Q , R > (
@@ -448,7 +461,7 @@ mod implementation {
448461 #[ cfg( feature = "tokio" ) ]
449462 pub enum Tokio {
450463 Handle ( tokio:: runtime:: Handle ) ,
451- Runtime ( tokio:: runtime:: Runtime )
464+ Runtime ( tokio:: runtime:: Runtime ) ,
452465 }
453466
454467 #[ cfg( feature = "tokio" ) ]
@@ -462,12 +475,10 @@ mod implementation {
462475 {
463476 let fut = match self {
464477 Tokio :: Handle ( handle) => handle. spawn_blocking ( task) ,
465- Tokio :: Runtime ( runtime) => runtime. spawn_blocking ( task)
478+ Tokio :: Runtime ( runtime) => runtime. spawn_blocking ( task) ,
466479 } ;
467480
468- fut
469- . map_err ( |err| Box :: from ( err) )
470- . boxed ( )
481+ fut. map_err ( |err| Box :: from ( err) ) . boxed ( )
471482 }
472483
473484 fn get_runtime ( ) -> Self {
0 commit comments