Skip to content

Commit 20f03d9

Browse files
pzhan9, meta-codesync[bot]
authored and committed
Log ProcMeshStatus with status field (#1882)
Summary: Pull Request resolved: #1882. The goal is that we can get a `ProcMeshStatus` by searching scuba with the query `mesh_name=<name> and name=ProcMeshStatus`. Reviewed By: mariusae. Differential Revision: D86829124. fbshipit-source-id: ce961f8d5d5dc827507e4d93288eea4553b47691
1 parent 8bb3192 commit 20f03d9

File tree

3 files changed

+77
-33
lines changed

3 files changed

+77
-33
lines changed

hyperactor/src/channel.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -828,8 +828,9 @@ impl<M: RemoteMessage> Rx<M> for ChannelRx<M> {
828828
/// if the channel cannot be established. The underlying connection is
829829
/// dropped whenever the returned Tx is dropped.
830830
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ChannelError`.
831+
#[track_caller]
831832
pub fn dial<M: RemoteMessage>(addr: ChannelAddr) -> Result<ChannelTx<M>, ChannelError> {
832-
tracing::debug!(name = "dial", "dialing channel {}", addr);
833+
tracing::debug!(name = "dial", caller = %Location::caller(), %addr, "dialing channel {}", addr);
833834
let inner = match addr {
834835
ChannelAddr::Local(port) => ChannelTxKind::Local(local::dial(port)?),
835836
ChannelAddr::Tcp(addr) => ChannelTxKind::Tcp(net::tcp::dial(addr)),

hyperactor_mesh/src/alloc.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,11 @@ impl<A: ?Sized + Send + Alloc> AllocExt for A {
418418
return Err(AllocatorError::Incomplete(self.extent().clone()));
419419
};
420420

421-
let name = state.arm().unwrap_or("unknown");
421+
let name = tracing::Span::current()
422+
.metadata()
423+
.map(|m| m.name())
424+
.unwrap_or("initialize");
425+
let status = format!("ProcState:{}", state.arm().unwrap_or("unknown"));
422426

423427
match state {
424428
ProcState::Created {
@@ -427,17 +431,20 @@ impl<A: ?Sized + Send + Alloc> AllocExt for A {
427431
let rank = point.rank();
428432
if let Some(old_create_key) = created.insert(rank, create_key.clone()) {
429433
tracing::warn!(
434+
name,
435+
status,
436+
rank,
430437
"rank {rank} reassigned from {old_create_key} to {create_key}"
431438
);
432439
}
433440
tracing::info!(
434-
name = name,
435-
rank = rank,
441+
name,
442+
status,
443+
rank,
436444
"proc with create key {}, rank {}: created",
437445
create_key,
438446
rank
439447
);
440-
// tracing::info!("created: {} rank {}: created", create_key, rank);
441448
}
442449
ProcState::Running {
443450
create_key,
@@ -447,7 +454,9 @@ impl<A: ?Sized + Send + Alloc> AllocExt for A {
447454
} => {
448455
let Some(rank) = created.rank(&create_key) else {
449456
tracing::warn!(
450-
name = name,
457+
name,
458+
%proc_id,
459+
status,
451460
"proc id {proc_id} with create key {create_key} \
452461
is running, but was not created"
453462
);
@@ -463,14 +472,19 @@ impl<A: ?Sized + Send + Alloc> AllocExt for A {
463472
if let Some(old_allocated_proc) = running.insert(*rank, allocated_proc.clone())
464473
{
465474
tracing::warn!(
466-
name = name,
475+
name,
476+
%proc_id,
477+
status,
478+
rank,
467479
"duplicate running notifications for {rank}: \
468480
old:{old_allocated_proc}; \
469481
new:{allocated_proc}"
470482
)
471483
}
472484
tracing::info!(
473-
name = name,
485+
name,
486+
%proc_id,
487+
status,
474488
"proc {} rank {}: running at addr:{addr} mesh_agent:{mesh_agent}",
475489
proc_id,
476490
rank
@@ -481,7 +495,8 @@ impl<A: ?Sized + Send + Alloc> AllocExt for A {
481495
// ProcState::Failed to fail the whole allocation.
482496
ProcState::Stopped { create_key, reason } => {
483497
tracing::error!(
484-
name = name,
498+
name,
499+
status,
485500
"allocation failed for proc with create key {}: {}",
486501
create_key,
487502
reason
@@ -493,7 +508,8 @@ impl<A: ?Sized + Send + Alloc> AllocExt for A {
493508
description,
494509
} => {
495510
tracing::error!(
496-
name = name,
511+
name,
512+
status,
497513
"allocation failed for world {}: {}",
498514
world_id,
499515
description

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ use serde::Deserialize;
5151
use serde::Serialize;
5252
use tokio::sync::Notify;
5353
use tracing::Instrument;
54-
use tracing::Level;
55-
use tracing::span;
5654

5755
use crate::CommActor;
5856
use crate::alloc::Alloc;
@@ -295,28 +293,40 @@ impl ProcMesh {
295293

296294
/// Allocate a new ProcMesh from the provided alloc.
297295
/// Allocate does not require an owning actor because references are not owned.
298-
#[tracing::instrument(skip_all)]
299296
#[track_caller]
300297
pub async fn allocate(
301298
cx: &impl context::Actor,
302-
mut alloc: Box<dyn Alloc + Send + Sync + 'static>,
299+
alloc: Box<dyn Alloc + Send + Sync + 'static>,
303300
name: &str,
301+
) -> v1::Result<Self> {
302+
let caller = Location::caller();
303+
Self::allocate_inner(cx, alloc, Name::new(name), caller).await
304+
}
305+
306+
// Use allocate_inner to set field mesh_name in span
307+
#[hyperactor::instrument(fields(mesh_name=name.to_string()))]
308+
async fn allocate_inner(
309+
cx: &impl context::Actor,
310+
mut alloc: Box<dyn Alloc + Send + Sync + 'static>,
311+
name: Name,
312+
caller: &'static Location<'static>,
304313
) -> v1::Result<Self> {
305314
let alloc_id = Self::alloc_counter().fetch_add(1, Ordering::Relaxed) + 1;
306315
tracing::info!(
307-
name = "ProcMesh::Allocate::Attempt",
316+
name = "ProcMeshStatus",
317+
status = "Allocate::Attempt",
318+
%caller,
308319
alloc_id,
309-
caller = %Location::caller(),
310320
shape = ?alloc.shape(),
311321
"allocating proc mesh"
312322
);
313323

314324
let running = alloc
315325
.initialize()
316-
.instrument(span!(
317-
Level::INFO,
318-
"ProcMesh::Allocate::Initialize",
319-
alloc_id
326+
.instrument(tracing::info_span!(
327+
"ProcMeshStatus::Allocate::Initialize",
328+
alloc_id,
329+
mesh_name = name.to_string()
320330
))
321331
.await?;
322332

@@ -330,17 +340,18 @@ impl ProcMesh {
330340
// First make sure we can serve the proc:
331341
let proc_channel_addr = {
332342
let _guard =
333-
tracing::span!(Level::INFO, "allocate_serve_proc", proc_id = %proc.proc_id())
334-
.entered();
343+
tracing::info_span!("allocate_serve_proc", proc_id = %proc.proc_id()).entered();
335344
let (addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?;
336345
proc.clone().serve(rx);
346+
tracing::info!(
347+
name = "ProcMeshStatus",
348+
status = "Allocate::ChannelServe",
349+
mesh_name = name.to_string(),
350+
%addr,
351+
"proc started listening on addr: {addr}"
352+
);
337353
addr
338354
};
339-
tracing::info!(
340-
name = "ProcMesh::Allocate::ChannelServe",
341-
alloc_id = alloc_id,
342-
"proc started listening on addr: {proc_channel_addr}"
343-
);
344355

345356
let bind_allocated_procs = |router: &DialMailboxRouter| {
346357
// Route all of the allocated procs:
@@ -423,7 +434,6 @@ impl ProcMesh {
423434
.collect();
424435

425436
let stop = Arc::new(Notify::new());
426-
let name = Name::new(name);
427437
let extent = alloc.extent().clone();
428438

429439
{
@@ -453,10 +463,10 @@ impl ProcMesh {
453463
}
454464
}
455465
}
456-
});
466+
}.instrument(tracing::info_span!("alloc_monitor")));
457467
}
458468

459-
Self::create(
469+
let mesh = Self::create(
460470
cx,
461471
name,
462472
ProcMeshAllocation::Allocated {
@@ -466,14 +476,21 @@ impl ProcMesh {
466476
},
467477
true, // alloc-based meshes support comm actors
468478
)
469-
.await
479+
.await;
480+
match &mesh {
481+
Ok(_) => tracing::info!(name = "ProcMeshStatus", status = "Allocate::Created"),
482+
Err(error) => {
483+
tracing::info!(name = "ProcMeshStatus", status = "Allocate::Failed", %error)
484+
}
485+
}
486+
mesh
470487
}
471488

472489
/// Detach the proc mesh from the lifetime of `self`, and return its reference.
473-
#[allow(dead_code)]
490+
#[cfg(test)]
474491
pub(crate) fn detach(self) -> ProcMeshRef {
475492
// This also keeps the ProcMeshAllocation::Allocated alloc task alive.
476-
self.current_ref
493+
self.current_ref.clone()
477494
}
478495

479496
/// Stop this mesh gracefully.
@@ -508,6 +525,16 @@ impl Deref for ProcMesh {
508525
}
509526
}
510527

528+
impl Drop for ProcMesh {
529+
fn drop(&mut self) {
530+
tracing::info!(
531+
name = "ProcMeshStatus",
532+
mesh_name = %self.name,
533+
status = "Dropped",
534+
);
535+
}
536+
}
537+
511538
/// Represents different ways ProcMeshes can be allocated.
512539
enum ProcMeshAllocation {
513540
/// A mesh that has been allocated from an `Alloc`.

0 commit comments

Comments
 (0)