11use futures:: future:: FutureExt ;
22use parking_lot:: RwLock ;
3- use site:: job_queue:: { create_queue_process , is_job_queue_enabled} ;
3+ use site:: job_queue:: { create_job_queue_process , is_job_queue_enabled} ;
44use site:: load;
55use std:: env;
66use std:: sync:: Arc ;
@@ -59,15 +59,19 @@ async fn main() {
5959
6060 let server = site:: server:: start ( ctxt. clone ( ) , port) . fuse ( ) ;
6161
62- if is_job_queue_enabled ( ) {
62+ let create_job_queue_handler = | ctxt : Arc < RwLock < Option < Arc < load :: SiteCtxt > > > > | {
6363 task:: spawn ( async move {
64- create_queue_process (
65- ctxt. clone ( ) ,
66- Duration :: from_secs ( queue_update_interval_seconds) ,
67- )
68- . await ;
69- } ) ;
70- }
64+ if is_job_queue_enabled ( ) {
65+ create_job_queue_process ( ctxt, Duration :: from_secs ( queue_update_interval_seconds) )
66+ . await ;
67+ } else {
68+ futures:: future:: pending :: < ( ) > ( ) . await ;
69+ }
70+ } )
71+ . fuse ( )
72+ } ;
73+
74+ let mut job_queue_handler = create_job_queue_handler ( ctxt. clone ( ) ) ;
7175
7276 futures:: pin_mut!( server) ;
7377 futures:: pin_mut!( fut) ;
@@ -85,6 +89,17 @@ async fn main() {
8589 }
8690 }
8791 }
92+ // We want to have a panic boundary here; if the job queue handler panics for any
93+ // reason (e.g. a transient networking/DB issue), we want to continue running it in the
94+ // future.
95+ // We thus use tokio::task::spawn, wait for a potential panic, and then restart the
96+ // task again.
97+ res = job_queue_handler => {
98+ // The job queue handler task future has "finished", which means that it had to crash.
99+ let error = res. expect_err( "Job queue handler finished without an error" ) ;
100+ log:: error!( "The job queue handler has panicked\n {error:?}\n It will be now restarted" ) ;
101+ job_queue_handler = create_job_queue_handler( ctxt. clone( ) ) ;
102+ }
88103 }
89104 }
90105}
0 commit comments