Skip to content

Commit 44cfde4

Browse files
committed
chore(gas): rename wake sub to bump sub
1 parent 7239ed4 commit 44cfde4

File tree

7 files changed

+41
-41
lines changed

7 files changed

+41
-41
lines changed

engine/packages/gasoline/src/builder/workflow/sub_workflow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ where
266266

267267
tracing::debug!("waiting for sub workflow");
268268

269-
let mut wake_sub = self.ctx.db().wake_sub().await?;
269+
let mut bump_sub = self.ctx.db().bump_sub().await?;
270270
let mut retries = self.ctx.db().max_sub_workflow_poll_retries();
271271
let mut interval = tokio::time::interval(self.ctx.db().sub_workflow_poll_interval());
272272
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -294,7 +294,7 @@ where
294294

295295
// Poll and wait for a wake at the same time
296296
tokio::select! {
297-
_ = wake_sub.next() => {},
297+
_ = bump_sub.next() => {},
298298
_ = interval.tick() => {},
299299
}
300300
}

engine/packages/gasoline/src/ctx/common.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub async fn wait_for_workflow_output<W: Workflow>(
2626
) -> Result<W::Output> {
2727
tracing::debug!(?workflow_id, "waiting for workflow");
2828

29-
let mut wake_sub = db.wake_sub().await?;
29+
let mut bump_sub = db.bump_sub().await?;
3030
let mut interval = tokio::time::interval(db.sub_workflow_poll_interval());
3131

3232
// Skip first tick, we wait after the db call instead of before
@@ -47,7 +47,7 @@ pub async fn wait_for_workflow_output<W: Workflow>(
4747

4848
// Poll and wait for a wake at the same time
4949
tokio::select! {
50-
_ = wake_sub.next() => {},
50+
_ = bump_sub.next() => {},
5151
_ = interval.tick() => {},
5252
}
5353
}

engine/packages/gasoline/src/ctx/workflow.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ impl WorkflowCtx {
716716
else {
717717
tracing::debug!("listening for signal");
718718

719-
let mut wake_sub = self.db.wake_sub().await?;
719+
let mut bump_sub = self.db.bump_sub().await?;
720720
let mut retries = self.db.max_signal_poll_retries();
721721
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
722722
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -742,7 +742,7 @@ impl WorkflowCtx {
742742

743743
// Poll and wait for a wake at the same time
744744
tokio::select! {
745-
_ = wake_sub.next() => {},
745+
_ = bump_sub.next() => {},
746746
_ = interval.tick() => {},
747747
res = self.wait_stop() => res?,
748748
}
@@ -779,7 +779,7 @@ impl WorkflowCtx {
779779
else {
780780
tracing::debug!("listening for signal");
781781

782-
let mut wake_sub = self.db.wake_sub().await?;
782+
let mut bump_sub = self.db.bump_sub().await?;
783783
let mut retries = self.db.max_signal_poll_retries();
784784
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
785785
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -805,7 +805,7 @@ impl WorkflowCtx {
805805

806806
// Poll and wait for a wake at the same time
807807
tokio::select! {
808-
_ = wake_sub.next() => {},
808+
_ = bump_sub.next() => {},
809809
_ = interval.tick() => {},
810810
res = self.wait_stop() => res?,
811811
}
@@ -1186,7 +1186,7 @@ impl WorkflowCtx {
11861186
(async {
11871187
tracing::debug!("listening for signal with timeout");
11881188

1189-
let mut wake_sub = self.db.wake_sub().await?;
1189+
let mut bump_sub = self.db.bump_sub().await?;
11901190
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
11911191
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
11921192

@@ -1206,7 +1206,7 @@ impl WorkflowCtx {
12061206

12071207
// Poll and wait for a wake at the same time
12081208
tokio::select! {
1209-
_ = wake_sub.next() => {},
1209+
_ = bump_sub.next() => {},
12101210
_ = interval.tick() => {},
12111211
res = self.wait_stop() => res?,
12121212
}
@@ -1229,7 +1229,7 @@ impl WorkflowCtx {
12291229
else {
12301230
tracing::debug!("listening for signal with timeout");
12311231

1232-
let mut wake_sub = self.db.wake_sub().await?;
1232+
let mut bump_sub = self.db.bump_sub().await?;
12331233
let mut retries = self.db.max_signal_poll_retries();
12341234
let mut interval = tokio::time::interval(self.db.signal_poll_interval());
12351235
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -1257,7 +1257,7 @@ impl WorkflowCtx {
12571257

12581258
// Poll and wait for a wake at the same time
12591259
tokio::select! {
1260-
_ = wake_sub.next() => {},
1260+
_ = bump_sub.next() => {},
12611261
_ = interval.tick() => {},
12621262
res = self.wait_stop() => res?,
12631263
}

engine/packages/gasoline/src/db/kv/debug.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ impl DatabaseDebug for DatabaseKv {
718718
.instrument(tracing::info_span!("wake_workflows_tx"))
719719
.await?;
720720

721-
self.wake_worker();
721+
self.bump_workers();
722722

723723
Ok(())
724724
}

engine/packages/gasoline/src/db/kv/mod.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,40 +43,40 @@ mod keys;
4343
const WORKER_INSTANCE_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30);
4444
/// How long before overwriting an existing metrics lock.
4545
const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30);
46-
/// For pubsub wake mechanism.
47-
const WORKER_WAKE_SUBJECT: &str = "gasoline.worker.wake";
46+
/// For pubsub bump mechanism.
47+
const WORKER_BUMP_SUBJECT: &str = "gasoline.worker.bump";
4848

4949
pub struct DatabaseKv {
5050
pools: rivet_pools::Pools,
5151
subspace: universaldb::utils::Subspace,
5252
}
5353

5454
impl DatabaseKv {
55-
/// Spawns a new thread and publishes a worker wake message to pubsub.
56-
fn wake_worker(&self) {
55+
/// Spawns a new thread and publishes a worker bump message to pubsub.
56+
fn bump_workers(&self) {
5757
let Ok(pubsub) = self.pools.ups() else {
5858
tracing::debug!("failed to acquire pubsub pool");
5959
return;
6060
};
6161

62-
let spawn_res = tokio::task::Builder::new().name("wake").spawn(
62+
let spawn_res = tokio::task::Builder::new().name("bump").spawn(
6363
async move {
6464
// Fail gracefully
6565
if let Err(err) = pubsub
6666
.publish(
67-
WORKER_WAKE_SUBJECT,
67+
WORKER_BUMP_SUBJECT,
6868
&Vec::new(),
6969
universalpubsub::PublishOpts::broadcast(),
7070
)
7171
.await
7272
{
73-
tracing::warn!(?err, "failed to publish wake message");
73+
tracing::warn!(?err, "failed to publish bump message");
7474
}
7575
}
76-
.instrument(tracing::info_span!("wake_worker_publish")),
76+
.instrument(tracing::info_span!("bump_worker_publish")),
7777
);
7878
if let Err(err) = spawn_res {
79-
tracing::error!(?err, "failed to spawn wake task");
79+
tracing::error!(?err, "failed to spawn bump task");
8080
}
8181
}
8282
}
@@ -424,12 +424,12 @@ impl Database for DatabaseKv {
424424
}
425425

426426
#[tracing::instrument(skip_all)]
427-
async fn wake_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>> {
427+
async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>> {
428428
let mut subscriber = self
429429
.pools
430430
.ups()
431431
.map_err(WorkflowError::PoolsGeneric)?
432-
.subscribe(WORKER_WAKE_SUBJECT)
432+
.subscribe(WORKER_BUMP_SUBJECT)
433433
.await
434434
.map_err(|x| WorkflowError::CreateSubscription(x.into()))?;
435435

@@ -586,7 +586,7 @@ impl Database for DatabaseKv {
586586
"handled failover",
587587
);
588588

589-
self.wake_worker();
589+
self.bump_workers();
590590
}
591591

592592
Ok(())
@@ -815,7 +815,7 @@ impl Database for DatabaseKv {
815815
.await
816816
.map_err(WorkflowError::Udb)?;
817817

818-
self.wake_worker();
818+
self.bump_workers();
819819

820820
Ok(workflow_id)
821821
}
@@ -1028,7 +1028,7 @@ impl Database for DatabaseKv {
10281028
{
10291029
let wake_deadline_ts = key.condition.deadline_ts();
10301030

1031-
// Update wake deadline ts
1031+
// Update wake deadline ts if earlier
10321032
if last_wake_deadline_ts.is_none()
10331033
|| wake_deadline_ts < *last_wake_deadline_ts
10341034
{
@@ -1633,7 +1633,7 @@ impl Database for DatabaseKv {
16331633

16341634
// Wake worker again in case some other workflow was waiting for this one to complete
16351635
if wrote_to_wake_idx {
1636-
self.wake_worker();
1636+
self.bump_workers();
16371637
}
16381638

16391639
let dt = start_instant.elapsed().as_secs_f64();
@@ -1794,15 +1794,15 @@ impl Database for DatabaseKv {
17941794
//
17951795
// This will result in the workflow sleeping instead of immediately running again.
17961796
//
1797-
// Adding this wake_worker call ensures that if the workflow has a valid wake condition before commit
1797+
// Adding this bump_workers call ensures that if the workflow has a valid wake condition before commit
17981798
// then it will immediately wake up again.
17991799
//
18001800
// This is simpler than having this commit_workflow fn read wake conditions because:
18011801
// - the wake conditions are not indexed by wf id
18021802
// - would involve informing the worker to restart the workflow in memory instead of the usual
18031803
// workflow lifecycle
18041804
// - the worker is already designed to pull wake conditions frequently
1805-
self.wake_worker();
1805+
self.bump_workers();
18061806

18071807
let dt = start_instant.elapsed().as_secs_f64();
18081808
metrics::COMMIT_WORKFLOW_DURATION.record(
@@ -2111,7 +2111,7 @@ impl Database for DatabaseKv {
21112111
.await
21122112
.map_err(WorkflowError::Udb)?;
21132113

2114-
self.wake_worker();
2114+
self.bump_workers();
21152115

21162116
Ok(())
21172117
}
@@ -2163,7 +2163,7 @@ impl Database for DatabaseKv {
21632163
.await
21642164
.map_err(WorkflowError::Udb)?;
21652165

2166-
self.wake_worker();
2166+
self.bump_workers();
21672167

21682168
Ok(())
21692169
}
@@ -2219,7 +2219,7 @@ impl Database for DatabaseKv {
22192219
.await
22202220
.map_err(WorkflowError::Udb)?;
22212221

2222-
self.wake_worker();
2222+
self.bump_workers();
22232223

22242224
Ok(sub_workflow_id)
22252225
}

engine/packages/gasoline/src/db/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ pub trait Database: Send {
3030

3131
// MARK: Const fns
3232

33-
/// How often to pull workflows when polling. This runs alongside a wake sub.
33+
/// How often to pull workflows when polling. This runs alongside a bump sub.
3434
fn worker_poll_interval(&self) -> Duration {
3535
Duration::from_secs(90)
3636
}
3737

38-
/// Poll interval when polling for signals in-process. This runs alongside a wake sub.
38+
/// Poll interval when polling for signals in-process. This runs alongside a bump sub.
3939
fn signal_poll_interval(&self) -> Duration {
4040
Duration::from_millis(500)
4141
}
@@ -45,7 +45,7 @@ pub trait Database: Send {
4545
4
4646
}
4747

48-
/// Poll interval when polling for a sub workflow in-process. This runs alongside a wake sub.
48+
/// Poll interval when polling for a sub workflow in-process. This runs alongside a bump sub.
4949
fn sub_workflow_poll_interval(&self) -> Duration {
5050
Duration::from_millis(500)
5151
}
@@ -59,7 +59,7 @@ pub trait Database: Send {
5959

6060
/// This function returns a subscription which should resolve once the worker should fetch the database
6161
/// again.
62-
async fn wake_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>>;
62+
async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>>;
6363

6464
/// Updates the last ping ts for this worker.
6565
async fn update_worker_ping(&self, worker_instance_id: Id) -> WorkflowResult<()>;

engine/packages/gasoline/src/worker.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
1414
use crate::{ctx::WorkflowCtx, db::DatabaseHandle, error::WorkflowError, registry::RegistryHandle};
1515

1616
/// How often to run gc and update ping.
17-
const PING_INTERVAL: Duration = Duration::from_secs(20);
17+
const PING_INTERVAL: Duration = Duration::from_secs(10);
1818
/// How often to publish metrics.
1919
const METRICS_INTERVAL: Duration = Duration::from_secs(20);
2020
/// Time to allow running workflows to shutdown after receiving a SIGINT or SIGTERM.
@@ -57,7 +57,7 @@ impl Worker {
5757
}
5858
}
5959

60-
/// Polls the database periodically or wakes immediately when `Database::wake` finishes
60+
/// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes
6161
#[tracing::instrument(skip_all, fields(worker_instance_id=%self.worker_instance_id))]
6262
pub async fn start(mut self, mut shutdown_rx: Option<watch::Receiver<()>>) -> Result<()> {
6363
tracing::debug!(
@@ -67,7 +67,7 @@ impl Worker {
6767

6868
let cache = rivet_cache::CacheInner::from_env(&self.config, self.pools.clone())?;
6969

70-
let mut wake_sub = { self.db.wake_sub().await? };
70+
let mut bump_sub = { self.db.bump_sub().await? };
7171

7272
let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval());
7373
tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -89,7 +89,7 @@ impl Worker {
8989

9090
tokio::select! {
9191
_ = tick_interval.tick() => {},
92-
res = wake_sub.next() => {
92+
res = bump_sub.next() => {
9393
if res.is_none() {
9494
break Err(WorkflowError::SubscriptionUnsubscribed.into());
9595
}

0 commit comments

Comments
 (0)