@@ -44,11 +44,15 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4444 Box < dyn Fn ( & VssError ) -> bool + ' static + Send + Sync > ,
4545> ;
4646
47+ #[ derive( Debug , PartialEq ) ]
4748enum VssSchemaVersion {
4849 // The initial schema version.
4950 // This used an empty `aad` and unobfuscated `primary_namespace`/`secondary_namespace`s in the
5051 // stored key.
5152 V0 ,
53+ // The second deployed schema version.
54+ // Here we started to obfuscate the primary and secondary namespaces and the obfuscated `store_key` (`obfuscate(primary_namespace)#obfuscate(secondary_namespace)#obfuscate(key)`) is now used as `aad` for encryption, ensuring that the encrypted blobs commit to the key they're stored under.
55+ V1 ,
5256}
5357
5458// We set this to a small number of threads that would still allow to make some progress if one
@@ -321,9 +325,10 @@ impl Drop for VssStore {
321325}
322326
323327struct VssStoreInner {
328+ schema_version : VssSchemaVersion ,
324329 client : VssClient < CustomRetryPolicy > ,
325330 store_id : String ,
326- storable_builder : StorableBuilder < RandEntropySource > ,
331+ data_encryption_key : [ u8 ; 32 ] ,
327332 key_obfuscator : KeyObfuscator ,
328333 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
329334 // The lock also encapsulates the latest written version per key.
@@ -335,10 +340,10 @@ impl VssStoreInner {
335340 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
336341 header_provider : Arc < dyn VssHeaderProvider > ,
337342 ) -> Self {
343+ let schema_version = VssSchemaVersion :: V0 ;
338344 let ( data_encryption_key, obfuscation_master_key) =
339345 derive_data_encryption_and_obfuscation_keys ( & vss_seed) ;
340346 let key_obfuscator = KeyObfuscator :: new ( obfuscation_master_key) ;
341- let storable_builder = StorableBuilder :: new ( data_encryption_key, RandEntropySource ) ;
342347 let retry_policy = ExponentialBackoffRetryPolicy :: new ( Duration :: from_millis ( 10 ) )
343348 . with_max_attempts ( 10 )
344349 . with_max_total_delay ( Duration :: from_secs ( 15 ) )
@@ -354,7 +359,7 @@ impl VssStoreInner {
354359
355360 let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
356361 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
357- Self { client, store_id, storable_builder , key_obfuscator, locks }
362+ Self { schema_version , client, store_id, data_encryption_key , key_obfuscator, locks }
358363 }
359364
360365 fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio:: sync:: Mutex < u64 > > {
@@ -365,11 +370,35 @@ impl VssStoreInner {
365370 fn build_obfuscated_key (
366371 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
367372 ) -> String {
368- let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
369- if primary_namespace. is_empty ( ) {
370- obfuscated_key
373+ if self . schema_version == VssSchemaVersion :: V1 {
374+ let obfuscated_primary_namepace = self . key_obfuscator . obfuscate ( primary_namespace) ;
375+ let obfuscated_secondary_namepace = self . key_obfuscator . obfuscate ( secondary_namespace) ;
376+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
377+ format ! (
378+ "{}#{}#{}" ,
379+ obfuscated_primary_namepace, obfuscated_secondary_namepace, obfuscated_key
380+ )
381+ } else {
382+ // Default to V0 schema
383+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
384+ if primary_namespace. is_empty ( ) {
385+ obfuscated_key
386+ } else {
387+ format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
388+ }
389+ }
390+ }
391+
392+ fn build_obfuscated_prefix (
393+ & self , primary_namespace : & str , secondary_namespace : & str ,
394+ ) -> String {
395+ if self . schema_version == VssSchemaVersion :: V1 {
396+ let obfuscated_primary_namepace = self . key_obfuscator . obfuscate ( primary_namespace) ;
397+ let obfuscated_secondary_namepace = self . key_obfuscator . obfuscate ( secondary_namespace) ;
398+ format ! ( "{}#{}" , obfuscated_primary_namepace, obfuscated_secondary_namepace, )
371399 } else {
372- format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
400+ // Default to V0 schema
401+ format ! ( "{}#{}" , primary_namespace, secondary_namespace)
373402 }
374403 }
375404
@@ -390,7 +419,7 @@ impl VssStoreInner {
390419 ) -> io:: Result < Vec < String > > {
391420 let mut page_token = None ;
392421 let mut keys = vec ! [ ] ;
393- let key_prefix = format ! ( "{}#{}" , primary_namespace, secondary_namespace) ;
422+ let key_prefix = self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
394423 while page_token != Some ( "" . to_string ( ) ) {
395424 let request = ListKeyVersionsRequest {
396425 store_id : self . store_id . clone ( ) ,
@@ -420,9 +449,8 @@ impl VssStoreInner {
420449 ) -> io:: Result < Vec < u8 > > {
421450 check_namespace_key_validity ( & primary_namespace, & secondary_namespace, Some ( & key) , "read" ) ?;
422451
423- let obfuscated_key =
424- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
425- let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : obfuscated_key } ;
452+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
453+ let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : store_key. clone ( ) } ;
426454 let resp = self . client . get_object ( & request) . await . map_err ( |e| {
427455 let msg = format ! (
428456 "Failed to read from key {}/{}/{}: {}" ,
@@ -444,7 +472,11 @@ impl VssStoreInner {
444472 Error :: new ( ErrorKind :: Other , msg)
445473 } ) ?;
446474
447- Ok ( self . storable_builder . deconstruct ( storable) ?. 0 )
475+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
476+ let aad =
477+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
478+ let decrypted = storable_builder. deconstruct ( storable, & self . data_encryption_key , aad) ?. 0 ;
479+ Ok ( decrypted)
448480 }
449481
450482 async fn write_internal (
@@ -458,22 +490,25 @@ impl VssStoreInner {
458490 "write" ,
459491 ) ?;
460492
461- self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
462- let obfuscated_key =
463- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
464- let vss_version = -1 ;
465- let storable = self . storable_builder . build ( buf, vss_version) ;
466- let request = PutObjectRequest {
467- store_id : self . store_id . clone ( ) ,
468- global_version : None ,
469- transaction_items : vec ! [ KeyValue {
470- key: obfuscated_key,
471- version: vss_version,
472- value: storable. encode_to_vec( ) ,
473- } ] ,
474- delete_items : vec ! [ ] ,
475- } ;
493+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
494+ let vss_version = -1 ;
495+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
496+ let aad =
497+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
498+ let storable =
499+ storable_builder. build ( buf. to_vec ( ) , vss_version, & self . data_encryption_key , aad) ;
500+ let request = PutObjectRequest {
501+ store_id : self . store_id . clone ( ) ,
502+ global_version : None ,
503+ transaction_items : vec ! [ KeyValue {
504+ key: store_key,
505+ version: vss_version,
506+ value: storable. encode_to_vec( ) ,
507+ } ] ,
508+ delete_items : vec ! [ ] ,
509+ } ;
476510
511+ self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
477512 self . client . put_object ( & request) . await . map_err ( |e| {
478513 let msg = format ! (
479514 "Failed to write to key {}/{}/{}: {}" ,
0 commit comments