@@ -16,17 +16,21 @@ class Queue {
1616 this . priorityMode = false ;
1717 this . destination = null ;
1818 }
19+
1920 static channels ( concurrency ) {
2021 return new Queue ( concurrency ) ;
2122 }
23+
2224 wait ( msec ) {
2325 this . waitTimeout = msec ;
2426 return this ;
2527 }
28+
2629 timeout ( msec ) {
2730 this . processTimeout = msec ;
2831 return this ;
2932 }
33+
3034 add ( task , priority = 0 ) {
3135 if ( ! this . paused ) {
3236 const hasChannel = this . count < this . concurrency ;
@@ -40,6 +44,7 @@ class Queue {
4044 this . waiting . sort ( ( a , b ) => b . priority - a . priority ) ;
4145 }
4246 }
47+
4348 next ( task ) {
4449 this . count ++ ;
4550 let timer = null ;
@@ -59,6 +64,7 @@ class Queue {
5964 }
6065 onProcess ( task , finish ) ;
6166 }
67+
6268 takeNext ( ) {
6369 const { waiting, waitTimeout } = this ;
6470 const { task, start } = waiting . shift ( ) ;
@@ -79,6 +85,7 @@ class Queue {
7985 if ( hasChannel ) this . next ( task ) ;
8086 return ;
8187 }
88+
8289 finish ( err , res ) {
8390 const { onFailure, onSuccess, onDone, onDrain } = this ;
8491 if ( err ) {
@@ -90,30 +97,37 @@ class Queue {
9097 if ( onDone ) onDone ( err , res ) ;
9198 if ( this . count === 0 && onDrain ) onDrain ( ) ;
9299 }
100+
93101 process ( listener ) {
94102 this . onProcess = listener ;
95103 return this ;
96104 }
105+
97106 done ( listener ) {
98107 this . onDone = listener ;
99108 return this ;
100109 }
110+
101111 success ( listener ) {
102112 this . onSuccess = listener ;
103113 return this ;
104114 }
115+
105116 failure ( listener ) {
106117 this . onFailure = listener ;
107118 return this ;
108119 }
120+
109121 drain ( listener ) {
110122 this . onDrain = listener ;
111123 return this ;
112124 }
125+
113126 pause ( ) {
114127 this . paused = true ;
115128 return this ;
116129 }
130+
117131 resume ( ) {
118132 if ( this . waiting . length > 0 ) {
119133 const channels = this . concurrency - this . count ;
@@ -124,10 +138,12 @@ class Queue {
124138 this . paused = false ;
125139 return this ;
126140 }
141+
127142 priority ( flag = true ) {
128143 this . priorityMode = flag ;
129144 return this ;
130145 }
146+
131147 pipe ( destination ) {
132148 this . destination = destination ;
133149 return this ;
0 commit comments