@@ -35,7 +35,7 @@ use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
3535use crate :: storage:: { ObjectStoreFormat , PARSEABLE_ROOT_DIRECTORY } ;
3636use crate :: HTTP_CLIENT ;
3737use actix_web:: http:: header:: { self , HeaderMap } ;
38- use actix_web:: web:: Path ;
38+ use actix_web:: web:: { Json , Path } ;
3939use actix_web:: Responder ;
4040use bytes:: Bytes ;
4141use chrono:: Utc ;
@@ -729,6 +729,9 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
729729 let mut dresses = vec ! [ ] ;
730730
731731 for ingestor in ingestor_metadata {
732+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
733+ continue ;
734+ }
732735 let uri = Url :: parse ( & format ! (
733736 "{}{}/metrics" ,
734737 & ingestor. domain_name,
@@ -749,11 +752,10 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
749752 let text = res. text ( ) . await . map_err ( PostError :: NetworkError ) ?;
750753 let lines: Vec < Result < String , std:: io:: Error > > =
751754 text. lines ( ) . map ( |line| Ok ( line. to_owned ( ) ) ) . collect_vec ( ) ;
752-
753755 let sample = prometheus_parse:: Scrape :: parse ( lines. into_iter ( ) )
754756 . map_err ( |err| PostError :: CustomError ( err. to_string ( ) ) ) ?
755757 . samples ;
756- let ingestor_metrics = Metrics :: from_prometheus_samples ( sample, & ingestor)
758+ let ingestor_metrics = Metrics :: ingestor_prometheus_samples ( sample, & ingestor)
757759 . await
758760 . map_err ( |err| {
759761 error ! ( "Fatal: failed to get ingestor metrics: {:?}" , err) ;
@@ -767,10 +769,11 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
767769 ) ;
768770 }
769771 }
772+ dresses. push ( Metrics :: querier_prometheus_metrics ( ) . await ) ;
770773 Ok ( dresses)
771774}
772775
773- pub fn init_cluster_metrics_schedular ( ) -> Result < ( ) , PostError > {
776+ pub async fn init_cluster_metrics_scheduler ( ) -> Result < ( ) , PostError > {
774777 info ! ( "Setting up schedular for cluster metrics ingestion" ) ;
775778 let mut scheduler = AsyncScheduler :: new ( ) ;
776779 scheduler
@@ -779,25 +782,12 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
779782 let result: Result < ( ) , PostError > = async {
780783 let cluster_metrics = fetch_cluster_metrics ( ) . await ;
781784 if let Ok ( metrics) = cluster_metrics {
782- if !metrics. is_empty ( ) {
783- info ! ( "Cluster metrics fetched successfully from all ingestors" ) ;
784- if let Ok ( metrics_bytes) = serde_json:: to_vec ( & metrics) {
785- if matches ! (
786- ingest_internal_stream(
787- INTERNAL_STREAM_NAME . to_string( ) ,
788- bytes:: Bytes :: from( metrics_bytes) ,
789- )
790- . await ,
791- Ok ( ( ) )
792- ) {
793- info ! ( "Cluster metrics successfully ingested into internal stream" ) ;
794- } else {
795- error ! ( "Failed to ingest cluster metrics into internal stream" ) ;
796- }
797- } else {
798- error ! ( "Failed to serialize cluster metrics" ) ;
799- }
800- }
785+ let json_value = serde_json:: to_value ( metrics)
786+ . map_err ( |e| anyhow:: anyhow!( "Failed to serialize metrics: {}" , e) ) ?;
787+
788+ ingest_internal_stream ( INTERNAL_STREAM_NAME . to_string ( ) , Json ( json_value) )
789+ . await
790+ . map_err ( |e| anyhow:: anyhow!( "Failed to ingest metrics: {}" , e) ) ?;
801791 }
802792 Ok ( ( ) )
803793 }
0 commit comments