Skip to content

Commit 9f71238

Browse files
committed
fix: improve sigterm handling for the entire runtime
1 parent 728f761 commit 9f71238

File tree

16 files changed

+440
-239
lines changed

16 files changed

+440
-239
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/config/src/config/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod guard;
1313
pub mod logs;
1414
pub mod pegboard;
1515
pub mod pubsub;
16+
pub mod runtime;
1617
pub mod telemetry;
1718
pub mod topology;
1819
pub mod vector;
@@ -27,6 +28,7 @@ pub use guard::*;
2728
pub use logs::*;
2829
pub use pegboard::*;
2930
pub use pubsub::PubSub;
31+
pub use runtime::*;
3032
pub use telemetry::*;
3133
pub use topology::*;
3234
pub use vector::*;
@@ -102,7 +104,7 @@ pub struct Root {
102104
pub telemetry: Telemetry,
103105

104106
#[serde(default)]
105-
pub allow_version_rollback: bool,
107+
pub runtime: Runtime,
106108
}
107109

108110
impl Default for Root {
@@ -121,7 +123,7 @@ impl Default for Root {
121123
clickhouse: None,
122124
vector_http: None,
123125
telemetry: Default::default(),
124-
allow_version_rollback: false,
126+
runtime: Default::default(),
125127
}
126128
}
127129
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use std::time::Duration;
2+
3+
use schemars::JsonSchema;
4+
use serde::{Deserialize, Serialize};
5+
6+
#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
7+
pub struct Runtime {
8+
/// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM.
9+
/// Defaults to 30 seconds.
10+
worker_shutdown_duration: Option<u32>,
11+
/// Time (in seconds) to allow for guard to wait for pending requests after receiving SIGTERM. Defaults
12+
// to 1 hour.
13+
guard_shutdown_duration: Option<u32>,
14+
/// Whether or not to allow running the engine when the previous version that was run is higher than
15+
// the current version.
16+
allow_version_rollback: Option<bool>,
17+
}
18+
19+
impl Runtime {
20+
pub fn worker_shutdown_duration(&self) -> Duration {
21+
Duration::from_secs(self.worker_shutdown_duration.unwrap_or(30) as u64)
22+
}
23+
24+
pub fn guard_shutdown_duration(&self) -> Duration {
25+
Duration::from_secs(self.guard_shutdown_duration.unwrap_or(60 * 60) as u64)
26+
}
27+
28+
pub fn allow_version_rollback(&self) -> bool {
29+
self.allow_version_rollback.unwrap_or_default()
30+
}
31+
}

engine/packages/engine/src/commands/start.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ async fn verify_engine_version(
108108
config: &rivet_config::Config,
109109
pools: &rivet_pools::Pools,
110110
) -> Result<()> {
111-
if config.allow_version_rollback {
111+
if config.runtime.allow_version_rollback() {
112112
return Ok(());
113113
}
114114

engine/packages/engine/src/run_config.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,50 @@ use rivet_service_manager::{RunConfigData, Service, ServiceKind};
33

44
pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
55
let services = vec![
6-
Service::new("api_peer", ServiceKind::ApiPeer, |config, pools| {
7-
Box::pin(rivet_api_peer::start(config, pools))
8-
}),
9-
Service::new("guard", ServiceKind::ApiPublic, |config, pools| {
10-
Box::pin(rivet_guard::start(config, pools))
11-
}),
6+
Service::new(
7+
"api_peer",
8+
ServiceKind::ApiPeer,
9+
|config, pools| Box::pin(rivet_api_peer::start(config, pools)),
10+
false,
11+
),
12+
Service::new(
13+
"guard",
14+
ServiceKind::ApiPublic,
15+
|config, pools| Box::pin(rivet_guard::start(config, pools)),
16+
true,
17+
),
1218
Service::new(
1319
"workflow_worker",
1420
ServiceKind::Standalone,
1521
|config, pools| Box::pin(rivet_workflow_worker::start(config, pools)),
22+
true,
23+
),
24+
Service::new(
25+
"bootstrap",
26+
ServiceKind::Oneshot,
27+
|config, pools| Box::pin(rivet_bootstrap::start(config, pools)),
28+
false,
1629
),
17-
Service::new("bootstrap", ServiceKind::Oneshot, |config, pools| {
18-
Box::pin(rivet_bootstrap::start(config, pools))
19-
}),
2030
Service::new(
2131
"pegboard_serverless",
2232
// There should only be one of these, since it's auto-scaling requests
2333
ServiceKind::Singleton,
2434
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
35+
false,
2536
),
2637
// Core services
27-
Service::new("tracing_reconfigure", ServiceKind::Core, |config, pools| {
28-
Box::pin(rivet_tracing_reconfigure::start(config, pools))
29-
}),
30-
Service::new("cache_purge", ServiceKind::Core, |config, pools| {
31-
Box::pin(rivet_cache_purge::start(config, pools))
32-
}),
38+
Service::new(
39+
"tracing_reconfigure",
40+
ServiceKind::Core,
41+
|config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)),
42+
false,
43+
),
44+
Service::new(
45+
"cache_purge",
46+
ServiceKind::Core,
47+
|config, pools| Box::pin(rivet_cache_purge::start(config, pools)),
48+
false,
49+
),
3350
];
3451

3552
Ok(RunConfigData { services })

engine/packages/engine/tests/common/ctx.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,20 +78,30 @@ impl TestCtx {
7878
let pools = pools.clone();
7979
async move {
8080
let services = vec![
81-
Service::new("api-peer", ServiceKind::ApiPeer, |config, pools| {
82-
Box::pin(rivet_api_peer::start(config, pools))
83-
}),
84-
Service::new("guard", ServiceKind::Standalone, |config, pools| {
85-
Box::pin(rivet_guard::start(config, pools))
86-
}),
81+
Service::new(
82+
"api-peer",
83+
ServiceKind::ApiPeer,
84+
|config, pools| Box::pin(rivet_api_peer::start(config, pools)),
85+
false,
86+
),
87+
Service::new(
88+
"guard",
89+
ServiceKind::Standalone,
90+
|config, pools| Box::pin(rivet_guard::start(config, pools)),
91+
true,
92+
),
8793
Service::new(
8894
"workflow-worker",
8995
ServiceKind::Standalone,
9096
|config, pools| Box::pin(rivet_workflow_worker::start(config, pools)),
97+
true,
98+
),
99+
Service::new(
100+
"bootstrap",
101+
ServiceKind::Oneshot,
102+
|config, pools| Box::pin(rivet_bootstrap::start(config, pools)),
103+
false,
91104
),
92-
Service::new("bootstrap", ServiceKind::Oneshot, |config, pools| {
93-
Box::pin(rivet_bootstrap::start(config, pools))
94-
}),
95105
];
96106

97107
rivet_service_manager::start(config, pools, services).await

engine/packages/gasoline/src/worker.rs

Lines changed: 38 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ use std::{
44
};
55

66
use anyhow::{Context, Result};
7-
use futures_util::StreamExt;
7+
use futures_util::{StreamExt, stream::FuturesUnordered};
88
use opentelemetry::trace::TraceContextExt;
9-
use rivet_util::{Id, signal::TermSignal};
10-
use tokio::{signal::ctrl_c, sync::watch, task::JoinHandle};
9+
use rivet_runtime::TermSignal;
10+
use rivet_util::Id;
11+
use tokio::{sync::watch, task::JoinHandle};
1112
use tracing::Instrument;
1213
use tracing_opentelemetry::OpenTelemetrySpanExt;
1314

@@ -22,8 +23,6 @@ use crate::{
2223
pub(crate) const PING_INTERVAL: Duration = Duration::from_secs(10);
2324
/// How often to publish metrics.
2425
const METRICS_INTERVAL: Duration = Duration::from_secs(20);
25-
/// Time to allow running workflows to shutdown after receiving a SIGINT or SIGTERM.
26-
const SHUTDOWN_DURATION: Duration = Duration::from_secs(30);
2726
// How long the pull workflows function can take before shutting down the runtime.
2827
const PULL_WORKFLOWS_TIMEOUT: Duration = Duration::from_secs(10);
2928

@@ -62,7 +61,8 @@ impl Worker {
6261
}
6362
}
6463

65-
/// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes
64+
/// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes.
65+
/// Provide a shutdown_rx to allow shutting down without triggering SIGTERM.
6666
#[tracing::instrument(skip_all, fields(worker_id=%self.worker_id))]
6767
pub async fn start(mut self, mut shutdown_rx: Option<watch::Receiver<()>>) -> Result<()> {
6868
tracing::debug!(
@@ -77,8 +77,7 @@ impl Worker {
7777
let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval());
7878
tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
7979

80-
let mut term_signal =
81-
TermSignal::new().context("failed to setup termination signal handler")?;
80+
let mut term_signal = TermSignal::new().await;
8281

8382
// Update ping at least once before doing anything else
8483
self.db
@@ -125,12 +124,11 @@ impl Worker {
125124
break Ok(());
126125
}
127126
}
128-
_ = ctrl_c() => break Ok(()),
129127
_ = term_signal.recv() => break Ok(()),
130128
}
131129

132130
if let Err(err) = self.tick(&cache).await {
133-
// Cancel background tasks
131+
// Cancel background tasks. We abort because these are not critical tasks.
134132
gc_handle.abort();
135133
metrics_handle.abort();
136134

@@ -201,7 +199,7 @@ impl Worker {
201199
.span_context()
202200
.clone();
203201

204-
let handle = tokio::task::spawn(
202+
let handle = tokio::spawn(
205203
// NOTE: No .in_current_span() because we want this to be a separate trace
206204
async move {
207205
if let Err(err) = ctx.run(current_span_ctx).await {
@@ -226,7 +224,7 @@ impl Worker {
226224
let db = self.db.clone();
227225
let worker_id = self.worker_id;
228226

229-
tokio::task::spawn(
227+
tokio::spawn(
230228
async move {
231229
let mut ping_interval = tokio::time::interval(PING_INTERVAL);
232230
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -251,7 +249,7 @@ impl Worker {
251249
let db = self.db.clone();
252250
let worker_id = self.worker_id;
253251

254-
tokio::task::spawn(
252+
tokio::spawn(
255253
async move {
256254
let mut metrics_interval = tokio::time::interval(METRICS_INTERVAL);
257255
metrics_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -270,79 +268,65 @@ impl Worker {
270268

271269
#[tracing::instrument(skip_all)]
272270
async fn shutdown(mut self, mut term_signal: TermSignal) {
273-
// Shutdown sequence
271+
let shutdown_duration = self.config.runtime.worker_shutdown_duration();
272+
274273
tracing::info!(
275-
duration=?SHUTDOWN_DURATION,
274+
duration=?shutdown_duration,
276275
remaining_workflows=?self.running_workflows.len(),
277276
"starting worker shutdown"
278277
);
279278

280-
let shutdown_start = Instant::now();
281-
282279
if let Err(err) = self.db.mark_worker_inactive(self.worker_id).await {
283280
tracing::error!(?err, worker_id=?self.worker_id, "failed to mark worker as inactive");
284281
}
285282

283+
// Send stop signal to all running workflows
286284
for (workflow_id, wf) in &self.running_workflows {
287285
if wf.stop.send(()).is_err() {
288-
tracing::warn!(
286+
tracing::debug!(
289287
?workflow_id,
290288
"stop channel closed, workflow likely already stopped"
291289
);
292290
}
293291
}
294292

295-
let mut second_sigterm = false;
296-
loop {
297-
self.running_workflows
298-
.retain(|_, wf| !wf.handle.is_finished());
293+
// Collect all workflow tasks
294+
let mut wf_futs = self
295+
.running_workflows
296+
.iter_mut()
297+
.map(|(_, wf)| &mut wf.handle)
298+
.collect::<FuturesUnordered<_>>();
299299

300-
// Shutdown complete
301-
if self.running_workflows.is_empty() {
302-
break;
303-
}
304-
305-
if shutdown_start.elapsed() > SHUTDOWN_DURATION {
306-
tracing::debug!("shutdown timed out");
307-
break;
308-
}
300+
let shutdown_start = Instant::now();
301+
loop {
302+
// Future will resolve once all workflow tasks complete
303+
let join_fut = async { while let Some(_) = wf_futs.next().await {} };
309304

310305
tokio::select! {
311-
_ = ctrl_c() => {
312-
if second_sigterm {
313-
tracing::warn!("received third SIGTERM, aborting shutdown");
314-
break;
315-
}
316-
317-
tracing::warn!("received second SIGTERM");
318-
second_sigterm = true;
319-
320-
continue;
306+
_ = join_fut => {
307+
break;
321308
}
322-
_ = term_signal.recv() => {
323-
if second_sigterm {
324-
tracing::warn!("received third SIGTERM, aborting shutdown");
309+
abort = term_signal.recv() => {
310+
if abort {
311+
tracing::warn!("aborting worker shutdown");
325312
break;
326313
}
327-
328-
tracing::warn!("received second SIGTERM");
329-
second_sigterm = true;
330-
331-
continue;
332314
}
333-
_ = tokio::time::sleep(Duration::from_secs(2)) => {}
315+
_ = tokio::time::sleep(shutdown_duration.saturating_sub(shutdown_start.elapsed())) => {
316+
tracing::warn!("worker shutdown timed out");
317+
break;
318+
}
334319
}
335320
}
336321

337-
if self.running_workflows.is_empty() {
322+
let remaining_workflows = wf_futs.into_iter().count();
323+
if remaining_workflows == 0 {
338324
tracing::info!("all workflows evicted");
339325
} else {
340326
tracing::warn!(remaining_workflows=?self.running_workflows.len(), "not all workflows evicted");
341327
}
342328

343-
tracing::info!("shutdown complete");
344-
345-
rivet_runtime::shutdown().await;
329+
tracing::info!("worker shutdown complete");
346330
}
347331
}
348332

0 commit comments

Comments
 (0)