11#[ cfg( target_os = "windows" ) ]
22extern crate winapi;
33
4- use super :: { KVStore , TransactionalWrite } ;
4+ use super :: KVStore ;
55
66use std:: collections:: HashMap ;
77use std:: fs;
8- use std:: io:: { BufReader , BufWriter , Read , Write } ;
8+ use std:: io:: { BufReader , Read , Write } ;
99use std:: path:: { Path , PathBuf } ;
1010use std:: str:: FromStr ;
1111use std:: sync:: { Arc , Mutex , RwLock } ;
@@ -52,28 +52,84 @@ impl FilesystemStore {
5252
5353impl KVStore for FilesystemStore {
5454 type Reader = FilesystemReader ;
55- type Writer = FilesystemWriter ;
5655
5756 fn read ( & self , namespace : & str , key : & str ) -> std:: io:: Result < Self :: Reader > {
5857 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
5958 let lock_key = ( namespace. to_string ( ) , key. to_string ( ) ) ;
6059 let inner_lock_ref = Arc :: clone ( & outer_lock. entry ( lock_key) . or_default ( ) ) ;
6160
62- let mut dest_file = self . dest_dir . clone ( ) ;
63- dest_file . push ( namespace) ;
64- dest_file . push ( key) ;
65- FilesystemReader :: new ( dest_file , inner_lock_ref)
61+ let mut dest_file_path = self . dest_dir . clone ( ) ;
62+ dest_file_path . push ( namespace) ;
63+ dest_file_path . push ( key) ;
64+ FilesystemReader :: new ( dest_file_path , inner_lock_ref)
6665 }
6766
68- fn write ( & self , namespace : & str , key : & str ) -> std:: io:: Result < Self :: Writer > {
67+ fn write ( & self , namespace : & str , key : & str , buf : & [ u8 ] ) -> std:: io:: Result < ( ) > {
6968 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
7069 let lock_key = ( namespace. to_string ( ) , key. to_string ( ) ) ;
7170 let inner_lock_ref = Arc :: clone ( & outer_lock. entry ( lock_key) . or_default ( ) ) ;
71+ let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
72+
73+ let mut dest_file_path = self . dest_dir . clone ( ) ;
74+ dest_file_path. push ( namespace) ;
75+ dest_file_path. push ( key) ;
76+
77+ let msg = format ! ( "Could not retrieve parent directory of {}." , dest_file_path. display( ) ) ;
78+ let parent_directory = dest_file_path
79+ . parent ( )
80+ . ok_or ( std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidInput , msg) ) ?
81+ . to_path_buf ( ) ;
82+ fs:: create_dir_all ( parent_directory. clone ( ) ) ?;
83+
84+ // Do a crazy dance with lots of fsync()s to be overly cautious here...
85+ // We never want to end up in a state where we've lost the old data, or end up using the
86+ // old data on power loss after we've returned.
87+ // The way to atomically write a file on Unix platforms is:
88+ // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
89+ let mut tmp_file_path = dest_file_path. clone ( ) ;
90+ let mut rng = thread_rng ( ) ;
91+ let rand_str: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
92+ let ext = format ! ( "{}.tmp" , rand_str) ;
93+ tmp_file_path. set_extension ( ext) ;
94+
95+ let mut tmp_file = fs:: File :: create ( & tmp_file_path) ?;
96+ tmp_file. write_all ( & buf) ?;
97+ tmp_file. sync_all ( ) ?;
7298
73- let mut dest_file = self . dest_dir . clone ( ) ;
74- dest_file. push ( namespace) ;
75- dest_file. push ( key) ;
76- FilesystemWriter :: new ( dest_file, inner_lock_ref)
99+ #[ cfg( not( target_os = "windows" ) ) ]
100+ {
101+ fs:: rename ( & tmp_file_path, & dest_file_path) ?;
102+ let dir_file = fs:: OpenOptions :: new ( ) . read ( true ) . open ( parent_directory. clone ( ) ) ?;
103+ unsafe {
104+ libc:: fsync ( dir_file. as_raw_fd ( ) ) ;
105+ }
106+ }
107+
108+ #[ cfg( target_os = "windows" ) ]
109+ {
110+ if dest_file_path. exists ( ) {
111+ unsafe {
112+ winapi:: um:: winbase:: ReplaceFileW (
113+ path_to_windows_str ( dest_file_path) . as_ptr ( ) ,
114+ path_to_windows_str ( tmp_file_path) . as_ptr ( ) ,
115+ std:: ptr:: null ( ) ,
116+ winapi:: um:: winbase:: REPLACEFILE_IGNORE_MERGE_ERRORS ,
117+ std:: ptr:: null_mut ( ) as * mut winapi:: ctypes:: c_void ,
118+ std:: ptr:: null_mut ( ) as * mut winapi:: ctypes:: c_void ,
119+ )
120+ } ;
121+ } else {
122+ call ! ( unsafe {
123+ winapi:: um:: winbase:: MoveFileExW (
124+ path_to_windows_str( tmp_file_path) . as_ptr( ) ,
125+ path_to_windows_str( dest_file_path) . as_ptr( ) ,
126+ winapi:: um:: winbase:: MOVEFILE_WRITE_THROUGH
127+ | winapi:: um:: winbase:: MOVEFILE_REPLACE_EXISTING ,
128+ )
129+ } ) ;
130+ }
131+ }
132+ Ok ( ( ) )
77133 }
78134
79135 fn remove ( & self , namespace : & str , key : & str ) -> std:: io:: Result < bool > {
@@ -83,19 +139,20 @@ impl KVStore for FilesystemStore {
83139
84140 let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
85141
86- let mut dest_file = self . dest_dir . clone ( ) ;
87- dest_file . push ( namespace) ;
88- dest_file . push ( key) ;
142+ let mut dest_file_path = self . dest_dir . clone ( ) ;
143+ dest_file_path . push ( namespace) ;
144+ dest_file_path . push ( key) ;
89145
90- if !dest_file . is_file ( ) {
146+ if !dest_file_path . is_file ( ) {
91147 return Ok ( false ) ;
92148 }
93149
94- fs:: remove_file ( & dest_file ) ?;
150+ fs:: remove_file ( & dest_file_path ) ?;
95151 #[ cfg( not( target_os = "windows" ) ) ]
96152 {
97- let msg = format ! ( "Could not retrieve parent directory of {}." , dest_file. display( ) ) ;
98- let parent_directory = dest_file
153+ let msg =
154+ format ! ( "Could not retrieve parent directory of {}." , dest_file_path. display( ) ) ;
155+ let parent_directory = dest_file_path
99156 . parent ( )
100157 . ok_or ( std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidInput , msg) ) ?;
101158 let dir_file = fs:: OpenOptions :: new ( ) . read ( true ) . open ( parent_directory) ?;
@@ -110,14 +167,14 @@ impl KVStore for FilesystemStore {
110167 }
111168 }
112169
113- if dest_file . is_file ( ) {
170+ if dest_file_path . is_file ( ) {
114171 return Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "Removing key failed" ) ) ;
115172 }
116173
117174 if Arc :: strong_count ( & inner_lock_ref) == 2 {
118175 // It's safe to remove the lock entry if we're the only one left holding a strong
119176 // reference. Checking this is necessary to ensure we continue to distribute references to the
120- // same lock as long as some Writers/ Readers are around. However, we still want to
177+ // same lock as long as some Readers are around. However, we still want to
121178 // clean up the table when possible.
122179 //
123180 // Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
@@ -171,8 +228,8 @@ pub struct FilesystemReader {
171228}
172229
173230impl FilesystemReader {
174- fn new ( dest_file : PathBuf , lock_ref : Arc < RwLock < ( ) > > ) -> std:: io:: Result < Self > {
175- let f = fs:: File :: open ( dest_file . clone ( ) ) ?;
231+ fn new ( dest_file_path : PathBuf , lock_ref : Arc < RwLock < ( ) > > ) -> std:: io:: Result < Self > {
232+ let f = fs:: File :: open ( dest_file_path . clone ( ) ) ?;
176233 let inner = BufReader :: new ( f) ;
177234 Ok ( Self { inner, lock_ref } )
178235 }
@@ -185,115 +242,26 @@ impl Read for FilesystemReader {
185242 }
186243}
187244
188- pub struct FilesystemWriter {
189- dest_file : PathBuf ,
190- parent_directory : PathBuf ,
191- tmp_file : PathBuf ,
192- tmp_writer : BufWriter < fs:: File > ,
193- lock_ref : Arc < RwLock < ( ) > > ,
194- }
195-
196- impl FilesystemWriter {
197- fn new ( dest_file : PathBuf , lock_ref : Arc < RwLock < ( ) > > ) -> std:: io:: Result < Self > {
198- let msg = format ! ( "Could not retrieve parent directory of {}." , dest_file. display( ) ) ;
199- let parent_directory = dest_file
200- . parent ( )
201- . ok_or ( std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidInput , msg) ) ?
202- . to_path_buf ( ) ;
203- fs:: create_dir_all ( parent_directory. clone ( ) ) ?;
204-
205- // Do a crazy dance with lots of fsync()s to be overly cautious here...
206- // We never want to end up in a state where we've lost the old data, or end up using the
207- // old data on power loss after we've returned.
208- // The way to atomically write a file on Unix platforms is:
209- // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
210- let mut tmp_file = dest_file. clone ( ) ;
211- let mut rng = thread_rng ( ) ;
212- let rand_str: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
213- let ext = format ! ( "{}.tmp" , rand_str) ;
214- tmp_file. set_extension ( ext) ;
215-
216- let tmp_writer = BufWriter :: new ( fs:: File :: create ( & tmp_file) ?) ;
217-
218- Ok ( Self { dest_file, parent_directory, tmp_file, tmp_writer, lock_ref } )
219- }
220- }
221-
222- impl Write for FilesystemWriter {
223- fn write ( & mut self , buf : & [ u8 ] ) -> std:: io:: Result < usize > {
224- Ok ( self . tmp_writer . write ( buf) ?)
225- }
226-
227- fn flush ( & mut self ) -> std:: io:: Result < ( ) > {
228- self . tmp_writer . flush ( ) ?;
229- self . tmp_writer . get_ref ( ) . sync_all ( ) ?;
230- Ok ( ( ) )
231- }
232- }
233-
234- impl TransactionalWrite for FilesystemWriter {
235- fn commit ( & mut self ) -> std:: io:: Result < ( ) > {
236- self . flush ( ) ?;
237-
238- let _guard = self . lock_ref . write ( ) . unwrap ( ) ;
239- // Fsync the parent directory on Unix.
240- #[ cfg( not( target_os = "windows" ) ) ]
241- {
242- fs:: rename ( & self . tmp_file , & self . dest_file ) ?;
243- let dir_file = fs:: OpenOptions :: new ( ) . read ( true ) . open ( self . parent_directory . clone ( ) ) ?;
244- unsafe {
245- libc:: fsync ( dir_file. as_raw_fd ( ) ) ;
246- }
247- }
248-
249- #[ cfg( target_os = "windows" ) ]
250- {
251- if dest_file. exists ( ) {
252- unsafe {
253- winapi:: um:: winbase:: ReplaceFileW (
254- path_to_windows_str ( dest_file) . as_ptr ( ) ,
255- path_to_windows_str ( tmp_file) . as_ptr ( ) ,
256- std:: ptr:: null ( ) ,
257- winapi:: um:: winbase:: REPLACEFILE_IGNORE_MERGE_ERRORS ,
258- std:: ptr:: null_mut ( ) as * mut winapi:: ctypes:: c_void ,
259- std:: ptr:: null_mut ( ) as * mut winapi:: ctypes:: c_void ,
260- )
261- } ;
262- } else {
263- call ! ( unsafe {
264- winapi:: um:: winbase:: MoveFileExW (
265- path_to_windows_str( tmp_file) . as_ptr( ) ,
266- path_to_windows_str( dest_file) . as_ptr( ) ,
267- winapi:: um:: winbase:: MOVEFILE_WRITE_THROUGH
268- | winapi:: um:: winbase:: MOVEFILE_REPLACE_EXISTING ,
269- )
270- } ) ;
271- }
272- }
273- Ok ( ( ) )
274- }
275- }
276-
277245impl KVStorePersister for FilesystemStore {
278246 fn persist < W : Writeable > ( & self , prefixed_key : & str , object : & W ) -> lightning:: io:: Result < ( ) > {
279247 let msg = format ! ( "Could not persist file for key {}." , prefixed_key) ;
280- let dest_file = PathBuf :: from_str ( prefixed_key) . map_err ( |_| {
248+ let dest_file_path = PathBuf :: from_str ( prefixed_key) . map_err ( |_| {
281249 lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: InvalidInput , msg. clone ( ) )
282250 } ) ?;
283251
284- let parent_directory = dest_file . parent ( ) . ok_or ( lightning:: io:: Error :: new (
252+ let parent_directory = dest_file_path . parent ( ) . ok_or ( lightning:: io:: Error :: new (
285253 lightning:: io:: ErrorKind :: InvalidInput ,
286254 msg. clone ( ) ,
287255 ) ) ?;
288256 let namespace = parent_directory. display ( ) . to_string ( ) ;
289257
290- let dest_without_namespace = dest_file
258+ let dest_without_namespace = dest_file_path
291259 . strip_prefix ( & namespace)
292260 . map_err ( |_| lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: InvalidInput , msg) ) ?;
293261 let key = dest_without_namespace. display ( ) . to_string ( ) ;
294- let mut writer = self . write ( & namespace , & key ) ? ;
295- object . write ( & mut writer ) ?;
296- Ok ( writer . commit ( ) ? )
262+
263+ self . write ( & namespace , & key , & object . encode ( ) ) ?;
264+ Ok ( ( ) )
297265 }
298266}
299267
@@ -302,7 +270,7 @@ mod tests {
302270 use super :: * ;
303271 use crate :: test:: utils:: random_storage_path;
304272 use lightning:: util:: persist:: KVStorePersister ;
305- use lightning:: util:: ser:: { Readable , Writeable } ;
273+ use lightning:: util:: ser:: Readable ;
306274
307275 use proptest:: prelude:: * ;
308276 proptest ! {
@@ -315,9 +283,7 @@ mod tests {
315283 let key = "testkey" ;
316284
317285 // Test the basic KVStore operations.
318- let mut writer = fs_store. write( namespace, key) . unwrap( ) ;
319- data. write( & mut writer) . unwrap( ) ;
320- writer. commit( ) . unwrap( ) ;
286+ fs_store. write( namespace, key, & data) . unwrap( ) ;
321287
322288 let listed_keys = fs_store. list( namespace) . unwrap( ) ;
323289 assert_eq!( listed_keys. len( ) , 1 ) ;
0 commit comments