Skip to content

Commit 8bb3192

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
log ActorStatus change events (#1834)
Summary: Pull Request resolved: #1834 The goal is that we can get a `ActorStatus` in the `status` column by searching scuba with: > actor_id=<name> and name=ActorStatus Reviewed By: mariusae Differential Revision: D86792902 fbshipit-source-id: cfeef21f70f39048cfd912860b62385074c81051
1 parent 8d9ce9c commit 8bb3192

File tree

1 file changed

+43
-13
lines changed

1 file changed

+43
-13
lines changed

hyperactor/src/proc.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::hash::Hasher;
2121
use std::ops::Deref;
2222
use std::panic;
2323
use std::panic::AssertUnwindSafe;
24+
use std::panic::Location;
2425
use std::pin::Pin;
2526
use std::sync::Arc;
2627
use std::sync::OnceLock;
@@ -47,13 +48,15 @@ use tokio::sync::watch;
4748
use tokio::task::JoinHandle;
4849
use tracing::Instrument;
4950
use tracing::Level;
51+
use tracing::Span;
5052
use uuid::Uuid;
5153

5254
use crate as hyperactor;
5355
use crate::Actor;
5456
use crate::ActorRef;
5557
use crate::Handler;
5658
use crate::Message;
59+
use crate::Named as _;
5760
use crate::RemoteMessage;
5861
use crate::actor::ActorError;
5962
use crate::actor::ActorErrorKind;
@@ -789,7 +792,6 @@ impl Proc {
789792
}
790793

791794
/// Create a root allocation in the proc.
792-
#[hyperactor::instrument(fields(actor_name=name))]
793795
fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
794796
let name = name.to_string();
795797
match self.state().roots.entry(name.to_string()) {
@@ -1024,8 +1026,28 @@ impl<A: Actor> Instance<A> {
10241026

10251027
/// Notify subscribers of a change in the actors status and bump counters with the duration which
10261028
/// the last status was active for.
1029+
#[track_caller]
10271030
fn change_status(&self, new: ActorStatus) {
1028-
self.status_tx.send_replace(new.clone());
1031+
let old = self.status_tx.send_replace(new.clone());
1032+
// Actor status changes between Idle and Processing when handling every
1033+
// message. It creates too many logs if we want to log these 2 states.
1034+
// Therefore we skip the status changes between them.
1035+
if !((old.is_idle() && new.is_processing()) || (old.is_processing() && new.is_idle())) {
1036+
let new_status = new.arm().unwrap_or("unknown");
1037+
let change_reason = match new {
1038+
ActorStatus::Failed(reason) => reason.to_string(),
1039+
_ => "".to_string(),
1040+
};
1041+
tracing::info!(
1042+
name = "ActorStatus",
1043+
actor_id = %self.self_id(),
1044+
actor_name = self.self_id().name(),
1045+
status = new_status,
1046+
prev_status = old.arm().unwrap_or("unknown"),
1047+
caller = %Location::caller(),
1048+
change_reason,
1049+
);
1050+
}
10291051
}
10301052

10311053
fn is_terminal(&self) -> bool {
@@ -1099,9 +1121,14 @@ impl<A: Actor> Instance<A> {
10991121
let instance_cell = self.cell.clone();
11001122
let actor_id = self.cell.actor_id().clone();
11011123
let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone());
1102-
let actor_task_handle = A::spawn_server_task(panic_handler::with_backtrace_tracking(
1103-
self.serve(actor, actor_loop_receivers, work_rx),
1104-
));
1124+
let actor_task_handle = A::spawn_server_task(
1125+
panic_handler::with_backtrace_tracking(self.serve(
1126+
actor,
1127+
actor_loop_receivers,
1128+
work_rx,
1129+
))
1130+
.instrument(Span::current()),
1131+
);
11051132
tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
11061133
instance_cell
11071134
.inner
@@ -1316,7 +1343,6 @@ impl<A: Actor> Instance<A> {
13161343
}
13171344

13181345
/// Initialize and run the actor until it fails or is stopped.
1319-
#[tracing::instrument(level = "info", skip_all, fields(actor_id = %self.self_id()))]
13201346
async fn run(
13211347
&mut self,
13221348
actor: &mut A,
@@ -1431,6 +1457,7 @@ impl<A: Actor> Instance<A> {
14311457
}
14321458
}
14331459

1460+
#[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
14341461
async unsafe fn handle_message<M: Message>(
14351462
&mut self,
14361463
actor: &mut A,
@@ -1460,18 +1487,12 @@ impl<A: Actor> Instance<A> {
14601487
&headers,
14611488
self.self_id().to_string(),
14621489
);
1463-
let span = tracing::debug_span!(
1464-
"actor_status",
1465-
actor_id = self.self_id().to_string(),
1466-
actor_name = self.self_id().name(),
1467-
name = self.cell.status().borrow().to_string(),
1468-
);
14691490

14701491
let context = Context::new(self, headers);
14711492
// Pass a reference to the context to the handler, so that deref
14721493
// coercion allows the `this` argument to be treated exactly like
14731494
// &Instance<A>.
1474-
actor.handle(&context, message).instrument(span).await
1495+
actor.handle(&context, message).await
14751496
}
14761497

14771498
// Spawn on child on this instance. Currently used only by cap::CanSpawn.
@@ -1560,6 +1581,15 @@ impl<A: Actor> Drop for Instance<A> {
15601581
if status.is_terminal() {
15611582
false
15621583
} else {
1584+
tracing::info!(
1585+
name = "ActorStatus",
1586+
actor_id = %self.self_id(),
1587+
actor_name = self.self_id().name(),
1588+
status = "Stopped",
1589+
prev_status = status.arm().unwrap_or("unknown"),
1590+
caller = %Location::caller(),
1591+
"Instance is dropped",
1592+
);
15631593
*status = ActorStatus::Stopped;
15641594
true
15651595
}

0 commit comments

Comments
 (0)