1- use std:: cmp:: Ordering ;
2-
3- use async_recursion:: async_recursion;
4- use async_trait:: async_trait;
5- use byteorder:: { LittleEndian , WriteBytesExt } ;
6-
71use crate :: directory:: directory_partition:: DirectoryPartition ;
82use crate :: directory:: directory_subspace:: DirectorySubspace ;
93use crate :: directory:: error:: DirectoryError ;
104use crate :: directory:: node:: Node ;
115use crate :: directory:: { compare_slice, Directory , DirectoryOutput } ;
12- use crate :: future:: { FdbKeyValue , FdbSlice , FdbValue , FdbValuesIter } ;
6+ use crate :: future:: FdbSlice ;
137use crate :: tuple:: hca:: HighContentionAllocator ;
14- use crate :: tuple:: { Subspace , TuplePack } ;
8+ use crate :: tuple:: { unpack , Element , Subspace , TuplePack } ;
159use crate :: RangeOption ;
1610use crate :: { FdbResult , Transaction } ;
17- use futures:: prelude:: stream:: { Iter , Next } ;
18- use futures:: stream:: StreamExt ;
11+ use async_recursion:: async_recursion;
12+ use async_trait:: async_trait;
13+ use byteorder:: { LittleEndian , WriteBytesExt } ;
1914use futures:: try_join;
20- use futures:: { join, TryStreamExt } ;
21- use parking_lot:: { FairMutex , RawMutex } ;
15+ use std:: cmp:: Ordering ;
2216use std:: option:: Option :: Some ;
23- use std:: rc:: Rc ;
24- use std:: sync:: { Arc , Mutex , MutexGuard , PoisonError } ;
2517
2618pub ( crate ) const DEFAULT_SUB_DIRS : i64 = 0 ;
2719const MAJOR_VERSION : u32 = 1 ;
2820const MINOR_VERSION : u32 = 0 ;
2921const PATCH_VERSION : u32 = 0 ;
30- pub ( crate ) const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
22+ pub const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
3123const DEFAULT_HCA_PREFIX : & [ u8 ] = b"hca" ;
3224pub ( crate ) const PARTITION_LAYER : & [ u8 ] = b"partition" ;
3325
26+ /// Directories are identified by hierarchical paths analogous to the paths
27+ /// in a Unix-like file system. A path is represented as a List of strings.
28+ /// Each directory has an associated subspace used to store its content. The
29+ /// layer maps each path to a short prefix used for the corresponding
30+ /// subspace. In effect, directories provide a level of indirection for
31+ /// access to subspaces.
3432#[ derive( Debug , Clone ) ]
3533pub struct DirectoryLayer {
3634 pub root_node : Subspace ,
@@ -71,6 +69,10 @@ impl DirectoryLayer {
7169 }
7270 }
7371
72+ pub fn get_path ( & self ) -> Vec < String > {
73+ self . path . clone ( )
74+ }
75+
7476 fn node_with_optional_prefix ( & self , prefix : Option < FdbSlice > ) -> Option < Subspace > {
7577 match prefix {
7678 None => None ,
@@ -94,10 +96,13 @@ impl DirectoryLayer {
9496 // walking through the provided path
9597 for path_name in path. iter ( ) {
9698 node. current_path . push ( path_name. clone ( ) ) ;
97- let key = node
98- . subspace
99- . unwrap ( )
100- . subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
99+ let node_subspace = match node. subspace {
100+ // unreachable because on first iteration, it is set to root_node,
101+ // on other iteration, `node.exists` is checking for the subspace's value
102+ None => unreachable ! ( "node's subspace is not set" ) ,
103+ Some ( s) => s,
104+ } ;
105+ let key = node_subspace. subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
101106
102107 // finding the next node
103108 let fdb_slice_value = trx. get ( key. bytes ( ) , false ) . await ?;
@@ -141,8 +146,6 @@ impl DirectoryLayer {
141146 ) -> Result < DirectoryOutput , DirectoryError > {
142147 let prefix: Vec < u8 > = self . node_subspace . unpack ( node. bytes ( ) ) ?;
143148
144- println ! ( "prefix: {:?}" , & prefix) ;
145-
146149 if layer. eq ( PARTITION_LAYER ) {
147150 Ok ( DirectoryOutput :: DirectoryPartition (
148151 DirectoryPartition :: new ( self . to_absolute_path ( & path) , prefix, self . clone ( ) ) ,
@@ -188,8 +191,13 @@ impl DirectoryLayer {
188191 // TODO true or false?
189192 if node. is_in_partition ( false ) {
190193 let sub_path = node. get_partition_subpath ( ) ;
194+ let subspace_node = match node. subspace {
195+ // not reachable because `self.find` is creating a node with a subspace,
196+ None => unreachable ! ( "node's subspace is not set" ) ,
197+ Some ( s) => s,
198+ } ;
191199 let dir_space =
192- self . contents_of_node ( node . subspace . unwrap ( ) , node. current_path , node. layer ) ?;
200+ self . contents_of_node ( subspace_node , node. current_path , node. layer ) ?;
193201 dir_space
194202 . create_or_open ( trx, sub_path. to_owned ( ) , prefix, layer)
195203 . await ?;
@@ -223,10 +231,16 @@ impl DirectoryLayer {
223231 } ,
224232 }
225233
234+ let subspace_node = match node. to_owned ( ) . subspace {
235+ // not reachable because `self.find` is creating a node with a subspace,
236+ None => unreachable ! ( "node's subspace is not set" ) ,
237+ Some ( s) => s,
238+ } ;
239+
226240 self . contents_of_node (
227- node . subspace . as_ref ( ) . unwrap ( ) . clone ( ) ,
241+ subspace_node . clone ( ) . subspace ( & ( ) ) ,
228242 node. target_path . to_owned ( ) ,
229- layer. unwrap_or ( vec ! [ ] ) ,
243+ node . layer . to_owned ( ) ,
230244 )
231245 }
232246
@@ -247,8 +261,6 @@ impl DirectoryLayer {
247261 self . check_version ( trx, allow_create) . await ?;
248262 let new_prefix = self . get_prefix ( trx, prefix. clone ( ) ) . await ?;
249263
250- println ! ( "new_prefix: {:?}" , & new_prefix) ;
251-
252264 let is_prefix_free = self
253265 . is_prefix_free ( trx, new_prefix. to_owned ( ) , !prefix. is_some ( ) )
254266 . await ?;
@@ -258,18 +270,12 @@ impl DirectoryLayer {
258270 }
259271
260272 let parent_node = self . get_parent_node ( trx, path. to_owned ( ) ) . await ?;
261- println ! ( "parent_node: {:?}" , & parent_node) ;
262273 let node = self . node_with_prefix ( & new_prefix) ;
263- println ! ( "node: {:?}" , & node) ;
264274
265275 let key = parent_node. subspace ( & ( DEFAULT_SUB_DIRS , path. last ( ) . unwrap ( ) ) ) ;
266276
267277 trx. set ( & key. bytes ( ) , & new_prefix) ;
268- trx. set ( node. subspace ( & b"layer" . to_vec ( ) ) . bytes ( ) , & layer) ;
269- println ! (
270- "writing layer in row {:?}" ,
271- node. subspace( & b"layer" . to_vec( ) ) . bytes( )
272- ) ;
278+ trx. set ( & node. pack ( & ( String :: from ( "layer" ) ) ) , & layer) ;
273279
274280 self . contents_of_node ( node, path. to_owned ( ) , layer. to_owned ( ) )
275281 }
@@ -282,12 +288,9 @@ impl DirectoryLayer {
282288 if path. len ( ) > 1 {
283289 let ( _, list) = path. split_last ( ) . unwrap ( ) ;
284290
285- println ! ( "searching for parent" ) ;
286-
287291 let parent = self
288292 . create_or_open_internal ( trx, list. to_vec ( ) , None , None , true , true )
289293 . await ?;
290- println ! ( "found a parent: {:?}" , parent. bytes( ) ) ;
291294 Ok ( self . node_with_prefix ( & parent. bytes ( ) . to_vec ( ) ) )
292295 } else {
293296 Ok ( self . root_node . clone ( ) )
@@ -329,27 +332,21 @@ impl DirectoryLayer {
329332 if key. starts_with ( self . node_subspace . bytes ( ) ) {
330333 return Ok ( Some ( self . root_node . clone ( ) ) ) ;
331334 }
332- // FIXME: got sometimes an error where the scan include another layer...
333- // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryLayer.actor.cpp#L186-L194
334- let ( begin_range, _) = self . node_subspace . range ( ) ;
335- let mut end_range = self . node_subspace . pack ( & key) ;
336- // simulate keyAfter
337- end_range. push ( 0 ) ;
338335
339- // checking range
340- let result = trx
341- . get_range ( & RangeOption :: from ( ( begin_range, end_range) ) , 1 , snapshot)
342- . await ?;
336+ let key = self . node_subspace . pack ( & key) ;
343337
344- if result. len ( ) > 0 {
345- let previous_prefix: ( String ) =
346- self . node_subspace . unpack ( result. get ( 0 ) . unwrap ( ) . key ( ) ) ?;
347- if key. starts_with ( previous_prefix. as_bytes ( ) ) {
348- return Ok ( Some ( self . node_with_prefix ( & ( previous_prefix) ) ) ) ;
338+ // checking range
339+ match trx. get ( & key, snapshot) . await ? {
340+ None => Ok ( None ) ,
341+ Some ( _) => {
342+ let previous_prefix: Vec < u8 > = self . node_subspace . unpack ( key. as_slice ( ) ) ?;
343+ if key. starts_with ( & previous_prefix) {
344+ Ok ( Some ( self . node_with_prefix ( & previous_prefix) ) )
345+ } else {
346+ Ok ( None )
347+ }
349348 }
350349 }
351-
352- Ok ( None )
353350 }
354351
355352 async fn get_prefix (
@@ -390,7 +387,7 @@ impl DirectoryLayer {
390387 if allow_creation {
391388 self . initialize_directory ( trx) . await
392389 } else {
393- Err ( DirectoryError :: MissingDirectory )
390+ return Ok ( ( ) ) ;
394391 }
395392 }
396393 Some ( versions) => {
@@ -440,6 +437,7 @@ impl DirectoryLayer {
440437 async fn get_version_value ( & self , trx : & Transaction ) -> FdbResult < Option < FdbSlice > > {
441438 let version_subspace: & [ u8 ] = b"version" ;
442439 let version_key = self . root_node . subspace ( & version_subspace) ;
440+
443441 trx. get ( version_key. bytes ( ) , false ) . await
444442 }
445443
@@ -450,19 +448,21 @@ impl DirectoryLayer {
450448 ) -> Result < bool , DirectoryError > {
451449 self . check_version ( trx, false ) . await ?;
452450
453- if path. is_empty ( ) {
454- return Err ( DirectoryError :: NoPathProvided ) ;
455- }
456-
457451 let node = self . find ( trx, path. to_owned ( ) ) . await ?;
458452
459453 if !node. exists ( ) {
460454 return Ok ( false ) ;
461455 }
462456
463457 if node. is_in_partition ( false ) {
458+ let subspace_node = match node. subspace {
459+ // not reachable because `self.find` is creating a node with a subspace,
460+ None => unreachable ! ( "node's subspace is not set" ) ,
461+ Some ( ref s) => s. clone ( ) ,
462+ } ;
463+
464464 let directory_partition = self . contents_of_node (
465- node . clone ( ) . subspace . unwrap ( ) ,
465+ subspace_node ,
466466 node. current_path . to_owned ( ) ,
467467 node. layer . to_owned ( ) ,
468468 ) ?;
@@ -485,9 +485,15 @@ impl DirectoryLayer {
485485 if !node. exists ( ) {
486486 return Err ( DirectoryError :: PathDoesNotExists ) ;
487487 }
488- if node. is_in_partition ( true ) {
488+ if node. is_in_partition ( false ) {
489+ let subspace_node = match node. subspace {
490+ // not reachable because `self.find` is creating a node with a subspace.
491+ None => unreachable ! ( "node's subspace is not set" ) ,
492+ Some ( ref s) => s. clone ( ) ,
493+ } ;
494+
489495 let directory_partition = self . contents_of_node (
490- node . clone ( ) . subspace . unwrap ( ) ,
496+ subspace_node ,
491497 node. current_path . to_owned ( ) ,
492498 node. layer . to_owned ( ) ,
493499 ) ?;
@@ -528,8 +534,14 @@ impl DirectoryLayer {
528534 return Err ( DirectoryError :: CannotMoveBetweenPartition ) ;
529535 }
530536
537+ let subspace_new_node = match new_node. subspace {
538+ // not reachable because `self.find` is creating a node with a subspace,
539+ None => unreachable ! ( "node's subspace is not set" ) ,
540+ Some ( ref s) => s. clone ( ) ,
541+ } ;
542+
531543 let directory_partition = self . contents_of_node (
532- new_node . clone ( ) . subspace . unwrap ( ) ,
544+ subspace_new_node ,
533545 new_node. current_path . to_owned ( ) ,
534546 new_node. layer . to_owned ( ) ,
535547 ) ?;
@@ -557,10 +569,14 @@ impl DirectoryLayer {
557569 return Err ( DirectoryError :: ParentDirDoesNotExists ) ;
558570 }
559571
560- let key = parent_node
561- . subspace
562- . unwrap ( )
563- . subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
572+ let subspace_parent_node = match parent_node. subspace {
573+ // not reachable because `self.find` is creating a node with a subspace,
574+ None => unreachable ! ( "node's subspace is not set" ) ,
575+ Some ( ref s) => s. clone ( ) ,
576+ } ;
577+
578+ let key =
579+ subspace_parent_node. subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
564580 let value: Vec < u8 > = self
565581 . node_subspace
566582 . unpack ( old_node. subspace . clone ( ) . unwrap ( ) . bytes ( ) ) ?;
@@ -589,8 +605,8 @@ impl DirectoryLayer {
589605 match parent_node. subspace {
590606 None => { }
591607 Some ( subspace) => {
592- let key = subspace. subspace ( & ( DEFAULT_SUB_DIRS , last_element) ) ;
593- trx. clear ( & key. bytes ( ) ) ;
608+ let key = subspace. pack ( & ( DEFAULT_SUB_DIRS , last_element) ) ;
609+ trx. clear ( & key) ;
594610 }
595611 }
596612
@@ -625,10 +641,9 @@ impl DirectoryLayer {
625641 . await ;
626642 }
627643
628- try_join ! (
629- self . remove_recursive( trx, node. subspace. unwrap( ) . clone( ) ) ,
630- self . remove_from_parent( trx, path. to_owned( ) )
631- ) ;
644+ self . remove_recursive ( trx, node. subspace . unwrap ( ) . clone ( ) )
645+ . await ?;
646+ self . remove_from_parent ( trx, path. to_owned ( ) ) . await ?;
632647
633648 Ok ( true )
634649 }
@@ -645,18 +660,14 @@ impl DirectoryLayer {
645660 loop {
646661 let range_option = RangeOption :: from ( ( begin. as_slice ( ) , end. as_slice ( ) ) ) ;
647662
648- let range = trx. get_range ( & range_option, 1 , false ) . await ?;
663+ let range = trx. get_range ( & range_option, 1024 , false ) . await ?;
649664 let has_more = range. more ( ) ;
650- let range: Arc < FairMutex < FdbValuesIter > > = Arc :: new ( FairMutex :: new ( range. into_iter ( ) ) ) ;
651-
652- loop {
653- let value_row = match range. lock ( ) . next ( ) {
654- None => break ,
655- Some ( next_key_value) => next_key_value. value ( ) . to_vec ( ) ,
656- } ;
657665
658- let sub_node = self . node_with_prefix ( & value_row) ;
666+ for row_key in range {
667+ let sub_node = self . node_with_prefix ( & row_key. value ( ) ) ;
659668 self . remove_recursive ( trx, sub_node) . await ?;
669+ begin = row_key. key ( ) . pack_to_vec ( ) ;
670+ begin. push ( 0 ) ;
660671 }
661672
662673 if !has_more {
@@ -667,7 +678,7 @@ impl DirectoryLayer {
667678 let mut node_prefix: Vec < u8 > = self . node_subspace . unpack ( node_sub. bytes ( ) ) ?;
668679
669680 // equivalent of strinc?
670- node_prefix. remove ( node_prefix. len ( ) ) ;
681+ node_prefix. remove ( node_prefix. len ( ) - 1 ) ;
671682
672683 trx. clear_range ( node_prefix. as_slice ( ) , node_prefix. as_slice ( ) ) ;
673684 trx. clear_subspace_range ( & node_sub) ;
0 commit comments