2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-1.toml
@@ -60,7 +60,7 @@ table_cache_bloom_index_data_bytes=1073741824
[log]

[log.file]
-level = "ERROR"
+level = "WARN"
format = "text"
dir = "./.databend/logs_1"

7 changes: 7 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -54,6 +54,12 @@ pub struct ProcessInfo {
    pub created_time: SystemTime,
}

#[derive(Debug, Clone)]
pub struct SideloadOptions {
    pub uri: Option<String>,
    pub stage: Option<String>,
}

#[async_trait::async_trait]
pub trait TableContext: Send + Sync {
    /// Build a table instance the plan wants to operate on.
@@ -99,4 +105,5 @@ pub trait TableContext: Send + Sync {
    async fn get_table(&self, catalog: &str, database: &str, table: &str)
        -> Result<Arc<dyn Table>>;
    fn get_processes_info(&self) -> Vec<ProcessInfo>;
    fn get_sideload(&self) -> Option<SideloadOptions>;
}
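
Downstream code discovers sideload hints only through `get_sideload`. A minimal consumer sketch, assuming nothing beyond the types added above (the helper `describe_sideload` is hypothetical, not part of this PR):

use common_catalog::table_context::{SideloadOptions, TableContext};

// Hypothetical helper: report where sideloaded data should be read from.
fn describe_sideload(ctx: &dyn TableContext) -> String {
    match ctx.get_sideload() {
        Some(SideloadOptions { stage: Some(s), .. }) => format!("from stage {s}"),
        Some(SideloadOptions { uri: Some(u), .. }) => format!("from uri {u}"),
        _ => "no sideload attached".to_string(),
    }
}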
160 changes: 160 additions & 0 deletions src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -17,13 +17,16 @@ use std::io::BufRead;
use std::io::Cursor;
use std::ops::Not;
use std::sync::Arc;
use std::time::Instant;

use aho_corasick::AhoCorasick;
use common_ast::ast::Expr;
use common_ast::parser::parse_comma_separated_exprs;
use common_ast::parser::tokenize_sql;
use common_ast::Backtrace;
use common_base::runtime::GlobalIORuntime;
use common_catalog::plan::StageFileStatus;
use common_catalog::plan::StageTableInfo;
use common_catalog::table::AppendMode;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
@@ -33,19 +36,26 @@ use common_formats::parse_timezone;
use common_formats::FastFieldDecoderValues;
use common_io::cursor_ext::ReadBytesExt;
use common_io::cursor_ext::ReadCheckPointExt;
use common_meta_types::UserStageInfo;
use common_pipeline_core::Pipeline;
use common_pipeline_sources::processors::sources::AsyncSource;
use common_pipeline_sources::processors::sources::AsyncSourcer;
use common_pipeline_transforms::processors::transforms::Transform;
use common_sql::evaluator::ChunkOperator;
use common_sql::evaluator::CompoundChunkOperator;
use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use common_sql::Metadata;
use common_sql::MetadataRef;
use common_storages_factory::Table;
use common_storages_stage::StageTable;
use common_users::UserApiProvider;
use parking_lot::Mutex;
use parking_lot::RwLock;

use crate::interpreters::common::append2table;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::processors::TransformAddOn;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::SourcePipeBuilder;
use crate::schedulers::build_query_pipeline;
@@ -101,6 +111,113 @@ impl InsertInterpreterV2 {
        let cast_needed = select_schema != *output_schema;
        Ok(cast_needed)
    }

    // TODO:(everpcpc)
    async fn build_insert_from_stage_pipeline(
        &self,
        table: Arc<dyn Table>,
        stage_location: &str,
        pipeline: &mut Pipeline,
    ) -> Result<()> {
        let start = Instant::now();
        let ctx = self.ctx.clone();
        let table_ctx: Arc<dyn TableContext> = ctx.clone();
        let source_schema = self.plan.schema();
        let target_schema = table.schema();
        let catalog_name = self.plan.catalog.clone();
        let overwrite = self.plan.overwrite;

        let (stage_info, path) = parse_stage_location(&self.ctx, stage_location).await?;

        let mut stage_table_info = StageTableInfo {
            schema: source_schema.clone(),
            user_stage_info: stage_info,
            path: path.to_string(),
            files: vec![],
            pattern: "".to_string(),
            files_to_copy: None,
        };

        let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;

        // TODO: color_copied_files

        let mut need_copied_file_infos = vec![];
        for file in &all_source_file_infos {
            if file.status == StageFileStatus::NeedCopy {
                need_copied_file_infos.push(file.clone());
            }
        }

        // DEBUG:
        tracing::warn!(
            "insert: read all sideload files finished, all:{}, need copy:{}, elapsed:{}",
            all_source_file_infos.len(),
            need_copied_file_infos.len(),
            start.elapsed().as_secs()
        );

        if need_copied_file_infos.is_empty() {
            return Ok(());
        }

        stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
        let stage_table = StageTable::try_create(stage_table_info.clone())?;
        let read_source_plan = {
            stage_table
                .read_plan_with_catalog(ctx.clone(), catalog_name, None)
                .await?
        };

        stage_table.read_data(table_ctx, &read_source_plan, pipeline)?;

        let need_fill_missing_columns = target_schema != source_schema;
        if need_fill_missing_columns {
            pipeline.add_transform(|transform_input_port, transform_output_port| {
                TransformAddOn::try_create(
                    transform_input_port,
                    transform_output_port,
                    source_schema.clone(),
                    target_schema.clone(),
                    ctx.clone(),
                )
            })?;
        }
        table.append_data(ctx.clone(), pipeline, AppendMode::Copy, false)?;

        pipeline.set_on_finished(move |may_error| {
            // capture outer variables into the closure
            let overwrite = overwrite;
            let ctx = ctx.clone();
            let table = table.clone();

            match may_error {
                Some(error) => {
                    tracing::error!("insert stage file error: {}", error);
                    Err(error.clone())
                }
                None => {
                    let append_entries = ctx.consume_precommit_blocks();
                    // The commit must run on the global runtime to avoid the
                    // "dispatch dropped without returning error" issue in tower.
                    return GlobalIORuntime::instance().block_on(async move {
                        // DEBUG:
                        tracing::warn!(
                            "insert: try to commit append entries:{}, elapsed:{}",
                            append_entries.len(),
                            start.elapsed().as_secs()
                        );
                        table
                            .commit_insertion(ctx, append_entries, overwrite)
                            .await?;
                        Ok(())
                        // TODO: purge copied files
                    });
                }
            }
        });

        Ok(())
    }
}
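
The `NeedCopy` filter above is written as an explicit loop; an equivalent iterator form (purely a style alternative, not what this patch does) would be:

let need_copied_file_infos: Vec<_> = all_source_file_infos
    .iter()
    .filter(|file| file.status == StageFileStatus::NeedCopy)
    .cloned()
    .collect();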

#[async_trait::async_trait]
@@ -156,6 +273,27 @@ impl Interpreter for InsertInterpreterV2 {
                    .format
                    .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
            }
            InsertInputSource::Sideload(opts) => {
                // DEBUG:
                tracing::warn!("==> sideload insert: {:?}", opts);

                match &opts.stage {
                    None => {
                        return Err(ErrorCode::BadDataValueType(
                            "No stage location provided".to_string(),
                        ));
                    }
                    Some(stage_location) => {
                        self.build_insert_from_stage_pipeline(
                            table.clone(),
                            stage_location,
                            &mut build_res.main_pipeline,
                        )
                        .await?;
                    }
                }
                return Ok(build_res);
            }
            InsertInputSource::SelectPlan(plan) => {
                let table1 = table.clone();
                let (mut select_plan, select_column_bindings) = match plan.as_ref() {
@@ -602,3 +740,25 @@ async fn exprs_to_datavalue<'a>(
    let datavalues: Vec<DataValue> = res.columns().iter().skip(1).map(|col| col.get(0)).collect();
    Ok(datavalues)
}

// FIXME: tmp copy from src/query/sql/src/planner/binder/copy.rs
async fn parse_stage_location(
    ctx: &Arc<QueryContext>,
    location: &str,
) -> Result<(UserStageInfo, String)> {
    let s: Vec<&str> = location.split('@').collect();
    // @my_ext_stage/abc/
    let names: Vec<&str> = s[1].splitn(2, '/').filter(|v| !v.is_empty()).collect();

    let stage = if names[0] == "~" {
        UserStageInfo::new_user_stage(&ctx.get_current_user()?.name)
    } else {
        UserApiProvider::instance()
            .get_stage(&ctx.get_tenant(), names[0])
            .await?
    };

    let path = names.get(1).unwrap_or(&"").trim_start_matches('/');

    Ok((stage, path.to_string()))
}
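
A worked example of the parsing, following the `@my_ext_stage/abc/` comment in the function, written as lines inside a hypothetical async test with a prepared `ctx` (the `stage_name` field on `UserStageInfo` is an assumption here):

// location = "@my_ext_stage/abc/"
// split('@')     => ["", "my_ext_stage/abc/"], so s[1] is "my_ext_stage/abc/"
// splitn(2, '/') => names = ["my_ext_stage", "abc/"]
let (stage, path) = parse_stage_location(&ctx, "@my_ext_stage/abc/").await?;
assert_eq!(stage.stage_name, "my_ext_stage");
assert_eq!(path, "abc/");

A location with no '@' at all makes the `s[1]` index panic; per the FIXME, this helper is a stopgap copy rather than something to harden here.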
16 changes: 16 additions & 0 deletions src/query/service/src/servers/http/v1/query/http_query.rs
@@ -21,6 +21,7 @@ use common_base::base::tokio;
use common_base::base::tokio::sync::Mutex as TokioMutex;
use common_base::base::tokio::sync::RwLock;
use common_base::runtime::TrySpawn;
use common_catalog::table_context::SideloadOptions;
use common_exception::ErrorCode;
use common_exception::Result;
use serde::Deserialize;
@@ -59,6 +60,7 @@ pub struct HttpQueryRequest {
    pub pagination: PaginationConf,
    #[serde(default = "default_as_true")]
    pub string_fields: bool,
    pub sideload: Option<SideloadConf>,
}

const DEFAULT_MAX_ROWS_IN_BUFFER: usize = 5 * 1000 * 1000;
Expand Down Expand Up @@ -141,6 +143,12 @@ impl HttpSessionConf {
    }
}

#[derive(Deserialize, Debug, Clone)]
pub struct SideloadConf {
    pub(crate) stage: Option<String>,
    pub(crate) url: Option<String>,
}

#[derive(Debug, Clone)]
pub struct ResponseState {
    pub running_time_ms: f64,
@@ -229,6 +237,14 @@ impl HttpQuery {
        let sql = &request.sql;
        tracing::info!("run query_id={id} in session_id={session_id}, sql='{sql}'");

        match &request.sideload {
            Some(sideload) => ctx.attach_sideload(SideloadOptions {
                uri: sideload.url.clone(),
                stage: sideload.stage.clone(),
            }),
            None => {}
        };

        let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
        let start_time = Instant::now();
        let state = Arc::new(RwLock::new(Executor {
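
On the wire, the new field rides in the JSON body of the HTTP query request. A sketch of a body that would populate `SideloadConf` (the field names match the struct above; the stage name and the POST /v1/query path are illustrative assumptions):

use serde_json::json;

let body = json!({
    "sql": "INSERT INTO t1 VALUES",
    "sideload": {
        "stage": "@my_ext_stage/data/", // later resolved by parse_stage_location
        "url": null                     // accepted, but not yet consumed by the interpreter
    }
});

Since `sideload` is an `Option`, clients that omit the field keep their current behavior.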
10 changes: 10 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -32,6 +32,7 @@ use common_catalog::plan::DataSourcePlan;
use common_catalog::plan::PartInfoPtr;
use common_catalog::plan::Partitions;
use common_catalog::plan::StageTableInfo;
use common_catalog::table_context::SideloadOptions;
use common_config::DATABEND_COMMIT_VERSION;
use common_datablocks::DataBlock;
use common_datavalues::DataValue;
@@ -192,6 +193,10 @@ impl QueryContext {
        self.shared.set_executor(weak_ptr)
    }

    pub fn attach_sideload(&self, sideload: SideloadOptions) {
        self.shared.attach_sideload(sideload);
    }

    pub fn get_created_time(&self) -> SystemTime {
        self.shared.created_time
    }
@@ -354,6 +359,11 @@ impl TableContext for QueryContext {
    fn get_processes_info(&self) -> Vec<ProcessInfo> {
        SessionManager::instance().processes_info()
    }

    // Get the sideload options attached to this query, if any.
    fn get_sideload(&self) -> Option<SideloadOptions> {
        self.shared.get_sideload()
    }
}

impl TrySpawn for QueryContext {
12 changes: 12 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -22,6 +22,7 @@ use std::time::SystemTime;

use common_base::base::Progress;
use common_base::runtime::Runtime;
use common_catalog::table_context::SideloadOptions;
use common_config::Config;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
@@ -78,6 +79,7 @@ pub struct QueryContextShared {
    pub(in crate::sessions) data_operator: DataOperator,
    pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
    pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
    pub(in crate::sessions) sideload_config: Arc<RwLock<Option<SideloadOptions>>>,
    pub(in crate::sessions) created_time: SystemTime,
}

@@ -107,6 +109,7 @@ impl QueryContextShared {
            affect: Arc::new(Mutex::new(None)),
            executor: Arc::new(RwLock::new(Weak::new())),
            precommit_blocks: Arc::new(RwLock::new(vec![])),
            sideload_config: Arc::new(RwLock::new(None)),
            created_time: SystemTime::now(),
        }))
    }
@@ -316,6 +319,15 @@ impl QueryContextShared {
        swaped_precommit_blocks
    }

    pub fn get_sideload(&self) -> Option<SideloadOptions> {
        self.sideload_config.read().clone()
    }

    pub fn attach_sideload(&self, sideload: SideloadOptions) {
        let mut sideload_config = self.sideload_config.write();
        *sideload_config = Some(sideload);
    }

    pub fn get_created_time(&self) -> SystemTime {
        self.created_time
    }
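
`attach_sideload` and `get_sideload` form a plain write-then-clone pair over `parking_lot::RwLock`: readers take an owned copy, so no lock is held while the insert pipeline is built. The pattern in isolation (with `String` standing in for `SideloadOptions` for brevity):

use std::sync::Arc;
use parking_lot::RwLock;

fn main() {
    let slot: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
    *slot.write() = Some("@stage/path".into()); // what attach_sideload does
    let seen = slot.read().clone();             // what get_sideload does: an owned copy
    assert_eq!(seen.as_deref(), Some("@stage/path"));
}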
5 changes: 5 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -27,6 +27,7 @@ use common_catalog::plan::PartInfoPtr;
use common_catalog::plan::Partitions;
use common_catalog::table::Table;
use common_catalog::table_context::ProcessInfo;
use common_catalog::table_context::SideloadOptions;
use common_catalog::table_context::TableContext;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
@@ -453,6 +454,10 @@ impl TableContext for CtxDelegation {
    fn get_processes_info(&self) -> Vec<ProcessInfo> {
        todo!()
    }

    fn get_sideload(&self) -> Option<SideloadOptions> {
        todo!()
    }
}

#[derive(Clone)]
11 changes: 7 additions & 4 deletions src/query/sql/src/planner/binder/insert.rs
@@ -96,10 +96,13 @@ impl<'a> Binder {
                    opts, start, None,
                ))
            }
-           InsertSource::Values { rest_str } => {
-               let data = rest_str.trim_end_matches(';').trim_start().to_owned();
-               Ok(InsertInputSource::Values(data))
-           }
+           InsertSource::Values { rest_str } => match self.ctx.get_sideload() {
+               Some(sideload) => Ok(InsertInputSource::Sideload(Arc::new(sideload))),
+               None => {
+                   let data = rest_str.trim_end_matches(';').trim_start().to_owned();
+                   Ok(InsertInputSource::Values(data))
+               }
+           },
            InsertSource::Select { query } => {
                let statement = Statement::Query(query);
                let select_plan = self.bind_statement(bind_context, &statement).await?;
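
The net effect of the binder change: when the HTTP layer has attached sideload options, an `INSERT ... VALUES` statement no longer parses its inline payload at all; the data source is resolved from the stage instead. The precedence rule, restated as a standalone sketch (a hypothetical free function, not the binder's API):

// Sideload, when present, wins over inline VALUES data.
fn pick_source(sideload: Option<SideloadOptions>, rest_str: &str) -> InsertInputSource {
    match sideload {
        Some(opts) => InsertInputSource::Sideload(Arc::new(opts)),
        None => {
            let data = rest_str.trim_end_matches(';').trim_start().to_owned();
            InsertInputSource::Values(data)
        }
    }
}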