@@ -8,7 +8,7 @@ use crate::sync::once::ExclusiveState;
88use crate :: sys:: futex:: { futex_wait, futex_wake_all} ;
99
1010// On some platforms, the OS is very nice and handles the waiter queue for us.
11- // This means we only need one atomic value with 5 states:
11+ // This means we only need one atomic value with 4 states:
1212
1313/// No initialization has run yet, and no thread is currently using the Once.
1414const INCOMPLETE : u32 = 0 ;
@@ -19,16 +19,20 @@ const POISONED: u32 = 1;
1919/// Some thread is currently attempting to run initialization. It may succeed,
2020/// so all future threads need to wait for it to finish.
2121const RUNNING : u32 = 2 ;
22- /// Some thread is currently attempting to run initialization and there are threads
23- /// waiting for it to finish.
24- const QUEUED : u32 = 3 ;
2522/// Initialization has completed and all future calls should finish immediately.
26- const COMPLETE : u32 = 4 ;
23+ const COMPLETE : u32 = 3 ;
2724
28- // Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
25+ // An additional bit indicates whether there are waiting threads:
26+
27+ /// May only be set if the state is not COMPLETE.
28+ const QUEUED : u32 = 4 ;
29+
30+ // Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
2931// variable. When the running thread finishes, it will wake all waiting threads using
3032// `futex_wake_all`.
3133
34+ const STATE_MASK : u32 = 0b11 ;
35+
3236pub struct OnceState {
3337 poisoned : bool ,
3438 set_state_to : Cell < u32 > ,
@@ -47,7 +51,7 @@ impl OnceState {
4751}
4852
4953struct CompletionGuard < ' a > {
50- state : & ' a AtomicU32 ,
54+ state_and_queued : & ' a AtomicU32 ,
5155 set_state_on_drop_to : u32 ,
5256}
5357
@@ -56,64 +60,106 @@ impl<'a> Drop for CompletionGuard<'a> {
5660 // Use release ordering to propagate changes to all threads checking
5761 // up on the Once. `futex_wake_all` does its own synchronization, hence
5862 // we do not need `AcqRel`.
59- if self . state . swap ( self . set_state_on_drop_to , Release ) == QUEUED {
60- futex_wake_all ( & self . state ) ;
63+ if self . state_and_queued . swap ( self . set_state_on_drop_to , Release ) & QUEUED != 0 {
64+ futex_wake_all ( & self . state_and_queued ) ;
6165 }
6266 }
6367}
6468
6569pub struct Once {
66- state : AtomicU32 ,
70+ state_and_queued : AtomicU32 ,
6771}
6872
6973impl Once {
7074 #[ inline]
7175 pub const fn new ( ) -> Once {
72- Once { state : AtomicU32 :: new ( INCOMPLETE ) }
76+ Once { state_and_queued : AtomicU32 :: new ( INCOMPLETE ) }
7377 }
7478
7579 #[ inline]
7680 pub fn is_completed ( & self ) -> bool {
7781 // Use acquire ordering to make all initialization changes visible to the
7882 // current thread.
79- self . state . load ( Acquire ) == COMPLETE
83+ self . state_and_queued . load ( Acquire ) == COMPLETE
8084 }
8185
8286 #[ inline]
8387 pub ( crate ) fn state ( & mut self ) -> ExclusiveState {
84- match * self . state . get_mut ( ) {
88+ match * self . state_and_queued . get_mut ( ) {
8589 INCOMPLETE => ExclusiveState :: Incomplete ,
8690 POISONED => ExclusiveState :: Poisoned ,
8791 COMPLETE => ExclusiveState :: Complete ,
8892 _ => unreachable ! ( "invalid Once state" ) ,
8993 }
9094 }
9195
92- // This uses FnMut to match the API of the generic implementation. As this
93- // implementation is quite light-weight, it is generic over the closure and
94- // so avoids the cost of dynamic dispatch.
9596 #[ cold]
9697 #[ track_caller]
97- pub fn call ( & self , ignore_poisoning : bool , f : & mut impl FnMut ( & public :: OnceState ) ) {
98- let mut state = self . state . load ( Acquire ) ;
98+ pub fn wait ( & self , ignore_poisoning : bool ) {
99+ let mut state_and_queued = self . state_and_queued . load ( Acquire ) ;
99100 loop {
101+ let state = state_and_queued & STATE_MASK ;
102+ let queued = state_and_queued & QUEUED != 0 ;
100103 match state {
104+ COMPLETE => return ,
105+ POISONED if !ignore_poisoning => {
106+ // Panic to propagate the poison.
107+ panic ! ( "Once instance has previously been poisoned" ) ;
108+ }
109+ _ => {
110+ // Set the QUEUED bit if it has not already been set.
111+ if !queued {
112+ state_and_queued += QUEUED ;
113+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
114+ state,
115+ state_and_queued,
116+ Relaxed ,
117+ Acquire ,
118+ ) {
119+ state_and_queued = new;
120+ continue ;
121+ }
122+ }
123+
124+ futex_wait ( & self . state_and_queued , state_and_queued, None ) ;
125+ state_and_queued = self . state_and_queued . load ( Acquire ) ;
126+ }
127+ }
128+ }
129+ }
130+
131+ #[ cold]
132+ #[ track_caller]
133+ pub fn call ( & self , ignore_poisoning : bool , f : & mut dyn FnMut ( & public:: OnceState ) ) {
134+ let mut state_and_queued = self . state_and_queued . load ( Acquire ) ;
135+ loop {
136+ let state = state_and_queued & STATE_MASK ;
137+ let queued = state_and_queued & QUEUED != 0 ;
138+ match state {
139+ COMPLETE => return ,
101140 POISONED if !ignore_poisoning => {
102141 // Panic to propagate the poison.
103142 panic ! ( "Once instance has previously been poisoned" ) ;
104143 }
105144 INCOMPLETE | POISONED => {
106145 // Try to register the current thread as the one running.
107- if let Err ( new) =
108- self . state . compare_exchange_weak ( state, RUNNING , Acquire , Acquire )
109- {
110- state = new;
146+ let next = RUNNING + if queued { QUEUED } else { 0 } ;
147+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
148+ state_and_queued,
149+ next,
150+ Acquire ,
151+ Acquire ,
152+ ) {
153+ state_and_queued = new;
111154 continue ;
112155 }
156+
113157 // `waiter_queue` will manage other waiting threads, and
114158 // wake them up on drop.
115- let mut waiter_queue =
116- CompletionGuard { state : & self . state , set_state_on_drop_to : POISONED } ;
159+ let mut waiter_queue = CompletionGuard {
160+ state_and_queued : & self . state_and_queued ,
161+ set_state_on_drop_to : POISONED ,
162+ } ;
117163 // Run the function, letting it know if we're poisoned or not.
118164 let f_state = public:: OnceState {
119165 inner : OnceState {
@@ -125,21 +171,27 @@ impl Once {
125171 waiter_queue. set_state_on_drop_to = f_state. inner . set_state_to . get ( ) ;
126172 return ;
127173 }
128- RUNNING | QUEUED => {
129- // Set the state to QUEUED if it is not already.
130- if state == RUNNING
131- && let Err ( new) =
132- self . state . compare_exchange_weak ( RUNNING , QUEUED , Relaxed , Acquire )
133- {
134- state = new;
135- continue ;
174+ _ => {
175+ // All other values must be RUNNING.
176+ assert ! ( state == RUNNING ) ;
177+
178+ // Set the QUEUED bit if it is not already set.
179+ if !queued {
180+ state_and_queued += QUEUED ;
181+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
182+ state,
183+ state_and_queued,
184+ Relaxed ,
185+ Acquire ,
186+ ) {
187+ state_and_queued = new;
188+ continue ;
189+ }
136190 }
137191
138- futex_wait ( & self . state , QUEUED , None ) ;
139- state = self . state . load ( Acquire ) ;
192+ futex_wait ( & self . state_and_queued , state_and_queued , None ) ;
193+ state_and_queued = self . state_and_queued . load ( Acquire ) ;
140194 }
141- COMPLETE => return ,
142- _ => unreachable ! ( "state is never set to invalid values" ) ,
143195 }
144196 }
145197 }
0 commit comments