|
1 | | -//! Parker implementaiton based on a Mutex and Condvar. |
2 | | -//! |
3 | | -//! The implementation currently uses the trivial strategy of a Mutex+Condvar |
4 | | -//! with wakeup flag, which does not actually allow spurious wakeups. In the |
5 | | -//! future, this will be implemented in a more efficient way, perhaps along the lines of |
6 | | -//! http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp |
7 | | -//! or futuxes, and in either case may allow spurious wakeups. |
8 | | -
|
9 | | -use crate::sync::atomic::AtomicUsize; |
10 | | -use crate::sync::atomic::Ordering::SeqCst; |
11 | | -use crate::sync::{Condvar, Mutex}; |
12 | | -use crate::time::Duration; |
13 | | - |
14 | | -const EMPTY: usize = 0; |
15 | | -const PARKED: usize = 1; |
16 | | -const NOTIFIED: usize = 2; |
17 | | - |
18 | | -pub struct Parker { |
19 | | - state: AtomicUsize, |
20 | | - lock: Mutex<()>, |
21 | | - cvar: Condvar, |
22 | | -} |
23 | | - |
24 | | -impl Parker { |
25 | | - pub fn new() -> Self { |
26 | | - Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() } |
27 | | - } |
28 | | - |
29 | | - // This implementaiton doesn't require `unsafe`, but other implementations |
30 | | - // may assume this is only called by the thread that owns the Parker. |
31 | | - pub unsafe fn park(&self) { |
32 | | - // If we were previously notified then we consume this notification and |
33 | | - // return quickly. |
34 | | - if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { |
35 | | - return; |
36 | | - } |
37 | | - |
38 | | - // Otherwise we need to coordinate going to sleep |
39 | | - let mut m = self.lock.lock().unwrap(); |
40 | | - match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { |
41 | | - Ok(_) => {} |
42 | | - Err(NOTIFIED) => { |
43 | | - // We must read here, even though we know it will be `NOTIFIED`. |
44 | | - // This is because `unpark` may have been called again since we read |
45 | | - // `NOTIFIED` in the `compare_exchange` above. We must perform an |
46 | | - // acquire operation that synchronizes with that `unpark` to observe |
47 | | - // any writes it made before the call to unpark. To do that we must |
48 | | - // read from the write it made to `state`. |
49 | | - let old = self.state.swap(EMPTY, SeqCst); |
50 | | - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); |
51 | | - return; |
52 | | - } // should consume this notification, so prohibit spurious wakeups in next park. |
53 | | - Err(_) => panic!("inconsistent park state"), |
54 | | - } |
55 | | - loop { |
56 | | - m = self.cvar.wait(m).unwrap(); |
57 | | - match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { |
58 | | - Ok(_) => return, // got a notification |
59 | | - Err(_) => {} // spurious wakeup, go back to sleep |
60 | | - } |
61 | | - } |
62 | | - } |
63 | | - |
64 | | - // This implementaiton doesn't require `unsafe`, but other implementations |
65 | | - // may assume this is only called by the thread that owns the Parker. |
66 | | - pub unsafe fn park_timeout(&self, dur: Duration) { |
67 | | - // Like `park` above we have a fast path for an already-notified thread, and |
68 | | - // afterwards we start coordinating for a sleep. |
69 | | - // return quickly. |
70 | | - if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { |
71 | | - return; |
72 | | - } |
73 | | - let m = self.lock.lock().unwrap(); |
74 | | - match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { |
75 | | - Ok(_) => {} |
76 | | - Err(NOTIFIED) => { |
77 | | - // We must read again here, see `park`. |
78 | | - let old = self.state.swap(EMPTY, SeqCst); |
79 | | - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); |
80 | | - return; |
81 | | - } // should consume this notification, so prohibit spurious wakeups in next park. |
82 | | - Err(_) => panic!("inconsistent park_timeout state"), |
83 | | - } |
84 | | - |
85 | | - // Wait with a timeout, and if we spuriously wake up or otherwise wake up |
86 | | - // from a notification we just want to unconditionally set the state back to |
87 | | - // empty, either consuming a notification or un-flagging ourselves as |
88 | | - // parked. |
89 | | - let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap(); |
90 | | - match self.state.swap(EMPTY, SeqCst) { |
91 | | - NOTIFIED => {} // got a notification, hurray! |
92 | | - PARKED => {} // no notification, alas |
93 | | - n => panic!("inconsistent park_timeout state: {}", n), |
94 | | - } |
95 | | - } |
96 | | - |
97 | | - pub fn unpark(&self) { |
98 | | - // To ensure the unparked thread will observe any writes we made |
99 | | - // before this call, we must perform a release operation that `park` |
100 | | - // can synchronize with. To do that we must write `NOTIFIED` even if |
101 | | - // `state` is already `NOTIFIED`. That is why this must be a swap |
102 | | - // rather than a compare-and-swap that returns if it reads `NOTIFIED` |
103 | | - // on failure. |
104 | | - match self.state.swap(NOTIFIED, SeqCst) { |
105 | | - EMPTY => return, // no one was waiting |
106 | | - NOTIFIED => return, // already unparked |
107 | | - PARKED => {} // gotta go wake someone up |
108 | | - _ => panic!("inconsistent state in unpark"), |
109 | | - } |
110 | | - |
111 | | - // There is a period between when the parked thread sets `state` to |
112 | | - // `PARKED` (or last checked `state` in the case of a spurious wake |
113 | | - // up) and when it actually waits on `cvar`. If we were to notify |
114 | | - // during this period it would be ignored and then when the parked |
115 | | - // thread went to sleep it would never wake up. Fortunately, it has |
116 | | - // `lock` locked at this stage so we can acquire `lock` to wait until |
117 | | - // it is ready to receive the notification. |
118 | | - // |
119 | | - // Releasing `lock` before the call to `notify_one` means that when the |
120 | | - // parked thread wakes it doesn't get woken only to have to wait for us |
121 | | - // to release `lock`. |
122 | | - drop(self.lock.lock().unwrap()); |
123 | | - self.cvar.notify_one() |
| 1 | +cfg_if::cfg_if! { |
| 2 | + if #[cfg(any(target_os = "linux", target_os = "android"))] { |
| 3 | + mod linux; |
| 4 | + pub use linux::Parker; |
| 5 | + } else { |
| 6 | + mod generic; |
| 7 | + pub use generic::Parker; |
124 | 8 | } |
125 | 9 | } |
0 commit comments