@@ -25,16 +25,13 @@ use rayon::prelude::*;
2525use relative_path:: RelativePathBuf ;
2626use snapshot:: ManifestItem ;
2727use std:: io:: Error as IOError ;
28- use tracing:: { error, info } ;
28+ use tracing:: error;
2929
3030use crate :: {
3131 event:: DEFAULT_TIMESTAMP_KEY ,
3232 handlers:: {
3333 self ,
34- http:: {
35- base_path_without_preceding_slash,
36- modal:: { NodeMetadata , NodeType } ,
37- } ,
34+ http:: { base_path_without_preceding_slash, cluster:: for_each_live_ingestor} ,
3835 } ,
3936 metrics:: { EVENTS_INGESTED_DATE , EVENTS_INGESTED_SIZE_DATE , EVENTS_STORAGE_SIZE_DATE } ,
4037 option:: Mode ,
@@ -458,7 +455,7 @@ pub async fn remove_manifest_from_snapshot(
458455 storage : Arc < dyn ObjectStorage > ,
459456 stream_name : & str ,
460457 dates : Vec < String > ,
461- ) -> Result < Option < String > , ObjectStorageError > {
458+ ) -> Result < ( ) , ObjectStorageError > {
462459 if !dates. is_empty ( ) {
463460 // get current snapshot
464461 let mut meta = storage. get_object_store_format ( stream_name) . await ?;
@@ -472,114 +469,29 @@ pub async fn remove_manifest_from_snapshot(
472469 storage. put_snapshot ( stream_name, meta. snapshot ) . await ?;
473470 }
474471
475- // retention is initiated from the querier
476- // request is forwarded to all ingestors to clean up their manifests
477- // no action required for the Index or Prism nodes
478- match PARSEABLE . options . mode {
479- Mode :: All | Mode :: Ingest => {
480- Ok ( get_first_event ( storage. clone ( ) , stream_name, Vec :: new ( ) ) . await ?)
481- }
482- Mode :: Query => Ok ( get_first_event ( storage, stream_name, dates) . await ?) ,
483- Mode :: Index | Mode :: Prism => Err ( ObjectStorageError :: UnhandledError ( Box :: new (
484- std:: io:: Error :: new (
485- std:: io:: ErrorKind :: Unsupported ,
486- "Can't remove manifest from within Index or Prism server" ,
487- ) ,
488- ) ) ) ,
489- }
490- }
472+ if !dates. is_empty ( ) && matches ! ( PARSEABLE . options. mode, Mode :: Query | Mode :: Prism ) {
473+ let stream_name_clone = stream_name. to_string ( ) ;
474+ let dates_clone = dates. clone ( ) ;
491475
492- pub async fn get_first_event (
493- storage : Arc < dyn ObjectStorage > ,
494- stream_name : & str ,
495- dates : Vec < String > ,
496- ) -> Result < Option < String > , ObjectStorageError > {
497- let mut first_event_at: String = String :: default ( ) ;
498- match PARSEABLE . options . mode {
499- Mode :: All | Mode :: Ingest => {
500- // get current snapshot
501- let stream_first_event = PARSEABLE . get_stream ( stream_name) ?. get_first_event ( ) ;
502- if let Some ( first_event) = stream_first_event {
503- first_event_at = first_event;
504- } else {
505- let mut meta = storage. get_object_store_format ( stream_name) . await ?;
506- let meta_clone = meta. clone ( ) ;
507- let manifests = meta_clone. snapshot . manifest_list ;
508- let time_partition = meta_clone. time_partition ;
509- if manifests. is_empty ( ) {
510- info ! ( "No manifest found for stream {stream_name}" ) ;
511- return Err ( ObjectStorageError :: Custom ( "No manifest found" . to_string ( ) ) ) ;
512- }
513- let manifest = & manifests[ 0 ] ;
514- let path = partition_path (
515- stream_name,
516- manifest. time_lower_bound ,
517- manifest. time_upper_bound ,
518- ) ;
519- let Some ( manifest) = storage. get_manifest ( & path) . await ? else {
520- return Err ( ObjectStorageError :: UnhandledError (
521- "Manifest found in snapshot but not in object-storage"
522- . to_string ( )
523- . into ( ) ,
524- ) ) ;
525- } ;
526- if let Some ( first_event) = manifest. files . first ( ) {
527- let lower_bound = match time_partition {
528- Some ( time_partition) => {
529- let ( lower_bound, _) = get_file_bounds ( first_event, time_partition) ;
530- lower_bound
531- }
532- None => {
533- let ( lower_bound, _) =
534- get_file_bounds ( first_event, DEFAULT_TIMESTAMP_KEY . to_string ( ) ) ;
535- lower_bound
536- }
537- } ;
538- first_event_at = lower_bound. with_timezone ( & Local ) . to_rfc3339 ( ) ;
539- meta. first_event_at = Some ( first_event_at. clone ( ) ) ;
540- storage. put_stream_manifest ( stream_name, & meta) . await ?;
541- PARSEABLE
542- . get_stream ( stream_name) ?
543- . set_first_event_at ( & first_event_at) ;
544- }
545- }
546- }
547- Mode :: Query => {
548- let ingestor_metadata: Vec < NodeMetadata > =
549- handlers:: http:: cluster:: get_node_info ( NodeType :: Ingestor )
550- . await
551- . map_err ( |err| {
552- error ! ( "Fatal: failed to get ingestor info: {:?}" , err) ;
553- ObjectStorageError :: from ( err)
554- } ) ?;
555- let mut ingestors_first_event_at: Vec < String > = Vec :: new ( ) ;
556- for ingestor in ingestor_metadata {
476+ for_each_live_ingestor ( move |ingestor| {
477+ let stream_name = stream_name_clone. clone ( ) ;
478+ let dates = dates_clone. clone ( ) ;
479+ async move {
557480 let url = format ! (
558481 "{}{}/logstream/{}/retention/cleanup" ,
559482 ingestor. domain_name,
560483 base_path_without_preceding_slash( ) ,
561484 stream_name
562485 ) ;
563- let ingestor_first_event_at =
564- handlers:: http:: cluster:: send_retention_cleanup_request (
565- & url,
566- ingestor. clone ( ) ,
567- & dates,
568- )
486+ handlers:: http:: cluster:: send_retention_cleanup_request ( & url, ingestor, & dates)
569487 . await ?;
570- if !ingestor_first_event_at. is_empty ( ) {
571- ingestors_first_event_at. push ( ingestor_first_event_at) ;
572- }
573- }
574- if ingestors_first_event_at. is_empty ( ) {
575- return Ok ( None ) ;
488+ Ok :: < ( ) , ObjectStorageError > ( ( ) )
576489 }
577- first_event_at = ingestors_first_event_at. iter ( ) . min ( ) . unwrap ( ) . to_string ( ) ;
578- }
579- _ => { }
490+ } )
491+ . await ?;
580492 }
581493
582- Ok ( Some ( first_event_at ) )
494+ Ok ( ( ) )
583495}
584496
585497/// Partition the path to which this manifest belongs.
0 commit comments