@@ -19,23 +19,48 @@ var sourceCategoryOverride = process.env.SOURCE_CATEGORY_OVERRIDE || ''; // If
1919var sourceHostOverride = process . env . SOURCE_HOST_OVERRIDE || '' ; // If empty sourceHostOverride will not be set to the name of the logGroup
2020var sourceNameOverride = process . env . SOURCE_NAME_OVERRIDE || '' ; // If empty sourceNameOverride will not be set to the name of the logStream
2121
22+ var retryInterval = process . env . RETRY_INTERVAL || 5000 ; // the interval in millisecs between retries
23+ var numOfRetries = process . env . NUMBER_OF_RETRIES || 3 ; // the number of retries
24+
2225var https = require ( 'https' ) ;
2326var zlib = require ( 'zlib' ) ;
2427var url = require ( 'url' ) ;
2528
29+ Promise . retryMax = function ( fn , retry , interval , fnParams ) {
30+ return fn . apply ( this , fnParams ) . catch ( err => {
31+ var waitTime = typeof interval === 'function' ? interval ( ) : interval ;
32+ console . log ( "Retries left " + ( retry - 1 ) + " delay(in ms) " + waitTime ) ;
33+ return ( retry > 1 ? Promise . wait ( waitTime ) . then ( ( ) => Promise . retryMax ( fn , retry - 1 , interval , fnParams ) ) :Promise . reject ( err ) ) ;
34+ } ) ;
35+ }
36+
37+ Promise . wait = function ( delay ) {
38+ return new Promise ( ( fulfill , reject ) => {
39+ //console.log(Date.now());
40+ setTimeout ( fulfill , delay || 0 ) ;
41+ } ) ;
42+ } ;
43+
44+ function exponentialBackoff ( seed ) {
45+ var count = 0 ;
46+ return function ( ) {
47+ count ++ ;
48+ return count * seed ;
49+ }
50+ }
2651
2752function postToSumo ( context , messages ) {
2853 var messagesTotal = Object . keys ( messages ) . length ;
2954 var messagesSent = 0 ;
3055 var messageErrors = [ ] ;
31-
56+
3257 var urlObject = url . parse ( SumoURL ) ;
3358 var options = {
3459 'hostname' : urlObject . hostname ,
3560 'path' : urlObject . pathname ,
3661 'method' : 'POST'
3762 } ;
38-
63+
3964 var finalizeContext = function ( ) {
4065 var total = messagesSent + messageErrors . length ;
4166 if ( total == messagesTotal ) {
@@ -48,45 +73,54 @@ function postToSumo(context, messages) {
4873 }
4974 } ;
5075
51-
76+ function httpSend ( options , headers , data ) {
77+ return new Promise ( ( resolve , reject ) => {
78+ var curOptions = options ;
79+ curOptions . headers = headers ;
80+ var req = https . request ( curOptions , function ( res ) {
81+ var body = '' ;
82+ res . setEncoding ( 'utf8' ) ;
83+ res . on ( 'data' , function ( chunk ) {
84+ body += chunk ; // don't really do anything with body
85+ } ) ;
86+ res . on ( 'end' , function ( ) {
87+ if ( res . statusCode == 200 ) {
88+ resolve ( body ) ;
89+ } else {
90+ reject ( { 'error' :'HTTP Return code ' + res . statusCode , 'res' :res } ) ;
91+ }
92+ } ) ;
93+ } ) ;
94+ req . on ( 'error' , function ( e ) {
95+ reject ( { 'error' :e , 'res' :null } ) ;
96+ } ) ;
97+ for ( var i = 0 ; i < data . length ; i ++ ) {
98+ req . write ( JSON . stringify ( data [ i ] ) + '\n' ) ;
99+ }
100+ console . log ( "sending to Sumo..." )
101+ req . end ( ) ;
102+ } ) ;
103+ }
52104 Object . keys ( messages ) . forEach ( function ( key , index ) {
53105 var headerArray = key . split ( ':' ) ;
54- options . headers = {
106+ var headers = {
55107 'X-Sumo-Name' : headerArray [ 0 ] ,
56108 'X-Sumo-Category' : headerArray [ 1 ] ,
57109 'X-Sumo-Host' : headerArray [ 2 ] ,
58- 'X-Sumo-Client' : 'cloudwatchevents -aws-lambda'
110+ 'X-Sumo-Client' : 'kinesis -aws-lambda'
59111 } ;
60-
61- var req = https . request ( options , function ( res ) {
62- res . setEncoding ( 'utf8' ) ;
63- res . on ( 'data' , function ( chunk ) { } ) ;
64- res . on ( 'end' , function ( ) {
65- console . log ( "Got response code: " + res . statusCode ) ;
66- if ( res . statusCode == 200 ) {
67- messagesSent ++ ;
68- } else {
69- messageErrors . push ( 'HTTP Return code ' + res . statusCode ) ;
70- }
71- finalizeContext ( ) ;
72- } ) ;
73- } ) ;
74-
75- req . on ( 'error' , function ( e ) {
76- messageErrors . push ( e . message ) ;
112+ Promise . retryMax ( httpSend , numOfRetries , retryInterval , [ options , headers , messages [ key ] ] ) . then ( ( body ) => {
113+ messagesSent ++ ;
114+ finalizeContext ( )
115+ } ) . catch ( ( e ) => {
116+ messageErrors . push ( e . error ) ;
77117 finalizeContext ( ) ;
78118 } ) ;
79-
80- for ( var i = 0 ; i < messages [ key ] . length ; i ++ ) {
81- req . write ( JSON . stringify ( messages [ key ] [ i ] ) + '\n' ) ;
82- }
83- req . end ( ) ;
84119 } ) ;
85120}
86121
87-
88122exports . handler = function ( event , context ) {
89-
123+
90124 // Used to hold chunks of messages to post to SumoLogic
91125 var messageList = { } ;
92126 var final_event ;
@@ -95,13 +129,13 @@ exports.handler = function (event, context) {
95129 if ( urlObject . protocol != 'https:' || urlObject . host === null || urlObject . path === null ) {
96130 context . fail ( 'Invalid SUMO_ENDPOINT environment variable: ' + SumoURL ) ;
97131 }
98-
132+
99133 //console.log(event);
100134 if ( ( event . source === "aws.guardduty" ) || ( removeOuterFields ) ) {
101135 final_event = event . detail ;
102136 } else {
103137 final_event = event ;
104- }
138+ }
105139 messageList [ sourceNameOverride + ':' + sourceCategoryOverride + ':' + sourceHostOverride ] = [ final_event ] ;
106140 postToSumo ( context , messageList ) ;
107141} ;
0 commit comments