2525// NB this does *not* include globs, please keep it that way.
2626#[ feature( macro_rules) ] ;
2727
28+ // Allow check-stage0-green for now
29+ #[ cfg( test, stage0) ] extern mod green;
30+
2831use std:: os;
2932use std:: rt:: crate_map;
30- use std:: rt:: local:: Local ;
3133use std:: rt:: rtio;
32- use std:: rt:: task:: Task ;
3334use std:: rt:: thread:: Thread ;
3435use std:: rt;
3536use std:: sync:: atomics:: { SeqCst , AtomicUint , INIT_ATOMIC_UINT } ;
3637use std:: sync:: deque;
3738use std:: task:: TaskOpts ;
3839use std:: util;
3940use std:: vec;
41+ use std:: sync:: arc:: UnsafeArc ;
4042
4143use sched:: { Shutdown , Scheduler , SchedHandle , TaskFromFriend , NewNeighbor } ;
4244use sleeper_list:: SleeperList ;
@@ -118,14 +120,6 @@ pub fn run(main: proc()) -> int {
118120 os:: set_exit_status ( rt:: DEFAULT_ERROR_CODE ) ;
119121 }
120122
121- // Once the main task has exited and we've set our exit code, wait for all
122- // spawned sub-tasks to finish running. This is done to allow all schedulers
123- // to remain active while there are still tasks possibly running.
124- unsafe {
125- let mut task = Local :: borrow ( None :: < Task > ) ;
126- task. get ( ) . wait_for_other_tasks ( ) ;
127- }
128-
129123 // Now that we're sure all tasks are dead, shut down the pool of schedulers,
130124 // waiting for them all to return.
131125 pool. shutdown ( ) ;
@@ -164,6 +158,17 @@ pub struct SchedPool {
164158 priv deque_pool : deque:: BufferPool < ~task:: GreenTask > ,
165159 priv sleepers : SleeperList ,
166160 priv factory : fn ( ) -> ~rtio:: EventLoop ,
161+ priv task_state : TaskState ,
162+ priv tasks_done : Port < ( ) > ,
163+ }
164+
165+ /// This is an internal state shared among a pool of schedulers. This is used to
166+ /// keep track of how many tasks are currently running in the pool and then
167+ /// sending on a channel once the entire pool has been drained of all tasks.
168+ #[ deriving( Clone ) ]
169+ struct TaskState {
170+ cnt : UnsafeArc < AtomicUint > ,
171+ done : SharedChan < ( ) > ,
167172}
168173
169174impl SchedPool {
@@ -182,6 +187,7 @@ impl SchedPool {
182187 assert ! ( nscheds > 0 ) ;
183188
184189 // The pool of schedulers that will be returned from this function
190+ let ( p, state) = TaskState :: new ( ) ;
185191 let mut pool = SchedPool {
186192 threads : ~[ ] ,
187193 handles : ~[ ] ,
@@ -192,6 +198,8 @@ impl SchedPool {
192198 deque_pool : deque:: BufferPool :: new ( ) ,
193199 next_friend : 0 ,
194200 factory : factory,
201+ task_state : state,
202+ tasks_done : p,
195203 } ;
196204
197205 // Create a work queue for each scheduler, ntimes. Create an extra
@@ -210,21 +218,30 @@ impl SchedPool {
210218 ( pool. factory ) ( ) ,
211219 worker,
212220 pool. stealers . clone ( ) ,
213- pool. sleepers . clone ( ) ) ;
221+ pool. sleepers . clone ( ) ,
222+ pool. task_state . clone ( ) ) ;
214223 pool. handles . push ( sched. make_handle ( ) ) ;
215224 let sched = sched;
216- pool. threads . push ( do Thread :: start {
217- sched. bootstrap ( ) ;
218- } ) ;
225+ pool. threads . push ( do Thread :: start { sched. bootstrap ( ) ; } ) ;
219226 }
220227
221228 return pool;
222229 }
223230
231+ /// Creates a new task configured to run inside of this pool of schedulers.
232+ /// This is useful to create a task which can then be sent to a specific
233+ /// scheduler created by `spawn_sched` (and possibly pin it to that
234+ /// scheduler).
224235 pub fn task ( & mut self , opts : TaskOpts , f : proc ( ) ) -> ~GreenTask {
225236 GreenTask :: configure ( & mut self . stack_pool , opts, f)
226237 }
227238
239+ /// Spawns a new task into this pool of schedulers, using the specified
240+ /// options to configure the new task which is spawned.
241+ ///
242+ /// New tasks are spawned in a round-robin fashion to the schedulers in this
243+ /// pool, but tasks can certainly migrate among schedulers once they're in
244+ /// the pool.
228245 pub fn spawn ( & mut self , opts : TaskOpts , f : proc ( ) ) {
229246 let task = self . task ( opts, f) ;
230247
@@ -262,7 +279,8 @@ impl SchedPool {
262279 ( self . factory ) ( ) ,
263280 worker,
264281 self . stealers . clone ( ) ,
265- self . sleepers . clone ( ) ) ;
282+ self . sleepers . clone ( ) ,
283+ self . task_state . clone ( ) ) ;
266284 let ret = sched. make_handle ( ) ;
267285 self . handles . push ( sched. make_handle ( ) ) ;
268286 let sched = sched;
@@ -271,9 +289,28 @@ impl SchedPool {
271289 return ret;
272290 }
273291
292+ /// Consumes the pool of schedulers, waiting for all tasks to exit and all
293+ /// schedulers to shut down.
294+ ///
295+ /// This function is required to be called in order to drop a pool of
296+ /// schedulers, it is considered an error to drop a pool without calling
297+ /// this method.
298+ ///
299+ /// This only waits for all tasks in *this pool* of schedulers to exit, any
300+ /// native tasks or extern pools will not be waited on
274301 pub fn shutdown ( mut self ) {
275302 self . stealers = ~[ ] ;
276303
304+ // Wait for everyone to exit. We may have reached a 0-task count
305+ // multiple times in the past, meaning there could be several buffered
306+ // messages on the `tasks_done` port. We're guaranteed that after *some*
307+ // message the current task count will be 0, so we just receive in a
308+ // loop until everything is totally dead.
309+ while self . task_state . active ( ) {
310+ self . tasks_done . recv ( ) ;
311+ }
312+
313+ // Now that everyone's gone, tell everything to shut down.
277314 for mut handle in util:: replace ( & mut self . handles , ~[ ] ) . move_iter ( ) {
278315 handle. send ( Shutdown ) ;
279316 }
@@ -283,6 +320,31 @@ impl SchedPool {
283320 }
284321}
285322
323+ impl TaskState {
324+ fn new ( ) -> ( Port < ( ) > , TaskState ) {
325+ let ( p, c) = SharedChan :: new ( ) ;
326+ ( p, TaskState {
327+ cnt : UnsafeArc :: new ( AtomicUint :: new ( 0 ) ) ,
328+ done : c,
329+ } )
330+ }
331+
332+ fn increment ( & mut self ) {
333+ unsafe { ( * self . cnt . get ( ) ) . fetch_add ( 1 , SeqCst ) ; }
334+ }
335+
336+ fn active ( & self ) -> bool {
337+ unsafe { ( * self . cnt . get ( ) ) . load ( SeqCst ) != 0 }
338+ }
339+
340+ fn decrement ( & mut self ) {
341+ let prev = unsafe { ( * self . cnt . get ( ) ) . fetch_sub ( 1 , SeqCst ) } ;
342+ if prev == 1 {
343+ self . done . send ( ( ) ) ;
344+ }
345+ }
346+ }
347+
286348impl Drop for SchedPool {
287349 fn drop ( & mut self ) {
288350 if self . threads . len ( ) > 0 {
0 commit comments