Skip to content

Commit 54c2a76

Browse files
authored
feat(query): masking policy USING clause with multi-column support (#18884)
* feat(query): masking policy USING clause with multi-column support Enable USING clause to reference multiple table columns instead of only the masked column itself. Move masking policy application from storage read layer to SQL binder for unified handling across SELECT/WHERE/HAVING. Core changes: 1. Multi-column USING support with parameter substitution - Add clone_expr_with_replacement() to walk AST and replace column refs - Map policy parameters to USING columns by position - Support N policy args mapped to N USING columns 2. Move masking from read_plan to type checker (binder layer) - Apply masking in TypeChecker::resolve() for all query paths - Add in_masking_policy flag to prevent infinite recursion 3. Meta protobuf schema fixes (v155) - args: BTreeMapโ†’Vec to preserve parameter order (was losing order) - Only write first USING column to column_mask_policy_columns_ids.key * fix * debug * optimize * try fix
1 parent 6933034 commit 54c2a76

File tree

28 files changed

+1341
-271
lines changed

28 files changed

+1341
-271
lines changed

โ€Žsrc/meta/api/src/schema_api_test_suite.rsโ€Ž

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3238,7 +3238,6 @@ impl SchemaApiTestSuite {
32383238
tenant: tenant.clone(),
32393239
seq: MatchSeq::Exact(res.ident.seq),
32403240
table_id,
3241-
column: "number".to_string(),
32423241
action: SetSecurityPolicyAction::Set(mask1_id, vec![column_id]),
32433242
};
32443243
mt.set_table_column_mask_policy(req).await?;
@@ -3299,7 +3298,6 @@ impl SchemaApiTestSuite {
32993298
tenant: tenant.clone(),
33003299
seq: MatchSeq::Exact(res.ident.seq),
33013300
table_id,
3302-
column: "number".to_string(),
33033301
action: SetSecurityPolicyAction::Set(mask1_id, vec![column_id]),
33043302
};
33053303
mt.set_table_column_mask_policy(req).await?;
@@ -3353,12 +3351,10 @@ impl SchemaApiTestSuite {
33533351
.find(|f| f.name == "c1")
33543352
.unwrap()
33553353
.column_id;
3356-
println!("fields is {:?}", res.meta.schema.fields);
33573354
let req = SetTableColumnMaskPolicyReq {
33583355
tenant: tenant.clone(),
33593356
seq: MatchSeq::Exact(res.ident.seq),
33603357
table_id: table_id_1,
3361-
column: "c1".to_string(),
33623358
action: SetSecurityPolicyAction::Set(mask2_id, vec![column_id]),
33633359
};
33643360
mt.set_table_column_mask_policy(req).await?;
@@ -3410,7 +3406,6 @@ impl SchemaApiTestSuite {
34103406
tenant: tenant.clone(),
34113407
seq: MatchSeq::Exact(res.ident.seq),
34123408
table_id: table_id_1,
3413-
column: "number".to_string(),
34143409
action: SetSecurityPolicyAction::Unset(mask2_id),
34153410
};
34163411
mt.set_table_column_mask_policy(req).await?;

โ€Žsrc/meta/api/src/security_api.rsโ€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ where
113113
columns_ids: column_ids.clone(),
114114
};
115115

116-
for column_id in column_ids {
116+
if let Some(column_id) = column_ids.first() {
117117
new_table_meta
118118
.column_mask_policy_columns_ids
119119
.insert(*column_id, policy_map.clone());

โ€Žsrc/meta/app/src/schema/table/mod.rsโ€Ž

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ pub struct TableMeta {
173173
pub shared_by: BTreeSet<u64>,
174174
// should be discard
175175
pub column_mask_policy: Option<BTreeMap<String, String>>,
176+
// ColumnId always equals the first value in SecurityPolicyColumnMap::columns_ids
177+
// One table can have multiple masking policies
176178
pub column_mask_policy_columns_ids: BTreeMap<ColumnId, SecurityPolicyColumnMap>,
177179
// One table only has an unique row access policy
178180
// should be discard
@@ -794,7 +796,6 @@ pub struct SetTableColumnMaskPolicyReq {
794796
pub tenant: Tenant,
795797
pub table_id: u64,
796798
pub seq: MatchSeq,
797-
pub column: String,
798799
pub action: SetSecurityPolicyAction,
799800
}
800801

โ€Žsrc/meta/proto-conv/src/data_mask_from_to_protobuf_impl.rsโ€Ž

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,20 @@ impl FromToProto for mt::DatamaskMeta {
3636
fn from_pb(p: pb::DatamaskMeta) -> Result<Self, Incompatible> {
3737
reader_check_msg(p.ver, p.min_reader_ver)?;
3838

39+
// Prioritize args_v2 (preserves order), fallback to args (backward compatibility)
40+
let args: Vec<(String, String)> = if !p.args_v2.is_empty() {
41+
p.args_v2
42+
.into_iter()
43+
.map(|arg| (arg.name, arg.r#type))
44+
.collect()
45+
} else {
46+
// Backward compatibility: read from old args map
47+
// Note: BTreeMap sorts keys alphabetically, so order may be lost for old data
48+
p.args.into_iter().collect()
49+
};
50+
3951
let v = Self {
40-
args: p
41-
.args
42-
.iter()
43-
.map(|(arg_name, arg_type)| (arg_name.clone(), arg_type.clone()))
44-
.collect::<Vec<_>>(),
52+
args,
4553
return_type: p.return_type,
4654
body: p.body,
4755
comment: p.comment.clone(),
@@ -55,14 +63,21 @@ impl FromToProto for mt::DatamaskMeta {
5563
}
5664

5765
fn to_pb(&self) -> Result<pb::DatamaskMeta, Incompatible> {
58-
let mut args = BTreeMap::new();
59-
for (arg_name, arg_type) in &self.args {
60-
args.insert(arg_name.to_string(), arg_type.to_string());
61-
}
66+
// Write to args_v2 (new format that preserves order)
67+
let args_v2: Vec<pb::DataMaskArg> = self
68+
.args
69+
.iter()
70+
.map(|(arg_name, arg_type)| pb::DataMaskArg {
71+
name: arg_name.clone(),
72+
r#type: arg_type.clone(),
73+
})
74+
.collect();
75+
6276
let p = pb::DatamaskMeta {
6377
ver: VER,
6478
min_reader_ver: MIN_READER_VER,
65-
args,
79+
args: BTreeMap::new(),
80+
args_v2,
6681
return_type: self.return_type.clone(),
6782
body: self.body.clone(),
6883
comment: self.comment.clone(),

โ€Žsrc/meta/proto-conv/src/util.rsโ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
185185
(153, "2025-10-16: Add: RowAccessPolicyColumnMap rename to SecurityColumnMap store mask/row_access policy name and column id"),
186186
(154, "2025-10-16: Add: VacuumWatermark"),
187187
(155, "2025-10-24: Add: RowAccessPolicyMeta::RowAccessPolicyArg"),
188+
(156, "2025-10-22: Add: DataMaskMeta add DataMaskArg"),
188189
// Dear developer:
189190
// If you're gonna add a new metadata version, you'll have to add a test for it.
190191
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

โ€Žsrc/meta/proto-conv/tests/it/main.rsโ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,4 @@ mod v152_external_udf;
147147
mod v153_security_column_map;
148148
mod v154_vacuum_watermark;
149149
mod v155_row_access_policy_args;
150+
mod v156_data_mask_args;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use chrono::TimeZone;
16+
use chrono::Utc;
17+
use fastrace::func_name;
18+
19+
use crate::common;
20+
21+
// These bytes are built when a new version in introduced,
22+
// and are kept for backward compatibility test.
23+
//
24+
// *************************************************************
25+
// * These messages should never be updated, *
26+
// * only be added when a new version is added, *
27+
// * or be removed when an old version is no longer supported. *
28+
// *************************************************************
29+
//
30+
// The message bytes are built from the output of `test_pb_from_to()`
31+
#[test]
32+
fn test_decode_v156_data_mask() -> anyhow::Result<()> {
33+
let datamask_meta_v156 = vec![
34+
18, 6, 83, 116, 114, 105, 110, 103, 26, 68, 67, 65, 83, 69, 32, 87, 72, 69, 78, 32, 99,
35+
117, 114, 114, 101, 110, 116, 95, 114, 111, 108, 101, 40, 41, 32, 73, 78, 40, 39, 65, 78,
36+
65, 76, 89, 83, 84, 39, 41, 32, 84, 72, 69, 78, 32, 86, 65, 76, 32, 69, 76, 83, 69, 32, 39,
37+
42, 42, 42, 42, 42, 42, 42, 42, 42, 39, 32, 69, 78, 68, 34, 12, 115, 111, 109, 101, 32, 99,
38+
111, 109, 109, 101, 110, 116, 42, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50,
39+
58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 50, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32,
40+
49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 58, 14, 10, 4, 116, 101, 115, 116, 18, 6,
41+
83, 116, 114, 105, 110, 103, 58, 11, 10, 1, 97, 18, 6, 83, 116, 114, 105, 110, 103, 160, 6,
42+
156, 1, 168, 6, 24,
43+
];
44+
45+
let want = || databend_common_meta_app::data_mask::DatamaskMeta {
46+
args: vec![
47+
("test".to_string(), "String".to_string()),
48+
("a".to_string(), "String".to_string()),
49+
],
50+
return_type: "String".to_string(),
51+
body: "CASE WHEN current_role() IN('ANALYST') THEN VAL ELSE '*********' END".to_string(),
52+
comment: Some("some comment".to_string()),
53+
create_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
54+
update_on: Some(Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap()),
55+
};
56+
57+
common::test_pb_from_to(func_name!(), want())?;
58+
common::test_load_old(func_name!(), datamask_meta_v156.as_slice(), 156, want())
59+
}

โ€Žsrc/meta/protos/proto/data_mask.protoโ€Ž

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,25 @@ syntax = "proto3";
1616

1717
package databend_proto;
1818

19+
message DataMaskArg {
20+
string name = 1;
21+
string type = 2;
22+
}
23+
1924
message DatamaskMeta {
2025
uint64 ver = 100;
2126
uint64 min_reader_ver = 101;
2227

28+
// Deprecated: preserved for backward compatibility, use args_v2 instead
2329
map<string, string> args = 1;
2430
string return_type = 2;
2531
string body = 3;
2632
optional string comment = 4;
2733
string create_on = 5;
2834
optional string update_on = 6;
35+
36+
// New field that preserves argument order
37+
repeated DataMaskArg args_v2 = 7;
2938
}
3039

3140
message MaskpolicyTableIdList {

โ€Žsrc/query/ast/src/ast/statements/table.rsโ€Ž

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,9 +1099,20 @@ impl Display for ColumnComment {
10991099

11001100
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
11011101
pub enum ModifyColumnAction {
1102-
// (column name id, masking policy name)
1103-
SetMaskingPolicy(Identifier, String),
1104-
// column name id
1102+
// Set masking policy on a column with optional USING clause
1103+
// Syntax: ALTER TABLE ... MODIFY COLUMN <column> SET MASKING POLICY <policy_name> [USING (<columns>)]
1104+
//
1105+
// Fields:
1106+
// - Identifier: The column name to apply the masking policy to
1107+
// - String: The masking policy name
1108+
// - Option<Vec<Identifier>>: Optional USING clause columns
1109+
//
1110+
// USING clause behavior (follows Snowflake semantics):
1111+
// - First column: The column to mask/tokenize (must match the target column)
1112+
// - Additional columns: Condition columns to evaluate for conditional masking
1113+
// - If USING is omitted: Treats as normal (non-conditional) masking policy
1114+
SetMaskingPolicy(Identifier, String, Option<Vec<Identifier>>),
1115+
// Unset masking policy from a column
11051116
UnsetMaskingPolicy(Identifier),
11061117
// vec<ColumnDefinition>
11071118
SetDataType(Vec<ColumnDefinition>),
@@ -1114,8 +1125,14 @@ pub enum ModifyColumnAction {
11141125
impl Display for ModifyColumnAction {
11151126
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
11161127
match &self {
1117-
ModifyColumnAction::SetMaskingPolicy(column, name) => {
1118-
write!(f, "{} SET MASKING POLICY {}", column, name)?
1128+
ModifyColumnAction::SetMaskingPolicy(column, name, using_columns) => {
1129+
if let Some(using_columns) = using_columns {
1130+
write!(f, "{} SET MASKING POLICY {} USING (", column, name)?;
1131+
write_comma_separated_list(f, using_columns)?;
1132+
write!(f, ")")?
1133+
} else {
1134+
write!(f, "{} SET MASKING POLICY {}", column, name)?
1135+
}
11191136
}
11201137
ModifyColumnAction::UnsetMaskingPolicy(column) => {
11211138
write!(f, "{} UNSET MASKING POLICY", column)?

โ€Žsrc/query/ast/src/parser/data_mask.rsโ€Ž

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,10 @@ fn data_mask_arg(i: Input) -> IResult<DataMaskArg> {
3333
})(i)
3434
}
3535

36-
fn data_mask_more_arg(i: Input) -> IResult<DataMaskArg> {
37-
map(rule! { "," ~ #data_mask_arg }, |(_, arg)| arg)(i)
38-
}
39-
40-
fn data_mask_arg_list(i: Input) -> IResult<Vec<DataMaskArg>> {
41-
map(rule! { #data_mask_more_arg* }, |args| args)(i)
42-
}
43-
4436
fn data_mask_args(i: Input) -> IResult<Vec<DataMaskArg>> {
4537
map(
46-
rule! { AS ~ "(" ~ #data_mask_arg ~ #data_mask_arg_list ~ ")" },
47-
|(_, _, arg_0, more_args, _)| {
48-
let mut args = vec![];
49-
args.push(arg_0);
50-
for arg in more_args {
51-
args.push(arg);
52-
}
53-
args
54-
},
38+
rule! { AS ~ "(" ~ #comma_separated_list1(data_mask_arg) ~ ")" },
39+
|(_, _, args, _)| args,
5540
)(i)
5641
}
5742

0 commit comments

Comments
ย (0)