From 8c42399a78b68bc4f19f83f2d1900bd23324e865 Mon Sep 17 00:00:00 2001 From: TCeason Date: Fri, 7 Nov 2025 10:13:24 +0800 Subject: [PATCH 1/7] feat(query): prevent masking/row access policy name conflicts --- src/meta/api/src/data_mask_api_impl.rs | 14 ++++++ .../api/src/row_access_policy_api_impl.rs | 12 +++++ .../05_0004_ddl_security_policy.test | 3 -- .../05_0005_ddl_conflicts_policy.test | 47 +++++++++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 tests/sqllogictests/suites/ee/05_ee_ddl/05_0005_ddl_conflicts_policy.test diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 97ec03003560e..714a82dee4c39 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -24,6 +24,7 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent; use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::id_generator::IdGenerator; +use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::KeyWithTenant; @@ -92,6 +93,17 @@ impl> DatamaskApi for KV { }; } + let row_access_name_ident = RowAccessPolicyNameIdent::new( + name_ident.tenant().clone(), + name_ident.data_mask_name().to_string(), + ); + if self.get_pb(&row_access_name_ident).await?.is_some() { + return Err(AppError::DatamaskAlreadyExists( + name_ident.exist_error("name conflicts with an existing row access policy"), + ) + .into()); + } + // Create data mask by inserting these record: // name -> id // id -> policy @@ -113,6 +125,8 @@ impl> DatamaskApi for KV { let meta: DatamaskMeta = req.data_mask_meta.clone(); let id_list = MaskpolicyTableIdList::default(); txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); + txn.condition + .push(txn_cond_eq_seq(&row_access_name_ident, 0)); txn.if_then.extend(vec![ txn_op_put_pb(name_ident, &id, None)?, // name -> db_id txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta diff --git a/src/meta/api/src/row_access_policy_api_impl.rs b/src/meta/api/src/row_access_policy_api_impl.rs index 4c8e9aa53492a..a23a9950ddc77 100644 --- a/src/meta/api/src/row_access_policy_api_impl.rs +++ b/src/meta/api/src/row_access_policy_api_impl.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident; use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId; @@ -86,6 +87,16 @@ impl> RowAccessPolicyApi for KV { } } + let mask_name_ident = DataMaskNameIdent::new( + name_ident.tenant().clone(), + name_ident.row_access_name().to_string(), + ); + if self.get_pb(&mask_name_ident).await?.is_some() { + return Ok(Err( + name_ident.exist_error("name conflicts with an existing masking policy") + )); + } + // Create row policy by inserting these record: // name -> id // id -> policy @@ -102,6 +113,7 @@ impl> RowAccessPolicyApi for KV { { let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone(); txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); + txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0)); txn.if_then.extend(vec![ txn_op_put_pb(name_ident, &id, None)?, // name -> policy_id txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta diff --git a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test index 8ac31ddf80b22..2178a712048b3 100644 --- a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test +++ b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test @@ -1089,6 +1089,3 @@ DROP MASKING POLICY mask_ssn_conditional; statement ok unset global enable_planner_cache; - -statement ok -unset global enable_experimental_row_access_policy; diff --git a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0005_ddl_conflicts_policy.test b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0005_ddl_conflicts_policy.test new file mode 100644 index 0000000000000..fc69c3e3adb15 --- /dev/null +++ b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0005_ddl_conflicts_policy.test @@ -0,0 +1,47 @@ +## Copyright 2023 Databend Cloud +## +## Licensed under the Elastic License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## https://www.elastic.co/licensing/elastic-license +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. + +statement ok +set global enable_experimental_row_access_policy = 1; + +## Cross-type policy name conflicts should be rejected +statement ok +drop row access policy if exists shared_policy_name; + +statement ok +drop masking policy if exists shared_policy_name; + +statement ok +CREATE ROW ACCESS POLICY shared_policy_name AS (user_id STRING) RETURNS boolean -> true; + +statement error 2321 +CREATE MASKING POLICY shared_policy_name AS (val STRING) RETURNS STRING -> '***'; + +statement ok +drop row access policy if exists shared_policy_name; + +statement ok +drop masking policy if exists mask_only_name; + +statement ok +drop row access policy if exists mask_only_name; + +statement ok +CREATE MASKING POLICY mask_only_name AS (val STRING) RETURNS STRING -> '***'; + +statement error 2324 +CREATE ROW ACCESS POLICY mask_only_name AS (user_id STRING) RETURNS boolean -> true; + +statement ok +drop masking policy if exists mask_only_name; From 3cf9e415ea1e81e873b61dfee3f8c9013b640de4 Mon Sep 17 00:00:00 2001 From: TCeason Date: Sat, 8 Nov 2025 21:34:55 +0800 Subject: [PATCH 2/7] extract check and fetch_id from loop --- src/meta/api/src/data_mask_api_impl.rs | 38 ++++++++++--------- .../api/src/row_access_policy_api_impl.rs | 38 +++++++++---------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 714a82dee4c39..f5a8dcb31ec98 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -61,8 +61,21 @@ impl> DatamaskApi for KV { let name_ident = &req.name; + let row_access_name_ident = RowAccessPolicyNameIdent::new( + name_ident.tenant().clone(), + name_ident.data_mask_name().to_string(), + ); + if self.get_pb(&row_access_name_ident).await?.is_some() { + return Err(AppError::DatamaskAlreadyExists( + name_ident.exist_error("name conflicts with an existing row access policy"), + ) + .into()); + } + + let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?; + let mut trials = txn_backoff(None, func_name!()); - let id = loop { + loop { trials.next().unwrap()?.await; let mut txn = TxnRequest::default(); @@ -93,25 +106,12 @@ impl> DatamaskApi for KV { }; } - let row_access_name_ident = RowAccessPolicyNameIdent::new( - name_ident.tenant().clone(), - name_ident.data_mask_name().to_string(), - ); - if self.get_pb(&row_access_name_ident).await?.is_some() { - return Err(AppError::DatamaskAlreadyExists( - name_ident.exist_error("name conflicts with an existing row access policy"), - ) - .into()); - } - // Create data mask by inserting these record: // name -> id // id -> policy // data mask name -> data mask table id list - let id = fetch_id(self, IdGenerator::data_mask_id()).await?; - - let id = DataMaskId::new(id); + let id = DataMaskId::new(masking_policy_id); let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id); let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); @@ -144,12 +144,14 @@ impl> DatamaskApi for KV { ); if succ { - break id; + break; } } - }; + } - Ok(CreateDatamaskReply { id: *id }) + Ok(CreateDatamaskReply { + id: masking_policy_id, + }) } async fn drop_data_mask( diff --git a/src/meta/api/src/row_access_policy_api_impl.rs b/src/meta/api/src/row_access_policy_api_impl.rs index a23a9950ddc77..48265c9e66638 100644 --- a/src/meta/api/src/row_access_policy_api_impl.rs +++ b/src/meta/api/src/row_access_policy_api_impl.rs @@ -61,9 +61,20 @@ impl> RowAccessPolicyApi for KV { let name_ident = &req.name; - let id = fetch_id(self, IdGenerator::row_access_id()).await?; + let mask_name_ident = DataMaskNameIdent::new( + name_ident.tenant().clone(), + name_ident.row_access_name().to_string(), + ); + if self.get_pb(&mask_name_ident).await?.is_some() { + return Ok(Err( + name_ident.exist_error("name conflicts with an existing masking policy") + )); + } + + let row_access_id = fetch_id(self, IdGenerator::row_access_id()).await?; + let policy_id = RowAccessPolicyId::new(row_access_id); let mut trials = txn_backoff(None, func_name!()); - let id = loop { + loop { trials.next().unwrap()?.await; let mut txn = TxnRequest::default(); @@ -87,22 +98,11 @@ impl> RowAccessPolicyApi for KV { } } - let mask_name_ident = DataMaskNameIdent::new( - name_ident.tenant().clone(), - name_ident.row_access_name().to_string(), - ); - if self.get_pb(&mask_name_ident).await?.is_some() { - return Ok(Err( - name_ident.exist_error("name conflicts with an existing masking policy") - )); - } - // Create row policy by inserting these record: // name -> id // id -> policy - let id = RowAccessPolicyId::new(id); - let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), id); + let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), policy_id); debug!( id :? =(&id_ident), @@ -115,8 +115,8 @@ impl> RowAccessPolicyApi for KV { txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0)); txn.if_then.extend(vec![ - txn_op_put_pb(name_ident, &id, None)?, // name -> policy_id - txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta + txn_op_put_pb(name_ident, &policy_id, None)?, // name -> policy_id + txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta ]); let (succ, _responses) = send_txn(self, txn).await?; @@ -129,12 +129,12 @@ impl> RowAccessPolicyApi for KV { ); if succ { - break id; + break; } } - }; + } - Ok(Ok(CreateRowAccessPolicyReply { id: *id })) + Ok(Ok(CreateRowAccessPolicyReply { id: row_access_id })) } async fn drop_row_access_policy( From 1c09f7bc13748bfde3e166a8ef4d2b588533de33 Mon Sep 17 00:00:00 2001 From: TCeason Date: Mon, 10 Nov 2025 11:16:47 +0800 Subject: [PATCH 3/7] delete loop --- src/meta/api/src/data_mask_api_impl.rs | 140 +++++++++--------- src/meta/api/src/kv_fetch_util.rs | 3 + .../api/src/row_access_policy_api_impl.rs | 103 +++++++------ 3 files changed, 122 insertions(+), 124 deletions(-) diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index f5a8dcb31ec98..5b8e3283d39af 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -13,6 +13,7 @@ // limitations under the License. use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::app_error::TxnRetryMaxTimes; use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskId; @@ -74,84 +75,79 @@ impl> DatamaskApi for KV { let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?; - let mut trials = txn_backoff(None, func_name!()); - loop { - trials.next().unwrap()?.await; + let mut txn = TxnRequest::default(); - let mut txn = TxnRequest::default(); + let res = self.get_id_and_value(name_ident).await?; + debug!(res :? = res, name_key :? =(name_ident); "create_data_mask"); - let res = self.get_id_and_value(name_ident).await?; - debug!(res :? = res, name_key :? =(name_ident); "create_data_mask"); - - let mut curr_seq = 0; - - if let Some((seq_id, seq_meta)) = res { - match req.create_option { - CreateOption::Create => { - return Err(AppError::DatamaskAlreadyExists( - name_ident.exist_error(func_name!()), - ) - .into()); - } - CreateOption::CreateIfNotExists => { - return Ok(CreateDatamaskReply { id: *seq_id.data }); - } - CreateOption::CreateOrReplace => { - let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); - - txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); - - curr_seq = seq_id.seq; - } - }; - } + let mut curr_seq = 0; - // Create data mask by inserting these record: - // name -> id - // id -> policy - // data mask name -> data mask table id list - - let id = DataMaskId::new(masking_policy_id); - let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id); - let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); - - debug!( - id :? =(&id_ident), - name_key :? =(name_ident); - "new datamask id" - ); - - { - let meta: DatamaskMeta = req.data_mask_meta.clone(); - let id_list = MaskpolicyTableIdList::default(); - txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); - txn.condition - .push(txn_cond_eq_seq(&row_access_name_ident, 0)); - txn.if_then.extend(vec![ - txn_op_put_pb(name_ident, &id, None)?, // name -> db_id - txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta - // TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later - txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list - ]); - - let (succ, _responses) = send_txn(self, txn).await?; - - debug!( - name :? =(name_ident), - id :? =(&id_ident), - succ = succ; - "create_data_mask" - ); - - if succ { - break; + if let Some((seq_id, seq_meta)) = res { + match req.create_option { + CreateOption::Create => { + return Err(AppError::DatamaskAlreadyExists( + name_ident.exist_error(func_name!()), + ) + .into()); } - } + CreateOption::CreateIfNotExists => { + return Ok(CreateDatamaskReply { id: *seq_id.data }); + } + CreateOption::CreateOrReplace => { + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + + txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); + + curr_seq = seq_id.seq; + } + }; + } + + // Create data mask by inserting these record: + // name -> id + // id -> policy + // data mask name -> data mask table id list + + let id = DataMaskId::new(masking_policy_id); + let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id); + let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); + + debug!( + id :? =(&id_ident), + name_key :? =(name_ident); + "new datamask id" + ); + + { + let meta: DatamaskMeta = req.data_mask_meta.clone(); + let id_list = MaskpolicyTableIdList::default(); + txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); + txn.condition + .push(txn_cond_eq_seq(&row_access_name_ident, 0)); + txn.if_then.extend(vec![ + txn_op_put_pb(name_ident, &id, None)?, // name -> db_id + txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta + // TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later + txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list + ]); } - Ok(CreateDatamaskReply { - id: masking_policy_id, - }) + let (succ, _responses) = send_txn(self, txn).await?; + + debug!( + name :? =(name_ident), + id :? =(&id_ident), + succ = succ; + "create_data_mask" + ); + + if succ { + Ok(CreateDatamaskReply { + id: masking_policy_id, + }) + } else { + Err(KVAppError::from(TxnRetryMaxTimes::new(func_name!(), 1))) + } } async fn drop_data_mask( diff --git a/src/meta/api/src/kv_fetch_util.rs b/src/meta/api/src/kv_fetch_util.rs index 092c276fd1f60..7e68cc253250f 100644 --- a/src/meta/api/src/kv_fetch_util.rs +++ b/src/meta/api/src/kv_fetch_util.rs @@ -160,6 +160,9 @@ pub async fn fetch_id( kv_api: &(impl kvapi::KVApi + ?Sized), generator: T, ) -> Result { + // Each `upsert` bumps the seq atomically inside metasrv, therefore every caller + // receives a unique, monotonically increasing id even when multiple sessions + // fetch from the same generator concurrently. let res = kv_api .upsert_kv(UpsertKV::update(generator.to_string_key(), b"")) .await?; diff --git a/src/meta/api/src/row_access_policy_api_impl.rs b/src/meta/api/src/row_access_policy_api_impl.rs index 48265c9e66638..82d0cf6b2d9b3 100644 --- a/src/meta/api/src/row_access_policy_api_impl.rs +++ b/src/meta/api/src/row_access_policy_api_impl.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_app::app_error::TxnRetryMaxTimes; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident; @@ -73,68 +74,66 @@ impl> RowAccessPolicyApi for KV { let row_access_id = fetch_id(self, IdGenerator::row_access_id()).await?; let policy_id = RowAccessPolicyId::new(row_access_id); - let mut trials = txn_backoff(None, func_name!()); - loop { - trials.next().unwrap()?.await; - - let mut txn = TxnRequest::default(); + let mut txn = TxnRequest::default(); - let res = self.get_id_and_value(name_ident).await?; - debug!(res :? = res, name_key :? =(name_ident); "create_row_access"); + let res = self.get_id_and_value(name_ident).await?; + debug!(res :? = res, name_key :? =(name_ident); "create_row_access"); - let mut curr_seq = 0; + let mut curr_seq = 0; - if let Some((seq_id, seq_meta)) = res { - if req.can_replace { - let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + if let Some((seq_id, seq_meta)) = res { + if req.can_replace { + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); - txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); + txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); - // TODO(eason): need to remove row policy from table meta + // TODO(eason): need to remove row policy from table meta - curr_seq = seq_id.seq; - } else { - return Ok(Err(name_ident.exist_error(func_name!()))); - } + curr_seq = seq_id.seq; + } else { + return Ok(Err(name_ident.exist_error(func_name!()))); } + } - // Create row policy by inserting these record: - // name -> id - // id -> policy - - let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), policy_id); - - debug!( - id :? =(&id_ident), - name_key :? =(name_ident); - "new RowAccessPolicy id" - ); - - { - let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone(); - txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); - txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0)); - txn.if_then.extend(vec![ - txn_op_put_pb(name_ident, &policy_id, None)?, // name -> policy_id - txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta - ]); - - let (succ, _responses) = send_txn(self, txn).await?; - - debug!( - name :? =(name_ident), - id :? =(&id_ident), - succ = succ; - "create_row_access" - ); - - if succ { - break; - } - } + // Create row policy by inserting these record: + // name -> id + // id -> policy + + let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), policy_id); + + debug!( + id :? =(&id_ident), + name_key :? =(name_ident); + "new RowAccessPolicy id" + ); + + { + let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone(); + txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); + txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0)); + txn.if_then.extend(vec![ + txn_op_put_pb(name_ident, &policy_id, None)?, // name -> policy_id + txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta + ]); } - Ok(Ok(CreateRowAccessPolicyReply { id: row_access_id })) + let (succ, _responses) = send_txn(self, txn).await?; + + debug!( + name :? =(name_ident), + id :? =(&id_ident), + succ = succ; + "create_row_access" + ); + + if succ { + Ok(Ok(CreateRowAccessPolicyReply { id: row_access_id })) + } else { + Err(MetaTxnError::TxnRetryMaxTimes(TxnRetryMaxTimes::new( + func_name!(), + 1, + ))) + } } async fn drop_row_access_policy( From b95c179ab5a15e27dd2d56fbd7de0b195535ed51 Mon Sep 17 00:00:00 2001 From: TCeason Date: Mon, 10 Nov 2025 15:22:44 +0800 Subject: [PATCH 4/7] fix --- Cargo.lock | 1 - src/meta/api/src/data_mask_api.rs | 5 +-- src/meta/api/src/data_mask_api_impl.rs | 33 +++++++++---------- src/meta/api/src/errors.rs | 4 +-- src/meta/api/src/row_access_policy_api.rs | 2 +- .../api/src/row_access_policy_api_impl.rs | 11 +++---- src/meta/api/src/schema_api_test_suite.rs | 16 +++++---- .../ee/src/data_mask/data_mask_handler.rs | 13 +++++--- .../row_access_policy_handler.rs | 4 +-- .../data_mask/src/data_mask_handler.rs | 14 ++++++-- .../ee_features/row_access_policy/Cargo.toml | 1 - .../src/row_access_policy_handler.rs | 6 ++-- .../interpreter_data_mask_create.rs | 16 +++++++-- .../interpreter_row_access_policy_create.rs | 2 +- .../sql/src/planner/binder/ddl/data_mask.rs | 6 ---- 15 files changed, 77 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b66f07dc8090..9eada2160b075 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4902,7 +4902,6 @@ dependencies = [ "async-trait", "databend-common-base", "databend-common-exception", - "databend-common-meta-api", "databend-common-meta-app", "databend-common-meta-store", "databend-common-meta-types", diff --git a/src/meta/api/src/data_mask_api.rs b/src/meta/api/src/data_mask_api.rs index 4c78ed7ebe8bb..69d5fd5c451e3 100644 --- a/src/meta/api/src/data_mask_api.rs +++ b/src/meta/api/src/data_mask_api.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_app::data_mask::data_mask_name_ident; use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskId; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use crate::errors::MaskingPolicyError; -use crate::kv_app_error::KVAppError; use crate::meta_txn_error::MetaTxnError; #[async_trait::async_trait] @@ -30,7 +31,7 @@ pub trait DatamaskApi: Send + Sync { async fn create_data_mask( &self, req: CreateDatamaskReq, - ) -> Result; + ) -> Result>, MetaError>; /// On success, returns the dropped id and data mask. /// Returning None, means nothing is removed. diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 5b8e3283d39af..477a2587dc395 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_meta_app::app_error::AppError; -use databend_common_meta_app::app_error::TxnRetryMaxTimes; +use databend_common_meta_app::data_mask::data_mask_name_ident; use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskId; @@ -28,6 +27,7 @@ use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; @@ -40,7 +40,6 @@ use log::debug; use crate::data_mask_api::DatamaskApi; use crate::errors::MaskingPolicyError; use crate::fetch_id; -use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; use crate::meta_txn_error::MetaTxnError; use crate::txn_backoff::txn_backoff; @@ -57,7 +56,8 @@ impl> DatamaskApi for KV { async fn create_data_mask( &self, req: CreateDatamaskReq, - ) -> Result { + ) -> Result>, MetaError> + { debug!(req :? =(&req); "DatamaskApi: {}", func_name!()); let name_ident = &req.name; @@ -67,10 +67,9 @@ impl> DatamaskApi for KV { name_ident.data_mask_name().to_string(), ); if self.get_pb(&row_access_name_ident).await?.is_some() { - return Err(AppError::DatamaskAlreadyExists( - name_ident.exist_error("name conflicts with an existing row access policy"), - ) - .into()); + return Ok(Err( + name_ident.exist_error("name conflicts with an existing masking policy") + )); } let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?; @@ -85,13 +84,12 @@ impl> DatamaskApi for KV { if let Some((seq_id, seq_meta)) = res { match req.create_option { CreateOption::Create => { - return Err(AppError::DatamaskAlreadyExists( - name_ident.exist_error(func_name!()), - ) - .into()); + return Ok(Err( + name_ident.exist_error(format!("{} already exists", req.name)) + )); } CreateOption::CreateIfNotExists => { - return Ok(CreateDatamaskReply { id: *seq_id.data }); + return Ok(Ok(CreateDatamaskReply { id: *seq_id.data })); } CreateOption::CreateOrReplace => { let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); @@ -142,11 +140,13 @@ impl> DatamaskApi for KV { ); if succ { - Ok(CreateDatamaskReply { + Ok(Ok(CreateDatamaskReply { id: masking_policy_id, - }) + })) } else { - Err(KVAppError::from(TxnRetryMaxTimes::new(func_name!(), 1))) + Ok(Err( + name_ident.exist_error(format!("{} already exists", req.name)) + )) } } @@ -185,7 +185,6 @@ impl> DatamaskApi for KV { // Policy is in use - cannot drop if !table_policy_refs.is_empty() { return Ok(Err(MaskingPolicyError::policy_in_use( - tenant.tenant_name().to_string(), name_ident.data_mask_name().to_string(), ))); } diff --git a/src/meta/api/src/errors.rs b/src/meta/api/src/errors.rs index 9b28636b59ebe..cf52f96b6d61e 100644 --- a/src/meta/api/src/errors.rs +++ b/src/meta/api/src/errors.rs @@ -47,7 +47,7 @@ pub enum MaskingPolicyError { } impl MaskingPolicyError { - pub fn policy_in_use(_tenant: impl Into, policy_name: impl Into) -> Self { + pub fn policy_in_use(policy_name: impl Into) -> Self { Self::PolicyInUse { policy_name: policy_name.into(), } @@ -72,7 +72,7 @@ pub enum RowAccessPolicyError { } impl RowAccessPolicyError { - pub fn policy_in_use(_tenant: impl Into, policy_name: impl Into) -> Self { + pub fn policy_in_use(policy_name: impl Into) -> Self { Self::PolicyInUse { policy_name: policy_name.into(), } diff --git a/src/meta/api/src/row_access_policy_api.rs b/src/meta/api/src/row_access_policy_api.rs index 99832878f9a24..25526d67056ac 100644 --- a/src/meta/api/src/row_access_policy_api.rs +++ b/src/meta/api/src/row_access_policy_api.rs @@ -33,7 +33,7 @@ pub trait RowAccessPolicyApi: Send + Sync { req: CreateRowAccessPolicyReq, ) -> Result< Result>, - MetaTxnError, + MetaError, >; /// On success, returns the dropped id and row policy. diff --git a/src/meta/api/src/row_access_policy_api_impl.rs b/src/meta/api/src/row_access_policy_api_impl.rs index 82d0cf6b2d9b3..464db34c520e3 100644 --- a/src/meta/api/src/row_access_policy_api_impl.rs +++ b/src/meta/api/src/row_access_policy_api_impl.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_meta_app::app_error::TxnRetryMaxTimes; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident; @@ -56,7 +55,7 @@ impl> RowAccessPolicyApi for KV { req: CreateRowAccessPolicyReq, ) -> Result< Result>, - MetaTxnError, + MetaError, > { debug!(req :? =(&req); "RowAccessPolicyApi: {}", func_name!()); @@ -129,10 +128,9 @@ impl> RowAccessPolicyApi for KV { if succ { Ok(Ok(CreateRowAccessPolicyReply { id: row_access_id })) } else { - Err(MetaTxnError::TxnRetryMaxTimes(TxnRetryMaxTimes::new( - func_name!(), - 1, - ))) + Ok(Err( + name_ident.exist_error(format!("{} already exists", req.name)) + )) } } @@ -171,7 +169,6 @@ impl> RowAccessPolicyApi for KV { // Policy is in use - cannot drop if !table_policy_refs.is_empty() { return Ok(Err(RowAccessPolicyError::policy_in_use( - tenant.tenant_name().to_string(), name_ident.row_access_name().to_string(), ))); } diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index c20ffc8ae8b0f..5e3e8ff4855c0 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -3195,7 +3195,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await.unwrap().unwrap(); let mask1_name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_1.to_string()); mask1_id = get_kv_u64_data(mt, &mask1_name_ident).await?; @@ -3212,7 +3212,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await.unwrap().unwrap(); let mask2_name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_2.to_string()); mask2_id = get_kv_u64_data(mt, &mask2_name_ident).await?; @@ -3473,7 +3473,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await.unwrap().unwrap(); let old_id: u64 = get_kv_u64_data(mt, &name).await?; let id_key = DataMaskIdIdent::new(&tenant, old_id); @@ -3493,7 +3493,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await.unwrap().unwrap(); // assert old id key has been deleted let meta: Result = get_kv_data(mt, &id_key).await; @@ -3898,7 +3898,9 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await?; + .await + .unwrap() + .unwrap(); let mask_cleanup_id = get_kv_u64_data(mt, &mask_cleanup_ident).await?; let set_req = SetTableColumnMaskPolicyReq { @@ -3961,7 +3963,9 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await?; + .await + .unwrap() + .unwrap(); let mask_guard_id = get_kv_u64_data(mt, &mask_guard_ident).await?; table_info = util.get_table().await?; diff --git a/src/query/ee/src/data_mask/data_mask_handler.rs b/src/query/ee/src/data_mask/data_mask_handler.rs index 417f2be71a01b..c6f9356f8a9cd 100644 --- a/src/query/ee/src/data_mask/data_mask_handler.rs +++ b/src/query/ee/src/data_mask/data_mask_handler.rs @@ -18,12 +18,16 @@ use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; use databend_common_meta_api::DatamaskApi; use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::data_mask::data_mask_name_ident::Resource; +use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::data_mask::DropDatamaskReq; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use databend_enterprise_data_mask_feature::data_mask_handler::DatamaskHandler; use databend_enterprise_data_mask_feature::data_mask_handler::DatamaskHandlerWrapper; @@ -36,10 +40,11 @@ impl DatamaskHandler for RealDatamaskHandler { &self, meta_api: Arc, req: CreateDatamaskReq, - ) -> Result<()> { - let _ = meta_api.create_data_mask(req).await?; - - Ok(()) + ) -> std::result::Result< + std::result::Result>, + MetaError, + > { + meta_api.create_data_mask(req).await } async fn drop_data_mask(&self, meta_api: Arc, req: DropDatamaskReq) -> Result<()> { diff --git a/src/query/ee/src/row_access_policy/row_access_policy_handler.rs b/src/query/ee/src/row_access_policy/row_access_policy_handler.rs index fc44997895d85..53601bf3d6fc0 100644 --- a/src/query/ee/src/row_access_policy/row_access_policy_handler.rs +++ b/src/query/ee/src/row_access_policy/row_access_policy_handler.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; -use databend_common_meta_api::meta_txn_error::MetaTxnError; use databend_common_meta_api::RowAccessPolicyApi; use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident::Resource; @@ -29,6 +28,7 @@ use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use databend_enterprise_row_access_policy_feature::row_access_policy_handler::RowAccessPolicyHandler; use databend_enterprise_row_access_policy_feature::row_access_policy_handler::RowAccessPolicyHandlerWrapper; @@ -43,7 +43,7 @@ impl RowAccessPolicyHandler for RealRowAccessPolicyHandler { req: CreateRowAccessPolicyReq, ) -> std::result::Result< std::result::Result>, - MetaTxnError, + MetaError, > { meta_api.create_row_access_policy(req).await } diff --git a/src/query/ee_features/data_mask/src/data_mask_handler.rs b/src/query/ee_features/data_mask/src/data_mask_handler.rs index a5b6a13f6499c..7e0b60f4cb297 100644 --- a/src/query/ee_features/data_mask/src/data_mask_handler.rs +++ b/src/query/ee_features/data_mask/src/data_mask_handler.rs @@ -30,11 +30,15 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; +use databend_common_meta_app::data_mask::data_mask_name_ident::Resource; +use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::data_mask::DropDatamaskReq; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; #[async_trait::async_trait] @@ -43,7 +47,10 @@ pub trait DatamaskHandler: Sync + Send { &self, meta_api: Arc, req: CreateDatamaskReq, - ) -> Result<()>; + ) -> std::result::Result< + std::result::Result>, + MetaError, + >; async fn drop_data_mask(&self, meta_api: Arc, req: DropDatamaskReq) -> Result<()>; @@ -75,7 +82,10 @@ impl DatamaskHandlerWrapper { &self, meta_api: Arc, req: CreateDatamaskReq, - ) -> Result<()> { + ) -> std::result::Result< + std::result::Result>, + MetaError, + > { self.handler.create_data_mask(meta_api, req).await } diff --git a/src/query/ee_features/row_access_policy/Cargo.toml b/src/query/ee_features/row_access_policy/Cargo.toml index ed16d00eb3db5..2e8622e53ddad 100644 --- a/src/query/ee_features/row_access_policy/Cargo.toml +++ b/src/query/ee_features/row_access_policy/Cargo.toml @@ -11,7 +11,6 @@ edition = { workspace = true } async-trait = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } -databend-common-meta-api = { workspace = true } databend-common-meta-app = { workspace = true } databend-common-meta-store = { workspace = true } databend-common-meta-types = { workspace = true } diff --git a/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs b/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs index f6c29d2e8a34a..6ad0b541871bb 100644 --- a/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs +++ b/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs @@ -30,7 +30,6 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; -use databend_common_meta_api::meta_txn_error::MetaTxnError; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident::Resource; use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReply; use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReq; @@ -40,6 +39,7 @@ use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; #[async_trait::async_trait] @@ -50,7 +50,7 @@ pub trait RowAccessPolicyHandler: Sync + Send { req: CreateRowAccessPolicyReq, ) -> std::result::Result< std::result::Result>, - MetaTxnError, + MetaError, >; async fn drop_row_access_policy( @@ -89,7 +89,7 @@ impl RowAccessPolicyHandlerWrapper { req: CreateRowAccessPolicyReq, ) -> std::result::Result< std::result::Result>, - MetaTxnError, + MetaError, > { self.handler.create_row_access_policy(meta_api, req).await } diff --git a/src/query/service/src/interpreters/interpreter_data_mask_create.rs b/src/query/service/src/interpreters/interpreter_data_mask_create.rs index 0b0bb4bece451..a8266b5c068d3 100644 --- a/src/query/service/src/interpreters/interpreter_data_mask_create.rs +++ b/src/query/service/src/interpreters/interpreter_data_mask_create.rs @@ -14,9 +14,11 @@ use std::sync::Arc; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_license::license::Feature; use databend_common_license::license_manager::LicenseManagerSwitch; +use databend_common_meta_app::schema::CreateOption; use databend_common_sql::plans::CreateDatamaskPolicyPlan; use databend_common_users::UserApiProvider; use databend_enterprise_data_mask_feature::get_datamask_handler; @@ -53,9 +55,19 @@ impl Interpreter for CreateDataMaskInterpreter { .check_enterprise_enabled(self.ctx.get_license_key(), Feature::DataMask)?; let meta_api = UserApiProvider::instance().get_meta_store_client(); let handler = get_datamask_handler(); - handler + if let Err(_e) = handler .create_data_mask(meta_api, self.plan.clone().into()) - .await?; + .await? + { + return if let CreateOption::CreateIfNotExists = self.plan.create_option { + Ok(PipelineBuildResult::create()) + } else { + Err(ErrorCode::DatamaskAlreadyExists(format!( + "Security policy with name '{}' already exists", + self.plan.name + ))) + }; + } Ok(PipelineBuildResult::create()) } diff --git a/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs b/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs index f02fece87c7d1..35266054fce7d 100644 --- a/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs +++ b/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs @@ -63,7 +63,7 @@ impl Interpreter for CreateRowAccessPolicyInterpreter { Ok(PipelineBuildResult::create()) } else { Err(ErrorCode::RowAccessPolicyAlreadyExists(format!( - "Row Access Policy '{}' already exists", + "Security policy with name '{}' already exists", self.plan.name ))) }; diff --git a/src/query/sql/src/planner/binder/ddl/data_mask.rs b/src/query/sql/src/planner/binder/ddl/data_mask.rs index 5ae5954b71eb9..0b3c78d17b908 100644 --- a/src/query/sql/src/planner/binder/ddl/data_mask.rs +++ b/src/query/sql/src/planner/binder/ddl/data_mask.rs @@ -36,12 +36,6 @@ impl Binder { // check if input type match to the return type let return_type = policy.return_type.to_string().to_lowercase(); - // // TODO: support mult args in next pr - // if policy.args.len() > 1 { - // return Err(ErrorCode::InvalidArgument( - // "Mask policy only support one argument", - // )); - // } let policy_data_type = policy.args[0].arg_type.to_string().to_lowercase(); if return_type != policy_data_type { return Err(ErrorCode::UnmatchMaskPolicyReturnType(format!( From 3569a848dd77d99d39dfc3137c925f4f29b1146c Mon Sep 17 00:00:00 2001 From: TCeason Date: Wed, 12 Nov 2025 13:16:29 +0800 Subject: [PATCH 5/7] remove extra get_pb --- src/meta/api/src/data_mask_api_impl.rs | 5 ----- src/meta/api/src/row_access_policy_api_impl.rs | 5 ----- 2 files changed, 10 deletions(-) diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 477a2587dc395..fa78697f3fe9b 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -66,11 +66,6 @@ impl> DatamaskApi for KV { name_ident.tenant().clone(), name_ident.data_mask_name().to_string(), ); - if self.get_pb(&row_access_name_ident).await?.is_some() { - return Ok(Err( - name_ident.exist_error("name conflicts with an existing masking policy") - )); - } let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?; diff --git a/src/meta/api/src/row_access_policy_api_impl.rs b/src/meta/api/src/row_access_policy_api_impl.rs index 464db34c520e3..8b02b6bfde968 100644 --- a/src/meta/api/src/row_access_policy_api_impl.rs +++ b/src/meta/api/src/row_access_policy_api_impl.rs @@ -65,11 +65,6 @@ impl> RowAccessPolicyApi for KV { name_ident.tenant().clone(), name_ident.row_access_name().to_string(), ); - if self.get_pb(&mask_name_ident).await?.is_some() { - return Ok(Err( - name_ident.exist_error("name conflicts with an existing masking policy") - )); - } let row_access_id = fetch_id(self, IdGenerator::row_access_id()).await?; let policy_id = RowAccessPolicyId::new(row_access_id); From fc0eab4234e86241611dab5141a9cfdd6ae01203 Mon Sep 17 00:00:00 2001 From: TCeason Date: Wed, 12 Nov 2025 13:57:29 +0800 Subject: [PATCH 6/7] use ?? replace double unwrap --- src/meta/api/src/schema_api_test_suite.rs | 26 +++++++++-------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index 5e3e8ff4855c0..304235aad4706 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -3195,7 +3195,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await.unwrap().unwrap(); + mt.create_data_mask(req).await??; let mask1_name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_1.to_string()); mask1_id = get_kv_u64_data(mt, &mask1_name_ident).await?; @@ -3212,7 +3212,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await.unwrap().unwrap(); + mt.create_data_mask(req).await??; let mask2_name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_2.to_string()); mask2_id = get_kv_u64_data(mt, &mask2_name_ident).await?; @@ -3473,7 +3473,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await.unwrap().unwrap(); + mt.create_data_mask(req).await??; let old_id: u64 = get_kv_u64_data(mt, &name).await?; let id_key = DataMaskIdIdent::new(&tenant, old_id); @@ -3493,7 +3493,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await.unwrap().unwrap(); + mt.create_data_mask(req).await??; // assert old id key has been deleted let meta: Result = get_kv_data(mt, &id_key).await; @@ -3559,7 +3559,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_row_access_policy(req).await.unwrap().unwrap(); + mt.create_row_access_policy(req).await??; let name = RowAccessPolicyNameIdent::new(tenant.clone(), policy1.to_string()); let res = mt.get_row_access_policy(&name).await.unwrap().unwrap(); @@ -3576,7 +3576,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_row_access_policy(req).await.unwrap().unwrap(); + mt.create_row_access_policy(req).await??; let table_id_1; info!("--- apply mask1 policy to table 1 and check"); @@ -3898,9 +3898,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await - .unwrap() - .unwrap(); + .await??; let mask_cleanup_id = get_kv_u64_data(mt, &mask_cleanup_ident).await?; let set_req = SetTableColumnMaskPolicyReq { @@ -3963,9 +3961,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await - .unwrap() - .unwrap(); + .await??; let mask_guard_id = get_kv_u64_data(mt, &mask_guard_ident).await?; table_info = util.get_table().await?; @@ -4057,8 +4053,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await? - .unwrap(); + .await??; let cleanup_policy_id = { let res = mt .get_row_access_policy(&policy_cleanup_ident) @@ -4132,8 +4127,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await? - .unwrap(); + .await??; let guard_policy_id = { let res = mt .get_row_access_policy(&policy_guard_ident) From fb6473630e6259341c344f4d23d29bf403fbcc61 Mon Sep 17 00:00:00 2001 From: TCeason Date: Wed, 12 Nov 2025 15:37:45 +0800 Subject: [PATCH 7/7] fix flaky test --- .../suites/ee/05_ee_ddl/05_0010_policy_references.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test index 143b401907601..1186b4ba17b2f 100644 --- a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test +++ b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test @@ -92,7 +92,7 @@ select * from policy_references(POLICY_NAME => 'rap_multi_case1'); rap_multi_case1 ROW ACCESS POLICY default employees1 TABLE NULL name, department ACTIVE query T -select * from policy_references(POLICY_NAME => 'mask_ssn_conditional1'); +select * from policy_references(POLICY_NAME => 'mask_ssn_conditional1') order by ref_entity_name; ---- mask_ssn_conditional1 MASKING POLICY default employees1 TABLE ssn role ACTIVE mask_ssn_conditional1 MASKING POLICY default employees2 TABLE ssn role ACTIVE