Skip to content

Commit c9ec731

Browse files
author
sourabh
committed
updating the code for existing log groups. Existing is called when user select existing as true in parameters.
1 parent 7693943 commit c9ec731

File tree

4 files changed

+156
-38
lines changed

4 files changed

+156
-38
lines changed

loggroup-lambda-connector/sam/packaged.yaml

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Description: '"Lambda Function for automatic subscription of any Sumo Logic lamb
66
'
77
Globals:
88
Function:
9-
Timeout: 300
9+
Timeout: 900
1010
MemorySize: 128
1111
Metadata:
1212
AWS::ServerlessRepo::Application:
@@ -73,6 +73,10 @@ Conditions:
7373
Fn::Equals:
7474
- Ref: DestinationArnType
7575
- Kinesis
76+
invoke_existing:
77+
Fn::Equals:
78+
- Ref: UseExistingLogs
79+
- 'true'
7680
Rules:
7781
testRoleArnWithLambda:
7882
RuleCondition:
@@ -103,17 +107,15 @@ Resources:
103107
SumoLogGroupLambdaConnector:
104108
Type: AWS::Serverless::Function
105109
Properties:
106-
CodeUri: s3://cf-templates-1qpf3unpuo1hw-us-east-1/test-log-group-lambda-connector/44a32060109e4c7efa4feed59007b04f
110+
CodeUri: s3://cf-templates-1qpf3unpuo1hw-us-east-1/test-log-group-lambda-connector/0d12ca7b2219c4fc3fe3cd29d6ba56d0
107111
Handler: loggroup-lambda-connector.handler
108-
Runtime: nodejs14.x
112+
Runtime: nodejs12.x
109113
Environment:
110114
Variables:
111115
DESTINATION_ARN:
112116
Ref: DestinationArnValue
113117
LOG_GROUP_PATTERN:
114118
Ref: LogGroupPattern
115-
USE_EXISTING_LOG_GROUPS:
116-
Ref: UseExistingLogs
117119
LOG_GROUP_TAGS:
118120
Fn::Join:
119121
- ','
@@ -130,6 +132,12 @@ Resources:
130132
- logs:PutSubscriptionFilter
131133
Resource:
132134
- Fn::Sub: arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*
135+
- Sid: InvokePolicy
136+
Effect: Allow
137+
Action:
138+
- lambda:InvokeFunction
139+
Resource:
140+
- Fn::Sub: arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:*
133141
Events:
134142
LambdaTrigger:
135143
Type: CloudWatchEvent
@@ -170,6 +178,50 @@ Resources:
170178
Ref: AWS::AccountId
171179
SourceArn:
172180
Fn::Sub: arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*:*
181+
SumoLogGroupExistingLambdaConnector:
182+
Type: AWS::Serverless::Function
183+
Condition: invoke_existing
184+
Properties:
185+
InlineCode: "var aws = require('aws-sdk');\nvar response = require('cfn-response');\n\
186+
\nexports.handler = function(event, context) {\n var lambda = new aws.Lambda();\n\
187+
\ var payload = {\"existingLogs\": \"true\", \"token\": \"\"};\n var\
188+
\ responseStatus = \"FAILED\";\n var responseData = {};\n lambda.invoke(\n\
189+
\ {\n InvocationType: 'Event',\n FunctionName: process.env.FUNCTION_NAME,\n\
190+
\ Payload: JSON.stringify(payload),\n }, function(err, invokeResult)\
191+
\ {\n if (err) {\n responseData = {Error: \"Invoke call\
192+
\ failed\"};\n console.log(responseData.Error + \":\\n\", err);\n\
193+
\ }\n else {\n responseStatus = \"SUCCESS\";\n\
194+
\ }\n response.send(event, context, responseStatus, responseData);\n\
195+
\ });\n};\n"
196+
Handler: index.handler
197+
Runtime: nodejs12.x
198+
Environment:
199+
Variables:
200+
FUNCTION_NAME:
201+
Ref: SumoLogGroupLambdaConnector
202+
Policies:
203+
- Statement:
204+
- Sid: InvokePolicy
205+
Effect: Allow
206+
Action:
207+
- lambda:InvokeFunction
208+
Resource:
209+
- Fn::Sub: arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${SumoLogGroupLambdaConnector}
210+
primerinvoke:
211+
Type: AWS::CloudFormation::CustomResource
212+
Version: '1.0'
213+
Condition: invoke_existing
214+
Properties:
215+
ServiceToken:
216+
Fn::GetAtt:
217+
- SumoLogGroupExistingLambdaConnector
218+
- Arn
219+
DESTINATION_ARN:
220+
Ref: DestinationArnValue
221+
LOG_GROUP_PATTERN:
222+
Ref: LogGroupPattern
223+
ROLE_ARN:
224+
Ref: RoleArn
173225
Outputs:
174226
SumoLogGroupLambdaConnector:
175227
Description: SumoLogGroupLambdaConnector Function ARN

loggroup-lambda-connector/sam/template.yaml

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Description: >
66
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
77
Globals:
88
Function:
9-
Timeout: 300
9+
Timeout: 900
1010
MemorySize: 128
1111

1212
Metadata:
@@ -69,6 +69,8 @@ Conditions:
6969
create_invoke_permission: !Equals [ !Ref DestinationArnType, 'Lambda' ]
7070
create_pass_role: !Equals [ !Ref DestinationArnType, 'Kinesis' ]
7171

72+
invoke_existing: !Equals [ !Ref UseExistingLogs, 'true' ]
73+
7274
Rules:
7375
testRoleArnWithLambda:
7476
RuleCondition: !Equals
@@ -97,12 +99,11 @@ Resources:
9799
Properties:
98100
CodeUri: ../src/
99101
Handler: "loggroup-lambda-connector.handler"
100-
Runtime: nodejs14.x
102+
Runtime: nodejs12.x
101103
Environment:
102104
Variables:
103105
DESTINATION_ARN: !Ref "DestinationArnValue"
104106
LOG_GROUP_PATTERN: !Ref "LogGroupPattern"
105-
USE_EXISTING_LOG_GROUPS: !Ref "UseExistingLogs"
106107
LOG_GROUP_TAGS: !Join [ ",", { "Ref": "LogGroupTags" } ]
107108
ROLE_ARN: !Ref "RoleArn"
108109
Policies:
@@ -115,6 +116,12 @@ Resources:
115116
- logs:PutSubscriptionFilter
116117
Resource:
117118
- !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*'
119+
- Sid: InvokePolicy
120+
Effect: Allow
121+
Action:
122+
- lambda:InvokeFunction
123+
Resource:
124+
- !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:*'
118125
Events:
119126
LambdaTrigger:
120127
Type: CloudWatchEvent
@@ -153,6 +160,59 @@ Resources:
153160
SourceAccount: !Ref AWS::AccountId
154161
SourceArn: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*:*'
155162

163+
SumoLogGroupExistingLambdaConnector:
164+
Type: AWS::Serverless::Function
165+
Condition: invoke_existing
166+
Properties:
167+
InlineCode: |
168+
var aws = require('aws-sdk');
169+
var response = require('cfn-response');
170+
171+
exports.handler = function(event, context) {
172+
var lambda = new aws.Lambda();
173+
var payload = {"existingLogs": "true", "token": ""};
174+
var responseStatus = "FAILED";
175+
var responseData = {};
176+
lambda.invoke(
177+
{
178+
InvocationType: 'Event',
179+
FunctionName: process.env.FUNCTION_NAME,
180+
Payload: JSON.stringify(payload),
181+
}, function(err, invokeResult) {
182+
if (err) {
183+
responseData = {Error: "Invoke call failed"};
184+
console.log(responseData.Error + ":\n", err);
185+
}
186+
else {
187+
responseStatus = "SUCCESS";
188+
}
189+
response.send(event, context, responseStatus, responseData);
190+
});
191+
};
192+
Handler: "index.handler"
193+
Runtime: nodejs12.x
194+
Environment:
195+
Variables:
196+
FUNCTION_NAME: !Ref SumoLogGroupLambdaConnector
197+
Policies:
198+
- Statement:
199+
- Sid: InvokePolicy
200+
Effect: Allow
201+
Action:
202+
- lambda:InvokeFunction
203+
Resource:
204+
- !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${SumoLogGroupLambdaConnector}'
205+
206+
primerinvoke:
207+
Type: AWS::CloudFormation::CustomResource
208+
Version: "1.0"
209+
Condition: invoke_existing
210+
Properties:
211+
ServiceToken: !GetAtt SumoLogGroupExistingLambdaConnector.Arn
212+
DESTINATION_ARN: !Ref "DestinationArnValue"
213+
LOG_GROUP_PATTERN: !Ref "LogGroupPattern"
214+
ROLE_ARN: !Ref "RoleArn"
215+
156216
Outputs:
157217

158218
SumoLogGroupLambdaConnector:

loggroup-lambda-connector/src/loggroup-lambda-connector.js

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
var AWS = require("aws-sdk");
22
var cwl = new AWS.CloudWatchLogs({apiVersion: '2014-03-28'});
3-
async function sleep(waitTimeInMs) {
4-
console.log("sleeping for " + waitTimeInMs + " ms...");
5-
return new Promise((resolve) => setTimeout(resolve, waitTimeInMs));
6-
}
73

8-
function createSubscriptionFilter(lambdaLogGroupName, destinationArn, roleArn, errorHandler) {
4+
async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, roleArn, errorHandler) {
95
if (destinationArn.startsWith("arn:aws:lambda")){
106
var params = {
117
destinationArn: destinationArn,
@@ -16,15 +12,15 @@ function createSubscriptionFilter(lambdaLogGroupName, destinationArn, roleArn, e
1612
} else {
1713
var params = {
1814
destinationArn: destinationArn,
19-
filterName: 'SumoLGKinesisFilter',
15+
filterName: 'SumoLGLBDFilter',
2016
filterPattern: '',
2117
logGroupName: lambdaLogGroupName,
2218
roleArn: roleArn
2319
};
2420
}
2521

2622
// handle case where subscription filter exists/case where loggroup generated by target lambda
27-
cwl.putSubscriptionFilter(params, errorHandler);
23+
await cwl.putSubscriptionFilter(params, errorHandler);
2824
}
2925

3026
function filterLogGroups(event, logGroupRegex) {
@@ -58,11 +54,7 @@ async function subscribeExistingLogGroups(logGroups) {
5854
for (var i = logGroups.length - 1; i >= 0; i--) {
5955
logGroupName = logGroups[i].logGroupName;
6056
if (logGroupName.match(logGroupRegex)) {
61-
62-
// sleep time between calls
63-
await sleep(1000*(process.env.SUBSCRIBE_DELAY_SECONDS || 2)); // 5 seconds
64-
65-
createSubscriptionFilter(logGroupName, destinationArn, roleArn, (function(inner_logGroupName) { return function (err, data) {
57+
await createSubscriptionFilter(logGroupName, destinationArn, roleArn, (function(inner_logGroupName) { return function (err, data) {
6658
if (err) {
6759
console.log("Error in subscribing", inner_logGroupName, err);
6860
} else {
@@ -75,29 +67,34 @@ async function subscribeExistingLogGroups(logGroups) {
7567
}
7668
}
7769

78-
function processExistingLogGroups(token, errorHandler) {
79-
80-
var params = {
81-
limit: 50,
82-
// logGroupNamePrefix: '',
83-
nextToken: token
84-
};
70+
function processExistingLogGroups(token, context, errorHandler) {
71+
var params = {limit: 50};
72+
if (token) {
73+
params = {
74+
limit: 50,
75+
// logGroupNamePrefix: '',
76+
nextToken: token
77+
};
78+
}
8579
var p = new Promise(function(resolve, reject) {
8680
cwl.describeLogGroups(params, function(err, data) {
8781
if (err) {
8882
console.log("error in fetching logGroups", err, err.stack);
8983
reject(err);
9084
} else {
9185
console.log("fetched logGroups: " + data.logGroups.length + " nextToken: " + data.nextToken);
86+
subscribeExistingLogGroups(data.logGroups);
9287
resolve(data);
9388
}
9489
});
9590
});
9691
var cb = function (data) {
97-
subscribeExistingLogGroups(data.logGroups);
98-
if (data.nextToken) {// if next set of log groups exists
99-
processExistingLogGroups(data.nextToken, errorHandler)
92+
if (data.nextToken) {// if next set of log groups exists, invoke next instance of lambda
93+
console.log("Log Groups remaining...Calling the lambda again with token " + data.nextToken);
94+
invoke_lambda(context, data.nextToken, errorHandler);
95+
console.log("Lambda invoke complete with token " + data.nextToken);
10096
} else {
97+
console.log("ALl Log Groups are subscribed to Destination Type " + process.env.DESTINATION_ARN);
10198
errorHandler(null, "Success");
10299
}
103100
};
@@ -106,6 +103,16 @@ function processExistingLogGroups(token, errorHandler) {
106103
});
107104
}
108105

106+
function invoke_lambda(context, token, errorHandler) {
107+
var lambda = new AWS.Lambda();
108+
var payload = {"existingLogs": "true", "token": token};
109+
lambda.invoke({
110+
InvocationType: 'Event',
111+
FunctionName: context.functionName,
112+
Payload: JSON.stringify(payload),
113+
}, errorHandler);
114+
}
115+
109116
function processEvents(env, event, errorHandler) {
110117

111118
var logGroupName = event.detail.requestParameters.logGroupName;
@@ -119,7 +126,7 @@ function processEvents(env, event, errorHandler) {
119126
}
120127

121128
exports.handler = function (event, context, callback) {
122-
console.log("Invoking Log Group connector function")
129+
console.log("Invoking Log Group connector function");
123130
function errorHandler(err, msg) {
124131
if (err) {
125132
console.log(err, msg);
@@ -128,10 +135,9 @@ exports.handler = function (event, context, callback) {
128135
callback(null, "Success");
129136
}
130137
}
131-
if (process.env.USE_EXISTING_LOG_GROUPS == "true") {
132-
processExistingLogGroups(null, errorHandler);
138+
if (event.existingLogs == "true") {
139+
processExistingLogGroups(event.token, context, errorHandler);
133140
} else {
134141
processEvents(process.env, event, errorHandler);
135142
}
136-
137143
};

loggroup-lambda-connector/test/test_loggroup_lambda_connector.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,23 @@ def test_2_existing_logs(self):
5454
self.create_stack(self.stack_name, self.template_data, self.create_stack_parameters("Lambda", "true"))
5555
print("Testing Stack Creation")
5656
self.assertTrue(self.stack_exists(self.stack_name))
57-
self.invoke_lambda()
57+
#self.invoke_lambda()
5858
self.assert_subscription_filter("SumoLGLBDFilter")
5959

6060
def test_3_kinesis(self):
6161
self.create_stack(self.stack_name, self.template_data, self.create_stack_parameters("Kinesis", "false"))
6262
print("Testing Stack Creation")
6363
self.assertTrue(self.stack_exists(self.stack_name))
6464
self.create_log_group()
65-
self.assert_subscription_filter("SumoLGKinesisFilter")
65+
self.assert_subscription_filter("SumoLGLBDFilter")
6666

6767
def test_4_existing_kinesis(self):
6868
self.create_log_group()
6969
self.create_stack(self.stack_name, self.template_data, self.create_stack_parameters("Kinesis", "true"))
7070
print("Testing Stack Creation")
7171
self.assertTrue(self.stack_exists(self.stack_name))
72-
self.invoke_lambda()
73-
self.assert_subscription_filter("SumoLGKinesisFilter")
72+
#self.invoke_lambda()
73+
self.assert_subscription_filter("SumoLGLBDFilter")
7474

7575
def create_stack_parameters(self, destination, existing, pattern='test'):
7676
return [

0 commit comments

Comments
 (0)