diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 835c7f882..a1ed5bdb1 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::time::SystemTime; use async_trait::async_trait; +use enum_as_inner::EnumAsInner; use futures::FutureExt; use futures::future::BoxFuture; use serde::Deserialize; @@ -472,7 +473,16 @@ impl fmt::Display for Signal { } /// The runtime status of an actor. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)] +#[derive( + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Clone, + Named, + EnumAsInner +)] pub enum ActorStatus { /// The actor status is unknown. Unknown, @@ -505,12 +515,7 @@ pub enum ActorStatus { impl ActorStatus { /// Tells whether the status is a terminal state. pub fn is_terminal(&self) -> bool { - matches!(self, Self::Stopped | Self::Failed(_)) - } - - /// Tells whether the status represents a failure. - pub fn is_failed(&self) -> bool { - matches!(self, Self::Failed(_)) + self.is_stopped() || self.is_failed() } /// Create a generic failure status with the provided error message. diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 4d1d36c52..6417f4251 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -21,6 +21,7 @@ use std::hash::Hasher; use std::ops::Deref; use std::panic; use std::panic::AssertUnwindSafe; +use std::panic::Location; use std::pin::Pin; use std::sync::Arc; use std::sync::OnceLock; @@ -47,6 +48,7 @@ use tokio::sync::watch; use tokio::task::JoinHandle; use tracing::Instrument; use tracing::Level; +use tracing::Span; use uuid::Uuid; use crate as hyperactor; @@ -54,6 +56,7 @@ use crate::Actor; use crate::ActorRef; use crate::Handler; use crate::Message; +use crate::Named as _; use crate::RemoteMessage; use crate::actor::ActorError; use crate::actor::ActorErrorKind; @@ -789,7 +792,6 @@ impl Proc { } /// Create a root allocation in the proc. - #[hyperactor::instrument(fields(actor_name=name))] fn allocate_root_id(&self, name: &str) -> Result { let name = name.to_string(); match self.state().roots.entry(name.to_string()) { @@ -1024,8 +1026,36 @@ impl Instance { /// Notify subscribers of a change in the actors status and bump counters with the duration which /// the last status was active for. + #[track_caller] fn change_status(&self, new: ActorStatus) { - self.status_tx.send_replace(new.clone()); + let old = self.status_tx.send_replace(new.clone()); + // Actor status changes between Idle and Processing when handling every + // message. It creates too many logs if we want to log these 2 states. + // Therefore we skip the status changes between them. + if !((old.is_idle() && new.is_processing()) || (old.is_processing() && new.is_idle())) { + let new_status = new.arm().unwrap_or("unknown"); + let change_reason = match new { + ActorStatus::Failed(reason) => reason.to_string(), + _ => "".to_string(), + }; + tracing::info!( + name = "ActorStatus", + actor_id = %self.self_id(), + actor_name = self.self_id().name(), + status = new_status, + prev_status = old.arm().unwrap_or("unknown"), + caller = %Location::caller(), + change_reason, + ); + } + } + + fn is_terminal(&self) -> bool { + self.status_tx.borrow().is_terminal() + } + + fn is_stopped(&self) -> bool { + self.status_tx.borrow().is_stopped() } /// This instance's actor ID. @@ -1091,9 +1121,14 @@ impl Instance { let instance_cell = self.cell.clone(); let actor_id = self.cell.actor_id().clone(); let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone()); - let actor_task_handle = A::spawn_server_task(panic_handler::with_backtrace_tracking( - self.serve(actor, actor_loop_receivers, work_rx), - )); + let actor_task_handle = A::spawn_server_task( + panic_handler::with_backtrace_tracking(self.serve( + actor, + actor_loop_receivers, + work_rx, + )) + .instrument(Span::current()), + ); tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle); instance_cell .inner @@ -1119,23 +1154,34 @@ impl Instance { .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx) .await; - let (actor_status, event) = match result { - Ok(_) => (ActorStatus::Stopped, None), - Err(ActorError { - kind: box ActorErrorKind::UnhandledSupervisionEvent(box event), - .. - }) => (event.actor_status.clone(), Some(event)), + let event = match result { + Ok(_) => { + // actor should have been stopped by run_actor_tree + assert!(self.is_stopped()); + None + } Err(err) => { - let error_kind = ActorErrorKind::Generic(err.kind.to_string()); - ( - ActorStatus::Failed(error_kind.clone()), - Some(ActorSupervisionEvent::new( - self.cell.actor_id().clone(), - ActorStatus::Failed(error_kind), - None, - None, - )), - ) + match *err.kind { + ActorErrorKind::UnhandledSupervisionEvent(box event) => { + // Currently only terminated actors are allowed to raise supervision events. + // If we want to change that in the future, we need to modify the exit + // status here too, because we use event's actor_status as this actor's + // terminal status. + assert!(event.actor_status.is_terminal()); + self.change_status(event.actor_status.clone()); + Some(event) + } + _ => { + let error_kind = ActorErrorKind::Generic(err.kind.to_string()); + self.change_status(ActorStatus::Failed(error_kind.clone())); + Some(ActorSupervisionEvent::new( + self.cell.actor_id().clone(), + ActorStatus::Failed(error_kind), + None, + None, + )) + } + } } }; @@ -1164,7 +1210,6 @@ impl Instance { self.proc.handle_supervision_event(event); } } - self.change_status(actor_status); } /// Runs the actor, and manages its supervision tree. When the function returns, @@ -1207,10 +1252,16 @@ impl Instance { } }; - if let Err(ref err) = result { - tracing::error!("{}: actor failure: {}", self.self_id(), err); + match &result { + Ok(_) => assert!(self.is_stopped()), + Err(err) => { + tracing::error!("{}: actor failure: {}", self.self_id(), err); + assert!(!self.is_terminal()); + // Send Stopping instead of Failed, because we still need to + // unlink child actors. + self.change_status(ActorStatus::Stopping); + } } - self.change_status(ActorStatus::Stopping); // After this point, we know we won't spawn any more children, // so we can safely read the current child keys. @@ -1292,7 +1343,6 @@ impl Instance { } /// Initialize and run the actor until it fails or is stopped. - #[tracing::instrument(level = "info", skip_all, fields(actor_id = %self.self_id()))] async fn run( &mut self, actor: &mut A, @@ -1407,6 +1457,7 @@ impl Instance { } } + #[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))] async unsafe fn handle_message( &mut self, actor: &mut A, @@ -1436,18 +1487,12 @@ impl Instance { &headers, self.self_id().to_string(), ); - let span = tracing::debug_span!( - "actor_status", - actor_id = self.self_id().to_string(), - actor_name = self.self_id().name(), - name = self.cell.status().borrow().to_string(), - ); let context = Context::new(self, headers); // Pass a reference to the context to the handler, so that deref // coercion allows the `this` argument to be treated exactly like // &Instance. - actor.handle(&context, message).instrument(span).await + actor.handle(&context, message).await } // Spawn on child on this instance. Currently used only by cap::CanSpawn. @@ -1536,6 +1581,15 @@ impl Drop for Instance { if status.is_terminal() { false } else { + tracing::info!( + name = "ActorStatus", + actor_id = %self.self_id(), + actor_name = self.self_id().name(), + status = "Stopped", + prev_status = status.arm().unwrap_or("unknown"), + caller = %Location::caller(), + "Instance is dropped", + ); *status = ActorStatus::Stopped; true }