Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
use enum_as_inner::EnumAsInner;
use futures::FutureExt;
use futures::future::BoxFuture;
use serde::Deserialize;
Expand Down Expand Up @@ -472,7 +473,16 @@ impl fmt::Display for Signal {
}

/// The runtime status of an actor.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)]
#[derive(
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Clone,
Named,
EnumAsInner
)]
pub enum ActorStatus {
/// The actor status is unknown.
Unknown,
Expand Down Expand Up @@ -505,12 +515,7 @@ pub enum ActorStatus {
impl ActorStatus {
/// Tells whether the status is a terminal state.
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Stopped | Self::Failed(_))
}

/// Tells whether the status represents a failure.
pub fn is_failed(&self) -> bool {
matches!(self, Self::Failed(_))
self.is_stopped() || self.is_failed()
}

/// Create a generic failure status with the provided error message.
Expand Down
120 changes: 87 additions & 33 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::hash::Hasher;
use std::ops::Deref;
use std::panic;
use std::panic::AssertUnwindSafe;
use std::panic::Location;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::OnceLock;
Expand All @@ -47,13 +48,15 @@ use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;
use uuid::Uuid;

use crate as hyperactor;
use crate::Actor;
use crate::ActorRef;
use crate::Handler;
use crate::Message;
use crate::Named as _;
use crate::RemoteMessage;
use crate::actor::ActorError;
use crate::actor::ActorErrorKind;
Expand Down Expand Up @@ -789,7 +792,6 @@ impl Proc {
}

/// Create a root allocation in the proc.
#[hyperactor::instrument(fields(actor_name=name))]
fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
let name = name.to_string();
match self.state().roots.entry(name.to_string()) {
Expand Down Expand Up @@ -1024,8 +1026,36 @@ impl<A: Actor> Instance<A> {

/// Notify subscribers of a change in the actors status and bump counters with the duration which
/// the last status was active for.
#[track_caller]
fn change_status(&self, new: ActorStatus) {
self.status_tx.send_replace(new.clone());
let old = self.status_tx.send_replace(new.clone());
// Actor status changes between Idle and Processing when handling every
// message. It creates too many logs if we want to log these 2 states.
// Therefore we skip the status changes between them.
if !((old.is_idle() && new.is_processing()) || (old.is_processing() && new.is_idle())) {
let new_status = new.arm().unwrap_or("unknown");
let change_reason = match new {
ActorStatus::Failed(reason) => reason.to_string(),
_ => "".to_string(),
};
tracing::info!(
name = "ActorStatus",
actor_id = %self.self_id(),
actor_name = self.self_id().name(),
status = new_status,
prev_status = old.arm().unwrap_or("unknown"),
caller = %Location::caller(),
change_reason,
);
}
}

fn is_terminal(&self) -> bool {
self.status_tx.borrow().is_terminal()
}

fn is_stopped(&self) -> bool {
self.status_tx.borrow().is_stopped()
}

/// This instance's actor ID.
Expand Down Expand Up @@ -1091,9 +1121,14 @@ impl<A: Actor> Instance<A> {
let instance_cell = self.cell.clone();
let actor_id = self.cell.actor_id().clone();
let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone());
let actor_task_handle = A::spawn_server_task(panic_handler::with_backtrace_tracking(
self.serve(actor, actor_loop_receivers, work_rx),
));
let actor_task_handle = A::spawn_server_task(
panic_handler::with_backtrace_tracking(self.serve(
actor,
actor_loop_receivers,
work_rx,
))
.instrument(Span::current()),
);
tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
instance_cell
.inner
Expand All @@ -1119,23 +1154,34 @@ impl<A: Actor> Instance<A> {
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
.await;

let (actor_status, event) = match result {
Ok(_) => (ActorStatus::Stopped, None),
Err(ActorError {
kind: box ActorErrorKind::UnhandledSupervisionEvent(box event),
..
}) => (event.actor_status.clone(), Some(event)),
let event = match result {
Ok(_) => {
// actor should have been stopped by run_actor_tree
assert!(self.is_stopped());
None
}
Err(err) => {
let error_kind = ActorErrorKind::Generic(err.kind.to_string());
(
ActorStatus::Failed(error_kind.clone()),
Some(ActorSupervisionEvent::new(
self.cell.actor_id().clone(),
ActorStatus::Failed(error_kind),
None,
None,
)),
)
match *err.kind {
ActorErrorKind::UnhandledSupervisionEvent(box event) => {
// Currently only terminated actors are allowed to raise supervision events.
// If we want to change that in the future, we need to modify the exit
// status here too, because we use event's actor_status as this actor's
// terminal status.
assert!(event.actor_status.is_terminal());
self.change_status(event.actor_status.clone());
Some(event)
}
_ => {
let error_kind = ActorErrorKind::Generic(err.kind.to_string());
self.change_status(ActorStatus::Failed(error_kind.clone()));
Some(ActorSupervisionEvent::new(
self.cell.actor_id().clone(),
ActorStatus::Failed(error_kind),
None,
None,
))
}
}
}
};

Expand Down Expand Up @@ -1164,7 +1210,6 @@ impl<A: Actor> Instance<A> {
self.proc.handle_supervision_event(event);
}
}
self.change_status(actor_status);
}

/// Runs the actor, and manages its supervision tree. When the function returns,
Expand Down Expand Up @@ -1207,10 +1252,16 @@ impl<A: Actor> Instance<A> {
}
};

if let Err(ref err) = result {
tracing::error!("{}: actor failure: {}", self.self_id(), err);
match &result {
Ok(_) => assert!(self.is_stopped()),
Err(err) => {
tracing::error!("{}: actor failure: {}", self.self_id(), err);
assert!(!self.is_terminal());
// Send Stopping instead of Failed, because we still need to
// unlink child actors.
self.change_status(ActorStatus::Stopping);
}
}
self.change_status(ActorStatus::Stopping);

// After this point, we know we won't spawn any more children,
// so we can safely read the current child keys.
Expand Down Expand Up @@ -1292,7 +1343,6 @@ impl<A: Actor> Instance<A> {
}

/// Initialize and run the actor until it fails or is stopped.
#[tracing::instrument(level = "info", skip_all, fields(actor_id = %self.self_id()))]
async fn run(
&mut self,
actor: &mut A,
Expand Down Expand Up @@ -1407,6 +1457,7 @@ impl<A: Actor> Instance<A> {
}
}

#[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
async unsafe fn handle_message<M: Message>(
&mut self,
actor: &mut A,
Expand Down Expand Up @@ -1436,18 +1487,12 @@ impl<A: Actor> Instance<A> {
&headers,
self.self_id().to_string(),
);
let span = tracing::debug_span!(
"actor_status",
actor_id = self.self_id().to_string(),
actor_name = self.self_id().name(),
name = self.cell.status().borrow().to_string(),
);

let context = Context::new(self, headers);
// Pass a reference to the context to the handler, so that deref
// coercion allows the `this` argument to be treated exactly like
// &Instance<A>.
actor.handle(&context, message).instrument(span).await
actor.handle(&context, message).await
}

// Spawn on child on this instance. Currently used only by cap::CanSpawn.
Expand Down Expand Up @@ -1536,6 +1581,15 @@ impl<A: Actor> Drop for Instance<A> {
if status.is_terminal() {
false
} else {
tracing::info!(
name = "ActorStatus",
actor_id = %self.self_id(),
actor_name = self.self_id().name(),
status = "Stopped",
prev_status = status.arm().unwrap_or("unknown"),
caller = %Location::caller(),
"Instance is dropped",
);
*status = ActorStatus::Stopped;
true
}
Expand Down
Loading