1818 */
1919const { PassThrough, Readable, Transform } = require ( 'stream' ) ;
2020const { TeePromise } = require ( '@heyputer/putility' ) . libs . promise ;
21- const { EWMA } = require ( './opmath' ) ;
2221
2322class StreamBuffer extends TeePromise {
2423 constructor ( ) {
@@ -47,6 +46,15 @@ const stream_to_the_void = stream => {
4746 stream . on ( 'error' , ( ) => { } ) ;
4847} ;
4948
49+ /**
50+ * This will split a stream (on the read side) into `n` streams.
51+ * The slowest reader will determine the speed the the source stream
52+ * is consumed at to avoid buffering.
53+ *
54+ * @param {* } source
55+ * @param {* } n
56+ * @returns
57+ */
5058const pausing_tee = ( source , n ) => {
5159 const { PassThrough } = require ( 'stream' ) ;
5260
@@ -59,39 +67,31 @@ const pausing_tee = (source, n) => {
5967 streams_ . push ( stream ) ;
6068 stream . on ( 'drain' , ( ) => {
6169 ready_ [ i ] = true ;
62- // console.log(source.id, 'PR :: drain from reader', i, ready_);
6370 if ( first_ ) {
6471 source . resume ( ) ;
6572 first_ = false ;
6673 }
6774 if ( ready_ . every ( v => ! ! v ) ) source . resume ( ) ;
6875 } ) ;
69- // stream.on('newListener', (event, listener) => {
70- // console.log('PR :: newListener', i, event, listener);
71- // });
7276 }
7377
7478 source . on ( 'data' , ( chunk ) => {
75- // console.log(source.id, 'PT :: data from source', chunk.length);
7679 ready_ . forEach ( ( v , i ) => {
7780 ready_ [ i ] = streams_ [ i ] . write ( chunk ) ;
7881 } ) ;
7982 if ( ! ready_ . every ( v => ! ! v ) ) {
80- // console.log('PT :: pausing source', ready_);
8183 source . pause ( ) ;
8284 return ;
8385 }
8486 } ) ;
8587
8688 source . on ( 'end' , ( ) => {
87- // console.log(source.id, 'PT :: end from source');
8889 for ( let i = 0 ; i < n ; i ++ ) {
8990 streams_ [ i ] . end ( ) ;
9091 }
9192 } ) ;
9293
9394 source . on ( 'error' , ( err ) => {
94- // console.log(source.id, 'PT :: error from source', err);
9595 for ( let i = 0 ; i < n ; i ++ ) {
9696 streams_ [ i ] . emit ( 'error' , err ) ;
9797 }
@@ -100,6 +100,9 @@ const pausing_tee = (source, n) => {
100100 return streams_ ;
101101} ;
102102
103+ /**
104+ * A debugging stream transform that logs the data it receives.
105+ */
103106class LoggingStream extends Transform {
104107 constructor ( options ) {
105108 super ( options ) ;
@@ -431,9 +434,7 @@ async function* chunk_stream(
431434 offset += amount ;
432435
433436 while ( offset >= chunk_size ) {
434- console . log ( 'start yield' ) ;
435437 yield buffer ;
436- console . log ( 'end yield' ) ;
437438
438439 buffer = Buffer . alloc ( chunk_size ) ;
439440 offset = 0 ;
@@ -449,13 +450,8 @@ async function* chunk_stream(
449450
450451 if ( chunk_time_ewma !== null ) {
451452 const chunk_time = chunk_time_ewma . get ( ) ;
452- // const sleep_time = chunk_size * chunk_time;
453453 const sleep_time = ( chunk . length / chunk_size ) * chunk_time / 2 ;
454- // const sleep_time = (amount / chunk_size) * chunk_time;
455- // const sleep_time = (amount / chunk_size) * chunk_time;
456- console . log ( `start sleep ${ amount } / ${ chunk_size } * ${ chunk_time } = ${ sleep_time } ` ) ;
457454 await new Promise ( resolve => setTimeout ( resolve , sleep_time ) ) ;
458- console . log ( 'end sleep' ) ;
459455 }
460456 }
461457
0 commit comments