Skip to content

Commit b340f08

Browse files
committed
add the tokio poll_fns test that triggered this investigation
1 parent fe6bc02 commit b340f08

File tree

3 files changed

+242
-0
lines changed

3 files changed

+242
-0
lines changed

src/tools/miri/tests/deps/Cargo.lock

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,79 @@ version = "2.3.0"
7272
source = "registry+https://github.com/rust-lang/crates.io-index"
7373
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
7474

75+
[[package]]
76+
name = "futures"
77+
version = "0.3.31"
78+
source = "registry+https://github.com/rust-lang/crates.io-index"
79+
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
80+
dependencies = [
81+
"futures-channel",
82+
"futures-core",
83+
"futures-io",
84+
"futures-sink",
85+
"futures-task",
86+
"futures-util",
87+
]
88+
89+
[[package]]
90+
name = "futures-channel"
91+
version = "0.3.31"
92+
source = "registry+https://github.com/rust-lang/crates.io-index"
93+
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
94+
dependencies = [
95+
"futures-core",
96+
"futures-sink",
97+
]
98+
99+
[[package]]
100+
name = "futures-core"
101+
version = "0.3.31"
102+
source = "registry+https://github.com/rust-lang/crates.io-index"
103+
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
104+
105+
[[package]]
106+
name = "futures-io"
107+
version = "0.3.31"
108+
source = "registry+https://github.com/rust-lang/crates.io-index"
109+
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
110+
111+
[[package]]
112+
name = "futures-macro"
113+
version = "0.3.31"
114+
source = "registry+https://github.com/rust-lang/crates.io-index"
115+
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
116+
dependencies = [
117+
"proc-macro2",
118+
"quote",
119+
"syn",
120+
]
121+
122+
[[package]]
123+
name = "futures-sink"
124+
version = "0.3.31"
125+
source = "registry+https://github.com/rust-lang/crates.io-index"
126+
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
127+
128+
[[package]]
129+
name = "futures-task"
130+
version = "0.3.31"
131+
source = "registry+https://github.com/rust-lang/crates.io-index"
132+
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
133+
134+
[[package]]
135+
name = "futures-util"
136+
version = "0.3.31"
137+
source = "registry+https://github.com/rust-lang/crates.io-index"
138+
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
139+
dependencies = [
140+
"futures-core",
141+
"futures-macro",
142+
"futures-sink",
143+
"futures-task",
144+
"pin-project-lite",
145+
"pin-utils",
146+
]
147+
75148
[[package]]
76149
name = "getrandom"
77150
version = "0.1.16"
@@ -190,6 +263,7 @@ name = "miri-test-deps"
190263
version = "0.1.0"
191264
dependencies = [
192265
"cfg-if",
266+
"futures",
193267
"getrandom 0.1.16",
194268
"getrandom 0.2.16",
195269
"getrandom 0.3.3",
@@ -242,6 +316,12 @@ version = "0.2.16"
242316
source = "registry+https://github.com/rust-lang/crates.io-index"
243317
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
244318

319+
[[package]]
320+
name = "pin-utils"
321+
version = "0.1.0"
322+
source = "registry+https://github.com/rust-lang/crates.io-index"
323+
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
324+
245325
[[package]]
246326
name = "proc-macro2"
247327
version = "1.0.95"

src/tools/miri/tests/deps/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ page_size = "0.6"
2323
# Avoid pulling in all of tokio's dependencies.
2424
# However, without `net` and `signal`, tokio uses fewer relevant system APIs.
2525
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "net", "fs", "sync", "signal", "io-util"] }
26+
futures = { version = "0.3.0", default-features = false, features = ["alloc", "async-await"] }
2627

2728
[target.'cfg(windows)'.dependencies]
2829
windows-sys = { version = "0.60", features = [
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! This is a stand-alone version of the `poll_fns` test in Tokio. It hits various
2+
//! interesting edge cases in the epoll logic, making it a good integration test.
3+
//! It also seems to depend on Tokio internals, so if Tokio changes we have have to update
4+
//! or remove the test.
5+
6+
//@only-target: linux # We only support tokio on Linux
7+
8+
use std::fs::File;
9+
use std::io::{ErrorKind, Read, Write};
10+
use std::os::fd::FromRawFd;
11+
use std::sync::Arc;
12+
use std::sync::atomic::{AtomicBool, Ordering};
13+
use std::task::{Context, Waker};
14+
use std::time::Duration;
15+
16+
use futures::poll;
17+
use tokio::io::unix::AsyncFd;
18+
19+
macro_rules! assert_pending {
20+
($e:expr) => {{
21+
use core::task::Poll;
22+
match $e {
23+
Poll::Pending => {}
24+
Poll::Ready(v) => panic!("ready; value = {:?}", v),
25+
}
26+
}};
27+
}
28+
29+
struct TestWaker {
30+
inner: Arc<TestWakerInner>,
31+
waker: Waker,
32+
}
33+
34+
#[derive(Default)]
35+
struct TestWakerInner {
36+
awoken: AtomicBool,
37+
}
38+
39+
impl futures::task::ArcWake for TestWakerInner {
40+
fn wake_by_ref(arc_self: &Arc<Self>) {
41+
arc_self.awoken.store(true, Ordering::SeqCst);
42+
}
43+
}
44+
45+
impl TestWaker {
46+
fn new() -> Self {
47+
let inner: Arc<TestWakerInner> = Default::default();
48+
49+
Self { inner: inner.clone(), waker: futures::task::waker(inner) }
50+
}
51+
52+
fn awoken(&self) -> bool {
53+
self.inner.awoken.swap(false, Ordering::SeqCst)
54+
}
55+
56+
fn context(&self) -> Context<'_> {
57+
Context::from_waker(&self.waker)
58+
}
59+
}
60+
61+
fn socketpair() -> (File, File) {
62+
let mut fds = [-1, -1];
63+
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
64+
assert_eq!(res, 0);
65+
66+
assert_eq!(unsafe { libc::fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK) }, 0);
67+
assert_eq!(unsafe { libc::fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK) }, 0);
68+
69+
unsafe { (File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1])) }
70+
}
71+
72+
fn drain(mut fd: &File, mut amt: usize) {
73+
let mut buf = [0u8; 512];
74+
while amt > 0 {
75+
match fd.read(&mut buf[..]) {
76+
Err(e) if e.kind() == ErrorKind::WouldBlock => {}
77+
Ok(0) => panic!("unexpected EOF"),
78+
Err(e) => panic!("unexpected error: {e:?}"),
79+
Ok(x) => amt -= x,
80+
}
81+
}
82+
}
83+
84+
fn main() {
85+
tokio::runtime::Builder::new_current_thread()
86+
.enable_all()
87+
.build()
88+
.unwrap()
89+
.block_on(the_test());
90+
}
91+
92+
async fn the_test() {
93+
let (a, b) = socketpair();
94+
let afd_a = Arc::new(AsyncFd::new(a).unwrap());
95+
let afd_b = Arc::new(AsyncFd::new(b).unwrap());
96+
97+
// Fill up the write side of A
98+
let mut bytes = 0;
99+
while let Ok(amt) = afd_a.get_ref().write(&[0; 512]) {
100+
bytes += amt;
101+
}
102+
103+
let waker = TestWaker::new();
104+
105+
assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
106+
107+
let afd_a_2 = afd_a.clone();
108+
let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
109+
let barrier_clone = r_barrier.clone();
110+
111+
let read_fut = tokio::spawn(async move {
112+
// Move waker onto this task first
113+
assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx))));
114+
barrier_clone.wait().await;
115+
116+
let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
117+
});
118+
119+
let afd_a_2 = afd_a.clone();
120+
let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
121+
let barrier_clone = w_barrier.clone();
122+
123+
let mut write_fut = tokio::spawn(async move {
124+
// Move waker onto this task first
125+
assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx))));
126+
barrier_clone.wait().await;
127+
128+
let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
129+
});
130+
131+
r_barrier.wait().await;
132+
w_barrier.wait().await;
133+
134+
let readable = afd_a.readable();
135+
tokio::pin!(readable);
136+
137+
tokio::select! {
138+
_ = &mut readable => unreachable!(),
139+
_ = tokio::task::yield_now() => {}
140+
}
141+
142+
// Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
143+
afd_b.get_ref().write_all(b"0").unwrap();
144+
145+
let _ = tokio::join!(readable, read_fut);
146+
147+
// Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
148+
assert!(!waker.awoken());
149+
150+
// The writable side should not be awoken
151+
tokio::select! {
152+
_ = &mut write_fut => unreachable!(),
153+
_ = tokio::time::sleep(Duration::from_millis(50)) => {}
154+
}
155+
156+
// Make it writable now
157+
drain(afd_b.get_ref(), bytes);
158+
159+
// now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
160+
let _ = write_fut.await;
161+
}

0 commit comments

Comments
 (0)