@@ -208,7 +208,14 @@ impl<T, S: Storage> QueueInner<T, S> {
208208
209209 /// Returns the item in the front of the queue, or `None` if the queue is empty.
210210 pub fn dequeue ( & self ) -> Option < T > {
211- unsafe { dequeue ( S :: as_ptr ( self . buffer . get ( ) ) , & self . dequeue_pos , self . mask ( ) ) }
211+ unsafe {
212+ dequeue (
213+ S :: as_ptr ( self . buffer . get ( ) ) ,
214+ & self . dequeue_pos ,
215+ & self . enqueue_pos ,
216+ self . mask ( ) ,
217+ )
218+ }
212219 }
213220
214221 /// Adds an `item` to the end of the queue.
@@ -218,6 +225,7 @@ impl<T, S: Storage> QueueInner<T, S> {
218225 unsafe {
219226 enqueue (
220227 S :: as_ptr ( self . buffer . get ( ) ) ,
228+ & self . dequeue_pos ,
221229 & self . enqueue_pos ,
222230 self . mask ( ) ,
223231 item,
@@ -255,18 +263,25 @@ impl<T> Cell<T> {
255263 }
256264}
257265
266+ const CONTENTION_RETRY_COUNT : usize = 10000 ;
267+
258268unsafe fn dequeue < T > (
259269 buffer : * mut Cell < T > ,
260270 dequeue_pos : & AtomicTargetSize ,
271+ enqueue_pos : & AtomicTargetSize ,
261272 mask : UintSize ,
262273) -> Option < T > {
263274 let mut pos = dequeue_pos. load ( Ordering :: Relaxed ) ;
264275
265276 let mut cell;
277+ let mut seq;
278+ let mut dif;
279+ let mut contention_retry_count = 0 ;
280+
266281 loop {
267282 cell = buffer. add ( usize:: from ( pos & mask) ) ;
268- let seq = ( * cell) . sequence . load ( Ordering :: Acquire ) ;
269- let dif = ( seq as IntSize ) . wrapping_sub ( ( pos. wrapping_add ( 1 ) ) as IntSize ) ;
283+ seq = ( * cell) . sequence . load ( Ordering :: Acquire ) ;
284+ dif = ( seq as IntSize ) . wrapping_sub ( ( pos. wrapping_add ( 1 ) ) as IntSize ) ;
270285
271286 match dif. cmp ( & 0 ) {
272287 core:: cmp:: Ordering :: Equal => {
@@ -283,6 +298,16 @@ unsafe fn dequeue<T>(
283298 }
284299 }
285300 core:: cmp:: Ordering :: Less => {
301+ if pos != enqueue_pos. load ( Ordering :: Relaxed )
302+ && contention_retry_count < CONTENTION_RETRY_COUNT
303+ {
304+ // In this case according to the positions the queue is not empty
305+ // This suggests that there is a enqueue operations that is in progress in some other task
306+ // Therefore we can wait a bit hoping that the other task can finish its `enqueue` operation complete
307+ core:: hint:: spin_loop ( ) ;
308+ contention_retry_count += 1 ;
309+ continue ;
310+ }
286311 return None ;
287312 }
288313 core:: cmp:: Ordering :: Greater => {
@@ -300,13 +325,15 @@ unsafe fn dequeue<T>(
300325
301326unsafe fn enqueue < T > (
302327 buffer : * mut Cell < T > ,
328+ dequeue_pos : & AtomicTargetSize ,
303329 enqueue_pos : & AtomicTargetSize ,
304330 mask : UintSize ,
305331 item : T ,
306332) -> Result < ( ) , T > {
307333 let mut pos = enqueue_pos. load ( Ordering :: Relaxed ) ;
308334
309335 let mut cell;
336+ let mut contention_retry_count = 0 ;
310337 loop {
311338 cell = buffer. add ( usize:: from ( pos & mask) ) ;
312339 let seq = ( * cell) . sequence . load ( Ordering :: Acquire ) ;
@@ -325,8 +352,19 @@ unsafe fn enqueue<T>(
325352 {
326353 break ;
327354 }
355+ pos = enqueue_pos. load ( Ordering :: Relaxed ) ;
328356 }
329357 core:: cmp:: Ordering :: Less => {
358+ if dequeue_pos. load ( Ordering :: Relaxed ) . wrapping_add ( mask + 1 ) != pos
359+ && contention_retry_count < CONTENTION_RETRY_COUNT
360+ {
361+ // In this case according to the positions the queue is not full
362+ // This suggests that there is a dequeue operation that is in progress in some other task
363+ // Therefore we can wait a bit hoping that the other task can finish its `dequeue` operation completes
364+ core:: hint:: spin_loop ( ) ;
365+ contention_retry_count += 1 ;
366+ continue ;
367+ }
330368 return Err ( item) ;
331369 }
332370 core:: cmp:: Ordering :: Greater => {
0 commit comments