@@ -19,12 +19,13 @@ use executor_api_pb::{
1919 HostResources ,
2020 ReportExecutorStateRequest ,
2121 ReportExecutorStateResponse ,
22+ TaskAllocation ,
2223 TaskResult ,
2324} ;
24- use tokio:: sync:: watch;
25+ use tokio:: sync:: watch:: { self , Receiver , Sender } ;
2526use tokio_stream:: { wrappers:: WatchStream , Stream } ;
2627use tonic:: { Request , Response , Status } ;
27- use tracing:: { debug, error, info, trace, warn} ;
28+ use tracing:: { debug, error, info, instrument , trace, warn} ;
2829
2930use crate :: {
3031 blob_store:: { self , BlobStorage } ,
@@ -711,6 +712,105 @@ impl ExecutorAPIService {
711712 }
712713}
713714
715+ fn log_desired_executor_state_delta (
716+ last_sent_state : & DesiredExecutorState ,
717+ desired_state : & DesiredExecutorState ,
718+ ) {
719+ debug ! ( ?desired_state, "got desired state" ) ;
720+
721+ let mut last_assignments: HashMap < String , String > = HashMap :: default ( ) ;
722+ for ta in & last_sent_state. task_allocations {
723+ if let ( Some ( fe_id) , Some ( alloc_id) ) = ( & ta. function_executor_id , & ta. allocation_id ) {
724+ last_assignments. insert ( fe_id. clone ( ) , alloc_id. clone ( ) ) ;
725+ }
726+ }
727+
728+ for TaskAllocation {
729+ function_executor_id : fn_executor_id_option,
730+ allocation_id : allocation_id_option,
731+ ..
732+ } in & desired_state. task_allocations
733+ {
734+ if let ( Some ( fn_executor_id) , Some ( allocation_id) ) =
735+ ( fn_executor_id_option, allocation_id_option)
736+ {
737+ match last_assignments. get ( fn_executor_id) {
738+ Some ( last_allocation_id) => {
739+ if allocation_id != last_allocation_id {
740+ info ! (
741+ %fn_executor_id,
742+ %allocation_id, %last_allocation_id, "re-assigning FE"
743+ )
744+ }
745+ last_assignments. remove ( fn_executor_id) ;
746+ }
747+ None => {
748+ info ! ( %fn_executor_id, %allocation_id, "assigning FE" )
749+ }
750+ }
751+ }
752+ }
753+
754+ for ( fn_executor_id, last_allocation_id) in last_assignments {
755+ info ! ( %fn_executor_id, %last_allocation_id, "idling FE" )
756+ }
757+ }
758+
759+ #[ instrument( skip_all, fields( executor_id = %executor_id) ) ]
760+ async fn executor_update_loop (
761+ executor_id : ExecutorId ,
762+ executor_manager : Arc < ExecutorManager > ,
763+ mut executor_state_rx : Receiver < ( ) > ,
764+ grpc_tx : Sender < Result < DesiredExecutorState , Status > > ,
765+ ) {
766+ // Mark the state as changed to trigger the first change
767+ // notification to the executor. This is important because between
768+ // the report_executor_state and the get_desired_executor_states
769+ // requests, the executor state may received a new desired state.
770+ executor_state_rx. mark_changed ( ) ;
771+
772+ // Use the default/empty value for the last-sent desired state, so
773+ // that the first change logged will be a delta from nothing (=>
774+ // the complete executor state).
775+ let mut last_sent_state = DesiredExecutorState :: default ( ) ;
776+
777+ loop {
778+ // Wait for a state change for this executor or grpc stream closing.
779+ tokio:: select! {
780+ _ = grpc_tx. closed( ) => {
781+ info!(
782+ "get_desired_executor_states: grpc stream closed"
783+ ) ;
784+ break ;
785+ }
786+ result = executor_state_rx. changed( ) => {
787+ if let Err ( err) = result {
788+ info!(
789+ ?err,
790+ "state machine watcher closing"
791+ ) ;
792+ break ;
793+ }
794+ }
795+ }
796+
797+ // Get the latest state
798+ let desired_state = executor_manager. get_executor_state ( & executor_id) . await ;
799+
800+ // Log the state delta
801+ log_desired_executor_state_delta ( & last_sent_state, & desired_state) ;
802+
803+ // Send the state to the executor
804+ if let Err ( err) = grpc_tx. send ( Ok ( desired_state. clone ( ) ) ) {
805+ info ! ( ?err, "grpc stream closing" ) ;
806+ break ;
807+ }
808+
809+ // Store the sent state for next comparison
810+ last_sent_state = desired_state;
811+ }
812+ }
813+
714814#[ tonic:: async_trait]
715815impl ExecutorApi for ExecutorAPIService {
716816 #[ allow( non_camel_case_types) ] // The autogenerated code in the trait uses snake_case types in some cases
@@ -810,7 +910,7 @@ impl ExecutorApi for ExecutorAPIService {
810910 "Got get_desired_executor_states request" ,
811911 ) ;
812912
813- let mut executor_state_rx = match self . executor_manager . subscribe ( & executor_id) . await {
913+ let executor_state_rx = match self . executor_manager . subscribe ( & executor_id) . await {
814914 Some ( state_rx) => state_rx,
815915 None => {
816916 let msg = "executor not found, or not yet registered" ;
@@ -827,65 +927,12 @@ impl ExecutorApi for ExecutorAPIService {
827927 task_allocations : vec ! [ ] ,
828928 clock : Some ( 0 ) ,
829929 } ) ) ;
830- let executor_manager = self . executor_manager . clone ( ) ;
831- tokio:: spawn ( async move {
832- // 1. Mark the state as changed to trigger the first change notification to the
833- // executor. This is important because between the report_executor_state and
834- // the get_desired_executor_states requests, the executor state may received
835- // a new desired state.
836- executor_state_rx. mark_changed ( ) ;
837- loop {
838- // 2. Wait for a state change for this executor or grpc stream closing.
839- tokio:: select! {
840- _ = grpc_tx. closed( ) => {
841- info!(
842- executor_id = executor_id. get( ) ,
843- "get_desired_executor_states: grpc stream closed"
844- ) ;
845- break ;
846- }
847- result = executor_state_rx. changed( ) => {
848- if let Err ( err) = result {
849- info!(
850- executor_id = executor_id. get( ) ,
851- "get_desired_executor_states: state machine watcher closing: {}" , err
852- ) ;
853- break ;
854- }
855- }
856- }
857-
858- // 3. Get the latest state
859- let desired_state: DesiredExecutorState = match executor_manager
860- . get_executor_state ( & executor_id)
861- . await
862- . try_into ( )
863- {
864- Ok ( desired_state) => desired_state,
865- Err ( err) => {
866- info ! (
867- executor_id = executor_id. get( ) ,
868- "get_desired_executor_states: failed to convert desired state, ignoring: {:?}" , err
869- ) ;
870- continue ;
871- }
872- } ;
873-
874- debug ! (
875- executor_id = executor_id. get( ) ,
876- "get_desired_executor_states: got desired state: {:#?}" , desired_state
877- ) ;
878-
879- // 4. Send the state to the executor
880- if let Err ( err) = grpc_tx. send ( Ok ( desired_state) ) {
881- info ! (
882- executor_id = executor_id. get( ) ,
883- "get_desired_executor_states: grpc stream closing: {}" , err
884- ) ;
885- break ;
886- }
887- }
888- } ) ;
930+ tokio:: spawn ( executor_update_loop (
931+ executor_id,
932+ self . executor_manager . clone ( ) ,
933+ executor_state_rx,
934+ grpc_tx,
935+ ) ) ;
889936
890937 let grpc_stream = WatchStream :: from_changes ( grpc_rx) ;
891938 Ok ( Response :: new (
0 commit comments