From 44cfde4394bcc23407bfbfc5cad8e5eab282e9a3 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 7 Nov 2025 12:28:47 -0800 Subject: [PATCH] chore(gas): rename wake sub to bump sub --- .../src/builder/workflow/sub_workflow.rs | 4 +- engine/packages/gasoline/src/ctx/common.rs | 4 +- engine/packages/gasoline/src/ctx/workflow.rs | 16 ++++---- engine/packages/gasoline/src/db/kv/debug.rs | 2 +- engine/packages/gasoline/src/db/kv/mod.rs | 40 +++++++++---------- engine/packages/gasoline/src/db/mod.rs | 8 ++-- engine/packages/gasoline/src/worker.rs | 8 ++-- 7 files changed, 41 insertions(+), 41 deletions(-) diff --git a/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs b/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs index 6048648b34..6c1180ae2f 100644 --- a/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs +++ b/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs @@ -266,7 +266,7 @@ where tracing::debug!("waiting for sub workflow"); - let mut wake_sub = self.ctx.db().wake_sub().await?; + let mut bump_sub = self.ctx.db().bump_sub().await?; let mut retries = self.ctx.db().max_sub_workflow_poll_retries(); let mut interval = tokio::time::interval(self.ctx.db().sub_workflow_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -294,7 +294,7 @@ where // Poll and wait for a wake at the same time tokio::select! { - _ = wake_sub.next() => {}, + _ = bump_sub.next() => {}, _ = interval.tick() => {}, } } diff --git a/engine/packages/gasoline/src/ctx/common.rs b/engine/packages/gasoline/src/ctx/common.rs index e1f470315f..49a3a34efe 100644 --- a/engine/packages/gasoline/src/ctx/common.rs +++ b/engine/packages/gasoline/src/ctx/common.rs @@ -26,7 +26,7 @@ pub async fn wait_for_workflow_output( ) -> Result { tracing::debug!(?workflow_id, "waiting for workflow"); - let mut wake_sub = db.wake_sub().await?; + let mut bump_sub = db.bump_sub().await?; let mut interval = tokio::time::interval(db.sub_workflow_poll_interval()); // Skip first tick, we wait after the db call instead of before @@ -47,7 +47,7 @@ pub async fn wait_for_workflow_output( // Poll and wait for a wake at the same time tokio::select! { - _ = wake_sub.next() => {}, + _ = bump_sub.next() => {}, _ = interval.tick() => {}, } } diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs index 086104c34c..8d184036a4 100644 --- a/engine/packages/gasoline/src/ctx/workflow.rs +++ b/engine/packages/gasoline/src/ctx/workflow.rs @@ -716,7 +716,7 @@ impl WorkflowCtx { else { tracing::debug!("listening for signal"); - let mut wake_sub = self.db.wake_sub().await?; + let mut bump_sub = self.db.bump_sub().await?; let mut retries = self.db.max_signal_poll_retries(); let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -742,7 +742,7 @@ impl WorkflowCtx { // Poll and wait for a wake at the same time tokio::select! { - _ = wake_sub.next() => {}, + _ = bump_sub.next() => {}, _ = interval.tick() => {}, res = self.wait_stop() => res?, } @@ -779,7 +779,7 @@ impl WorkflowCtx { else { tracing::debug!("listening for signal"); - let mut wake_sub = self.db.wake_sub().await?; + let mut bump_sub = self.db.bump_sub().await?; let mut retries = self.db.max_signal_poll_retries(); let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -805,7 +805,7 @@ impl WorkflowCtx { // Poll and wait for a wake at the same time tokio::select! { - _ = wake_sub.next() => {}, + _ = bump_sub.next() => {}, _ = interval.tick() => {}, res = self.wait_stop() => res?, } @@ -1186,7 +1186,7 @@ impl WorkflowCtx { (async { tracing::debug!("listening for signal with timeout"); - let mut wake_sub = self.db.wake_sub().await?; + let mut bump_sub = self.db.bump_sub().await?; let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -1206,7 +1206,7 @@ impl WorkflowCtx { // Poll and wait for a wake at the same time tokio::select! { - _ = wake_sub.next() => {}, + _ = bump_sub.next() => {}, _ = interval.tick() => {}, res = self.wait_stop() => res?, } @@ -1229,7 +1229,7 @@ impl WorkflowCtx { else { tracing::debug!("listening for signal with timeout"); - let mut wake_sub = self.db.wake_sub().await?; + let mut bump_sub = self.db.bump_sub().await?; let mut retries = self.db.max_signal_poll_retries(); let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -1257,7 +1257,7 @@ impl WorkflowCtx { // Poll and wait for a wake at the same time tokio::select! { - _ = wake_sub.next() => {}, + _ = bump_sub.next() => {}, _ = interval.tick() => {}, res = self.wait_stop() => res?, } diff --git a/engine/packages/gasoline/src/db/kv/debug.rs b/engine/packages/gasoline/src/db/kv/debug.rs index d68601483c..aff9967940 100644 --- a/engine/packages/gasoline/src/db/kv/debug.rs +++ b/engine/packages/gasoline/src/db/kv/debug.rs @@ -718,7 +718,7 @@ impl DatabaseDebug for DatabaseKv { .instrument(tracing::info_span!("wake_workflows_tx")) .await?; - self.wake_worker(); + self.bump_workers(); Ok(()) } diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index debb197da8..a53fdb85ad 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -43,8 +43,8 @@ mod keys; const WORKER_INSTANCE_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30); /// How long before overwriting an existing metrics lock. const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30); -/// For pubsub wake mechanism. -const WORKER_WAKE_SUBJECT: &str = "gasoline.worker.wake"; +/// For pubsub bump mechanism. +const WORKER_BUMP_SUBJECT: &str = "gasoline.worker.bump"; pub struct DatabaseKv { pools: rivet_pools::Pools, @@ -52,31 +52,31 @@ pub struct DatabaseKv { } impl DatabaseKv { - /// Spawns a new thread and publishes a worker wake message to pubsub. - fn wake_worker(&self) { + /// Spawns a new thread and publishes a worker bump message to pubsub. + fn bump_workers(&self) { let Ok(pubsub) = self.pools.ups() else { tracing::debug!("failed to acquire pubsub pool"); return; }; - let spawn_res = tokio::task::Builder::new().name("wake").spawn( + let spawn_res = tokio::task::Builder::new().name("bump").spawn( async move { // Fail gracefully if let Err(err) = pubsub .publish( - WORKER_WAKE_SUBJECT, + WORKER_BUMP_SUBJECT, &Vec::new(), universalpubsub::PublishOpts::broadcast(), ) .await { - tracing::warn!(?err, "failed to publish wake message"); + tracing::warn!(?err, "failed to publish bump message"); } } - .instrument(tracing::info_span!("wake_worker_publish")), + .instrument(tracing::info_span!("bump_worker_publish")), ); if let Err(err) = spawn_res { - tracing::error!(?err, "failed to spawn wake task"); + tracing::error!(?err, "failed to spawn bump task"); } } } @@ -424,12 +424,12 @@ impl Database for DatabaseKv { } #[tracing::instrument(skip_all)] - async fn wake_sub<'a, 'b>(&'a self) -> WorkflowResult> { + async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult> { let mut subscriber = self .pools .ups() .map_err(WorkflowError::PoolsGeneric)? - .subscribe(WORKER_WAKE_SUBJECT) + .subscribe(WORKER_BUMP_SUBJECT) .await .map_err(|x| WorkflowError::CreateSubscription(x.into()))?; @@ -586,7 +586,7 @@ impl Database for DatabaseKv { "handled failover", ); - self.wake_worker(); + self.bump_workers(); } Ok(()) @@ -815,7 +815,7 @@ impl Database for DatabaseKv { .await .map_err(WorkflowError::Udb)?; - self.wake_worker(); + self.bump_workers(); Ok(workflow_id) } @@ -1028,7 +1028,7 @@ impl Database for DatabaseKv { { let wake_deadline_ts = key.condition.deadline_ts(); - // Update wake deadline ts + // Update wake deadline ts if earlier if last_wake_deadline_ts.is_none() || wake_deadline_ts < *last_wake_deadline_ts { @@ -1633,7 +1633,7 @@ impl Database for DatabaseKv { // Wake worker again in case some other workflow was waiting for this one to complete if wrote_to_wake_idx { - self.wake_worker(); + self.bump_workers(); } let dt = start_instant.elapsed().as_secs_f64(); @@ -1794,7 +1794,7 @@ impl Database for DatabaseKv { // // This will result in the workflow sleeping instead of immediately running again. // - // Adding this wake_worker call ensures that if the workflow has a valid wake condition before commit + // Adding this bump_workers call ensures that if the workflow has a valid wake condition before commit // then it will immediately wake up again. // // This is simpler than having this commit_workflow fn read wake conditions because: @@ -1802,7 +1802,7 @@ impl Database for DatabaseKv { // - would involve informing the worker to restart the workflow in memory instead of the usual // workflow lifecycle // - the worker is already designed to pull wake conditions frequently - self.wake_worker(); + self.bump_workers(); let dt = start_instant.elapsed().as_secs_f64(); metrics::COMMIT_WORKFLOW_DURATION.record( @@ -2111,7 +2111,7 @@ impl Database for DatabaseKv { .await .map_err(WorkflowError::Udb)?; - self.wake_worker(); + self.bump_workers(); Ok(()) } @@ -2163,7 +2163,7 @@ impl Database for DatabaseKv { .await .map_err(WorkflowError::Udb)?; - self.wake_worker(); + self.bump_workers(); Ok(()) } @@ -2219,7 +2219,7 @@ impl Database for DatabaseKv { .await .map_err(WorkflowError::Udb)?; - self.wake_worker(); + self.bump_workers(); Ok(sub_workflow_id) } diff --git a/engine/packages/gasoline/src/db/mod.rs b/engine/packages/gasoline/src/db/mod.rs index 81af4f6b24..7b5d2a11cb 100644 --- a/engine/packages/gasoline/src/db/mod.rs +++ b/engine/packages/gasoline/src/db/mod.rs @@ -30,12 +30,12 @@ pub trait Database: Send { // MARK: Const fns - /// How often to pull workflows when polling. This runs alongside a wake sub. + /// How often to pull workflows when polling. This runs alongside a bump sub. fn worker_poll_interval(&self) -> Duration { Duration::from_secs(90) } - /// Poll interval when polling for signals in-process. This runs alongside a wake sub. + /// Poll interval when polling for signals in-process. This runs alongside a bump sub. fn signal_poll_interval(&self) -> Duration { Duration::from_millis(500) } @@ -45,7 +45,7 @@ pub trait Database: Send { 4 } - /// Poll interval when polling for a sub workflow in-process. This runs alongside a wake sub. + /// Poll interval when polling for a sub workflow in-process. This runs alongside a bump sub. fn sub_workflow_poll_interval(&self) -> Duration { Duration::from_millis(500) } @@ -59,7 +59,7 @@ pub trait Database: Send { /// This function returns a subscription which should resolve once the worker should fetch the database /// again. - async fn wake_sub<'a, 'b>(&'a self) -> WorkflowResult>; + async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult>; /// Updates the last ping ts for this worker. async fn update_worker_ping(&self, worker_instance_id: Id) -> WorkflowResult<()>; diff --git a/engine/packages/gasoline/src/worker.rs b/engine/packages/gasoline/src/worker.rs index 43b1680040..5321656a28 100644 --- a/engine/packages/gasoline/src/worker.rs +++ b/engine/packages/gasoline/src/worker.rs @@ -14,7 +14,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::{ctx::WorkflowCtx, db::DatabaseHandle, error::WorkflowError, registry::RegistryHandle}; /// How often to run gc and update ping. -const PING_INTERVAL: Duration = Duration::from_secs(20); +const PING_INTERVAL: Duration = Duration::from_secs(10); /// How often to publish metrics. const METRICS_INTERVAL: Duration = Duration::from_secs(20); /// Time to allow running workflows to shutdown after receiving a SIGINT or SIGTERM. @@ -57,7 +57,7 @@ impl Worker { } } - /// Polls the database periodically or wakes immediately when `Database::wake` finishes + /// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes #[tracing::instrument(skip_all, fields(worker_instance_id=%self.worker_instance_id))] pub async fn start(mut self, mut shutdown_rx: Option>) -> Result<()> { tracing::debug!( @@ -67,7 +67,7 @@ impl Worker { let cache = rivet_cache::CacheInner::from_env(&self.config, self.pools.clone())?; - let mut wake_sub = { self.db.wake_sub().await? }; + let mut bump_sub = { self.db.bump_sub().await? }; let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval()); tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -89,7 +89,7 @@ impl Worker { tokio::select! { _ = tick_interval.tick() => {}, - res = wake_sub.next() => { + res = bump_sub.next() => { if res.is_none() { break Err(WorkflowError::SubscriptionUnsubscribed.into()); }