1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: collections:: BTreeMap ;
1516use std:: collections:: VecDeque ;
1617use std:: io:: BufRead ;
1718use std:: io:: Cursor ;
1819use std:: ops:: Not ;
20+ use std:: str:: FromStr ;
1921use std:: sync:: Arc ;
2022use std:: time:: Instant ;
2123
@@ -28,6 +30,7 @@ use common_base::runtime::GlobalIORuntime;
2830use common_catalog:: plan:: StageFileStatus ;
2931use common_catalog:: plan:: StageTableInfo ;
3032use common_catalog:: table:: AppendMode ;
33+ use common_catalog:: table_context:: StageAttachment ;
3134use common_datablocks:: DataBlock ;
3235use common_datavalues:: prelude:: * ;
3336use common_exception:: ErrorCode ;
@@ -36,6 +39,9 @@ use common_formats::parse_timezone;
3639use common_formats:: FastFieldDecoderValues ;
3740use common_io:: cursor_ext:: ReadBytesExt ;
3841use common_io:: cursor_ext:: ReadCheckPointExt ;
42+ use common_meta_types:: OnErrorMode ;
43+ use common_meta_types:: StageFileCompression ;
44+ use common_meta_types:: StageFileFormatType ;
3945use common_meta_types:: UserStageInfo ;
4046use common_pipeline_core:: Pipeline ;
4147use common_pipeline_sources:: processors:: sources:: AsyncSource ;
@@ -112,11 +118,74 @@ impl InsertInterpreterV2 {
112118 Ok ( cast_needed)
113119 }
114120
115- // TODO:(everpcpc)
121+ fn apply_stage_options (
122+ & self ,
123+ stage : & mut UserStageInfo ,
124+ params : & BTreeMap < String , String > ,
125+ ) -> Result < ( ) > {
126+ for ( k, v) in params. iter ( ) {
127+ match k. as_str ( ) {
128+ // file format options
129+ "format" => {
130+ let format = StageFileFormatType :: from_str ( v) ?;
131+ stage. file_format_options . format = format;
132+ }
133+ "skip_header" => {
134+ let skip_header = u64:: from_str ( v) ?;
135+ stage. file_format_options . skip_header = skip_header;
136+ }
137+ "field_delimiter" => stage. file_format_options . field_delimiter = v. clone ( ) ,
138+ "record_delimiter" => stage. file_format_options . record_delimiter = v. clone ( ) ,
139+ "nan_display" => stage. file_format_options . nan_display = v. clone ( ) ,
140+ "escape" => stage. file_format_options . escape = v. clone ( ) ,
141+ "compression" => {
142+ let compression = StageFileCompression :: from_str ( v) ?;
143+ stage. file_format_options . compression = compression;
144+ }
145+ "row_tag" => stage. file_format_options . row_tag = v. clone ( ) ,
146+ "quote" => stage. file_format_options . quote = v. clone ( ) ,
147+
148+ // copy options
149+ "on_error" => {
150+ let on_error = OnErrorMode :: from_str ( v) ?;
151+ stage. copy_options . on_error = on_error;
152+ }
153+ "size_limit" => {
154+ let size_limit = usize:: from_str ( v) ?;
155+ stage. copy_options . size_limit = size_limit;
156+ }
157+ "split_size" => {
158+ let split_size = usize:: from_str ( v) ?;
159+ stage. copy_options . split_size = split_size;
160+ }
161+ "purge" => {
162+ let purge = bool:: from_str ( v) . map_err ( |_| {
163+ ErrorCode :: StrParseError ( format ! ( "Cannot parse purge: {} as bool" , v) )
164+ } ) ?;
165+ stage. copy_options . purge = purge;
166+ }
167+ "single" => {
168+ let single = bool:: from_str ( v) . map_err ( |_| {
169+ ErrorCode :: StrParseError ( format ! ( "Cannot parse single: {} as bool" , v) )
170+ } ) ?;
171+ stage. copy_options . single = single;
172+ }
173+ "max_file_size" => {
174+ let max_file_size = usize:: from_str ( v) ?;
175+ stage. copy_options . max_file_size = max_file_size;
176+ }
177+
178+ _ => { }
179+ }
180+ }
181+
182+ Ok ( ( ) )
183+ }
184+
116185 async fn build_insert_from_stage_pipeline (
117186 & self ,
118187 table : Arc < dyn Table > ,
119- stage_location : & str ,
188+ attachment : Arc < StageAttachment > ,
120189 pipeline : & mut Pipeline ,
121190 ) -> Result < ( ) > {
122191 let start = Instant :: now ( ) ;
@@ -127,7 +196,8 @@ impl InsertInterpreterV2 {
127196 let catalog_name = self . plan . catalog . clone ( ) ;
128197 let overwrite = self . plan . overwrite ;
129198
130- let ( stage_info, path) = parse_stage_location ( & self . ctx , stage_location) . await ?;
199+ let ( mut stage_info, path) = parse_stage_location ( & self . ctx , & attachment. location ) . await ?;
200+ self . apply_stage_options ( & mut stage_info, & attachment. params ) ?;
131201
132202 let mut stage_table_info = StageTableInfo {
133203 schema : source_schema. clone ( ) ,
@@ -140,7 +210,7 @@ impl InsertInterpreterV2 {
140210
141211 let all_source_file_infos = StageTable :: list_files ( & table_ctx, & stage_table_info) . await ?;
142212
143- // TODO: color_copied_files
213+ // TODO:(everpcpc) color_copied_files
144214
145215 let mut need_copied_file_infos = vec ! [ ] ;
146216 for file in & all_source_file_infos {
@@ -149,9 +219,8 @@ impl InsertInterpreterV2 {
149219 }
150220 }
151221
152- // DEBUG:
153- tracing:: warn!(
154- "insert: read all sideload files finished, all:{}, need copy:{}, elapsed:{}" ,
222+ tracing:: info!(
223+ "insert: read all stage attachment files finished, all:{}, need copy:{}, elapsed:{}" ,
155224 all_source_file_infos. len( ) ,
156225 need_copied_file_infos. len( ) ,
157226 start. elapsed( ) . as_secs( )
@@ -199,19 +268,20 @@ impl InsertInterpreterV2 {
199268 None => {
200269 let append_entries = ctx. consume_precommit_blocks ( ) ;
201270 // We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
202- return GlobalIORuntime :: instance ( ) . block_on ( async move {
203- // DEBUG:
204- tracing:: warn!(
271+ GlobalIORuntime :: instance ( ) . block_on ( async move {
272+ tracing:: info!(
205273 "insert: try to commit append entries:{}, elapsed:{}" ,
206274 append_entries. len( ) ,
207275 start. elapsed( ) . as_secs( )
208276 ) ;
209277 table
210278 . commit_insertion ( ctx, append_entries, overwrite)
211279 . await ?;
280+
281+ // TODO:(everpcpc) purge copied files
282+
212283 Ok ( ( ) )
213- // TODO: purge copied files
214- } ) ;
284+ } )
215285 }
216286 }
217287 } ) ;
@@ -273,25 +343,15 @@ impl Interpreter for InsertInterpreterV2 {
273343 . format
274344 . exec_stream ( input_context. clone ( ) , & mut build_res. main_pipeline ) ?;
275345 }
276- InsertInputSource :: Sideload ( opts) => {
346+ InsertInputSource :: Stage ( opts) => {
277347 // DEBUG:
278- tracing:: warn!( "==> sideload insert: {:?}" , opts) ;
279-
280- match & opts. stage {
281- None => {
282- return Err ( ErrorCode :: BadDataValueType (
283- "No stage location provided" . to_string ( ) ,
284- ) ) ;
285- }
286- Some ( stage_location) => {
287- self . build_insert_from_stage_pipeline (
288- table. clone ( ) ,
289- stage_location,
290- & mut build_res. main_pipeline ,
291- )
292- . await ?;
293- }
294- }
348+ tracing:: warn!( "==> insert from stage: {:?}" , opts) ;
349+ self . build_insert_from_stage_pipeline (
350+ table. clone ( ) ,
351+ opts. clone ( ) ,
352+ & mut build_res. main_pipeline ,
353+ )
354+ . await ?;
295355 return Ok ( build_res) ;
296356 }
297357 InsertInputSource :: SelectPlan ( plan) => {
0 commit comments