@@ -60,6 +60,10 @@ use crate::local_state_broker::LocalStateBrokerMessage;
6060use crate :: mailbox:: EitherPortRef ;
6161use crate :: mailbox:: PyMailbox ;
6262use crate :: mailbox:: PythonUndeliverableMessageEnvelope ;
63+ use crate :: metrics:: ENDPOINT_ACTOR_COUNT ;
64+ use crate :: metrics:: ENDPOINT_ACTOR_ERROR ;
65+ use crate :: metrics:: ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM ;
66+ use crate :: metrics:: ENDPOINT_ACTOR_PANIC ;
6367use crate :: proc:: InstanceWrapper ;
6468use crate :: proc:: PyActorId ;
6569use crate :: proc:: PyProc ;
@@ -759,11 +763,17 @@ impl Handler<PythonMessage> for PythonActor {
759763 cx. port ( ) ,
760764 PythonTask :: new ( future) ?,
761765 receiver,
766+ cx. self_id ( ) . to_string ( ) ,
767+ endpoint. clone ( ) ,
762768 )
763769 . instrument (
764770 tracing:: info_span!(
765- "Calling endpoint on PythonActor" , actor = %cx. self_id( ) , rank = rank, endpoint = endpoint
766- ) . or_current ( )
771+ "PythonActor endpoint" ,
772+ actor_id = %cx. self_id( ) ,
773+ %rank,
774+ %endpoint
775+ )
776+ . or_current ( )
767777 . follows_from ( tracing:: Span :: current ( ) . id ( ) )
768778 . clone ( ) ,
769779 ) ,
@@ -860,7 +870,19 @@ async fn handle_async_endpoint_panic(
860870 panic_sender : PortHandle < PanicFromPy > ,
861871 task : PythonTask ,
862872 side_channel : oneshot:: Receiver < PyObject > ,
873+ actor_id : String ,
874+ endpoint : String ,
863875) {
876+ // Create attributes for metrics with actor_id and endpoint
877+ let attributes =
878+ hyperactor_telemetry:: kv_pairs!( "actor_id" => actor_id, "endpoint" => endpoint) ;
879+
880+ // Record the start time for latency measurement
881+ let start_time = std:: time:: Instant :: now ( ) ;
882+
883+ // Increment throughput counter
884+ ENDPOINT_ACTOR_COUNT . add ( 1 , attributes) ;
885+
864886 let err_or_never = async {
865887 // The side channel will resolve with a value if a panic occured during
866888 // processing of the async endpoint, see [Panics in async endpoints].
@@ -871,6 +893,7 @@ async fn handle_async_endpoint_panic(
871893 . unwrap ( )
872894 . clone ( )
873895 . into ( ) ;
896+ ENDPOINT_ACTOR_PANIC . add ( 1 , attributes) ;
874897 Some ( err. into ( ) )
875898 } ) ,
876899 // An Err means that the sender has been dropped without sending.
@@ -892,10 +915,17 @@ async fn handle_async_endpoint_panic(
892915 result
893916 }
894917 } {
918+ // Record error and panic metrics
919+ ENDPOINT_ACTOR_ERROR . add ( 1 , attributes) ;
920+
895921 panic_sender
896922 . send ( PanicFromPy ( panic) )
897923 . expect ( "Unable to send panic message" ) ;
898924 }
925+
926+ // Record latency in microseconds
927+ let elapsed_micros = start_time. elapsed ( ) . as_micros ( ) as f64 ;
928+ ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM . record ( elapsed_micros, attributes) ;
899929}
900930
901931#[ pyclass( module = "monarch._rust_bindings.monarch_hyperactor.actor" ) ]
0 commit comments