@@ -3,17 +3,9 @@ require('express-async-errors');
33const Kafka = require ( 'no-kafka' )
44const config = require ( 'config' )
55const bodyParser = require ( 'body-parser' )
6- const {
7- create_producer_app_log,
8- producerpost_success_log,
9- producerpost_failure_log
10- } = require ( './common/app_log' )
6+ const app_log = require ( './common/app_log' )
117const pushToKafka = require ( './api/pushToKafka' )
12- const {
13- postMessage,
14- validateMsgPosted
15- } = require ( './api/postslackinfo' )
16-
8+ const slack = require ( './api/postslackinfo' )
179const app = express ( )
1810app . use ( bodyParser . json ( ) ) ; // to support JSON-encoded bodies
1911app . use ( bodyParser . urlencoded ( { // to support URL-encoded bodies
@@ -22,21 +14,14 @@ app.use(bodyParser.urlencoded({ // to support URL-encoded bodies
2214app . get ( '/' , function ( req , res ) {
2315 res . send ( 'hello world' )
2416} )
25-
26-
2717app . post ( '/kafkaevents' , async ( req , res , next ) => {
2818 const payload = req . body
2919 let seqID = payload . TIME + "_" + payload . TABLENAME
30- //retry_count = payload['RETRY_COUNT'] ? payload['RETRY_COUNT'] : 0
31- //let reconcile_flag = payload['RECONCILE_STATUS'] ? payload['RECONCILE_STATUS'] : 0
32- let producer_retry_count
33-
3420 try {
35- await create_producer_app_log ( payload , "PayloadReceived" )
21+ await app_log . create_producer_app_log ( payload , "PayloadReceived" )
3622 } catch ( error ) {
3723 console . log ( error )
3824 }
39-
4025 //send kafka message
4126 let kafka_error
4227 let msgValue = {
@@ -46,14 +31,12 @@ app.post('/kafkaevents', async (req, res, next) => {
4631 kafka_error = await pushToKafka ( producer , config . topic . NAME , msgValue )
4732 //add auditlog
4833 if ( ! kafka_error ) {
49- await producerpost_success_log ( payload , "PayloadPosted" )
34+ await app_log . producerpost_success_log ( payload , "PayloadPosted" )
5035 res . send ( 'done' )
5136 return
5237 }
53-
5438 //add auditlog
55- await producerpost_failure_log ( payload , kafka_error , 'PayloadPostFailed' )
56-
39+ await app_log . producerpost_failure_log ( payload , kafka_error , 'PayloadPostFailed' )
5740 msgValue = {
5841 ...kafka_error ,
5942 SEQ_ID : seqID ,
@@ -66,26 +49,20 @@ app.post('/kafkaevents', async (req, res, next) => {
6649 console . log ( "Kafka Message posted successfully to the topic : " + config . topic_error . NAME )
6750 } else {
6851 if ( config . SLACK . SLACKNOTIFY === 'true' ) {
69- await postMessage ( "producer post meesage failed- But usable to post the error in kafka error topic due to errors" , async ( response ) => {
70- await validateMsgPosted ( response . statusCode , response . statusMessage )
52+ await slack . postMessage ( "producer post meesage failed- But usable to post the error in kafka error topic due to errors" , async ( response ) => {
53+ await slack . validateMsgPosted ( response . statusCode , response . statusMessage )
7154 } ) ;
7255 }
7356 }
74-
7557 res . send ( 'error' )
76-
7758} )
7859
79-
8060const producer = new Kafka . Producer ( )
81-
8261producer . init ( ) . then ( function ( ) {
8362 console . log ( 'connected to local kafka server on port 9092 ...' ) ;
84-
8563 // start the server
8664 app . listen ( config . PORT ) ;
8765 console . log ( 'Server started! At http://localhost:' + config . PORT ) ;
88-
8966 } //end producer init
9067) . catch ( e => {
9168 console . log ( 'Error : ' , e )
0 commit comments