Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions engine/packages/config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod guard;
pub mod logs;
pub mod pegboard;
pub mod pubsub;
pub mod runtime;
pub mod telemetry;
pub mod topology;
pub mod vector;
Expand All @@ -27,6 +28,7 @@ pub use guard::*;
pub use logs::*;
pub use pegboard::*;
pub use pubsub::PubSub;
pub use runtime::*;
pub use telemetry::*;
pub use topology::*;
pub use vector::*;
Expand Down Expand Up @@ -102,7 +104,7 @@ pub struct Root {
pub telemetry: Telemetry,

#[serde(default)]
pub allow_version_rollback: bool,
pub runtime: Runtime,
}

impl Default for Root {
Expand All @@ -121,7 +123,7 @@ impl Default for Root {
clickhouse: None,
vector_http: None,
telemetry: Default::default(),
allow_version_rollback: false,
runtime: Default::default(),
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions engine/packages/config/src/config/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::time::Duration;

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Runtime behavior settings for the engine process.
///
/// Every field is optional. Read values through the accessor methods on this type, which
/// substitute the documented default when a field is unset.
#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
pub struct Runtime {
    /// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving
    /// SIGTERM. Defaults to 30 seconds.
    worker_shutdown_duration: Option<u32>,
    /// Time (in seconds) to allow for guard to wait for pending requests after receiving SIGTERM.
    /// Defaults to 1 hour.
    guard_shutdown_duration: Option<u32>,
    /// Whether or not to allow running the engine when the previous version that was run is higher
    /// than the current version. Defaults to false.
    allow_version_rollback: Option<bool>,
}

impl Runtime {
    /// Returns the configured worker shutdown duration, or 30 seconds when the field is unset.
    pub fn worker_shutdown_duration(&self) -> Duration {
        // The stored value is in whole seconds; widening u32 -> u64 is lossless.
        let secs = self.worker_shutdown_duration.map_or(30, u64::from);
        Duration::from_secs(secs)
    }

    /// Returns the configured guard shutdown duration, or 1 hour when the field is unset.
    pub fn guard_shutdown_duration(&self) -> Duration {
        let secs = match self.guard_shutdown_duration {
            Some(configured) => u64::from(configured),
            // One hour, expressed in seconds.
            None => 60 * 60,
        };
        Duration::from_secs(secs)
    }

    /// Returns whether version rollback is allowed; an unset field is treated as `false`.
    pub fn allow_version_rollback(&self) -> bool {
        matches!(self.allow_version_rollback, Some(true))
    }
}
2 changes: 1 addition & 1 deletion engine/packages/engine/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async fn verify_engine_version(
config: &rivet_config::Config,
pools: &rivet_pools::Pools,
) -> Result<()> {
if config.allow_version_rollback {
if config.runtime.allow_version_rollback() {
return Ok(());
}

Expand Down
47 changes: 32 additions & 15 deletions engine/packages/engine/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,50 @@ use rivet_service_manager::{RunConfigData, Service, ServiceKind};

pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
let services = vec![
Service::new("api_peer", ServiceKind::ApiPeer, |config, pools| {
Box::pin(rivet_api_peer::start(config, pools))
}),
Service::new("guard", ServiceKind::ApiPublic, |config, pools| {
Box::pin(rivet_guard::start(config, pools))
}),
Service::new(
"api_peer",
ServiceKind::ApiPeer,
|config, pools| Box::pin(rivet_api_peer::start(config, pools)),
false,
),
Service::new(
"guard",
ServiceKind::ApiPublic,
|config, pools| Box::pin(rivet_guard::start(config, pools)),
true,
),
Service::new(
"workflow_worker",
ServiceKind::Standalone,
|config, pools| Box::pin(rivet_workflow_worker::start(config, pools)),
true,
),
Service::new(
"bootstrap",
ServiceKind::Oneshot,
|config, pools| Box::pin(rivet_bootstrap::start(config, pools)),
false,
),
Service::new("bootstrap", ServiceKind::Oneshot, |config, pools| {
Box::pin(rivet_bootstrap::start(config, pools))
}),
Service::new(
"pegboard_serverless",
// There should only be one of these, since it's auto-scaling requests
ServiceKind::Singleton,
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
false,
),
// Core services
Service::new("tracing_reconfigure", ServiceKind::Core, |config, pools| {
Box::pin(rivet_tracing_reconfigure::start(config, pools))
}),
Service::new("cache_purge", ServiceKind::Core, |config, pools| {
Box::pin(rivet_cache_purge::start(config, pools))
}),
Service::new(
"tracing_reconfigure",
ServiceKind::Core,
|config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)),
false,
),
Service::new(
"cache_purge",
ServiceKind::Core,
|config, pools| Box::pin(rivet_cache_purge::start(config, pools)),
false,
),
];

Ok(RunConfigData { services })
Expand Down
28 changes: 19 additions & 9 deletions engine/packages/engine/tests/common/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,30 @@ impl TestCtx {
let pools = pools.clone();
async move {
let services = vec![
Service::new("api-peer", ServiceKind::ApiPeer, |config, pools| {
Box::pin(rivet_api_peer::start(config, pools))
}),
Service::new("guard", ServiceKind::Standalone, |config, pools| {
Box::pin(rivet_guard::start(config, pools))
}),
Service::new(
"api-peer",
ServiceKind::ApiPeer,
|config, pools| Box::pin(rivet_api_peer::start(config, pools)),
false,
),
Service::new(
"guard",
ServiceKind::Standalone,
|config, pools| Box::pin(rivet_guard::start(config, pools)),
true,
),
Service::new(
"workflow-worker",
ServiceKind::Standalone,
|config, pools| Box::pin(rivet_workflow_worker::start(config, pools)),
true,
),
Service::new(
"bootstrap",
ServiceKind::Oneshot,
|config, pools| Box::pin(rivet_bootstrap::start(config, pools)),
false,
),
Service::new("bootstrap", ServiceKind::Oneshot, |config, pools| {
Box::pin(rivet_bootstrap::start(config, pools))
}),
];

rivet_service_manager::start(config, pools, services).await
Expand Down
92 changes: 38 additions & 54 deletions engine/packages/gasoline/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::{
};

use anyhow::{Context, Result};
use futures_util::StreamExt;
use futures_util::{StreamExt, stream::FuturesUnordered};
use opentelemetry::trace::TraceContextExt;
use rivet_util::{Id, signal::TermSignal};
use tokio::{signal::ctrl_c, sync::watch, task::JoinHandle};
use rivet_runtime::TermSignal;
use rivet_util::Id;
use tokio::{sync::watch, task::JoinHandle};
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

Expand All @@ -22,8 +23,6 @@ use crate::{
pub(crate) const PING_INTERVAL: Duration = Duration::from_secs(10);
/// How often to publish metrics.
const METRICS_INTERVAL: Duration = Duration::from_secs(20);
/// Time to allow running workflows to shutdown after receiving a SIGINT or SIGTERM.
const SHUTDOWN_DURATION: Duration = Duration::from_secs(30);
// How long the pull workflows function can take before shutting down the runtime.
const PULL_WORKFLOWS_TIMEOUT: Duration = Duration::from_secs(10);

Expand Down Expand Up @@ -62,7 +61,8 @@ impl Worker {
}
}

/// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes
/// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes.
/// Provide a shutdown_rx to allow shutting down without triggering SIGTERM.
#[tracing::instrument(skip_all, fields(worker_id=%self.worker_id))]
pub async fn start(mut self, mut shutdown_rx: Option<watch::Receiver<()>>) -> Result<()> {
tracing::debug!(
Expand All @@ -77,8 +77,7 @@ impl Worker {
let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval());
tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut term_signal =
TermSignal::new().context("failed to setup termination signal handler")?;
let mut term_signal = TermSignal::new().await;

// Update ping at least once before doing anything else
self.db
Expand Down Expand Up @@ -125,12 +124,11 @@ impl Worker {
break Ok(());
}
}
_ = ctrl_c() => break Ok(()),
_ = term_signal.recv() => break Ok(()),
}

if let Err(err) = self.tick(&cache).await {
// Cancel background tasks
// Cancel background tasks. We abort because these are not critical tasks.
gc_handle.abort();
metrics_handle.abort();

Expand Down Expand Up @@ -201,7 +199,7 @@ impl Worker {
.span_context()
.clone();

let handle = tokio::task::spawn(
let handle = tokio::spawn(
// NOTE: No .in_current_span() because we want this to be a separate trace
async move {
if let Err(err) = ctx.run(current_span_ctx).await {
Expand All @@ -226,7 +224,7 @@ impl Worker {
let db = self.db.clone();
let worker_id = self.worker_id;

tokio::task::spawn(
tokio::spawn(
async move {
let mut ping_interval = tokio::time::interval(PING_INTERVAL);
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -251,7 +249,7 @@ impl Worker {
let db = self.db.clone();
let worker_id = self.worker_id;

tokio::task::spawn(
tokio::spawn(
async move {
let mut metrics_interval = tokio::time::interval(METRICS_INTERVAL);
metrics_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -270,79 +268,65 @@ impl Worker {

#[tracing::instrument(skip_all)]
async fn shutdown(mut self, mut term_signal: TermSignal) {
// Shutdown sequence
let shutdown_duration = self.config.runtime.worker_shutdown_duration();

tracing::info!(
duration=?SHUTDOWN_DURATION,
duration=?shutdown_duration,
remaining_workflows=?self.running_workflows.len(),
"starting worker shutdown"
);

let shutdown_start = Instant::now();

if let Err(err) = self.db.mark_worker_inactive(self.worker_id).await {
tracing::error!(?err, worker_id=?self.worker_id, "failed to mark worker as inactive");
}

// Send stop signal to all running workflows
for (workflow_id, wf) in &self.running_workflows {
if wf.stop.send(()).is_err() {
tracing::warn!(
tracing::debug!(
?workflow_id,
"stop channel closed, workflow likely already stopped"
);
}
}

let mut second_sigterm = false;
loop {
self.running_workflows
.retain(|_, wf| !wf.handle.is_finished());
// Collect all workflow tasks
let mut wf_futs = self
.running_workflows
.iter_mut()
.map(|(_, wf)| &mut wf.handle)
.collect::<FuturesUnordered<_>>();

// Shutdown complete
if self.running_workflows.is_empty() {
break;
}

if shutdown_start.elapsed() > SHUTDOWN_DURATION {
tracing::debug!("shutdown timed out");
break;
}
let shutdown_start = Instant::now();
loop {
// Future will resolve once all workflow tasks complete
let join_fut = async { while let Some(_) = wf_futs.next().await {} };

tokio::select! {
_ = ctrl_c() => {
if second_sigterm {
tracing::warn!("received third SIGTERM, aborting shutdown");
break;
}

tracing::warn!("received second SIGTERM");
second_sigterm = true;

continue;
_ = join_fut => {
break;
}
_ = term_signal.recv() => {
if second_sigterm {
tracing::warn!("received third SIGTERM, aborting shutdown");
abort = term_signal.recv() => {
if abort {
tracing::warn!("aborting worker shutdown");
break;
}

tracing::warn!("received second SIGTERM");
second_sigterm = true;

continue;
}
_ = tokio::time::sleep(Duration::from_secs(2)) => {}
_ = tokio::time::sleep(shutdown_duration.saturating_sub(shutdown_start.elapsed())) => {
tracing::warn!("worker shutdown timed out");
break;
}
}
}

if self.running_workflows.is_empty() {
let remaining_workflows = wf_futs.into_iter().count();
if remaining_workflows == 0 {
tracing::info!("all workflows evicted");
} else {
tracing::warn!(remaining_workflows=?self.running_workflows.len(), "not all workflows evicted");
}

tracing::info!("shutdown complete");

rivet_runtime::shutdown().await;
tracing::info!("worker shutdown complete");
}
}

Expand Down
Loading
Loading