Skip to content

Commit d5ee786

Browse files
authored
chore(pegboard): allow configuring runner_eligible_threshold and runner_lost_threshold (#3372)
1 parent f5628c8 commit d5ee786

File tree

5 files changed

+49
-16
lines changed

5 files changed

+49
-16
lines changed

engine/packages/config/src/config/pegboard.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,21 @@ pub struct Pegboard {
3535
///
3636
/// **Experimental**
3737
pub reschedule_backoff_max_exponent: Option<usize>,
38+
/// How long after last ping before considering a runner ineligible for allocation.
39+
///
40+
/// Unit is in milliseconds.
41+
///
42+
/// **Experimental**
43+
pub runner_eligible_threshold: Option<i64>,
44+
/// How long to wait after last ping before forcibly removing a runner from the database
45+
/// and deleting its workflow, evicting all actors.
46+
///
47+
/// Note that the runner may still be running and can reconnect.
48+
///
49+
/// Unit is in milliseconds.
50+
///
51+
/// **Experimental**
52+
pub runner_lost_threshold: Option<i64>,
3853
}
3954

4055
impl Pegboard {
@@ -57,4 +72,14 @@ impl Pegboard {
5772
pub fn reschedule_backoff_max_exponent(&self) -> usize {
5873
self.reschedule_backoff_max_exponent.unwrap_or(8)
5974
}
75+
76+
/// Returns the effective runner eligibility threshold: the configured
/// `runner_eligible_threshold` if set, otherwise `util::duration::seconds(10)`.
///
/// Per the field's documentation, the unit is milliseconds.
pub fn runner_eligible_threshold(&self) -> i64 {
77+
self.runner_eligible_threshold
78+
.unwrap_or(util::duration::seconds(10))
79+
}
80+
81+
/// Returns the effective runner lost threshold: the configured
/// `runner_lost_threshold` if set, otherwise `util::duration::seconds(15)`.
///
/// Per the field's documentation, the unit is milliseconds.
pub fn runner_lost_threshold(&self) -> i64 {
82+
self.runner_lost_threshold
83+
.unwrap_or(util::duration::seconds(15))
84+
}
6085
}

engine/packages/pegboard/src/ops/runner/update_alloc_idx.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use gas::prelude::*;
22
use universaldb::options::ConflictRangeType;
33
use universaldb::utils::IsolationLevel::*;
44

5-
use crate::{keys, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS};
5+
use crate::keys;
66

77
#[derive(Debug)]
88
pub struct Input {
@@ -45,6 +45,8 @@ pub enum RunnerEligibility {
4545

4646
#[operation]
4747
pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) -> Result<Output> {
48+
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();
49+
4850
let notifications = ctx
4951
.udb()?
5052
.run(|tx| {
@@ -183,7 +185,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
183185
)?;
184186

185187
if last_ping_ts.saturating_sub(old_last_ping_ts)
186-
> RUNNER_ELIGIBLE_THRESHOLD_MS
188+
> runner_eligible_threshold
187189
{
188190
notifications.push(RunnerNotification {
189191
runner_id: runner.runner_id,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::time::Instant;
1313
use universaldb::options::{ConflictRangeType, MutationType, StreamingMode};
1414
use universaldb::utils::{FormalKey, IsolationLevel::*};
1515

16-
use crate::{keys, metrics, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS};
16+
use crate::{keys, metrics};
1717

1818
use super::{Allocate, Destroy, Input, PendingAllocation, State, destroy};
1919

@@ -148,12 +148,14 @@ async fn allocate_actor(
148148
})
149149
.unwrap_or_default();
150150

151+
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();
152+
151153
// NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the
152154
// runner wf
153155
let (for_serverless, res) = ctx
154156
.udb()?
155157
.run(|tx| async move {
156-
let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS;
158+
let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;
157159

158160
// Check if runner is a serverless runner
159161
let for_serverless = tx

engine/packages/pegboard/src/workflows/runner.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,6 @@ use vbare::OwnedVersionedData;
1212

1313
use crate::{keys, metrics, workflows::actor::Allocate};
1414

15-
/// How long after last ping before considering a runner ineligible for allocation.
16-
pub const RUNNER_ELIGIBLE_THRESHOLD_MS: i64 = util::duration::seconds(10);
17-
/// How long to wait after last ping before forcibly removing a runner from the database and deleting its
18-
/// workflow, evicting all actors. Note that the runner may still be running and can reconnect.
19-
///
20-
/// Runner ping interval is currently set to 3s.
21-
const RUNNER_LOST_THRESHOLD_MS: i64 = util::duration::seconds(15);
2215
/// Batch size of how many events to ack.
2316
const EVENT_ACK_BATCH_SIZE: i64 = 500;
2417

@@ -88,8 +81,10 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
8881
let input = input.clone();
8982

9083
async move {
84+
let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold();
85+
9186
match ctx
92-
.listen_with_timeout::<Main>(RUNNER_LOST_THRESHOLD_MS)
87+
.listen_with_timeout::<Main>(runner_lost_threshold)
9388
.await?
9489
{
9590
Some(Main::Forward(sig)) => {
@@ -117,7 +112,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
117112
runner_id: input.runner_id.to_string(),
118113
last_event_idx: init_data.last_event_idx,
119114
metadata: protocol::ProtocolMetadata {
120-
runner_lost_threshold: RUNNER_LOST_THRESHOLD_MS,
115+
runner_lost_threshold: runner_lost_threshold,
121116
},
122117
}),
123118
})
@@ -403,7 +398,7 @@ async fn handle_stopping(
403398
) -> Result<()> {
404399
if !state.draining {
405400
// The workflow will enter a draining state where it can still process signals if
406-
// needed. After RUNNER_LOST_THRESHOLD_MS it will exit this loop and stop.
401+
// needed. After the runner lost threshold it will exit this loop and stop.
407402
state.draining = true;
408403

409404
// Can't parallelize these two activities, requires reading from state
@@ -942,6 +937,8 @@ struct CheckExpiredInput {
942937

943938
#[activity(CheckExpired)]
944939
async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> Result<bool> {
940+
let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold();
941+
945942
ctx.udb()?
946943
.run(|tx| async move {
947944
let tx = tx.with_subspace(keys::subspace());
@@ -954,7 +951,7 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> Result<b
954951
.await?;
955952

956953
let now = util::timestamp::now();
957-
let expired = last_ping_ts < now - RUNNER_LOST_THRESHOLD_MS;
954+
let expired = last_ping_ts < now - runner_lost_threshold;
958955

959956
if expired {
960957
tx.write(&keys::runner::ExpiredTsKey::new(input.runner_id), now)?;
@@ -989,6 +986,8 @@ pub(crate) async fn allocate_pending_actors(
989986
ctx: &ActivityCtx,
990987
input: &AllocatePendingActorsInput,
991988
) -> Result<AllocatePendingActorsOutput> {
989+
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();
990+
992991
// NOTE: This txn should closely resemble the one found in the allocate_actor activity of the actor wf
993992
let (allocations, pending_actor_count) = ctx
994993
.udb()?
@@ -1012,7 +1011,7 @@ pub(crate) async fn allocate_pending_actors(
10121011
Snapshot,
10131012
);
10141013
let mut pending_actor_count = 0;
1015-
let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS;
1014+
let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;
10161015

10171016
'queue_loop: loop {
10181017
let Some(queue_entry) = queue_stream.try_next().await? else {

rivetkit-typescript/packages/rivetkit/src/engine-process/mod.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ export async function ensureEngineProcess(
9999
RIVET__PEGBOARD__BASE_RETRY_TIMEOUT: "100",
100100
// Set max exponent to 1 to have a maximum of base_retry_timeout
101101
RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT: "1",
102+
// Reduce thresholds for faster development iteration
103+
//
104+
// Default ping interval is 3s, so these thresholds give 2s and 4s of grace respectively
105+
RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD: "5000",
106+
RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD: "7000",
102107
},
103108
});
104109

0 commit comments

Comments
 (0)