11var AWS = require ( "aws-sdk" ) ;
2+ const util = require ( "util" ) ;
23var cwl = new AWS . CloudWatchLogs ( { apiVersion : '2014-03-28' } ) ;
34
4- async function createSubscriptionFilter ( lambdaLogGroupName , destinationArn , roleArn , errorHandler ) {
5+ async function createSubscriptionFilter ( lambdaLogGroupName , destinationArn , roleArn ) {
56 if ( destinationArn . startsWith ( "arn:aws:lambda" ) ) {
67 var params = {
78 destinationArn : destinationArn ,
@@ -20,7 +21,13 @@ async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, role
2021 }
2122
2223 // handle case where subscription filter exists/case where loggroup generated by target lambda
23- await cwl . putSubscriptionFilter ( params , errorHandler ) ;
24+ try {
25+ await util . promisify ( cwl . putSubscriptionFilter . bind ( cwl ) ) ( params ) ;
26+ console . log ( "Successfully subscribed logGroup: " , lambdaLogGroupName ) ;
27+ } catch ( err ) {
28+ console . log ( "Error in subscribing" , lambdaLogGroupName , err ) ;
29+ throw err ;
30+ }
2431}
2532
2633function filterLogGroups ( event , logGroupRegex ) {
@@ -46,24 +53,29 @@ function filterLogGroups(event, logGroupRegex) {
4653 return false ;
4754}
4855
49- async function subscribeExistingLogGroups ( logGroups ) {
50- var logGroupName ;
56+ async function subscribeExistingLogGroups ( logGroups , counter ) {
5157 var logGroupRegex = new RegExp ( process . env . LOG_GROUP_PATTERN , "i" ) ;
5258 var destinationArn = process . env . DESTINATION_ARN ;
5359 var roleArn = process . env . ROLE_ARN ;
54- for ( var i = logGroups . length - 1 ; i >= 0 ; i -- ) {
55- logGroupName = logGroups [ i ] . logGroupName ;
56- if ( logGroupName . match ( logGroupRegex ) ) {
57- await createSubscriptionFilter ( logGroupName , destinationArn , roleArn , ( function ( inner_logGroupName ) { return function ( err , data ) {
58- if ( err ) {
59- console . log ( "Error in subscribing" , inner_logGroupName , err ) ;
60- } else {
61- console . log ( "Successfully subscribed logGroup: " , inner_logGroupName ) ;
62- }
63- } ; } ) ( logGroupName ) ) ;
64- } else {
60+ const failedLogGroupNames = [ ] ;
61+ await logGroups . reduce ( async ( previousPromise , nextLogGroup ) => {
62+ await previousPromise ;
63+ const { logGroupName } = nextLogGroup ;
64+ if ( ! logGroupName . match ( logGroupRegex ) ) {
6565 console . log ( "Unmatched logGroup: " , logGroupName ) ;
66+ return Promise . resolve ( ) ;
67+ } else {
68+ return createSubscriptionFilter ( logGroupName , destinationArn , roleArn ) . catch ( function ( err ) {
69+ if ( err && err . code == "ThrottlingException" ) {
70+ failedLogGroupNames . push ( { logGroupName : logGroupName } ) ;
71+ }
72+ } ) ;
6673 }
74+ } , Promise . resolve ( ) ) ;
75+
76+ if ( counter < 4 && failedLogGroupNames . length > 0 ) {
77+ console . log ( "Retrying Subscription for Failed Log Groups due to throttling with counter number as " + counter ) ;
78+ await subscribeExistingLogGroups ( failedLogGroupNames , counter + 1 ) ;
6779 }
6880}
6981
@@ -83,12 +95,12 @@ function processExistingLogGroups(token, context, errorHandler) {
8395 reject ( err ) ;
8496 } else {
8597 console . log ( "fetched logGroups: " + data . logGroups . length + " nextToken: " + data . nextToken ) ;
86- subscribeExistingLogGroups ( data . logGroups ) ;
8798 resolve ( data ) ;
8899 }
89100 } ) ;
90101 } ) ;
91- var cb = function ( data ) {
102+ var cb = async function ( data ) {
103+ await subscribeExistingLogGroups ( data . logGroups , 0 ) ;
92104 if ( data . nextToken ) { // if next set of log groups exists, invoke next instance of lambda
93105 console . log ( "Log Groups remaining...Calling the lambda again with token " + data . nextToken ) ;
94106 invoke_lambda ( context , data . nextToken , errorHandler ) ;
@@ -118,7 +130,9 @@ function processEvents(env, event, errorHandler) {
118130 var logGroupName = event . detail . requestParameters . logGroupName ;
119131 if ( filterLogGroups ( event , env . LOG_GROUP_PATTERN ) ) {
120132 console . log ( "Subscribing: " , logGroupName , env . DESTINATION_ARN ) ;
121- createSubscriptionFilter ( logGroupName , env . DESTINATION_ARN , env . ROLE_ARN , errorHandler ) ;
133+ createSubscriptionFilter ( logGroupName , env . DESTINATION_ARN , env . ROLE_ARN ) . catch ( function ( err ) {
134+ errorHandler ( err , "Error in Subscribing." ) ;
135+ } ) ;
122136 } else {
123137 console . log ( "Unmatched: " , logGroupName , env . DESTINATION_ARN ) ;
124138 }
0 commit comments