Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/builder/workflow/sub_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ where

tracing::debug!("waiting for sub workflow");

let mut wake_sub = self.ctx.db().wake_sub().await?;
let mut bump_sub = self.ctx.db().bump_sub().await?;
let mut retries = self.ctx.db().max_sub_workflow_poll_retries();
let mut interval = tokio::time::interval(self.ctx.db().sub_workflow_poll_interval());
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand Down Expand Up @@ -294,7 +294,7 @@ where

// Poll and wait for a wake at the same time
tokio::select! {
_ = wake_sub.next() => {},
_ = bump_sub.next() => {},
_ = interval.tick() => {},
}
}
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/ctx/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn wait_for_workflow_output<W: Workflow>(
) -> Result<W::Output> {
tracing::debug!(?workflow_id, "waiting for workflow");

let mut wake_sub = db.wake_sub().await?;
let mut bump_sub = db.bump_sub().await?;
let mut interval = tokio::time::interval(db.sub_workflow_poll_interval());

// Skip first tick, we wait after the db call instead of before
Expand All @@ -47,7 +47,7 @@ pub async fn wait_for_workflow_output<W: Workflow>(

// Poll and wait for a wake at the same time
tokio::select! {
_ = wake_sub.next() => {},
_ = bump_sub.next() => {},
_ = interval.tick() => {},
}
}
Expand Down
16 changes: 8 additions & 8 deletions engine/packages/gasoline/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ impl WorkflowCtx {
else {
tracing::debug!("listening for signal");

let mut wake_sub = self.db.wake_sub().await?;
let mut bump_sub = self.db.bump_sub().await?;
let mut retries = self.db.max_signal_poll_retries();
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -742,7 +742,7 @@ impl WorkflowCtx {

// Poll and wait for a wake at the same time
tokio::select! {
_ = wake_sub.next() => {},
_ = bump_sub.next() => {},
_ = interval.tick() => {},
res = self.wait_stop() => res?,
}
Expand Down Expand Up @@ -779,7 +779,7 @@ impl WorkflowCtx {
else {
tracing::debug!("listening for signal");

let mut wake_sub = self.db.wake_sub().await?;
let mut bump_sub = self.db.bump_sub().await?;
let mut retries = self.db.max_signal_poll_retries();
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -805,7 +805,7 @@ impl WorkflowCtx {

// Poll and wait for a wake at the same time
tokio::select! {
_ = wake_sub.next() => {},
_ = bump_sub.next() => {},
_ = interval.tick() => {},
res = self.wait_stop() => res?,
}
Expand Down Expand Up @@ -1186,7 +1186,7 @@ impl WorkflowCtx {
(async {
tracing::debug!("listening for signal with timeout");

let mut wake_sub = self.db.wake_sub().await?;
let mut bump_sub = self.db.bump_sub().await?;
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

Expand All @@ -1206,7 +1206,7 @@ impl WorkflowCtx {

// Poll and wait for a wake at the same time
tokio::select! {
_ = wake_sub.next() => {},
_ = bump_sub.next() => {},
_ = interval.tick() => {},
res = self.wait_stop() => res?,
}
Expand All @@ -1229,7 +1229,7 @@ impl WorkflowCtx {
else {
tracing::debug!("listening for signal with timeout");

let mut wake_sub = self.db.wake_sub().await?;
let mut bump_sub = self.db.bump_sub().await?;
let mut retries = self.db.max_signal_poll_retries();
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand Down Expand Up @@ -1257,7 +1257,7 @@ impl WorkflowCtx {

// Poll and wait for a wake at the same time
tokio::select! {
_ = wake_sub.next() => {},
_ = bump_sub.next() => {},
_ = interval.tick() => {},
res = self.wait_stop() => res?,
}
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/db/kv/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ impl DatabaseDebug for DatabaseKv {
.instrument(tracing::info_span!("wake_workflows_tx"))
.await?;

self.wake_worker();
self.bump_workers();

Ok(())
}
Expand Down
40 changes: 20 additions & 20 deletions engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,40 +43,40 @@ mod keys;
const WORKER_INSTANCE_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30);
/// How long before overwriting an existing metrics lock.
const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30);
/// For pubsub wake mechanism.
const WORKER_WAKE_SUBJECT: &str = "gasoline.worker.wake";
/// For pubsub bump mechanism.
const WORKER_BUMP_SUBJECT: &str = "gasoline.worker.bump";

pub struct DatabaseKv {
pools: rivet_pools::Pools,
subspace: universaldb::utils::Subspace,
}

impl DatabaseKv {
/// Spawns a new thread and publishes a worker wake message to pubsub.
fn wake_worker(&self) {
/// Spawns a new thread and publishes a worker bump message to pubsub.
fn bump_workers(&self) {
let Ok(pubsub) = self.pools.ups() else {
tracing::debug!("failed to acquire pubsub pool");
return;
};

let spawn_res = tokio::task::Builder::new().name("wake").spawn(
let spawn_res = tokio::task::Builder::new().name("bump").spawn(
async move {
// Fail gracefully
if let Err(err) = pubsub
.publish(
WORKER_WAKE_SUBJECT,
WORKER_BUMP_SUBJECT,
&Vec::new(),
universalpubsub::PublishOpts::broadcast(),
)
.await
{
tracing::warn!(?err, "failed to publish wake message");
tracing::warn!(?err, "failed to publish bump message");
}
}
.instrument(tracing::info_span!("wake_worker_publish")),
.instrument(tracing::info_span!("bump_worker_publish")),
);
if let Err(err) = spawn_res {
tracing::error!(?err, "failed to spawn wake task");
tracing::error!(?err, "failed to spawn bump task");
}
}
}
Expand Down Expand Up @@ -424,12 +424,12 @@ impl Database for DatabaseKv {
}

#[tracing::instrument(skip_all)]
async fn wake_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>> {
async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>> {
let mut subscriber = self
.pools
.ups()
.map_err(WorkflowError::PoolsGeneric)?
.subscribe(WORKER_WAKE_SUBJECT)
.subscribe(WORKER_BUMP_SUBJECT)
.await
.map_err(|x| WorkflowError::CreateSubscription(x.into()))?;

Expand Down Expand Up @@ -586,7 +586,7 @@ impl Database for DatabaseKv {
"handled failover",
);

self.wake_worker();
self.bump_workers();
}

Ok(())
Expand Down Expand Up @@ -815,7 +815,7 @@ impl Database for DatabaseKv {
.await
.map_err(WorkflowError::Udb)?;

self.wake_worker();
self.bump_workers();

Ok(workflow_id)
}
Expand Down Expand Up @@ -1028,7 +1028,7 @@ impl Database for DatabaseKv {
{
let wake_deadline_ts = key.condition.deadline_ts();

// Update wake deadline ts
// Update wake deadline ts if earlier
if last_wake_deadline_ts.is_none()
|| wake_deadline_ts < *last_wake_deadline_ts
{
Expand Down Expand Up @@ -1633,7 +1633,7 @@ impl Database for DatabaseKv {

// Wake worker again in case some other workflow was waiting for this one to complete
if wrote_to_wake_idx {
self.wake_worker();
self.bump_workers();
}

let dt = start_instant.elapsed().as_secs_f64();
Expand Down Expand Up @@ -1794,15 +1794,15 @@ impl Database for DatabaseKv {
//
// This will result in the workflow sleeping instead of immediately running again.
//
// Adding this wake_worker call ensures that if the workflow has a valid wake condition before commit
// Adding this bump_workers call ensures that if the workflow has a valid wake condition before commit
// then it will immediately wake up again.
//
// This is simpler than having this commit_workflow fn read wake conditions because:
// - the wake conditions are not indexed by wf id
// - would involve informing the worker to restart the workflow in memory instead of the usual
// workflow lifecycle
// - the worker is already designed to pull wake conditions frequently
self.wake_worker();
self.bump_workers();

let dt = start_instant.elapsed().as_secs_f64();
metrics::COMMIT_WORKFLOW_DURATION.record(
Expand Down Expand Up @@ -2111,7 +2111,7 @@ impl Database for DatabaseKv {
.await
.map_err(WorkflowError::Udb)?;

self.wake_worker();
self.bump_workers();

Ok(())
}
Expand Down Expand Up @@ -2163,7 +2163,7 @@ impl Database for DatabaseKv {
.await
.map_err(WorkflowError::Udb)?;

self.wake_worker();
self.bump_workers();

Ok(())
}
Expand Down Expand Up @@ -2219,7 +2219,7 @@ impl Database for DatabaseKv {
.await
.map_err(WorkflowError::Udb)?;

self.wake_worker();
self.bump_workers();

Ok(sub_workflow_id)
}
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/gasoline/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ pub trait Database: Send {

// MARK: Const fns

/// How often to pull workflows when polling. This runs alongside a wake sub.
/// How often to pull workflows when polling. This runs alongside a bump sub.
fn worker_poll_interval(&self) -> Duration {
Duration::from_secs(90)
}

/// Poll interval when polling for signals in-process. This runs alongside a wake sub.
/// Poll interval when polling for signals in-process. This runs alongside a bump sub.
fn signal_poll_interval(&self) -> Duration {
Duration::from_millis(500)
}
Expand All @@ -45,7 +45,7 @@ pub trait Database: Send {
4
}

/// Poll interval when polling for a sub workflow in-process. This runs alongside a wake sub.
/// Poll interval when polling for a sub workflow in-process. This runs alongside a bump sub.
fn sub_workflow_poll_interval(&self) -> Duration {
Duration::from_millis(500)
}
Expand All @@ -59,7 +59,7 @@ pub trait Database: Send {

/// This function returns a subscription which should resolve once the worker should fetch the database
/// again.
async fn wake_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>>;
async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>>;

/// Updates the last ping ts for this worker.
async fn update_worker_ping(&self, worker_instance_id: Id) -> WorkflowResult<()>;
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/gasoline/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::{ctx::WorkflowCtx, db::DatabaseHandle, error::WorkflowError, registry::RegistryHandle};

/// How often to run gc and update ping.
const PING_INTERVAL: Duration = Duration::from_secs(20);
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// How often to publish metrics.
const METRICS_INTERVAL: Duration = Duration::from_secs(20);
/// Time to allow running workflows to shutdown after receiving a SIGINT or SIGTERM.
Expand Down Expand Up @@ -57,7 +57,7 @@ impl Worker {
}
}

/// Polls the database periodically or wakes immediately when `Database::wake` finishes
/// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes
#[tracing::instrument(skip_all, fields(worker_instance_id=%self.worker_instance_id))]
pub async fn start(mut self, mut shutdown_rx: Option<watch::Receiver<()>>) -> Result<()> {
tracing::debug!(
Expand All @@ -67,7 +67,7 @@ impl Worker {

let cache = rivet_cache::CacheInner::from_env(&self.config, self.pools.clone())?;

let mut wake_sub = { self.db.wake_sub().await? };
let mut bump_sub = { self.db.bump_sub().await? };

let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval());
tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -89,7 +89,7 @@ impl Worker {

tokio::select! {
_ = tick_interval.tick() => {},
res = wake_sub.next() => {
res = bump_sub.next() => {
if res.is_none() {
break Err(WorkflowError::SubscriptionUnsubscribed.into());
}
Expand Down
Loading