@@ -2,14 +2,18 @@ use {Error, Errno, Result};
22use std:: os:: unix:: io:: RawFd ;
33use libc:: { c_void, off_t, size_t} ;
44use libc;
5+ use std:: fmt;
6+ use std:: fmt:: Debug ;
7+ use std:: io:: Write ;
8+ use std:: io:: stderr;
59use std:: marker:: PhantomData ;
610use std:: mem;
711use std:: ptr:: { null, null_mut} ;
812use sys:: signal:: * ;
913use sys:: time:: TimeSpec ;
1014
11- /// Mode for `aio_fsync `. Controls whether only data or both data and metadata
12- /// are synced.
15+ /// Mode for `AioCb::fsync `. Controls whether only data or both data and
16+ /// metadata are synced.
1317#[ repr( i32 ) ]
1418#[ derive( Clone , Copy , Debug , PartialEq ) ]
1519pub enum AioFsyncMode {
@@ -44,31 +48,34 @@ pub enum LioMode {
4448 LIO_NOWAIT = libc:: LIO_NOWAIT ,
4549}
4650
47- /// Return values for `aio_cancel `
51+ /// Return values for `AioCb::cancel and aio_cancel_all `
4852#[ repr( i32 ) ]
4953#[ derive( Clone , Copy , Debug , PartialEq ) ]
5054pub enum AioCancelStat {
5155 /// All outstanding requests were canceled
5256 AioCanceled = libc:: AIO_CANCELED ,
5357 /// Some requests were not canceled. Their status should be checked with
54- /// `aio_error `
58+ /// `AioCb::error `
5559 AioNotCanceled = libc:: AIO_NOTCANCELED ,
5660 /// All of the requests have already finished
5761 AioAllDone = libc:: AIO_ALLDONE ,
5862}
5963
6064/// The basic structure used by all aio functions. Each `aiocb` represents one
6165/// I/O request.
62- #[ repr( C ) ]
6366pub struct AioCb < ' a > {
6467 aiocb : libc:: aiocb ,
68+ /// Tracks whether the buffer pointed to by aiocb.aio_buf is mutable
69+ mutable : bool ,
70+ /// Could this `AioCb` potentially have any in-kernel state?
71+ in_progress : bool ,
6572 phantom : PhantomData < & ' a mut [ u8 ] >
6673}
6774
6875impl < ' a > AioCb < ' a > {
6976 /// Constructs a new `AioCb` with no associated buffer.
7077 ///
71- /// The resulting `AioCb` structure is suitable for use with `aio_fsync `.
78+ /// The resulting `AioCb` structure is suitable for use with `AioCb::fsync `.
7279 /// * `fd` File descriptor. Required for all aio functions.
7380 /// * `prio` If POSIX Prioritized IO is supported, then the operation will
7481 /// be prioritized at the process's priority level minus `prio`
@@ -81,7 +88,8 @@ impl<'a> AioCb<'a> {
8188 a. aio_nbytes = 0 ;
8289 a. aio_buf = null_mut ( ) ;
8390
84- let aiocb = AioCb { aiocb : a, phantom : PhantomData } ;
91+ let aiocb = AioCb { aiocb : a, mutable : false , in_progress : false ,
92+ phantom : PhantomData } ;
8593 aiocb
8694 }
8795
@@ -102,37 +110,41 @@ impl<'a> AioCb<'a> {
102110 let mut a = AioCb :: common_init ( fd, prio, sigev_notify) ;
103111 a. aio_offset = offs;
104112 a. aio_nbytes = buf. len ( ) as size_t ;
113+ // casting an immutable buffer to a mutable pointer looks unsafe, but
114+ // technically its only unsafe to dereference it, not to create it.
105115 a. aio_buf = buf. as_ptr ( ) as * mut c_void ;
106116 a. aio_lio_opcode = opcode as :: c_int ;
107117
108- let aiocb = AioCb { aiocb : a, phantom : PhantomData } ;
118+ let aiocb = AioCb { aiocb : a, mutable : true , in_progress : false ,
119+ phantom : PhantomData } ;
109120 aiocb
110121 }
111122
112123 /// Like `from_mut_slice`, but works on constant slices rather than
113124 /// mutable slices.
114125 ///
115- /// This is technically unsafe, but in practice it's fine
116- /// to use with any aio functions except `aio_read` and `lio_listio` (with
117- /// `opcode` set to `LIO_READ`). This method is useful when writing a const
118- /// buffer with `aio_write`, since from_mut_slice can't work with const
119- /// buffers.
126+ /// An `AioCb` created this way cannot be used with `read`, and its
127+ /// `LioOpcode` cannot be set to `LIO_READ`. This method is useful when
128+ /// writing a const buffer with `AioCb::write`, since from_mut_slice can't
129+ /// work with const buffers.
120130 // Note: another solution to the problem of writing const buffers would be
121- // to genericize AioCb for both &mut [u8] and &[u8] buffers. aio_read could
122- // take the former and aio_write could take the latter. However, then
123- // lio_listio wouldn't work, because that function needs a slice of AioCb,
124- // and they must all be the same type. We're basically stuck with using an
125- // unsafe function, since aio (as designed in C) is an unsafe API.
126- pub unsafe fn from_slice ( fd : RawFd , offs : off_t , buf : & ' a [ u8 ] ,
127- prio : :: c_int , sigev_notify : SigevNotify ,
128- opcode : LioOpcode ) -> AioCb {
131+ // to genericize AioCb for both &mut [u8] and &[u8] buffers. AioCb::read
132+ // could take the former and AioCb::write could take the latter. However,
133+ // then lio_listio wouldn't work, because that function needs a slice of
134+ // AioCb, and they must all be the same type. We're basically stuck with
135+ // using an unsafe function, since aio (as designed in C) is an unsafe API.
136+ pub fn from_slice ( fd : RawFd , offs : off_t , buf : & ' a [ u8 ] ,
137+ prio : :: c_int , sigev_notify : SigevNotify ,
138+ opcode : LioOpcode ) -> AioCb {
129139 let mut a = AioCb :: common_init ( fd, prio, sigev_notify) ;
130140 a. aio_offset = offs;
131141 a. aio_nbytes = buf. len ( ) as size_t ;
132142 a. aio_buf = buf. as_ptr ( ) as * mut c_void ;
143+ assert ! ( opcode != LioOpcode :: LIO_READ , "Can't read into an immutable buffer" ) ;
133144 a. aio_lio_opcode = opcode as :: c_int ;
134145
135- let aiocb = AioCb { aiocb : a, phantom : PhantomData } ;
146+ let aiocb = AioCb { aiocb : a, mutable : false , in_progress : false ,
147+ phantom : PhantomData } ;
136148 aiocb
137149 }
138150
@@ -153,56 +165,73 @@ impl<'a> AioCb<'a> {
153165 pub fn set_sigev_notify ( & mut self , sigev_notify : SigevNotify ) {
154166 self . aiocb . aio_sigevent = SigEvent :: new ( sigev_notify) . sigevent ( ) ;
155167 }
156- }
157168
158- /// Cancels outstanding AIO requests. If `aiocb` is `None`, then all requests
159- /// for `fd` will be cancelled. Otherwise, only the given `AioCb` will be
160- /// cancelled.
161- pub fn aio_cancel ( fd : RawFd , aiocb : Option < & mut AioCb > ) -> Result < AioCancelStat > {
162- let p: * mut libc:: aiocb = match aiocb {
163- None => null_mut ( ) ,
164- Some ( x) => & mut x. aiocb
165- } ;
166- match unsafe { libc:: aio_cancel ( fd, p) } {
167- libc:: AIO_CANCELED => Ok ( AioCancelStat :: AioCanceled ) ,
168- libc:: AIO_NOTCANCELED => Ok ( AioCancelStat :: AioNotCanceled ) ,
169- libc:: AIO_ALLDONE => Ok ( AioCancelStat :: AioAllDone ) ,
170- -1 => Err ( Error :: last ( ) ) ,
171- _ => panic ! ( "unknown aio_cancel return value" )
169+ /// Cancels an outstanding AIO request.
170+ pub fn cancel ( & mut self ) -> Result < AioCancelStat > {
171+ match unsafe { libc:: aio_cancel ( self . aiocb . aio_fildes , & mut self . aiocb ) } {
172+ libc:: AIO_CANCELED => Ok ( AioCancelStat :: AioCanceled ) ,
173+ libc:: AIO_NOTCANCELED => Ok ( AioCancelStat :: AioNotCanceled ) ,
174+ libc:: AIO_ALLDONE => Ok ( AioCancelStat :: AioAllDone ) ,
175+ -1 => Err ( Error :: last ( ) ) ,
176+ _ => panic ! ( "unknown aio_cancel return value" )
177+ }
172178 }
173- }
174179
175- /// Retrieve error status of an asynchronous operation. If the request has not
176- /// yet completed, returns `EINPROGRESS`. Otherwise, returns `Ok` or any other
177- /// error.
178- pub fn aio_error ( aiocb : & mut AioCb ) -> Result < ( ) > {
179- let p : * mut libc:: aiocb = & mut aiocb. aiocb ;
180- match unsafe { libc :: aio_error ( p ) } {
181- 0 => Ok ( ( ) ) ,
182- num if num > 0 => Err ( Error :: from_errno ( Errno :: from_i32 ( num ) ) ) ,
183- - 1 => Err ( Error :: last ( ) ) ,
184- num => panic ! ( "unknown aio_error return value {:?}" , num )
180+ /// Retrieve error status of an asynchronous operation. If the request has
181+ /// not yet completed, returns `EINPROGRESS`. Otherwise, returns `Ok` or
182+ /// any other error.
183+ pub fn error ( & mut self ) -> Result < ( ) > {
184+ match unsafe { libc:: aio_error ( & mut self . aiocb as * mut libc :: aiocb ) } {
185+ 0 => Ok ( ( ) ) ,
186+ num if num > 0 => Err ( Error :: from_errno ( Errno :: from_i32 ( num ) ) ) ,
187+ - 1 => Err ( Error :: last ( ) ) ,
188+ num => panic ! ( "unknown aio_error return value {:?}" , num )
189+ }
185190 }
186- }
187191
188- /// An asynchronous version of `fsync`.
189- pub fn aio_fsync ( mode : AioFsyncMode , aiocb : & mut AioCb ) -> Result < ( ) > {
190- let p: * mut libc:: aiocb = & mut aiocb. aiocb ;
191- Errno :: result ( unsafe { libc:: aio_fsync ( mode as :: c_int , p) } ) . map ( drop)
192- }
192+ /// An asynchronous version of `fsync`.
193+ pub fn fsync ( & mut self , mode : AioFsyncMode ) -> Result < ( ) > {
194+ let p: * mut libc:: aiocb = & mut self . aiocb ;
195+ self . in_progress = true ;
196+ Errno :: result ( unsafe { libc:: aio_fsync ( mode as :: c_int , p) } ) . map ( drop)
197+ }
198+
199+ /// Asynchronously reads from a file descriptor into a buffer
200+ pub fn read ( & mut self ) -> Result < ( ) > {
201+ assert ! ( self . mutable, "Can't read into an immutable buffer" ) ;
202+ let p: * mut libc:: aiocb = & mut self . aiocb ;
203+ self . in_progress = true ;
204+ Errno :: result ( unsafe { libc:: aio_read ( p) } ) . map ( drop)
205+ }
206+
207+ /// Retrieve return status of an asynchronous operation. Should only be
208+ /// called once for each `AioCb`, after `AioCb::error` indicates that it has
209+ /// completed. The result is the same as for `read`, `write`, of `fsync`.
210+ // Note: this should be just `return`, but that's a reserved word
211+ pub fn aio_return ( & mut self ) -> Result < isize > {
212+ let p: * mut libc:: aiocb = & mut self . aiocb ;
213+ self . in_progress = false ;
214+ Errno :: result ( unsafe { libc:: aio_return ( p) } )
215+ }
216+
217+ /// Asynchronously writes from a buffer to a file descriptor
218+ pub fn write ( & mut self ) -> Result < ( ) > {
219+ let p: * mut libc:: aiocb = & mut self . aiocb ;
220+ self . in_progress = true ;
221+ Errno :: result ( unsafe { libc:: aio_write ( p) } ) . map ( drop)
222+ }
193223
194- /// Asynchronously reads from a file descriptor into a buffer
195- pub fn aio_read ( aiocb : & mut AioCb ) -> Result < ( ) > {
196- let p: * mut libc:: aiocb = & mut aiocb. aiocb ;
197- Errno :: result ( unsafe { libc:: aio_read ( p) } ) . map ( drop)
198224}
199225
200- /// Retrieve return status of an asynchronous operation. Should only be called
201- /// once for each `AioCb`, after `aio_error` indicates that it has completed.
202- /// The result the same as for `read`, `write`, of `fsync`.
203- pub fn aio_return ( aiocb : & mut AioCb ) -> Result < isize > {
204- let p: * mut libc:: aiocb = & mut aiocb. aiocb ;
205- Errno :: result ( unsafe { libc:: aio_return ( p) } )
226+ /// Cancels outstanding AIO requests. All requests for `fd` will be cancelled.
227+ pub fn aio_cancel_all ( fd : RawFd ) -> Result < AioCancelStat > {
228+ match unsafe { libc:: aio_cancel ( fd, null_mut ( ) ) } {
229+ libc:: AIO_CANCELED => Ok ( AioCancelStat :: AioCanceled ) ,
230+ libc:: AIO_NOTCANCELED => Ok ( AioCancelStat :: AioNotCanceled ) ,
231+ libc:: AIO_ALLDONE => Ok ( AioCancelStat :: AioAllDone ) ,
232+ -1 => Err ( Error :: last ( ) ) ,
233+ _ => panic ! ( "unknown aio_cancel return value" )
234+ }
206235}
207236
208237/// Suspends the calling process until at least one of the specified `AioCb`s
@@ -224,11 +253,6 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
224253 } ) . map ( drop)
225254}
226255
227- /// Asynchronously writes from a buffer to a file descriptor
228- pub fn aio_write ( aiocb : & mut AioCb ) -> Result < ( ) > {
229- let p: * mut libc:: aiocb = & mut aiocb. aiocb ;
230- Errno :: result ( unsafe { libc:: aio_write ( p) } ) . map ( drop)
231- }
232256
233257/// Submits multiple asynchronous I/O requests with a single system call. The
234258/// order in which the requests are carried out is not specified.
@@ -247,3 +271,44 @@ pub fn lio_listio(mode: LioMode, list: &[&mut AioCb],
247271 libc:: lio_listio ( mode as i32 , p, list. len ( ) as i32 , sigevp)
248272 } ) . map ( drop)
249273}
274+
275+ impl < ' a > Debug for AioCb < ' a > {
276+ fn fmt ( & self , fmt : & mut fmt:: Formatter ) -> fmt:: Result {
277+ fmt. debug_struct ( "AioCb" )
278+ . field ( "aio_fildes" , & self . aiocb . aio_fildes )
279+ . field ( "aio_offset" , & self . aiocb . aio_offset )
280+ . field ( "aio_buf" , & self . aiocb . aio_buf )
281+ . field ( "aio_nbytes" , & self . aiocb . aio_nbytes )
282+ . field ( "aio_lio_opcode" , & self . aiocb . aio_lio_opcode )
283+ . field ( "aio_reqprio" , & self . aiocb . aio_reqprio )
284+ . field ( "aio_sigevent" , & SigEvent :: from ( & self . aiocb . aio_sigevent ) )
285+ . field ( "mutable" , & self . mutable )
286+ . field ( "in_progress" , & self . in_progress )
287+ . field ( "phantom" , & self . phantom )
288+ . finish ( )
289+ }
290+ }
291+
292+ impl < ' a > Drop for AioCb < ' a > {
293+ /// If the `AioCb` has no remaining state in the kernel, just drop it.
294+ /// Otherwise, collect its error and return values, so as not to leak
295+ /// resources.
296+ fn drop ( & mut self ) {
297+ if self . in_progress {
298+ // Well-written programs should never get here. They should always
299+ // wait for an AioCb to complete before dropping it
300+ let _ = write ! ( stderr( ) , "WARNING: dropped an in-progress AioCb" ) ;
301+ loop {
302+ let ret = aio_suspend ( & [ & self ] , None ) ;
303+ match ret {
304+ Ok ( ( ) ) => break ,
305+ Err ( Error :: Sys ( Errno :: EINVAL ) ) => panic ! (
306+ "Inconsistent AioCb.in_progress value" ) ,
307+ Err ( Error :: Sys ( Errno :: EINTR ) ) => ( ) , // Retry interrupted syscall
308+ _ => panic ! ( "Unexpected aio_suspend return value {:?}" , ret)
309+ } ;
310+ }
311+ let _ = self . aio_return ( ) ;
312+ }
313+ }
314+ }
0 commit comments