diff --git a/Cargo.lock b/Cargo.lock
index 3032811869e40..df393499585ee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2810,14 +2810,14 @@ dependencies = [
 
 [[package]]
 name = "csv"
-version = "1.3.1"
+version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
+checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938"
 dependencies = [
  "csv-core",
  "itoa",
  "ryu",
- "serde",
+ "serde_core",
 ]
 
 [[package]]
@@ -4605,6 +4605,7 @@ dependencies = [
  "async-backtrace",
  "async-trait",
  "bstr",
+ "csv",
  "csv-core",
  "databend-common-ast",
  "databend-common-base",
@@ -4627,6 +4628,7 @@ dependencies = [
  "databend-storages-common-stage",
  "databend-storages-common-table-meta",
  "enum-as-inner",
+ "fastrace",
  "jsonb",
  "lexical-core",
  "log",
@@ -4638,6 +4640,7 @@ dependencies = [
  "parquet",
  "serde",
  "serde_json",
+ "tokio",
  "typetag",
 ]
 
@@ -13849,10 +13852,11 @@ dependencies = [
 
 [[package]]
 name = "serde"
-version = "1.0.219"
+version = "1.0.228"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
+checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
 dependencies = [
+ "serde_core",
  "serde_derive",
 ]
 
@@ -13893,11 +13897,20 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_core"
+version = "1.0.228"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
+dependencies = [
+ "serde_derive",
+]
+
 [[package]]
 name = "serde_derive"
-version = "1.0.219"
+version = "1.0.228"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
+checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/Cargo.toml b/Cargo.toml
index 73df776befd9f..671e2aa64e1a4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -285,7 +285,8 @@ cookie = "0.18.1"
 crc32fast = "1.3.2"
 cron = "0.12.0"
 crossbeam-channel = "0.5.6"
-csv-core = "0.1.11"
+csv = "1.4"
+csv-core = "0.1.12"
 ctor = "0.2"
 ctrlc = { version = "3.2.3", features = ["termination"] }
 dashmap = { version = "6.1.0", features = ["serde"] }
diff --git a/src/query/catalog/src/plan/partition.rs b/src/query/catalog/src/plan/partition.rs
index dbecc0980d831..b3137ee90a2e7 100644
--- a/src/query/catalog/src/plan/partition.rs
+++ b/src/query/catalog/src/plan/partition.rs
@@ -87,7 +87,6 @@ impl PartialEq for Box<dyn PartInfo> {
     }
 }
 
-#[allow(dead_code)]
 pub type PartInfoPtr = Arc<Box<dyn PartInfo>>;
 
 /// For cache affinity, we consider some strategies when reshuffle partitions.
diff --git a/src/query/service/src/interpreters/interpreter_copy_into_table.rs b/src/query/service/src/interpreters/interpreter_copy_into_table.rs
index 73aa268eb77dd..4a8c350997f63 100644
--- a/src/query/service/src/interpreters/interpreter_copy_into_table.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_into_table.rs
@@ -122,7 +122,11 @@ impl CopyIntoTableInterpreter {
                 Some(result_columns),
             )
         } else {
-            let stage_table = StageTable::try_create(plan.stage_table_info.clone())?;
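+            // When `enable_split_file_loading` is on, the stage table may plan several
+            // row-aligned byte-range partitions per CSV file instead of one partition per file.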
+            let stage_table = if self.ctx.get_settings().get_enable_split_file_loading()? {
+                StageTable::try_create_with_split_file(plan.stage_table_info.clone())?
+            } else {
+                StageTable::try_create(plan.stage_table_info.clone())?
+            };
 
             let data_source_plan = stage_table
                 .read_plan(self.ctx.clone(), None, None, false, false)
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 4947e071f0242..ee9565c79e79c 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -306,6 +306,7 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=u64::MAX)),
             }),
+            // outdated
            ("enable_new_copy_for_text_formats", DefaultSettingValue {
                 value: UserSettingValue::UInt64(1),
                 desc: "Use new implementation for loading CSV files.",
@@ -313,6 +314,13 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
+            ("enable_split_file_loading", DefaultSettingValue {
+                value: UserSettingValue::UInt64(0),
+                desc: "Split large files into multiple partitions when loading CSV files.",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(0..=1)),
+            }),
            ("purge_duplicated_files_in_copy", DefaultSettingValue {
                 value: UserSettingValue::UInt64(0),
                 desc: "Purge duplicated files detected during execution of copy into table.",
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 67b4733edf283..cc24347f52d54 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -293,8 +293,8 @@ impl Settings {
         self.try_get_u64("input_read_buffer_size")
     }
 
-    pub fn get_enable_new_copy_for_text_formats(&self) -> Result<u64> {
-        self.try_get_u64("enable_new_copy_for_text_formats")
+    pub fn get_enable_split_file_loading(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_split_file_loading")? != 0)
     }
 
     pub fn get_enable_purge_duplicated_files_in_copy(&self) -> Result<bool> {
diff --git a/src/query/storages/common/stage/src/read/single_file_partition.rs b/src/query/storages/common/stage/src/read/single_file_partition.rs
index 4d71086d94ee5..1bc2827511597 100644
--- a/src/query/storages/common/stage/src/read/single_file_partition.rs
+++ b/src/query/storages/common/stage/src/read/single_file_partition.rs
@@ -22,7 +22,7 @@ use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 
-#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq)]
 pub struct SingleFilePartition {
     pub path: String,
     pub size: usize,
diff --git a/src/query/storages/stage/Cargo.toml b/src/query/storages/stage/Cargo.toml
index 9a0852c1501f8..9249e5ae5d63a 100644
--- a/src/query/storages/stage/Cargo.toml
+++ b/src/query/storages/stage/Cargo.toml
@@ -34,6 +34,7 @@ databend-common-storages-parquet = { workspace = true }
 databend-storages-common-stage = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 enum-as-inner = { workspace = true }
+fastrace = { workspace = true }
 jsonb = { workspace = true }
 lexical-core = { workspace = true }
 log = { workspace = true }
@@ -45,8 +46,12 @@ parking_lot = { workspace = true }
 parquet = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
+tokio = { workspace = true }
 typetag = { workspace = true }
 
+[dev-dependencies]
+csv = { workspace = true }
+
 [lints]
 workspace = true
 
diff --git a/src/query/storages/stage/src/lib.rs b/src/query/storages/stage/src/lib.rs
index 07304f5e39268..a2cf3e1cccb85 100644
--- a/src/query/storages/stage/src/lib.rs
+++ b/src/query/storages/stage/src/lib.rs
@@ -12,12 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![allow(internal_features)]
-#![feature(core_intrinsics)]
+#![feature(likely_unlikely)]
 #![feature(impl_trait_in_assoc_type)]
 #![feature(box_patterns)]
 #![feature(iter_intersperse)]
-#![allow(clippy::uninlined_format_args)]
+#![feature(iter_map_windows)]
 
 mod append;
 mod compression;
diff --git a/src/query/storages/stage/src/read/row_based/batch.rs b/src/query/storages/stage/src/read/row_based/batch.rs
index 67fda2b069b9d..5e69b9fecc27a 100644
--- a/src/query/storages/stage/src/read/row_based/batch.rs
+++ b/src/query/storages/stage/src/read/row_based/batch.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::intrinsics::unlikely;
+use std::hint::unlikely;
 
 use databend_common_expression::BlockMetaInfo;
 use enum_as_inner::EnumAsInner;
diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/format.rs b/src/query/storages/stage/src/read/row_based/formats/csv/format.rs
index df59df8cb8a8c..a017e9307861c 100644
--- a/src/query/storages/stage/src/read/row_based/formats/csv/format.rs
+++ b/src/query/storages/stage/src/read/row_based/formats/csv/format.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 
 use databend_common_exception::Result;
+use databend_common_formats::RecordDelimiter;
 use databend_common_meta_app::principal::CsvFileFormatParams;
 
 use crate::read::load_context::LoadContext;
@@ -29,6 +30,21 @@ pub struct CsvInputFormat {
     pub(crate) params: CsvFileFormatParams,
 }
 
+impl CsvInputFormat {
+    pub fn create_reader(params: &CsvFileFormatParams) -> Result<csv_core::Reader> {
+        let reader = csv_core::ReaderBuilder::new()
+            .delimiter(params.field_delimiter.as_bytes()[0])
+            .quote(params.quote.as_bytes()[0])
+            .escape((!params.escape.is_empty()).then(|| params.escape.as_bytes()[0]))
+            .terminator(match params.record_delimiter.as_str().try_into()? {
+                RecordDelimiter::Crlf => csv_core::Terminator::CRLF,
+                RecordDelimiter::Any(v) => csv_core::Terminator::Any(v),
+            })
+            .build();
+        Ok(reader)
+    }
+}
+
 impl RowBasedFileFormat for CsvInputFormat {
     fn try_create_separator(
         &self,
diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/mod.rs b/src/query/storages/stage/src/read/row_based/formats/csv/mod.rs
index 6b4e544d0040b..88fd9e1f3f22d 100644
--- a/src/query/storages/stage/src/read/row_based/formats/csv/mod.rs
+++ b/src/query/storages/stage/src/read/row_based/formats/csv/mod.rs
@@ -14,6 +14,8 @@
 
 mod block_builder;
 mod format;
+mod partitions;
 mod separator;
 
 pub use format::CsvInputFormat;
+pub use partitions::*;
diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/partitions.rs b/src/query/storages/stage/src/read/row_based/formats/csv/partitions.rs
new file mode 100644
index 0000000000000..523e146b3fbf3
--- /dev/null
+++ b/src/query/storages/stage/src/read/row_based/formats/csv/partitions.rs
@@ -0,0 +1,266 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
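+
+//! Partition planning for CSV loading: large uncompressed files are probed for
+//! row boundaries and planned as byte-range `SplitRowBase` partitions instead of
+//! a single `SingleFilePartition` per file.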
+
+use std::sync::Arc;
+
+use databend_common_catalog::plan::PartInfo;
+use databend_common_catalog::plan::PartStatistics;
+use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::plan::PartitionsShuffleKind;
+use databend_common_catalog::plan::StageTableInfo;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+use databend_common_meta_app::principal::CsvFileFormatParams;
+use databend_common_meta_app::principal::FileFormatParams;
+use databend_common_meta_app::principal::StageFileCompression;
+use databend_common_storage::init_stage_operator;
+use databend_common_storage::StageFileInfo;
+use databend_storages_common_stage::SingleFilePartition;
+use opendal::Operator;
+
+use crate::read::row_based::formats::CsvInputFormat;
+use crate::read::row_based::split::SplitRowBase;
+
+#[fastrace::trace(name = "csv_read_partitions")]
+#[async_backtrace::framed]
+pub async fn csv_read_partitions(
+    stage_table_info: &StageTableInfo,
+    ctx: Arc<dyn TableContext>,
+) -> Result<(PartStatistics, Partitions)> {
+    let fmt = match &stage_table_info.stage_info.file_format_params {
+        FileFormatParams::Csv(fmt) => fmt,
+        _ => unreachable!("do_read_partitions expect csv"),
+    };
+
+    let thread_num = ctx.get_settings().get_max_threads()? as usize;
+
+    let files = if let Some(files) = &stage_table_info.files_to_copy {
+        files.clone()
+    } else {
+        stage_table_info.list_files(thread_num, None).await?
+    };
+
+    if matches!(fmt.compression, StageFileCompression::None) {
+        let op = init_stage_operator(&stage_table_info.stage_info)?;
+
+        let read_bytes = files.iter().map(|f| f.size as usize).sum();
+        let read_rows = std::cmp::max(read_bytes / (stage_table_info.schema.fields.len() + 1), 1);
+
+        let mut partitions = Vec::new();
+        for file in files {
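+            // Probe for a row start every 512 MiB, reading a 2 KiB window at each probe
+            // point; if probing fails, the whole file is kept as a single partition.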
+            match split_file(&op, &file, fmt, 1024 * 1024 * 512, 1024 * 2).await? {
+                None => {
+                    if file.size > 0 {
+                        let part = SingleFilePartition {
+                            path: file.path,
+                            size: file.size as _,
+                        };
+                        partitions.push(Arc::new(Box::new(part) as Box<dyn PartInfo>));
+                    }
+                }
+                Some(splits) => {
+                    partitions.extend(
+                        splits
+                            .into_iter()
+                            .map(|x| Arc::new(Box::new(x) as Box<dyn PartInfo>)),
+                    );
+                }
+            };
+        }
+
+        let statistics = PartStatistics {
+            snapshot: None,
+            read_rows,
+            read_bytes,
+            partitions_scanned: partitions.len(),
+            partitions_total: partitions.len(),
+            is_exact: false,
+            pruning_stats: Default::default(),
+        };
+
+        Ok((
+            statistics,
+            Partitions::create(PartitionsShuffleKind::Seq, partitions),
+        ))
+    } else {
+        let size = files.iter().map(|f| f.size as usize).sum();
+        // assuming all fields are empty
+        let max_rows = std::cmp::max(size / (stage_table_info.schema.fields.len() + 1), 1);
+        let statistics = PartStatistics {
+            snapshot: None,
+            read_rows: max_rows,
+            read_bytes: size,
+            partitions_scanned: files.len(),
+            partitions_total: files.len(),
+            is_exact: false,
+            pruning_stats: Default::default(),
+        };
+
+        let partitions = files
+            .into_iter()
+            .filter(|f| f.size > 0)
+            .map(|v| {
+                let part = SingleFilePartition {
+                    path: v.path.clone(),
+                    size: v.size as usize,
+                };
+                let part_info: Box<dyn PartInfo> = Box::new(part);
+                Arc::new(part_info)
+            })
+            .collect::<Vec<_>>();
+
+        Ok((
+            statistics,
+            Partitions::create(PartitionsShuffleKind::Seq, partitions),
+        ))
+    }
+}
+
+async fn split_file(
+    op: &Operator,
+    file: &StageFileInfo,
+    format: &CsvFileFormatParams,
+    split_size: u64,
+    probe_size: u64,
+) -> Result<Option<Vec<SplitRowBase>>> {
+    let n = file.size / split_size;
+    if n <= 1 {
+        return Ok(None);
+    }
+
+    let mut offsets = Vec::with_capacity(n as _);
+    offsets.push(0_usize);
+    for start in (1..n).map(|i| (split_size * i)) {
+        let buf = op
+            .read_with(&file.path)
+            .range(start..start + probe_size)
+            .await?
+            .to_bytes();
+        let mut temp = [0; 1024];
+        let reader = CsvInputFormat::create_reader(format)?;
+        let offset = match next_row_start(reader, &buf, &mut temp) {
+            None => return Ok(None),
+            Some(offset) => start as usize + offset,
+        };
+        offsets.push(offset);
+    }
+
+    let num_file_splits = n as _;
+    Ok(Some(
+        offsets
+            .into_iter()
+            .chain(Some(file.size as _))
+            .enumerate()
+            .map_windows(|&[(seq_in_file, offset), (_, end)]| SplitRowBase {
+                file: SingleFilePartition {
+                    path: file.path.clone(),
+                    size: file.size as _,
+                },
+                seq_in_file,
+                num_file_splits,
+                offset,
+                size: end - offset,
+            })
+            .collect(),
+    ))
+}
+
+fn next_row_start(mut reader: csv_core::Reader, buf: &[u8], temp: &mut [u8]) -> Option<usize> {
+    use csv_core::ReadFieldResult::*;
+    let mut readded = 0;
+    loop {
+        let (result, n, _) = reader.read_field(&buf[readded..], temp);
+        match result {
+            Field { record_end } => {
+                readded += n;
+                if record_end {
+                    return Some(readded);
+                }
+            }
+            InputEmpty | OutputFull | End => return None,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Write;
+
+    use databend_common_formats::RecordDelimiter;
+    use databend_common_storage::StageFileStatus;
+    use opendal::services;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_split_file_with_memory_operator() -> Result<()> {
+        let operator = Operator::new(services::Memory::default())?.finish();
+        let path = "test.csv".to_string();
+
+        let mut data = Vec::new();
+        for i in 0..327680 {
+            let _ = writeln!(data, "{i},asdfe,\"eeess\",");
+        }
+
+        operator.write(&path, data.clone()).await?;
+
+        let file = StageFileInfo {
+            path: path.clone(),
+            size: data.len() as u64,
+            md5: None,
+            last_modified: None,
+            etag: None,
+            status: StageFileStatus::NeedCopy,
+            creator: None,
+        };
+
+        let format = CsvFileFormatParams::default();
+        let split_size = 512 * 1024;
+        let splits = split_file(&operator, &file, &format, split_size, 1024)
+            .await?
+            .expect("expected file splits");
+
+        assert_eq!(splits.len(), splits[0].num_file_splits);
+
+        let mut counter = 0;
+        for (index, split) in splits.iter().enumerate() {
+            assert_eq!(split.file.size, data.len());
+            assert_eq!(split.seq_in_file, index);
+
+            let start = split.offset as u64;
+            let end = start + split.size as u64;
+            let buf = operator.read_with(&path).range(start..end).await?;
+
+            let mut rdr = csv::ReaderBuilder::new()
+                .delimiter(format.field_delimiter.as_bytes()[0])
+                .quote(format.quote.as_bytes()[0])
+                .escape((!format.escape.is_empty()).then(|| format.escape.as_bytes()[0]))
+                .terminator(match format.record_delimiter.as_str().try_into()? {
+                    RecordDelimiter::Crlf => csv::Terminator::CRLF,
+                    RecordDelimiter::Any(v) => csv::Terminator::Any(v),
+                })
+                .has_headers(false)
+                .from_reader(buf.clone());
+
+            for result in rdr.records() {
+                let record = result.unwrap();
+                let i = record.get(0).unwrap().parse::<u64>().unwrap();
+                assert_eq!(i, counter);
+                counter += 1;
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/separator.rs b/src/query/storages/stage/src/read/row_based/formats/csv/separator.rs
index 9599ce63b5461..5036c7e8e23df 100644
--- a/src/query/storages/stage/src/read/row_based/formats/csv/separator.rs
+++ b/src/query/storages/stage/src/read/row_based/formats/csv/separator.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 use csv_core::ReadRecordResult;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_formats::RecordDelimiter;
 use databend_common_storage::FileParseError;
 use databend_common_storage::FileStatus;
 use log::debug;
@@ -74,20 +73,7 @@ impl CsvReader {
         path: &str,
         format: &CsvInputFormat,
     ) -> Result<Self> {
-        let escape = if format.params.escape.is_empty() {
-            None
-        } else {
-            Some(format.params.escape.as_bytes()[0])
-        };
-        let reader = csv_core::ReaderBuilder::new()
-            .delimiter(format.params.field_delimiter.as_bytes()[0])
-            .quote(format.params.quote.as_bytes()[0])
-            .escape(escape)
-            .terminator(match format.params.record_delimiter.as_str().try_into()? {
-                RecordDelimiter::Crlf => csv_core::Terminator::CRLF,
-                RecordDelimiter::Any(v) => csv_core::Terminator::Any(v),
-            })
-            .build();
+        let reader = CsvInputFormat::create_reader(&format.params)?;
         let projection = load_ctx.pos_projection.clone();
         let max_fields = match &projection {
             Some(p) => p.iter().copied().max().unwrap_or(1),
diff --git a/src/query/storages/stage/src/read/row_based/formats/mod.rs b/src/query/storages/stage/src/read/row_based/formats/mod.rs
index 2b7d9daa14fd1..6530f582092d5 100644
--- a/src/query/storages/stage/src/read/row_based/formats/mod.rs
+++ b/src/query/storages/stage/src/read/row_based/formats/mod.rs
@@ -16,6 +16,7 @@ mod csv;
 mod ndjson;
 mod tsv;
 
+pub use csv::csv_read_partitions;
 pub use csv::CsvInputFormat;
 pub use ndjson::NdJsonInputFormat;
 pub use tsv::TsvInputFormat;
diff --git a/src/query/storages/stage/src/read/row_based/mod.rs b/src/query/storages/stage/src/read/row_based/mod.rs
index 69e24b53db07a..80020891de7db 100644
--- a/src/query/storages/stage/src/read/row_based/mod.rs
+++ b/src/query/storages/stage/src/read/row_based/mod.rs
@@ -14,9 +14,10 @@
 
 mod batch;
 pub(crate) mod format;
-mod formats;
+pub mod formats;
 pub(crate) mod processors;
 mod read_pipeline;
+mod split;
 mod utils;
 
 pub use batch::BytesBatch;
diff --git a/src/query/storages/stage/src/read/row_based/processors/reader.rs b/src/query/storages/stage/src/read/row_based/processors/reader.rs
index e82e149718788..075d71ce3ddd6 100644
--- a/src/query/storages/stage/src/read/row_based/processors/reader.rs
+++ b/src/query/storages/stage/src/read/row_based/processors/reader.rs
@@ -27,6 +27,7 @@ use log::debug;
 use opendal::Operator;
 
 use crate::read::row_based::batch::BytesBatch;
+use crate::read::row_based::split::SplitRowBase;
 
 struct FileState {
     file: SingleFilePartition,
@@ -114,15 +115,26 @@ impl PrefetchAsyncSource for BytesReader {
                 Some(part) => part,
                 None => return Ok(None),
             };
-            let file = SingleFilePartition::from_part(&part)?.clone();
-            let reader = self.op.reader(&file.path).await?;
-
-            self.file_state = Some(FileState {
-                file,
-                reader,
-                offset: 0,
-            })
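+            // A stage partition is either a whole file (SingleFilePartition) or, with split
+            // loading enabled, a row-aligned byte range of one (SplitRowBase).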
+            if let Ok(file) = SingleFilePartition::from_part(&part) {
+                let reader = self.op.reader(&file.path).await?;
+                self.file_state = Some(FileState {
+                    file: file.clone(),
+                    reader,
+                    offset: 0,
+                })
+            } else {
+                let split = SplitRowBase::from_part(&part)?;
+                let reader = self.op.reader(&split.file.path).await?;
+                self.file_state = Some(FileState {
+                    file: SingleFilePartition {
+                        path: split.file.path.clone(),
+                        size: split.offset + split.size,
+                    },
+                    reader,
+                    offset: split.offset,
+                })
+            }
         }
         match self.read_batch().await {
             Ok(block) => Ok(Some(block)),
diff --git a/src/query/storages/stage/src/read/row_based/split.rs b/src/query/storages/stage/src/read/row_based/split.rs
new file mode 100644
index 0000000000000..468cc7622db62
--- /dev/null
+++ b/src/query/storages/stage/src/read/row_based/split.rs
@@ -0,0 +1,62 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::hash::DefaultHasher;
+use std::hash::Hash;
+use std::hash::Hasher;
+
+use databend_common_catalog::plan::PartInfo;
+use databend_common_catalog::plan::PartInfoPtr;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_storages_common_stage::SingleFilePartition;
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct SplitRowBase {
+    pub file: SingleFilePartition,
+    pub seq_in_file: usize,
+    pub num_file_splits: usize,
+    pub offset: usize,
+    pub size: usize,
+}
+
+#[typetag::serde(name = "split_row_base")]
+impl PartInfo for SplitRowBase {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
+        info.as_any()
+            .downcast_ref::<SplitRowBase>()
+            .is_some_and(|other| self == other)
+    }
+
+    fn hash(&self) -> u64 {
+        let mut s = DefaultHasher::new();
+        self.file.path.hash(&mut s);
+        self.offset.hash(&mut s);
+        self.size.hash(&mut s);
+        s.finish()
+    }
+}
+
+impl SplitRowBase {
+    pub fn from_part(info: &PartInfoPtr) -> Result<&Self> {
+        info.as_any()
+            .downcast_ref::<SplitRowBase>()
+            .ok_or_else(|| ErrorCode::Internal("Cannot downcast from PartInfo to SplitRowBase."))
+    }
+}
diff --git a/src/query/storages/stage/src/stage_table.rs b/src/query/storages/stage/src/stage_table.rs
index 971221e7a1626..4e2ee582f2467 100644
--- a/src/query/storages/stage/src/stage_table.rs
+++ b/src/query/storages/stage/src/stage_table.rs
@@ -44,6 +44,7 @@ use databend_storages_common_stage::SingleFilePartition;
 use opendal::Operator;
 
 use crate::read::avro::AvroReadPipelineBuilder;
+use crate::read::row_based::formats::csv_read_partitions;
 use crate::read::row_based::RowBasedReadPipelineBuilder;
 
 /// TODO: we need to track the data metrics in stage table.
@@ -53,6 +54,7 @@ pub struct StageTable {
     // But the Table trait need it:
     //   fn get_table_info(&self) -> &TableInfo).
     table_info_placeholder: TableInfo,
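+    // When true, read_partitions plans uncompressed CSV files as row-aligned
+    // byte-range splits (SplitRowBase) instead of one partition per file.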
+    enable_split_file: bool,
 }
 
 impl StageTable {
@@ -67,6 +69,22 @@ impl StageTable {
         Ok(Arc::new(Self {
             table_info,
             table_info_placeholder,
+            enable_split_file: false,
         }))
     }
+
+    pub fn try_create_with_split_file(table_info: StageTableInfo) -> Result<Arc<StageTable>> {
+        let table_info_placeholder = TableInfo {
+            // `system.stage` is used to forbid the user to select * from text files.
+            name: "stage".to_string(),
+            ..Default::default()
+        }
+        .set_schema(table_info.schema());
+
+        Ok(Arc::new(Self {
+            table_info,
+            table_info_placeholder,
+            enable_split_file: true,
+        }))
+    }
+
@@ -169,6 +187,9 @@ impl Table for StageTable {
         FileFormatParams::Orc(_) => {
             OrcTableForCopy::do_read_partitions(stage_table_info, ctx, _push_downs).await
         }
+        FileFormatParams::Csv(_) if self.enable_split_file => {
+            csv_read_partitions(stage_table_info, ctx).await
+        }
         FileFormatParams::Csv(_)
         | FileFormatParams::NdJson(_)
         | FileFormatParams::Tsv(_)