diff --git a/Cargo.lock b/Cargo.lock index 2b66f07dc8090..9eada2160b075 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4902,7 +4902,6 @@ dependencies = [ "async-trait", "databend-common-base", "databend-common-exception", - "databend-common-meta-api", "databend-common-meta-app", "databend-common-meta-store", "databend-common-meta-types", diff --git a/src/meta/api/src/data_mask_api.rs b/src/meta/api/src/data_mask_api.rs index 4c78ed7ebe8bb..69d5fd5c451e3 100644 --- a/src/meta/api/src/data_mask_api.rs +++ b/src/meta/api/src/data_mask_api.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_app::data_mask::data_mask_name_ident; use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskId; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use crate::errors::MaskingPolicyError; -use crate::kv_app_error::KVAppError; use crate::meta_txn_error::MetaTxnError; #[async_trait::async_trait] @@ -30,7 +31,7 @@ pub trait DatamaskApi: Send + Sync { async fn create_data_mask( &self, req: CreateDatamaskReq, - ) -> Result; + ) -> Result>, MetaError>; /// On success, returns the dropped id and data mask. /// Returning None, means nothing is removed. diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 97ec03003560e..fa78697f3fe9b 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::data_mask::data_mask_name_ident; use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskId; @@ -24,8 +24,10 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent; use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::id_generator::IdGenerator; +use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; @@ -38,7 +40,6 @@ use log::debug; use crate::data_mask_api::DatamaskApi; use crate::errors::MaskingPolicyError; use crate::fetch_id; -use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; use crate::meta_txn_error::MetaTxnError; use crate::txn_backoff::txn_backoff; @@ -55,87 +56,93 @@ impl> DatamaskApi for KV { async fn create_data_mask( &self, req: CreateDatamaskReq, - ) -> Result { + ) -> Result>, MetaError> + { debug!(req :? =(&req); "DatamaskApi: {}", func_name!()); let name_ident = &req.name; - let mut trials = txn_backoff(None, func_name!()); - let id = loop { - trials.next().unwrap()?.await; + let row_access_name_ident = RowAccessPolicyNameIdent::new( + name_ident.tenant().clone(), + name_ident.data_mask_name().to_string(), + ); - let mut txn = TxnRequest::default(); + let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?; - let res = self.get_id_and_value(name_ident).await?; - debug!(res :? = res, name_key :? =(name_ident); "create_data_mask"); - - let mut curr_seq = 0; - - if let Some((seq_id, seq_meta)) = res { - match req.create_option { - CreateOption::Create => { - return Err(AppError::DatamaskAlreadyExists( - name_ident.exist_error(func_name!()), - ) - .into()); - } - CreateOption::CreateIfNotExists => { - return Ok(CreateDatamaskReply { id: *seq_id.data }); - } - CreateOption::CreateOrReplace => { - let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); - - txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); - - curr_seq = seq_id.seq; - } - }; - } + let mut txn = TxnRequest::default(); - // Create data mask by inserting these record: - // name -> id - // id -> policy - // data mask name -> data mask table id list - - let id = fetch_id(self, IdGenerator::data_mask_id()).await?; - - let id = DataMaskId::new(id); - let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id); - let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); - - debug!( - id :? =(&id_ident), - name_key :? =(name_ident); - "new datamask id" - ); - - { - let meta: DatamaskMeta = req.data_mask_meta.clone(); - let id_list = MaskpolicyTableIdList::default(); - txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); - txn.if_then.extend(vec![ - txn_op_put_pb(name_ident, &id, None)?, // name -> db_id - txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta - // TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later - txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list - ]); - - let (succ, _responses) = send_txn(self, txn).await?; - - debug!( - name :? =(name_ident), - id :? =(&id_ident), - succ = succ; - "create_data_mask" - ); - - if succ { - break id; + let res = self.get_id_and_value(name_ident).await?; + debug!(res :? = res, name_key :? =(name_ident); "create_data_mask"); + + let mut curr_seq = 0; + + if let Some((seq_id, seq_meta)) = res { + match req.create_option { + CreateOption::Create => { + return Ok(Err( + name_ident.exist_error(format!("{} already exists", req.name)) + )); } - } - }; + CreateOption::CreateIfNotExists => { + return Ok(Ok(CreateDatamaskReply { id: *seq_id.data })); + } + CreateOption::CreateOrReplace => { + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); - Ok(CreateDatamaskReply { id: *id }) + txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); + + curr_seq = seq_id.seq; + } + }; + } + + // Create data mask by inserting these record: + // name -> id + // id -> policy + // data mask name -> data mask table id list + + let id = DataMaskId::new(masking_policy_id); + let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id); + let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); + + debug!( + id :? =(&id_ident), + name_key :? =(name_ident); + "new datamask id" + ); + + { + let meta: DatamaskMeta = req.data_mask_meta.clone(); + let id_list = MaskpolicyTableIdList::default(); + txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); + txn.condition + .push(txn_cond_eq_seq(&row_access_name_ident, 0)); + txn.if_then.extend(vec![ + txn_op_put_pb(name_ident, &id, None)?, // name -> db_id + txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta + // TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later + txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list + ]); + } + + let (succ, _responses) = send_txn(self, txn).await?; + + debug!( + name :? =(name_ident), + id :? =(&id_ident), + succ = succ; + "create_data_mask" + ); + + if succ { + Ok(Ok(CreateDatamaskReply { + id: masking_policy_id, + })) + } else { + Ok(Err( + name_ident.exist_error(format!("{} already exists", req.name)) + )) + } } async fn drop_data_mask( @@ -173,7 +180,6 @@ impl> DatamaskApi for KV { // Policy is in use - cannot drop if !table_policy_refs.is_empty() { return Ok(Err(MaskingPolicyError::policy_in_use( - tenant.tenant_name().to_string(), name_ident.data_mask_name().to_string(), ))); } diff --git a/src/meta/api/src/errors.rs b/src/meta/api/src/errors.rs index 9b28636b59ebe..cf52f96b6d61e 100644 --- a/src/meta/api/src/errors.rs +++ b/src/meta/api/src/errors.rs @@ -47,7 +47,7 @@ pub enum MaskingPolicyError { } impl MaskingPolicyError { - pub fn policy_in_use(_tenant: impl Into, policy_name: impl Into) -> Self { + pub fn policy_in_use(policy_name: impl Into) -> Self { Self::PolicyInUse { policy_name: policy_name.into(), } @@ -72,7 +72,7 @@ pub enum RowAccessPolicyError { } impl RowAccessPolicyError { - pub fn policy_in_use(_tenant: impl Into, policy_name: impl Into) -> Self { + pub fn policy_in_use(policy_name: impl Into) -> Self { Self::PolicyInUse { policy_name: policy_name.into(), } diff --git a/src/meta/api/src/kv_fetch_util.rs b/src/meta/api/src/kv_fetch_util.rs index 092c276fd1f60..7e68cc253250f 100644 --- a/src/meta/api/src/kv_fetch_util.rs +++ b/src/meta/api/src/kv_fetch_util.rs @@ -160,6 +160,9 @@ pub async fn fetch_id( kv_api: &(impl kvapi::KVApi + ?Sized), generator: T, ) -> Result { + // Each `upsert` bumps the seq atomically inside metasrv, therefore every caller + // receives a unique, monotonically increasing id even when multiple sessions + // fetch from the same generator concurrently. let res = kv_api .upsert_kv(UpsertKV::update(generator.to_string_key(), b"")) .await?; diff --git a/src/meta/api/src/row_access_policy_api.rs b/src/meta/api/src/row_access_policy_api.rs index 99832878f9a24..25526d67056ac 100644 --- a/src/meta/api/src/row_access_policy_api.rs +++ b/src/meta/api/src/row_access_policy_api.rs @@ -33,7 +33,7 @@ pub trait RowAccessPolicyApi: Send + Sync { req: CreateRowAccessPolicyReq, ) -> Result< Result>, - MetaTxnError, + MetaError, >; /// On success, returns the dropped id and row policy. diff --git a/src/meta/api/src/row_access_policy_api_impl.rs b/src/meta/api/src/row_access_policy_api_impl.rs index 4c8e9aa53492a..8b02b6bfde968 100644 --- a/src/meta/api/src/row_access_policy_api_impl.rs +++ b/src/meta/api/src/row_access_policy_api_impl.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident; use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId; @@ -54,75 +55,78 @@ impl> RowAccessPolicyApi for KV { req: CreateRowAccessPolicyReq, ) -> Result< Result>, - MetaTxnError, + MetaError, > { debug!(req :? =(&req); "RowAccessPolicyApi: {}", func_name!()); let name_ident = &req.name; - let id = fetch_id(self, IdGenerator::row_access_id()).await?; - let mut trials = txn_backoff(None, func_name!()); - let id = loop { - trials.next().unwrap()?.await; + let mask_name_ident = DataMaskNameIdent::new( + name_ident.tenant().clone(), + name_ident.row_access_name().to_string(), + ); - let mut txn = TxnRequest::default(); + let row_access_id = fetch_id(self, IdGenerator::row_access_id()).await?; + let policy_id = RowAccessPolicyId::new(row_access_id); + let mut txn = TxnRequest::default(); - let res = self.get_id_and_value(name_ident).await?; - debug!(res :? = res, name_key :? =(name_ident); "create_row_access"); + let res = self.get_id_and_value(name_ident).await?; + debug!(res :? = res, name_key :? =(name_ident); "create_row_access"); - let mut curr_seq = 0; + let mut curr_seq = 0; - if let Some((seq_id, seq_meta)) = res { - if req.can_replace { - let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + if let Some((seq_id, seq_meta)) = res { + if req.can_replace { + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); - txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); + txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); - // TODO(eason): need to remove row policy from table meta + // TODO(eason): need to remove row policy from table meta - curr_seq = seq_id.seq; - } else { - return Ok(Err(name_ident.exist_error(func_name!()))); - } + curr_seq = seq_id.seq; + } else { + return Ok(Err(name_ident.exist_error(func_name!()))); } + } - // Create row policy by inserting these record: - // name -> id - // id -> policy - - let id = RowAccessPolicyId::new(id); - let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), id); - - debug!( - id :? =(&id_ident), - name_key :? =(name_ident); - "new RowAccessPolicy id" - ); - - { - let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone(); - txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); - txn.if_then.extend(vec![ - txn_op_put_pb(name_ident, &id, None)?, // name -> policy_id - txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta - ]); - - let (succ, _responses) = send_txn(self, txn).await?; - - debug!( - name :? =(name_ident), - id :? =(&id_ident), - succ = succ; - "create_row_access" - ); - - if succ { - break id; - } - } - }; + // Create row policy by inserting these record: + // name -> id + // id -> policy + + let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), policy_id); + + debug!( + id :? =(&id_ident), + name_key :? =(name_ident); + "new RowAccessPolicy id" + ); + + { + let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone(); + txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); + txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0)); + txn.if_then.extend(vec![ + txn_op_put_pb(name_ident, &policy_id, None)?, // name -> policy_id + txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta + ]); + } - Ok(Ok(CreateRowAccessPolicyReply { id: *id })) + let (succ, _responses) = send_txn(self, txn).await?; + + debug!( + name :? =(name_ident), + id :? =(&id_ident), + succ = succ; + "create_row_access" + ); + + if succ { + Ok(Ok(CreateRowAccessPolicyReply { id: row_access_id })) + } else { + Ok(Err( + name_ident.exist_error(format!("{} already exists", req.name)) + )) + } } async fn drop_row_access_policy( @@ -160,7 +164,6 @@ impl> RowAccessPolicyApi for KV { // Policy is in use - cannot drop if !table_policy_refs.is_empty() { return Ok(Err(RowAccessPolicyError::policy_in_use( - tenant.tenant_name().to_string(), name_ident.row_access_name().to_string(), ))); } diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index c20ffc8ae8b0f..304235aad4706 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -3195,7 +3195,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await??; let mask1_name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_1.to_string()); mask1_id = get_kv_u64_data(mt, &mask1_name_ident).await?; @@ -3212,7 +3212,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await??; let mask2_name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_2.to_string()); mask2_id = get_kv_u64_data(mt, &mask2_name_ident).await?; @@ -3473,7 +3473,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await??; let old_id: u64 = get_kv_u64_data(mt, &name).await?; let id_key = DataMaskIdIdent::new(&tenant, old_id); @@ -3493,7 +3493,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_data_mask(req).await?; + mt.create_data_mask(req).await??; // assert old id key has been deleted let meta: Result = get_kv_data(mt, &id_key).await; @@ -3559,7 +3559,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_row_access_policy(req).await.unwrap().unwrap(); + mt.create_row_access_policy(req).await??; let name = RowAccessPolicyNameIdent::new(tenant.clone(), policy1.to_string()); let res = mt.get_row_access_policy(&name).await.unwrap().unwrap(); @@ -3576,7 +3576,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_row_access_policy(req).await.unwrap().unwrap(); + mt.create_row_access_policy(req).await??; let table_id_1; info!("--- apply mask1 policy to table 1 and check"); @@ -3898,7 +3898,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await?; + .await??; let mask_cleanup_id = get_kv_u64_data(mt, &mask_cleanup_ident).await?; let set_req = SetTableColumnMaskPolicyReq { @@ -3961,7 +3961,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await?; + .await??; let mask_guard_id = get_kv_u64_data(mt, &mask_guard_ident).await?; table_info = util.get_table().await?; @@ -4053,8 +4053,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await? - .unwrap(); + .await??; let cleanup_policy_id = { let res = mt .get_row_access_policy(&policy_cleanup_ident) @@ -4128,8 +4127,7 @@ impl SchemaApiTestSuite { update_on: None, }, }) - .await? - .unwrap(); + .await??; let guard_policy_id = { let res = mt .get_row_access_policy(&policy_guard_ident) diff --git a/src/query/ee/src/data_mask/data_mask_handler.rs b/src/query/ee/src/data_mask/data_mask_handler.rs index 417f2be71a01b..c6f9356f8a9cd 100644 --- a/src/query/ee/src/data_mask/data_mask_handler.rs +++ b/src/query/ee/src/data_mask/data_mask_handler.rs @@ -18,12 +18,16 @@ use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; use databend_common_meta_api::DatamaskApi; use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::data_mask::data_mask_name_ident::Resource; +use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::data_mask::DropDatamaskReq; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use databend_enterprise_data_mask_feature::data_mask_handler::DatamaskHandler; use databend_enterprise_data_mask_feature::data_mask_handler::DatamaskHandlerWrapper; @@ -36,10 +40,11 @@ impl DatamaskHandler for RealDatamaskHandler { &self, meta_api: Arc, req: CreateDatamaskReq, - ) -> Result<()> { - let _ = meta_api.create_data_mask(req).await?; - - Ok(()) + ) -> std::result::Result< + std::result::Result>, + MetaError, + > { + meta_api.create_data_mask(req).await } async fn drop_data_mask(&self, meta_api: Arc, req: DropDatamaskReq) -> Result<()> { diff --git a/src/query/ee/src/row_access_policy/row_access_policy_handler.rs b/src/query/ee/src/row_access_policy/row_access_policy_handler.rs index fc44997895d85..53601bf3d6fc0 100644 --- a/src/query/ee/src/row_access_policy/row_access_policy_handler.rs +++ b/src/query/ee/src/row_access_policy/row_access_policy_handler.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; -use databend_common_meta_api::meta_txn_error::MetaTxnError; use databend_common_meta_api::RowAccessPolicyApi; use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident::Resource; @@ -29,6 +28,7 @@ use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use databend_enterprise_row_access_policy_feature::row_access_policy_handler::RowAccessPolicyHandler; use databend_enterprise_row_access_policy_feature::row_access_policy_handler::RowAccessPolicyHandlerWrapper; @@ -43,7 +43,7 @@ impl RowAccessPolicyHandler for RealRowAccessPolicyHandler { req: CreateRowAccessPolicyReq, ) -> std::result::Result< std::result::Result>, - MetaTxnError, + MetaError, > { meta_api.create_row_access_policy(req).await } diff --git a/src/query/ee_features/data_mask/src/data_mask_handler.rs b/src/query/ee_features/data_mask/src/data_mask_handler.rs index a5b6a13f6499c..7e0b60f4cb297 100644 --- a/src/query/ee_features/data_mask/src/data_mask_handler.rs +++ b/src/query/ee_features/data_mask/src/data_mask_handler.rs @@ -30,11 +30,15 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; +use databend_common_meta_app::data_mask::data_mask_name_ident::Resource; +use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::data_mask::DropDatamaskReq; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; #[async_trait::async_trait] @@ -43,7 +47,10 @@ pub trait DatamaskHandler: Sync + Send { &self, meta_api: Arc, req: CreateDatamaskReq, - ) -> Result<()>; + ) -> std::result::Result< + std::result::Result>, + MetaError, + >; async fn drop_data_mask(&self, meta_api: Arc, req: DropDatamaskReq) -> Result<()>; @@ -75,7 +82,10 @@ impl DatamaskHandlerWrapper { &self, meta_api: Arc, req: CreateDatamaskReq, - ) -> Result<()> { + ) -> std::result::Result< + std::result::Result>, + MetaError, + > { self.handler.create_data_mask(meta_api, req).await } diff --git a/src/query/ee_features/row_access_policy/Cargo.toml b/src/query/ee_features/row_access_policy/Cargo.toml index ed16d00eb3db5..2e8622e53ddad 100644 --- a/src/query/ee_features/row_access_policy/Cargo.toml +++ b/src/query/ee_features/row_access_policy/Cargo.toml @@ -11,7 +11,6 @@ edition = { workspace = true } async-trait = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } -databend-common-meta-api = { workspace = true } databend-common-meta-app = { workspace = true } databend-common-meta-store = { workspace = true } databend-common-meta-types = { workspace = true } diff --git a/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs b/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs index f6c29d2e8a34a..6ad0b541871bb 100644 --- a/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs +++ b/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs @@ -30,7 +30,6 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; -use databend_common_meta_api::meta_txn_error::MetaTxnError; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident::Resource; use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReply; use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReq; @@ -40,6 +39,7 @@ use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; #[async_trait::async_trait] @@ -50,7 +50,7 @@ pub trait RowAccessPolicyHandler: Sync + Send { req: CreateRowAccessPolicyReq, ) -> std::result::Result< std::result::Result>, - MetaTxnError, + MetaError, >; async fn drop_row_access_policy( @@ -89,7 +89,7 @@ impl RowAccessPolicyHandlerWrapper { req: CreateRowAccessPolicyReq, ) -> std::result::Result< std::result::Result>, - MetaTxnError, + MetaError, > { self.handler.create_row_access_policy(meta_api, req).await } diff --git a/src/query/service/src/interpreters/interpreter_data_mask_create.rs b/src/query/service/src/interpreters/interpreter_data_mask_create.rs index 0b0bb4bece451..a8266b5c068d3 100644 --- a/src/query/service/src/interpreters/interpreter_data_mask_create.rs +++ b/src/query/service/src/interpreters/interpreter_data_mask_create.rs @@ -14,9 +14,11 @@ use std::sync::Arc; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_license::license::Feature; use databend_common_license::license_manager::LicenseManagerSwitch; +use databend_common_meta_app::schema::CreateOption; use databend_common_sql::plans::CreateDatamaskPolicyPlan; use databend_common_users::UserApiProvider; use databend_enterprise_data_mask_feature::get_datamask_handler; @@ -53,9 +55,19 @@ impl Interpreter for CreateDataMaskInterpreter { .check_enterprise_enabled(self.ctx.get_license_key(), Feature::DataMask)?; let meta_api = UserApiProvider::instance().get_meta_store_client(); let handler = get_datamask_handler(); - handler + if let Err(_e) = handler .create_data_mask(meta_api, self.plan.clone().into()) - .await?; + .await? + { + return if let CreateOption::CreateIfNotExists = self.plan.create_option { + Ok(PipelineBuildResult::create()) + } else { + Err(ErrorCode::DatamaskAlreadyExists(format!( + "Security policy with name '{}' already exists", + self.plan.name + ))) + }; + } Ok(PipelineBuildResult::create()) } diff --git a/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs b/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs index f02fece87c7d1..35266054fce7d 100644 --- a/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs +++ b/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs @@ -63,7 +63,7 @@ impl Interpreter for CreateRowAccessPolicyInterpreter { Ok(PipelineBuildResult::create()) } else { Err(ErrorCode::RowAccessPolicyAlreadyExists(format!( - "Row Access Policy '{}' already exists", + "Security policy with name '{}' already exists", self.plan.name ))) }; diff --git a/src/query/sql/src/planner/binder/ddl/data_mask.rs b/src/query/sql/src/planner/binder/ddl/data_mask.rs index 5ae5954b71eb9..0b3c78d17b908 100644 --- a/src/query/sql/src/planner/binder/ddl/data_mask.rs +++ b/src/query/sql/src/planner/binder/ddl/data_mask.rs @@ -36,12 +36,6 @@ impl Binder { // check if input type match to the return type let return_type = policy.return_type.to_string().to_lowercase(); - // // TODO: support mult args in next pr - // if policy.args.len() > 1 { - // return Err(ErrorCode::InvalidArgument( - // "Mask policy only support one argument", - // )); - // } let policy_data_type = policy.args[0].arg_type.to_string().to_lowercase(); if return_type != policy_data_type { return Err(ErrorCode::UnmatchMaskPolicyReturnType(format!( diff --git a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test index 8ac31ddf80b22..2178a712048b3 100644 --- a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test +++ b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test @@ -1089,6 +1089,3 @@ DROP MASKING POLICY mask_ssn_conditional; statement ok unset global enable_planner_cache; - -statement ok -unset global enable_experimental_row_access_policy; diff --git a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0005_ddl_conflicts_policy.test b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0005_ddl_conflicts_policy.test new file mode 100644 index 0000000000000..fc69c3e3adb15 --- /dev/null +++ b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0005_ddl_conflicts_policy.test @@ -0,0 +1,47 @@ +## Copyright 2023 Databend Cloud +## +## Licensed under the Elastic License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## https://www.elastic.co/licensing/elastic-license +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. + +statement ok +set global enable_experimental_row_access_policy = 1; + +## Cross-type policy name conflicts should be rejected +statement ok +drop row access policy if exists shared_policy_name; + +statement ok +drop masking policy if exists shared_policy_name; + +statement ok +CREATE ROW ACCESS POLICY shared_policy_name AS (user_id STRING) RETURNS boolean -> true; + +statement error 2321 +CREATE MASKING POLICY shared_policy_name AS (val STRING) RETURNS STRING -> '***'; + +statement ok +drop row access policy if exists shared_policy_name; + +statement ok +drop masking policy if exists mask_only_name; + +statement ok +drop row access policy if exists mask_only_name; + +statement ok +CREATE MASKING POLICY mask_only_name AS (val STRING) RETURNS STRING -> '***'; + +statement error 2324 +CREATE ROW ACCESS POLICY mask_only_name AS (user_id STRING) RETURNS boolean -> true; + +statement ok +drop masking policy if exists mask_only_name; diff --git a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test index 143b401907601..1186b4ba17b2f 100644 --- a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test +++ b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0010_policy_references.test @@ -92,7 +92,7 @@ select * from policy_references(POLICY_NAME => 'rap_multi_case1'); rap_multi_case1 ROW ACCESS POLICY default employees1 TABLE NULL name, department ACTIVE query T -select * from policy_references(POLICY_NAME => 'mask_ssn_conditional1'); +select * from policy_references(POLICY_NAME => 'mask_ssn_conditional1') order by ref_entity_name; ---- mask_ssn_conditional1 MASKING POLICY default employees1 TABLE ssn role ACTIVE mask_ssn_conditional1 MASKING POLICY default employees2 TABLE ssn role ACTIVE