Skip to content

Commit 6fc0cbc

Browse files
authored
Merge pull request #29 from lukbaj/ft-zmq-input
Add ZeroMQ input
2 parents 102d78e + f0ac39a commit 6fc0cbc

11 files changed

+1296
-648
lines changed

CMakeLists.txt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ set(SOURCE
1212
metadata_utils.c
1313
system_helpers.c
1414
metadata_input_netlink.c
15+
metadata_input_nl_zmq_common.c
1516
lib/minmea.c)
1617

1718

@@ -57,13 +58,20 @@ if (NEAT)
5758
add_definitions("-DNEAT_SUPPORT")
5859
endif()
5960

60-
if (ZEROMQ)
61+
if (ZEROMQ_WRITER)
6162
set(LIBS ${LIBS} zmq)
6263
set(SOURCE ${SOURCE}
6364
metadata_writer_zeromq.c
6465
metadata_writer_zeromq_monroe.c
6566
metadata_writer_zeromq_nne.c)
66-
add_definitions("-DZEROMQ_SUPPORT")
67+
add_definitions("-DZEROMQ_SUPPORT_WRITER")
68+
endif()
69+
70+
if (ZEROMQ_INPUT)
71+
set(LIBS ${LIBS} zmq)
72+
set(SOURCE ${SOURCE}
73+
metadata_input_zeromq.c)
74+
add_definitions("-DZEROMQ_SUPPORT_INPUT")
6775
endif()
6876

6977
if (GPSD)

metadata_exporter.c

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
#ifdef GPSD_SUPPORT
4545
#include "metadata_input_gpsd.h"
4646
#endif
47+
#ifdef ZEROMQ_SUPPORT_INPUT
48+
#include "metadata_input_zeromq.h"
49+
#endif
4750
#ifdef MUNIN_SUPPORT
4851
#include "metadata_input_munin.h"
4952
#endif
@@ -58,7 +61,7 @@
5861
#ifdef SQLITE_SUPPORT
5962
#include "metadata_writer_sqlite.h"
6063
#endif
61-
#ifdef ZEROMQ_SUPPORT
64+
#ifdef ZEROMQ_SUPPORT_WRITER
6265
#include "metadata_writer_zeromq.h"
6366
#endif
6467
#ifdef NNE_SUPPORT
@@ -258,16 +261,16 @@ static struct json_object *create_fake_restart_obj()
258261
static struct json_object *create_fake_conn_obj(uint64_t l3_id, uint64_t l4_id,
259262
uint8_t event_param, char *event_value_str, uint64_t tstamp)
260263
{
261-
struct json_object *obj = NULL, *obj_add = NULL;
264+
struct json_object *obj = NULL, *obj_add = NULL;
262265
uint8_t rand_value = 0;
263266
uint64_t rand_value_64 = 0;
264267
struct timeval tv;
265268

266-
if (!(obj = json_object_new_object()))
267-
return NULL;
269+
if (!(obj = json_object_new_object()))
270+
return NULL;
268271

269272
gettimeofday(&tv, NULL);
270-
if (!(obj_add = json_object_new_int64(tv.tv_sec))) {
273+
if (!(obj_add = json_object_new_int64(tv.tv_sec))) {
271274
json_object_put(obj);
272275
return NULL;
273276
}
@@ -354,7 +357,6 @@ static struct json_object *create_fake_conn_obj(uint64_t l3_id, uint64_t l4_id,
354357
}
355358
json_object_object_add(obj, "interface_id", obj_add);
356359

357-
358360
if (!(obj_add = json_object_new_string("1234567"))) {
359361
json_object_put(obj);
360362
return NULL;
@@ -578,7 +580,7 @@ static void test_netlink(uint32_t packets)
578580
//
579581
//When testing, there is no need to multicast. We can just send to the PID
580582
netlink_addr.nl_pid = getpid();
581-
583+
582584
srand(time(NULL));
583585

584586
//TODO: Specify number of packets from command line
@@ -684,8 +686,8 @@ static void print_usage()
684686
#ifdef SQLITE_SUPPORT
685687
md_sqlite_usage();
686688
#endif
687-
#ifdef ZEROMQ_SUPPORT
688-
md_zeromq_usage();
689+
#ifdef ZEROMQ_SUPPORT_WRITER
690+
md_zeromq_writer_usage();
689691
#endif
690692
}
691693

@@ -784,6 +786,19 @@ int main(int argc, char *argv[])
784786
num_inputs++;
785787
}
786788
#endif
789+
#ifdef ZEROMQ_SUPPORT_INPUT
790+
else if (!strcmp(key, "zmq_input")) {
791+
mde->md_inputs[MD_INPUT_ZEROMQ] = calloc(sizeof(struct md_input_zeromq), 1);
792+
793+
if (mde->md_inputs[MD_INPUT_ZEROMQ] == NULL) {
794+
META_PRINT_SYSLOG(mde, LOG_ERR, "Could not allocate ZeroMQ input\n");
795+
exit(EXIT_FAILURE);
796+
}
797+
798+
md_zeromq_input_setup(mde, (struct md_input_zeromq*) mde->md_inputs[MD_INPUT_ZEROMQ]);
799+
num_inputs++;
800+
}
801+
#endif
787802
#ifdef NNE_SUPPORT
788803
else if (!strcmp(key, "nne")) {
789804
mde->md_writers[MD_WRITER_NNE] = calloc(sizeof(struct md_writer_nne), 1);
@@ -862,7 +877,7 @@ int main(int argc, char *argv[])
862877
num_writers++;
863878
}
864879
#endif
865-
#ifdef ZEROMQ_SUPPORT
880+
#ifdef ZEROMQ_SUPPORT_WRITER
866881
else if (!strcmp(key, "zmq")) {
867882
mde->md_writers[MD_WRITER_ZEROMQ] = calloc(sizeof(struct md_writer_zeromq), 1);
868883

@@ -871,7 +886,7 @@ int main(int argc, char *argv[])
871886
exit(EXIT_FAILURE);
872887
}
873888

874-
md_zeromq_setup(mde, (struct md_writer_zeromq*) mde->md_writers[MD_WRITER_ZEROMQ]);
889+
md_zeromq_writer_setup(mde, (struct md_writer_zeromq*) mde->md_writers[MD_WRITER_ZEROMQ]);
875890
num_writers++;
876891
}
877892
#endif

metadata_exporter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ enum md_inputs {
111111
MD_INPUT_GPS_NSB,
112112
MD_INPUT_MUNIN,
113113
MD_INPUT_SYSEVENT,
114+
MD_INPUT_ZEROMQ,
114115
__MD_INPUT_MAX
115116
};
116117

0 commit comments

Comments
 (0)