@@ -4,7 +4,7 @@ use std::io;
44use std:: io:: ErrorKind ;
55
66use crate :: concurrency:: VClock ;
7- use crate :: shims:: unix:: fd:: FileDescriptionRef ;
7+ use crate :: shims:: unix:: fd:: { FileDescriptionRef , WeakFileDescriptionRef } ;
88use crate :: shims:: unix:: linux:: epoll:: { EpollReadyEvents , EvalContextExt as _} ;
99use crate :: shims:: unix:: * ;
1010use crate :: * ;
@@ -26,6 +26,10 @@ struct Event {
2626 counter : Cell < u64 > ,
2727 is_nonblock : bool ,
2828 clock : RefCell < VClock > ,
29+ /// A list of thread ids blocked on eventfd::read.
30+ blocked_read_tid : RefCell < Vec < ThreadId > > ,
31+ /// A list of thread ids blocked on eventfd::write.
32+ blocked_write_tid : RefCell < Vec < ThreadId > > ,
2933}
3034
3135impl FileDescription for Event {
@@ -72,31 +76,8 @@ impl FileDescription for Event {
7276 // eventfd read at the size of u64.
7377 let buf_place = ecx. ptr_to_mplace_unaligned ( ptr, ty) ;
7478
75- // Block when counter == 0.
76- let counter = self . counter . get ( ) ;
77- if counter == 0 {
78- if self . is_nonblock {
79- return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
80- }
81-
82- throw_unsup_format ! ( "eventfd: blocking is unsupported" ) ;
83- } else {
84- // Synchronize with all prior `write` calls to this FD.
85- ecx. acquire_clock ( & self . clock . borrow ( ) ) ;
86-
87- // Give old counter value to userspace, and set counter value to 0.
88- ecx. write_int ( counter, & buf_place) ?;
89- self . counter . set ( 0 ) ;
90-
91- // When any of the event happened, we check and update the status of all supported event
92- // types for current file description.
93- ecx. check_and_update_readiness ( self_ref) ?;
94-
95- // Tell userspace how many bytes we wrote.
96- ecx. write_int ( buf_place. layout . size . bytes ( ) , dest) ?;
97- }
98-
99- interp_ok ( ( ) )
79+ let weak_eventfd = self_ref. downgrade ( ) ;
80+ eventfd_read ( buf_place, dest, weak_eventfd, ecx)
10081 }
10182
10283 /// A write call adds the 8-byte integer value supplied in
@@ -127,7 +108,7 @@ impl FileDescription for Event {
127108 return ecx. set_last_error_and_return ( ErrorKind :: InvalidInput , dest) ;
128109 }
129110
130- // Read the user supplied value from the pointer.
111+ // Read the user- supplied value from the pointer.
131112 let buf_place = ecx. ptr_to_mplace_unaligned ( ptr, ty) ;
132113 let num = ecx. read_scalar ( & buf_place) ?. to_u64 ( ) ?;
133114
@@ -137,27 +118,8 @@ impl FileDescription for Event {
137118 }
138119 // If the addition does not let the counter to exceed the maximum value, update the counter.
139120 // Else, block.
140- match self . counter . get ( ) . checked_add ( num) {
141- Some ( new_count @ 0 ..=MAX_COUNTER ) => {
142- // Future `read` calls will synchronize with this write, so update the FD clock.
143- ecx. release_clock ( |clock| {
144- self . clock . borrow_mut ( ) . join ( clock) ;
145- } ) ;
146- self . counter . set ( new_count) ;
147- }
148- None | Some ( u64:: MAX ) =>
149- if self . is_nonblock {
150- return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
151- } else {
152- throw_unsup_format ! ( "eventfd: blocking is unsupported" ) ;
153- } ,
154- } ;
155- // When any of the event happened, we check and update the status of all supported event
156- // types for current file description.
157- ecx. check_and_update_readiness ( self_ref) ?;
158-
159- // Return how many bytes we read.
160- ecx. write_int ( buf_place. layout . size . bytes ( ) , dest)
121+ let weak_eventfd = self_ref. downgrade ( ) ;
122+ eventfd_write ( num, buf_place, dest, weak_eventfd, ecx)
161123 }
162124}
163125
@@ -217,8 +179,151 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
217179 counter : Cell :: new ( val. into ( ) ) ,
218180 is_nonblock,
219181 clock : RefCell :: new ( VClock :: default ( ) ) ,
182+ blocked_read_tid : RefCell :: new ( Vec :: new ( ) ) ,
183+ blocked_write_tid : RefCell :: new ( Vec :: new ( ) ) ,
220184 } ) ;
221185
222186 interp_ok ( Scalar :: from_i32 ( fd_value) )
223187 }
224188}
189+
190+ /// Block thread if the value addition will exceed u64::MAX -1,
191+ /// else just add the user-supplied value to current counter.
192+ fn eventfd_write < ' tcx > (
193+ num : u64 ,
194+ buf_place : MPlaceTy < ' tcx > ,
195+ dest : & MPlaceTy < ' tcx > ,
196+ weak_eventfd : WeakFileDescriptionRef ,
197+ ecx : & mut MiriInterpCx < ' tcx > ,
198+ ) -> InterpResult < ' tcx > {
199+ let Some ( eventfd_ref) = weak_eventfd. upgrade ( ) else {
200+ throw_unsup_format ! ( "eventfd FD got closed while blocking." )
201+ } ;
202+
203+ // Since we pass the weak file description ref, it is guaranteed to be
204+ // an eventfd file description.
205+ let eventfd = eventfd_ref. downcast :: < Event > ( ) . unwrap ( ) ;
206+
207+ match eventfd. counter . get ( ) . checked_add ( num) {
208+ Some ( new_count @ 0 ..=MAX_COUNTER ) => {
209+ // Future `read` calls will synchronize with this write, so update the FD clock.
210+ ecx. release_clock ( |clock| {
211+ eventfd. clock . borrow_mut ( ) . join ( clock) ;
212+ } ) ;
213+
214+ // When this function is called, the addition is guaranteed to not exceed u64::MAX - 1.
215+ eventfd. counter . set ( new_count) ;
216+
217+ // When any of the event happened, we check and update the status of all supported event
218+ // types for current file description.
219+ ecx. check_and_update_readiness ( & eventfd_ref) ?;
220+
221+ // Unblock *all* threads previously blocked on `read`.
222+ // We need to take out the blocked thread ids and unblock them together,
223+ // because `unblock_threads` may block them again and end up re-adding the
224+ // thread to the blocked list.
225+ let waiting_threads = std:: mem:: take ( & mut * eventfd. blocked_read_tid . borrow_mut ( ) ) ;
226+ // FIXME: We can randomize the order of unblocking.
227+ for thread_id in waiting_threads {
228+ ecx. unblock_thread ( thread_id, BlockReason :: Eventfd ) ?;
229+ }
230+
231+ // Return how many bytes we wrote.
232+ return ecx. write_int ( buf_place. layout . size . bytes ( ) , dest) ;
233+ }
234+ None | Some ( u64:: MAX ) => {
235+ if eventfd. is_nonblock {
236+ return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
237+ }
238+
239+ let dest = dest. clone ( ) ;
240+
241+ eventfd. blocked_write_tid . borrow_mut ( ) . push ( ecx. active_thread ( ) ) ;
242+
243+ ecx. block_thread (
244+ BlockReason :: Eventfd ,
245+ None ,
246+ callback ! (
247+ @capture<' tcx> {
248+ num: u64 ,
249+ buf_place: MPlaceTy <' tcx>,
250+ dest: MPlaceTy <' tcx>,
251+ weak_eventfd: WeakFileDescriptionRef ,
252+ }
253+ @unblock = |this| {
254+ eventfd_write( num, buf_place, & dest, weak_eventfd, this)
255+ }
256+ ) ,
257+ ) ;
258+ }
259+ } ;
260+ interp_ok ( ( ) )
261+ }
262+
263+ /// Block thread if the current counter is 0,
264+ /// else just return the current counter value to the caller and set the counter to 0.
265+ fn eventfd_read < ' tcx > (
266+ buf_place : MPlaceTy < ' tcx > ,
267+ dest : & MPlaceTy < ' tcx > ,
268+ weak_eventfd : WeakFileDescriptionRef ,
269+ ecx : & mut MiriInterpCx < ' tcx > ,
270+ ) -> InterpResult < ' tcx > {
271+ let Some ( eventfd_ref) = weak_eventfd. upgrade ( ) else {
272+ throw_unsup_format ! ( "eventfd FD got closed while blocking." )
273+ } ;
274+
275+ // Since we pass the weak file description ref to the callback function, it is guaranteed to be
276+ // an eventfd file description.
277+ let eventfd = eventfd_ref. downcast :: < Event > ( ) . unwrap ( ) ;
278+
279+ // Block when counter == 0.
280+ let counter = eventfd. counter . replace ( 0 ) ;
281+
282+ if counter == 0 {
283+ if eventfd. is_nonblock {
284+ return ecx. set_last_error_and_return ( ErrorKind :: WouldBlock , dest) ;
285+ }
286+ let dest = dest. clone ( ) ;
287+
288+ eventfd. blocked_read_tid . borrow_mut ( ) . push ( ecx. active_thread ( ) ) ;
289+
290+ ecx. block_thread (
291+ BlockReason :: Eventfd ,
292+ None ,
293+ callback ! (
294+ @capture<' tcx> {
295+ buf_place: MPlaceTy <' tcx>,
296+ dest: MPlaceTy <' tcx>,
297+ weak_eventfd: WeakFileDescriptionRef ,
298+ }
299+ @unblock = |this| {
300+ eventfd_read( buf_place, & dest, weak_eventfd, this)
301+ }
302+ ) ,
303+ ) ;
304+ } else {
305+ // Synchronize with all prior `write` calls to this FD.
306+ ecx. acquire_clock ( & eventfd. clock . borrow ( ) ) ;
307+
308+ // Give old counter value to userspace, and set counter value to 0.
309+ ecx. write_int ( counter, & buf_place) ?;
310+
311+ // When any of the events happened, we check and update the status of all supported event
312+ // types for current file description.
313+ ecx. check_and_update_readiness ( & eventfd_ref) ?;
314+
315+ // Unblock *all* threads previously blocked on `write`.
316+ // We need to take out the blocked thread ids and unblock them together,
317+ // because `unblock_threads` may block them again and end up re-adding the
318+ // thread to the blocked list.
319+ let waiting_threads = std:: mem:: take ( & mut * eventfd. blocked_write_tid . borrow_mut ( ) ) ;
320+ // FIXME: We can randomize the order of unblocking.
321+ for thread_id in waiting_threads {
322+ ecx. unblock_thread ( thread_id, BlockReason :: Eventfd ) ?;
323+ }
324+
325+ // Tell userspace how many bytes we read.
326+ return ecx. write_int ( buf_place. layout . size . bytes ( ) , dest) ;
327+ }
328+ interp_ok ( ( ) )
329+ }
0 commit comments