@@ -10,7 +10,7 @@ use async_task::{Builder, Runnable};
1010use futures_lite:: { future, prelude:: * } ;
1111use slab:: Slab ;
1212
13- use crate :: AsyncCallOnDrop ;
13+ use crate :: { AsyncCallOnDrop , Sleepers } ;
1414#[ doc( no_inline) ]
1515pub use async_task:: Task ;
1616
@@ -101,9 +101,7 @@ impl<'a> LocalExecutor<'a> {
101101 /// ```
102102 pub fn spawn < T : ' a > ( & self , future : impl Future < Output = T > + ' a ) -> Task < T > {
103103 let mut active = self . state ( ) . active ( ) ;
104-
105- // SAFETY: `T` and the future are `Send`.
106- unsafe { self . spawn_inner ( future, & mut active) }
104+ self . spawn_inner ( future, & mut active)
107105 }
108106
109107 /// Spawns many tasks onto the executor.
@@ -151,32 +149,19 @@ impl<'a> LocalExecutor<'a> {
151149 futures : impl IntoIterator < Item = F > ,
152150 handles : & mut impl Extend < Task < F :: Output > > ,
153151 ) {
154- let mut active = Some ( self . state ( ) . active ( ) ) ;
152+ let mut active = self . state ( ) . active ( ) ;
155153
156154 // Convert the futures into tasks.
157- let tasks = futures. into_iter ( ) . enumerate ( ) . map ( move |( i, future) | {
158- // SAFETY: `T` and the future are `Send`.
159- let task = unsafe { self . spawn_inner ( future, active. as_mut ( ) . unwrap ( ) ) } ;
160-
161- // Yield the lock every once in a while to ease contention.
162- if i. wrapping_sub ( 1 ) % 500 == 0 {
163- drop ( active. take ( ) ) ;
164- active = Some ( self . state ( ) . active ( ) ) ;
165- }
166-
167- task
155+ let tasks = futures. into_iter ( ) . map ( move |future| {
156+ self . spawn_inner ( future, & mut active)
168157 } ) ;
169158
170159 // Push the tasks to the user's collection.
171160 handles. extend ( tasks) ;
172161 }
173162
174163 /// Spawn a future while holding the inner lock.
175- ///
176- /// # Safety
177- ///
178- /// If this is an `Executor`, `F` and `T` must be `Send`.
179- unsafe fn spawn_inner < T : ' a > (
164+ fn spawn_inner < T : ' a > (
180165 & self ,
181166 future : impl Future < Output = T > + ' a ,
182167 active : & mut Slab < Waker > ,
@@ -191,8 +176,7 @@ impl<'a> LocalExecutor<'a> {
191176 //
192177 // SAFETY:
193178 //
194- // If `future` is not `Send`, this must be a `LocalExecutor` as per this
195- // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
179+ // `future` may not `Send`. Since `LocalExecutor` is `!Sync`,
196180 // `try_tick`, `tick` and `run` can only be called from the origin
197181 // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
198182 // from the origin thread, ensuring that `future` and the executor share
@@ -206,12 +190,17 @@ impl<'a> LocalExecutor<'a> {
206190 // the `Executor` is drained of all of its runnables. This ensures that
207191 // runnables are dropped and this precondition is satisfied.
208192 //
209- // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
210- // Therefore we do not need to worry about what is done with the
211- // `Waker`.
212- let ( runnable, task) = Builder :: new ( )
193+ // `self.schedule()` is not `Send` nor `Sync`. As LocalExecutor is not
194+ // `Send`, the `Waker` is guaranteed// to only be used on the same thread
195+ // it was spawned on.
196+ //
197+ // `self.schedule()` is `'static`, and thus will outlive all borrowed
198+ // variables in the future.
199+ let ( runnable, task) = unsafe {
200+ Builder :: new ( )
213201 . propagate_panic ( true )
214- . spawn_unchecked ( |( ) | future, self . schedule ( ) ) ;
202+ . spawn_unchecked ( |( ) | future, self . schedule ( ) )
203+ } ;
215204 entry. insert ( runnable. waker ( ) ) ;
216205
217206 runnable. schedule ( ) ;
@@ -314,19 +303,19 @@ impl<'a> LocalExecutor<'a> {
314303 /// Returns a reference to the inner state.
315304 #[ inline]
316305 fn state ( & self ) -> & State {
317- // SAFETY: So long as an Executor lives, it's state pointer will always be valid
306+ // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
318307 // when accessed through state_ptr.
319308 unsafe { & * self . state_ptr ( ) }
320309 }
321310
322311 // Clones the inner state Arc
323312 #[ inline]
324313 fn state_as_rc ( & self ) -> Rc < State > {
325- // SAFETY: So long as an Executor lives, it's state pointer will always be a valid
314+ // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be a valid
326315 // Arc when accessed through state_ptr.
327- let arc = unsafe { Rc :: from_raw ( self . state_ptr ( ) ) } ;
328- let clone = arc . clone ( ) ;
329- std:: mem:: forget ( arc ) ;
316+ let rc = unsafe { Rc :: from_raw ( self . state_ptr ( ) ) } ;
317+ let clone = rc . clone ( ) ;
318+ std:: mem:: forget ( rc ) ;
330319 clone
331320 }
332321}
@@ -367,7 +356,7 @@ pub(crate) struct State {
367356 sleepers : RefCell < Sleepers > ,
368357
369358 /// Currently active tasks.
370- pub ( crate ) active : RefCell < Slab < Waker > > ,
359+ active : RefCell < Slab < Waker > > ,
371360}
372361
373362impl State {
@@ -385,15 +374,14 @@ impl State {
385374 }
386375
387376 /// Returns a reference to currently active tasks.
388- fn active ( & self ) -> RefMut < ' _ , Slab < Waker > > {
377+ pub ( crate ) fn active ( & self ) -> RefMut < ' _ , Slab < Waker > > {
389378 self . active . borrow_mut ( )
390379 }
391380
392381 /// Notifies a sleeping ticker.
393382 #[ inline]
394- fn notify ( & self ) {
395- let waker = self . sleepers . borrow_mut ( ) . notify ( ) ;
396- if let Some ( w) = waker {
383+ pub ( crate ) fn notify ( & self ) {
384+ if let Some ( w) = self . sleepers . borrow_mut ( ) . notify ( ) {
397385 w. wake ( ) ;
398386 }
399387 }
@@ -402,10 +390,6 @@ impl State {
402390 match self . queue . borrow_mut ( ) . pop_back ( ) {
403391 None => false ,
404392 Some ( runnable) => {
405- // Notify another ticker now to pick up where this ticker left off, just in case
406- // running the task takes a long time.
407- self . notify ( ) ;
408-
409393 // Run the task.
410394 runnable. run ( ) ;
411395 true
@@ -414,20 +398,16 @@ impl State {
414398 }
415399
416400 pub ( crate ) async fn tick ( & self ) {
417- let runnable = Ticker :: new ( self ) . runnable ( ) . await ;
418- runnable. run ( ) ;
401+ Ticker :: new ( self ) . runnable ( ) . await . run ( ) ;
419402 }
420403
421404 pub async fn run < T > ( & self , future : impl Future < Output = T > ) -> T {
422- let mut runner = Runner :: new ( self ) ;
405+ let mut ticker = Ticker :: new ( self ) ;
423406
424407 // A future that runs tasks forever.
425408 let run_forever = async {
426409 loop {
427- for _ in 0 ..200 {
428- let runnable = runner. runnable ( ) . await ;
429- runnable. run ( ) ;
430- }
410+ ticker. runnable ( ) . await . run ( ) ;
431411 future:: yield_now ( ) . await ;
432412 }
433413 } ;
@@ -437,75 +417,6 @@ impl State {
437417 }
438418}
439419
440- /// A list of sleeping tickers.
441- struct Sleepers {
442- /// Number of sleeping tickers (both notified and unnotified).
443- count : usize ,
444-
445- /// IDs and wakers of sleeping unnotified tickers.
446- ///
447- /// A sleeping ticker is notified when its waker is missing from this list.
448- wakers : Vec < ( usize , Waker ) > ,
449-
450- /// Reclaimed IDs.
451- free_ids : Vec < usize > ,
452- }
453-
454- impl Sleepers {
455- /// Inserts a new sleeping ticker.
456- fn insert ( & mut self , waker : & Waker ) -> usize {
457- let id = match self . free_ids . pop ( ) {
458- Some ( id) => id,
459- None => self . count + 1 ,
460- } ;
461- self . count += 1 ;
462- self . wakers . push ( ( id, waker. clone ( ) ) ) ;
463- id
464- }
465-
466- /// Re-inserts a sleeping ticker's waker if it was notified.
467- ///
468- /// Returns `true` if the ticker was notified.
469- fn update ( & mut self , id : usize , waker : & Waker ) -> bool {
470- for item in & mut self . wakers {
471- if item. 0 == id {
472- item. 1 . clone_from ( waker) ;
473- return false ;
474- }
475- }
476-
477- self . wakers . push ( ( id, waker. clone ( ) ) ) ;
478- true
479- }
480-
481- /// Removes a previously inserted sleeping ticker.
482- ///
483- /// Returns `true` if the ticker was notified.
484- fn remove ( & mut self , id : usize ) -> bool {
485- self . count -= 1 ;
486- self . free_ids . push ( id) ;
487-
488- for i in ( 0 ..self . wakers . len ( ) ) . rev ( ) {
489- if self . wakers [ i] . 0 == id {
490- self . wakers . remove ( i) ;
491- return false ;
492- }
493- }
494- true
495- }
496-
497- /// Returns notification waker for a sleeping ticker.
498- ///
499- /// If a ticker was notified already or there are no tickers, `None` will be returned.
500- fn notify ( & mut self ) -> Option < Waker > {
501- if self . wakers . len ( ) == self . count {
502- self . wakers . pop ( ) . map ( |item| item. 1 )
503- } else {
504- None
505- }
506- }
507- }
508-
509420/// Runs task one by one.
510421struct Ticker < ' a > {
511422 /// The executor state.
@@ -552,23 +463,16 @@ impl Ticker<'_> {
552463 /// Moves the ticker into woken state.
553464 fn wake ( & mut self ) {
554465 if self . sleeping != 0 {
555- let mut sleepers = self . state . sleepers . borrow_mut ( ) ;
556- sleepers. remove ( self . sleeping ) ;
466+ self . state . sleepers . borrow_mut ( ) . remove ( self . sleeping ) ;
557467 }
558468 self . sleeping = 0 ;
559469 }
560470
561- /// Waits for the next runnable task to run.
562- async fn runnable ( & mut self ) -> Runnable {
563- self . runnable_with ( || self . state . queue . borrow_mut ( ) . pop_back ( ) )
564- . await
565- }
566-
567471 /// Waits for the next runnable task to run, given a function that searches for a task.
568- async fn runnable_with ( & mut self , mut search : impl FnMut ( ) -> Option < Runnable > ) -> Runnable {
472+ async fn runnable ( & mut self ) -> Runnable {
569473 future:: poll_fn ( |cx| {
570474 loop {
571- match search ( ) {
475+ match self . state . queue . borrow_mut ( ) . pop_back ( ) {
572476 None => {
573477 // Move to sleeping and unnotified state.
574478 if !self . sleep ( cx. waker ( ) ) {
@@ -580,10 +484,6 @@ impl Ticker<'_> {
580484 // Wake up.
581485 self . wake ( ) ;
582486
583- // Notify another ticker now to pick up where this ticker left off, just in
584- // case running the task takes a long time.
585- self . state . notify ( ) ;
586-
587487 return Poll :: Ready ( r) ;
588488 }
589489 }
@@ -609,48 +509,6 @@ impl Drop for Ticker<'_> {
609509 }
610510}
611511
612- /// A worker in a work-stealing executor.
613- ///
614- /// This is just a ticker that also has an associated local queue for improved cache locality.
615- struct Runner < ' a > {
616- /// The executor state.
617- state : & ' a State ,
618-
619- /// Inner ticker.
620- ticker : Ticker < ' a > ,
621-
622- /// Bumped every time a runnable task is found.
623- ticks : usize ,
624- }
625-
626- impl Runner < ' _ > {
627- /// Creates a runner and registers it in the executor state.
628- fn new ( state : & State ) -> Runner < ' _ > {
629- let runner = Runner {
630- state,
631- ticker : Ticker :: new ( state) ,
632- ticks : 0 ,
633- } ;
634- runner
635- }
636-
637- /// Waits for the next runnable task to run.
638- async fn runnable ( & mut self ) -> Runnable {
639- let runnable = self
640- . ticker
641- . runnable_with ( || {
642- // Try popping from the queue.
643- self . state . queue . borrow_mut ( ) . pop_back ( )
644- } )
645- . await ;
646-
647- // Bump the tick counter.
648- self . ticks = self . ticks . wrapping_add ( 1 ) ;
649-
650- runnable
651- }
652- }
653-
654512/// Debug implementation for `Executor` and `LocalExecutor`.
655513fn debug_executor (
656514 executor : & LocalExecutor < ' _ > ,
0 commit comments