@@ -36,6 +36,12 @@ struct AnonSocket {
3636 /// This flag is set to `true` if the peer's `readbuf` is non-empty at the time
3737 /// of closure.
3838 peer_lost_data : Cell < bool > ,
39+ /// A list of thread ids blocked because the buffer was empty.
40+ /// Once another thread writes some bytes, these threads will be unblocked.
41+ blocked_read_tid : RefCell < Vec < ThreadId > > ,
42+ /// A list of thread ids blocked because the buffer was full.
43+ /// Once another thread reads some bytes, these threads will be unblocked.
44+ blocked_write_tid : RefCell < Vec < ThreadId > > ,
3945 is_nonblock : bool ,
4046}
4147
@@ -83,7 +89,7 @@ impl FileDescription for AnonSocket {
8389
8490 fn read < ' tcx > (
8591 & self ,
86- _self_ref : & FileDescriptionRef ,
92+ self_ref : & FileDescriptionRef ,
8793 _communicate_allowed : bool ,
8894 ptr : Pointer ,
8995 len : usize ,
@@ -100,33 +106,21 @@ impl FileDescription for AnonSocket {
100106 // corresponding ErrorKind variant.
101107 throw_unsup_format ! ( "reading from the write end of a pipe" ) ;
102108 } ;
103- if readbuf. borrow ( ) . buf . is_empty ( ) {
104- if self . peer_fd ( ) . upgrade ( ) . is_none ( ) {
105- // Socketpair with no peer and empty buffer.
106- // 0 bytes successfully read indicates end-of-file.
107- return ecx. return_read_success ( ptr, & [ ] , 0 , dest) ;
108- } else {
109- if self . is_nonblock {
110- // Non-blocking socketpair with writer and empty buffer.
111- // https://linux.die.net/man/2/read
112- // EAGAIN or EWOULDBLOCK can be returned for socket,
113- // POSIX.1-2001 allows either error to be returned for this case.
114- // Since there is no ErrorKind for EAGAIN, WouldBlock is used.
115- return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
116- } else {
117- // Blocking socketpair with writer and empty buffer.
118- // FIXME: blocking is currently not supported
119- throw_unsup_format ! ( "socketpair/pipe/pipe2 read: blocking isn't supported yet" ) ;
120- }
121- }
109+
110+ if readbuf. borrow ( ) . buf . is_empty ( ) && self . is_nonblock {
111+ // Non-blocking socketpair with writer and empty buffer.
112+ // https://linux.die.net/man/2/read
113+ // EAGAIN or EWOULDBLOCK can be returned for socket,
114+ // POSIX.1-2001 allows either error to be returned for this case.
115+ // Since there is no ErrorKind for EAGAIN, WouldBlock is used.
116+ return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
122117 }
123- // TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
124- anonsocket_read ( self , self . peer_fd ( ) . upgrade ( ) , len, ptr, dest, ecx)
118+ anonsocket_read ( self_ref. downgrade ( ) , len, ptr, dest. clone ( ) , ecx)
125119 }
126120
127121 fn write < ' tcx > (
128122 & self ,
129- _self_ref : & FileDescriptionRef ,
123+ self_ref : & FileDescriptionRef ,
130124 _communicate_allowed : bool ,
131125 ptr : Pointer ,
132126 len : usize ,
@@ -153,16 +147,11 @@ impl FileDescription for AnonSocket {
153147 } ;
154148 let available_space =
155149 MAX_SOCKETPAIR_BUFFER_CAPACITY . strict_sub ( writebuf. borrow ( ) . buf . len ( ) ) ;
156- if available_space == 0 {
157- if self . is_nonblock {
158- // Non-blocking socketpair with a full buffer.
159- return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
160- } else {
161- // Blocking socketpair with a full buffer.
162- throw_unsup_format ! ( "socketpair/pipe/pipe2 write: blocking isn't supported yet" ) ;
163- }
150+ if available_space == 0 && self . is_nonblock {
151+ // Non-blocking socketpair with a full buffer.
152+ return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
164153 }
165- anonsocket_write ( available_space , & peer_fd , ptr, len, dest, ecx)
154+ anonsocket_write ( self_ref . downgrade ( ) , ptr, len, dest. clone ( ) , ecx)
166155 }
167156
168157 fn as_unix ( & self ) -> & dyn UnixFileDescription {
@@ -172,81 +161,161 @@ impl FileDescription for AnonSocket {
172161
173162/// Write to AnonSocket based on the space available and return the written byte size.
174163fn anonsocket_write < ' tcx > (
175- available_space : usize ,
176- peer_fd : & FileDescriptionRef ,
164+ weak_self_ref : WeakFileDescriptionRef ,
177165 ptr : Pointer ,
178166 len : usize ,
179- dest : & MPlaceTy < ' tcx > ,
167+ dest : MPlaceTy < ' tcx > ,
180168 ecx : & mut MiriInterpCx < ' tcx > ,
181169) -> InterpResult < ' tcx > {
170+ let Some ( self_ref) = weak_self_ref. upgrade ( ) else {
171+ // FIXME: We should raise a deadlock error if the self_ref upgrade failed.
172+ throw_unsup_format ! ( "This will be a deadlock error in future" )
173+ } ;
174+ let self_anonsocket = self_ref. downcast :: < AnonSocket > ( ) . unwrap ( ) ;
175+ let Some ( peer_fd) = self_anonsocket. peer_fd ( ) . upgrade ( ) else {
176+ // If the upgrade from Weak to Rc fails, it indicates that all read ends have been
177+ // closed.
178+ return ecx. set_last_error_and_return ( ErrorKind :: BrokenPipe , & dest) ;
179+ } ;
182180 let Some ( writebuf) = & peer_fd. downcast :: < AnonSocket > ( ) . unwrap ( ) . readbuf else {
183181 // FIXME: This should return EBADF, but there's no nice way to do that as there's no
184182 // corresponding ErrorKind variant.
185183 throw_unsup_format ! ( "writing to the reading end of a pipe" )
186184 } ;
187- let mut writebuf = writebuf. borrow_mut ( ) ;
188185
189- // Remember this clock so `read` can synchronize with us.
190- ecx. release_clock ( |clock| {
191- writebuf. clock . join ( clock) ;
192- } ) ;
193- // Do full write / partial write based on the space available.
194- let actual_write_size = len. min ( available_space) ;
195- let bytes = ecx. read_bytes_ptr_strip_provenance ( ptr, Size :: from_bytes ( len) ) ?;
196- writebuf. buf . extend ( & bytes[ ..actual_write_size] ) ;
186+ let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY . strict_sub ( writebuf. borrow ( ) . buf . len ( ) ) ;
187+
188+ if available_space == 0 {
189+ // Blocking socketpair with a full buffer.
190+ let dest = dest. clone ( ) ;
191+ self_anonsocket. blocked_write_tid . borrow_mut ( ) . push ( ecx. active_thread ( ) ) ;
192+ ecx. block_thread (
193+ BlockReason :: UnnamedSocket ,
194+ None ,
195+ callback ! (
196+ @capture<' tcx> {
197+ weak_self_ref: WeakFileDescriptionRef ,
198+ ptr: Pointer ,
199+ len: usize ,
200+ dest: MPlaceTy <' tcx>,
201+ }
202+ @unblock = |this| {
203+ anonsocket_write( weak_self_ref, ptr, len, dest, this)
204+ }
205+ ) ,
206+ ) ;
207+ } else {
208+ let mut writebuf = writebuf. borrow_mut ( ) ;
209+ // Remember this clock so `read` can synchronize with us.
210+ ecx. release_clock ( |clock| {
211+ writebuf. clock . join ( clock) ;
212+ } ) ;
213+ // Do full write / partial write based on the space available.
214+ let actual_write_size = len. min ( available_space) ;
215+ let bytes = ecx. read_bytes_ptr_strip_provenance ( ptr, Size :: from_bytes ( len) ) ?;
216+ writebuf. buf . extend ( & bytes[ ..actual_write_size] ) ;
197217
198- // Need to stop accessing peer_fd so that it can be notified.
199- drop ( writebuf) ;
218+ // Need to stop accessing peer_fd so that it can be notified.
219+ drop ( writebuf) ;
200220
201- // Notification should be provided for peer fd as it became readable.
202- // The kernel does this even if the fd was already readable before, so we follow suit.
203- ecx. check_and_update_readiness ( peer_fd) ?;
221+ // Notification should be provided for peer fd as it became readable.
222+ // The kernel does this even if the fd was already readable before, so we follow suit.
223+ ecx. check_and_update_readiness ( & peer_fd) ?;
224+ let peer_anonsocket = peer_fd. downcast :: < AnonSocket > ( ) . unwrap ( ) ;
225+ // Unblock all threads that are currently blocked on peer_fd's read.
226+ let waiting_threads = std:: mem:: take ( & mut * peer_anonsocket. blocked_read_tid . borrow_mut ( ) ) ;
227+ // FIXME: We can randomize the order of unblocking.
228+ for thread_id in waiting_threads {
229+ ecx. unblock_thread ( thread_id, BlockReason :: UnnamedSocket ) ?;
230+ }
204231
205- ecx. return_write_success ( actual_write_size, dest)
232+ return ecx. return_write_success ( actual_write_size, & dest) ;
233+ }
234+ interp_ok ( ( ) )
206235}
207236
208237/// Read from AnonSocket and return the number of bytes read.
209238fn anonsocket_read < ' tcx > (
210- anonsocket : & AnonSocket ,
211- peer_fd : Option < FileDescriptionRef > ,
239+ weak_self_ref : WeakFileDescriptionRef ,
212240 len : usize ,
213241 ptr : Pointer ,
214- dest : & MPlaceTy < ' tcx > ,
242+ dest : MPlaceTy < ' tcx > ,
215243 ecx : & mut MiriInterpCx < ' tcx > ,
216244) -> InterpResult < ' tcx > {
217- let mut bytes = vec ! [ 0 ; len] ;
245+ let Some ( self_ref) = weak_self_ref. upgrade ( ) else {
246+ // FIXME: We should raise a deadlock error if the self_ref upgrade failed.
247+ throw_unsup_format ! ( "This will be a deadlock error in future" )
248+ } ;
249+ let self_anonsocket = self_ref. downcast :: < AnonSocket > ( ) . unwrap ( ) ;
218250
219- let Some ( readbuf) = & anonsocket . readbuf else {
251+ let Some ( readbuf) = & self_anonsocket . readbuf else {
220252 // FIXME: This should return EBADF, but there's no nice way to do that as there's no
221253 // corresponding ErrorKind variant.
222254 throw_unsup_format ! ( "reading from the write end of a pipe" )
223255 } ;
224- let mut readbuf = readbuf. borrow_mut ( ) ;
225-
226- // Synchronize with all previous writes to this buffer.
227- // FIXME: this over-synchronizes; a more precise approach would be to
228- // only sync with the writes whose data we will read.
229- ecx. acquire_clock ( & readbuf. clock ) ;
230-
231- // Do full read / partial read based on the space available.
232- // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
233- let actual_read_size = readbuf. buf . read ( & mut bytes[ ..] ) . unwrap ( ) ;
234-
235- // Need to drop before others can access the readbuf again.
236- drop ( readbuf) ;
237-
238- // A notification should be provided for the peer file description even when it can
239- // only write 1 byte. This implementation is not compliant with the actual Linux kernel
240- // implementation. For optimization reasons, the kernel will only mark the file description
241- // as "writable" when it can write more than a certain number of bytes. Since we
242- // don't know what that *certain number* is, we will provide a notification every time
243- // a read is successful. This might result in our epoll emulation providing more
244- // notifications than the real system.
245- if let Some ( peer_fd) = peer_fd {
246- ecx. check_and_update_readiness ( & peer_fd) ?;
247- }
248256
249- ecx. return_read_success ( ptr, & bytes, actual_read_size, dest)
257+ if readbuf. borrow_mut ( ) . buf . is_empty ( ) {
258+ if self_anonsocket. peer_fd ( ) . upgrade ( ) . is_none ( ) {
259+ // Socketpair with no peer and empty buffer.
260+ // 0 bytes successfully read indicates end-of-file.
261+ return ecx. return_read_success ( ptr, & [ ] , 0 , & dest) ;
262+ } else {
263+ // Blocking socketpair with writer and empty buffer.
264+ let weak_self_ref = weak_self_ref. clone ( ) ;
265+ self_anonsocket. blocked_read_tid . borrow_mut ( ) . push ( ecx. active_thread ( ) ) ;
266+ ecx. block_thread (
267+ BlockReason :: UnnamedSocket ,
268+ None ,
269+ callback ! (
270+ @capture<' tcx> {
271+ weak_self_ref: WeakFileDescriptionRef ,
272+ len: usize ,
273+ ptr: Pointer ,
274+ dest: MPlaceTy <' tcx>,
275+ }
276+ @unblock = |this| {
277+ anonsocket_read( weak_self_ref, len, ptr, dest, this)
278+ }
279+ ) ,
280+ ) ;
281+ }
282+ } else {
283+ let mut bytes = vec ! [ 0 ; len] ;
284+ let mut readbuf = readbuf. borrow_mut ( ) ;
285+ // Synchronize with all previous writes to this buffer.
286+ // FIXME: this over-synchronizes; a more precise approach would be to
287+ // only sync with the writes whose data we will read.
288+ ecx. acquire_clock ( & readbuf. clock ) ;
289+
290+ // Do full read / partial read based on the space available.
291+ // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
292+ let actual_read_size = readbuf. buf . read ( & mut bytes[ ..] ) . unwrap ( ) ;
293+
294+ // Need to drop before others can access the readbuf again.
295+ drop ( readbuf) ;
296+
297+ // A notification should be provided for the peer file description even when it can
298+ // only write 1 byte. This implementation is not compliant with the actual Linux kernel
299+ // implementation. For optimization reasons, the kernel will only mark the file description
300+ // as "writable" when it can write more than a certain number of bytes. Since we
301+ // don't know what that *certain number* is, we will provide a notification every time
302+ // a read is successful. This might result in our epoll emulation providing more
303+ // notifications than the real system.
304+ if let Some ( peer_fd) = self_anonsocket. peer_fd ( ) . upgrade ( ) {
305+ ecx. check_and_update_readiness ( & peer_fd) ?;
306+ let peer_anonsocket = peer_fd. downcast :: < AnonSocket > ( ) . unwrap ( ) ;
307+ // Unblock all threads that are currently blocked on peer_fd's write.
308+ let waiting_threads =
309+ std:: mem:: take ( & mut * peer_anonsocket. blocked_write_tid . borrow_mut ( ) ) ;
310+ // FIXME: We can randomize the order of unblocking.
311+ for thread_id in waiting_threads {
312+ ecx. unblock_thread ( thread_id, BlockReason :: UnnamedSocket ) ?;
313+ }
314+ } ;
315+
316+ return ecx. return_read_success ( ptr, & bytes, actual_read_size, & dest) ;
317+ }
318+ interp_ok ( ( ) )
250319}
251320
252321impl UnixFileDescription for AnonSocket {
@@ -360,12 +429,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
360429 readbuf : Some ( RefCell :: new ( Buffer :: new ( ) ) ) ,
361430 peer_fd : OnceCell :: new ( ) ,
362431 peer_lost_data : Cell :: new ( false ) ,
432+ blocked_read_tid : RefCell :: new ( Vec :: new ( ) ) ,
433+ blocked_write_tid : RefCell :: new ( Vec :: new ( ) ) ,
363434 is_nonblock : is_sock_nonblock,
364435 } ) ;
365436 let fd1 = fds. new_ref ( AnonSocket {
366437 readbuf : Some ( RefCell :: new ( Buffer :: new ( ) ) ) ,
367438 peer_fd : OnceCell :: new ( ) ,
368439 peer_lost_data : Cell :: new ( false ) ,
440+ blocked_read_tid : RefCell :: new ( Vec :: new ( ) ) ,
441+ blocked_write_tid : RefCell :: new ( Vec :: new ( ) ) ,
369442 is_nonblock : is_sock_nonblock,
370443 } ) ;
371444
@@ -424,12 +497,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
424497 readbuf : Some ( RefCell :: new ( Buffer :: new ( ) ) ) ,
425498 peer_fd : OnceCell :: new ( ) ,
426499 peer_lost_data : Cell :: new ( false ) ,
500+ blocked_read_tid : RefCell :: new ( Vec :: new ( ) ) ,
501+ blocked_write_tid : RefCell :: new ( Vec :: new ( ) ) ,
427502 is_nonblock,
428503 } ) ;
429504 let fd1 = fds. new_ref ( AnonSocket {
430505 readbuf : None ,
431506 peer_fd : OnceCell :: new ( ) ,
432507 peer_lost_data : Cell :: new ( false ) ,
508+ blocked_read_tid : RefCell :: new ( Vec :: new ( ) ) ,
509+ blocked_write_tid : RefCell :: new ( Vec :: new ( ) ) ,
433510 is_nonblock,
434511 } ) ;
435512
0 commit comments