Skip to content

Commit edd820d

Browse files
committed
fix: handle edge cases with gateway + actor sleep (#3248)
1 parent 3d7a3d3 commit edd820d

File tree

23 files changed

+700
-215
lines changed

23 files changed

+700
-215
lines changed

engine/packages/engine/src/util/wf/signal.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,6 @@ pub async fn print_signals(signals: Vec<SignalData>, pretty: bool) -> Result<()>
5454
);
5555
}
5656

57-
if let Some(workflow_id) = &signal.workflow_id {
58-
println!(" {} {}", style("to workflow id").bold(), workflow_id,);
59-
}
60-
6157
if let Some(workflow_id) = signal.workflow_id {
6258
println!(" {} {}", style("to workflow id").bold(), workflow_id);
6359
}

engine/packages/guard/src/routing/pegboard_gateway.rs

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ pub async fn route_request(
101101
let mut ready_sub = ctx
102102
.subscribe::<pegboard::workflows::actor::Ready>(("actor_id", actor_id))
103103
.await?;
104+
let mut stopped_sub = ctx
105+
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", actor_id))
106+
.await?;
104107
let mut fail_sub = ctx
105108
.subscribe::<pegboard::workflows::actor::Failed>(("actor_id", actor_id))
106109
.await?;
@@ -122,6 +125,8 @@ pub async fn route_request(
122125

123126
// Wake actor if sleeping
124127
if actor.sleeping {
128+
tracing::debug!(?actor_id, "actor sleeping, waking");
129+
125130
ctx.signal(pegboard::workflows::actor::Wake {})
126131
.to_workflow_id(actor.workflow_id)
127132
.send()
@@ -133,20 +138,51 @@ pub async fn route_request(
133138
} else {
134139
tracing::debug!(?actor_id, "waiting for actor to become ready");
135140

141+
let mut wake_retries = 0;
142+
136143
// Wait for ready, fail, or destroy
137-
tokio::select! {
138-
res = ready_sub.next() => { res?.runner_id },
139-
res = fail_sub.next() => {
140-
let msg = res?;
141-
return Err(msg.error.clone().build());
142-
}
143-
res = destroy_sub.next() => {
144-
res?;
145-
return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build());
146-
}
147-
// Ready timeout
148-
_ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => {
149-
return Err(errors::ActorReadyTimeout { actor_id }.build());
144+
loop {
145+
tokio::select! {
146+
res = ready_sub.next() => break res?.runner_id,
147+
res = stopped_sub.next() => {
148+
res?;
149+
150+
// Attempt to rewake once
151+
if wake_retries < 3 {
152+
tracing::debug!(?actor_id, ?wake_retries, "actor stopped while we were waiting for it to beocme ready, attempting rewake");
153+
wake_retries += 1;
154+
155+
let res = ctx.signal(pegboard::workflows::actor::Wake {})
156+
.to_workflow_id(actor.workflow_id)
157+
.send()
158+
.await;
159+
160+
if let Some(WorkflowError::WorkflowNotFound) = res
161+
.as_ref()
162+
.err()
163+
.and_then(|x| x.chain().find_map(|x| x.downcast_ref::<WorkflowError>()))
164+
{
165+
tracing::warn!(
166+
?actor_id,
167+
"actor workflow not found for rewake"
168+
);
169+
} else {
170+
res?;
171+
}
172+
}
173+
}
174+
res = fail_sub.next() => {
175+
let msg = res?;
176+
return Err(msg.error.clone().build());
177+
}
178+
res = destroy_sub.next() => {
179+
res?;
180+
return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build());
181+
}
182+
// Ready timeout
183+
_ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => {
184+
return Err(errors::ActorReadyTimeout { actor_id }.build());
185+
}
150186
}
151187
}
152188
};

engine/packages/pegboard-serverless/src/lib.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint")
2626
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");
2727
const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots");
2828
const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name");
29-
const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id");
29+
const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name");
3030

3131
const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5);
3232

@@ -109,7 +109,9 @@ async fn tick(
109109

110110
// Process each runner config with error handling
111111
for (ns_id, runner_name, desired_slots) in &serverless_data {
112-
let runner_config = runner_configs.iter().find(|rc| rc.namespace_id == *ns_id);
112+
let runner_config = runner_configs
113+
.iter()
114+
.find(|rc| rc.namespace_id == *ns_id && &rc.name == runner_name);
113115

114116
let Some(runner_config) = runner_config else {
115117
tracing::debug!(
@@ -225,7 +227,7 @@ async fn tick_runner_config(
225227

226228
for conn in draining_connections {
227229
if conn.shutdown_tx.send(()).is_err() {
228-
tracing::warn!(
230+
tracing::debug!(
229231
"serverless connection shutdown channel dropped, likely already stopped"
230232
);
231233
}
@@ -307,8 +309,6 @@ async fn outbound_handler(
307309
shutdown_rx: oneshot::Receiver<()>,
308310
draining: Arc<AtomicBool>,
309311
) -> Result<()> {
310-
tracing::debug!(%url, "sending outbound req");
311-
312312
let current_dc = ctx.config().topology().current_dc()?;
313313

314314
let client = rivet_pools::reqwest::client_no_timeout().await?;
@@ -341,12 +341,21 @@ async fn outbound_handler(
341341
HeaderValue::try_from(slots_per_runner)?,
342342
),
343343
(X_RIVET_RUNNER_NAME, HeaderValue::try_from(runner_name)?),
344-
(X_RIVET_NAMESPACE_ID, HeaderValue::try_from(namespace_name)?),
344+
(
345+
X_RIVET_NAMESPACE_NAME,
346+
HeaderValue::try_from(namespace_name.clone())?,
347+
),
348+
// Deprecated
349+
(
350+
HeaderName::from_static("x-rivet-namespace-id"),
351+
HeaderValue::try_from(namespace_name)?,
352+
),
345353
])
346354
.chain(token)
347355
.collect();
348356

349357
let endpoint_url = format!("{}/start", url.trim_end_matches('/'));
358+
tracing::debug!(%endpoint_url, "sending outbound req");
350359
let req = client.get(endpoint_url).headers(headers);
351360

352361
let mut source = sse::EventSource::new(req).context("failed creating event source")?;

engine/packages/pegboard/src/workflows/actor/destroy.rs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,15 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input)
3535

3636
// Destroy actor
3737
if let (Some(runner_workflow_id), true) = (res.runner_workflow_id, &input.kill) {
38-
kill(ctx, input.actor_id, input.generation, runner_workflow_id).await?;
38+
ctx.signal(crate::workflows::runner::Command {
39+
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
40+
actor_id: input.actor_id.to_string(),
41+
generation: input.generation,
42+
}),
43+
})
44+
.to_workflow_id(runner_workflow_id)
45+
.send()
46+
.await?;
3947
}
4048

4149
// If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down
@@ -274,22 +282,3 @@ pub(crate) async fn clear_slot(
274282

275283
Ok(())
276284
}
277-
278-
pub(crate) async fn kill(
279-
ctx: &mut WorkflowCtx,
280-
actor_id: Id,
281-
generation: u32,
282-
runner_workflow_id: Id,
283-
) -> Result<()> {
284-
ctx.signal(crate::workflows::runner::Command {
285-
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
286-
actor_id: actor_id.to_string(),
287-
generation,
288-
}),
289-
})
290-
.to_workflow_id(runner_workflow_id)
291-
.send()
292-
.await?;
293-
294-
Ok(())
295-
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 63 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -295,13 +295,18 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
295295
return Ok(Loop::Continue);
296296
}
297297

298+
let (Some(runner_id), Some(runner_workflow_id)) = (state.runner_id, state.runner_workflow_id) else {
299+
tracing::warn!("actor not allocated, ignoring event");
300+
return Ok(Loop::Continue);
301+
};
302+
298303
match sig.inner {
299304
protocol::Event::EventActorIntent(protocol::EventActorIntent {
300305
intent,
301306
..
302307
}) => match intent {
303308
protocol::ActorIntent::ActorIntentSleep => {
304-
if let Some(runner_workflow_id) = state.runner_workflow_id {
309+
if !state.sleeping {
305310
state.gc_timeout_ts =
306311
Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS);
307312
state.sleeping = true;
@@ -312,37 +317,35 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
312317
.await?;
313318

314319
// Send signal to kill actor now that we know it will be sleeping
315-
destroy::kill(
316-
ctx,
317-
input.actor_id,
318-
state.generation,
319-
runner_workflow_id,
320-
)
320+
ctx.signal(crate::workflows::runner::Command {
321+
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
322+
actor_id: input.actor_id.to_string(),
323+
generation: state.generation,
324+
}),
325+
})
326+
.to_workflow_id(runner_workflow_id)
327+
.send()
321328
.await?;
322-
} else {
323-
tracing::warn!("actor not allocated, ignoring sleep intent");
324329
}
325330
}
326331
protocol::ActorIntent::ActorIntentStop => {
327-
if let Some(runner_workflow_id) = state.runner_workflow_id {
328-
state.gc_timeout_ts =
329-
Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS);
332+
state.gc_timeout_ts =
333+
Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS);
330334

331-
ctx.activity(runtime::SetNotConnectableInput {
332-
actor_id: input.actor_id,
333-
})
334-
.await?;
335+
ctx.activity(runtime::SetNotConnectableInput {
336+
actor_id: input.actor_id,
337+
})
338+
.await?;
335339

336-
destroy::kill(
337-
ctx,
338-
input.actor_id,
339-
state.generation,
340-
runner_workflow_id,
341-
)
342-
.await?;
343-
} else {
344-
tracing::warn!("actor not allocated, ignoring stop intent");
345-
}
340+
ctx.signal(crate::workflows::runner::Command {
341+
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
342+
actor_id: input.actor_id.to_string(),
343+
generation: state.generation,
344+
}),
345+
})
346+
.to_workflow_id(runner_workflow_id)
347+
.send()
348+
.await?;
346349
}
347350
},
348351
protocol::Event::EventActorStateUpdate(
@@ -351,23 +354,19 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
351354
},
352355
) => match actor_state {
353356
protocol::ActorState::ActorStateRunning => {
354-
if let Some(runner_id) = state.runner_id {
355-
state.gc_timeout_ts = None;
356-
357-
ctx.activity(runtime::SetStartedInput {
358-
actor_id: input.actor_id,
359-
})
360-
.await?;
361-
362-
ctx.msg(Ready {
363-
runner_id,
364-
})
365-
.tag("actor_id", input.actor_id)
366-
.send()
367-
.await?;
368-
} else {
369-
tracing::warn!("actor not allocated, ignoring running event");
370-
}
357+
state.gc_timeout_ts = None;
358+
359+
ctx.activity(runtime::SetStartedInput {
360+
actor_id: input.actor_id,
361+
})
362+
.await?;
363+
364+
ctx.msg(Ready {
365+
runner_id,
366+
})
367+
.tag("actor_id", input.actor_id)
368+
.send()
369+
.await?;
371370
}
372371
protocol::ActorState::ActorStateStopped(
373372
protocol::ActorStateStopped { code, .. },
@@ -541,6 +540,19 @@ async fn handle_stopped(
541540
}
542541
}
543542

543+
// Kill old actor if lost (just in case it ended up allocating)
544+
if let (true, Some(old_runner_workflow_id)) = (lost, old_runner_workflow_id) {
545+
ctx.signal(crate::workflows::runner::Command {
546+
inner: protocol::Command::CommandStopActor(protocol::CommandStopActor {
547+
actor_id: input.actor_id.to_string(),
548+
generation: state.generation,
549+
}),
550+
})
551+
.to_workflow_id(old_runner_workflow_id)
552+
.send()
553+
.await?;
554+
}
555+
544556
// Reschedule no matter what
545557
if force_reschedule {
546558
match runtime::reschedule_actor(ctx, &input, state, true).await? {
@@ -566,18 +578,6 @@ async fn handle_stopped(
566578

567579
match (input.crash_policy, failed) {
568580
(CrashPolicy::Restart, true) => {
569-
// Kill old actor immediately if lost
570-
if lost {
571-
destroy::kill(
572-
ctx,
573-
input.actor_id,
574-
state.generation,
575-
old_runner_workflow_id
576-
.context("should have runner_workflow_id set if not sleeping")?,
577-
)
578-
.await?;
579-
}
580-
581581
match runtime::reschedule_actor(ctx, &input, state, false).await? {
582582
runtime::SpawnActorOutput::Allocated { .. } => {}
583583
// NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
@@ -637,6 +637,11 @@ async fn handle_stopped(
637637
state.wake_for_alarm = false;
638638
state.will_wake = false;
639639

640+
ctx.msg(Stopped {})
641+
.tag("actor_id", input.actor_id)
642+
.send()
643+
.await?;
644+
640645
Ok(None)
641646
}
642647

@@ -653,6 +658,9 @@ pub struct Ready {
653658
pub runner_id: Id,
654659
}
655660

661+
#[message("pegboard_actor_stopped")]
662+
pub struct Stopped {}
663+
656664
#[signal("pegboard_actor_allocate")]
657665
#[derive(Debug)]
658666
pub struct Allocate {

0 commit comments

Comments
 (0)