@@ -4,12 +4,13 @@ const { LambdaClient, InvokeCommand } = require("@aws-sdk/client-lambda");
44const cwl = new CloudWatchLogsClient ( ) ;
55const lambda = new LambdaClient ( { apiVersion : '2015-03-31' } ) ; // Update to the appropriate Lambda API version you require
66const maxRetryCounter = 3 ;
7+ const timeoutThreshold = 12000 ;
78
8- async function createSubscriptionFilter ( lambdaLogGroupName , destinationArn , roleArn ) {
9- var params = { } ;
9+ async function createSubscriptionFilter ( lambdaLogGroupName , destinationArn , roleArn , additionalArgs ) {
10+ var params = { } ;
1011 if ( destinationArn . startsWith ( "arn:aws:lambda" ) ) {
1112 params = {
12- destinationArn : destinationArn ,
13+ destinationArn : destinationArn ,
1314 filterName : 'SumoLGLBDFilter' ,
1415 filterPattern : '' ,
1516 logGroupName : lambdaLogGroupName
@@ -28,6 +29,7 @@ async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, role
2829 try {
2930 const cmd = new PutSubscriptionFilterCommand ( params ) ;
3031 await cwl . send ( cmd ) ;
32+ additionalArgs . subscribeCount += 1
3133 console . log ( "Successfully subscribed logGroup: " , lambdaLogGroupName ) ;
3234 } catch ( err ) {
3335 console . log ( "Error in subscribing" , lambdaLogGroupName , err ) ;
@@ -58,7 +60,7 @@ function filterLogGroups(event, logGroupRegex) {
5860 return false ;
5961}
6062
61- async function subscribeExistingLogGroups ( logGroups , retryCounter ) {
63+ async function subscribeExistingLogGroups ( logGroups , retryCounter , additionalArgs ) {
6264 var logGroupRegex = new RegExp ( process . env . LOG_GROUP_PATTERN , "i" ) ;
6365 var destinationArn = process . env . DESTINATION_ARN ;
6466 var roleArn = process . env . ROLE_ARN ;
@@ -70,8 +72,8 @@ async function subscribeExistingLogGroups(logGroups, retryCounter) {
7072 console . log ( "Unmatched logGroup: " , logGroupName ) ;
7173 return Promise . resolve ( ) ;
7274 } else {
73- return createSubscriptionFilter ( logGroupName , destinationArn , roleArn ) . catch ( function ( err ) {
74- if ( err && err . code == "ThrottlingException ") {
75+ return createSubscriptionFilter ( logGroupName , destinationArn , roleArn , additionalArgs ) . catch ( function ( err ) {
76+ if ( err && err . message === "Rate exceeded ") {
7577 failedLogGroupNames . push ( { logGroupName : logGroupName } ) ;
7678 }
7779 } ) ;
@@ -80,80 +82,110 @@ async function subscribeExistingLogGroups(logGroups, retryCounter) {
8082
8183 if ( retryCounter <= maxRetryCounter && failedLogGroupNames . length > 0 ) {
8284 console . log ( "Retrying Subscription for Failed Log Groups due to throttling with counter number as " + retryCounter ) ;
83- await subscribeExistingLogGroups ( failedLogGroupNames , retryCounter + 1 ) ;
85+ await subscribeExistingLogGroups ( failedLogGroupNames , retryCounter + 1 , additionalArgs ) ;
8486 }
8587}
8688
87- async function processExistingLogGroups ( token , context , errorHandler ) {
89+ async function processExistingLogGroups ( context , token , additionalArgs , errorHandler ) {
8890 var params = { limit : 50 } ;
8991 if ( token ) {
9092 params = {
9193 limit : 50 ,
9294 nextToken : token
9395 } ;
9496 }
95-
97+
9698 try {
99+ console . log ( "Previous record count " + additionalArgs . recordCount ) ;
97100 const data = await cwl . send ( new DescribeLogGroupsCommand ( params ) ) ;
98- console . log (
99- "fetched logGroups: " + data . logGroups . length + " nextToken: " + data . nextToken
100- ) ;
101- await subscribeExistingLogGroups ( data . logGroups , 1 ) ;
102-
101+ additionalArgs . recordCount += data . logGroups . length ;
102+ console . log ( "Updated record count " + additionalArgs . recordCount ) ;
103+ await subscribeExistingLogGroups ( data . logGroups , 1 , additionalArgs ) ;
104+ console . log ( "Updated subscribeCount " + additionalArgs . subscribeCount ) ;
103105 if ( data . nextToken ) {
104- console . log (
105- "Log Groups remaining...Calling the lambda again with token " + data . nextToken
106- ) ;
107- await invoke_lambda ( context , data . nextToken , errorHandler ) ;
108- console . log ( "Lambda invoke complete with token " + data . nextToken ) ;
106+ const remainingTime = context . getRemainingTimeInMillis ( ) ; // 60000
107+ const diffTime = remainingTime - timeoutThreshold // 14552-12000=2792
108+ if ( diffTime < timeoutThreshold ) {
109+ additionalArgs . invokeCount += 1
110+ console . log ( "Lambda invoke complete with token " + data . nextToken ) ;
111+ console . log ( "InvokeCount " + additionalArgs . invokeCount ) ;
112+ await invoke_lambda ( context , data . nextToken , additionalArgs , errorHandler ) ;
113+ return
114+ }
115+ console . log ( "Remaining time " + remainingTime ) ;
116+ console . log ( "Log Groups remaining...Calling the lambda again with token " + data . nextToken ) ;
117+ await processExistingLogGroups ( context , data . nextToken , additionalArgs , errorHandler )
109118 } else {
110- console . log ( "All Log Groups are subscribed to Destination Type " + process . env . DESTINATION_ARN ) ;
119+ console . log ( "Total " + additionalArgs . subscribeCount + " out of " + additionalArgs . recordCount
120+ + " Log Groups are subscribed to Destination Type "
121+ + process . env . DESTINATION_ARN ) ;
122+ console . log ( "Last invokeCount " + additionalArgs . invokeCount ) ;
111123 errorHandler ( null , "Success" ) ;
112124 }
113125 } catch ( err ) {
114126 errorHandler ( err , "Error in fetching logGroups" ) ;
115127 }
116128 }
117-
118- async function invoke_lambda ( context , token , errorHandler ) {
119- var payload = { "existingLogs" : "true" , "token" : token } ;
120- try {
121- await lambda . send ( new InvokeCommand ( {
122- InvocationType : 'Event' ,
123- FunctionName : context . functionName ,
124- Payload : JSON . stringify ( payload )
125- } ) ) ;
126- } catch ( err ) {
129+
130+ async function invoke_lambda ( context , token , additionalArgs , errorHandler ) {
131+ var payload = { "existingLogs" : "true" , "token" : token , "additionalArgs" : additionalArgs } ;
132+ try {
133+ await lambda . send ( new InvokeCommand ( {
134+ InvocationType : 'Event' ,
135+ FunctionName : context . functionName ,
136+ Payload : JSON . stringify ( payload )
137+ } ) ) ;
138+ } catch ( err ) {
127139 errorHandler ( err , "Error invoking Lambda" ) ;
128- }
129140 }
130-
131- async function processEvents ( env , event , errorHandler ) {
132- var logGroupName = event . detail . requestParameters . logGroupName ;
133- if ( filterLogGroups ( event , env . LOG_GROUP_PATTERN ) ) {
134- console . log ( "Subscribing: " , logGroupName , env . DESTINATION_ARN ) ;
135- await createSubscriptionFilter ( logGroupName , env . DESTINATION_ARN , env . ROLE_ARN )
136- . catch ( function ( err ) {
137- errorHandler ( err , "Error in Subscribing." ) ;
138- } ) ;
139- } else {
141+ }
142+
143+ async function delay ( ms ) {
144+ return new Promise ( resolve => setTimeout ( resolve , ms ) ) ;
145+ }
146+
147+ async function processEvents ( env , event , additionalArgs , errorHandler , retryCounter = 0 ) {
148+ var logGroupName = event . detail . requestParameters . logGroupName ;
149+ if ( filterLogGroups ( event , env . LOG_GROUP_PATTERN ) ) {
150+ console . log ( "Subscribing: " , logGroupName , env . DESTINATION_ARN ) ;
151+ try {
152+ await createSubscriptionFilter ( logGroupName , env . DESTINATION_ARN , env . ROLE_ARN , additionalArgs ) ;
153+ } catch ( err ) {
154+ errorHandler ( err , "Error in Subscribing." ) ;
155+ if ( err && err . message === "Rate exceeded" && retryCounter <= maxRetryCounter ) {
156+ retryCounter += 1
157+ const delayTime = Math . pow ( 2 , retryCounter ) * 1000 ; // Exponential backoff
158+ console . log ( `ThrottlingException encountered. Retrying in ${ delayTime } ms...Attempt ${ retryCounter } /${ maxRetryCounter } ` ) ;
159+ await delay ( delayTime ) ;
160+ await processEvents ( env , event , additionalArgs , errorHandler , retryCounter ) ;
161+ }
162+ } ;
163+ } else {
140164 console . log ( "Unmatched: " , logGroupName , env . DESTINATION_ARN ) ;
141- }
142165 }
143-
144- exports . handler = async function ( event , context , callback ) {
145- console . log ( "Invoking Log Group connector function" ) ;
146- function errorHandler ( err , msg ) {
147- if ( err ) {
148- console . log ( err , msg ) ;
166+ }
167+
168+ exports . handler = async function ( event , context , callback ) {
169+ let additionalArgs = {
170+ recordCount : 0 ,
171+ subscribeCount : 0 ,
172+ invokeCount : 0
173+ } ;
174+ if ( event . additionalArgs ) {
175+ additionalArgs = event . additionalArgs
176+ }
177+ console . log ( "Invoking Log Group connector function" ) ;
178+ function errorHandler ( err , msg ) {
179+ if ( err ) {
180+ console . log ( err , msg ) ;
149181 callback ( err ) ;
150182 } else {
151183 callback ( null , "Success" ) ;
152184 }
153185 }
154186 if ( event . existingLogs == "true" ) {
155- await processExistingLogGroups ( event . token , context , errorHandler ) ;
187+ await processExistingLogGroups ( context , event . token , additionalArgs , errorHandler ) ;
156188 } else {
157- await processEvents ( process . env , event , errorHandler ) ;
189+ await processEvents ( process . env , event , additionalArgs , errorHandler ) ;
158190 }
159- } ;
191+ } ;
0 commit comments