Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 70 additions & 8 deletions hyperactor/src/channel/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use super::framed::FrameReader;
use super::framed::FrameWrite;
use super::framed::WriteState;
use crate::RemoteMessage;
use crate::channel::ChannelAddr;
use crate::channel::ChannelError;
use crate::channel::SendError;
use crate::channel::TxStatus;
Expand Down Expand Up @@ -203,14 +204,18 @@ struct Outbox<'a, M: RemoteMessage> {
next_seq: u64,
deque: MessageDeque<M>,
log_id: &'a str,
dest_addr: &'a ChannelAddr,
session_id: u64,
}

impl<'a, M: RemoteMessage> Outbox<'a, M> {
fn new(log_id: &'a str) -> Self {
fn new(log_id: &'a str, dest_addr: &'a ChannelAddr, session_id: u64) -> Self {
Self {
next_seq: 0,
deque: MessageDeque(VecDeque::new()),
log_id,
dest_addr,
session_id,
}
}

Expand Down Expand Up @@ -255,7 +260,24 @@ impl<'a, M: RemoteMessage> Outbox<'a, M> {

let frame = Frame::Message(self.next_seq, message);
let message = serialize_bincode(&frame).map_err(|e| format!("serialization error: {e}"))?;
metrics::REMOTE_MESSAGE_SEND_SIZE.record(message.frame_len() as f64, &[]);
let message_size = message.frame_len();
metrics::REMOTE_MESSAGE_SEND_SIZE.record(message_size as f64, &[]);

// Track throughput for this channel pair
metrics::CHANNEL_THROUGHPUT_BYTES.add(
message_size as u64,
hyperactor_telemetry::kv_pairs!(
"dest" => self.dest_addr.to_string(),
"session_id" => self.session_id.to_string(),
),
);
metrics::CHANNEL_THROUGHPUT_MESSAGES.add(
1,
hyperactor_telemetry::kv_pairs!(
"dest" => self.dest_addr.to_string(),
"session_id" => self.session_id.to_string(),
),
);

self.deque.push_back(QueuedMessage {
seq: self.next_seq,
Expand Down Expand Up @@ -371,7 +393,7 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
}

/// Remove acked messages from the deque.
fn prune(&mut self, acked: u64, acked_at: Instant) {
fn prune(&mut self, acked: u64, acked_at: Instant, dest_addr: &ChannelAddr, session_id: u64) {
assert!(
self.largest_acked.as_ref().map_or(0, |i| i.0) <= acked,
"{}: received out-of-order ack; received: {}; stored largest: {}",
Expand All @@ -386,7 +408,16 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
let deque = &mut self.deque;
while let Some(msg) = deque.front() {
if msg.seq <= acked {
deque.pop_front();
let msg: QueuedMessage<M> = deque.pop_front().unwrap();
// Track latency: time from when message was first received to when it was acked
let latency_micros = msg.received_at.elapsed().as_micros() as i64;
metrics::CHANNEL_LATENCY_MICROS.record(
latency_micros as f64,
hyperactor_telemetry::kv_pairs!(
"dest" => dest_addr.to_string(),
"session_id" => session_id.to_string(),
),
);
} else {
// Messages in the deque are orderd by seq in ascending
// order. So we could return early once we encounter
Expand Down Expand Up @@ -461,9 +492,9 @@ enum State<'a, M: RemoteMessage> {
}

impl<'a, M: RemoteMessage> State<'a, M> {
fn init(log_id: &'a str) -> Self {
fn init(log_id: &'a str, dest_addr: &'a ChannelAddr, session_id: u64) -> Self {
Self::Running(Deliveries {
outbox: Outbox::new(log_id),
outbox: Outbox::new(log_id, dest_addr, session_id),
unacked: Unacked::new(None, log_id),
})
}
Expand Down Expand Up @@ -543,7 +574,8 @@ async fn run<M: RemoteMessage>(

let session_id = rand::random();
let log_id = format!("session {}.{}", link.dest(), session_id);
let mut state = State::init(&log_id);
let dest = link.dest();
let mut state = State::init(&log_id, &dest, session_id);
let mut conn = Conn::reconnect_with_default();

let (state, conn) = loop {
Expand Down Expand Up @@ -859,7 +891,7 @@ where
Ok(response) => {
match response {
NetRxResponse::Ack(ack) => {
unacked.prune(ack, RealClock.now());
unacked.prune(ack, RealClock.now(), &link.dest(), session_id);
(State::Running(Deliveries { outbox, unacked }), Conn::Connected { reader, write_state })
}
NetRxResponse::Reject => {
Expand Down Expand Up @@ -934,6 +966,15 @@ where
"{log_id}: outbox send error: {err}; message size: {}",
outbox.front_size().expect("outbox should not be empty"),
);
// Track error for this channel pair
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"dest" => link.dest().to_string(),
"session_id" => session_id.to_string(),
"error_type" => metrics::ChannelErrorType::SendError.as_str(),
),
);
(State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default())
}
}
Expand Down Expand Up @@ -1030,6 +1071,18 @@ where

// Need to resend unacked after reconnecting.
let largest_acked = unacked.largest_acked;
let num_retries = unacked.deque.len();
if num_retries > 0 {
// Track reconnection for this channel pair
metrics::CHANNEL_RECONNECTIONS.add(
1,
hyperactor_telemetry::kv_pairs!(
"dest" => link.dest().to_string(),
"transport" => link.dest().transport().to_string(),
"reason" => "reconnect_with_unacked",
),
);
}
outbox.requeue_unacked(unacked.deque);
(
State::Running(Deliveries {
Expand Down Expand Up @@ -1061,6 +1114,15 @@ where
session_id,
err
);
// Track connection error for this channel pair
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"dest" => link.dest().to_string(),
"session_id" => session_id.to_string(),
"error_type" => metrics::ChannelErrorType::ConnectionError.as_str(),
),
);
(
State::Running(Deliveries { outbox, unacked }),
Conn::reconnect(backoff),
Expand Down
73 changes: 59 additions & 14 deletions hyperactor/src/channel/net/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,21 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
};

// De-frame the multi-part message.
let bytes_len = bytes.len();
let message = match serde_multipart::Message::from_framed(bytes) {
Ok(message) => message,
Err(err) => break (
Err(err) => {
// Track deframing error for this channel pair
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"source" => self.source.to_string(),
"dest" => self.dest.to_string(),
"session_id" => session_id.to_string(),
"error_type" => metrics::ChannelErrorType::DeframeError.as_str(),
),
);
break (
next,
Err::<(), anyhow::Error>(err.into()).context(
format!(
Expand All @@ -192,7 +204,8 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
)
),
false
),
)
},
};

// Finally decode the message. This assembles the M-typed message
Expand Down Expand Up @@ -220,6 +233,23 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
}
match self.send_with_buffer_metric(&log_id, &tx, message).await {
Ok(()) => {
// Track throughput for this channel pair
metrics::CHANNEL_THROUGHPUT_BYTES.add(
bytes_len as u64,
hyperactor_telemetry::kv_pairs!(
"source" => self.source.to_string(),
"dest" => self.dest.to_string(),
"session_id" => session_id.to_string(),
),
);
metrics::CHANNEL_THROUGHPUT_MESSAGES.add(
1,
hyperactor_telemetry::kv_pairs!(
"source" => self.source.to_string(),
"dest" => self.dest.to_string(),
"session_id" => session_id.to_string(),
),
);
// In channel's contract, "delivered" means the message
// is sent to the NetRx object. Therefore, we could bump
// `next_seq` as far as the message is put on the mpsc
Expand All @@ -236,16 +266,28 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
}
}
},
Err(err) => break (
next,
Err::<(), anyhow::Error>(err.into()).context(
format!(
"{log_id}: deserialize message with M = {}",
type_name::<M>(),
)
),
false
),
Err(err) => {
// Track deserialization error for this channel pair
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"source" => self.source.to_string(),
"dest" => self.dest.to_string(),
"session_id" => session_id.to_string(),
"error_type" => metrics::ChannelErrorType::DeserializeError.as_str(),
),
);
break (
next,
Err::<(), anyhow::Error>(err.into()).context(
format!(
"{log_id}: deserialize message with M = {}",
type_name::<M>(),
)
),
false
)
},
}
},
}
Expand Down Expand Up @@ -490,11 +532,13 @@ where
};

if let Err(ref err) = res {
metrics::CHANNEL_CONNECTION_ERRORS.add(
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"transport" => dest.transport().to_string(),
"error" => err.to_string(),
"error_type" => metrics::ChannelErrorType::ConnectionError.as_str(),
"dest" => dest.to_string(),
),
);

Expand All @@ -513,12 +557,13 @@ where
});
}
Err(err) => {
metrics::CHANNEL_CONNECTION_ERRORS.add(
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"transport" => listener_channel_addr.transport().to_string(),
"operation" => "accept",
"error" => err.to_string(),
"error_type" => metrics::ChannelErrorType::ConnectionError.as_str(),
),
);

Expand Down
36 changes: 34 additions & 2 deletions hyperactor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,31 @@ use hyperactor_telemetry::declare_static_histogram;
use hyperactor_telemetry::declare_static_timer;
use hyperactor_telemetry::declare_static_up_down_counter;

/// Error types for channel-related errors. Only used for telemetry.
#[derive(Debug, Clone, Copy)]
pub enum ChannelErrorType {
/// Error occurred while sending a message.
SendError,
/// Error occurred while connecting to a channel.
ConnectionError,
/// Error occurred while deframing a message.
DeframeError,
/// Error occurred while deserializing a message.
DeserializeError,
}

impl ChannelErrorType {
/// Returns the string representation of the error type.
pub fn as_str(&self) -> &'static str {
match self {
ChannelErrorType::SendError => "send_error",
ChannelErrorType::ConnectionError => "connection_error",
ChannelErrorType::DeframeError => "deframe_error",
ChannelErrorType::DeserializeError => "deserialize_error",
}
}
}

// MAILBOX
// Tracks messages that couldn't be delivered to their destination and were returned as undeliverable
declare_static_counter!(
Expand Down Expand Up @@ -44,16 +69,23 @@ declare_static_timer!(
declare_static_histogram!(REMOTE_MESSAGE_SEND_SIZE, "channel.remote_message_send_size");
// Tracks the number of new channel connections established (client and server)
declare_static_counter!(CHANNEL_CONNECTIONS, "channel.connections");
// Tracks errors that occur when establishing channel connections
declare_static_counter!(CHANNEL_CONNECTION_ERRORS, "channel.connection_errors");
// Tracks the number of channel reconnection attempts
declare_static_counter!(CHANNEL_RECONNECTIONS, "channel.reconnections");
// Tracks errors for each channel pair
declare_static_counter!(CHANNEL_ERRORS, "channel.errors");
// Tracks the number of NetRx encountering full buffer, i.e. its mpsc channel.

// This metric counts how often the NetRx→client mpsc channel remains full,
// incrementing once per CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL while blocked.
declare_static_counter!(CHANNEL_NET_RX_BUFFER_FULL, "channel.net_rx_buffer_full");

// Tracks throughput (bytes sent)
declare_static_counter!(CHANNEL_THROUGHPUT_BYTES, "channel.throughput.bytes");
// Tracks throughput (message count)
declare_static_counter!(CHANNEL_THROUGHPUT_MESSAGES, "channel.throughput.messages");
// Tracks message latency for each channel pair in microseconds
declare_static_histogram!(CHANNEL_LATENCY_MICROS, "channel.latency.us");

// PROC MESH
// Tracks the number of active processes in the process mesh
declare_static_counter!(PROC_MESH_ALLOCATION, "proc_mesh.active_procs");
Expand Down