1- use std:: cmp:: Ordering ;
2-
3- use async_recursion:: async_recursion;
4- use async_trait:: async_trait;
5- use byteorder:: { LittleEndian , WriteBytesExt } ;
6-
71use crate :: directory:: directory_partition:: DirectoryPartition ;
82use crate :: directory:: directory_subspace:: DirectorySubspace ;
93use crate :: directory:: error:: DirectoryError ;
104use crate :: directory:: node:: Node ;
115use crate :: directory:: { compare_slice, Directory , DirectoryOutput } ;
12- use crate :: future:: { FdbKeyValue , FdbSlice , FdbValue , FdbValuesIter } ;
6+ use crate :: future:: { FdbKeyValue , FdbSlice , FdbValues } ;
137use crate :: tuple:: hca:: HighContentionAllocator ;
14- use crate :: tuple:: { Subspace , TuplePack } ;
15- use crate :: RangeOption ;
8+ use crate :: tuple:: { unpack , Element , Subspace , TuplePack } ;
9+ use crate :: { FdbError , RangeOption } ;
1610use crate :: { FdbResult , Transaction } ;
17- use futures:: prelude:: stream:: { Iter , Next } ;
18- use futures:: stream:: StreamExt ;
11+ use async_recursion:: async_recursion;
12+ use async_trait:: async_trait;
13+ use byteorder:: { LittleEndian , WriteBytesExt } ;
1914use futures:: try_join;
20- use futures:: { join, TryStreamExt } ;
21- use parking_lot:: { FairMutex , RawMutex } ;
15+ use std:: cmp:: Ordering ;
2216use std:: option:: Option :: Some ;
23- use std:: rc:: Rc ;
24- use std:: sync:: { Arc , Mutex , MutexGuard , PoisonError } ;
2517
2618pub ( crate ) const DEFAULT_SUB_DIRS : i64 = 0 ;
2719const MAJOR_VERSION : u32 = 1 ;
2820const MINOR_VERSION : u32 = 0 ;
2921const PATCH_VERSION : u32 = 0 ;
30- pub ( crate ) const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
22+ pub const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
3123const DEFAULT_HCA_PREFIX : & [ u8 ] = b"hca" ;
3224pub ( crate ) const PARTITION_LAYER : & [ u8 ] = b"partition" ;
3325
26+ /// Directories are identified by hierarchical paths analogous to the paths
27+ /// in a Unix-like file system. A path is represented as a List of strings.
28+ /// Each directory has an associated subspace used to store its content. The
29+ /// layer maps each path to a short prefix used for the corresponding
30+ /// subspace. In effect, directories provide a level of indirection for
31+ /// access to subspaces.
3432#[ derive( Debug , Clone ) ]
3533pub struct DirectoryLayer {
3634 pub root_node : Subspace ,
@@ -71,6 +69,10 @@ impl DirectoryLayer {
7169 }
7270 }
7371
72+ pub fn get_path ( & self ) -> Vec < String > {
73+ self . path . clone ( )
74+ }
75+
7476 fn node_with_optional_prefix ( & self , prefix : Option < FdbSlice > ) -> Option < Subspace > {
7577 match prefix {
7678 None => None ,
@@ -94,10 +96,13 @@ impl DirectoryLayer {
9496 // walking through the provided path
9597 for path_name in path. iter ( ) {
9698 node. current_path . push ( path_name. clone ( ) ) ;
97- let key = node
98- . subspace
99- . unwrap ( )
100- . subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
99+ let node_subspace = match node. subspace {
100+ // unreachable because on first iteration, it is set to root_node,
101+ // on other iteration, `node.exists` is checking for the subspace's value
102+ None => unreachable ! ( "node's subspace is not set" ) ,
103+ Some ( s) => s,
104+ } ;
105+ let key = node_subspace. subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
101106
102107 // finding the next node
103108 let fdb_slice_value = trx. get ( key. bytes ( ) , false ) . await ?;
@@ -141,8 +146,6 @@ impl DirectoryLayer {
141146 ) -> Result < DirectoryOutput , DirectoryError > {
142147 let prefix: Vec < u8 > = self . node_subspace . unpack ( node. bytes ( ) ) ?;
143148
144- println ! ( "prefix: {:?}" , & prefix) ;
145-
146149 if layer. eq ( PARTITION_LAYER ) {
147150 Ok ( DirectoryOutput :: DirectoryPartition (
148151 DirectoryPartition :: new ( self . to_absolute_path ( & path) , prefix, self . clone ( ) ) ,
@@ -188,8 +191,13 @@ impl DirectoryLayer {
188191 // TODO true or false?
189192 if node. is_in_partition ( false ) {
190193 let sub_path = node. get_partition_subpath ( ) ;
194+ let subspace_node = match node. subspace {
195+ // not reachable because `self.find` is creating a node with a subspace,
196+ None => unreachable ! ( "node's subspace is not set" ) ,
197+ Some ( s) => s,
198+ } ;
191199 let dir_space =
192- self . contents_of_node ( node . subspace . unwrap ( ) , node. current_path , node. layer ) ?;
200+ self . contents_of_node ( subspace_node , node. current_path , node. layer ) ?;
193201 dir_space
194202 . create_or_open ( trx, sub_path. to_owned ( ) , prefix, layer)
195203 . await ?;
@@ -223,10 +231,16 @@ impl DirectoryLayer {
223231 } ,
224232 }
225233
234+ let subspace_node = match node. to_owned ( ) . subspace {
235+ // not reachable because `self.find` is creating a node with a subspace,
236+ None => unreachable ! ( "node's subspace is not set" ) ,
237+ Some ( s) => s,
238+ } ;
239+
226240 self . contents_of_node (
227- node . subspace . as_ref ( ) . unwrap ( ) . clone ( ) ,
241+ subspace_node . clone ( ) . subspace ( & ( ) ) ,
228242 node. target_path . to_owned ( ) ,
229- layer. unwrap_or ( vec ! [ ] ) ,
243+ node . layer . to_owned ( ) ,
230244 )
231245 }
232246
@@ -247,8 +261,6 @@ impl DirectoryLayer {
247261 self . check_version ( trx, allow_create) . await ?;
248262 let new_prefix = self . get_prefix ( trx, prefix. clone ( ) ) . await ?;
249263
250- println ! ( "new_prefix: {:?}" , & new_prefix) ;
251-
252264 let is_prefix_free = self
253265 . is_prefix_free ( trx, new_prefix. to_owned ( ) , !prefix. is_some ( ) )
254266 . await ?;
@@ -258,18 +270,12 @@ impl DirectoryLayer {
258270 }
259271
260272 let parent_node = self . get_parent_node ( trx, path. to_owned ( ) ) . await ?;
261- println ! ( "parent_node: {:?}" , & parent_node) ;
262273 let node = self . node_with_prefix ( & new_prefix) ;
263- println ! ( "node: {:?}" , & node) ;
264274
265275 let key = parent_node. subspace ( & ( DEFAULT_SUB_DIRS , path. last ( ) . unwrap ( ) ) ) ;
266276
267277 trx. set ( & key. bytes ( ) , & new_prefix) ;
268- trx. set ( node. subspace ( & b"layer" . to_vec ( ) ) . bytes ( ) , & layer) ;
269- println ! (
270- "writing layer in row {:?}" ,
271- node. subspace( & b"layer" . to_vec( ) ) . bytes( )
272- ) ;
278+ trx. set ( & node. pack ( & ( String :: from ( "layer" ) ) ) , & layer) ;
273279
274280 self . contents_of_node ( node, path. to_owned ( ) , layer. to_owned ( ) )
275281 }
@@ -282,12 +288,9 @@ impl DirectoryLayer {
282288 if path. len ( ) > 1 {
283289 let ( _, list) = path. split_last ( ) . unwrap ( ) ;
284290
285- println ! ( "searching for parent" ) ;
286-
287291 let parent = self
288292 . create_or_open_internal ( trx, list. to_vec ( ) , None , None , true , true )
289293 . await ?;
290- println ! ( "found a parent: {:?}" , parent. bytes( ) ) ;
291294 Ok ( self . node_with_prefix ( & parent. bytes ( ) . to_vec ( ) ) )
292295 } else {
293296 Ok ( self . root_node . clone ( ) )
@@ -329,26 +332,36 @@ impl DirectoryLayer {
329332 if key. starts_with ( self . node_subspace . bytes ( ) ) {
330333 return Ok ( Some ( self . root_node . clone ( ) ) ) ;
331334 }
332- // FIXME: got sometimes an error where the scan include another layer...
333- // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryLayer.actor.cpp#L186-L194
334- let ( begin_range, _) = self . node_subspace . range ( ) ;
335- let mut end_range = self . node_subspace . pack ( & key) ;
336- // simulate keyAfter
337- end_range. push ( 0 ) ;
335+
336+ let mut key_after = key. to_vec ( ) ;
337+ // pushing 0x00 to simulate keyAfter
338+ key_after. push ( 0 ) ;
339+
340+ let range_end = self . node_subspace . pack ( & key_after) ;
341+
342+ let mut range_option = RangeOption :: from ( ( self . node_subspace . range ( ) . 0 , range_end) ) ;
343+ range_option. reverse = true ;
344+ range_option. limit = Some ( 1 ) ;
338345
339346 // checking range
340- let result = trx
341- . get_range ( & RangeOption :: from ( ( begin_range, end_range) ) , 1 , snapshot)
342- . await ?;
347+ let fdb_values = trx. get_range ( & range_option, 1 , snapshot) . await ?;
343348
344- if result. len ( ) > 0 {
345- let previous_prefix: ( String ) =
346- self . node_subspace . unpack ( result. get ( 0 ) . unwrap ( ) . key ( ) ) ?;
347- if key. starts_with ( previous_prefix. as_bytes ( ) ) {
348- return Ok ( Some ( self . node_with_prefix ( & ( previous_prefix) ) ) ) ;
349+ match fdb_values. get ( 0 ) {
350+ None => { }
351+ Some ( fdb_key_value) => {
352+ let previous_prefix: Vec < Element > =
353+ self . node_subspace . unpack ( fdb_key_value. key ( ) ) ?;
354+ match previous_prefix. get ( 0 ) {
355+ Some ( Element :: Bytes ( b) ) => {
356+ let previous_prefix = b. to_vec ( ) ;
357+ if key. starts_with ( & previous_prefix) {
358+ return Ok ( Some ( self . node_with_prefix ( & previous_prefix) ) ) ;
359+ } ;
360+ }
361+ _ => { }
362+ } ;
349363 }
350364 }
351-
352365 Ok ( None )
353366 }
354367
@@ -390,7 +403,7 @@ impl DirectoryLayer {
390403 if allow_creation {
391404 self . initialize_directory ( trx) . await
392405 } else {
393- Err ( DirectoryError :: MissingDirectory )
406+ return Ok ( ( ) ) ;
394407 }
395408 }
396409 Some ( versions) => {
@@ -440,6 +453,7 @@ impl DirectoryLayer {
440453 async fn get_version_value ( & self , trx : & Transaction ) -> FdbResult < Option < FdbSlice > > {
441454 let version_subspace: & [ u8 ] = b"version" ;
442455 let version_key = self . root_node . subspace ( & version_subspace) ;
456+
443457 trx. get ( version_key. bytes ( ) , false ) . await
444458 }
445459
@@ -450,19 +464,23 @@ impl DirectoryLayer {
450464 ) -> Result < bool , DirectoryError > {
451465 self . check_version ( trx, false ) . await ?;
452466
453- if path. is_empty ( ) {
454- return Err ( DirectoryError :: NoPathProvided ) ;
455- }
456-
457467 let node = self . find ( trx, path. to_owned ( ) ) . await ?;
458468
469+ dbg ! ( & node) ;
470+
459471 if !node. exists ( ) {
460472 return Ok ( false ) ;
461473 }
462474
463475 if node. is_in_partition ( false ) {
476+ let subspace_node = match node. subspace {
477+ // not reachable because `self.find` is creating a node with a subspace,
478+ None => unreachable ! ( "node's subspace is not set" ) ,
479+ Some ( ref s) => s. clone ( ) ,
480+ } ;
481+
464482 let directory_partition = self . contents_of_node (
465- node . clone ( ) . subspace . unwrap ( ) ,
483+ subspace_node ,
466484 node. current_path . to_owned ( ) ,
467485 node. layer . to_owned ( ) ,
468486 ) ?;
@@ -485,9 +503,15 @@ impl DirectoryLayer {
485503 if !node. exists ( ) {
486504 return Err ( DirectoryError :: PathDoesNotExists ) ;
487505 }
488- if node. is_in_partition ( true ) {
506+ if node. is_in_partition ( false ) {
507+ let subspace_node = match node. subspace {
508+ // not reachable because `self.find` is creating a node with a subspace.
509+ None => unreachable ! ( "node's subspace is not set" ) ,
510+ Some ( ref s) => s. clone ( ) ,
511+ } ;
512+
489513 let directory_partition = self . contents_of_node (
490- node . clone ( ) . subspace . unwrap ( ) ,
514+ subspace_node ,
491515 node. current_path . to_owned ( ) ,
492516 node. layer . to_owned ( ) ,
493517 ) ?;
@@ -528,8 +552,14 @@ impl DirectoryLayer {
528552 return Err ( DirectoryError :: CannotMoveBetweenPartition ) ;
529553 }
530554
555+ let subspace_new_node = match new_node. subspace {
556+ // not reachable because `self.find` is creating a node with a subspace,
557+ None => unreachable ! ( "node's subspace is not set" ) ,
558+ Some ( ref s) => s. clone ( ) ,
559+ } ;
560+
531561 let directory_partition = self . contents_of_node (
532- new_node . clone ( ) . subspace . unwrap ( ) ,
562+ subspace_new_node ,
533563 new_node. current_path . to_owned ( ) ,
534564 new_node. layer . to_owned ( ) ,
535565 ) ?;
@@ -557,10 +587,14 @@ impl DirectoryLayer {
557587 return Err ( DirectoryError :: ParentDirDoesNotExists ) ;
558588 }
559589
560- let key = parent_node
561- . subspace
562- . unwrap ( )
563- . subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
590+ let subspace_parent_node = match parent_node. subspace {
591+ // not reachable because `self.find` is creating a node with a subspace,
592+ None => unreachable ! ( "node's subspace is not set" ) ,
593+ Some ( ref s) => s. clone ( ) ,
594+ } ;
595+
596+ let key =
597+ subspace_parent_node. subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
564598 let value: Vec < u8 > = self
565599 . node_subspace
566600 . unpack ( old_node. subspace . clone ( ) . unwrap ( ) . bytes ( ) ) ?;
@@ -589,8 +623,8 @@ impl DirectoryLayer {
589623 match parent_node. subspace {
590624 None => { }
591625 Some ( subspace) => {
592- let key = subspace. subspace ( & ( DEFAULT_SUB_DIRS , last_element) ) ;
593- trx. clear ( & key. bytes ( ) ) ;
626+ let key = subspace. pack ( & ( DEFAULT_SUB_DIRS , last_element) ) ;
627+ trx. clear ( & key) ;
594628 }
595629 }
596630
@@ -625,10 +659,9 @@ impl DirectoryLayer {
625659 . await ;
626660 }
627661
628- try_join ! (
629- self . remove_recursive( trx, node. subspace. unwrap( ) . clone( ) ) ,
630- self . remove_from_parent( trx, path. to_owned( ) )
631- ) ;
662+ self . remove_recursive ( trx, node. subspace . unwrap ( ) . clone ( ) )
663+ . await ?;
664+ self . remove_from_parent ( trx, path. to_owned ( ) ) . await ?;
632665
633666 Ok ( true )
634667 }
@@ -645,18 +678,14 @@ impl DirectoryLayer {
645678 loop {
646679 let range_option = RangeOption :: from ( ( begin. as_slice ( ) , end. as_slice ( ) ) ) ;
647680
648- let range = trx. get_range ( & range_option, 1 , false ) . await ?;
681+ let range = trx. get_range ( & range_option, 1024 , false ) . await ?;
649682 let has_more = range. more ( ) ;
650- let range: Arc < FairMutex < FdbValuesIter > > = Arc :: new ( FairMutex :: new ( range. into_iter ( ) ) ) ;
651-
652- loop {
653- let value_row = match range. lock ( ) . next ( ) {
654- None => break ,
655- Some ( next_key_value) => next_key_value. value ( ) . to_vec ( ) ,
656- } ;
657683
658- let sub_node = self . node_with_prefix ( & value_row) ;
684+ for row_key in range {
685+ let sub_node = self . node_with_prefix ( & row_key. value ( ) ) ;
659686 self . remove_recursive ( trx, sub_node) . await ?;
687+ begin = row_key. key ( ) . pack_to_vec ( ) ;
688+ begin. push ( 0 ) ;
660689 }
661690
662691 if !has_more {
@@ -667,7 +696,7 @@ impl DirectoryLayer {
667696 let mut node_prefix: Vec < u8 > = self . node_subspace . unpack ( node_sub. bytes ( ) ) ?;
668697
669698 // equivalent of strinc?
670- node_prefix. remove ( node_prefix. len ( ) ) ;
699+ node_prefix. remove ( node_prefix. len ( ) - 1 ) ;
671700
672701 trx. clear_range ( node_prefix. as_slice ( ) , node_prefix. as_slice ( ) ) ;
673702 trx. clear_subspace_range ( & node_sub) ;
0 commit comments