File tree Expand file tree Collapse file tree 2 files changed +22
-18
lines changed Expand file tree Collapse file tree 2 files changed +22
-18
lines changed Original file line number Diff line number Diff line change 33const { Readable } = require ( 'node:stream' ) ;
44
55class QueueStream extends Readable {
6- constructor ( concurrent ) {
6+ constructor ( concurrency ) {
77 super ( { objectMode : true } ) ;
8- this . concurrent = concurrent ;
8+ this . concurrency = concurrency ;
99 this . count = 0 ;
10- this . queue = [ ] ;
10+ this . waiting = [ ] ;
1111 }
1212
13- static channels ( concurrent ) {
14- return new QueueStream ( concurrent ) ;
13+ static channels ( concurrency ) {
14+ return new QueueStream ( concurrency ) ;
1515 }
1616
1717 add ( task ) {
18- this . queue . push ( task ) ;
18+ this . waiting . push ( task ) ;
1919 }
2020
2121 _read ( ) {
22- while ( this . count < this . concurrent && this . queue . length > 0 ) {
23- const task = this . queue . shift ( ) ;
22+ const emptyChannels = this . concurrency - this . count ;
23+ let launchCount = Math . min ( emptyChannels , this . waiting . length ) ;
24+ while ( launchCount -- > 0 ) {
2425 this . count ++ ;
26+ const task = this . waiting . shift ( ) ;
2527 this . onProcess ( task , ( err , res ) => {
2628 if ( err ) this . emit ( 'error' , err ) ;
2729 this . push ( { err, res } ) ;
2830 this . count -- ;
2931 } ) ;
3032 }
31- if ( this . queue . length === 0 && this . count === 0 ) {
33+ if ( this . waiting . length === 0 && this . count === 0 ) {
3234 this . push ( null ) ;
3335 }
3436 }
Original file line number Diff line number Diff line change 33const { Readable, Writable, Transform, pipeline } = require ( 'node:stream' ) ;
44
55class QueueStream extends Readable {
6- constructor ( concurrent ) {
6+ constructor ( concurrency ) {
77 super ( { objectMode : true } ) ;
8- this . concurrent = concurrent ;
8+ this . concurrency = concurrency ;
99 this . count = 0 ;
10- this . queue = [ ] ;
10+ this . waiting = [ ] ;
1111 }
1212
13- static channels ( concurrent ) {
14- return new QueueStream ( concurrent ) ;
13+ static channels ( concurrency ) {
14+ return new QueueStream ( concurrency ) ;
1515 }
1616
1717 add ( task ) {
18- this . queue . push ( task ) ;
18+ this . waiting . push ( task ) ;
1919 }
2020
2121 _read ( ) {
22- while ( this . count < this . concurrent && this . queue . length > 0 ) {
23- const task = this . queue . shift ( ) ;
22+ const emptyChannels = this . concurrency - this . count ;
23+ let launchCount = Math . min ( emptyChannels , this . waiting . length ) ;
24+ while ( launchCount -- > 0 ) {
25+ const task = this . waiting . shift ( ) ;
2426 this . count ++ ;
2527 this . onProcess ( task , ( err , res ) => {
2628 if ( err ) this . emit ( 'error' , err ) ;
2729 this . push ( { err, res } ) ;
2830 this . count -- ;
2931 } ) ;
3032 }
31- if ( this . queue . length === 0 && this . count === 0 ) {
33+ if ( this . waiting . length === 0 && this . count === 0 ) {
3234 this . push ( null ) ;
3335 }
3436 }
You can’t perform that action at this time.
0 commit comments