@@ -100,21 +100,50 @@ var Aff = function () {
100100 }
101101 }
102102
103+ var schedule = function ( ) {
104+ var limit = 1024 ;
105+ var size = 0 ;
106+ var ix = 0 ;
107+ var queue = new Array ( limit ) ;
108+ var draining = false ;
109+
110+ return function ( cb ) {
111+ var i , thunk ;
112+ if ( size === limit ) {
113+ throw new Error ( "[Aff] Scheduler full" ) ;
114+ }
115+ queue [ ( ix + size ) % limit ] = cb ;
116+ size ++ ;
117+
118+ if ( ! draining ) {
119+ draining = true ;
120+ while ( size ) {
121+ size -- ;
122+ thunk = queue [ ix ] ;
123+ queue [ ix ] = void 0 ;
124+ ix = ( ix + 1 ) % limit ;
125+ thunk ( ) ;
126+ }
127+ draining = false ;
128+ }
129+ } ;
130+ } ( ) ;
131+
103132 // Fiber state machine
104- var BLOCKED = 0 ; // No effect is running .
105- var PENDING = 1 ; // An async effect is running .
106- var RETURN = 2 ; // The current stack has returned .
107- var CONTINUE = 3 ; // Run the next effect.
108- var BINDSTEP = 4 ; // Apply the next bind .
133+ var SUSPENDED = 0 ; // Suspended, pending a join .
134+ var CONTINUE = 1 ; // Interpret the next instruction .
135+ var BINDSTEP = 2 ; // Apply the next bind .
136+ var PENDING = 3 ; // An async effect is running .
137+ var RETURN = 4 ; // The current stack has returned .
109138 var KILLFORKS = 5 ; // Killing supervised forks.
110139 var COMPLETED = 6 ; // The entire fiber has completed.
111140
112- function runFiber ( util , suspended , aff , completeCb ) {
141+ function runFiber ( util , initStatus , aff , completeCb ) {
113142 // Monotonically increasing tick, increased on each asynchronous turn.
114143 var runTick = 0 ;
115144
116145 // The current branch of the state machine.
117- var status = CONTINUE ;
146+ var status = initStatus ;
118147
119148 // The current point of interest for the state machine branch.
120149 var step = aff ; // Successful step
@@ -146,10 +175,10 @@ var Aff = function () {
146175 // Temporary bindings for the various branches.
147176 var tmp , result , attempt , canceler ;
148177
149- function launchChildFiber ( fid , suspended , child ) {
178+ function launchChildFiber ( fid , childStatus , child ) {
150179 forkCount ++ ;
151180 var blocked = true ;
152- var fiber = runFiber ( util , suspended , child , function ( ) {
181+ var fiber = runFiber ( util , childStatus , child , function ( ) {
153182 forkCount -- ;
154183 if ( blocked ) {
155184 blocked = false ;
@@ -178,7 +207,7 @@ var Aff = function () {
178207 forks = { } ;
179208 forkCount = 0 ;
180209 for ( var i = 0 , len = killId ; i < len ; i ++ ) {
181- kills [ i ] = runFiber ( util , false , kills [ i ] , function ( ) {
210+ kills [ i ] = runFiber ( util , CONTINUE , kills [ i ] , function ( ) {
182211 delete kills [ i ] ;
183212 killId -- ;
184213 if ( killId === 0 ) {
@@ -190,7 +219,7 @@ var Aff = function () {
190219 return new Aff ( SYNC , function ( ) {
191220 for ( var k in kills ) {
192221 if ( kills . hasOwnProperty ( k ) ) {
193- runFiber ( util , false , kills [ k ] . kill ( error ) , function ( ) { } ) ;
222+ runFiber ( util , CONTINUE , kills [ k ] . kill ( error ) , function ( ) { } ) ;
194223 }
195224 }
196225 } ) ;
@@ -252,7 +281,6 @@ var Aff = function () {
252281 break ;
253282
254283 case SYNC :
255- status = BLOCKED ;
256284 result = runSync ( util . left , util . right , step . _1 ) ;
257285 if ( util . isLeft ( result ) ) {
258286 status = RETURN ;
@@ -267,41 +295,30 @@ var Aff = function () {
267295 break ;
268296
269297 case ASYNC :
270- status = BLOCKED ;
271- canceler = runAsync ( util . left , step . _1 , function ( result ) {
298+ status = PENDING ;
299+ step = runAsync ( util . left , step . _1 , function ( result ) {
272300 return function ( ) {
273301 if ( runTick !== localRunTick ) {
274302 return ;
275- }
276- tmp = status ;
277- if ( util . isLeft ( result ) ) {
278- status = RETURN ;
279- fail = result ;
280- } else if ( bhead === null ) {
281- status = RETURN ;
282- step = result ;
283303 } else {
284- status = BINDSTEP ;
285- step = util . fromRight ( result ) ;
286- }
287- // We only need to invoke `run` if the subsequent block has
288- // switch the status to PENDING. Otherwise the callback was
289- // resolved synchronously, and the current loop can continue
290- // normally.
291- if ( tmp === PENDING ) {
292- run ( ++ runTick ) ;
293- } else {
294- localRunTick = ++ runTick ;
304+ runTick ++ ;
295305 }
306+ schedule ( function ( ) {
307+ if ( util . isLeft ( result ) ) {
308+ status = RETURN ;
309+ fail = result ;
310+ } else if ( bhead === null ) {
311+ status = RETURN ;
312+ step = result ;
313+ } else {
314+ status = BINDSTEP ;
315+ step = util . fromRight ( result ) ;
316+ }
317+ run ( runTick ) ;
318+ } ) ;
296319 } ;
297320 } ) ;
298- // If the callback was resolved synchronously, the status will have
299- // switched to CONTINUE, and we should not move on to PENDING.
300- if ( status === BLOCKED ) {
301- status = PENDING ;
302- step = canceler ;
303- }
304- break ;
321+ return ;
305322
306323 // Enqueue the current stack of binds and continue
307324 case CATCH :
@@ -375,7 +392,7 @@ var Aff = function () {
375392 }
376393 break ;
377394
378- // If we have a bracket, we should enqueue the finalizer branch ,
395+ // If we have a bracket, we should enqueue the handlers ,
379396 // and continue with the success branch only if the fiber has
380397 // not been interrupted. If the bracket acquisition failed, we
381398 // should not run either.
@@ -393,6 +410,8 @@ var Aff = function () {
393410 }
394411 break ;
395412
413+ // Enqueue the appropriate handler. We increase the bracket count
414+ // because it should be cancelled.
396415 case BRACKETED :
397416 bracket ++ ;
398417 attempts = new Aff ( CONS , new Aff ( FINALIZED , step ) , attempts . _2 ) ;
@@ -447,14 +466,16 @@ var Aff = function () {
447466 if ( util . isLeft ( step ) && ! joins ) {
448467 setTimeout ( function ( ) {
449468 // Guard on joins because a completely synchronous fiber can
450- // still have an observer.
469+ // still have an observer which was added after-the-fact .
451470 if ( ! joins ) {
452471 throw util . fromLeft ( step ) ;
453472 }
454473 } , 0 ) ;
455474 }
456475 return ;
457- case BLOCKED : return ;
476+ case SUSPENDED :
477+ status = CONTINUE ;
478+ break ;
458479 case PENDING : return ;
459480 }
460481 }
@@ -479,12 +500,11 @@ var Aff = function () {
479500 var killCb = function ( ) {
480501 return cb ( util . right ( void 0 ) ) ;
481502 } ;
482- if ( suspended ) {
483- suspended = false ;
503+ switch ( status ) {
504+ case SUSPENDED :
484505 status = COMPLETED ;
485506 interrupt = util . left ( error ) ;
486- }
487- switch ( status ) {
507+ /* fallthrough */
488508 case COMPLETED :
489509 canceler = nonCanceler ;
490510 killCb ( ) ( ) ;
@@ -524,20 +544,25 @@ var Aff = function () {
524544
525545 var join = new Aff ( ASYNC , function ( cb ) {
526546 return function ( ) {
527- if ( suspended ) {
528- suspended = false ;
547+ var canceler ;
548+ switch ( status ) {
549+ case SUSPENDED :
550+ canceler = addJoinCallback ( cb ) ;
529551 run ( runTick ) ;
530- }
531- if ( status === COMPLETED ) {
532- joins = true ;
552+ break ;
553+ case COMPLETED :
554+ canceler = nonCanceler ;
555+ joins = true ;
533556 cb ( step ) ( ) ;
534- return nonCanceler ;
557+ break ;
558+ default :
559+ canceler = addJoinCallback ( cb ) ;
535560 }
536- return addJoinCallback ( cb ) ;
561+ return canceler ;
537562 } ;
538563 } ) ;
539564
540- if ( suspended === false ) {
565+ if ( status === CONTINUE ) {
541566 run ( runTick ) ;
542567 }
543568
@@ -592,7 +617,7 @@ var Aff = function () {
592617 // collect all the fibers first.
593618 kills [ count ++ ] = function ( aff ) {
594619 return function ( ) {
595- return runFiber ( util , false , aff , function ( result ) {
620+ return runFiber ( util , CONTINUE , aff , function ( result ) {
596621 count -- ;
597622 if ( fail === null && util . isLeft ( result ) ) {
598623 fail = result ;
@@ -817,7 +842,7 @@ var Aff = function () {
817842 // tree.
818843 fibers [ fid ] = function ( aff , completeCb ) {
819844 return new Aff ( THUNK , function ( ) {
820- return runFiber ( util , false , aff , completeCb ) ;
845+ return runFiber ( util , CONTINUE , aff , completeCb ) ;
821846 } ) ;
822847 } ( tmp , resolve ( step ) ) ;
823848 }
@@ -869,7 +894,7 @@ var Aff = function () {
869894 // We can drop the fibers here because we are only canceling join
870895 // attempts, which are synchronous anyway.
871896 for ( var kid = 0 , n = killId ; kid < n ; kid ++ ) {
872- runFiber ( util , false , kills [ kid ] . kill ( error ) , function ( ) { } ) ;
897+ runFiber ( util , CONTINUE , kills [ kid ] . kill ( error ) , function ( ) { } ) ;
873898 }
874899
875900 var newKills = kill ( error , root , cb ) ;
@@ -879,7 +904,7 @@ var Aff = function () {
879904 return function ( ) {
880905 for ( var kid in newKills ) {
881906 if ( newKills . hasOwnProperty ( kid ) ) {
882- runFiber ( util , false , newKills [ kid ] . kill ( killError ) , function ( ) { } ) ;
907+ runFiber ( util , CONTINUE , newKills [ kid ] . kill ( killError ) , function ( ) { } ) ;
883908 }
884909 }
885910 return nonCanceler ;
@@ -945,9 +970,9 @@ exports._bind = function (aff) {
945970 } ;
946971} ;
947972
948- exports . _fork = function ( suspended ) {
973+ exports . _fork = function ( status ) {
949974 return function ( aff ) {
950- return Aff . Fork ( suspended , aff ) ;
975+ return Aff . Fork ( status , aff ) ;
951976 } ;
952977} ;
953978
@@ -1026,9 +1051,9 @@ exports._delay = function () {
10261051 } ;
10271052} ( ) ;
10281053
1029- exports . _launchAff = function ( util , suspended , aff ) {
1054+ exports . _launchAff = function ( util , status , aff ) {
10301055 return function ( ) {
1031- return Aff . runFiber ( util , suspended , aff , function ( ) { } ) ;
1056+ return Aff . runFiber ( util , status , aff , function ( ) { } ) ;
10321057 } ;
10331058} ;
10341059
0 commit comments