1- use futures_core:: future:: { FusedFuture , Future } ;
2- use futures_core:: task:: { Context , Poll , Waker } ;
3- use slab:: Slab ;
41use std:: cell:: UnsafeCell ;
52use std:: marker:: PhantomData ;
63use std:: ops:: { Deref , DerefMut } ;
74use std:: pin:: Pin ;
85use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
9- use std:: sync:: Mutex as StdMutex ;
6+ use std:: sync:: { Arc , Mutex as StdMutex } ;
107use std:: { fmt, mem} ;
118
9+ use slab:: Slab ;
10+
11+ use futures_core:: future:: { FusedFuture , Future } ;
12+ use futures_core:: task:: { Context , Poll , Waker } ;
13+
1214/// A futures-aware mutex.
1315///
1416/// # Fairness
@@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> {
107109 }
108110 }
109111
112+ /// Attempt to acquire the lock immediately.
113+ ///
114+ /// If the lock is currently held, this will return `None`.
115+ pub fn try_lock_owned ( self : & Arc < Self > ) -> Option < OwnedMutexGuard < T > > {
116+ let old_state = self . state . fetch_or ( IS_LOCKED , Ordering :: Acquire ) ;
117+ if ( old_state & IS_LOCKED ) == 0 {
118+ Some ( OwnedMutexGuard { mutex : self . clone ( ) } )
119+ } else {
120+ None
121+ }
122+ }
123+
110124 /// Acquire the lock asynchronously.
111125 ///
112126 /// This method returns a future that will resolve once the lock has been
@@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> {
115129 MutexLockFuture { mutex : Some ( self ) , wait_key : WAIT_KEY_NONE }
116130 }
117131
132+ /// Acquire the lock asynchronously.
133+ ///
134+ /// This method returns a future that will resolve once the lock has been
135+ /// successfully acquired.
136+ pub fn lock_owned ( self : Arc < Self > ) -> OwnedMutexLockFuture < T > {
137+ OwnedMutexLockFuture { mutex : Some ( self ) , wait_key : WAIT_KEY_NONE }
138+ }
139+
118140 /// Returns a mutable reference to the underlying data.
119141 ///
120142 /// Since this call borrows the `Mutex` mutably, no actual locking needs to
@@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> {
173195}
174196
175197// Sentinel for when no slot in the `Slab` has been dedicated to this object.
176- const WAIT_KEY_NONE : usize = usize:: max_value ( ) ;
198+ const WAIT_KEY_NONE : usize = usize:: MAX ;
199+
200+ /// A future which resolves when the target mutex has been successfully acquired, owned version.
201+ pub struct OwnedMutexLockFuture < T : ?Sized > {
202+ // `None` indicates that the mutex was successfully acquired.
203+ mutex : Option < Arc < Mutex < T > > > ,
204+ wait_key : usize ,
205+ }
206+
207+ impl < T : ?Sized > fmt:: Debug for OwnedMutexLockFuture < T > {
208+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
209+ f. debug_struct ( "OwnedMutexLockFuture" )
210+ . field ( "was_acquired" , & self . mutex . is_none ( ) )
211+ . field ( "mutex" , & self . mutex )
212+ . field (
213+ "wait_key" ,
214+ & ( if self . wait_key == WAIT_KEY_NONE { None } else { Some ( self . wait_key ) } ) ,
215+ )
216+ . finish ( )
217+ }
218+ }
219+
220+ impl < T : ?Sized > FusedFuture for OwnedMutexLockFuture < T > {
221+ fn is_terminated ( & self ) -> bool {
222+ self . mutex . is_none ( )
223+ }
224+ }
225+
226+ impl < T : ?Sized > Future for OwnedMutexLockFuture < T > {
227+ type Output = OwnedMutexGuard < T > ;
228+
229+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
230+ let this = self . get_mut ( ) ;
231+
232+ let mutex = this. mutex . as_ref ( ) . expect ( "polled OwnedMutexLockFuture after completion" ) ;
233+
234+ if let Some ( lock) = mutex. try_lock_owned ( ) {
235+ mutex. remove_waker ( this. wait_key , false ) ;
236+ this. mutex = None ;
237+ return Poll :: Ready ( lock) ;
238+ }
239+
240+ {
241+ let mut waiters = mutex. waiters . lock ( ) . unwrap ( ) ;
242+ if this. wait_key == WAIT_KEY_NONE {
243+ this. wait_key = waiters. insert ( Waiter :: Waiting ( cx. waker ( ) . clone ( ) ) ) ;
244+ if waiters. len ( ) == 1 {
245+ mutex. state . fetch_or ( HAS_WAITERS , Ordering :: Relaxed ) ; // released by mutex unlock
246+ }
247+ } else {
248+ waiters[ this. wait_key ] . register ( cx. waker ( ) ) ;
249+ }
250+ }
251+
252+ // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
253+ // attempting to acquire the lock again.
254+ if let Some ( lock) = mutex. try_lock_owned ( ) {
255+ mutex. remove_waker ( this. wait_key , false ) ;
256+ this. mutex = None ;
257+ return Poll :: Ready ( lock) ;
258+ }
259+
260+ Poll :: Pending
261+ }
262+ }
263+
264+ impl < T : ?Sized > Drop for OwnedMutexLockFuture < T > {
265+ fn drop ( & mut self ) {
266+ if let Some ( mutex) = self . mutex . as_ref ( ) {
267+ // This future was dropped before it acquired the mutex.
268+ //
269+ // Remove ourselves from the map, waking up another waiter if we
270+ // had been awoken to acquire the lock.
271+ mutex. remove_waker ( self . wait_key , true ) ;
272+ }
273+ }
274+ }
275+
276+ /// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
277+ /// When this structure is dropped (falls out of scope), the lock will be
278+ /// unlocked.
279+ pub struct OwnedMutexGuard < T : ?Sized > {
280+ mutex : Arc < Mutex < T > > ,
281+ }
282+
283+ impl < T : ?Sized + fmt:: Debug > fmt:: Debug for OwnedMutexGuard < T > {
284+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
285+ f. debug_struct ( "OwnedMutexGuard" )
286+ . field ( "value" , & & * * self )
287+ . field ( "mutex" , & self . mutex )
288+ . finish ( )
289+ }
290+ }
291+
292+ impl < T : ?Sized > Drop for OwnedMutexGuard < T > {
293+ fn drop ( & mut self ) {
294+ self . mutex . unlock ( )
295+ }
296+ }
297+
298+ impl < T : ?Sized > Deref for OwnedMutexGuard < T > {
299+ type Target = T ;
300+ fn deref ( & self ) -> & T {
301+ unsafe { & * self . mutex . value . get ( ) }
302+ }
303+ }
304+
305+ impl < T : ?Sized > DerefMut for OwnedMutexGuard < T > {
306+ fn deref_mut ( & mut self ) -> & mut T {
307+ unsafe { & mut * self . mutex . value . get ( ) }
308+ }
309+ }
177310
178311/// A future which resolves when the target mutex has been successfully acquired.
179312pub struct MutexLockFuture < ' a , T : ?Sized > {
@@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
386519// It's safe to switch which thread the acquire is being attempted on so long as
387520// `T` can be accessed on that thread.
388521unsafe impl < T : ?Sized + Send > Send for MutexLockFuture < ' _ , T > { }
522+
389523// doesn't have any interesting `&self` methods (only Debug)
390524unsafe impl < T : ?Sized > Sync for MutexLockFuture < ' _ , T > { }
391525
526+ // It's safe to switch which thread the acquire is being attempted on so long as
527+ // `T` can be accessed on that thread.
528+ unsafe impl < T : ?Sized + Send > Send for OwnedMutexLockFuture < T > { }
529+
530+ // doesn't have any interesting `&self` methods (only Debug)
531+ unsafe impl < T : ?Sized > Sync for OwnedMutexLockFuture < T > { }
532+
392533// Safe to send since we don't track any thread-specific details-- the inner
393534// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
394535unsafe impl < T : ?Sized + Send > Send for MutexGuard < ' _ , T > { }
395536unsafe impl < T : ?Sized + Sync > Sync for MutexGuard < ' _ , T > { }
537+
538+ unsafe impl < T : ?Sized + Send > Send for OwnedMutexGuard < T > { }
539+ unsafe impl < T : ?Sized + Sync > Sync for OwnedMutexGuard < T > { }
540+
396541unsafe impl < T : ?Sized + Send , U : ?Sized + Send > Send for MappedMutexGuard < ' _ , T , U > { }
397542unsafe impl < T : ?Sized + Sync , U : ?Sized + Sync > Sync for MappedMutexGuard < ' _ , T , U > { }
398543
0 commit comments