@@ -5,7 +5,7 @@ const Q = require('q');
55const logger = require ( 'cf-logs' ) . Logger ( 'codefresh:containerLogger' ) ;
66const CFError = require ( 'cf-errors' ) ;
77const LoggerStrategy = require ( './enums' ) . LoggerStrategy ;
8- const { Transform, pipeline } = require ( 'stream' ) ;
8+ const { Transform } = require ( 'stream' ) ;
99
1010class ContainerLogger extends EventEmitter {
1111
@@ -99,32 +99,25 @@ class ContainerLogger extends EventEmitter {
9999 }
100100
101101 _streamTty ( stdout , stderr ) {
102+ logger . info ( `Piping stdout and stderr step streams` ) ;
102103
103- pipeline ( stdout , this . _logSizeLimitStream ( ) , this . stepLogger . writeStream ( ) , ( err ) => {
104- if ( err ) {
105- logger . error ( `Stdout streams pipeline failed on: ${ err } ` ) ;
106- return ;
107- }
108- logger . info ( 'Stdout streams pipeline succeeded.' ) ;
109- } ) ;
104+ const stepLoggerWritableStream = this . stepLogger . writeStream ( ) ;
105+ stepLoggerWritableStream . on ( 'error' , err => logger . error ( `stepLoggerWritableStream: ${ err } ` ) ) ;
106+
107+ // Attention(!) all streams piped to step logger writable stream must be a new streams(!) in order to avoid message piping twice to writable stream.
108+ stdout . pipe ( this . _logSizeLimitStream ( ) , { end : false } ) . pipe ( this . stepLogger . stepNameTransformStream ( ) , { end : false } ) . pipe ( stepLoggerWritableStream , { end : false } ) ;
110109
111110 if ( ! stderr ) {
112111 return ;
113112 }
114113
115- pipeline ( stderr , this . _logSizeLimitStream ( ) , this . _errorTransformerStream ( ) , this . stepLogger . writeStream ( ) , ( err ) => {
116- if ( err ) {
117- logger . error ( `Stderr streams pipeline failed on: ${ err } ` ) ;
118- return ;
119- }
120- logger . info ( 'Stderr streams pipeline succeeded.' ) ;
121- } ) ;
114+ stderr . pipe ( this . _logSizeLimitStream ( ) , { end : false } )
115+ . pipe ( this . _errorTransformerStream ( ) , { end : false } ) . pipe ( this . stepLogger . stepNameTransformStream ( ) , { end : false } ) . pipe ( stepLoggerWritableStream , { end : false } ) ;
122116
123117 stderr . once ( 'end' , ( ) => {
124118 this . stepFinished = true ;
125119 logger . info ( `stderr end event was fired for container: ${ this . containerId } ` ) ;
126120 } ) ;
127-
128121 }
129122
130123 _registerToTtyStreams ( stdout , stderr ) {
0 commit comments