|
66 | 66 | //! ``` |
67 | 67 |
|
68 | 68 | #![warn(missing_docs)] |
| 69 | + |
69 | 70 | use diesel::backend::Backend; |
70 | 71 | use diesel::query_builder::{AsQuery, QueryFragment, QueryId}; |
| 72 | +use diesel::result::Error; |
71 | 73 | use diesel::row::Row; |
72 | 74 | use diesel::{ConnectionResult, QueryResult}; |
73 | 75 | use futures_util::{Future, Stream}; |
| 76 | +use std::fmt::Debug; |
74 | 77 |
|
75 | 78 | pub use scoped_futures; |
76 | | -use scoped_futures::ScopedBoxFuture; |
| 79 | +use scoped_futures::{ScopedBoxFuture, ScopedFutureExt}; |
77 | 80 |
|
78 | 81 | #[cfg(feature = "mysql")] |
79 | 82 | mod mysql; |
@@ -254,6 +257,65 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send { |
254 | 257 | Ok(()) |
255 | 258 | } |
256 | 259 |
|
| 260 | + /// Executes the given function inside a transaction, but does not commit |
| 261 | + /// it. Panics if the given function returns an error. |
| 262 | + /// |
| 263 | + /// # Example |
| 264 | + /// |
| 265 | + /// ```rust |
| 266 | + /// # include!("doctest_setup.rs"); |
| 267 | + /// use diesel::result::Error; |
| 268 | + /// use scoped_futures::ScopedFutureExt; |
| 269 | + /// |
| 270 | + /// # #[tokio::main(flavor = "current_thread")] |
| 271 | + /// # async fn main() { |
| 272 | + /// # run_test().await.unwrap(); |
| 273 | + /// # } |
| 274 | + /// # |
| 275 | + /// # async fn run_test() -> QueryResult<()> { |
| 276 | + /// # use schema::users::dsl::*; |
| 277 | + /// # let conn = &mut establish_connection().await; |
| 278 | + /// conn.test_transaction::<_, Error, _>(|conn| async move { |
| 279 | + /// diesel::insert_into(users) |
| 280 | + /// .values(name.eq("Ruby")) |
| 281 | + /// .execute(conn) |
| 282 | + /// .await?; |
| 283 | + /// |
| 284 | + /// let all_names = users.select(name).load::<String>(conn).await?; |
| 285 | + /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names); |
| 286 | + /// |
| 287 | + /// Ok(()) |
| 288 | + /// }.scope_boxed()).await; |
| 289 | + /// |
| 290 | + /// // Even though we returned `Ok`, the transaction wasn't committed. |
| 291 | + /// let all_names = users.select(name).load::<String>(conn).await?; |
| 292 | + /// assert_eq!(vec!["Sean", "Tess"], all_names); |
| 293 | + /// # Ok(()) |
| 294 | + /// # } |
| 295 | + /// ``` |
| 296 | + async fn test_transaction<'a, R, E, F>(&'a mut self, f: F) -> R |
| 297 | + where |
| 298 | + F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a, |
| 299 | + E: Debug + Send + 'a, |
| 300 | + R: Send + 'a, |
| 301 | + Self: 'a, |
| 302 | + { |
| 303 | + use futures_util::TryFutureExt; |
| 304 | + |
| 305 | + let mut user_result = None; |
| 306 | + let _ = self |
| 307 | + .transaction::<R, _, _>(|c| { |
| 308 | + f(c).map_err(|_| Error::RollbackTransaction) |
| 309 | + .and_then(|r| { |
| 310 | + user_result = Some(r); |
| 311 | + futures_util::future::ready(Err(Error::RollbackTransaction)) |
| 312 | + }) |
| 313 | + .scope_boxed() |
| 314 | + }) |
| 315 | + .await; |
| 316 | + user_result.expect("Transaction did not succeed") |
| 317 | + } |
| 318 | + |
257 | 319 | #[doc(hidden)] |
258 | 320 | fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query> |
259 | 321 | where |
|
0 commit comments