@@ -20,8 +20,6 @@ use arrow_array::RecordBatch;
2020use arrow_flight:: flight_service_server:: FlightServiceServer ;
2121use arrow_flight:: PollInfo ;
2222use arrow_schema:: ArrowError ;
23-
24- use datafusion:: common:: tree_node:: TreeNode ;
2523use serde_json:: json;
2624use std:: net:: SocketAddr ;
2725use std:: time:: Instant ;
@@ -35,11 +33,11 @@ use tonic_web::GrpcWebLayer;
3533
3634use crate :: handlers:: http:: cluster:: get_node_info;
3735use crate :: handlers:: http:: modal:: { NodeMetadata , NodeType } ;
38- use crate :: handlers:: http:: query:: { into_query, update_schema_when_distributed } ;
36+ use crate :: handlers:: http:: query:: into_query;
3937use crate :: handlers:: livetail:: cross_origin_config;
4038use crate :: metrics:: QUERY_EXECUTE_TIME ;
4139use crate :: parseable:: PARSEABLE ;
42- use crate :: query:: { execute, TableScanVisitor , QUERY_SESSION } ;
40+ use crate :: query:: { execute, resolve_stream_names , QUERY_SESSION } ;
4341use crate :: utils:: arrow:: flight:: {
4442 append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
4543 send_to_ingester,
@@ -131,40 +129,26 @@ impl FlightService for AirServiceImpl {
131129
132130 let ticket =
133131 get_query_from_ticket ( & req) . map_err ( |e| Status :: invalid_argument ( e. to_string ( ) ) ) ?;
134-
132+ let streams = resolve_stream_names ( & ticket. query ) . map_err ( |e| {
133+ error ! ( "Failed to extract table names from SQL: {}" , e) ;
134+ Status :: invalid_argument ( "Invalid SQL query syntax" )
135+ } ) ?;
135136 info ! ( "query requested to airplane: {:?}" , ticket) ;
136137
137138 // get the query session_state
138139 let session_state = QUERY_SESSION . state ( ) ;
139140
140- // get the logical plan and extract the table name
141- let raw_logical_plan = session_state
142- . create_logical_plan ( & ticket. query )
143- . await
144- . map_err ( |err| {
145- error ! ( "Datafusion Error: Failed to create logical plan: {}" , err) ;
146- Status :: internal ( "Failed to create logical plan" )
147- } ) ?;
148-
149141 let time_range = TimeRange :: parse_human_time ( & ticket. start_time , & ticket. end_time )
150142 . map_err ( |e| Status :: internal ( e. to_string ( ) ) ) ?;
151143 // create a visitor to extract the table name
152- let mut visitor = TableScanVisitor :: default ( ) ;
153- let _ = raw_logical_plan. visit ( & mut visitor) ;
154-
155- let streams = visitor. into_inner ( ) ;
156144
157145 let stream_name = streams
158146 . first ( )
159147 . ok_or_else ( || Status :: aborted ( "Malformed SQL Provided, Table Name Not Found" ) ) ?
160148 . to_owned ( ) ;
161149
162- update_schema_when_distributed ( & streams)
163- . await
164- . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
165-
166150 // map payload to query
167- let query = into_query ( & ticket, & session_state, time_range)
151+ let query = into_query ( & ticket, & session_state, time_range, & streams )
168152 . await
169153 . map_err ( |_| Status :: internal ( "Failed to parse query" ) ) ?;
170154
@@ -214,9 +198,11 @@ impl FlightService for AirServiceImpl {
214198
215199 let permissions = Users . get_permissions ( & key) ;
216200
217- user_auth_for_datasets ( & permissions, & streams) . map_err ( |_| {
218- Status :: permission_denied ( "User Does not have permission to access this" )
219- } ) ?;
201+ user_auth_for_datasets ( & permissions, & streams)
202+ . await
203+ . map_err ( |_| {
204+ Status :: permission_denied ( "User Does not have permission to access this" )
205+ } ) ?;
220206 let time = Instant :: now ( ) ;
221207
222208 let ( records, _) = execute ( query, & stream_name, false )
0 commit comments