Skip to content

Commit 6a79a61

Browse files
committed
Updating nodejs code of cw logs with dlq
1 parent fc4d198 commit 6a79a61

File tree

4 files changed

+139
-121
lines changed

4 files changed

+139
-121
lines changed
Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,82 @@
1-
var AWS = require("aws-sdk");
2-
var processLogsHandler = require('./cloudwatchlogs_lambda').processLogs;
3-
var getEndpointURL = require('./cloudwatchlogs_lambda').getEndpointURL;
4-
var DLQUtils = require("./sumo-dlq-function-utils").DLQUtils;
5-
var Messages = DLQUtils.Messages;
6-
var invokeLambdas = DLQUtils.invokeLambdas;
1+
const { processLogs: processLogsHandler, getEndpointURL } = require('./cloudwatchlogs_lambda');
2+
const { DLQUtils } = require("./sumo-dlq-function-utils");
3+
4+
const { Messages, invokeLambdas } = DLQUtils;
75

86
exports.consumeMessages = async function (env, context, callback) {
9-
var sqs = new AWS.SQS({region: env.AWS_REGION});
10-
var MessagesObj = new Messages(env);
11-
env.SUMO_CLIENT_HEADER="dlq-aws-lambda";
7+
const MessagesObj = new Messages(env);
8+
env.SUMO_CLIENT_HEADER = "dlq-aws-lambda";
9+
1210
if (!env.SUMO_ENDPOINT) {
13-
let SUMO_ENDPOINT = await getEndpointURL();
14-
if (SUMO_ENDPOINT instanceof Error) {
15-
console.log("Error in getEndpointURL: ", SUMO_ENDPOINT);
16-
callback(SUMO_ENDPOINT, null);
11+
try {
12+
let SUMO_ENDPOINT = await getEndpointURL();
13+
env.SUMO_ENDPOINT = SUMO_ENDPOINT;
14+
} catch (error) {
15+
console.log("Error in getEndpointURL: ", error);
16+
callback(error, null);
1717
return;
1818
}
19-
env.SUMO_ENDPOINT = SUMO_ENDPOINT;
2019
} else {
2120
console.log("consumeMessages: Getting SUMO_ENDPOINT from env");
2221
}
23-
MessagesObj.receiveMessages(10, function (err, data) {
24-
var messages = (data)? data.Messages: null;
25-
if (err) {
26-
callback(err);
27-
} else if (messages && messages.length > 0) {
28-
var fail_cnt = 0, msgCount = 0;
22+
23+
try {
24+
const messages = await MessagesObj.receiveMessages(10);
25+
26+
27+
if (messages && messages.length > 0) {
28+
let fail_cnt = 0, msgCount = 0;
2929
console.log("Messages Received", messages.length);
30-
for (var i = 0; i < messages.length; i++) {
31-
(function(idx) {
32-
var payload = JSON.parse(messages[idx].Body);
33-
var receiptHandle = messages[idx].ReceiptHandle;
30+
31+
for (let i = 0; i < messages.length; i++) {
32+
(function (idx) {
33+
const payload = JSON.parse(messages[idx].Body);
34+
const receiptHandle = messages[idx].ReceiptHandle;
35+
3436
if (!(payload.awslogs && payload.awslogs.data)) {
3537
console.log("Message does not contain awslogs or awslogs.data attributes", payload);
36-
//deleting msg in DLQ after injesting in sumo
37-
MessagesObj.deleteMessage(receiptHandle, function (err, data) {
38-
if (err) console.log(err, err.stack);
39-
});
38+
39+
MessagesObj.deleteMessage(receiptHandle)
40+
.catch((err) => console.log(err, err.stack));
41+
4042
return;
4143
}
42-
var logdata = payload.awslogs.data;
44+
45+
const logdata = payload.awslogs.data;
46+
4347
processLogsHandler(env, logdata, function (err, msg) {
4448
msgCount++;
49+
4550
if (err) {
4651
console.log(err, msg);
4752
fail_cnt++;
4853
} else {
49-
//deleting msg in DLQ after injesting in sumo
50-
MessagesObj.deleteMessage(receiptHandle, function (err, data) {
51-
if (err) console.log(err, err.stack);
52-
});
54+
MessagesObj.deleteMessage(receiptHandle)
55+
.catch((err) => console.log(err, err.stack));
5356
}
54-
if (msgCount == messages.length) {
55-
if (fail_cnt == 0 && (parseInt(env.is_worker) === 0)) {
57+
58+
if (msgCount === messages.length) {
59+
if (fail_cnt === 0 && parseInt(env.is_worker) === 0) {
5660
invokeLambdas(env.AWS_REGION, parseInt(env.NUM_OF_WORKERS),
57-
context.functionName, '{"is_worker": "1"}', context);
61+
context.functionName, '{"is_worker": "1"}', context);
5862
}
59-
callback(null, (messages.length-fail_cnt) + ' success');
63+
64+
callback(null, `${messages.length - fail_cnt} success`);
6065
}
6166
});
6267
})(i);
6368
}
64-
6569
} else {
70+
6671
callback(null, 'success');
6772
}
68-
});
73+
} catch (error) {
74+
callback(error);
75+
}
6976
};
7077

7178
exports.handler = function (event, context, callback) {
72-
73-
var env = Object.assign({}, process.env);
74-
env['is_worker'] = event.is_worker || 0;
79+
const env = Object.assign({}, process.env);
80+
env.is_worker = event.is_worker || 0;
7581
exports.consumeMessages(env, context, callback);
76-
};
77-
82+
};

cloudwatchlogs-with-dlq/cloudwatchlogs_lambda.js

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,25 @@ var url = require('url');
1919
var vpcutils = require('./vpcutils');
2020
var SumoLogsClient = require('./sumo-dlq-function-utils').SumoLogsClient;
2121
var Utils = require('./sumo-dlq-function-utils').Utils;
22-
const AWS = require('aws-sdk');
23-
const ssm = new AWS.SSM();
22+
23+
const { SSMClient, GetParameterCommand } = require("@aws-sdk/client-ssm");
2424

2525
exports.getEndpointURL = async function() {
26-
console.log('Getting SUMO_ENDPOINT from AWS SSM Parameter Store');
27-
return new Promise((resolve, reject) => {
28-
ssm.getParameter(
29-
{
30-
Name: 'SUMO_ENDPOINT',
31-
WithDecryption: true
32-
},
33-
(err, data) => {
34-
if (err) {
35-
console.log(err, err.stack);
36-
reject(new Error('Unable to get EndpointURL from SSM: ' + err));
37-
} else {
38-
// console.log(data);
39-
resolve(data.Parameter.Value);
40-
}
41-
}
42-
);
43-
});
44-
}
26+
console.log('Getting SUMO_ENDPOINT from AWS SSM Parameter Store');
27+
const ssmClient = new SSMClient();
28+
try {
29+
const data = await ssmClient.send(
30+
new GetParameterCommand({
31+
Name: 'SUMO_ENDPOINT',
32+
WithDecryption: true
33+
})
34+
);
35+
return data.Parameter.Value;
36+
} catch (error) {
37+
console.error('Unable to get EndpointURL from SSM:', error);
38+
throw new Error('Unable to get EndpointURL from SSM: ' + error);
39+
}
40+
}
4541

4642
function createRecords(config, events, awslogsData) {
4743
var records = [];
@@ -210,4 +206,4 @@ exports.handler = function (event, context, callback) {
210206

211207
exports.processLogs(process.env, event.awslogs.data, callback);
212208

213-
};
209+
};
Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,54 @@
1-
var AWS = require("aws-sdk");
1+
const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
2+
const { LambdaClient, InvokeCommand } = require("@aws-sdk/client-lambda");
23

3-
function Messages(env) {
4-
this.sqs = new AWS.SQS({region: env.AWS_REGION});
4+
class Messages {
5+
constructor(env) {
6+
this.sqs = new SQSClient({ region: env.AWS_REGION });
57
this.env = env;
6-
}
8+
}
79

8-
Messages.prototype.receiveMessages = function (messageCount, callback) {
9-
var params = {
10-
QueueUrl: this.env.TASK_QUEUE_URL,
11-
MaxNumberOfMessages: messageCount
10+
async receiveMessages(messageCount) {
11+
const params = {
12+
QueueUrl: this.env.TASK_QUEUE_URL,
13+
MaxNumberOfMessages: messageCount,
1214
};
13-
this.sqs.receiveMessage(params, callback);
14-
};
15-
16-
Messages.prototype.deleteMessage = function (receiptHandle, callback) {
17-
this.sqs.deleteMessage({
18-
ReceiptHandle: receiptHandle,
19-
QueueUrl: this.env.TASK_QUEUE_URL
20-
}, callback);
21-
};
22-
23-
function invokeLambdas(awsRegion, numOfWorkers, functionName, payload, context) {
24-
25-
for (var i = 0; i < numOfWorkers; i++) {
26-
var lambda = new AWS.Lambda({
27-
region: awsRegion
28-
});
29-
lambda.invoke({
30-
InvocationType: 'Event',
31-
FunctionName: functionName,
32-
Payload: payload
33-
}, function(err, data) {
34-
if (err) {
35-
context.fail(err);
36-
} else {
37-
context.succeed('success');
38-
}
39-
});
15+
16+
const command = new ReceiveMessageCommand(params);
17+
const response = await this.sqs.send(command);
18+
return response.Messages || [];
19+
}
20+
21+
async deleteMessage(receiptHandle) {
22+
const params = {
23+
ReceiptHandle: receiptHandle,
24+
QueueUrl: this.env.TASK_QUEUE_URL,
25+
};
26+
27+
const command = new DeleteMessageCommand(params);
28+
await this.sqs.send(command);
29+
}
30+
}
31+
32+
async function invokeLambdas(awsRegion, numOfWorkers, functionName, payload, context) {
33+
const lambda = new LambdaClient({ region: awsRegion });
34+
35+
for (let i = 0; i < numOfWorkers; i++) {
36+
const command = new InvokeCommand({
37+
InvocationType: 'Event',
38+
FunctionName: functionName,
39+
Payload: payload,
40+
});
41+
42+
try {
43+
await lambda.send(command);
44+
context.succeed('success');
45+
} catch (err) {
46+
context.fail(err);
4047
}
48+
}
4149
}
4250

4351
module.exports = {
44-
Messages: Messages,
45-
invokeLambdas: invokeLambdas
46-
};
52+
Messages,
53+
invokeLambdas,
54+
};

cloudwatchlogs-with-dlq/vpcutils.js

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
var find = require('lodash').find;
2-
var EC2 = require('aws-sdk/clients/ec2');
1+
// Import the required AWS SDK modules
2+
const { EC2Client, DescribeNetworkInterfacesCommand } = require('@aws-sdk/client-ec2');
33
var jmespath = require('jmespath');
4-
var ec2 = null;
4+
var find = require('lodash').find;
5+
// Create an instance of the EC2 client
6+
const ec2Client = new EC2Client({ region: process.env.AWS_REGION });
7+
58
/*
69
VPC Log Format
710
version The VPC Flow Logs version.
@@ -50,19 +53,25 @@ function discardInternalTraffic(vpcCIDRPrefix, records) {
5053
*
5154
* @return `Promise` for async processing
5255
*/
53-
function listNetworkInterfaces(allIPaddresses) {
54-
if (!ec2) {
55-
ec2 = new EC2({region: process.env.AWS_REGION});
56-
}
57-
var params = {
58-
Filters: [
59-
{
60-
Name: 'private-ip-address',
61-
Values: allIPaddresses
62-
}
63-
]
64-
}
65-
return ec2.describeNetworkInterfaces(params).promise();
56+
async function listNetworkInterfaces(allIPaddresses) {
57+
const params = {
58+
Filters: [
59+
{
60+
Name: 'private-ip-address',
61+
Values: allIPaddresses,
62+
},
63+
],
64+
};
65+
66+
const command = new DescribeNetworkInterfacesCommand(params);
67+
68+
try {
69+
const response = await ec2Client.send(command);
70+
return response;
71+
} catch (err) {
72+
console.log('Error in listNetworkInterfaces', err);
73+
throw err;
74+
}
6675
}
6776

6877
/**
@@ -145,4 +154,4 @@ function includeSecurityGroupIds(records) {
145154
module.exports = {
146155
discardInternalTraffic: discardInternalTraffic,
147156
includeSecurityGroupIds: includeSecurityGroupIds
148-
};
157+
};

0 commit comments

Comments
 (0)