Skip to content

Commit 6d14685

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Add event logs for proc mesh from HostMesh::spawn (#1883)
Summary: Pull Request resolved: #1883 The `HostMesh::spawn` method spawn a new proc mesh. This diff logs the status changes of this new proc mesh. Reviewed By: mariusae Differential Revision: D86931476 fbshipit-source-id: 27119c7445ab6cc66b05aba270a8884a03270155
1 parent 20f03d9 commit 6d14685

File tree

2 files changed

+63
-6
lines changed

2 files changed

+63
-6
lines changed

hyperactor/src/proc.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,19 @@ struct ProcState {
152152
clock: ClockKind,
153153
}
154154

155+
impl Drop for ProcState {
156+
fn drop(&mut self) {
157+
// We only want log ProcStatus::Dropped when ProcState is dropped,
158+
// rather than Proc is dropped. This is because we need to wait for
159+
// Proc::inner's ref count becomes 0.
160+
tracing::info!(
161+
proc_id = %self.proc_id,
162+
name = "ProcStatus",
163+
status = "Dropped"
164+
);
165+
}
166+
}
167+
155168
/// A snapshot view of the proc's actor ledger.
156169
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
157170
pub struct ActorLedgerSnapshot {
@@ -337,7 +350,6 @@ impl Proc {
337350
}
338351

339352
/// Create a new direct-addressed proc.
340-
#[tracing::instrument]
341353
pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
342354
let (addr, rx) = channel::serve(addr)?;
343355
let proc_id = ProcId::Direct(addr, name);
@@ -347,7 +359,6 @@ impl Proc {
347359
}
348360

349361
/// Create a new direct-addressed proc with a default sender for the forwarder.
350-
#[tracing::instrument(skip(default))]
351362
pub fn direct_with_default(
352363
addr: ChannelAddr,
353364
name: String,
@@ -369,6 +380,11 @@ impl Proc {
369380
forwarder: BoxedMailboxSender,
370381
clock: ClockKind,
371382
) -> Self {
383+
tracing::info!(
384+
proc_id = %proc_id,
385+
name = "ProcStatus",
386+
status = "Created"
387+
);
372388
Self {
373389
inner: Arc::new(ProcState {
374390
proc_id,

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,16 @@ impl HostMeshRef {
670670
cx: &impl context::Actor,
671671
name: &str,
672672
per_host: Extent,
673+
) -> v1::Result<ProcMesh> {
674+
self.spawn_inner(cx, Name::new(name), per_host).await
675+
}
676+
677+
#[hyperactor::instrument(fields(mesh_name=mesh_name.to_string()))]
678+
async fn spawn_inner(
679+
&self,
680+
cx: &impl context::Actor,
681+
mesh_name: Name,
682+
per_host: Extent,
673683
) -> v1::Result<ProcMesh> {
674684
let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
675685
let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
@@ -690,7 +700,13 @@ impl HostMeshRef {
690700
.map_err(|err| v1::Error::ConfigurationError(err.into()))?;
691701

692702
let region: Region = extent.clone().into();
693-
let mesh_name = Name::new(name);
703+
704+
tracing::info!(
705+
name = "ProcMeshStatus",
706+
status = "Spawn::Attempt",
707+
%region,
708+
"spawning proc mesh"
709+
);
694710

695711
let mut procs = Vec::new();
696712
let num_ranks = region.num_ranks();
@@ -714,7 +730,7 @@ impl HostMeshRef {
714730
for (host_rank, host) in self.ranks.iter().enumerate() {
715731
for per_host_rank in 0..per_host.num_ranks() {
716732
let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
717-
let proc_name = Name::new(format!("{}_{}", name, per_host_rank));
733+
let proc_name = Name::new(format!("{}_{}", mesh_name.name(), per_host_rank));
718734
proc_names.push(proc_name.clone());
719735
host.mesh_agent()
720736
.create_or_update(
@@ -743,8 +759,15 @@ impl HostMeshRef {
743759
format!("failed while querying proc status: {}", e),
744760
)
745761
})?;
762+
let proc_id = host.named_proc(&proc_name);
763+
tracing::info!(
764+
name = "ProcMeshStatus",
765+
status = "Spawn::CreatingProc",
766+
%proc_id,
767+
rank = create_rank,
768+
);
746769
procs.push(ProcRef::new(
747-
host.named_proc(&proc_name),
770+
proc_id,
748771
create_rank,
749772
// TODO: specify or retrieve from state instead, to avoid attestation.
750773
ActorRef::attest(host.named_proc(&proc_name).actor_id("agent", 0)),
@@ -803,6 +826,15 @@ impl HostMeshRef {
803826
},
804827
};
805828

829+
tracing::error!(
830+
name = "ProcMeshStatus",
831+
status = "Spawn::GetRankStatus",
832+
rank = host_rank,
833+
"rank {} is terminating with state: {}",
834+
host_rank,
835+
state
836+
);
837+
806838
return Err(v1::Error::ProcCreationError {
807839
state,
808840
host_rank,
@@ -811,6 +843,12 @@ impl HostMeshRef {
811843
}
812844
}
813845
Err(complete) => {
846+
tracing::error!(
847+
name = "ProcMeshStatus",
848+
status = "Spawn::GetRankStatus",
849+
"timeout after {:?} when waiting for procs being created",
850+
config::global::get(PROC_SPAWN_MAX_IDLE),
851+
);
814852
// Fill remaining ranks with a timeout status via the
815853
// legacy shim.
816854
let legacy = mesh_to_rankedvalues_with_default(
@@ -823,7 +861,10 @@ impl HostMeshRef {
823861
}
824862
}
825863

826-
ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await
864+
let mesh =
865+
ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await;
866+
tracing::info!(name = "ProcMeshStatus", status = "Spawn::Created",);
867+
mesh
827868
}
828869

829870
/// The name of the referenced host mesh.

0 commit comments

Comments
 (0)