Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ cookie = "0.18.1"
crc32fast = "1.3.2"
cron = "0.12.0"
crossbeam-channel = "0.5.6"
csv-core = "0.1.11"
csv = "1.4"
csv-core = "0.1.12"
ctor = "0.2"
ctrlc = { version = "3.2.3", features = ["termination"] }
dashmap = { version = "6.1.0", features = ["serde"] }
Expand Down
1 change: 0 additions & 1 deletion src/query/catalog/src/plan/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ impl PartialEq for Box<dyn PartInfo> {
}
}

#[allow(dead_code)]
/// Shared, reference-counted pointer to a boxed [`PartInfo`] trait object.
pub type PartInfoPtr = Arc<Box<dyn PartInfo>>;

/// For cache affinity, we consider some strategies when reshuffle partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ impl CopyIntoTableInterpreter {
Some(result_columns),
)
} else {
let stage_table = StageTable::try_create(plan.stage_table_info.clone())?;
let stage_table = if self.ctx.get_settings().get_enable_split_file_loading()? {
StageTable::try_create_with_split_file(plan.stage_table_info.clone())?
} else {
StageTable::try_create(plan.stage_table_info.clone())?
};

let data_source_plan = stage_table
.read_plan(self.ctx.clone(), None, None, false, false)
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,21 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
// outdated
("enable_new_copy_for_text_formats", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Use new implementation for loading CSV files.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_split_file_loading", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Split file when loading CSV files.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("purge_duplicated_files_in_copy", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Purge duplicated files detected during execution of copy into table.",
Expand Down
4 changes: 2 additions & 2 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ impl Settings {
self.try_get_u64("input_read_buffer_size")
}

pub fn get_enable_new_copy_for_text_formats(&self) -> Result<u64> {
self.try_get_u64("enable_new_copy_for_text_formats")
/// Returns whether the `enable_split_file_loading` setting is turned on.
///
/// The setting is read as a `u64`; any non-zero value counts as enabled.
///
/// # Errors
///
/// Propagates the error returned by `try_get_u64` when the setting cannot be read.
pub fn get_enable_split_file_loading(&self) -> Result<bool> {
    let raw = self.try_get_u64("enable_split_file_loading")?;
    Ok(raw != 0)
}

pub fn get_enable_purge_duplicated_files_in_copy(&self) -> Result<bool> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use databend_common_catalog::plan::PartInfoPtr;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq)]
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq)]
pub struct SingleFilePartition {
pub path: String,
pub size: usize,
Expand Down
5 changes: 5 additions & 0 deletions src/query/storages/stage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ databend-common-storages-parquet = { workspace = true }
databend-storages-common-stage = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
enum-as-inner = { workspace = true }
fastrace = { workspace = true }
jsonb = { workspace = true }
lexical-core = { workspace = true }
log = { workspace = true }
Expand All @@ -45,8 +46,12 @@ parking_lot = { workspace = true }
parquet = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
typetag = { workspace = true }

[dev-dependencies]
csv = { workspace = true }

[lints]
workspace = true

Expand Down
5 changes: 2 additions & 3 deletions src/query/storages/stage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(internal_features)]
#![feature(core_intrinsics)]
#![feature(likely_unlikely)]
#![feature(impl_trait_in_assoc_type)]
#![feature(box_patterns)]
#![feature(iter_intersperse)]
#![allow(clippy::uninlined_format_args)]
#![feature(iter_map_windows)]

mod append;
mod compression;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/stage/src/read/row_based/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::intrinsics::unlikely;
use std::hint::unlikely;

use databend_common_expression::BlockMetaInfo;
use enum_as_inner::EnumAsInner;
Expand Down
16 changes: 16 additions & 0 deletions src/query/storages/stage/src/read/row_based/formats/csv/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_formats::RecordDelimiter;
use databend_common_meta_app::principal::CsvFileFormatParams;

use crate::read::load_context::LoadContext;
Expand All @@ -29,6 +30,21 @@ pub struct CsvInputFormat {
pub(crate) params: CsvFileFormatParams,
}

impl CsvInputFormat {
pub fn create_reader(params: &CsvFileFormatParams) -> Result<csv_core::Reader> {
let reader = csv_core::ReaderBuilder::new()
.delimiter(params.field_delimiter.as_bytes()[0])
.quote(params.quote.as_bytes()[0])
.escape((!params.escape.is_empty()).then(|| params.escape.as_bytes()[0]))
.terminator(match params.record_delimiter.as_str().try_into()? {
RecordDelimiter::Crlf => csv_core::Terminator::CRLF,
RecordDelimiter::Any(v) => csv_core::Terminator::Any(v),
})
.build();
Ok(reader)
}
}

impl RowBasedFileFormat for CsvInputFormat {
fn try_create_separator(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

mod block_builder;
mod format;
mod partitions;
mod separator;

pub use format::CsvInputFormat;
pub use partitions::*;
Loading