diff --git a/hyperactor/Cargo.toml b/hyperactor/Cargo.toml index 413ea8a70..1e0a7948d 100644 --- a/hyperactor/Cargo.toml +++ b/hyperactor/Cargo.toml @@ -57,6 +57,7 @@ hostname = "0.3" humantime = "2.1" hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" } hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" } +indenter = "0.3.3" inventory = "0.3.8" lazy_static = "1.5" local-ip-address = "0.5.3" diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 3fa30e4de..745641a68 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -151,6 +151,13 @@ pub trait Actor: Sized + Send + Debug + 'static { ) -> Result<(), anyhow::Error> { handle_undeliverable_message(cx, envelope) } + + /// If overridden, we will use this name in place of the + /// ActorId for talking about this actor in supervision error + /// messages. + fn display_name(&self) -> Option { + None + } } /// Default implementation of [`Actor::handle_undeliverable_message`]. Defined @@ -341,8 +348,11 @@ pub enum ActorErrorKind { #[error("{0}")] Generic(String), - /// An actor supervision event was not handled. - #[error("supervision: {0}")] + /// An error that occurred while trying to handle a supervision event. + #[error("{0} while handling {1}")] + ErrorDuringHandlingSupervision(String, Box), + /// The actor did not attempt to handle + #[error("{0}")] UnhandledSupervisionEvent(Box), } @@ -350,7 +360,12 @@ impl ActorErrorKind { /// Error while processing actor, i.e., returned by the actor's /// processing method. pub fn processing(err: anyhow::Error) -> Self { - Self::Generic(format!("processing error: {}", err)) + // Unbox err from the anyhow err. Check if it is an ActorErrorKind object. + // If it is directly use it as the new ActorError's ActorErrorKind. + // This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent + // up the handling infrastructure. + err.downcast::() + .unwrap_or_else(|err| Self::Generic(err.to_string())) } /// Unwound stracktrace of a panic. @@ -435,15 +450,6 @@ impl From for ActorError { } } -impl From for ActorError { - fn from(inner: ActorSupervisionEvent) -> Self { - Self { - actor_id: Box::new(inner.actor_id.clone()), - kind: Box::new(ActorErrorKind::UnhandledSupervisionEvent(Box::new(inner))), - } - } -} - /// A collection of signals to control the behavior of the actor. /// Signals are internal runtime control plane messages and should not be /// sent outside of the runtime. diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index 88b0f6618..0c36fda11 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -3143,7 +3143,7 @@ mod tests { unreachable!() }; assert!(msg.to_string().contains( - "processing error: a message from \ + "a message from \ quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned" )); diff --git a/hyperactor/src/mailbox/undeliverable.rs b/hyperactor/src/mailbox/undeliverable.rs index f8d85f041..cf0bb0a8d 100644 --- a/hyperactor/src/mailbox/undeliverable.rs +++ b/hyperactor/src/mailbox/undeliverable.rs @@ -165,9 +165,9 @@ pub fn supervise_undeliverable_messages_with( if let Err(e) = sink.send(ActorSupervisionEvent::new( actor_id, + None, ActorStatus::generic_failure(format!("message not delivered: {}", env)), Some(headers), - None, )) { tracing::warn!( %e, diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 1880339ff..70e2c9466 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -1169,7 +1169,6 @@ impl Instance { let result = self .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx) .await; - let event = match result { Ok(_) => { // actor should have been stopped by run_actor_tree @@ -1189,11 +1188,12 @@ impl Instance { } _ => { let error_kind = ActorErrorKind::Generic(err.kind.to_string()); - self.change_status(ActorStatus::Failed(error_kind.clone())); + let status = ActorStatus::Failed(error_kind); + self.change_status(status.clone()); Some(ActorSupervisionEvent::new( self.cell.actor_id().clone(), - ActorStatus::Failed(error_kind), - None, + actor.display_name(), + status, None, )) } @@ -1387,7 +1387,11 @@ impl Instance { for supervision_event in supervision_event_receiver.drain() { self.handle_supervision_event(actor, supervision_event).await?; } - return Err(ActorError::new(self.self_id(), ActorErrorKind::processing(err))); + let kind = ActorErrorKind::processing(err); + return Err(ActorError { + actor_id: Box::new(self.self_id().clone()), + kind: Box::new(kind), + }); } } signal = signal_receiver.recv() => { @@ -1447,28 +1451,17 @@ impl Instance { Ok(()) } Ok(false) => { - // The supervision event wasn't handled by this actor, chain it and bubble it up. - let supervision_event = ActorSupervisionEvent::new( - self.self_id().clone(), - ActorStatus::generic_failure("did not handle supervision event"), - None, - Some(Box::new(supervision_event)), - ); - Err(supervision_event.into()) + let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event)); + Err(ActorError::new(self.self_id(), kind)) } Err(err) => { // The actor failed to handle the supervision event, it should die. // Create a new supervision event for this failure and propagate it. - let supervision_event = ActorSupervisionEvent::new( - self.self_id().clone(), - ActorStatus::generic_failure(format!( - "failed to handle supervision event: {}", - err - )), - None, - Some(Box::new(supervision_event)), + let kind = ActorErrorKind::ErrorDuringHandlingSupervision( + err.to_string(), + Box::new(supervision_event), ); - Err(supervision_event.into()) + Err(ActorError::new(self.self_id(), kind)) } } } @@ -2525,14 +2518,15 @@ mod tests { let _root_2_actor_id = root_2.actor_id().clone(); assert_matches!( root_2.await, - ActorStatus::Failed(err) if err.to_string() == "processing error: some random failure" + ActorStatus::Failed(err) if err.to_string() == "some random failure" ); // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was // stopped by a parent failure? + // Currently the parent fails with an error related to the child's failure. assert_matches!( root.await, - ActorStatus::Failed(ActorErrorKind::Generic(msg)) if msg == "did not handle supervision event" + ActorStatus::Failed(err) if err.to_string().contains("some random failure") ); assert_eq!(root_2_1.await, ActorStatus::Stopped); assert_eq!(root_1.await, ActorStatus::Stopped); @@ -2958,109 +2952,8 @@ mod tests { assert!(!root_2_1_state.load(Ordering::SeqCst)); assert_eq!( reported_event.event().map(|e| e.actor_id.clone()), - Some(root.actor_id().clone()) - ); - } - - #[tokio::test] - async fn test_supervision_event_handler_propagates() { - #[derive(Debug)] - struct FailingSupervisionActor; - - #[async_trait] - impl Actor for FailingSupervisionActor { - type Params = (); - - async fn new(_: ()) -> Result { - Ok(Self) - } - - async fn handle_supervision_event( - &mut self, - _this: &Instance, - _event: &ActorSupervisionEvent, - ) -> Result { - anyhow::bail!("failed to handle supervision event!") - } - } - - #[async_trait] - impl Handler for FailingSupervisionActor { - async fn handle( - &mut self, - _cx: &crate::Context, - message: String, - ) -> anyhow::Result<()> { - Err(anyhow::anyhow!(message)) - } - } - - #[derive(Debug)] - struct ParentActor(tokio::sync::mpsc::UnboundedSender); - - #[async_trait] - impl Actor for ParentActor { - type Params = tokio::sync::mpsc::UnboundedSender; - - async fn new( - supervision_events: tokio::sync::mpsc::UnboundedSender, - ) -> Result { - Ok(Self(supervision_events)) - } - - async fn handle_supervision_event( - &mut self, - _this: &Instance, - event: &ActorSupervisionEvent, - ) -> Result { - self.0.send(event.clone()).unwrap(); - Ok(true) - } - } - - let proc = Proc::local(); - - let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); - - let parent = proc.spawn::("parent", event_tx).await.unwrap(); - let child = proc - .spawn_child::(parent.cell().clone(), ()) - .await - .unwrap(); - let grandchild = proc - .spawn_child::(child.cell().clone(), ()) - .await - .unwrap(); - - let child_actor_id = child.actor_id().clone(); - let grandchild_actor_id = grandchild.actor_id().clone(); - - // Grandchild fails, triggering failure up the tree, finally receiving - // the event at the root. - grandchild.send("trigger failure".to_string()).unwrap(); - - assert!(grandchild.await.is_failed()); - assert!(child.await.is_failed()); - - assert_eq!( - event_rx.recv().await.unwrap(), - // The time field is ignored for Eq and PartialEq. - ActorSupervisionEvent::new( - child_actor_id, - ActorStatus::generic_failure( - "failed to handle supervision event: failed to handle supervision event!" - ), - None, - Some(Box::new(ActorSupervisionEvent::new( - grandchild_actor_id, - ActorStatus::generic_failure("processing error: trigger failure"), - None, - None, - ))), - ) + Some(root_2_1.actor_id().clone()) ); - - assert!(event_rx.try_recv().is_err()); } #[tokio::test] diff --git a/hyperactor/src/supervision.rs b/hyperactor/src/supervision.rs index 143ee9a09..ff1c141e4 100644 --- a/hyperactor/src/supervision.rs +++ b/hyperactor/src/supervision.rs @@ -10,13 +10,13 @@ use std::fmt; use std::fmt::Debug; +use std::fmt::Write; use std::time::SystemTime; -use chrono::DateTime; -use chrono::offset::Local; use derivative::Derivative; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; +use indenter::indented; use serde::Deserialize; use serde::Serialize; @@ -33,6 +33,8 @@ use crate::reference::ActorId; pub struct ActorSupervisionEvent { /// The actor id of the child actor where the event is triggered. pub actor_id: ActorId, + /// Friendly display name, if the actor class customized it. + pub display_name: Option, /// The time when the event is triggered. #[derivative(PartialEq = "ignore")] pub occurred_at: SystemTime, @@ -41,57 +43,108 @@ pub struct ActorSupervisionEvent { /// If this event is associated with a message, the message headers. #[derivative(PartialEq = "ignore")] pub message_headers: Option, - /// Optional supervision event that caused this event, for recursive propagation. - pub caused_by: Option>, } impl ActorSupervisionEvent { /// Create a new supervision event. Timestamp is set to the current time. pub fn new( actor_id: ActorId, + display_name: Option, actor_status: ActorStatus, message_headers: Option, - caused_by: Option>, ) -> Self { Self { actor_id, + display_name, occurred_at: RealClock.system_time_now(), actor_status, message_headers, - caused_by, } } - /// Compute an actor status from this event, ensuring that "caused-by" - /// events are included in failure states. This should be used as the - /// actor status when reporting events to users. - pub fn status(self) -> ActorStatus { - match self.actor_status { - ActorStatus::Failed(_) => { - ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(self))) - } - status => status, + + fn actor_name(&self) -> String { + self.display_name + .clone() + .unwrap_or_else(|| self.actor_id.to_string()) + } + + fn actually_failing_actor(&self) -> &ActorSupervisionEvent { + let mut event = self; + while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(e)) = + &event.actor_status + { + event = e; } + event } } impl std::error::Error for ActorSupervisionEvent {} +fn fmt_status<'a>( + actor_id: &ActorId, + status: &'a ActorStatus, + f: &mut fmt::Formatter<'_>, +) -> Result, fmt::Error> { + let mut f = indented(f).with_str(" "); + + match status { + ActorStatus::Stopped if actor_id.name() == "agent" => { + // Host agent stopped - use simplified message from D86984496 + let name = match actor_id.proc_id() { + crate::reference::ProcId::Direct(addr, _) => addr.to_string(), + crate::reference::ProcId::Ranked(_, _) => actor_id.proc_id().to_string(), + }; + write!( + f, + "The process {} owned by this actor became unresponsive and is assumed dead, check the log on the host for details", + name + )?; + Ok(None) + } + ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision( + msg, + during_handling_of, + )) => { + write!(f, "{}", msg)?; + Ok(Some(during_handling_of)) + } + ActorStatus::Failed(ActorErrorKind::Generic(msg)) => { + write!(f, "{}", msg)?; + Ok(None) + } + status => { + write!(f, "{}", status)?; + Ok(None) + } + } +} + impl fmt::Display for ActorSupervisionEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( + let actor_name = self.actor_name(); + writeln!( f, - "{}: {} at {}", - self.actor_id, - self.actor_status, - DateTime::::from(self.occurred_at), + "The actor {} and all its descendants have failed.", + actor_name )?; - if let Some(message_headers) = &self.message_headers { - let headers = serde_json::to_string(&message_headers) - .expect("could not serialize message headers"); - write!(f, " (headers: {})", headers)?; - } - if let Some(caused_by) = &self.caused_by { - write!(f, ": (caused by: {})", caused_by)?; + let failing_event = self.actually_failing_actor(); + let failing_actor = failing_event.actor_name(); + let its_name = if failing_actor == actor_name { + "itself" + } else { + &failing_actor + }; + writeln!(f, "This occurred because the actor {} failed.", its_name)?; + writeln!(f, "The error was:")?; + let during_handling_of = + fmt_status(&failing_event.actor_id, &failing_event.actor_status, f)?; + if let Some(event) = during_handling_of { + writeln!( + f, + "This error occurred during the handling of another failure:" + )?; + fmt::Display::fmt(event, f)?; } Ok(()) } diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 2dd6aa4c5..03b9306c9 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -1211,7 +1211,7 @@ mod tests { // The message will be returned! assert_matches!( events.next().await.unwrap(), - ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered") + ProcEvent::Crashed(0, reason) if reason.contains("message not delivered") ); // TODO: Stop the proc. @@ -1333,7 +1333,7 @@ mod tests { // The message will be returned! assert_matches!( events.next().await.unwrap(), - ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered") + ProcEvent::Crashed(0, reason) if reason.contains("message not delivered") ); // Get 'pong' to send 'ping' a message. Since 'ping's @@ -1348,7 +1348,7 @@ mod tests { // The message will be returned! assert_matches!( events.next().await.unwrap(), - ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered") + ProcEvent::Crashed(0, reason) if reason.contains("message not delivered") ); } @@ -1403,7 +1403,7 @@ mod tests { // The message will be returned! assert_matches!( events.next().await.unwrap(), - ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered") + ProcEvent::Crashed(0, reason) if reason.contains("message not delivered") ); // Stop the mesh. diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index 3aaeab15e..e06023868 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -842,8 +842,8 @@ impl ProcEvents { // TODO(T231868026): find a better way to represent all actors in a proc for supervision event let event = ActorSupervisionEvent::new( proc_id.actor_id("any", 0), - ActorStatus::generic_failure(format!("proc {} is stopped", proc_id)), None, + ActorStatus::generic_failure(format!("proc {} is stopped", proc_id)), None, ); tracing::debug!(name = "ActorSupervisionEventDelivery", event = ?event); diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 57d10ddd1..909a86315 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -747,8 +747,8 @@ impl ProcMeshRef { create_rank: rank, supervision_events: vec![ActorSupervisionEvent::new( agent_id, - ActorStatus::Stopped, None, + ActorStatus::Stopped, None, )], }), diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 1bf2da282..26b6b607a 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -29,7 +29,9 @@ use hyperactor::PortRef; use hyperactor::RefClient; use hyperactor::RemoteMessage; use hyperactor::WorldId; +use hyperactor::actor::ActorErrorKind; use hyperactor::actor::ActorHandle; +use hyperactor::actor::ActorStatus; use hyperactor::actor::Referable; use hyperactor::actor::remote::Remote; use hyperactor::channel; @@ -815,7 +817,12 @@ impl Handler for ProcActor { event: ActorSupervisionEvent, ) -> anyhow::Result<()> { let actor_id = event.actor_id.clone(); - let status = event.status(); + let status = match event.actor_status { + ActorStatus::Failed(_) => { + ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(event))) + } + status => status, + }; let message = ProcSupervisionState { world_id: self.params.world_id.clone(), proc_id: self.params.proc.proc_id().clone(), diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs index 76939eb71..dfd25603a 100644 --- a/monarch_hyperactor/src/actor.rs +++ b/monarch_hyperactor/src/actor.rs @@ -23,6 +23,8 @@ use hyperactor::OncePortHandle; use hyperactor::PortHandle; use hyperactor::ProcId; use hyperactor::actor::ActorError; +use hyperactor::actor::ActorErrorKind; +use hyperactor::actor::ActorStatus; use hyperactor::attrs::Attrs; use hyperactor::mailbox::MessageEnvelope; use hyperactor::mailbox::Undeliverable; @@ -30,6 +32,7 @@ use hyperactor::message::Bind; use hyperactor::message::Bindings; use hyperactor::message::Unbind; use hyperactor::reference::WorldId; +use hyperactor::supervision::ActorSupervisionEvent; use hyperactor_mesh::actor_mesh::CAST_ACTOR_MESH_ID; use hyperactor_mesh::comm::multicast::CAST_ORIGINATING_SENDER; use hyperactor_mesh::comm::multicast::CastInfo; @@ -629,6 +632,12 @@ impl Actor for PythonActor { Ok(()) } + fn display_name(&self) -> Option { + self.instance.as_ref().and_then(|instance| { + Python::with_gil(|py| instance.bind(py).str().ok().map(|s| s.to_string())) + }) + } + async fn handle_undeliverable_message( &mut self, ins: &Instance, @@ -901,24 +910,40 @@ impl Handler for PythonActor { // error. This will not set the causal chain for ActorSupervisionEvent, // so make sure to include the original event in the error message // to provide context. - Err(anyhow::anyhow!( - "__supervise__ on {} did not handle a supervision event, \ - propagating to next owner. Original event:\n{}", - cx.self_id(), - message.event - )) + + // False -- we propagate the event onward, but update it with the fact that + // this actor is now the event creator. + let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new( + ActorSupervisionEvent::new( + cx.self_id().clone(), + self.display_name(), + ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent( + Box::new(message.event), + )), + None, + ), + )); + Err(anyhow::Error::new(err)) } } Err(err) => { // Any other exception will supersede in the propagation chain, // and will become its own supervision failure. // Include the event it was handling in the error message. - Err(anyhow::anyhow!( - "__supervise__ on {} raised an exception while handling a supervision event. Original event: {}. Exception raised: {}", - cx.self_id(), - message.event, - err - )) + + // Add to caused_by chain. + let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new( + ActorSupervisionEvent::new( + cx.self_id().clone(), + self.display_name(), + ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision( + err.to_string(), + Box::new(message.event), + )), + None, + ), + )); + Err(anyhow::Error::new(err)) } } }) diff --git a/monarch_hyperactor/src/actor_mesh.rs b/monarch_hyperactor/src/actor_mesh.rs index a52b7b5be..c1d1efd9a 100644 --- a/monarch_hyperactor/src/actor_mesh.rs +++ b/monarch_hyperactor/src/actor_mesh.rs @@ -315,8 +315,8 @@ impl ActorMeshProtocol for PythonActorMeshImpl { // Dummy actor as placeholder to indicate the whole mesh is stopped // TODO(albertli): remove this when pushing all supervision logic to rust. id!(default[0].actor[0]), - ActorStatus::generic_failure("actor mesh is stopped due to proc mesh shutdown"), None, + ActorStatus::generic_failure("actor mesh is stopped due to proc mesh shutdown"), None, )), }; @@ -372,11 +372,11 @@ impl PythonActorMeshImpl { // Dummy actor as placeholder to indicate the whole mesh is stopped // TODO(albertli): remove this when pushing all supervision logic to rust. id!(default[0].actor[0]), + None, ActorStatus::generic_failure( "actor mesh is stopped due to proc mesh shutdown", ), None, - None, ), ))) } diff --git a/monarch_hyperactor/src/supervision.rs b/monarch_hyperactor/src/supervision.rs index 443b5fe74..fe143ce69 100644 --- a/monarch_hyperactor/src/supervision.rs +++ b/monarch_hyperactor/src/supervision.rs @@ -80,6 +80,10 @@ impl MeshFailure { fn __repr__(&self) -> String { format!("{}", self) } + + fn report(&self) -> String { + format!("{}", self.event) + } } pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> { diff --git a/monarch_hyperactor/src/v1/actor_mesh.rs b/monarch_hyperactor/src/v1/actor_mesh.rs index 4fd885ca7..698d7c5c1 100644 --- a/monarch_hyperactor/src/v1/actor_mesh.rs +++ b/monarch_hyperactor/src/v1/actor_mesh.rs @@ -375,8 +375,8 @@ fn actor_state_to_supervision_events( } else { vec![ActorSupervisionEvent::new( actor_id.expect("actor_id is None"), - ActorStatus::Stopped, None, + ActorStatus::Stopped, None, )] } @@ -503,12 +503,12 @@ async fn actor_states_monitor( 0, ActorSupervisionEvent::new( cx.instance().self_id().clone(), + None, ActorStatus::generic_failure(format!( "unable to query for proc states: {:?}", e )), None, - None, ), mesh.name(), &owner, @@ -548,6 +548,7 @@ async fn actor_states_monitor( // Attribute this to the monitored actor, even if the underlying // cause is a proc_failure. We propagate the cause explicitly. mesh.get(point.rank()).unwrap().actor_id().clone(), + None, actor_status, // ActorStatus::Failed(ActorErrorKind::Generic(format!( // "process failure: {}", @@ -559,7 +560,6 @@ async fn actor_states_monitor( // }) // ))), None, - None, ), mesh.name(), &owner, @@ -580,12 +580,12 @@ async fn actor_states_monitor( 0, ActorSupervisionEvent::new( cx.instance().self_id().clone(), + None, ActorStatus::generic_failure(format!( "unable to query for actor states: {:?}", e )), None, - None, ), mesh.name(), &owner, diff --git a/python/monarch/_rust_bindings/monarch_hyperactor/supervision.pyi b/python/monarch/_rust_bindings/monarch_hyperactor/supervision.pyi index f8ecbd3ec..bedf161b6 100644 --- a/python/monarch/_rust_bindings/monarch_hyperactor/supervision.pyi +++ b/python/monarch/_rust_bindings/monarch_hyperactor/supervision.pyi @@ -25,3 +25,8 @@ class MeshFailure: """ @property def mesh(self) -> object: ... + def report(self) -> str: + """ + User-readable error report for this particular failure. + """ + ... diff --git a/python/monarch/_src/actor/actor_mesh.py b/python/monarch/_src/actor/actor_mesh.py index 4d37ca5f2..dcf9ec830 100644 --- a/python/monarch/_src/actor/actor_mesh.py +++ b/python/monarch/_src/actor/actor_mesh.py @@ -821,6 +821,8 @@ def send(self, obj: Any) -> None: pass def exception(self, obj: Exception) -> None: + if isinstance(obj, ActorError): + obj = obj.exception # we deliver each error exactly once, so if there is no port to respond to, # the error is sent to the current actor as an exception. raise obj from None diff --git a/python/monarch/_src/actor/supervision.py b/python/monarch/_src/actor/supervision.py index eada6df41..afd69aabd 100644 --- a/python/monarch/_src/actor/supervision.py +++ b/python/monarch/_src/actor/supervision.py @@ -23,5 +23,5 @@ def unhandled_fault_hook(failure: MeshFailure) -> None: Single argument is the SupervisionEvent """ - _logger.error(f"Unhandled mesh failure, crashing! {failure}") + _logger.error(f"Unhandled monarch error on the root actor: {failure.report()}") sys.exit(1) diff --git a/python/tests/test_actor_error.py b/python/tests/test_actor_error.py index 1702a1985..a1c9bdfa5 100644 --- a/python/tests/test_actor_error.py +++ b/python/tests/test_actor_error.py @@ -693,8 +693,8 @@ async def test_supervision_with_proc_mesh_stopped(mesh) -> None: # new call should fail with check of health state of actor mesh with pytest.raises( SupervisionError, - match="actor mesh is stopped due to proc mesh shutdown" - + "|Actor .* exited because of the following reason.*stopped", + match=r"actor mesh is stopped due to proc mesh shutdown" + + "|The actor .* and all its descendants have failed", ): await actor_mesh.check.call() @@ -1019,10 +1019,7 @@ async def test_supervise_callback_unhandled(): ) message = re.compile( - """\ -__supervise__ on .*supervisor.* did not handle a supervision event, propagating to next owner. \ -Original event:.*error_actor.*\ -""", + r"The actor .* and all its descendants have failed\..*error_actor", re.DOTALL, ) # Note that __supervise__ will not get called until the next message diff --git a/python/tests/test_python_actors.py b/python/tests/test_python_actors.py index adf82b4a3..d6527d88a 100644 --- a/python/tests/test_python_actors.py +++ b/python/tests/test_python_actors.py @@ -1486,7 +1486,7 @@ async def test_actor_mesh_stop() -> None: with pytest.raises( SupervisionError, - match="Actor .*printer_.* (exited because of the following reason|is unhealthy with reason).*stopped", + match=r"(?s)Actor .*printer_.* exited because of the following reason:.*stopped", ): await am_1.print.call("hello 1") diff --git a/python/tests/test_supervision_hierarchy.py b/python/tests/test_supervision_hierarchy.py new file mode 100644 index 000000000..c95c0d10a --- /dev/null +++ b/python/tests/test_supervision_hierarchy.py @@ -0,0 +1,133 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +import os + +from threading import Event +from typing import Callable, Optional, TypeVar + +import monarch.actor +from monarch._rust_bindings.monarch_hyperactor.supervision import MeshFailure + +from monarch.actor import Actor, endpoint, this_host + + +T = TypeVar("T") + + +class Lambda(Actor): + # pyre-ignore[56] + @endpoint + def run(self, func: Callable[[], T]) -> T: + return func() + + +class Nest(Actor): + def __init__(self): + self.nest = this_host().spawn_procs().spawn("nested", Lambda) + + # pyre-ignore[56] + @endpoint + def nested(self, func: Callable[[], T]) -> T: + return self.nest.run.broadcast(func) + + # pyre-ignore[56] + @endpoint + def nested_call_one(self, func: Callable[[], T]) -> T: + return self.nest.run.call_one(func).get() + + # pyre-ignore[56] + @endpoint + def direct(self, func: Callable[[], T]) -> T: + return func() + + @endpoint + def kill_nest(self) -> None: + pid = self.nest.run.call_one(lambda: os.getpid()).get() + os.kill(pid, 9) + + +def error(): + print("I AM ABOUT TO ERROR!!!!") + raise ValueError("Error.") + + +class SuperviseNest(Nest): + def __supervise__(self, x): + print("SUPERVISE: ", x) + + +class FaultCapture: + """Helper class to capture unhandled faults for testing.""" + + def __init__(self): + self.failure_happened = Event() + self.captured_failure: Optional[MeshFailure] = None + self.original_hook = None + + def __enter__(self): + self.original_hook = monarch.actor.unhandled_fault_hook + monarch.actor.unhandled_fault_hook = self.capture_fault + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.failure_happened.wait(timeout=10) + monarch.actor.unhandled_fault_hook = self.original_hook + + def capture_fault(self, failure: MeshFailure) -> None: + """Capture the fault instead of exiting the process.""" + print(f"Captured fault: {failure.report()}") + self.captured_failure = failure + self.failure_happened.set() + + def assert_fault_occurred(self, expected_substring: Optional[str] = None): + """Assert that a fault was captured, optionally checking the message.""" + assert ( + self.captured_failure is not None + ), "Expected a fault to be captured, but none occurred" + if expected_substring: + report = self.captured_failure.report() + assert expected_substring in report, ( + f"Expected fault message to contain '{expected_substring}', " + f"but got: {report}" + ) + + +def test_actor_failure(): + """ + If an actor dies, the client should receive an unhandled fault. + """ + with FaultCapture() as capture: + actor = this_host().spawn_procs().spawn("actor", Lambda) + actor.run.broadcast(error) + + capture.assert_fault_occurred("This occurred because the actor itself failed.") + + +def test_proc_failure(): + """ + If a proc dies, the client should receive an unhandled fault. + """ + with FaultCapture() as capture: + actor = this_host().spawn_procs().spawn("actor", Nest) + actor.kill_nest.call_one().get() + + capture.assert_fault_occurred() + + +def test_nested_mesh_kills_actor_actor_error(): + """ + If a nested actor errors, the fault should propagate to the client. + """ + with FaultCapture() as capture: + actor = this_host().spawn_procs().spawn("actor", Nest) + v = actor.nested_call_one.call_one(lambda: 4).get() + assert v == 4 + actor.nested.call_one(error).get() + print("ERRORED THE ACTOR") + capture.assert_fault_occurred( + "actor .. failed" + )