@@ -5,6 +5,7 @@ const Q = require('q');
55const logger = require ( 'cf-logs' ) . Logger ( 'codefresh:containerLogger' ) ;
66const CFError = require ( 'cf-errors' ) ;
77const LoggerStrategy = require ( './enums' ) . LoggerStrategy ;
8+ const { Transform, pipeline } = require ( 'stream' ) ;
89
910class ContainerLogger extends EventEmitter {
1011
@@ -42,27 +43,25 @@ class ContainerLogger extends EventEmitter {
4243 } )
4344 . then ( ( [ stdout , stderr ] ) => {
4445 logger . info ( `Attached stream to container: ${ this . containerId } ` ) ;
46+
4547 // Listening on the stream needs to be performed different depending if a tty is attached or not
4648 // See documentation of the docker api here: https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
4749 if ( this . tty ) {
48- this . _handleTtyStream ( stdout , false ) ;
49- if ( stderr ) {
50- this . _handleTtyStream ( stderr , true ) ;
51- }
52- } else {
53- this . _handleNonTtyStream ( stdout , false ) ;
54- }
55-
56- stdout . on ( 'end' , ( ) => {
57- this . stepFinished = true ;
58- logger . info ( `stdout end event was fired for container: ${ this . containerId } ` ) ;
59- } ) ;
6050
61- if ( stderr ) {
62- stderr . on ( 'end' , ( ) => {
51+ stdout . on ( 'end' , ( ) => {
6352 this . stepFinished = true ;
64- logger . info ( `stderr end event was fired for container: ${ this . containerId } ` ) ;
53+ logger . info ( `stdout end event was fired for container: ${ this . containerId } ` ) ;
6554 } ) ;
55+
56+ if ( this . stepLogger . opts && this . stepLogger . opts . logsRateLimitConfig ) {
57+ logger . info ( `Found logger rate limit configuration, using streams api` ) ;
58+ this . _streamTty ( stdout , stderr ) ;
59+ return ;
60+ }
61+
62+ this . _registerToTtyStreams ( stdout , stderr ) ;
63+ } else {
64+ this . _handleNonTtyStream ( stdout , false ) ;
6665 }
6766 } , ( err ) => {
6867 return Q . reject ( new CFError ( {
@@ -99,6 +98,47 @@ class ContainerLogger extends EventEmitter {
9998 ] ) ;
10099 }
101100
101+ _streamTty ( stdout , stderr ) {
102+
103+ pipeline ( stdout , this . _logSizeLimitStream ( ) , this . stepLogger . writeStream ( ) , ( err ) => {
104+ if ( err ) {
105+ logger . error ( `Stdout streams pipeline failed on: ${ err } ` ) ;
106+ return ;
107+ }
108+ logger . info ( 'Stdout streams pipeline succeeded.' ) ;
109+ } ) ;
110+
111+ if ( ! stderr ) {
112+ return ;
113+ }
114+
115+ pipeline ( stderr , this . _logSizeLimitStream ( ) , this . _errorTransformerStream ( ) , this . stepLogger . writeStream ( ) , ( err ) => {
116+ if ( err ) {
117+ logger . error ( `Stderr streams pipeline failed on: ${ err } ` ) ;
118+ return ;
119+ }
120+ logger . info ( 'Stderr streams pipeline succeeded.' ) ;
121+ } ) ;
122+
123+ stderr . once ( 'end' , ( ) => {
124+ this . stepFinished = true ;
125+ logger . info ( `stderr end event was fired for container: ${ this . containerId } ` ) ;
126+ } ) ;
127+
128+ }
129+
130+ _registerToTtyStreams ( stdout , stderr ) {
131+ this . _handleTtyStream ( stdout , false ) ;
132+
133+ if ( stderr ) {
134+ stderr . once ( 'end' , ( ) => {
135+ this . stepFinished = true ;
136+ logger . info ( `stderr end event was fired for container: ${ this . containerId } ` ) ;
137+ } ) ;
138+ this . _handleTtyStream ( stderr , true ) ;
139+ }
140+ }
141+
102142 _handleTtyStream ( stream , isError ) {
103143 stream . on ( 'data' , ( chunk ) => {
104144 const buf = new Buffer ( chunk ) ;
@@ -150,6 +190,41 @@ class ContainerLogger extends EventEmitter {
150190 this . emit ( 'message.logged' ) ;
151191 }
152192
193+ _errorTransformerStream ( ) {
194+ return new Transform ( {
195+ transform : ( data , encoding , done ) => {
196+ const message = `\x1B[31m${ data . toString ( 'utf8' ) } \x1B[0m` ;
197+ done ( null , Buffer . from ( message ) ) ;
198+ }
199+ } ) ;
200+ }
201+
202+ _logSizeLimitStream ( ) {
203+ return new Transform ( {
204+ transform : ( data , encoding , done ) => {
205+ if ( this . logSizeLimit && ( this . _stepLogSizeExceeded ( ) || this . isWorkflowLogSizeExceeded ( ) ) ) {
206+ if ( ! this . logExceededLimitsNotified ) {
207+ this . logExceededLimitsNotified = true ;
208+ const message = `\x1B[01;93mLog size exceeded for ${ this . _stepLogSizeExceeded ( ) ? 'this step' : 'the workflow' } .\nThe step will continue to execute until it finished but new logs will not be stored.\x1B[0m\r\n` ;
209+ done ( null , Buffer . from ( message ) ) ;
210+ return ;
211+ }
212+
213+ done ( null , Buffer . alloc ( 0 ) ) ; // discard chunk
214+ return ;
215+ }
216+
217+ if ( this . logSizeLimit ) {
218+ this . logSize += Buffer . byteLength ( data ) ;
219+ this . stepLogger . setLogSize ( this . logSize ) ;
220+ }
221+
222+ this . emit ( 'message.logged' ) ;
223+ done ( null , data ) ;
224+ }
225+ } ) ;
226+ }
227+
153228}
154229
155230module . exports = ContainerLogger ;
0 commit comments