1414//! communication with the jobserver (otherwise it's too hard to work with the
1515//! jobserver API). This is non-ideal, and it would be good to avoid, but
1616//! currently that cost is pretty much required for correct functionality, as we
17- //! must be able to atomically wait on both a Condvar (for other threads
18- //! releasing the implicit token) and the jobserver itself. That's not possible
19- //! with the jobserver API today unless we spawn up an additional thread.
17+ //! must be able to atomically wait on both other threads releasing the implicit
18+ //! token and the jobserver itself. That's not possible with the jobserver API
19+ //! today unless we spawn up an additional thread.
2020//!
2121//! There are 3 primary APIs this crate exposes:
22- //! * acquire()
23- //! * release()
24- //! * acquire_from_request()
25- //! * request_token()
22+ //! * `acquire_thread()`
23+ //! * `release_thread()`
24+ //! * `request_token(impl FnOnce(Acquired))`
2625//!
27- //! The first two, acquire and release, are blocking functions which acquire
28- //! and release a jobserver token .
26+ //! `acquire_thread` blocks on obtaining a token, `release_thread` releases a
27+ //! token without blocking .
2928//!
30- //! The latter two help manage async requesting of tokens: specifically,
31- //! acquire_from_request() will block on acquiring token but will not request it
32- //! from the jobserver itself, whereas the last one just requests a token (and
33- //! should return pretty quickly, i.e., it does not block on some event).
29+ //! `request_token` queues up the called function without blocking.
3430//!
3531//! -------------------------------------
3632//!
@@ -62,8 +58,9 @@ use jobserver::Client;
6258use lazy_static:: lazy_static;
6359use rustc_serialize:: json:: as_json;
6460use std:: collections:: VecDeque ;
61+ use std:: mem;
6562use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
66- use std:: sync:: { Arc , Condvar , Mutex } ;
63+ use std:: sync:: { Arc , Condvar , Mutex , MutexGuard } ;
6764
6865lazy_static ! {
6966 // We can only call `from_env` once per process
@@ -91,7 +88,7 @@ lazy_static! {
9188 } )
9289 } ;
9390
94- static ref HELPER : Mutex < Helper > = Mutex :: new ( Helper :: new( ) ) ;
91+ static ref HELPER : Helper = Helper :: new( ) ;
9592}
9693
9794// These are the values for TOKEN_REQUESTS, which is an enum between these
@@ -139,22 +136,38 @@ pub fn initialize(token_requests: bool) {
139136 }
140137}
141138
142- pub struct Helper {
139+ struct Helper {
143140 helper : jobserver:: HelperThread ,
141+
142+ token_requests : Arc < Mutex < TokenRequests > > ,
143+ }
144+
145+ struct TokenRequests {
144146 tokens : usize ,
145- requests : Arc < Mutex < VecDeque < Box < dyn FnOnce ( Acquired ) + Send > > > > ,
147+ requests : VecDeque < Box < dyn FnOnce ( Acquired ) + Send > > ,
148+ }
149+
150+ impl TokenRequests {
151+ fn new ( ) -> Self {
152+ Self { tokens : 1 , requests : VecDeque :: new ( ) }
153+ }
154+
155+ fn push_request ( & mut self , request : impl FnOnce ( Acquired ) + Send + ' static ) {
156+ self . requests . push_back ( Box :: new ( request) ) ;
157+ }
146158}
147159
148160impl Helper {
149161 fn new ( ) -> Self {
150- let requests: Arc < Mutex < VecDeque < Box < dyn FnOnce ( Acquired ) + Send > > > > =
151- Arc :: new ( Mutex :: new ( VecDeque :: new ( ) ) ) ;
152- let requests2 = requests. clone ( ) ;
162+ let requests = Arc :: new ( Mutex :: new ( TokenRequests :: new ( ) ) ) ;
163+ let helper_thread_requests = requests. clone ( ) ;
153164 let helper = GLOBAL_CLIENT
154165 . clone ( )
155166 . into_helper_thread ( move |token| {
156167 log:: trace!( "Helper thread token sending into channel" ) ;
157- if let Some ( sender) = requests2. lock ( ) . unwrap ( ) . pop_front ( ) {
168+ let mut helper_thread_requests = helper_thread_requests. lock ( ) . unwrap ( ) ;
169+ let sender = helper_thread_requests. requests . pop_front ( ) ;
170+ if let Some ( sender) = sender {
158171 // We've acquired a token, but we need to not use it as we have our own
159172 // custom release-on-drop struct since we'll want different logic than
160173 // just normally releasing the token in this case.
@@ -163,49 +176,47 @@ impl Helper {
163176 // was in the pipe (i.e., we just write back the same byte all the time)
164177 // but that's not expected to be a problem.
165178 token. expect ( "acquire token" ) . drop_without_releasing ( ) ;
166- sender ( Acquired :: new ( ) ) ;
179+ sender ( Acquired :: new ( helper_thread_requests ) ) ;
167180 }
168181
169182 // If we didn't manage to send the token off, just drop it on
170183 // the ground; it'll get released automatically.
171184 } )
172185 . expect ( "spawned helper" ) ;
173- Helper { helper, tokens : 1 , requests }
186+ Helper { helper, token_requests : requests }
174187 }
175188
176- // This blocks on acquiring a token ( that must have been previously
177- // requested) .
178- fn acquire_token_from_prior_request ( & mut self ) -> Acquired {
179- if self . tokens == 0 {
180- self . tokens += 1 ;
181- return Acquired :: new ( ) ;
189+ // This blocks on acquiring a token that was requested from the
190+ // HelperThread, i.e., through `Helper::request_token` .
191+ fn acquire_token_from_prior_request ( & self ) -> Acquired {
192+ let mut token_requests = self . token_requests . lock ( ) . unwrap ( ) ;
193+ if token_requests . tokens == 0 {
194+ return Acquired :: new ( token_requests ) ;
182195 }
183196
184197 let receiver = Arc :: new ( ( Mutex :: new ( None ) , Condvar :: new ( ) ) ) ;
185198 let receiver2 = receiver. clone ( ) ;
186-
187- self . requests . lock ( ) . unwrap ( ) . push_back ( Box :: new ( move |token| {
199+ token_requests. push_request ( move |token| {
188200 let mut slot = receiver. 0 . lock ( ) . unwrap ( ) ;
189201 * slot = Some ( token) ;
190202 receiver. 1 . notify_one ( ) ;
191- } ) ) ;
203+ } ) ;
192204
193- let ( lock , cvar ) = & * receiver2 ;
194- let mut guard = cvar . wait_while ( lock . lock ( ) . unwrap ( ) , |slot| slot . is_none ( ) ) . unwrap ( ) ;
205+ // Release tokens guard after registering our callback
206+ mem :: drop ( token_requests ) ;
195207
196- self . tokens += 1 ;
208+ let ( lock, cvar) = & * receiver2;
209+ let mut guard = cvar. wait_while ( lock. lock ( ) . unwrap ( ) , |s| s. is_none ( ) ) . unwrap ( ) ;
197210 guard. take ( ) . unwrap ( )
198211 }
199212
200- fn release_token ( & mut self ) {
201- let mut requests = self . requests . lock ( ) . unwrap ( ) ;
202-
203- self . tokens -= 1 ;
204-
205- if self . tokens == 0 {
213+ fn release_token ( & self ) {
214+ let mut token_requests = self . token_requests . lock ( ) . unwrap ( ) ;
215+ token_requests. tokens -= 1 ;
216+ if token_requests. tokens == 0 {
206217 // If there is a sender, then it needs to be given this token.
207- if let Some ( sender) = requests. pop_front ( ) {
208- sender ( Acquired :: new ( ) ) ;
218+ if let Some ( sender) = token_requests . requests . pop_front ( ) {
219+ sender ( Acquired :: new ( token_requests ) ) ;
209220 return ;
210221 }
211222
@@ -219,7 +230,7 @@ impl Helper {
219230 }
220231 }
221232
222- pub fn request_token ( & self ) {
233+ fn request_token ( & self ) {
223234 log:: trace!( "{:?} requesting token" , std:: thread:: current( ) . id( ) ) ;
224235 // Just notify, don't actually acquire here.
225236 notify_acquiring_token ( ) ;
@@ -241,7 +252,9 @@ impl Drop for Acquired {
241252}
242253
243254impl Acquired {
244- fn new ( ) -> Self {
255+ fn new ( mut requests : MutexGuard < ' _ , TokenRequests > ) -> Self {
256+ // When we create a token, bump up the acquired token counter
257+ requests. tokens += 1 ;
245258 Self { armed : true }
246259 }
247260
@@ -267,21 +280,19 @@ fn notify_acquiring_token() {
267280 }
268281}
269282
283+ /// This does not block the current thread, but schedules the passed callback to
284+ /// be called at some point in the future when a token is acquired.
270285pub fn request_token ( f : impl FnOnce ( Acquired ) + Send + ' static ) {
271- HELPER . lock ( ) . unwrap ( ) . requests . lock ( ) . unwrap ( ) . push_back ( Box :: new ( move |token| {
272- f ( token) ;
273- } ) ) ;
274- }
275-
276- pub fn acquire_from_request ( ) -> Acquired {
277- HELPER . lock ( ) . unwrap ( ) . acquire_token_from_prior_request ( )
286+ HELPER . token_requests . lock ( ) . unwrap ( ) . push_request ( f) ;
287+ HELPER . request_token ( ) ;
278288}
279289
290+ /// This blocks the current thread until a token is acquired.
280291pub fn acquire_thread ( ) {
281- HELPER . lock ( ) . unwrap ( ) . request_token ( ) ;
282- HELPER . lock ( ) . unwrap ( ) . acquire_token_from_prior_request ( ) . disarm ( ) ;
292+ HELPER . request_token ( ) ;
293+ HELPER . acquire_token_from_prior_request ( ) . disarm ( ) ;
283294}
284295
285296pub fn release_thread ( ) {
286- HELPER . lock ( ) . unwrap ( ) . release_token ( ) ;
297+ HELPER . release_token ( ) ;
287298}
0 commit comments