@@ -2,6 +2,7 @@ use anyhow::Result;
22use async_trait:: async_trait;
33use chrono:: Utc ;
44use diesel:: prelude:: * ;
5+ use diesel_async:: RunQueryDsl ;
56use graphman_store:: CommandKind ;
67use graphman_store:: Execution ;
78use graphman_store:: ExecutionId ;
@@ -27,7 +28,7 @@ impl GraphmanStore {
2728#[ async_trait]
2829impl graphman_store:: GraphmanStore for GraphmanStore {
2930 async fn new_execution ( & self , kind : CommandKind ) -> Result < ExecutionId > {
30- let mut conn = self . primary_pool . get_sync ( ) . await ?;
31+ let mut conn = self . primary_pool . get ( ) . await ?;
3132
3233 let id: i64 = diesel:: insert_into ( gce:: table)
3334 . values ( (
@@ -36,20 +37,21 @@ impl graphman_store::GraphmanStore for GraphmanStore {
3637 gce:: created_at. eq ( Utc :: now ( ) ) ,
3738 ) )
3839 . returning ( gce:: id)
39- . get_result ( & mut conn) ?;
40+ . get_result ( & mut conn)
41+ . await ?;
4042
4143 Ok ( ExecutionId ( id) )
4244 }
4345
4446 async fn load_execution ( & self , id : ExecutionId ) -> Result < Execution > {
45- let mut conn = self . primary_pool . get_sync ( ) . await ?;
46- let execution = gce:: table. find ( id) . first ( & mut conn) ?;
47+ let mut conn = self . primary_pool . get ( ) . await ?;
48+ let execution = gce:: table. find ( id) . first ( & mut conn) . await ?;
4749
4850 Ok ( execution)
4951 }
5052
5153 async fn mark_execution_as_running ( & self , id : ExecutionId ) -> Result < ( ) > {
52- let mut conn = self . primary_pool . get_sync ( ) . await ?;
54+ let mut conn = self . primary_pool . get ( ) . await ?;
5355
5456 diesel:: update ( gce:: table)
5557 . set ( (
@@ -58,13 +60,14 @@ impl graphman_store::GraphmanStore for GraphmanStore {
5860 ) )
5961 . filter ( gce:: id. eq ( id) )
6062 . filter ( gce:: completed_at. is_null ( ) )
61- . execute ( & mut conn) ?;
63+ . execute ( & mut conn)
64+ . await ?;
6265
6366 Ok ( ( ) )
6467 }
6568
6669 async fn mark_execution_as_failed ( & self , id : ExecutionId , error_message : String ) -> Result < ( ) > {
67- let mut conn = self . primary_pool . get_sync ( ) . await ?;
70+ let mut conn = self . primary_pool . get ( ) . await ?;
6871
6972 diesel:: update ( gce:: table)
7073 . set ( (
@@ -73,21 +76,23 @@ impl graphman_store::GraphmanStore for GraphmanStore {
7376 gce:: completed_at. eq ( Utc :: now ( ) ) ,
7477 ) )
7578 . filter ( gce:: id. eq ( id) )
76- . execute ( & mut conn) ?;
79+ . execute ( & mut conn)
80+ . await ?;
7781
7882 Ok ( ( ) )
7983 }
8084
8185 async fn mark_execution_as_succeeded ( & self , id : ExecutionId ) -> Result < ( ) > {
82- let mut conn = self . primary_pool . get_sync ( ) . await ?;
86+ let mut conn = self . primary_pool . get ( ) . await ?;
8387
8488 diesel:: update ( gce:: table)
8589 . set ( (
8690 gce:: status. eq ( ExecutionStatus :: Succeeded ) ,
8791 gce:: completed_at. eq ( Utc :: now ( ) ) ,
8892 ) )
8993 . filter ( gce:: id. eq ( id) )
90- . execute ( & mut conn) ?;
94+ . execute ( & mut conn)
95+ . await ?;
9196
9297 Ok ( ( ) )
9398 }
0 commit comments