Skip to content

Commit 1cff528

Browse files
committed
epoll: unblock threads when new interest is registered
1 parent e14780a commit 1cff528

File tree

9 files changed

+259
-122
lines changed

9 files changed

+259
-122
lines changed

src/tools/miri/src/shims/files.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,26 @@ impl<T: ?Sized> FileDescriptionRef<T> {
5050
}
5151
}
5252

53+
impl<T: ?Sized> PartialEq for FileDescriptionRef<T> {
54+
fn eq(&self, other: &Self) -> bool {
55+
self.id() == other.id()
56+
}
57+
}
58+
59+
impl<T: ?Sized> Eq for FileDescriptionRef<T> {}
60+
61+
impl<T: ?Sized> PartialOrd for FileDescriptionRef<T> {
62+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
63+
Some(self.cmp(other))
64+
}
65+
}
66+
67+
impl<T: ?Sized> Ord for FileDescriptionRef<T> {
68+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
69+
self.id().cmp(&other.id())
70+
}
71+
}
72+
5373
/// Holds a weak reference to the actual file description.
5474
#[derive(Debug)]
5575
pub struct WeakFileDescriptionRef<T: ?Sized>(Weak<FdIdWith<T>>);

src/tools/miri/src/shims/unix/linux_like/epoll.rs

Lines changed: 92 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::cell::RefCell;
2-
use std::collections::BTreeMap;
2+
use std::collections::{BTreeMap, BTreeSet};
33
use std::io;
44
use std::rc::{Rc, Weak};
55
use std::time::Duration;
@@ -18,11 +18,13 @@ use crate::*;
1818
struct Epoll {
1919
/// A map of EpollEventInterests registered under this epoll instance.
2020
/// Each entry is differentiated using FdId and file descriptor value.
21+
/// The interest are separately refcounted so that we can track, for each FD, what epoll
22+
/// instances are currently interested in it.
2123
interest_list: RefCell<BTreeMap<(FdId, i32), Rc<RefCell<EpollEventInterest>>>>,
2224
/// A map of EpollEventInstance that will be returned when `epoll_wait` is called.
2325
/// Similar to interest_list, the entry is also differentiated using FdId
2426
/// and file descriptor value.
25-
ready_list: ReadyList,
27+
ready_list: RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>,
2628
/// A list of thread ids blocked on this epoll instance.
2729
blocked_tid: RefCell<Vec<ThreadId>>,
2830
}
@@ -36,9 +38,9 @@ impl VisitProvenance for Epoll {
3638
/// EpollEventInstance contains information that will be returned by epoll_wait.
3739
#[derive(Debug)]
3840
pub struct EpollEventInstance {
39-
/// Xor-ed event types that happened to the file description.
41+
/// Bitmask of event types that happened to the file description.
4042
events: u32,
41-
/// Original data retrieved from `epoll_event` during `epoll_ctl`.
43+
/// User-defined data associated with the interest that triggered this instance.
4244
data: u64,
4345
/// The release clock associated with this event.
4446
clock: VClock,
@@ -99,11 +101,6 @@ pub struct EpollReadyEvents {
99101
pub epollerr: bool,
100102
}
101103

102-
#[derive(Debug, Default)]
103-
struct ReadyList {
104-
mapping: RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>,
105-
}
106-
107104
impl EpollReadyEvents {
108105
pub fn new() -> Self {
109106
EpollReadyEvents {
@@ -185,8 +182,8 @@ impl EpollInterestTable {
185182
}
186183
}
187184

188-
pub fn get_epoll_interest(&self, id: FdId) -> Option<&Vec<Weak<RefCell<EpollEventInterest>>>> {
189-
self.0.get(&id)
185+
pub fn get_epoll_interest(&self, id: FdId) -> Option<&[Weak<RefCell<EpollEventInterest>>]> {
186+
self.0.get(&id).map(|v| &**v)
190187
}
191188

192189
pub fn get_epoll_interest_mut(
@@ -328,43 +325,42 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
328325

329326
let epoll_key = (id, fd);
330327

331-
// Check the existence of fd in the interest list.
332-
if op == epoll_ctl_add {
333-
if interest_list.contains_key(&epoll_key) {
334-
return this.set_last_error_and_return_i32(LibcError("EEXIST"));
335-
}
336-
} else {
337-
if !interest_list.contains_key(&epoll_key) {
338-
return this.set_last_error_and_return_i32(LibcError("ENOENT"));
339-
}
340-
}
341-
342-
if op == epoll_ctl_add {
328+
let new_interest = if op == epoll_ctl_add {
343329
// Create an epoll_interest.
344330
let interest = Rc::new(RefCell::new(EpollEventInterest {
345331
fd_num: fd,
346332
events,
347333
data,
348334
weak_epfd: FileDescriptionRef::downgrade(&epfd),
349335
}));
350-
// Notification will be returned for current epfd if there is event in the file
351-
// descriptor we registered.
352-
check_and_update_one_event_interest(&fd_ref, &interest, id, this)?;
353-
354-
// Insert an epoll_interest to global epoll_interest list.
336+
// Register it in the right places.
355337
this.machine.epoll_interests.insert_epoll_interest(id, Rc::downgrade(&interest));
356-
interest_list.insert(epoll_key, interest);
338+
if interest_list.insert(epoll_key, interest.clone()).is_some() {
339+
// We already had interest in this.
340+
return this.set_last_error_and_return_i32(LibcError("EEXIST"));
341+
}
342+
343+
interest
357344
} else {
358345
// Modify the existing interest.
359-
let epoll_interest = interest_list.get_mut(&epoll_key).unwrap();
346+
let Some(interest) = interest_list.get_mut(&epoll_key) else {
347+
return this.set_last_error_and_return_i32(LibcError("ENOENT"));
348+
};
360349
{
361-
let mut epoll_interest = epoll_interest.borrow_mut();
362-
epoll_interest.events = events;
363-
epoll_interest.data = data;
350+
let mut interest = interest.borrow_mut();
351+
interest.events = events;
352+
interest.data = data;
364353
}
365-
// Updating an FD interest triggers events.
366-
check_and_update_one_event_interest(&fd_ref, epoll_interest, id, this)?;
367-
}
354+
interest.clone()
355+
};
356+
357+
// Deliver events for the new interest.
358+
send_ready_events_to_interests(
359+
this,
360+
fd_ref.as_unix(this).get_epoll_ready_events()?.get_event_bitmask(this),
361+
id,
362+
std::iter::once(new_interest),
363+
)?;
368364

369365
interp_ok(Scalar::from_i32(0))
370366
} else if op == epoll_ctl_del {
@@ -378,7 +374,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
378374
drop(epoll_interest);
379375

380376
// Remove related epoll_interest from ready list.
381-
epfd.ready_list.mapping.borrow_mut().remove(&epoll_key);
377+
epfd.ready_list.borrow_mut().remove(&epoll_key);
382378

383379
// Remove dangling EpollEventInterest from its global table.
384380
// .unwrap() below should succeed because the file description id must have registered
@@ -462,7 +458,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
462458
};
463459

464460
// We just need to know if the ready list is empty and borrow the thread_ids out.
465-
let ready_list_empty = epfd.ready_list.mapping.borrow().is_empty();
461+
let ready_list_empty = epfd.ready_list.borrow().is_empty();
466462
if timeout == 0 || !ready_list_empty {
467463
// If the ready list is not empty, or the timeout is 0, we can return immediately.
468464
return_ready_list(&epfd, dest, &event, this)?;
@@ -518,50 +514,71 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
518514
interp_ok(())
519515
}
520516

521-
/// For a specific file description, get its ready events and update the corresponding ready
522-
/// list. This function should be called whenever an event causes more bytes or an EOF to become
523-
/// newly readable from an FD, and whenever more bytes can be written to an FD or no more future
524-
/// writes are possible.
517+
/// For a specific file description, get its ready events and send it to everyone who registered
518+
/// interest in this FD. This function should be called whenever an event causes more bytes or
519+
/// an EOF to become newly readable from an FD, and whenever more bytes can be written to an FD
520+
/// or no more future writes are possible.
525521
///
526522
/// This *will* report an event if anyone is subscribed to it, without any further filtering, so
527523
/// do not call this function when an FD didn't have anything happen to it!
528-
fn check_and_update_readiness(
529-
&mut self,
530-
fd_ref: DynFileDescriptionRef,
531-
) -> InterpResult<'tcx, ()> {
524+
fn epoll_send_fd_ready_events(&mut self, fd_ref: DynFileDescriptionRef) -> InterpResult<'tcx> {
532525
let this = self.eval_context_mut();
526+
let ready_events_bitmask =
527+
fd_ref.as_unix(this).get_epoll_ready_events()?.get_event_bitmask(this);
533528
let id = fd_ref.id();
534-
let mut waiter = Vec::new();
535-
// Get a list of EpollEventInterest that is associated to a specific file description.
536-
if let Some(epoll_interests) = this.machine.epoll_interests.get_epoll_interest(id) {
537-
for weak_epoll_interest in epoll_interests {
538-
if let Some(epoll_interest) = weak_epoll_interest.upgrade() {
539-
let is_updated =
540-
check_and_update_one_event_interest(&fd_ref, &epoll_interest, id, this)?;
541-
if is_updated {
542-
// Edge-triggered notification only notify one thread even if there are
543-
// multiple threads blocked on the same epfd.
544-
545-
// This unwrap can never fail because if the current epoll instance were
546-
// closed, the upgrade of weak_epoll_interest
547-
// above would fail. This guarantee holds because only the epoll instance
548-
// holds a strong ref to epoll_interest.
549-
let epfd = epoll_interest.borrow().weak_epfd.upgrade().unwrap();
550-
// FIXME: We can randomly pick a thread to unblock.
551-
if let Some(thread_id) = epfd.blocked_tid.borrow_mut().pop() {
552-
waiter.push(thread_id);
553-
};
554-
}
555-
}
556-
}
529+
// Figure out who is interested in this. We need to clone this list since we can't prove
530+
// that `send_ready_events_to_interests` won't mutate it.
531+
let interests = this.machine.epoll_interests.get_epoll_interest(id).unwrap_or(&[]);
532+
let interests = interests.iter().filter_map(|weak| weak.upgrade()).collect::<Vec<_>>();
533+
send_ready_events_to_interests(this, ready_events_bitmask, id, interests.into_iter())
534+
}
535+
}
536+
537+
/// Send the latest ready events for one particular FD (identified by `event_fd_id`) to everyone in
538+
/// the `interests` list, if they are interested in this kind of event.
539+
fn send_ready_events_to_interests<'tcx>(
540+
ecx: &mut MiriInterpCx<'tcx>,
541+
event_bitmask: u32,
542+
event_fd_id: FdId,
543+
interests: impl Iterator<Item = Rc<RefCell<EpollEventInterest>>>,
544+
) -> InterpResult<'tcx> {
545+
#[allow(clippy::mutable_key_type)] // yeah, we know
546+
let mut triggered_epolls = BTreeSet::new();
547+
for interest in interests {
548+
let interest = interest.borrow();
549+
let epfd = interest.weak_epfd.upgrade().unwrap();
550+
// This checks if any of the events specified in epoll_event_interest.events
551+
// match those in ready_events.
552+
let flags = interest.events & event_bitmask;
553+
if flags == 0 {
554+
continue;
557555
}
558-
waiter.sort();
559-
waiter.dedup();
560-
for thread_id in waiter {
561-
this.unblock_thread(thread_id, BlockReason::Epoll)?;
556+
// Geenrate a new event instance, with the flags that this one is interested in.
557+
let mut new_instance = EpollEventInstance::new(flags, interest.data);
558+
ecx.release_clock(|clock| {
559+
new_instance.clock.clone_from(clock);
560+
})?;
561+
// Add event to ready list for this epoll instance.
562+
// Tests confirm that we have to *overwrite* the old instance for the same key.
563+
let mut ready_list = epfd.ready_list.borrow_mut();
564+
ready_list.insert((event_fd_id, interest.fd_num), new_instance);
565+
drop(ready_list);
566+
// Remember to wake up this epoll later.
567+
// (We might encounter the same epoll multiple times if there are multiple interests for
568+
// different file descriptors that references the same file description.)
569+
triggered_epolls.insert(epfd);
570+
}
571+
572+
// For each epoll instance where an interest triggered, wake up one thread.
573+
for epoll in triggered_epolls {
574+
// Edge-triggered notification only notify one thread even if there are
575+
// multiple threads blocked on the same epfd.
576+
if let Some(thread_id) = epoll.blocked_tid.borrow_mut().pop() {
577+
ecx.unblock_thread(thread_id, BlockReason::Epoll)?;
562578
}
563-
interp_ok(())
564579
}
580+
581+
interp_ok(())
565582
}
566583

567584
/// This function takes in ready list and returns EpollEventInstance with file description
@@ -582,41 +599,6 @@ fn ready_list_next(
582599
None
583600
}
584601

585-
/// This helper function checks whether an epoll notification should be triggered for a specific
586-
/// epoll_interest and, if necessary, triggers the notification, and returns whether the
587-
/// notification was added/updated. Unlike check_and_update_readiness, this function sends a
588-
/// notification to only one epoll instance.
589-
fn check_and_update_one_event_interest<'tcx>(
590-
fd_ref: &DynFileDescriptionRef,
591-
interest: &RefCell<EpollEventInterest>,
592-
id: FdId,
593-
ecx: &MiriInterpCx<'tcx>,
594-
) -> InterpResult<'tcx, bool> {
595-
// Get the bitmask of ready events for a file description.
596-
let ready_events_bitmask = fd_ref.as_unix(ecx).get_epoll_ready_events()?.get_event_bitmask(ecx);
597-
let epoll_event_interest = interest.borrow();
598-
let epfd = epoll_event_interest.weak_epfd.upgrade().unwrap();
599-
// This checks if any of the events specified in epoll_event_interest.events
600-
// match those in ready_events.
601-
let flags = epoll_event_interest.events & ready_events_bitmask;
602-
// If there is any event that we are interested in being specified as ready,
603-
// insert an epoll_return to the ready list.
604-
if flags != 0 {
605-
let epoll_key = (id, epoll_event_interest.fd_num);
606-
let mut ready_list = epfd.ready_list.mapping.borrow_mut();
607-
let mut event_instance = EpollEventInstance::new(flags, epoll_event_interest.data);
608-
// If we are tracking data races, remember the current clock so we can sync with it later.
609-
ecx.release_clock(|clock| {
610-
event_instance.clock.clone_from(clock);
611-
})?;
612-
// Triggers the notification by inserting it to the ready list.
613-
ready_list.insert(epoll_key, event_instance);
614-
interp_ok(true)
615-
} else {
616-
interp_ok(false)
617-
}
618-
}
619-
620602
/// Stores the ready list of the `epfd` epoll instance into `events` (which must be an array),
621603
/// and the number of returned events into `dest`.
622604
fn return_ready_list<'tcx>(
@@ -625,7 +607,7 @@ fn return_ready_list<'tcx>(
625607
events: &MPlaceTy<'tcx>,
626608
ecx: &mut MiriInterpCx<'tcx>,
627609
) -> InterpResult<'tcx> {
628-
let mut ready_list = epfd.ready_list.mapping.borrow_mut();
610+
let mut ready_list = epfd.ready_list.borrow_mut();
629611
let mut num_of_events: i32 = 0;
630612
let mut array_iter = ecx.project_array_fields(events)?;
631613

src/tools/miri/src/shims/unix/linux_like/eventfd.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ fn eventfd_write<'tcx>(
216216

217217
// The state changed; we check and update the status of all supported event
218218
// types for current file description.
219-
ecx.check_and_update_readiness(eventfd)?;
219+
ecx.epoll_send_fd_ready_events(eventfd)?;
220220

221221
// Return how many bytes we consumed from the user-provided buffer.
222222
return finish.call(ecx, Ok(buf_place.layout.size.bytes_usize()));
@@ -311,7 +311,7 @@ fn eventfd_read<'tcx>(
311311

312312
// The state changed; we check and update the status of all supported event
313313
// types for current file description.
314-
ecx.check_and_update_readiness(eventfd)?;
314+
ecx.epoll_send_fd_ready_events(eventfd)?;
315315

316316
// Tell userspace how many bytes we put into the buffer.
317317
return finish.call(ecx, Ok(buf_place.layout.size.bytes_usize()));

src/tools/miri/src/shims/unix/unnamed_socket.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl FileDescription for AnonSocket {
9696
}
9797
}
9898
// Notify peer fd that close has happened, since that can unblock reads and writes.
99-
ecx.check_and_update_readiness(peer_fd)?;
99+
ecx.epoll_send_fd_ready_events(peer_fd)?;
100100
}
101101
interp_ok(Ok(()))
102102
}
@@ -276,7 +276,7 @@ fn anonsocket_write<'tcx>(
276276
}
277277
// Notification should be provided for peer fd as it became readable.
278278
// The kernel does this even if the fd was already readable before, so we follow suit.
279-
ecx.check_and_update_readiness(peer_fd)?;
279+
ecx.epoll_send_fd_ready_events(peer_fd)?;
280280

281281
return finish.call(ecx, Ok(write_size));
282282
}
@@ -369,7 +369,7 @@ fn anonsocket_read<'tcx>(
369369
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
370370
}
371371
// Notify epoll waiters.
372-
ecx.check_and_update_readiness(peer_fd)?;
372+
ecx.epoll_send_fd_ready_events(peer_fd)?;
373373
};
374374

375375
return finish.call(ecx, Ok(read_size));

0 commit comments

Comments
 (0)