@@ -12,6 +12,9 @@ var sourceCategoryOverride = process.env.SOURCE_CATEGORY_OVERRIDE || 'none'; //
1212var sourceHostOverride = process . env . SOURCE_HOST_OVERRIDE || 'none' ; // If none sourceHostOverride will not be set to the name of the logGroup
1313var sourceNameOverride = process . env . SOURCE_NAME_OVERRIDE || 'none' ; // If none sourceNameOverride will not be set to the name of the logStream
1414
15+ var retryInterval = process . env . RETRY_INTERVAL || 5000 ; // the interval in millisecs between retries
16+ var numOfRetries = process . env . NUMBER_OF_RETRIES || 3 ; // the number of retries
17+
1518// CloudWatch logs encoding
1619var encoding = process . env . ENCODING || 'utf-8' ; // default is utf-8
1720
@@ -20,7 +23,7 @@ var includeLogInfo = true; // default is true
2023
2124// Regex used to detect logs coming from lambda functions.
2225// The regex will parse out the requestID and strip the timestamp
23- // Example: 2016-11-10T23:11:54.523Z 108af3bb-a79b-11e6-8bd7-91c363cc05d9 some message
26+ // Example: 2016-11-10T23:11:54.523Z 108af3bb-a79b-11e6-8bd7-91c363cc05d9 some message
2427var consoleFormatRegex = / ^ \d { 4 } - \d { 2 } - \d { 2 } T \d { 2 } : \d { 2 } : \d { 2 } .\d { 3 } Z \t ( \w + ?- \w + ?- \w + ?- \w + ?- \w + ) \t / ;
2528
2629// Used to extract RequestID
@@ -31,7 +34,30 @@ var zlib = require('zlib');
3134var url = require ( 'url' ) ;
3235
3336
34- function sumoMetaKey ( awslogsData , message ) {
37+ Promise . retryMax = function ( fn , retry , interval , fnParams ) {
38+ return fn . apply ( this , fnParams ) . catch ( err => {
39+ var waitTime = typeof interval === 'function' ? interval ( ) : interval ;
40+ console . log ( "Retries left " + ( retry - 1 ) + " delay(in ms) " + waitTime ) ;
41+ return ( retry > 1 ? Promise . wait ( waitTime ) . then ( ( ) => Promise . retryMax ( fn , retry - 1 , interval , fnParams ) ) :Promise . reject ( err ) ) ;
42+ } ) ;
43+ }
44+
45+ Promise . wait = function ( delay ) {
46+ return new Promise ( ( fulfill , reject ) => {
47+ //console.log(Date.now());
48+ setTimeout ( fulfill , delay || 0 ) ;
49+ } ) ;
50+ } ;
51+
52+ function exponentialBackoff ( seed ) {
53+ var count = 0 ;
54+ return function ( ) {
55+ count ++ ;
56+ return count * seed ;
57+ }
58+ }
59+
60+ function sumoMetaKey ( awslogsData , message , data ) {
3561 var sourceCategory = '' ;
3662 var sourceName = '' ;
3763 var sourceHost = '' ;
@@ -95,43 +121,52 @@ function postToSumo(context, messages) {
95121 }
96122 } ;
97123
98-
124+ function httpSend ( options , headers , data ) {
125+ return new Promise ( ( resolve , reject ) => {
126+ var curOptions = options ;
127+ curOptions . headers = headers ;
128+ var req = https . request ( curOptions , function ( res ) {
129+ var body = '' ;
130+ res . setEncoding ( 'utf8' ) ;
131+ res . on ( 'data' , function ( chunk ) {
132+ body += chunk ; // don't really do anything with body
133+ } ) ;
134+ res . on ( 'end' , function ( ) {
135+ if ( res . statusCode == 200 ) {
136+ resolve ( body ) ;
137+ } else {
138+ reject ( { 'error' :'HTTP Return code ' + res . statusCode , 'res' :res } ) ;
139+ }
140+ } ) ;
141+ } ) ;
142+ req . on ( 'error' , function ( e ) {
143+ reject ( { 'error' :e , 'res' :null } ) ;
144+ } ) ;
145+ for ( var i = 0 ; i < data . length ; i ++ ) {
146+ req . write ( JSON . stringify ( data [ i ] ) + '\n' ) ;
147+ }
148+ console . log ( "sending to Sumo..." )
149+ req . end ( ) ;
150+ } ) ;
151+ }
99152 Object . keys ( messages ) . forEach ( function ( key , index ) {
100153 var headerArray = key . split ( ':' ) ;
101-
102- options . headers = {
154+ var headers = {
103155 'X-Sumo-Name' : headerArray [ 0 ] ,
104156 'X-Sumo-Category' : headerArray [ 1 ] ,
105157 'X-Sumo-Host' : headerArray [ 2 ] ,
106158 'X-Sumo-Client' : 'kinesis-aws-lambda'
107159 } ;
108-
109- var req = https . request ( options , function ( res ) {
110- res . setEncoding ( 'utf8' ) ;
111- res . on ( 'data' , function ( chunk ) { } ) ;
112- res . on ( 'end' , function ( ) {
113- if ( res . statusCode == 200 ) {
114- messagesSent ++ ;
115- } else {
116- messageErrors . push ( 'HTTP Return code ' + res . statusCode ) ;
117- }
118- finalizeContext ( ) ;
119- } ) ;
120- } ) ;
121-
122- req . on ( 'error' , function ( e ) {
123- messageErrors . push ( e . message ) ;
160+ Promise . retryMax ( httpSend , numOfRetries , retryInterval , [ options , headers , messages [ key ] ] ) . then ( ( body ) => {
161+ messagesSent ++ ;
162+ finalizeContext ( )
163+ } ) . catch ( ( e ) => {
164+ messageErrors . push ( e . error ) ;
124165 finalizeContext ( ) ;
125166 } ) ;
126-
127- for ( var i = 0 ; i < messages [ key ] . length ; i ++ ) {
128- req . write ( JSON . stringify ( messages [ key ] [ i ] ) + '\n' ) ;
129- }
130- req . end ( ) ;
131167 } ) ;
132168}
133169
134-
135170exports . handler = function ( event , context ) {
136171
137172 // console.log(JSON.stringify(event));
@@ -144,15 +179,14 @@ exports.handler = function (event, context) {
144179 if ( urlObject . protocol != 'https:' || urlObject . host === null || urlObject . path === null ) {
145180 context . fail ( 'Invalid SUMO_ENDPOINT environment variable: ' + SumoURL ) ;
146181 }
147-
148- event . Records . forEach ( function ( record ) {
182+ var numOfRecords = event . Records . length ;
183+ event . Records . forEach ( function ( record , index ) {
149184 var zippedInput = new Buffer ( record . kinesis . data , 'base64' ) ;
150185
151186 zlib . gunzip ( zippedInput , function ( e , buffer ) {
152187 if ( e ) {
153188 context . fail ( e ) ;
154189 }
155-
156190 var awslogsData = JSON . parse ( buffer . toString ( encoding ) ) ;
157191
158192 if ( awslogsData . messageType === 'CONTROL_MESSAGE' ) {
@@ -211,10 +245,11 @@ exports.handler = function (event, context) {
211245 messageList [ metadataKey ] = [ log ] ;
212246 }
213247 } ) ;
214-
215248 // Push messages to Sumo
216- postToSumo ( context , messageList ) ;
249+ if ( index === numOfRecords - 1 ) {
250+ postToSumo ( context , messageList ) ;
251+ }
217252 } )
218253
219254 } ) ;
220- } ;
255+ } ;
0 commit comments