Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hyperactor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ hostname = "0.3"
humantime = "2.1"
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
indenter = "0.3.3"
inventory = "0.3.8"
lazy_static = "1.5"
local-ip-address = "0.5.3"
Expand Down
30 changes: 18 additions & 12 deletions hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ pub trait Actor: Sized + Send + Debug + 'static {
) -> Result<(), anyhow::Error> {
handle_undeliverable_message(cx, envelope)
}

/// If overridden, we will use this name in place of the
/// ActorId for talking about this actor in supervision error
/// messages.
fn display_name(&self) -> Option<String> {
None
}
}

/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
Expand Down Expand Up @@ -341,16 +348,24 @@ pub enum ActorErrorKind {
#[error("{0}")]
Generic(String),

/// An actor supervision event was not handled.
#[error("supervision: {0}")]
/// An error that occurred while trying to handle a supervision event.
#[error("{0} while handling {1}")]
ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
/// The actor did not attempt to handle
#[error("{0}")]
UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
}

impl ActorErrorKind {
/// Error while processing actor, i.e., returned by the actor's
/// processing method.
pub fn processing(err: anyhow::Error) -> Self {
Self::Generic(format!("processing error: {}", err))
// Unbox err from the anyhow err. Check if it is an ActorErrorKind object.
// If it is directly use it as the new ActorError's ActorErrorKind.
// This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent
// up the handling infrastructure.
err.downcast::<ActorErrorKind>()
.unwrap_or_else(|err| Self::Generic(err.to_string()))
}

/// Unwound stracktrace of a panic.
Expand Down Expand Up @@ -435,15 +450,6 @@ impl From<MailboxSenderError> for ActorError {
}
}

impl From<ActorSupervisionEvent> for ActorError {
fn from(inner: ActorSupervisionEvent) -> Self {
Self {
actor_id: Box::new(inner.actor_id.clone()),
kind: Box::new(ActorErrorKind::UnhandledSupervisionEvent(Box::new(inner))),
}
}
}

/// A collection of signals to control the behavior of the actor.
/// Signals are internal runtime control plane messages and should not be
/// sent outside of the runtime.
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3143,7 +3143,7 @@ mod tests {
unreachable!()
};
assert!(msg.to_string().contains(
"processing error: a message from \
"a message from \
quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
));

Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/mailbox/undeliverable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ pub fn supervise_undeliverable_messages_with<R, F>(

if let Err(e) = sink.send(ActorSupervisionEvent::new(
actor_id,
None,
ActorStatus::generic_failure(format!("message not delivered: {}", env)),
Some(headers),
None,
)) {
tracing::warn!(
%e,
Expand Down
145 changes: 19 additions & 126 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,6 @@ impl<A: Actor> Instance<A> {
let result = self
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
.await;

let event = match result {
Ok(_) => {
// actor should have been stopped by run_actor_tree
Expand All @@ -1189,11 +1188,12 @@ impl<A: Actor> Instance<A> {
}
_ => {
let error_kind = ActorErrorKind::Generic(err.kind.to_string());
self.change_status(ActorStatus::Failed(error_kind.clone()));
let status = ActorStatus::Failed(error_kind);
self.change_status(status.clone());
Some(ActorSupervisionEvent::new(
self.cell.actor_id().clone(),
ActorStatus::Failed(error_kind),
None,
actor.display_name(),
status,
None,
))
}
Expand Down Expand Up @@ -1387,7 +1387,11 @@ impl<A: Actor> Instance<A> {
for supervision_event in supervision_event_receiver.drain() {
self.handle_supervision_event(actor, supervision_event).await?;
}
return Err(ActorError::new(self.self_id(), ActorErrorKind::processing(err)));
let kind = ActorErrorKind::processing(err);
return Err(ActorError {
actor_id: Box::new(self.self_id().clone()),
kind: Box::new(kind),
});
}
}
signal = signal_receiver.recv() => {
Expand Down Expand Up @@ -1447,28 +1451,17 @@ impl<A: Actor> Instance<A> {
Ok(())
}
Ok(false) => {
// The supervision event wasn't handled by this actor, chain it and bubble it up.
let supervision_event = ActorSupervisionEvent::new(
self.self_id().clone(),
ActorStatus::generic_failure("did not handle supervision event"),
None,
Some(Box::new(supervision_event)),
);
Err(supervision_event.into())
let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
Err(ActorError::new(self.self_id(), kind))
}
Err(err) => {
// The actor failed to handle the supervision event, it should die.
// Create a new supervision event for this failure and propagate it.
let supervision_event = ActorSupervisionEvent::new(
self.self_id().clone(),
ActorStatus::generic_failure(format!(
"failed to handle supervision event: {}",
err
)),
None,
Some(Box::new(supervision_event)),
let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
err.to_string(),
Box::new(supervision_event),
);
Err(supervision_event.into())
Err(ActorError::new(self.self_id(), kind))
}
}
}
Expand Down Expand Up @@ -2525,14 +2518,15 @@ mod tests {
let _root_2_actor_id = root_2.actor_id().clone();
assert_matches!(
root_2.await,
ActorStatus::Failed(err) if err.to_string() == "processing error: some random failure"
ActorStatus::Failed(err) if err.to_string() == "some random failure"
);

// TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
// stopped by a parent failure?
// Currently the parent fails with an error related to the child's failure.
assert_matches!(
root.await,
ActorStatus::Failed(ActorErrorKind::Generic(msg)) if msg == "did not handle supervision event"
ActorStatus::Failed(err) if err.to_string().contains("some random failure")
);
assert_eq!(root_2_1.await, ActorStatus::Stopped);
assert_eq!(root_1.await, ActorStatus::Stopped);
Expand Down Expand Up @@ -2958,109 +2952,8 @@ mod tests {
assert!(!root_2_1_state.load(Ordering::SeqCst));
assert_eq!(
reported_event.event().map(|e| e.actor_id.clone()),
Some(root.actor_id().clone())
);
}

#[tokio::test]
async fn test_supervision_event_handler_propagates() {
#[derive(Debug)]
struct FailingSupervisionActor;

#[async_trait]
impl Actor for FailingSupervisionActor {
type Params = ();

async fn new(_: ()) -> Result<Self, anyhow::Error> {
Ok(Self)
}

async fn handle_supervision_event(
&mut self,
_this: &Instance<Self>,
_event: &ActorSupervisionEvent,
) -> Result<bool, anyhow::Error> {
anyhow::bail!("failed to handle supervision event!")
}
}

#[async_trait]
impl Handler<String> for FailingSupervisionActor {
async fn handle(
&mut self,
_cx: &crate::Context<Self>,
message: String,
) -> anyhow::Result<()> {
Err(anyhow::anyhow!(message))
}
}

#[derive(Debug)]
struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);

#[async_trait]
impl Actor for ParentActor {
type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;

async fn new(
supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
) -> Result<Self, anyhow::Error> {
Ok(Self(supervision_events))
}

async fn handle_supervision_event(
&mut self,
_this: &Instance<Self>,
event: &ActorSupervisionEvent,
) -> Result<bool, anyhow::Error> {
self.0.send(event.clone()).unwrap();
Ok(true)
}
}

let proc = Proc::local();

let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();

let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
let child = proc
.spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
.await
.unwrap();
let grandchild = proc
.spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
.await
.unwrap();

let child_actor_id = child.actor_id().clone();
let grandchild_actor_id = grandchild.actor_id().clone();

// Grandchild fails, triggering failure up the tree, finally receiving
// the event at the root.
grandchild.send("trigger failure".to_string()).unwrap();

assert!(grandchild.await.is_failed());
assert!(child.await.is_failed());

assert_eq!(
event_rx.recv().await.unwrap(),
// The time field is ignored for Eq and PartialEq.
ActorSupervisionEvent::new(
child_actor_id,
ActorStatus::generic_failure(
"failed to handle supervision event: failed to handle supervision event!"
),
None,
Some(Box::new(ActorSupervisionEvent::new(
grandchild_actor_id,
ActorStatus::generic_failure("processing error: trigger failure"),
None,
None,
))),
)
Some(root_2_1.actor_id().clone())
);

assert!(event_rx.try_recv().is_err());
}

#[tokio::test]
Expand Down
Loading