@@ -119,91 +119,102 @@ impl KVStore for FilesystemStore {
119119 Ok ( buf)
120120 }
121121
122- fn write (
122+ fn write_async (
123123 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
124- ) -> lightning:: io:: Result < ( ) > {
125- check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
126-
127- let mut dest_file_path = self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
128- dest_file_path. push ( key) ;
129-
130- let parent_directory = dest_file_path. parent ( ) . ok_or_else ( || {
131- let msg =
132- format ! ( "Could not retrieve parent directory of {}." , dest_file_path. display( ) ) ;
133- std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidInput , msg)
134- } ) ?;
135- fs:: create_dir_all ( & parent_directory) ?;
136-
137- // Do a crazy dance with lots of fsync()s to be overly cautious here...
138- // We never want to end up in a state where we've lost the old data, or end up using the
139- // old data on power loss after we've returned.
140- // The way to atomically write a file on Unix platforms is:
141- // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
142- let mut tmp_file_path = dest_file_path. clone ( ) ;
143- let tmp_file_ext = format ! ( "{}.tmp" , self . tmp_file_counter. fetch_add( 1 , Ordering :: AcqRel ) ) ;
144- tmp_file_path. set_extension ( tmp_file_ext) ;
145-
146- {
147- let mut tmp_file = fs:: File :: create ( & tmp_file_path) ?;
148- tmp_file. write_all ( & buf) ?;
149- tmp_file. sync_all ( ) ?;
150- }
151-
152- let res = {
153- let inner_lock_ref = {
154- let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
155- Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
156- } ;
157- let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
124+ ) -> AsyncResultType < ' static , ( ) , lightning:: io:: Error > {
125+ Box :: pin ( async move {
126+ check_namespace_key_validity (
127+ primary_namespace,
128+ secondary_namespace,
129+ Some ( key) ,
130+ "write" ,
131+ ) ?;
132+
133+ let mut dest_file_path =
134+ self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
135+ dest_file_path. push ( key) ;
136+
137+ let parent_directory = dest_file_path. parent ( ) . ok_or_else ( || {
138+ let msg =
139+ format ! ( "Could not retrieve parent directory of {}." , dest_file_path. display( ) ) ;
140+ std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidInput , msg)
141+ } ) ?;
142+ fs:: create_dir_all ( & parent_directory) ?;
143+
144+ // Do a crazy dance with lots of fsync()s to be overly cautious here...
145+ // We never want to end up in a state where we've lost the old data, or end up using the
146+ // old data on power loss after we've returned.
147+ // The way to atomically write a file on Unix platforms is:
148+ // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
149+ let mut tmp_file_path = dest_file_path. clone ( ) ;
150+ let tmp_file_ext =
151+ format ! ( "{}.tmp" , self . tmp_file_counter. fetch_add( 1 , Ordering :: AcqRel ) ) ;
152+ tmp_file_path. set_extension ( tmp_file_ext) ;
158153
159- #[ cfg( not( target_os = "windows" ) ) ]
160154 {
161- fs:: rename ( & tmp_file_path, & dest_file_path) ?;
162- let dir_file = fs:: OpenOptions :: new ( ) . read ( true ) . open ( & parent_directory) ?;
163- dir_file. sync_all ( ) ?;
164- Ok ( ( ) )
155+ let mut tmp_file = fs:: File :: create ( & tmp_file_path) ?;
156+ tmp_file. write_all ( & buf) ?;
157+ tmp_file. sync_all ( ) ?;
165158 }
166159
167- #[ cfg( target_os = "windows" ) ]
168- {
169- let res = if dest_file_path. exists ( ) {
170- call ! ( unsafe {
171- windows_sys:: Win32 :: Storage :: FileSystem :: ReplaceFileW (
160+ let res = {
161+ let inner_lock_ref = {
162+ let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
163+ Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
164+ } ;
165+ let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
166+
167+ #[ cfg( not( target_os = "windows" ) ) ]
168+ {
169+ fs:: rename ( & tmp_file_path, & dest_file_path) ?;
170+ let dir_file = fs:: OpenOptions :: new ( ) . read ( true ) . open ( & parent_directory) ?;
171+ dir_file. sync_all ( ) ?;
172+ Ok ( ( ) )
173+ }
174+
175+ #[ cfg( target_os = "windows" ) ]
176+ {
177+ let res = if dest_file_path. exists ( ) {
178+ call ! ( unsafe {
179+ windows_sys:: Win32 :: Storage :: FileSystem :: ReplaceFileW (
172180 path_to_windows_str( & dest_file_path) . as_ptr( ) ,
173181 path_to_windows_str( & tmp_file_path) . as_ptr( ) ,
174182 std:: ptr:: null( ) ,
175183 windows_sys:: Win32 :: Storage :: FileSystem :: REPLACEFILE_IGNORE_MERGE_ERRORS ,
176184 std:: ptr:: null_mut( ) as * const core:: ffi:: c_void,
177185 std:: ptr:: null_mut( ) as * const core:: ffi:: c_void,
178186 )
179- } )
180- } else {
181- call ! ( unsafe {
182- windows_sys:: Win32 :: Storage :: FileSystem :: MoveFileExW (
187+ } )
188+ } else {
189+ call ! ( unsafe {
190+ windows_sys:: Win32 :: Storage :: FileSystem :: MoveFileExW (
183191 path_to_windows_str( & tmp_file_path) . as_ptr( ) ,
184192 path_to_windows_str( & dest_file_path) . as_ptr( ) ,
185193 windows_sys:: Win32 :: Storage :: FileSystem :: MOVEFILE_WRITE_THROUGH
186194 | windows_sys:: Win32 :: Storage :: FileSystem :: MOVEFILE_REPLACE_EXISTING ,
187195 )
188- } )
189- } ;
190-
191- match res {
192- Ok ( ( ) ) => {
193- // We fsync the dest file in hopes this will also flush the metadata to disk.
194- let dest_file =
195- fs:: OpenOptions :: new ( ) . read ( true ) . write ( true ) . open ( & dest_file_path) ?;
196- dest_file. sync_all ( ) ?;
197- Ok ( ( ) )
198- } ,
199- Err ( e) => Err ( e. into ( ) ) ,
196+ } )
197+ } ;
198+
199+ match res {
200+ Ok ( ( ) ) => {
201+ // We fsync the dest file in hopes this will also flush the metadata to disk.
202+ let dest_file = fs:: OpenOptions :: new ( )
203+ . read ( true )
204+ . write ( true )
205+ . open ( & dest_file_path) ?;
206+ dest_file. sync_all ( ) ?;
207+ Ok ( ( ) )
208+ } ,
209+ Err ( e) => Err ( e. into ( ) ) ,
210+ }
200211 }
201- }
202- } ;
212+ } ;
203213
204- self . garbage_collect_locks ( ) ;
214+ self . garbage_collect_locks ( ) ;
205215
206- res
216+ res
217+ } )
207218 }
208219
209220 fn remove (
@@ -330,12 +341,6 @@ impl KVStore for FilesystemStore {
330341
331342 Ok ( keys)
332343 }
333-
334- fn write_async (
335- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
336- ) -> AsyncResultType < ' static , ( ) , lightning:: io:: Error > {
337- todo ! ( )
338- }
339344}
340345
341346fn dir_entry_is_key ( p : & Path ) -> Result < bool , lightning:: io:: Error > {
0 commit comments