1+ /* globals exports, setTimeout */
2+ /* jshint -W097 */
3+
14"use strict" ;
25
36var EMPTY = { } ;
@@ -16,58 +19,109 @@ function MutableCell (queue, value) {
1619 this . prev = null ;
1720}
1821
19- function putQueue ( queue , value ) {
22+ function putLast ( queue , value ) {
2023 var cell = new MutableCell ( queue , value ) ;
21- cell . prev = queue . tail ;
22- queue . last . next = cell ;
23- queue . last = cell ;
24+ switch ( queue . size ) {
25+ case 0 :
26+ queue . head = cell ;
27+ break ;
28+ case 1 :
29+ cell . prev = queue . head ;
30+ queue . head . next = cell ;
31+ queue . last = cell ;
32+ break ;
33+ default :
34+ cell . prev = queue . last ;
35+ queue . last . next = cell ;
36+ queue . last = cell ;
37+ }
2438 queue . size ++ ;
2539 return cell ;
2640}
2741
28- function insertQueue ( queue , value ) {
29- var cell = new MutableCell ( queue , value ) ;
30- cell . next = queue . head ;
31- queue . head . prev = cell ;
32- queue . head = cell ;
33- queue . size ++ ;
34- return cell ;
42+ function takeTail ( queue ) {
43+ var cell ;
44+ switch ( queue . size ) {
45+ case 0 :
46+ return null ;
47+ case 1 :
48+ cell = queue . head ;
49+ queue . head = null ;
50+ break ;
51+ case 2 :
52+ cell = queue . last ;
53+ queue . head . next = null ;
54+ queue . last = null ;
55+ break ;
56+ default :
57+ cell = queue . last ;
58+ queue . last = cell . prev ;
59+ queue . last . next = null ;
60+ }
61+ cell . prev = null ;
62+ cell . queue = null ;
63+ queue . size -- ;
64+ return cell . value ;
3565}
3666
37- function takeQueue ( queue ) {
38- if ( queue . size === 0 ) {
67+ function takeHead ( queue ) {
68+ var cell ;
69+ switch ( queue . size ) {
70+ case 0 :
3971 return null ;
72+ case 1 :
73+ cell = queue . head ;
74+ queue . head = null ;
75+ break ;
76+ case 2 :
77+ cell = queue . head ;
78+ queue . last . prev = null ;
79+ queue . head = queue . last ;
80+ queue . last = null ;
81+ break ;
82+ default :
83+ cell = queue . head ;
84+ queue . head = cell . next ;
85+ queue . head . prev = null ;
4086 }
41- var cell = queue . head ;
42- queue . head = cell . next ;
43- queue . head . prev = null ;
44- queue . size -- ;
4587 cell . next = null ;
4688 cell . queue = null ;
89+ queue . size -- ;
4790 return cell . value ;
4891}
4992
5093function deleteCell ( cell ) {
51- if ( cell . queue ) {
52- if ( cell . prev ) {
53- cell . prev . next = cell . next ;
54- }
55- if ( cell . next ) {
56- cell . next . prev = cell . prev ;
57- }
58- cell . queue = null ;
59- cell . value = null ;
60- cell . next = null ;
61- cell . prev = null ;
94+ if ( cell . queue === null ) {
95+ return ;
6296 }
97+ if ( cell . queue . tail === cell ) {
98+ takeTail ( cell . queue ) ;
99+ return ;
100+ }
101+ if ( cell . queue . head === cell ) {
102+ takeHead ( cell . queue ) ;
103+ return ;
104+ }
105+ if ( cell . prev ) {
106+ cell . prev . next = cell . next ;
107+ }
108+ if ( cell . next ) {
109+ cell . next . prev = cell . prev ;
110+ }
111+ cell . queue . size -- ;
112+ cell . queue = null ;
113+ cell . value = null ;
114+ cell . next = null ;
115+ cell . prev = null ;
63116}
64117
65118function AVar ( ) {
66- this . draining = false ;
67- this . error = null ;
68- this . value = EMPTY ;
69- this . consumers = new MutableQueue ( ) ;
70- this . producers = new MutableQueue ( ) ;
119+ this . draining = false ;
120+ this . error = null ;
121+ this . value = EMPTY ;
122+ this . takes = new MutableQueue ( ) ;
123+ this . reads = new MutableQueue ( ) ;
124+ this . puts = new MutableQueue ( ) ;
71125}
72126
73127exports . makeEmptyVar = function ( ) {
@@ -82,22 +136,23 @@ exports.makeVar = function (value) {
82136 } ;
83137} ;
84138
85- exports . _killVar = function ( left , right , error , avar ) {
139+ exports . _killVar = function ( left , right , avar , error ) {
86140 return function ( ) {
87141 if ( avar . error === null ) {
88142 avar . error = error ;
143+ avar . value = EMPTY ;
89144 drainVar ( left , right , avar ) ;
90145 }
91146 } ;
92147} ;
93148
94- exports . _putVar = function ( left , right , value , avar , cb ) {
149+ exports . _putVar = function ( left , right , avar , value , cb ) {
95150 return function ( ) {
96151 if ( avar . error !== null ) {
97152 runEff ( cb ( left ( avar . error ) ) ) ;
98153 return NO_EFFECT ;
99154 }
100- var cell = putQueue ( avar . producers , { cb : cb , value : value } ) ;
155+ var cell = putLast ( avar . puts , { cb : cb , value : value } ) ;
101156 drainVar ( left , right , avar ) ;
102157 return function ( ) {
103158 deleteCell ( cell ) ;
@@ -111,7 +166,7 @@ exports._takeVar = function (left, right, avar, cb) {
111166 runEff ( cb ( left ( avar . error ) ) ) ;
112167 return NO_EFFECT ;
113168 }
114- var cell = putQueue ( avar . consumers , { cb : cb , peek : false , value : value } ) ;
169+ var cell = putLast ( avar . takes , { cb : cb , read : false } ) ;
115170 drainVar ( left , right , avar ) ;
116171 return function ( ) {
117172 deleteCell ( cell ) ;
@@ -125,18 +180,18 @@ exports._readVar = function (left, right, avar, cb) {
125180 runEff ( cb ( left ( avar . error ) ) ) ;
126181 return NO_EFFECT ;
127182 }
128- var cell = insertQueue ( avar . consumers , { cb : cb , peek : true , value : value } ) ;
183+ var cell = putLast ( avar . reads , { cb : cb , read : true } ) ;
129184 drainVar ( left , right , avar ) ;
130185 return function ( ) {
131186 deleteCell ( cell ) ;
132187 } ;
133- }
188+ } ;
134189} ;
135190
136- exports . _tryPutVar = function ( left , right , value , avar ) {
191+ exports . _tryPutVar = function ( left , right , avar , value ) {
137192 return function ( ) {
138- if ( avar . value === EMPTY && value . error === null ) {
139- putQueue ( avar . queue , value ) ;
193+ if ( avar . value === EMPTY && avar . error === null ) {
194+ putLast ( avar . puts , { value : value , cb : null } ) ;
140195 drainVar ( left , right , avar ) ;
141196 return true ;
142197 } else {
@@ -148,7 +203,7 @@ exports._tryPutVar = function (left, right, value, avar) {
148203exports . _tryTakeVar = function ( left , right , nothing , just , avar ) {
149204 return function ( ) {
150205 var value = avar . value ;
151- if ( value === EMPTY || value . error !== null ) {
206+ if ( value === EMPTY || avar . error !== null ) {
152207 return nothing ;
153208 } else {
154209 avar . value = EMPTY ;
@@ -159,11 +214,13 @@ exports._tryTakeVar = function (left, right, nothing, just, avar) {
159214} ;
160215
161216exports . _tryReadVar = function ( nothing , just , avar ) {
162- if ( avar . value === EMPTY || value . error !== null ) {
163- return nothing ;
164- } else {
165- return just ( avar . value ) ;
166- }
217+ return function ( ) {
218+ if ( avar . value === EMPTY ) {
219+ return nothing ;
220+ } else {
221+ return just ( avar . value ) ;
222+ }
223+ } ;
167224} ;
168225
169226exports . isEmptyVar = function ( avar ) {
@@ -177,61 +234,78 @@ function drainVar (left, right, avar) {
177234 return ;
178235 }
179236
180- var ps = avar . producers ;
181- var cs = avar . consumers ;
182- var value = avar . value ;
183- var p , c ;
237+ var ps = avar . puts ;
238+ var ts = avar . takes ;
239+ var rs = avar . reads ;
240+ var tcs = null ;
241+ var p , r , t , value , rsize ;
184242
185243 avar . draining = true ;
186244
187- if ( avar . error === null ) {
188- while ( 1 ) {
189- p = null ;
190- c = null ;
245+ /* jshint -W084 */
246+ while ( 1 ) {
247+ p = null ;
248+ r = null ;
249+ t = null ;
250+ value = avar . value ;
251+ rsize = rs . size ;
191252
192- if ( cs . size === 0 || ps . size === 0 ) {
193- break ;
194- }
253+ if ( avar . error !== null ) {
254+ value = left ( avar . error ) ;
255+ // Error callback ordering is somewhat undefined, but we try to at least
256+ // be somewhat fair by interleaving puts and takes.
257+ while ( 1 ) {
258+ if ( ps . size === 0 && ts . size === 0 && rs . size === 0 ) {
259+ break ;
260+ }
261+ if ( p = takeHead ( ps ) ) {
262+ runEff ( p . cb ( value ) ) ;
263+ }
195264
196- if ( value === EMPTY && ( p = takeQueue ( ps ) ) ) {
197- value = avar . value = p . value ;
198- }
265+ while ( r = takeHead ( rs ) ) {
266+ runEff ( r . cb ( value ) ) ;
267+ }
199268
200- if ( value !== EMPTY ) {
201- value = right ( value ) ;
202- while ( c = takeQueue ( cs ) ) {
203- runEff ( c . cb ( value ) ) ;
204- if ( ! c . peek ) {
205- break ;
206- }
269+ if ( t = takeHead ( ts ) ) {
270+ runEff ( t . cb ( value ) ) ;
207271 }
208- value = EMPTY ;
209272 }
273+ break ;
274+ }
210275
211- if ( p !== null ) {
212- runEff ( p . cb ( right ( void 0 ) ) ) ;
213- }
276+ // Process the next put. We do not immediately invoke the callback
277+ // because we want to preserve ordering. If there are takes/reads
278+ // we want to run those first.
279+ if ( value === EMPTY && ( p = takeHead ( ps ) ) ) {
280+ avar . value = value = p . value ;
214281 }
215- }
216282
217- if ( avar . error !== null ) {
218- value = left ( avar . error ) ;
219- while ( 1 ) {
220- if ( ps . size === 0 && cs . size === 0 ) {
221- break ;
283+ if ( value !== EMPTY ) {
284+ // We go ahead and queue up the next take for the same reasons as
285+ // above. Invoking the read callbacks can affect the mutable queue.
286+ t = takeHead ( ts ) ;
287+ // We only want to process the reads queued up before running these
288+ // callbacks so we guard on rsize.
289+ while ( rsize -- && ( r = takeHead ( rs ) ) ) {
290+ runEff ( r . cb ( right ( value ) ) ) ;
222291 }
223- if ( p = takeQueue ( ps ) ) {
224- runEff ( p . cb ( value ) ) ;
225- }
226- while ( c = takeQueue ( cs ) ) {
227- runEff ( c . cb ( value ) ) ;
228- if ( ! c . peek ) {
229- break ;
230- }
292+ if ( t !== null ) {
293+ avar . value = EMPTY ;
294+ runEff ( t . cb ( right ( value ) ) ) ;
231295 }
232296 }
233- break ;
297+
298+ if ( p !== null && p . cb !== null ) {
299+ runEff ( p . cb ( right ( void 0 ) ) ) ;
300+ }
301+
302+ // Callbacks could have queued up more items so we need to guard on the
303+ // actual mutable properties.
304+ if ( avar . value === EMPTY && ps . size === 0 || avar . value !== EMPTY && ts . size === 0 ) {
305+ break ;
306+ }
234307 }
308+ /* jshint +W084 */
235309
236310 avar . draining = false ;
237311}
0 commit comments