@@ -181,6 +181,7 @@ struct Node {
181181 write : bool ,
182182 thread : OnceCell < Thread > ,
183183 completed : AtomicBool ,
184+ is_reading : AtomicBool ,
184185}
185186
186187impl Node {
@@ -193,6 +194,7 @@ impl Node {
193194 write,
194195 thread : OnceCell :: new ( ) ,
195196 completed : AtomicBool :: new ( false ) ,
197+ is_reading : AtomicBool :: new ( false ) ,
196198 }
197199 }
198200
@@ -317,24 +319,20 @@ impl RwLock {
317319 let mut count = 0 ;
318320 let mut has_slept = false ;
319321 loop {
320- if let Some ( next) = update ( state) {
322+ if node. is_reading . load ( Acquire ) {
323+ // This node was awaken by a call to `downgrade`, which means we are already in read
324+ // mode and we can return.
325+ debug_assert ! (
326+ has_slept,
327+ "Somehow `is_reading` is `true` before the node has gone to sleep"
328+ ) ;
329+ return ;
330+ } else if let Some ( next) = update ( state) {
321331 // The lock is available, try locking it.
322332 match self . state . compare_exchange_weak ( state, next, Acquire , Relaxed ) {
323333 Ok ( _) => return ,
324334 Err ( new) => state = new,
325335 }
326- } else if !write && has_slept {
327- // If we are trying to read and we have already gone to sleep, first check if the
328- // lock is in read mode before going to sleep again.
329- let tail = unsafe { add_backlinks_and_find_tail ( to_node ( state) ) . as_ref ( ) } ;
330- let _ = tail. next . 0 . fetch_update ( Release , Acquire , |count : * mut Node | {
331- if count. mask ( MASK ) . addr ( ) > 0 {
332- Some ( without_provenance_mut ( state. addr ( ) . checked_add ( SINGLE ) ? | LOCKED ) )
333- } else {
334- None
335- }
336- } ) ;
337- todo ! ( "This is very wrong" ) ;
338336 } else if state. addr ( ) & QUEUED == 0 && count < SPIN_COUNT {
339337 // If the lock is not available and no threads are queued, spin
340338 // for a while, using exponential backoff to decrease cache
@@ -468,56 +466,82 @@ impl RwLock {
468466 pub unsafe fn downgrade ( & self ) {
469467 // Atomically attempt to go from a single writer without any waiting threads to a single
470468 // reader without any waiting threads.
471- if let Err ( mut state ) = self . state . compare_exchange (
469+ match self . state . compare_exchange (
472470 without_provenance_mut ( LOCKED ) ,
473471 without_provenance_mut ( LOCKED | SINGLE ) ,
474472 Release ,
475473 Relaxed ,
476474 ) {
477- debug_assert ! (
475+ Ok ( _) => return ,
476+ Err ( state) => debug_assert ! (
478477 state. mask( LOCKED ) . addr( ) != 0 && state. mask( QUEUED ) . addr( ) != 0 ,
479478 "RwLock should be LOCKED and QUEUED"
480- ) ;
481- // 1. Attempt to grab the queue lock
482- // 2. Find the tail of the queue
483- // 3. While the current tail is not a writer:
484- // 4. The current tail must be a reader
485- // 5. Remove the current tail from the queue and update the tail to be the previous
486- // node if possible
487- // 6. Set the flag (somewhere on the node) to notify the reader thread that it has
488- // been woken up by a `downgrade` call
489- // 7. `complete` the node
490- // 8. Go back to step 3 with the updated tail if it exists, otherwise break
491- // 9. Once we find a writer or there are no more `prev` links, we write the correct
492- // number of readers into the current node state or the head state.
493-
494- // Attempt to grab the queue lock.
495- state = loop {
496- match self . state . fetch_update ( Release , Acquire , |ptr : State | {
497- // Go from not queue locked to being queue locked.
498- if ptr. mask ( QUEUE_LOCKED ) . addr ( ) != 0 {
499- Some ( without_provenance_mut ( ptr. addr ( ) | QUEUE_LOCKED ) )
500- } else {
501- None
502- }
503- } ) {
504- Ok ( state) => break state,
505- Err ( _) => { }
479+ ) ,
480+ }
481+
482+ // Attempt to grab the queue lock.
483+ let state = loop {
484+ match self . state . fetch_update ( Release , Acquire , |ptr : State | {
485+ // Go from not queue locked to being queue locked.
486+ if ptr. mask ( QUEUE_LOCKED ) . addr ( ) == 0 {
487+ Some ( without_provenance_mut ( ptr. addr ( ) | QUEUE_LOCKED ) )
488+ } else {
489+ None
506490 }
507- } ;
491+ } ) {
492+ Ok ( state) => break state,
493+ Err ( _) => { }
494+ }
495+ } ;
496+
497+ // SAFETY: By Invariant 2 we know that this tail is valid.
498+ let mut tail_ptr = unsafe { add_backlinks_and_find_tail ( to_node ( state) ) } ;
499+ let mut prev_ptr;
500+
501+ // We start with 1 reader, which is the current thread.
502+ let mut readers = 1 ;
508503
509- // SAFETY: FIXME
510- let _head = unsafe { to_node ( state) . as_ref ( ) } ;
504+ // Wake up all readers at the top of the queue.
505+ loop {
506+ // SAFETY: We have the queue lock so nobody else can be modifying the queue or the tail.
507+ let tail = unsafe { tail_ptr. as_mut ( ) } ;
508+
509+ // If the current `tail` is a writer thread, then we have no readers left to wake.
510+ // If there is no valid `prev` backlink, then we don't want to remove this node either
511+ // because we will have no `tail` that we can replace this node with.
512+ if tail. write || tail. prev . get ( ) . is_none ( ) {
513+ // Store the reader count in the `next` field.
514+ tail. next . 0 = AtomicPtr :: new ( without_provenance_mut ( readers) ) ;
515+ break ;
516+ }
517+
518+ prev_ptr = tail. prev . get ( ) . expect ( "Cannot be a `None` variant by the above" ) ;
519+
520+ // SAFETY: By Invariant 2, we know that all `prev` backlinks are valid, and since we
521+ // have the queue lock, nobody can be modifying the queue or this node.
522+ let prev = unsafe { prev_ptr. as_mut ( ) } ;
511523
512- // FIXME Is this correct?
513- // SAFETY: Since we have the write lock, nobody else can be modifying state, and since
514- // we got `state` from the `compare_exchange`, we know it is a valid head of the queue.
515- let tail = unsafe { add_backlinks_and_find_tail ( to_node ( state ) ) . as_ref ( ) } ;
524+ // Notify the reader thread that they are now in read mode.
525+ tail . is_reading . store ( true , Release ) ;
526+ readers += 1 ;
527+ prev . next . 0 = AtomicPtr :: new ( without_provenance_mut ( readers ) ) ;
516528
517- while !tail. write {
518- todo ! ( )
529+ // SAFETY: Because we observed that there were waiting threads from the first CAS, we
530+ // know that the state must have been a valid head to the wait queue as nobody can be
531+ // modifying the node itself.
532+ let head = unsafe { to_node ( self . state . load ( Acquire ) ) . as_ref ( ) } ;
533+
534+ // Split off the last node and update the `tail` field of `state`.
535+ head. tail . set ( Some ( prev_ptr) ) ;
536+ tail_ptr = prev_ptr;
537+
538+ unsafe {
539+ Node :: complete ( tail_ptr) ;
519540 }
520541 }
542+
543+ // Once we are done updating the queue, release the queue lock.
544+ self . state . fetch_byte_sub ( QUEUE_LOCKED , Release ) ;
521545 }
522546
523547 /// # Safety
0 commit comments