Skip to content

Commit 3903445

Browse files
authored
Merge pull request #3 from topcoder-platform/dev [skip ci]
sync - dev to Master [skip ci]
2 parents 7903dce + 909065b commit 3903445

File tree

6 files changed

+48
-20
lines changed

6 files changed

+48
-20
lines changed

.circleci/config.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ builddeploy_steps: &builddeploy_steps
5050
# # consumer deployment
5151
# rm -rf buildenvvar
5252
# ./unsetenv.sh
53-
# ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-consumer-deployvar
54-
# source buildenvvar
55-
# ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}
53+
./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-consumer-deployvar
54+
source buildenvvar
55+
./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}
5656
# # without kafka dynamodb
5757
# rm -rf buildenvvar
5858
# ./unsetenv.sh
@@ -62,9 +62,9 @@ builddeploy_steps: &builddeploy_steps
6262
# # reconciler deployment
6363
# rm -rf buildenvvar
6464
# ./unsetenv.sh
65-
./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar
66-
source buildenvvar
67-
./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}
65+
# ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar
66+
# source buildenvvar
67+
# ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}
6868
6969
7070
jobs:

config/default.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ module.exports = {
99
},
1010
RETRY_COUNTER: 3,
1111
KAFKA_REPOST_COUNT: 5,
12+
KAFKA_URL: process.env.KAFKA_URL,
13+
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'ifx-pg-consumer',
14+
KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null,
15+
KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ?
16+
process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null,
1217
topic_error: {
1318
NAME: 'db.ifxpgmigrate.error',
1419
PARTITION: 0,

informix_auditing/audit_util.c

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ mi_string *do_castl(MI_CONNECTION *conn, MI_DATUM *datum,
6767
DPRINTF("logger",95,("-- typeName=%s --",srcType));
6868
printf("-- typeName=%s --",srcType);
6969
if ((strcmp("blob", srcType) == 0) || (strcmp("clob", srcType) == 0) || (strcmp("text", srcType) == 0) || (strcmp("byte", srcType) == 0)) {
70+
printf("skiping data read\n");
7071
return("unsupportedtype");
7172
}
7273
else{
@@ -111,8 +112,8 @@ mi_string *do_castl(MI_CONNECTION *conn, MI_DATUM *datum,
111112
tdesc = mi_type_typedesc(conn, typeid);
112113
precision = mi_type_precision(tdesc);
113114

114-
printf("rputine read initiated \n");
115-
printf("rputine read initiated %ld\n",collen);
115+
//printf("rputine read initiated \n");
116+
//printf("rputine read initiated %ld\n",collen);
116117
new_datum = mi_routine_exec(conn, fn, &ret, datum, collen, precision, fp);
117118
printf("routine read completed \n");
118119
pbuf = mi_lvarchar_to_string(new_datum);
@@ -158,7 +159,7 @@ mi_string *doInsertCN()
158159
//fixname(pdbname);
159160
sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname);
160161
posi = strlen(buffer);
161-
printf("\"TABLENAME\": \"%s\", ", tabname);
162+
printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-INSERT-%s\" \n",pdbname,tabname,cdatetime);
162163
sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", tabname);
163164
posi = strlen(buffer);
164165
sprintf(&buffer[posi], "\"OPERATION\": \"INSERT\", ");
@@ -182,7 +183,9 @@ DPRINTF("logger", 90, ("insert: colname: (0x%x) [%s]", pcolname, pcolname));
182183
sprintf(&buffer[posi], ", ");
183184
posi = strlen(buffer);
184185
}
185-
sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, escapecharjson(pcast));
186+
char *bufdatval = escapecharjson(pcast);
187+
sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, bufdatval);
188+
free(bufdatval);
186189
if (strcmp("unsupportedtype", pcast) == 0) {
187190
strcpy(uniquedatatype, "true");
188191
}
@@ -200,6 +203,7 @@ DPRINTF("logger", 90, ("insert: colname: (0x%x) [%s]", pcolname, pcolname));
200203
} else {
201204
sprintf(&buffer[posi], "}, \n \"uniquedatatype\" : \"false\" \n }");
202205
}
206+
printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-INSERT-%s-Completed\" \n",pdbname,tabname,cdatetime);
203207
free(cdatetime);
204208
return(buffer);
205209
}
@@ -310,6 +314,7 @@ mi_string *doDeleteCN()
310314
sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname);
311315
posi = strlen(buffer);
312316
sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname);
317+
printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-DELETE-%s\" \n", pdbname,ptabname,cdatetime);
313318
posi = strlen(buffer);
314319
sprintf(&buffer[posi], "\"OPERATION\": \"DELETE\", ");
315320
posi = strlen(buffer);
@@ -336,7 +341,9 @@ DPRINTF("logger", 90, ("delete: colname: (0x%x) [%s]", pcolname, pcolname));
336341

337342
//pcast = escapecharjson(pcast);
338343
//printf("%s",pcast);
339-
sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, escapecharjson(pcast));
344+
char *bufdatdelval = escapecharjson(pcast);
345+
sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, bufdatdelval);
346+
free(bufdatdelval);
340347
if (strcmp("unsupportedtype", pcast) == 0) {
341348
strcpy(uniquedatatype, "true");
342349
}
@@ -355,6 +362,7 @@ DPRINTF("logger", 90, ("delete: colname: (0x%x) [%s]", pcolname, pcolname));
355362
} else {
356363
sprintf(&buffer[posi], "}, \n \"uniquedatatype\" : \"false\" \n }");
357364
}
365+
printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-DELETE-%s-Completed\" \n ", pdbname,ptabname,cdatetime);
358366
free(cdatetime);
359367
return(buffer);
360368
}
@@ -406,7 +414,8 @@ mi_string *doUpdateCN()
406414
//fixname(pdbname);
407415
sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname);
408416
posi = strlen(buffer);
409-
sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname);
417+
sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname);
418+
printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-UPDATE-%s\" \n", pdbname,ptabname,cdatetime);
410419
posi = strlen(buffer);
411420
sprintf(&buffer[posi], "\"OPERATION\": \"UPDATE\", ");
412421
posi = strlen(buffer);
@@ -448,7 +457,11 @@ mi_string *doUpdateCN()
448457
sprintf(&buffer[pbufLen], ", ");
449458
pbufLen = strlen(buffer);
450459
}
451-
sprintf(&buffer[pbufLen], "\"%s\" : { \"old\" : \"%s\", \"new\" : \"%s\" }", poldcolname, escapecharjson(pcast), escapecharjson(pcast2));
460+
char *bufdatoldval = escapecharjson(pcast);
461+
char *bufdatnewval = escapecharjson(pcast2);
462+
sprintf(&buffer[pbufLen], "\"%s\" : { \"old\" : \"%s\", \"new\" : \"%s\" }", poldcolname, bufdatoldval, bufdatnewval);
463+
free(bufdatoldval);
464+
free(bufdatnewval);
452465
if (strcmp("unsupportedtype", pcast2) == 0) {
453466
strcpy(uniquedatatype, "true");
454467
}
@@ -464,6 +477,7 @@ mi_string *doUpdateCN()
464477
sprintf(&buffer[pbufLen], "}, \n \"uniquedatatype\" : \"false\" \n }");
465478
}
466479
DPRINTF("logger", 90, ("Exiting doUpdateCN()"));
480+
printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-UPDATE-%s-Completed\" \n ", pdbname,ptabname,cdatetime);
467481
free(cdatetime);
468482
return(buffer);
469483
}
@@ -579,7 +593,7 @@ char * escapecharjson( char *jsonvalue_org)
579593
escjsonvalue = (char *)calloc(10000, sizeof(char));
580594
for (jsonvalue_copy = jsonvalue_org; *jsonvalue_copy != '\0'; jsonvalue_copy++) {
581595

582-
printf("%c:%d\n", *jsonvalue_copy,*jsonvalue_copy);
596+
//printf("%c:%d\n", *jsonvalue_copy,*jsonvalue_copy);
583597
if (*jsonvalue_copy == '"') {
584598
posi = strlen(escjsonvalue);
585599
sprintf(&escjsonvalue[posi], "%s","\\\"") ;
@@ -616,6 +630,6 @@ char * escapecharjson( char *jsonvalue_org)
616630
}
617631
//p=NULL;
618632
jsonvalue_copy=NULL;
619-
printf("%s", escjsonvalue);
633+
//printf("%s", escjsonvalue);
620634
return(escjsonvalue);
621635
}

informix_auditing/auditing2.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void do_auditing2( mi_lvarchar *sessionusername, MI_FPARAM *fp)
6060
mi_string buffer[32], *pdata;
6161

6262
DPRINTF("logger", 80, ("connected user %s", mi_lvarchar_to_string(sessionusername)));
63-
printf("operating user %s welcome test \n",mi_lvarchar_to_string(sessionusername));
63+
printf("USER Triggered: %s\n",mi_lvarchar_to_string(sessionusername));
6464
if (strcmp(mi_lvarchar_to_string(sessionusername), "ifxsyncuser") == 0)
6565
{
6666
printf("automated user. skipping trigger\n");
@@ -72,14 +72,14 @@ void do_auditing2( mi_lvarchar *sessionusername, MI_FPARAM *fp)
7272
if (trigger_operation & MI_TRIGGER_NOT_IN_EVENT) {
7373
/* not in a trigger! generate an exception */
7474
mi_db_error_raise(NULL, MI_EXCEPTION,
75-
"do_auditing1() can only be called within a trigger!", NULL);
75+
"do_auditing2() can only be called within a trigger!", NULL);
7676
return;
7777
}
7878
/* Make sure this is in a FOR EACH type of trigger */
7979
if (0 == (trigger_operation & MI_TRIGGER_FOREACH_EVENT) ) {
8080
/* not in a for each trigger! generate an exception */
8181
mi_db_error_raise(NULL, MI_EXCEPTION,
82-
"do_auditing1() must be in a FOR EACH trigger operation", NULL);
82+
"do_auditing2() must be in a FOR EACH trigger operation", NULL);
8383
return;
8484
}
8585
/* keep only the SQL operation */

src/common/app_log.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ async function consumerpg_failure_log(payload, postgreErr) {
197197
CONSUMER_FAILURE_LOG: postgreErr
198198
}).then(log => console.log('Added Error in Consumer Log Table'))
199199
.catch(err => console.log(err))
200-
console.log(`error-sync: consumer failed to update : "${postgreErr}"`)
200+
console.log(`error-sync: consumer failed to update :` + JSON.stringify(postgreErr))
201201
//audit table update
202202
}
203203
// CONSUMER_PAYLOAD: { type: DataTypes.JSON, allowNull: false },

src/consumer.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,16 @@ const Promise = require('bluebird');
33
const config = require('config');
44
const logger = require('./common/logger');
55
const healthcheck = require('topcoder-healthcheck-dropin');
6-
const consumer = new Kafka.GroupConsumer();
6+
const options = {
7+
groupId: config.KAFKA_GROUP_ID,
8+
connectionString: config.KAFKA_URL,
9+
ssl: {
10+
cert: config.KAFKA_CLIENT_CERT,
11+
key: config.KAFKA_CLIENT_CERT_KEY
12+
}
13+
};
14+
15+
const consumer = new Kafka.GroupConsumer(options);
716
const {
817
create_consumer_app_log,
918
consumerpg_success_log,

0 commit comments

Comments
 (0)