1313# limitations under the License.
1414
1515import base64
16+ import logging
1617import time
1718from typing import Optional , Tuple , Any
1819
3132from confluent_kafka .schema_registry .serde import RuleContext , \
3233 FieldRuleExecutor , FieldTransform , RuleError , FieldContext , FieldType
3334
35+
36+ log = logging .getLogger (__name__ )
37+
38+
3439aead .register ()
3540daead .register ()
3641
@@ -273,19 +278,13 @@ def _get_or_create_dek(self, ctx: RuleContext, version: Optional[int]) -> Dek:
273278 raw_dek = self ._cryptor .generate_key ()
274279 encrypted_dek = primitive .encrypt (raw_dek , self ._cryptor .EMPTY_AAD )
275280 new_version = dek .version + 1 if is_expired else 1
276- new_dek_id = DekId (
277- self ._kek_name ,
278- ctx .subject ,
279- new_version ,
280- self ._cryptor .dek_format ,
281- is_read
282- )
283- dek = self ._store_dek_to_registry (new_dek_id , encrypted_dek )
284- if dek is None :
285- # handle conflicts (409)
286- dek = self ._retrieve_dek_from_registry (dek_id )
287- if dek is None :
288- raise RuleError (f"no dek found for { self ._kek_name } during produce" )
281+ try :
282+ dek = self ._create_dek (dek_id , new_version , encrypted_dek )
283+ except RuleError as e :
284+ if dek is None :
285+ raise e
286+ log .warning ("failed to create dek for %s, subject %s, version %d, using existing dek" ,
287+ kek .name , ctx .subject , new_version )
289288 key_bytes = dek .get_key_material_bytes ()
290289 if key_bytes is None :
291290 if primitive is None :
@@ -295,6 +294,22 @@ def _get_or_create_dek(self, ctx: RuleContext, version: Optional[int]) -> Dek:
295294 dek .set_key_material (raw_dek )
296295 return dek
297296
297+ def _create_dek (self , dek_id : DekId , new_version : Optional [int ], encrypted_dek : Optional [bytes ]) -> Dek :
298+ new_dek_id = DekId (
299+ dek_id .kek_name ,
300+ dek_id .subject ,
301+ new_version ,
302+ dek_id .algorithm ,
303+ dek_id .deleted ,
304+ )
305+ dek = self ._store_dek_to_registry (new_dek_id , encrypted_dek )
306+ if dek is None :
307+ # handle conflicts (409)
308+ dek = self ._retrieve_dek_from_registry (dek_id )
309+ if dek is None :
310+ raise RuleError (f"no dek found for { dek_id .kek_name } during produce" )
311+ return dek
312+
298313 def _retrieve_dek_from_registry (self , key : DekId ) -> Optional [Dek ]:
299314 try :
300315 version = key .version
0 commit comments