Skip to content

Commit 079f8c4

Browse files
committed
SUMO-94622: adding retries in kinesis lambda function
1 parent 3fa5da6 commit 079f8c4

File tree

1 file changed

+59
-33
lines changed

1 file changed

+59
-33
lines changed

kinesis/node.js/k2sl_lambda.js

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ var sourceCategoryOverride = process.env.SOURCE_CATEGORY_OVERRIDE || 'none'; //
1212
var sourceHostOverride = process.env.SOURCE_HOST_OVERRIDE || 'none'; // If none sourceHostOverride will not be set to the name of the logGroup
1313
var sourceNameOverride = process.env.SOURCE_NAME_OVERRIDE || 'none'; // If none sourceNameOverride will not be set to the name of the logStream
1414

15+
var retryInterval = process.env.RETRY_INTERVAL || 5000; // the interval in millisecs between retries
16+
var numOfRetries = process.env.NUMBER_OF_RETRIES || 3; // the number of retries
17+
1518
// CloudWatch logs encoding
1619
var encoding = process.env.ENCODING || 'utf-8'; // default is utf-8
1720

@@ -20,7 +23,7 @@ var includeLogInfo = true; // default is true
2023

2124
// Regex used to detect logs coming from lambda functions.
2225
// The regex will parse out the requestID and strip the timestamp
23-
// Example: 2016-11-10T23:11:54.523Z 108af3bb-a79b-11e6-8bd7-91c363cc05d9 some message
26+
// Example: 2016-11-10T23:11:54.523Z 108af3bb-a79b-11e6-8bd7-91c363cc05d9 some message
2427
var consoleFormatRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z\t(\w+?-\w+?-\w+?-\w+?-\w+)\t/;
2528

2629
// Used to extract RequestID
@@ -31,7 +34,22 @@ var zlib = require('zlib');
3134
var url = require('url');
3235

3336

34-
function sumoMetaKey(awslogsData, message) {
37+
Promise.retryMax = function(fn,retry,interval,fnParams) {
38+
return fn.apply(this,fnParams).catch( err => {
39+
console.log("inside retry left", retry)
40+
return (retry>1? Promise.wait(interval).then(()=> Promise.retryMax(fn,retry-1,interval, fnParams)):Promise.reject(err));
41+
});
42+
}
43+
44+
Promise.wait = function(delay) {
45+
return new Promise((fulfill,reject)=> {
46+
//console.log(Date.now());
47+
setTimeout(fulfill,delay||0);
48+
});
49+
};
50+
51+
52+
function sumoMetaKey(awslogsData, message, data) {
3553
var sourceCategory = '';
3654
var sourceName = '';
3755
var sourceHost = '';
@@ -95,43 +113,51 @@ function postToSumo(context, messages) {
95113
}
96114
};
97115

98-
116+
function httpSend(options, headers, data) {
117+
return new Promise( (resolve,reject) => {
118+
var curOptions = options;
119+
curOptions.headers = headers;
120+
var req = https.request(curOptions, function (res) {
121+
var body = '';
122+
res.setEncoding('utf8');
123+
res.on('data', function (chunk) {
124+
body += chunk; // don't really do anything with body
125+
});
126+
res.on('end', function () {
127+
if (res.statusCode == 200) {
128+
resolve(body);
129+
} else {
130+
reject({'error':'HTTP Return code ' + res.statusCode,'res':res});
131+
}
132+
});
133+
});
134+
req.on('error', function (e) {
135+
reject({'error':e,'res':null});
136+
});
137+
for (var i = 0; i < data.length; i++) {
138+
req.write(JSON.stringify(data[i]) + '\n');
139+
}
140+
req.end();
141+
});
142+
}
99143
Object.keys(messages).forEach(function (key, index) {
100144
var headerArray = key.split(':');
101-
102-
options.headers = {
145+
var headers = {
103146
'X-Sumo-Name': headerArray[0],
104147
'X-Sumo-Category': headerArray[1],
105148
'X-Sumo-Host': headerArray[2],
106149
'X-Sumo-Client': 'kinesis-aws-lambda'
107150
};
108-
109-
var req = https.request(options, function (res) {
110-
res.setEncoding('utf8');
111-
res.on('data', function (chunk) {});
112-
res.on('end', function () {
113-
if (res.statusCode == 200) {
114-
messagesSent++;
115-
} else {
116-
messageErrors.push('HTTP Return code ' + res.statusCode);
117-
}
118-
finalizeContext();
119-
});
120-
});
121-
122-
req.on('error', function (e) {
123-
messageErrors.push(e.message);
151+
Promise.retryMax(httpSend, 1, 2000, [options, headers, messages[key]]).then((body)=> {
152+
messagesSent++;
153+
finalizeContext()
154+
}).catch((e) => {
155+
messageErrors.push(e.error);
124156
finalizeContext();
125157
});
126-
127-
for (var i = 0; i < messages[key].length; i++) {
128-
req.write(JSON.stringify(messages[key][i]) + '\n');
129-
}
130-
req.end();
131158
});
132159
}
133160

134-
135161
exports.handler = function (event, context) {
136162

137163
// console.log(JSON.stringify(event));
@@ -144,15 +170,14 @@ exports.handler = function (event, context) {
144170
if (urlObject.protocol != 'https:' || urlObject.host === null || urlObject.path === null) {
145171
context.fail('Invalid SUMO_ENDPOINT environment variable: ' + SumoURL);
146172
}
147-
148-
event.Records.forEach(function(record) {
173+
var numOfRecords = event.Records.length;
174+
event.Records.forEach(function(record, index) {
149175
var zippedInput = new Buffer(record.kinesis.data, 'base64');
150176

151177
zlib.gunzip(zippedInput, function (e, buffer) {
152178
if (e) {
153179
context.fail(e);
154180
}
155-
156181
var awslogsData = JSON.parse(buffer.toString(encoding));
157182

158183
if (awslogsData.messageType === 'CONTROL_MESSAGE') {
@@ -211,10 +236,11 @@ exports.handler = function (event, context) {
211236
messageList[metadataKey] = [log];
212237
}
213238
});
214-
215239
// Push messages to Sumo
216-
postToSumo(context, messageList);
240+
if (index === numOfRecords-1) {
241+
postToSumo(context, messageList);
242+
}
217243
})
218244

219245
});
220-
};
246+
};

0 commit comments

Comments
 (0)