Skip to content

Commit 72556ad

Browse files
thomasywang authored and
facebook-github-bot committed
Add spans to points along the message send/recv path
Summary: Our observability only shows us when a message is sent out and when it is delivered to its final destination. We want to add spans to some of the important stops along the way. Differential Revision: D86802704
1 parent 6273659 commit 72556ad

File tree

8 files changed

+18
-2
lines changed

8 files changed

+18
-2
lines changed

hyperactor/src/channel.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,13 @@ pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
119119
/// message is either delivered, or we eventually discover that
120120
/// the channel has failed and it will be sent back on `return_channel`.
121121
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SendError`.
122+
#[hyperactor::instrument_infallible]
122123
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
123124
self.do_post(message, Some(return_channel));
124125
}
125126

126127
/// Enqueue a message to be sent on the channel.
128+
#[hyperactor::instrument_infallible]
127129
fn post(&self, message: M) {
128130
self.do_post(message, None);
129131
}
@@ -803,6 +805,7 @@ enum ChannelRxKind<M: RemoteMessage> {
803805

804806
#[async_trait]
805807
impl<M: RemoteMessage> Rx<M> for ChannelRx<M> {
808+
#[hyperactor::instrument]
806809
async fn recv(&mut self) -> Result<M, ChannelError> {
807810
match &mut self.inner {
808811
ChannelRxKind::Local(rx) => rx.recv().await,

hyperactor/src/channel/net.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ use tokio::time::Duration;
8686
use tokio::time::Instant;
8787
use tokio_util::net::Listener;
8888
use tokio_util::sync::CancellationToken;
89+
use tracing::Instrument;
8990

9091
use super::*;
9192
use crate::RemoteMessage;
@@ -652,7 +653,11 @@ impl<M: RemoteMessage> NetTx<M> {
652653
unacked,
653654
}),
654655
conn,
655-
) if outbox.is_empty() && unacked.is_empty() => match receiver.recv().await {
656+
) if outbox.is_empty() && unacked.is_empty() => match receiver
657+
.recv()
658+
.instrument(tracing::debug_span!("receiver.recv"))
659+
.await
660+
{
656661
Some(msg) => match outbox.push_back(msg) {
657662
Ok(()) => {
658663
let running = State::Running(Deliveries { outbox, unacked });
@@ -828,7 +833,7 @@ impl<M: RemoteMessage> NetTx<M> {
828833
// UnboundedReceiver::recv() is cancel safe.
829834
// Only checking mpsc channel when outbox is empty. In this way, we prioritize
830835
// sending messages already in outbox.
831-
work_result = receiver.recv(), if outbox.is_empty() => {
836+
work_result = receiver.recv().instrument(tracing::debug_span!("receiver.recv")), if outbox.is_empty() => {
832837
match work_result {
833838
Some(msg) => {
834839
match outbox.push_back(msg) {

hyperactor/src/channel/net/framed.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
6666
/// `max_frame_length`. **This error is fatal:** once returned,
6767
/// the `FrameReader` must be dropped; the underlying connection
6868
/// is no longer valid.
69+
#[tracing::instrument(skip_all)]
6970
pub async fn next(&mut self) -> io::Result<Option<Bytes>> {
7071
loop {
7172
match &mut self.state {
@@ -218,6 +219,7 @@ impl<W: AsyncWrite + Unpin, B: Buf> FrameWrite<W, B> {
218219
/// returned futures at any time. Upon completion, the frame is guaranteed to be
219220
/// written, unless an error was encountered, in which case the underlying stream
220221
/// is in an undefined state.
222+
#[tracing::instrument(skip_all)]
221223
pub async fn send(&mut self) -> io::Result<()> {
222224
loop {
223225
if self.len_buf.has_remaining() {

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,6 +1161,7 @@ impl MailboxClient {
11611161
}
11621162

11631163
impl MailboxSender for MailboxClient {
1164+
#[hyperactor::instrument_infallible]
11641165
fn post_unchecked(
11651166
&self,
11661167
envelope: MessageEnvelope,

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ declare_attrs! {
7777
/// Common implementation for `ActorMesh`s and `ActorMeshRef`s to cast
7878
/// an `M`-typed message
7979
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
80+
#[hyperactor::instrument]
8081
pub(crate) fn actor_mesh_cast<A, M>(
8182
cx: &impl context::Actor,
8283
actor_mesh_id: ActorMeshId,

hyperactor_mesh/src/comm.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ impl Handler<CommActorMode> for CommActor {
345345
// TODO(T218630526): reliable casting for mutable topology
346346
#[async_trait]
347347
impl Handler<CastMessage> for CommActor {
348+
#[hyperactor::instrument]
348349
async fn handle(&mut self, cx: &Context<Self>, cast_message: CastMessage) -> Result<()> {
349350
// Always forward the message to the root rank of the slice, casting starts from there.
350351
let slice = cast_message.dest.slice.clone();
@@ -375,6 +376,7 @@ impl Handler<CastMessage> for CommActor {
375376

376377
#[async_trait]
377378
impl Handler<ForwardMessage> for CommActor {
379+
#[hyperactor::instrument]
378380
async fn handle(&mut self, cx: &Context<Self>, fwd_message: ForwardMessage) -> Result<()> {
379381
let ForwardMessage {
380382
sender,

monarch_hyperactor/src/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ impl PanicFlag {
712712

713713
#[async_trait]
714714
impl Handler<PythonMessage> for PythonActor {
715+
#[hyperactor::instrument]
715716
async fn handle(
716717
&mut self,
717718
cx: &Context<PythonActor>,

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ pub(crate) fn to_hy_sel(selection: &str) -> PyResult<Selection> {
137137

138138
#[pymethods]
139139
impl PythonActorMesh {
140+
#[hyperactor::instrument]
140141
fn cast(
141142
&self,
142143
message: &PythonMessage,

0 commit comments

Comments
 (0)