Skip to content

Commit 12ea8dd

Browse files
authored
fix(query): Preserve case for Variant MapAccess in computed columns (#18872)
* fix(query): Preserve case for Variant MapAccess in computed columns
* test nested
* fix test
1 parent a90032a commit 12ea8dd

File tree

10 files changed

+91
-60
lines changed

10 files changed

+91
-60
lines changed

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,9 +620,7 @@ impl ReclusterTableInterpreter {
620620
let cluster_keys_len = ast_exprs.len();
621621
let mut cluster_key_strs = Vec::with_capacity(cluster_keys_len);
622622
for mut ast in ast_exprs {
623-
let mut normalizer = IdentifierNormalizer {
624-
ctx: &name_resolution_ctx,
625-
};
623+
let mut normalizer = IdentifierNormalizer::new(&name_resolution_ctx);
626624
ast.drive_mut(&mut normalizer);
627625
cluster_key_strs.push(format!("{:#}", &ast));
628626
}

src/query/service/tests/it/sql/planner/semantic/name_resolution.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ fn test_normalize_identifiers_in_expr() {
106106
let mut expr = parse_expr(&tokens, Dialect::PostgreSQL).unwrap();
107107

108108
let ctx = NameResolutionContext::default();
109-
let mut normalizer = IdentifierNormalizer { ctx: &ctx };
109+
let mut normalizer = IdentifierNormalizer::new(&ctx);
110110

111111
expr.drive_mut(&mut normalizer);
112112

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2170,9 +2170,7 @@ impl Binder {
21702170
}
21712171

21722172
let mut cluster_expr = cluster_expr.clone();
2173-
let mut normalizer = IdentifierNormalizer {
2174-
ctx: &self.name_resolution_ctx,
2175-
};
2173+
let mut normalizer = IdentifierNormalizer::new(&self.name_resolution_ctx);
21762174
cluster_expr.drive_mut(&mut normalizer);
21772175
cluster_keys.push(format!("{:#}", &cluster_expr));
21782176
}

src/query/sql/src/planner/expression/expression_parser.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,7 @@ pub fn parse_computed_expr_to_string(
303303
)));
304304
}
305305
let mut ast = ast.clone();
306-
let mut normalizer = IdentifierNormalizer {
307-
ctx: &name_resolution_ctx,
308-
};
306+
let mut normalizer = IdentifierNormalizer::new(&name_resolution_ctx);
309307
ast.drive_mut(&mut normalizer);
310308
Ok(format!("{:#}", ast))
311309
}
@@ -492,9 +490,7 @@ pub fn analyze_cluster_keys(
492490
exprs.push(expr);
493491

494492
let mut cluster_by = ast.clone();
495-
let mut normalizer = IdentifierNormalizer {
496-
ctx: &name_resolution_ctx,
497-
};
493+
let mut normalizer = IdentifierNormalizer::new(&name_resolution_ctx);
498494
cluster_by.drive_mut(&mut normalizer);
499495
cluster_keys.push(format!("{:#}", &cluster_by));
500496
}

src/query/sql/src/planner/semantic/name_resolution.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717
use databend_common_ast::ast::quote::ident_needs_quote;
1818
use databend_common_ast::ast::Identifier;
1919
use databend_common_ast::ast::IdentifierType;
20+
use databend_common_ast::ast::MapAccessor;
2021
use databend_common_catalog::table_context::TableContext;
2122
use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
@@ -110,15 +111,38 @@ pub fn compare_table_name(
110111
}
111112

112113
#[derive(VisitorMut)]
113-
#[visitor(Identifier(enter))]
114+
#[visitor(Identifier(enter), MapAccessor)]
114115
pub struct IdentifierNormalizer<'a> {
115-
pub ctx: &'a NameResolutionContext,
116+
ctx: &'a NameResolutionContext,
117+
in_map_accessor: bool,
116118
}
117119

118-
impl IdentifierNormalizer<'_> {
120+
impl<'a> IdentifierNormalizer<'a> {
121+
pub fn new(ctx: &'a NameResolutionContext) -> Self {
122+
Self {
123+
ctx,
124+
in_map_accessor: false,
125+
}
126+
}
127+
119128
fn enter_identifier(&mut self, ident: &mut Identifier) {
120-
let normalized_ident = normalize_identifier(ident, self.ctx);
121-
*ident = normalized_ident;
129+
// Skip normalization if inside a MapAccessor,
130+
// because MapAccessor is used to extract internal fields of nested types,
131+
// altering the case may prevent the desired data from being retrieved.
132+
if !self.in_map_accessor {
133+
let normalized_ident = normalize_identifier(ident, self.ctx);
134+
*ident = normalized_ident;
135+
}
136+
}
137+
138+
fn enter_map_accessor(&mut self, _accessor: &mut MapAccessor) {
139+
// Set flag to true before processing the identifier inside the accessor
140+
self.in_map_accessor = true;
141+
}
142+
143+
fn exit_map_accessor(&mut self, _accessor: &mut MapAccessor) {
144+
// Reset the flag after processing
145+
self.in_map_accessor = false;
122146
}
123147
}
124148

src/query/storages/common/index/src/inverted_index.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ use tantivy::query::Query;
9696
use tantivy::query::QueryClone;
9797
use tantivy::query::TermQuery;
9898
use tantivy::schema::Field;
99+
use tantivy::version;
99100
use tantivy::Directory;
100101
use tantivy::Term;
102+
use tantivy::Version;
101103
use tantivy_common::BinarySerializable;
102104
use tantivy_common::HasLen;
103105
use tantivy_common::VInt;
@@ -108,28 +110,12 @@ use tantivy_fst::Streamer;
108110

109111
// tantivy version is used to generate the footer data
110112

111-
// Index major version.
112-
const INDEX_MAJOR_VERSION: u32 = 0;
113-
// Index minor version.
114-
const INDEX_MINOR_VERSION: u32 = 25;
115-
// Index patch version.
116-
const INDEX_PATCH_VERSION: u32 = 0;
117-
// Index format version.
118-
const INDEX_FORMAT_VERSION: u32 = 7;
119113
// The magic byte of the footer to identify corruption
120114
// or an old version of the footer.
121115
const FOOTER_MAGIC_NUMBER: u32 = 1337;
122116

123117
type CrcHashU32 = u32;
124118

125-
/// Structure version for the index.
126-
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
127-
pub struct Version {
128-
major: u32,
129-
minor: u32,
130-
patch: u32,
131-
index_format_version: u32,
132-
}
133119
/// A Footer is appended every part of data, like tantivy file.
134120
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
135121
struct Footer {
@@ -139,12 +125,7 @@ struct Footer {
139125

140126
impl Footer {
141127
fn new(crc: CrcHashU32) -> Self {
142-
let version = Version {
143-
major: INDEX_MAJOR_VERSION,
144-
minor: INDEX_MINOR_VERSION,
145-
patch: INDEX_PATCH_VERSION,
146-
index_format_version: INDEX_FORMAT_VERSION,
147-
};
128+
let version = version().clone();
148129
Footer { version, crc }
149130
}
150131

src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use databend_storages_common_io::MergeIOReader;
3838
use databend_storages_common_io::ReadSettings;
3939
use databend_storages_common_table_meta::meta::SingleColumnMeta;
4040
use databend_storages_common_table_meta::table::TableCompression;
41+
use log::info;
4142
use opendal::Operator;
4243
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
4344
use parquet::arrow::parquet_to_arrow_field_levels;
@@ -115,6 +116,7 @@ pub(crate) async fn legacy_load_inverted_index_files<'a>(
115116
operator: &'a Operator,
116117
) -> Result<Vec<Arc<InvertedIndexFile>>> {
117118
let start = Instant::now();
119+
info!("load inverted index directory version 2");
118120

119121
let mut files = vec![];
120122
let mut ranges = vec![];
@@ -300,6 +302,7 @@ pub(crate) async fn load_inverted_index_directory<'a>(
300302
// 7. meta.json file
301303
// 8. .managed.json file
302304
if version == 1 {
305+
info!("load inverted index directory version 1");
303306
let mut columns = Vec::with_capacity(inverted_index_meta_map.len());
304307
for (col_name, col_meta) in inverted_index_meta_map {
305308
let col_range = col_meta.offset..(col_meta.offset + col_meta.len);

src/query/storages/fuse/src/io/write/inverted_index_writer.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use databend_storages_common_table_meta::meta::Location;
4242
use databend_storages_common_table_meta::table::TableCompression;
4343
use jsonb::from_raw_jsonb;
4444
use jsonb::RawJsonb;
45+
use log::debug;
46+
use log::info;
4547
use tantivy::index::SegmentComponent;
4648
use tantivy::indexer::UserOperation;
4749
use tantivy::schema::Field;
@@ -106,6 +108,10 @@ pub fn create_inverted_index_builders(table_meta: &TableMeta) -> Vec<InvertedInd
106108
}
107109
// ignore invalid index
108110
if index_fields.len() != index.column_ids.len() {
111+
debug!(
112+
"Ignoring invalid inverted index: {}, missing columns",
113+
index.name
114+
);
109115
continue;
110116
}
111117
let index_schema = DataSchema::new(index_fields);
@@ -145,6 +151,19 @@ impl InvertedIndexState {
145151
inverted_index_builder: &InvertedIndexBuilder,
146152
) -> Result<Self> {
147153
let start = Instant::now();
154+
155+
let inverted_index_location =
156+
TableMetaLocationGenerator::gen_inverted_index_location_from_block_location(
157+
&block_location.0,
158+
&inverted_index_builder.name,
159+
&inverted_index_builder.version,
160+
);
161+
162+
info!(
163+
"Start build inverted index for location: {}",
164+
inverted_index_location
165+
);
166+
148167
let mut writer = InvertedIndexWriter::try_create(
149168
Arc::new(inverted_index_builder.schema.clone()),
150169
&inverted_index_builder.options,
@@ -153,18 +172,16 @@ impl InvertedIndexState {
153172
let data = writer.finalize()?;
154173

155174
// Perf.
175+
let size = data.len();
176+
let elapsed_ms = start.elapsed().as_millis() as u64;
156177
{
157-
metrics_inc_block_inverted_index_generate_milliseconds(
158-
start.elapsed().as_millis() as u64
159-
);
178+
metrics_inc_block_inverted_index_generate_milliseconds(elapsed_ms);
160179
}
180+
info!(
181+
"Finish build inverted index: location={}, size={} bytes in {} ms",
182+
inverted_index_location, size, elapsed_ms
183+
);
161184

162-
let inverted_index_location =
163-
TableMetaLocationGenerator::gen_inverted_index_location_from_block_location(
164-
&block_location.0,
165-
&inverted_index_builder.name,
166-
&inverted_index_builder.version,
167-
);
168185
Self::try_create(data, inverted_index_location)
169186
}
170187
}
@@ -183,7 +200,6 @@ impl InvertedIndexWriter {
183200
let (index_schema, _) = create_index_schema(schema.clone(), index_options)?;
184201

185202
let index_settings = IndexSettings {
186-
// sort_by_field: None,
187203
..Default::default()
188204
};
189205

@@ -320,7 +336,6 @@ impl InvertedIndexWriter {
320336
&mut data,
321337
// Zstd has the best compression ratio
322338
TableCompression::Zstd,
323-
// Some(metadata),
324339
None,
325340
)?;
326341

tests/sqllogictests/suites/base/15_procedure/15_0001_execute_immediate.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use test_procedure;
77
query I
88
EXECUTE IMMEDIATE $$
99
BEGIN
10-
RETURN;
10+
RETURN NULL;
1111
END;
1212
$$;
1313
----

tests/sqllogictests/suites/ee/01_ee_system/01_0001_computed_column.test

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,22 +254,22 @@ a1 a1-c1 c1
254254
a2 a2-c2 c2
255255

256256
statement ok
257-
drop table if exists t_virtual;
257+
drop table if exists t_virtual_merge;
258258

259259
statement ok
260-
create table t_virtual(a string null default 'a', b string null as (concat(a, '-', c)) virtual, c string null default 'c');
260+
create table t_virtual_merge(a string null default 'a', b string null as (concat(a, '-', c)) virtual, c string null default 'c');
261261

262262
statement ok
263-
insert into t_virtual values ('a1', 'c1'), ('a2', 'c2');
263+
insert into t_virtual_merge values ('a1', 'c1'), ('a2', 'c2');
264264

265265
statement ok
266-
insert into t_virtual (a) values ('a3'), ('a4');
266+
insert into t_virtual_merge (a) values ('a3'), ('a4');
267267

268268
statement ok
269-
insert into t_virtual (c) values ('c5'), ('c6');
269+
insert into t_virtual_merge (c) values ('c5'), ('c6');
270270

271271
query T
272-
select * from t_virtual order by a, c;
272+
select * from t_virtual_merge order by a, c;
273273
----
274274
a a-c5 c5
275275
a a-c6 c6
@@ -279,10 +279,10 @@ a3 a3-c c
279279
a4 a4-c c
280280

281281
statement ok
282-
merge into t_virtual target using t_virtual source on target.a = source.a when matched and source.a = 'a4' then update set target.a = 'aa', target.c = 'cc';
282+
merge into t_virtual_merge target using t_virtual_merge source on target.a = source.a when matched and source.a = 'a4' then update set target.a = 'aa', target.c = 'cc';
283283

284284
query T
285-
select * from t_virtual order by a, c;
285+
select * from t_virtual_merge order by a, c;
286286
----
287287
a a-c5 c5
288288
a a-c6 c6
@@ -291,5 +291,21 @@ a2 a2-c2 c2
291291
a3 a3-c c
292292
aa aa-cc cc
293293

294+
statement ok
295+
drop table if exists t_variant;
296+
297+
statement ok
298+
create table t_variant(id int, data variant, name string as (data:videoInfo:Name::string) stored, duration int as(DATA:videoInfo:Duration::int) stored);
299+
300+
statement ok
301+
insert into t_variant values(1, '{"videoInfo":{"Name":"H264","Duration":10}}'),(2, '{"videoInfo":{"Name":"H265","Duration":20}}');
302+
303+
query ITTI
304+
select * from t_variant;
305+
----
306+
1 {"videoInfo":{"Duration":10,"Name":"H264"}} H264 10
307+
2 {"videoInfo":{"Duration":20,"Name":"H265"}} H265 20
308+
294309
statement ok
295310
DROP DATABASE test_computed_column
311+

0 commit comments

Comments (0)