Skip to content

Commit 9297f75

Browse files
committed
Make job queue handler more resilient against panics
1 parent f111973 commit 9297f75

File tree

2 files changed

+38
-15
lines changed

2 files changed

+38
-15
lines changed

site/src/job_queue/mod.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -457,28 +457,36 @@ async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> {
457457
Ok(())
458458
}
459459

460-
/// Entry point for the cron job that manages the benchmark request and job queue.
461-
pub async fn create_queue_process(
460+
/// Entry point for the job queue handler, a "cron job" that manages benchmark requests and
461+
/// the job queue.
462+
pub async fn create_job_queue_process(
462463
site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>,
463464
run_interval: Duration,
464465
) {
466+
// The job queue process will be restarted in case of panics.
467+
// If it panicked repeatedly because of some transient error, it could lead to 100% CPU
468+
// utilization and a panic loop.
469+
// We thus ensure that we will always wait for the specified interval **first** before
470+
// attempting to run the cron job. In that case even if it panics everytime, the panic won't
471+
// happen more often than N seconds.
465472
let mut interval = time::interval(run_interval);
466473
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
474+
interval.reset();
467475

468476
let ctxt = site_ctxt.clone();
469477

470478
loop {
479+
interval.tick().await;
480+
471481
if let Some(ctxt_clone) = {
472482
let guard = ctxt.read();
473483
guard.as_ref().cloned()
474484
} {
475485
match perform_queue_tick(&ctxt_clone).await {
476-
Ok(_) => log::info!("Cron job finished"),
477-
Err(e) => log::error!("Cron job failed to execute: {e:?}"),
486+
Ok(_) => log::info!("Job queue handler finished"),
487+
Err(e) => log::error!("Job queue handler failed: {e:?}"),
478488
}
479489
}
480-
481-
interval.tick().await;
482490
}
483491
}
484492

site/src/main.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::future::FutureExt;
22
use parking_lot::RwLock;
3-
use site::job_queue::{create_queue_process, is_job_queue_enabled};
3+
use site::job_queue::{create_job_queue_process, is_job_queue_enabled};
44
use site::load;
55
use std::env;
66
use std::sync::Arc;
@@ -59,15 +59,19 @@ async fn main() {
5959

6060
let server = site::server::start(ctxt.clone(), port).fuse();
6161

62-
if is_job_queue_enabled() {
62+
let create_job_queue_handler = |ctxt: Arc<RwLock<Option<Arc<load::SiteCtxt>>>>| {
6363
task::spawn(async move {
64-
create_queue_process(
65-
ctxt.clone(),
66-
Duration::from_secs(queue_update_interval_seconds),
67-
)
68-
.await;
69-
});
70-
}
64+
if is_job_queue_enabled() {
65+
create_job_queue_process(ctxt, Duration::from_secs(queue_update_interval_seconds))
66+
.await;
67+
} else {
68+
futures::future::pending::<()>().await;
69+
}
70+
})
71+
.fuse()
72+
};
73+
74+
let mut job_queue_handler = create_job_queue_handler(ctxt.clone());
7175

7276
futures::pin_mut!(server);
7377
futures::pin_mut!(fut);
@@ -85,6 +89,17 @@ async fn main() {
8589
}
8690
}
8791
}
92+
// We want to have a panic boundary here; if the job queue handler panics for any
93+
// reason (e.g. a transient networking/DB issue), we want to continue running it in the
94+
// future.
95+
// We thus use tokio::task::spawn, wait for a potential panic, and then restart the
96+
// task again.
97+
res = job_queue_handler => {
98+
// The job queue handler task future has "finished", which means that it had to crash.
99+
let error = res.expect_err("Job queue handler finished without an error");
100+
log::error!("The job queue handler has panicked\n{error:?}\nIt will be now restarted");
101+
job_queue_handler = create_job_queue_handler(ctxt.clone());
102+
}
88103
}
89104
}
90105
}

0 commit comments

Comments
 (0)