1- use std:: cmp:: Ordering ;
2-
3- use async_recursion:: async_recursion;
4- use async_trait:: async_trait;
5- use byteorder:: { LittleEndian , WriteBytesExt } ;
6-
71use crate :: directory:: directory_partition:: DirectoryPartition ;
82use crate :: directory:: directory_subspace:: DirectorySubspace ;
93use crate :: directory:: error:: DirectoryError ;
104use crate :: directory:: node:: Node ;
115use crate :: directory:: { compare_slice, Directory , DirectoryOutput } ;
12- use crate :: future:: { FdbKeyValue , FdbSlice , FdbValue , FdbValuesIter } ;
6+ use crate :: future:: FdbSlice ;
137use crate :: tuple:: hca:: HighContentionAllocator ;
148use crate :: tuple:: { Subspace , TuplePack } ;
159use crate :: RangeOption ;
1610use crate :: { FdbResult , Transaction } ;
17- use futures:: prelude:: stream:: { Iter , Next } ;
18- use futures:: stream:: StreamExt ;
11+ use async_recursion:: async_recursion;
12+ use async_trait:: async_trait;
13+ use byteorder:: { LittleEndian , WriteBytesExt } ;
1914use futures:: try_join;
20- use futures:: { join, TryStreamExt } ;
21- use parking_lot:: { FairMutex , RawMutex } ;
15+ use std:: cmp:: Ordering ;
2216use std:: option:: Option :: Some ;
23- use std:: rc:: Rc ;
24- use std:: sync:: { Arc , Mutex , MutexGuard , PoisonError } ;
2517
2618pub ( crate ) const DEFAULT_SUB_DIRS : i64 = 0 ;
2719const MAJOR_VERSION : u32 = 1 ;
2820const MINOR_VERSION : u32 = 0 ;
2921const PATCH_VERSION : u32 = 0 ;
30- pub ( crate ) const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
22+ pub const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
3123const DEFAULT_HCA_PREFIX : & [ u8 ] = b"hca" ;
3224pub ( crate ) const PARTITION_LAYER : & [ u8 ] = b"partition" ;
3325
26+ /// Directories are identified by hierarchical paths analogous to the paths
27+ /// in a Unix-like file system. A path is represented as a List of strings.
28+ /// Each directory has an associated subspace used to store its content. The
29+ /// layer maps each path to a short prefix used for the corresponding
30+ /// subspace. In effect, directories provide a level of indirection for
31+ /// access to subspaces.
3432#[ derive( Debug , Clone ) ]
3533pub struct DirectoryLayer {
3634 pub root_node : Subspace ,
@@ -71,6 +69,10 @@ impl DirectoryLayer {
7169 }
7270 }
7371
72+ pub fn get_path ( & self ) -> Vec < String > {
73+ self . path . clone ( )
74+ }
75+
7476 fn node_with_optional_prefix ( & self , prefix : Option < FdbSlice > ) -> Option < Subspace > {
7577 match prefix {
7678 None => None ,
@@ -94,10 +96,13 @@ impl DirectoryLayer {
9496 // walking through the provided path
9597 for path_name in path. iter ( ) {
9698 node. current_path . push ( path_name. clone ( ) ) ;
97- let key = node
98- . subspace
99- . unwrap ( )
100- . subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
99+ let node_subspace = match node. subspace {
100+ // unreachable because on first iteration, it is set to root_node,
101+ // on other iteration, `node.exists` is checking for the subspace's value
102+ None => unreachable ! ( "node's subspace is not set" ) ,
103+ Some ( s) => s,
104+ } ;
105+ let key = node_subspace. subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
101106
102107 // finding the next node
103108 let fdb_slice_value = trx. get ( key. bytes ( ) , false ) . await ?;
@@ -141,8 +146,6 @@ impl DirectoryLayer {
141146 ) -> Result < DirectoryOutput , DirectoryError > {
142147 let prefix: Vec < u8 > = self . node_subspace . unpack ( node. bytes ( ) ) ?;
143148
144- println ! ( "prefix: {:?}" , & prefix) ;
145-
146149 if layer. eq ( PARTITION_LAYER ) {
147150 Ok ( DirectoryOutput :: DirectoryPartition (
148151 DirectoryPartition :: new ( self . to_absolute_path ( & path) , prefix, self . clone ( ) ) ,
@@ -188,8 +191,13 @@ impl DirectoryLayer {
188191 // TODO true or false?
189192 if node. is_in_partition ( false ) {
190193 let sub_path = node. get_partition_subpath ( ) ;
194+ let subspace_node = match node. subspace {
195+ // not reachable because `self.find` is creating a node with a subspace,
196+ None => unreachable ! ( "node's subspace is not set" ) ,
197+ Some ( s) => s,
198+ } ;
191199 let dir_space =
192- self . contents_of_node ( node . subspace . unwrap ( ) , node. current_path , node. layer ) ?;
200+ self . contents_of_node ( subspace_node , node. current_path , node. layer ) ?;
193201 dir_space
194202 . create_or_open ( trx, sub_path. to_owned ( ) , prefix, layer)
195203 . await ?;
@@ -223,8 +231,14 @@ impl DirectoryLayer {
223231 } ,
224232 }
225233
234+ let subspace_node = match node. to_owned ( ) . subspace {
235+ // not reachable because `self.find` is creating a node with a subspace,
236+ None => unreachable ! ( "node's subspace is not set" ) ,
237+ Some ( s) => s,
238+ } ;
239+
226240 self . contents_of_node (
227- node . subspace . as_ref ( ) . unwrap ( ) . clone ( ) ,
241+ subspace_node . clone ( ) ,
228242 node. target_path . to_owned ( ) ,
229243 layer. unwrap_or ( vec ! [ ] ) ,
230244 )
@@ -241,14 +255,11 @@ impl DirectoryLayer {
241255 if !allow_create {
242256 return Err ( DirectoryError :: DirectoryDoesNotExists ) ;
243257 }
244-
245258 let layer = layer. unwrap_or ( vec ! [ ] ) ;
246259
247260 self . check_version ( trx, allow_create) . await ?;
248261 let new_prefix = self . get_prefix ( trx, prefix. clone ( ) ) . await ?;
249262
250- println ! ( "new_prefix: {:?}" , & new_prefix) ;
251-
252263 let is_prefix_free = self
253264 . is_prefix_free ( trx, new_prefix. to_owned ( ) , !prefix. is_some ( ) )
254265 . await ?;
@@ -258,18 +269,14 @@ impl DirectoryLayer {
258269 }
259270
260271 let parent_node = self . get_parent_node ( trx, path. to_owned ( ) ) . await ?;
261- println ! ( "parent_node: {:?}" , & parent_node) ;
262272 let node = self . node_with_prefix ( & new_prefix) ;
263- println ! ( "node: {:?}" , & node) ;
264273
265274 let key = parent_node. subspace ( & ( DEFAULT_SUB_DIRS , path. last ( ) . unwrap ( ) ) ) ;
266275
267276 trx. set ( & key. bytes ( ) , & new_prefix) ;
268277 trx. set ( node. subspace ( & b"layer" . to_vec ( ) ) . bytes ( ) , & layer) ;
269- println ! (
270- "writing layer in row {:?}" ,
271- node. subspace( & b"layer" . to_vec( ) ) . bytes( )
272- ) ;
278+
279+ println ! ( "@@@ created directory on path {:?}" , & path) ;
273280
274281 self . contents_of_node ( node, path. to_owned ( ) , layer. to_owned ( ) )
275282 }
@@ -282,12 +289,9 @@ impl DirectoryLayer {
282289 if path. len ( ) > 1 {
283290 let ( _, list) = path. split_last ( ) . unwrap ( ) ;
284291
285- println ! ( "searching for parent" ) ;
286-
287292 let parent = self
288293 . create_or_open_internal ( trx, list. to_vec ( ) , None , None , true , true )
289294 . await ?;
290- println ! ( "found a parent: {:?}" , parent. bytes( ) ) ;
291295 Ok ( self . node_with_prefix ( & parent. bytes ( ) . to_vec ( ) ) )
292296 } else {
293297 Ok ( self . root_node . clone ( ) )
@@ -329,27 +333,21 @@ impl DirectoryLayer {
329333 if key. starts_with ( self . node_subspace . bytes ( ) ) {
330334 return Ok ( Some ( self . root_node . clone ( ) ) ) ;
331335 }
332- // FIXME: got sometimes an error where the scan include another layer...
333- // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryLayer.actor.cpp#L186-L194
334- let ( begin_range, _) = self . node_subspace . range ( ) ;
335- let mut end_range = self . node_subspace . pack ( & key) ;
336- // simulate keyAfter
337- end_range. push ( 0 ) ;
338336
339- // checking range
340- let result = trx
341- . get_range ( & RangeOption :: from ( ( begin_range, end_range) ) , 1 , snapshot)
342- . await ?;
337+ let key = self . node_subspace . pack ( & key) ;
343338
344- if result. len ( ) > 0 {
345- let previous_prefix: ( String ) =
346- self . node_subspace . unpack ( result. get ( 0 ) . unwrap ( ) . key ( ) ) ?;
347- if key. starts_with ( previous_prefix. as_bytes ( ) ) {
348- return Ok ( Some ( self . node_with_prefix ( & ( previous_prefix) ) ) ) ;
339+ // checking range
340+ match trx. get ( & key, snapshot) . await ? {
341+ None => Ok ( None ) ,
342+ Some ( _) => {
343+ let previous_prefix: Vec < u8 > = self . node_subspace . unpack ( key. as_slice ( ) ) ?;
344+ if key. starts_with ( & previous_prefix) {
345+ Ok ( Some ( self . node_with_prefix ( & previous_prefix) ) )
346+ } else {
347+ Ok ( None )
348+ }
349349 }
350350 }
351-
352- Ok ( None )
353351 }
354352
355353 async fn get_prefix (
@@ -450,19 +448,21 @@ impl DirectoryLayer {
450448 ) -> Result < bool , DirectoryError > {
451449 self . check_version ( trx, false ) . await ?;
452450
453- if path. is_empty ( ) {
454- return Err ( DirectoryError :: NoPathProvided ) ;
455- }
456-
457451 let node = self . find ( trx, path. to_owned ( ) ) . await ?;
458452
459453 if !node. exists ( ) {
460454 return Ok ( false ) ;
461455 }
462456
463457 if node. is_in_partition ( false ) {
458+ let subspace_node = match node. subspace {
459+ // not reachable because `self.find` is creating a node with a subspace,
460+ None => unreachable ! ( "node's subspace is not set" ) ,
461+ Some ( ref s) => s. clone ( ) ,
462+ } ;
463+
464464 let directory_partition = self . contents_of_node (
465- node . clone ( ) . subspace . unwrap ( ) ,
465+ subspace_node ,
466466 node. current_path . to_owned ( ) ,
467467 node. layer . to_owned ( ) ,
468468 ) ?;
@@ -486,8 +486,14 @@ impl DirectoryLayer {
486486 return Err ( DirectoryError :: PathDoesNotExists ) ;
487487 }
488488 if node. is_in_partition ( true ) {
489+ let subspace_node = match node. subspace {
490+ // not reachable because `self.find` is creating a node with a subspace.
491+ None => unreachable ! ( "node's subspace is not set" ) ,
492+ Some ( ref s) => s. clone ( ) ,
493+ } ;
494+
489495 let directory_partition = self . contents_of_node (
490- node . clone ( ) . subspace . unwrap ( ) ,
496+ subspace_node ,
491497 node. current_path . to_owned ( ) ,
492498 node. layer . to_owned ( ) ,
493499 ) ?;
@@ -528,8 +534,14 @@ impl DirectoryLayer {
528534 return Err ( DirectoryError :: CannotMoveBetweenPartition ) ;
529535 }
530536
537+ let subspace_new_node = match new_node. subspace {
538+ // not reachable because `self.find` is creating a node with a subspace,
539+ None => unreachable ! ( "node's subspace is not set" ) ,
540+ Some ( ref s) => s. clone ( ) ,
541+ } ;
542+
531543 let directory_partition = self . contents_of_node (
532- new_node . clone ( ) . subspace . unwrap ( ) ,
544+ subspace_new_node ,
533545 new_node. current_path . to_owned ( ) ,
534546 new_node. layer . to_owned ( ) ,
535547 ) ?;
@@ -557,10 +569,14 @@ impl DirectoryLayer {
557569 return Err ( DirectoryError :: ParentDirDoesNotExists ) ;
558570 }
559571
560- let key = parent_node
561- . subspace
562- . unwrap ( )
563- . subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
572+ let subspace_parent_node = match parent_node. subspace {
573+ // not reachable because `self.find` is creating a node with a subspace,
574+ None => unreachable ! ( "node's subspace is not set" ) ,
575+ Some ( ref s) => s. clone ( ) ,
576+ } ;
577+
578+ let key =
579+ subspace_parent_node. subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
564580 let value: Vec < u8 > = self
565581 . node_subspace
566582 . unpack ( old_node. subspace . clone ( ) . unwrap ( ) . bytes ( ) ) ?;
@@ -611,6 +627,7 @@ impl DirectoryLayer {
611627 }
612628
613629 let node = self . find ( & trx, path. to_owned ( ) ) . await ?;
630+ dbg ! ( & node) ;
614631 if !node. exists ( ) {
615632 return if fail_on_nonexistent {
616633 Err ( DirectoryError :: DirectoryDoesNotExists )
@@ -628,7 +645,7 @@ impl DirectoryLayer {
628645 try_join ! (
629646 self . remove_recursive( trx, node. subspace. unwrap( ) . clone( ) ) ,
630647 self . remove_from_parent( trx, path. to_owned( ) )
631- ) ;
648+ ) ? ;
632649
633650 Ok ( true )
634651 }
@@ -647,16 +664,12 @@ impl DirectoryLayer {
647664
648665 let range = trx. get_range ( & range_option, 1 , false ) . await ?;
649666 let has_more = range. more ( ) ;
650- let range: Arc < FairMutex < FdbValuesIter > > = Arc :: new ( FairMutex :: new ( range. into_iter ( ) ) ) ;
651-
652- loop {
653- let value_row = match range. lock ( ) . next ( ) {
654- None => break ,
655- Some ( next_key_value) => next_key_value. value ( ) . to_vec ( ) ,
656- } ;
657667
658- let sub_node = self . node_with_prefix ( & value_row) ;
668+ for row_key in range {
669+ let sub_node = self . node_with_prefix ( & row_key. value ( ) ) ;
659670 self . remove_recursive ( trx, sub_node) . await ?;
671+ begin = row_key. key ( ) . pack_to_vec ( ) ;
672+ begin. push ( 0 ) ;
660673 }
661674
662675 if !has_more {
@@ -667,7 +680,7 @@ impl DirectoryLayer {
667680 let mut node_prefix: Vec < u8 > = self . node_subspace . unpack ( node_sub. bytes ( ) ) ?;
668681
669682 // equivalent of strinc?
670- node_prefix. remove ( node_prefix. len ( ) ) ;
683+ node_prefix. remove ( node_prefix. len ( ) - 1 ) ;
671684
672685 trx. clear_range ( node_prefix. as_slice ( ) , node_prefix. as_slice ( ) ) ;
673686 trx. clear_subspace_range ( & node_sub) ;
0 commit comments