Skip to content

Commit 52b90eb

Browse files
zdevitometa-codesync[bot]
authored andcommitted
ActorStatus::Failed(String) -> Failed(ActorErrorKind) (#1815)
Summary: Pull Request resolved: #1815 Refactor to get a more structured error to ActorStatus. This deletes the unused structure in ActorErrorKind to make it cloneable/serializable so that we can put it in ActorStatus. Now that we can represent an ActorStatus that is an SupervisionEvent of a child, we can start to chain/collapse the supervision chain as it bubbles up. ghstack-source-id: 322758617 exported-using-ghexport Reviewed By: mariusae Differential Revision: D86723978 fbshipit-source-id: 3587161f752536ff913cb20ef616bd7c00ae9396
1 parent b82b189 commit 52b90eb

File tree

13 files changed

+109
-102
lines changed

13 files changed

+109
-102
lines changed

controller/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ impl ControllerMessageHandler for ControllerActor {
544544
(
545545
actor.clone(),
546546
match status {
547-
ActorStatus::Failed(msg) => msg.clone(),
547+
ActorStatus::Failed(msg) => msg.to_string(),
548548
_ => format!("unexpected actor status {status}"),
549549
},
550550
)

docs/source/books/hyperactor-book/src/actors/actor_handle.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ impl<A: Actor> IntoFuture for ActorHandle<A> {
162162
let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
163163
match result {
164164
Err(_) => ActorStatus::Unknown,
165-
Ok(status) => status.passthrough(),
165+
Ok(status) => status.clone(),
166166
}
167167
};
168168
future.boxed()

hyperactor/src/actor.rs

Lines changed: 45 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -315,49 +315,58 @@ pub struct ActorError {
315315
}
316316

317317
/// The kinds of actor serving errors.
318-
#[derive(thiserror::Error, Debug)]
318+
#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
319319
pub enum ActorErrorKind {
320+
/// Generic error with a formatted message.
321+
#[error("{0}")]
322+
Generic(String),
323+
324+
/// An actor supervision event was not handled.
325+
#[error("supervision: {0}")]
326+
UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
327+
}
328+
329+
impl ActorErrorKind {
320330
/// Error while processing actor, i.e., returned by the actor's
321331
/// processing method.
322-
#[error("processing error: {0}")]
323-
Processing(#[source] anyhow::Error),
332+
pub fn processing(err: anyhow::Error) -> Self {
333+
Self::Generic(format!("processing error: {}", err))
334+
}
324335

325336
/// Unwound stracktrace of a panic.
326-
#[error("panic: {0}")]
327-
Panic(#[source] anyhow::Error),
337+
pub fn panic(err: anyhow::Error) -> Self {
338+
Self::Generic(format!("panic: {}", err))
339+
}
328340

329341
/// Error during actor initialization.
330-
#[error("initialization error: {0}")]
331-
Init(#[source] anyhow::Error),
342+
pub fn init(err: anyhow::Error) -> Self {
343+
Self::Generic(format!("initialization error: {}", err))
344+
}
332345

333346
/// An underlying mailbox error.
334-
#[error(transparent)]
335-
Mailbox(#[from] MailboxError),
347+
pub fn mailbox(err: MailboxError) -> Self {
348+
Self::Generic(err.to_string())
349+
}
336350

337351
/// An underlying mailbox sender error.
338-
#[error(transparent)]
339-
MailboxSender(#[from] MailboxSenderError),
352+
pub fn mailbox_sender(err: MailboxSenderError) -> Self {
353+
Self::Generic(err.to_string())
354+
}
340355

341356
/// An underlying checkpoint error.
342-
#[error("checkpoint error: {0}")]
343-
Checkpoint(#[source] CheckpointError),
357+
pub fn checkpoint(err: CheckpointError) -> Self {
358+
Self::Generic(format!("checkpoint error: {}", err))
359+
}
344360

345361
/// An underlying message log error.
346-
#[error("message log error: {0}")]
347-
MessageLog(#[source] MessageLogError),
362+
pub fn message_log(err: MessageLogError) -> Self {
363+
Self::Generic(format!("message log error: {}", err))
364+
}
348365

349366
/// The actor's state could not be determined.
350-
#[error("actor is in an indeterminate state")]
351-
IndeterminateState,
352-
353-
/// An actor supervision event was not handled.
354-
#[error("supervision: {0}")]
355-
UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),
356-
357-
/// A special kind of error that allows us to clone errors: we can keep the
358-
/// error string, but we lose the error structure.
359-
#[error("{0}")]
360-
Passthrough(#[from] anyhow::Error),
367+
pub fn indeterminate_state() -> Self {
368+
Self::Generic("actor is in an indeterminate state".to_string())
369+
}
361370
}
362371

363372
impl ActorError {
@@ -368,14 +377,6 @@ impl ActorError {
368377
kind: Box::new(kind),
369378
}
370379
}
371-
372-
/// Passthrough this error.
373-
fn passthrough(&self) -> Self {
374-
ActorError::new(
375-
&self.actor_id,
376-
ActorErrorKind::Passthrough(anyhow::anyhow!("{}", self.kind)),
377-
)
378-
}
379380
}
380381

381382
impl fmt::Display for ActorError {
@@ -395,7 +396,7 @@ impl From<MailboxError> for ActorError {
395396
fn from(inner: MailboxError) -> Self {
396397
Self {
397398
actor_id: Box::new(inner.actor_id().clone()),
398-
kind: Box::new(ActorErrorKind::from(inner)),
399+
kind: Box::new(ActorErrorKind::mailbox(inner)),
399400
}
400401
}
401402
}
@@ -404,7 +405,7 @@ impl From<MailboxSenderError> for ActorError {
404405
fn from(inner: MailboxSenderError) -> Self {
405406
Self {
406407
actor_id: Box::new(inner.location().actor_id().clone()),
407-
kind: Box::new(ActorErrorKind::from(inner)),
408+
kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
408409
}
409410
}
410411
}
@@ -413,7 +414,7 @@ impl From<ActorSupervisionEvent> for ActorError {
413414
fn from(inner: ActorSupervisionEvent) -> Self {
414415
Self {
415416
actor_id: Box::new(inner.actor_id.clone()),
416-
kind: Box::new(ActorErrorKind::UnhandledSupervisionEvent(inner)),
417+
kind: Box::new(ActorErrorKind::UnhandledSupervisionEvent(Box::new(inner))),
417418
}
418419
}
419420
}
@@ -473,9 +474,8 @@ pub enum ActorStatus {
473474
Stopping,
474475
/// The actor is stopped. It is no longer processing messages.
475476
Stopped,
476-
/// The actor failed with the provided actor error formatted in string
477-
/// representation.
478-
Failed(String),
477+
/// The actor failed with the provided actor error.
478+
Failed(ActorErrorKind),
479479
}
480480

481481
impl ActorStatus {
@@ -489,23 +489,11 @@ impl ActorStatus {
489489
matches!(self, Self::Failed(_))
490490
}
491491

492-
/// Create a passthrough of this status. The returned status is a clone,
493-
/// except that [`ActorStatus::Failed`] is replaced with its passthrough.
494-
fn passthrough(&self) -> Self {
495-
match self {
496-
Self::Unknown => Self::Unknown,
497-
Self::Created => Self::Created,
498-
Self::Initializing => Self::Initializing,
499-
Self::Client => Self::Client,
500-
Self::Idle => Self::Idle,
501-
Self::Processing(instant, handler) => Self::Processing(*instant, handler.clone()),
502-
Self::Saving(instant) => Self::Saving(*instant),
503-
Self::Loading(instant) => Self::Loading(*instant),
504-
Self::Stopping => Self::Stopping,
505-
Self::Stopped => Self::Stopped,
506-
Self::Failed(err) => Self::Failed(err.clone()),
507-
}
492+
/// Create a generic failure status with the provided error message.
493+
pub fn generic_failure(message: impl Into<String>) -> Self {
494+
Self::Failed(ActorErrorKind::Generic(message.into()))
508495
}
496+
509497
fn span_string(&self) -> &'static str {
510498
self.arm().unwrap_or_default()
511499
}
@@ -662,7 +650,7 @@ impl<A: Actor> IntoFuture for ActorHandle<A> {
662650
let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
663651
match result {
664652
Err(_) => ActorStatus::Unknown,
665-
Ok(status) => status.passthrough(),
653+
Ok(status) => status.clone(),
666654
}
667655
};
668656

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3142,8 +3142,8 @@ mod tests {
31423142
let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
31433143
unreachable!()
31443144
};
3145-
assert!(msg.as_str().contains(
3146-
"serving quux[0].foo[0]: processing error: a message from \
3145+
assert!(msg.to_string().contains(
3146+
"processing error: a message from \
31473147
quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
31483148
));
31493149

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ pub fn supervise_undeliverable_messages_with<R, F>(
165165

166166
if let Err(e) = sink.send(ActorSupervisionEvent::new(
167167
actor_id,
168-
ActorStatus::Failed(format!("message not delivered: {}", env)),
168+
ActorStatus::generic_failure(format!("message not delivered: {}", env)),
169169
Some(headers),
170170
None,
171171
)) {

hyperactor/src/proc.rs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,18 +1102,21 @@ impl<A: Actor> Instance<A> {
11021102
let (actor_status, event) = match result {
11031103
Ok(_) => (ActorStatus::Stopped, None),
11041104
Err(ActorError {
1105-
kind: box ActorErrorKind::UnhandledSupervisionEvent(event),
1105+
kind: box ActorErrorKind::UnhandledSupervisionEvent(box event),
11061106
..
11071107
}) => (event.actor_status.clone(), Some(event)),
1108-
Err(err) => (
1109-
ActorStatus::Failed(err.to_string()),
1110-
Some(ActorSupervisionEvent::new(
1111-
self.cell.actor_id().clone(),
1112-
ActorStatus::Failed(err.to_string()),
1113-
None,
1114-
None,
1115-
)),
1116-
),
1108+
Err(err) => {
1109+
let error_kind = ActorErrorKind::Generic(err.kind.to_string());
1110+
(
1111+
ActorStatus::Failed(error_kind.clone()),
1112+
Some(ActorSupervisionEvent::new(
1113+
self.cell.actor_id().clone(),
1114+
ActorStatus::Failed(error_kind),
1115+
None,
1116+
None,
1117+
)),
1118+
)
1119+
}
11171120
};
11181121

11191122
if let Some(parent) = self.cell.maybe_unlink_parent() {
@@ -1174,7 +1177,10 @@ impl<A: Actor> Instance<A> {
11741177
.unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
11751178
Err(ActorError::new(
11761179
self.self_id(),
1177-
ActorErrorKind::Panic(anyhow::anyhow!("{}\n{}", err_msg, backtrace)),
1180+
ActorErrorKind::panic(anyhow::anyhow!(
1181+
"{}
1182+
{}", err_msg, backtrace
1183+
)),
11781184
))
11791185
}
11801186
};
@@ -1244,7 +1250,7 @@ impl<A: Actor> Instance<A> {
12441250
actor
12451251
.init(self)
12461252
.await
1247-
.map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::Init(err)))?;
1253+
.map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
12481254
let need_drain;
12491255
'messages: loop {
12501256
self.change_status(ActorStatus::Idle);
@@ -1260,7 +1266,7 @@ impl<A: Actor> Instance<A> {
12601266
for supervision_event in supervision_event_receiver.drain() {
12611267
self.handle_supervision_event(actor, supervision_event).await?;
12621268
}
1263-
return Err(ActorError::new(self.self_id(), ActorErrorKind::Processing(err)));
1269+
return Err(ActorError::new(self.self_id(), ActorErrorKind::processing(err)));
12641270
}
12651271
}
12661272
signal = signal_receiver.recv() => {
@@ -1293,7 +1299,7 @@ impl<A: Actor> Instance<A> {
12931299
if let Err(err) = work.handle(actor, self).await {
12941300
return Err(ActorError::new(
12951301
self.self_id(),
1296-
ActorErrorKind::Processing(err),
1302+
ActorErrorKind::processing(err),
12971303
));
12981304
}
12991305
n += 1;
@@ -1323,7 +1329,7 @@ impl<A: Actor> Instance<A> {
13231329
// The supervision event wasn't handled by this actor, chain it and bubble it up.
13241330
let supervision_event = ActorSupervisionEvent::new(
13251331
self.self_id().clone(),
1326-
ActorStatus::Failed("did not handle supervision event".to_string()),
1332+
ActorStatus::generic_failure("did not handle supervision event"),
13271333
None,
13281334
Some(Box::new(supervision_event)),
13291335
);
@@ -1334,7 +1340,10 @@ impl<A: Actor> Instance<A> {
13341340
// Create a new supervision event for this failure and propagate it.
13351341
let supervision_event = ActorSupervisionEvent::new(
13361342
self.self_id().clone(),
1337-
ActorStatus::Failed(format!("failed to handle supervision event: {}", err)),
1343+
ActorStatus::generic_failure(format!(
1344+
"failed to handle supervision event: {}",
1345+
err
1346+
)),
13381347
None,
13391348
Some(Box::new(supervision_event)),
13401349
);
@@ -2388,17 +2397,17 @@ mod tests {
23882397
"some random failure"
23892398
)))
23902399
.unwrap();
2391-
let root_2_actor_id = root_2.actor_id().clone();
2400+
let _root_2_actor_id = root_2.actor_id().clone();
23922401
assert_matches!(
23932402
root_2.await,
2394-
ActorStatus::Failed(err) if err == format!("serving {}: processing error: some random failure", root_2_actor_id)
2403+
ActorStatus::Failed(err) if err.to_string() == "processing error: some random failure"
23952404
);
23962405

23972406
// TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
23982407
// stopped by a parent failure?
2399-
assert_eq!(
2408+
assert_matches!(
24002409
root.await,
2401-
ActorStatus::Failed("did not handle supervision event".to_string())
2410+
ActorStatus::Failed(ActorErrorKind::Generic(msg)) if msg == "did not handle supervision event"
24022411
);
24032412
assert_eq!(root_2_1.await, ActorStatus::Stopped);
24042413
assert_eq!(root_1.await, ActorStatus::Stopped);
@@ -2913,16 +2922,13 @@ mod tests {
29132922
// The time field is ignored for Eq and PartialEq.
29142923
ActorSupervisionEvent::new(
29152924
child_actor_id,
2916-
ActorStatus::Failed(
2925+
ActorStatus::generic_failure(
29172926
"failed to handle supervision event: failed to handle supervision event!"
2918-
.to_string(),
29192927
),
29202928
None,
29212929
Some(Box::new(ActorSupervisionEvent::new(
29222930
grandchild_actor_id,
2923-
ActorStatus::Failed(
2924-
"serving local[0].parent[2]: processing error: trigger failure".to_string()
2925-
),
2931+
ActorStatus::generic_failure("processing error: trigger failure"),
29262932
None,
29272933
None,
29282934
))),

hyperactor/src/supervision.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::Serialize;
2222

2323
use crate as hyperactor; // for macros
2424
use crate::Named;
25+
use crate::actor::ActorErrorKind;
2526
use crate::actor::ActorStatus;
2627
use crate::attrs::Attrs;
2728
use crate::reference::ActorId;
@@ -63,10 +64,12 @@ impl ActorSupervisionEvent {
6364
/// Compute an actor status from this event, ensuring that "caused-by"
6465
/// events are included in failure states. This should be used as the
6566
/// actor status when reporting events to users.
66-
pub fn status(&self) -> ActorStatus {
67-
match &self.actor_status {
68-
ActorStatus::Failed(msg) => ActorStatus::Failed(format!("{}: {}", self, msg)),
69-
status => status.clone(),
67+
pub fn status(self) -> ActorStatus {
68+
match self.actor_status {
69+
ActorStatus::Failed(_) => {
70+
ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(self)))
71+
}
72+
status => status,
7073
}
7174
}
7275
}

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,7 @@ impl ProcEvents {
842842
// TODO(T231868026): find a better way to represent all actors in a proc for supervision event
843843
let event = ActorSupervisionEvent::new(
844844
proc_id.actor_id("any", 0),
845-
ActorStatus::Failed(format!("proc {} is stopped", proc_id)),
845+
ActorStatus::generic_failure(format!("proc {} is stopped", proc_id)),
846846
None,
847847
None,
848848
);

0 commit comments

Comments
 (0)