11import { boundMethod } from 'autobind-decorator' ;
22import { EventEmitter } from 'events' ;
33import CloudWatchLogs , {
4+ DescribeLogStreamsResponse ,
5+ LogStream ,
46 InputLogEvent ,
57 PutLogEventsRequest ,
68 PutLogEventsResponse ,
79} from 'aws-sdk/clients/cloudwatchlogs' ;
810import S3 , { PutObjectRequest , PutObjectOutput } from 'aws-sdk/clients/s3' ;
11+ import promiseSequential from 'promise-sequential' ;
912
1013import { SessionProxy } from './proxy' ;
11- import { HandlerRequest , runInSequence } from './utils' ;
14+ import { delay , HandlerRequest } from './utils' ;
1215
1316type Console = globalThis . Console ;
17+ type PromiseFunction = ( ) => Promise < any > ;
1418
1519interface LogOptions {
1620 groupName : string ;
@@ -26,13 +30,14 @@ export class ProviderLogHandler {
2630 private static instance : ProviderLogHandler ;
2731 public emitter : LogEmitter ;
2832 public client : CloudWatchLogs ;
29- public sequenceToken = '' ;
33+ public sequenceToken : string = null ;
3034 public accountId : string ;
3135 public groupName : string ;
3236 public stream : string ;
3337 public logger : Console ;
3438 public clientS3 : S3 ;
35- private stack : Array < Promise < any > > = [ ] ;
39+ private stack : Array < PromiseFunction > = [ ] ;
40+ private isProcessing = false ;
3641
3742 /**
3843 * The ProviderLogHandler's constructor should always be private to prevent direct
@@ -50,28 +55,50 @@ export class ProviderLogHandler {
5055 const logger = options . logger || global . console ;
5156 this . logger = logger ;
5257 this . emitter . on ( 'log' , ( ...args : any [ ] ) => {
53- this . stack . push ( this . deliverLog ( args ) ) ;
58+ // this.logger.debug('Emitting log event...' );
5459 } ) ;
5560 // Create maps of each logger method and then alias that.
5661 Object . entries ( this . logger ) . forEach ( ( [ key , val ] ) => {
5762 if ( typeof val === 'function' ) {
5863 if ( [ 'log' , 'error' , 'warn' , 'info' ] . includes ( key ) ) {
59- this . logger [ key as 'log' | 'error' | 'warn' | 'info' ] = function (
64+ this . logger [ key as 'log' | 'error' | 'warn' | 'info' ] = (
6065 ...args : any [ ]
61- ) : void {
62- // For adding other event watchers later.
63- setImmediate ( ( ) => emitter . emit ( 'log' , ...args ) ) ;
66+ ) : void => {
67+ if ( ! this . isProcessing ) {
68+ const logLevel = key . toUpperCase ( ) ;
69+ // Add log level when not present
70+ if (
71+ args . length &&
72+ ( typeof args [ 0 ] !== 'string' ||
73+ args [ 0 ]
74+ . substring ( 0 , logLevel . length )
75+ . toUpperCase ( ) !== logLevel )
76+ ) {
77+ args . unshift ( logLevel ) ;
78+ }
79+ this . stack . push ( ( ) =>
80+ this . deliverLog ( args ) . catch ( this . logger . debug )
81+ ) ;
82+ // For adding other event watchers later.
83+ setImmediate ( ( ) => {
84+ this . emitter . emit ( 'log' , ...args ) ;
85+ } ) ;
86+ } else {
87+ this . logger . debug (
88+ 'Logs are being delivered at the moment...'
89+ ) ;
90+ }
6491
6592 // Calls the logger method.
66- val . apply ( this , args ) ;
93+ val . apply ( this . logger , args ) ;
6794 } ;
6895 }
6996 }
7097 } ) ;
7198 }
7299
73100 private async initialize ( ) : Promise < void > {
74- this . sequenceToken = '' ;
101+ this . sequenceToken = null ;
75102 this . stack = [ ] ;
76103 try {
77104 await this . deliverLogCloudWatch ( [ 'Initialize CloudWatch' ] ) ;
@@ -142,11 +169,13 @@ export class ProviderLogHandler {
142169
143170 @boundMethod
144171 public async processLogs ( ) : Promise < void > {
172+ this . isProcessing = true ;
145173 if ( this . stack . length > 0 ) {
146- this . stack . push ( this . deliverLog ( [ 'Log delivery finalized.' ] ) ) ;
174+ this . stack . push ( ( ) => this . deliverLog ( [ 'Log delivery finalized.' ] ) ) ;
147175 }
148- await runInSequence ( this . stack ) ;
176+ await promiseSequential ( this . stack ) ;
149177 this . stack = [ ] ;
178+ this . isProcessing = false ;
150179 }
151180
152181 private async createLogGroup ( ) : Promise < void > {
@@ -199,27 +228,48 @@ export class ProviderLogHandler {
199228 const response : PutLogEventsResponse = await this . client
200229 . putLogEvents ( logEventsParams )
201230 . promise ( ) ;
202- this . sequenceToken = response ?. nextSequenceToken ;
231+ this . sequenceToken = response ?. nextSequenceToken || null ;
203232 this . logger . debug ( 'Response from "putLogEvents"' , response ) ;
204233 return response ;
205234 } catch ( err ) {
206235 const errorCode = err . code || err . name ;
207- this . logger . debug ( 'Error from "deliverLogCloudWatch"' , err ) ;
208- this . logger . debug ( `Error from 'putLogEvents' ${ JSON . stringify ( err ) } ` ) ;
236+ this . logger . debug (
237+ `Error from "putLogEvents" with sequence token ${ this . sequenceToken } ` ,
238+ err
239+ ) ;
209240 if (
210241 errorCode === 'DataAlreadyAcceptedException' ||
211242 errorCode === 'InvalidSequenceTokenException'
212243 ) {
213- this . sequenceToken = ( err . message || '' ) . split ( ' ' ) . pop ( ) ;
214- this . putLogEvents ( record ) ;
244+ this . sequenceToken = null ;
245+ // Delay to avoid throttling
246+ await delay ( 1 ) ;
247+ try {
248+ const response : DescribeLogStreamsResponse = await this . client
249+ . describeLogStreams ( {
250+ logGroupName : this . groupName ,
251+ logStreamNamePrefix : this . stream ,
252+ limit : 1 ,
253+ } )
254+ . promise ( ) ;
255+ this . logger . debug ( 'Response from "describeLogStreams"' , response ) ;
256+ if ( response . logStreams && response . logStreams . length ) {
257+ const logStream = response . logStreams [ 0 ] as LogStream ;
258+ this . sequenceToken = logStream . uploadSequenceToken ;
259+ }
260+ } catch ( err ) {
261+ this . logger . debug ( 'Error from "describeLogStreams"' , err ) ;
262+ }
215263 } else {
216264 throw err ;
217265 }
218266 }
219267 }
220268
221269 @boundMethod
222- private async deliverLogCloudWatch ( messages : any [ ] ) : Promise < PutLogEventsResponse > {
270+ private async deliverLogCloudWatch (
271+ messages : any [ ]
272+ ) : Promise < PutLogEventsResponse | void > {
223273 const currentTime = new Date ( Date . now ( ) ) ;
224274 const record : InputLogEvent = {
225275 message : JSON . stringify ( { messages } ) ,
@@ -236,8 +286,20 @@ export class ProviderLogHandler {
236286 await this . createLogGroup ( ) ;
237287 }
238288 await this . createLogStream ( ) ;
239- return this . putLogEvents ( record ) ;
240- } else {
289+ } else if (
290+ errorCode !== 'DataAlreadyAcceptedException' &&
291+ errorCode !== 'InvalidSequenceTokenException'
292+ ) {
293+ throw err ;
294+ }
295+ try {
296+ const response = await this . putLogEvents ( record ) ;
297+ return response ;
298+ } catch ( err ) {
299+ // Additional retry for sequence token error
300+ if ( this . sequenceToken ) {
301+ return this . putLogEvents ( record ) ;
302+ }
241303 throw err ;
242304 }
243305 }
@@ -316,7 +378,7 @@ export class ProviderLogHandler {
316378 @boundMethod
317379 private async deliverLog (
318380 messages : any [ ]
319- ) : Promise < PutLogEventsResponse | PutObjectOutput > {
381+ ) : Promise < PutLogEventsResponse | PutObjectOutput | void > {
320382 if ( this . clientS3 ) {
321383 return this . deliverLogS3 ( messages ) ;
322384 }
0 commit comments