3434) ]
3535
3636use std:: fmt;
37+ use std:: future:: Future ;
3738use std:: marker:: PhantomData ;
3839use std:: panic:: { RefUnwindSafe , UnwindSafe } ;
3940use std:: rc:: Rc ;
4041use std:: sync:: atomic:: { AtomicBool , Ordering } ;
41- use std:: sync:: { Arc , Mutex , TryLockError } ;
42+ use std:: sync:: { Arc , Mutex , RwLock , TryLockError } ;
4243use std:: task:: { Poll , Waker } ;
4344
4445use async_lock:: OnceCell ;
4546use async_task:: { Builder , Runnable } ;
46- use atomic_waker:: AtomicWaker ;
4747use concurrent_queue:: ConcurrentQueue ;
4848use futures_lite:: { future, prelude:: * } ;
4949use slab:: Slab ;
50- use thread_local:: ThreadLocal ;
5150
5251#[ doc( no_inline) ]
5352pub use async_task:: Task ;
@@ -267,23 +266,8 @@ impl<'a> Executor<'a> {
267266 fn schedule ( & self ) -> impl Fn ( Runnable ) + Send + Sync + ' static {
268267 let state = self . state ( ) . clone ( ) ;
269268
270- move |mut runnable| {
271- // If possible, push into the current local queue and notify the ticker.
272- if let Some ( local) = state. local_queue . get ( ) {
273- runnable = if let Err ( err) = local. queue . push ( runnable) {
274- err. into_inner ( )
275- } else {
276- // Wake up this thread if it's asleep, otherwise notify another
277- // thread to try to have the task stolen.
278- if let Some ( waker) = local. waker . take ( ) {
279- waker. wake ( ) ;
280- } else {
281- state. notify ( ) ;
282- }
283- return ;
284- }
285- }
286- // If the local queue is full, fallback to pushing onto the global injector queue.
269+ // TODO: If possible, push into the current local queue and notify the ticker.
270+ move |runnable| {
287271 state. queue . push ( runnable) . unwrap ( ) ;
288272 state. notify ( ) ;
289273 }
@@ -526,16 +510,7 @@ struct State {
526510 queue : ConcurrentQueue < Runnable > ,
527511
528512 /// Local queues created by runners.
529- ///
530- /// If possible, tasks are scheduled onto the local queue, and will only defer
531- /// to other global queue when they're full, or the task is being scheduled from
532- /// a thread without a runner.
533- ///
534- /// Note: if a runner terminates and drains its local queue, any subsequent
535- /// spawn calls from the same thread will be added to the same queue, but won't
536- /// be executed until `Executor::run` is run on the thread again, or another
537- /// thread steals the task.
538- local_queue : ThreadLocal < LocalQueue > ,
513+ local_queues : RwLock < Vec < Arc < ConcurrentQueue < Runnable > > > > ,
539514
540515 /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
541516 notified : AtomicBool ,
@@ -552,7 +527,7 @@ impl State {
552527 fn new ( ) -> State {
553528 State {
554529 queue : ConcurrentQueue :: unbounded ( ) ,
555- local_queue : ThreadLocal :: new ( ) ,
530+ local_queues : RwLock :: new ( Vec :: new ( ) ) ,
556531 notified : AtomicBool :: new ( true ) ,
557532 sleepers : Mutex :: new ( Sleepers {
558533 count : 0 ,
@@ -679,12 +654,6 @@ impl Ticker<'_> {
679654 ///
680655 /// Returns `false` if the ticker was already sleeping and unnotified.
681656 fn sleep ( & mut self , waker : & Waker ) -> bool {
682- self . state
683- . local_queue
684- . get_or_default ( )
685- . waker
686- . register ( waker) ;
687-
688657 let mut sleepers = self . state . sleepers . lock ( ) . unwrap ( ) ;
689658
690659 match self . sleeping {
@@ -723,14 +692,7 @@ impl Ticker<'_> {
723692
724693 /// Waits for the next runnable task to run.
725694 async fn runnable ( & mut self ) -> Runnable {
726- self . runnable_with ( || {
727- self . state
728- . local_queue
729- . get ( )
730- . and_then ( |local| local. queue . pop ( ) . ok ( ) )
731- . or_else ( || self . state . queue . pop ( ) . ok ( ) )
732- } )
733- . await
695+ self . runnable_with ( || self . state . queue . pop ( ) . ok ( ) ) . await
734696 }
735697
736698 /// Waits for the next runnable task to run, given a function that searches for a task.
@@ -792,6 +754,9 @@ struct Runner<'a> {
792754 /// Inner ticker.
793755 ticker : Ticker < ' a > ,
794756
757+ /// The local queue.
758+ local : Arc < ConcurrentQueue < Runnable > > ,
759+
795760 /// Bumped every time a runnable task is found.
796761 ticks : usize ,
797762}
@@ -802,34 +767,38 @@ impl Runner<'_> {
802767 let runner = Runner {
803768 state,
804769 ticker : Ticker :: new ( state) ,
770+ local : Arc :: new ( ConcurrentQueue :: bounded ( 512 ) ) ,
805771 ticks : 0 ,
806772 } ;
773+ state
774+ . local_queues
775+ . write ( )
776+ . unwrap ( )
777+ . push ( runner. local . clone ( ) ) ;
807778 runner
808779 }
809780
810781 /// Waits for the next runnable task to run.
811782 async fn runnable ( & mut self , rng : & mut fastrand:: Rng ) -> Runnable {
812- let local = self . state . local_queue . get_or_default ( ) ;
813-
814783 let runnable = self
815784 . ticker
816785 . runnable_with ( || {
817786 // Try the local queue.
818- if let Ok ( r) = local . queue . pop ( ) {
787+ if let Ok ( r) = self . local . pop ( ) {
819788 return Some ( r) ;
820789 }
821790
822791 // Try stealing from the global queue.
823792 if let Ok ( r) = self . state . queue . pop ( ) {
824- steal ( & self . state . queue , & local . queue ) ;
793+ steal ( & self . state . queue , & self . local ) ;
825794 return Some ( r) ;
826795 }
827796
828797 // Try stealing from other runners.
829- let local_queues = & self . state . local_queue ;
798+ let local_queues = self . state . local_queues . read ( ) . unwrap ( ) ;
830799
831800 // Pick a random starting point in the iterator list and rotate the list.
832- let n = local_queues. iter ( ) . count ( ) ;
801+ let n = local_queues. len ( ) ;
833802 let start = rng. usize ( ..n) ;
834803 let iter = local_queues
835804 . iter ( )
@@ -838,12 +807,12 @@ impl Runner<'_> {
838807 . take ( n) ;
839808
840809 // Remove this runner's local queue.
841- let iter = iter. filter ( |other | !core :: ptr :: eq ( * other , local) ) ;
810+ let iter = iter. filter ( |local | !Arc :: ptr_eq ( local , & self . local ) ) ;
842811
843812 // Try stealing from each local queue in the list.
844- for other in iter {
845- steal ( & other . queue , & local . queue ) ;
846- if let Ok ( r) = local . queue . pop ( ) {
813+ for local in iter {
814+ steal ( local , & self . local ) ;
815+ if let Ok ( r) = self . local . pop ( ) {
847816 return Some ( r) ;
848817 }
849818 }
@@ -857,7 +826,7 @@ impl Runner<'_> {
857826
858827 if self . ticks % 64 == 0 {
859828 // Steal tasks from the global queue to ensure fair task scheduling.
860- steal ( & self . state . queue , & local . queue ) ;
829+ steal ( & self . state . queue , & self . local ) ;
861830 }
862831
863832 runnable
@@ -867,13 +836,15 @@ impl Runner<'_> {
867836impl Drop for Runner < ' _ > {
868837 fn drop ( & mut self ) {
869838 // Remove the local queue.
870- if let Some ( local) = self . state . local_queue . get ( ) {
871- // Re-schedule remaining tasks in the local queue.
872- for r in local. queue . try_iter ( ) {
873- // Explicitly reschedule the runnable back onto the global
874- // queue to avoid rescheduling onto the local one.
875- self . state . queue . push ( r) . unwrap ( ) ;
876- }
839+ self . state
840+ . local_queues
841+ . write ( )
842+ . unwrap ( )
843+ . retain ( |local| !Arc :: ptr_eq ( local, & self . local ) ) ;
844+
845+ // Re-schedule remaining tasks in the local queue.
846+ while let Ok ( r) = self . local . pop ( ) {
847+ r. schedule ( ) ;
877848 }
878849 }
879850}
@@ -933,13 +904,18 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
933904 }
934905
935906 /// Debug wrapper for the local runners.
936- struct LocalRunners < ' a > ( & ' a ThreadLocal < LocalQueue > ) ;
907+ struct LocalRunners < ' a > ( & ' a RwLock < Vec < Arc < ConcurrentQueue < Runnable > > > > ) ;
937908
938909 impl fmt:: Debug for LocalRunners < ' _ > {
939910 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
940- f. debug_list ( )
941- . entries ( self . 0 . iter ( ) . map ( |local| local. queue . len ( ) ) )
942- . finish ( )
911+ match self . 0 . try_read ( ) {
912+ Ok ( lock) => f
913+ . debug_list ( )
914+ . entries ( lock. iter ( ) . map ( |queue| queue. len ( ) ) )
915+ . finish ( ) ,
916+ Err ( TryLockError :: WouldBlock ) => f. write_str ( "<locked>" ) ,
917+ Err ( TryLockError :: Poisoned ( _) ) => f. write_str ( "<poisoned>" ) ,
918+ }
943919 }
944920 }
945921
@@ -959,32 +935,11 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
959935 f. debug_struct ( name)
960936 . field ( "active" , & ActiveTasks ( & state. active ) )
961937 . field ( "global_tasks" , & state. queue . len ( ) )
962- . field ( "local_runners" , & LocalRunners ( & state. local_queue ) )
938+ . field ( "local_runners" , & LocalRunners ( & state. local_queues ) )
963939 . field ( "sleepers" , & SleepCount ( & state. sleepers ) )
964940 . finish ( )
965941}
966942
967- /// A queue local to each thread.
968- ///
969- /// It's Default implementation is used for initializing each
970- /// thread's queue via `ThreadLocal::get_or_default`.
971- ///
972- /// The local queue *must* be flushed, and all pending runnables
973- /// rescheduled onto the global queue when a runner is dropped.
974- struct LocalQueue {
975- queue : ConcurrentQueue < Runnable > ,
976- waker : AtomicWaker ,
977- }
978-
979- impl Default for LocalQueue {
980- fn default ( ) -> Self {
981- Self {
982- queue : ConcurrentQueue :: bounded ( 512 ) ,
983- waker : AtomicWaker :: new ( ) ,
984- }
985- }
986- }
987-
988943/// Runs a closure when dropped.
989944struct CallOnDrop < F : FnMut ( ) > ( F ) ;
990945
0 commit comments