44use crate :: latch:: CoreLatch ;
55use crate :: log:: Event :: * ;
66use crate :: log:: Logger ;
7+ use crate :: DeadlockHandler ;
78use crossbeam_utils:: CachePadded ;
89use std:: sync:: atomic:: Ordering ;
910use std:: sync:: { Condvar , Mutex } ;
@@ -14,6 +15,29 @@ mod counters;
1415pub ( crate ) use self :: counters:: THREADS_MAX ;
1516use self :: counters:: { AtomicCounters , JobsEventCounter } ;
1617
18+ struct SleepData {
19+ /// The number of threads in the thread pool.
20+ worker_count : usize ,
21+
22+ /// The number of threads in the thread pool which are running and
23+ /// aren't blocked in user code or sleeping.
24+ active_threads : usize ,
25+
26+ /// The number of threads which are blocked in user code.
27+ /// This doesn't include threads blocked by this module.
28+ blocked_threads : usize ,
29+ }
30+
31+ impl SleepData {
32+ /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33+ #[ inline]
34+ pub fn deadlock_check ( & self , deadlock_handler : & Option < Box < DeadlockHandler > > ) {
35+ if self . active_threads == 0 && self . blocked_threads > 0 {
36+ ( deadlock_handler. as_ref ( ) . unwrap ( ) ) ( ) ;
37+ }
38+ }
39+ }
40+
1741/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
1842/// of workers. It has callbacks that are invoked periodically at significant events,
1943/// such as when workers are looping and looking for work, when latches are set, or when
@@ -29,6 +53,8 @@ pub(super) struct Sleep {
2953 worker_sleep_states : Vec < CachePadded < WorkerSleepState > > ,
3054
3155 counters : AtomicCounters ,
56+
57+ data : Mutex < SleepData > ,
3258}
3359
3460/// An instance of this struct is created when a thread becomes idle.
@@ -68,9 +94,38 @@ impl Sleep {
6894 logger,
6995 worker_sleep_states : ( 0 ..n_threads) . map ( |_| Default :: default ( ) ) . collect ( ) ,
7096 counters : AtomicCounters :: new ( ) ,
97+ data : Mutex :: new ( SleepData {
98+ worker_count : n_threads,
99+ active_threads : n_threads,
100+ blocked_threads : 0 ,
101+ } ) ,
71102 }
72103 }
73104
105+ /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
106+ /// if no other worker thread is active
107+ #[ inline]
108+ pub fn mark_blocked ( & self , deadlock_handler : & Option < Box < DeadlockHandler > > ) {
109+ let mut data = self . data . lock ( ) . unwrap ( ) ;
110+ debug_assert ! ( data. active_threads > 0 ) ;
111+ debug_assert ! ( data. blocked_threads < data. worker_count) ;
112+ debug_assert ! ( data. active_threads > 0 ) ;
113+ data. active_threads -= 1 ;
114+ data. blocked_threads += 1 ;
115+
116+ data. deadlock_check ( deadlock_handler) ;
117+ }
118+
119+ /// Mark a previously blocked Rayon worker thread as unblocked
120+ #[ inline]
121+ pub fn mark_unblocked ( & self ) {
122+ let mut data = self . data . lock ( ) . unwrap ( ) ;
123+ debug_assert ! ( data. active_threads < data. worker_count) ;
124+ debug_assert ! ( data. blocked_threads > 0 ) ;
125+ data. active_threads += 1 ;
126+ data. blocked_threads -= 1 ;
127+ }
128+
74129 #[ inline]
75130 pub ( super ) fn start_looking ( & self , worker_index : usize , latch : & CoreLatch ) -> IdleState {
76131 self . logger . log ( || ThreadIdle {
@@ -106,6 +161,7 @@ impl Sleep {
106161 idle_state : & mut IdleState ,
107162 latch : & CoreLatch ,
108163 has_injected_jobs : impl FnOnce ( ) -> bool ,
164+ deadlock_handler : & Option < Box < DeadlockHandler > > ,
109165 ) {
110166 if idle_state. rounds < ROUNDS_UNTIL_SLEEPY {
111167 thread:: yield_now ( ) ;
@@ -119,7 +175,7 @@ impl Sleep {
119175 thread:: yield_now ( ) ;
120176 } else {
121177 debug_assert_eq ! ( idle_state. rounds, ROUNDS_UNTIL_SLEEPING ) ;
122- self . sleep ( idle_state, latch, has_injected_jobs) ;
178+ self . sleep ( idle_state, latch, has_injected_jobs, deadlock_handler ) ;
123179 }
124180 }
125181
@@ -142,6 +198,7 @@ impl Sleep {
142198 idle_state : & mut IdleState ,
143199 latch : & CoreLatch ,
144200 has_injected_jobs : impl FnOnce ( ) -> bool ,
201+ deadlock_handler : & Option < Box < DeadlockHandler > > ,
145202 ) {
146203 let worker_index = idle_state. worker_index ;
147204
@@ -215,6 +272,13 @@ impl Sleep {
215272 // the one that wakes us.)
216273 self . counters . sub_sleeping_thread ( ) ;
217274 } else {
275+ {
276+ // Decrement the number of active threads and check for a deadlock
277+ let mut data = self . data . lock ( ) . unwrap ( ) ;
278+ data. active_threads -= 1 ;
279+ data. deadlock_check ( deadlock_handler) ;
280+ }
281+
218282 // If we don't see an injected job (the normal case), then flag
219283 // ourselves as asleep and wait till we are notified.
220284 //
@@ -372,6 +436,9 @@ impl Sleep {
372436 // do.
373437 self . counters . sub_sleeping_thread ( ) ;
374438
439+ // Increment the number of active threads
440+ self . data . lock ( ) . unwrap ( ) . active_threads += 1 ;
441+
375442 self . logger . log ( || ThreadNotify { worker : index } ) ;
376443
377444 true
0 commit comments