Skip to content

Commit 136d384

Browse files
author
Frank Reno
committed
fix kinesis, add enhancements that Steve created for the CWL logs as well to make the function more robust
1 parent 89297c5 commit 136d384

File tree

1 file changed

+212
-27
lines changed

1 file changed

+212
-27
lines changed

kinesis/node.js/k2sl_lambda.js

Lines changed: 212 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,219 @@
1+
//////////////////////////////////////////////////////////////////////////////////
2+
// CloudWatch Logs to SumoLogic //
3+
// https://github.com/SumoLogic/sumologic-aws-lambda/tree/master/cloudwatchlogs //
4+
//////////////////////////////////////////////////////////////////////////////////
5+
6+
// SumoLogic Endpoint to post logs
7+
var SumoURL = process.env.SUMO_ENDPOINT;
8+
9+
// The following parameters override the sourceCategoryOverride, sourceHostOverride and sourceNameOverride metadata fields within SumoLogic.
10+
// Not these can also be overridden via json within the message payload. See the README for more information.
11+
var sourceCategoryOverride = process.env.SOURCE_CATEGORY_OVERRIDE || 'none'; // If none sourceCategoryOverride will not be overridden
12+
var sourceHostOverride = process.env.SOURCE_HOST_OVERRIDE || 'none'; // If none sourceHostOverride will not be set to the name of the logGroup
13+
var sourceNameOverride = process.env.SOURCE_NAME_OVERRIDE || 'none'; // If none sourceNameOverride will not be set to the name of the logStream
14+
15+
// CloudWatch logs encoding
16+
var encoding = process.env.ENCODING || 'utf-8'; // default is utf-8
17+
18+
// Include logStream and logGroup as json fields within the message. Required for SumoLogic AWS Lambda App
19+
var includeLogInfo = true; // default is true
20+
21+
// Regex used to detect logs coming from lambda functions.
22+
// The regex will parse out the requestID and strip the timestamp
23+
// Example: 2016-11-10T23:11:54.523Z 108af3bb-a79b-11e6-8bd7-91c363cc05d9 some message
24+
var consoleFormatRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z\t(\w+?-\w+?-\w+?-\w+?-\w+)\t/;
25+
26+
// Used to extract RequestID
27+
var requestIdRegex = /(?:RequestId:|Z)\s+([\w\d\-]+)/;
28+
129
var https = require('https');
30+
var zlib = require('zlib');
31+
var url = require('url');
232

3-
///////////////////////////////////////////////////////////////////////////////////////////////////////////
4-
// Remember to change the hostname and path to match your collection API and specific HTTP-source endpoint
5-
// See more at: https://service.sumologic.com/help/Default.htm#Collector_Management_API.htm
6-
///////////////////////////////////////////////////////////////////////////////////////////////////////////
7-
var options = { 'hostname': 'endpoint1.collection.us2.sumologic.com',
8-
'path': 'https://endpoint1.collection.us2.sumologic.com/receiver/v1/http/XXXXX',
9-
'method': 'POST'
10-
};
11-
12-
exports.handler = function(event, context) {
13-
14-
var req = https.request(options, function(res) {
15-
var body = '';
16-
console.log('Status:', res.statusCode);
17-
res.setEncoding('utf8');
18-
res.on('data', function(chunk) { body += chunk; });
19-
res.on('end', function() {
20-
console.log('Successfully processed HTTPS response');
21-
context.succeed(); });
22-
});
2333

24-
options.agent = new https.Agent(options);
34+
function sumoMetaKey(awslogsData, message) {
35+
var sourceCategory = '';
36+
var sourceName = '';
37+
var sourceHost = '';
38+
39+
if (sourceCategoryOverride !== null && sourceCategoryOverride !== '' && sourceCategoryOverride != 'none') {
40+
sourceCategory = sourceCategoryOverride;
41+
}
2542

26-
req.on('error', context.fail);
43+
if (sourceHostOverride !== null && sourceHostOverride !== '' && sourceHostOverride != 'none') {
44+
sourceHost = sourceHostOverride;
45+
} else {
46+
sourceHost = awslogsData.logGroup;
47+
}
2748

28-
event.Records.forEach(function(record) {
29-
var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
30-
req.write(payload + '\n');
31-
})
32-
req.end();
49+
if (sourceNameOverride !== null && sourceNameOverride !== '' && sourceNameOverride != 'none') {
50+
sourceName = sourceNameOverride;
51+
} else {
52+
sourceName = awslogsData.logStream;
53+
}
54+
55+
// Ability to override metadata within the message
56+
// Useful within Lambda function console.log to dynamically set metadata fields within SumoLogic.
57+
if (message.hasOwnProperty('_sumo_metadata')) {
58+
var metadataOverride = message._sumo_metadata;
59+
if (metadataOverride.category) {
60+
sourceCategory = metadataOverride.category;
61+
}
62+
if (metadataOverride.host) {
63+
sourceHost = metadataOverride.host;
64+
}
65+
if (metadataOverride.source) {
66+
sourceName = metadataOverride.source;
67+
}
68+
delete message._sumo_metadata;
69+
}
70+
return sourceName + ':' + sourceCategory + ':' + sourceHost;
3371

3472
}
73+
74+
function postToSumo(context, messages) {
75+
var messagesTotal = Object.keys(messages).length;
76+
var messagesSent = 0;
77+
var messageErrors = [];
78+
79+
var urlObject = url.parse(SumoURL);
80+
var options = {
81+
'hostname': urlObject.hostname,
82+
'path': urlObject.pathname,
83+
'method': 'POST'
84+
};
85+
86+
var finalizeContext = function () {
87+
var total = messagesSent + messageErrors.length;
88+
if (total == messagesTotal) {
89+
console.log('messagesSent: ' + messagesSent + ' messagesErrors: ' + messageErrors.length);
90+
if (messageErrors.length > 0) {
91+
context.fail('errors: ' + messageErrors);
92+
} else {
93+
context.succeed();
94+
}
95+
}
96+
};
97+
98+
99+
Object.keys(messages).forEach(function (key, index) {
100+
var headerArray = key.split(':');
101+
102+
options.headers = {
103+
'X-Sumo-Name': headerArray[0],
104+
'X-Sumo-Category': headerArray[1],
105+
'X-Sumo-Host': headerArray[2]
106+
};
107+
108+
var req = https.request(options, function (res) {
109+
res.setEncoding('utf8');
110+
res.on('data', function (chunk) {});
111+
res.on('end', function () {
112+
if (res.statusCode == 200) {
113+
messagesSent++;
114+
} else {
115+
messageErrors.push('HTTP Return code ' + res.statusCode);
116+
}
117+
finalizeContext();
118+
});
119+
});
120+
121+
req.on('error', function (e) {
122+
messageErrors.push(e.message);
123+
finalizeContext();
124+
});
125+
126+
for (var i = 0; i < messages[key].length; i++) {
127+
req.write(JSON.stringify(messages[key][i]) + '\n');
128+
}
129+
req.end();
130+
});
131+
}
132+
133+
134+
exports.handler = function (event, context) {
135+
136+
// console.log(JSON.stringify(event));
137+
138+
// Used to hold chunks of messages to post to SumoLogic
139+
var messageList = {};
140+
141+
// Validate URL has been set
142+
var urlObject = url.parse(SumoURL);
143+
if (urlObject.protocol != 'https:' || urlObject.host === null || urlObject.path === null) {
144+
context.fail('Invalid SUMO_ENDPOINT environment variable: ' + SumoURL);
145+
}
146+
147+
event.Records.forEach(function(record) {
148+
var zippedInput = new Buffer(record.kinesis.data, 'base64');
149+
150+
zlib.gunzip(zippedInput, function (e, buffer) {
151+
if (e) {
152+
context.fail(e);
153+
}
154+
155+
var awslogsData = JSON.parse(buffer.toString(encoding));
156+
157+
if (awslogsData.messageType === 'CONTROL_MESSAGE') {
158+
console.log('Control message');
159+
context.succeed('Success');
160+
}
161+
162+
var lastRequestID = null;
163+
164+
console.log('Log events: ' + awslogsData.logEvents.length);
165+
166+
// Chunk log events before posting to SumoLogic
167+
awslogsData.logEvents.forEach(function (log, idx, arr) {
168+
169+
// Remove any trailing \n
170+
log.message = log.message.replace(/\n$/, '');
171+
172+
// Try extract requestID
173+
var requestId = requestIdRegex.exec(log.message);
174+
if (requestId !== null) {
175+
lastRequestID = requestId[1];
176+
}
177+
178+
// Attempt to detect console log and auto extract requestID and message
179+
var consoleLog = consoleFormatRegex.exec(log.message);
180+
if (consoleLog !== null) {
181+
lastRequestID = consoleLog[1];
182+
log.message = log.message.substring(consoleLog[0].length);
183+
}
184+
185+
// Auto detect if message is json
186+
try {
187+
log.message = JSON.parse(log.message);
188+
} catch (err) {
189+
// Do nothing, leave as text
190+
log.message.trim();
191+
}
192+
193+
// delete id as it's not very useful
194+
delete log.id;
195+
196+
if (includeLogInfo) {
197+
log.logStream = awslogsData.logStream;
198+
log.logGroup = awslogsData.logGroup;
199+
}
200+
201+
if (lastRequestID) {
202+
log.requestID = lastRequestID;
203+
}
204+
205+
var metadataKey = sumoMetaKey(awslogsData, log.message);
206+
207+
if (metadataKey in messageList) {
208+
messageList[metadataKey].push(log);
209+
} else {
210+
messageList[metadataKey] = [log];
211+
}
212+
});
213+
214+
// Push messages to Sumo
215+
postToSumo(context, messageList);
216+
})
217+
218+
});
219+
};

0 commit comments

Comments
 (0)