@@ -36,7 +36,7 @@ use http::StatusCode;
3636use itertools:: Itertools ;
3737use serde:: { Deserialize , Serialize } ;
3838use serde_json:: { json, Value } ;
39- use std:: collections:: { HashMap , HashSet } ;
39+ use std:: collections:: HashMap ;
4040use std:: pin:: Pin ;
4141use std:: sync:: Arc ;
4242use std:: time:: Instant ;
@@ -81,22 +81,22 @@ pub async fn get_records_and_fields(
8181 query_request : & Query ,
8282 req : & HttpRequest ,
8383) -> Result < ( Option < Vec < RecordBatch > > , Option < Vec < String > > ) , QueryError > {
84- let tables = resolve_stream_names ( & query_request. query ) ?;
8584 let session_state = QUERY_SESSION . state ( ) ;
86-
8785 let time_range =
8886 TimeRange :: parse_human_time ( & query_request. start_time , & query_request. end_time ) ?;
87+ let tables = resolve_stream_names ( & query_request. query ) ?;
88+ //check or load streams in memory
89+ create_streams_for_distributed ( tables. clone ( ) ) . await ?;
8990
90- let query: LogicalQuery =
91- into_query ( query_request, & session_state, time_range, & tables) . await ?;
91+ let query: LogicalQuery = into_query ( query_request, & session_state, time_range) . await ?;
9292 let creds = extract_session_key_from_req ( req) ?;
9393 let permissions = Users . get_permissions ( & creds) ;
9494
9595 let table_name = tables
9696 . first ( )
9797 . ok_or_else ( || QueryError :: MalformedQuery ( "No table name found in query" ) ) ?;
9898 user_auth_for_datasets ( & permissions, & tables) . await ?;
99- update_schema_when_distributed ( & tables ) . await ? ;
99+
100100 let ( records, fields) = execute ( query, table_name, false ) . await ?;
101101
102102 let records = match records {
@@ -114,9 +114,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
114114 let time_range =
115115 TimeRange :: parse_human_time ( & query_request. start_time , & query_request. end_time ) ?;
116116 let tables = resolve_stream_names ( & query_request. query ) ?;
117- update_schema_when_distributed ( & tables) . await ?;
118- let query: LogicalQuery =
119- into_query ( & query_request, & session_state, time_range, & tables) . await ?;
117+ //check or load streams in memory
118+ create_streams_for_distributed ( tables. clone ( ) ) . await ?;
119+
120+ let query: LogicalQuery = into_query ( & query_request, & session_state, time_range) . await ?;
120121 let creds = extract_session_key_from_req ( & req) ?;
121122 let permissions = Users . get_permissions ( & creds) ;
122123
@@ -406,35 +407,12 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
406407/// Create the given streams in memory when running in Query or Prism mode (no-op in other modes)
407408/// the stream names to load are supplied by the caller
408409/// each stream and its schema is created from storage, one spawned task per stream
409- pub async fn create_streams_for_querier ( ) -> Result < ( ) , QueryError > {
410- let store = PARSEABLE . storage . get_object_store ( ) ;
411- let querier_streams = PARSEABLE . streams . list ( ) ;
412-
413- let querier_streams_set: HashSet < _ > = querier_streams. into_iter ( ) . collect ( ) ;
414- // fetch querier streams which have field list blank
415- // now missing streams should be list of streams which are in storage but not in querier
416- // and also have no fields in the schema
417- // this is to ensure that we do not create streams for querier which already exist in querier
418-
419- let missing_streams: Vec < _ > = store
420- . list_streams ( )
421- . await ?
422- . into_iter ( )
423- . filter ( |stream_name| {
424- !querier_streams_set. contains ( stream_name)
425- || PARSEABLE
426- . get_stream ( stream_name)
427- . map ( |s| s. get_schema ( ) . fields ( ) . is_empty ( ) )
428- . unwrap_or ( false )
429- } )
430- . collect ( ) ;
431-
432- if missing_streams. is_empty ( ) {
410+ pub async fn create_streams_for_distributed ( streams : Vec < String > ) -> Result < ( ) , QueryError > {
411+ if PARSEABLE . options . mode != Mode :: Query && PARSEABLE . options . mode != Mode :: Prism {
433412 return Ok ( ( ) ) ;
434413 }
435-
436414 let mut join_set = JoinSet :: new ( ) ;
437- for stream_name in missing_streams {
415+ for stream_name in streams {
438416 join_set. spawn ( async move {
439417 let result = PARSEABLE
440418 . create_stream_and_schema_from_storage ( & stream_name)
@@ -492,7 +470,6 @@ pub async fn into_query(
492470 query : & Query ,
493471 session_state : & SessionState ,
494472 time_range : TimeRange ,
495- tables : & Vec < String > ,
496473) -> Result < LogicalQuery , QueryError > {
497474 if query. query . is_empty ( ) {
498475 return Err ( QueryError :: EmptyQuery ) ;
@@ -505,33 +482,7 @@ pub async fn into_query(
505482 if query. end_time . is_empty ( ) {
506483 return Err ( QueryError :: EmptyEndTime ) ;
507484 }
508- let raw_logical_plan = match session_state. create_logical_plan ( & query. query ) . await {
509- Ok ( plan) => plan,
510- Err ( _) => {
511- let mut join_set = JoinSet :: new ( ) ;
512- for stream_name in tables {
513- let stream_name = stream_name. clone ( ) ;
514- join_set. spawn ( async move {
515- let result = PARSEABLE
516- . create_stream_and_schema_from_storage ( & stream_name)
517- . await ;
518-
519- if let Err ( e) = & result {
520- warn ! ( "Failed to create stream '{}': {}" , stream_name, e) ;
521- }
522-
523- ( stream_name, result)
524- } ) ;
525- }
526-
527- while let Some ( result) = join_set. join_next ( ) . await {
528- if let Err ( join_error) = result {
529- warn ! ( "Task join error: {}" , join_error) ;
530- }
531- }
532- session_state. create_logical_plan ( & query. query ) . await ?
533- }
534- } ;
485+ let raw_logical_plan = session_state. create_logical_plan ( & query. query ) . await ?;
535486
536487 Ok ( crate :: query:: Query {
537488 raw_logical_plan,
0 commit comments