diff --git a/CMakeLists.txt b/CMakeLists.txt index a1bc893..d1814f4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -74,6 +74,13 @@ if (ZEROMQ_INPUT) add_definitions("-DZEROMQ_SUPPORT_INPUT") endif() +if (ZEROMQ_RELAY) + set(LIBS ${LIBS} zmq) + set(SOURCE ${SOURCE} + metadata_input_zeromq_relay.c) + add_definitions("-DZEROMQ_SUPPORT_INPUT_RELAY") +endif() + if (GPSD) set(LIBS ${LIBS} gps) set(SOURCE ${SOURCE} diff --git a/metadata_exporter.c b/metadata_exporter.c index 0070b2d..c4a4737 100644 --- a/metadata_exporter.c +++ b/metadata_exporter.c @@ -47,6 +47,9 @@ #ifdef ZEROMQ_SUPPORT_INPUT #include "metadata_input_zeromq.h" #endif +#ifdef ZEROMQ_SUPPORT_INPUT_RELAY + #include "metadata_input_zeromq_relay.h" +#endif #ifdef MUNIN_SUPPORT #include "metadata_input_munin.h" #endif @@ -670,6 +673,9 @@ static void print_usage() #ifdef MUNIN_SUPPORT md_munin_usage(); #endif +#ifdef ZEROMQ_SUPPORT_INPUT_RELAY + md_zeromq_relay_usage(); +#endif #ifdef GPS_NSB_SUPPORT md_gps_nsb_usage(); #endif @@ -799,6 +805,19 @@ int main(int argc, char *argv[]) num_inputs++; } #endif +#ifdef ZEROMQ_SUPPORT_INPUT_RELAY + else if (!strcmp(key, "zmq_input_relay")) { + mde->md_inputs[MD_INPUT_ZEROMQ_RELAY] = calloc(sizeof(struct md_input_zeromq_relay), 1); + + if (mde->md_inputs[MD_INPUT_ZEROMQ_RELAY] == NULL) { + META_PRINT_SYSLOG(mde, LOG_ERR, "Could not allocate ZeroMQ Relay input\n"); + exit(EXIT_FAILURE); + } + + md_zeromq_relay_setup(mde, (struct md_input_zeromq_relay*) mde->md_inputs[MD_INPUT_ZEROMQ_RELAY]); + num_inputs++; + } +#endif #ifdef NNE_SUPPORT else if (!strcmp(key, "nne")) { mde->md_writers[MD_WRITER_NNE] = calloc(sizeof(struct md_writer_nne), 1); diff --git a/metadata_exporter.h b/metadata_exporter.h index 46066b4..f0e91e0 100644 --- a/metadata_exporter.h +++ b/metadata_exporter.h @@ -54,6 +54,7 @@ #define META_TYPE_SYSEVENT 0x06 #define META_TYPE_RADIO 0x08 #define META_TYPE_SYSTEM 0x10 +#define META_TYPE_ZEROMQ 0xA enum iface_event { IFACE_EVENT_DEV_STATE=1, @@ -112,6 +113,7 @@ enum md_inputs { MD_INPUT_MUNIN, MD_INPUT_SYSEVENT, MD_INPUT_ZEROMQ, + MD_INPUT_ZEROMQ_RELAY, __MD_INPUT_MAX }; @@ -238,6 +240,11 @@ struct md_munin_event { json_object* json_blob; }; +struct md_zeromq_event { + MD_EVENT; + const char *msg; +}; + struct md_radio_event { MD_RADIO_EVENT; }; diff --git a/metadata_input_zeromq_relay.c b/metadata_input_zeromq_relay.c new file mode 100644 index 0000000..1eb3158 --- /dev/null +++ b/metadata_input_zeromq_relay.c @@ -0,0 +1,252 @@ +/* Copyright (c) 2018, Karlstad Universitet, Jonas Karlsson + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include JSON_LOC +#include +#include +#include + +#include "metadata_exporter.h" +#include "metadata_input_nl_zmq_common.h" +#include "metadata_input_zeromq_relay.h" +#include "backend_event_loop.h" + +#include "lib/minmea.h" +#include "metadata_exporter_log.h" + +static int hashCode(struct table *t,int key){ + if(key<0) + return -(key%t->size); + return key%t->size; +} + +void insert(struct table *t,int key, struct zmq_connection *val){ + int pos = hashCode(t,key); + struct node *list = t->list[pos]; + struct node *newNode = (struct node*)malloc(sizeof(struct node)); + struct node *temp = list; + while(temp){ + if(temp->key==key){ + temp->val = val; + return; + } + temp = temp->next; + } + newNode->key = key; + newNode->val = val; + newNode->next = list; + t->list[pos] = newNode; +} + +struct table *createTable(int size){ + struct table *t = (struct table*)malloc(sizeof(struct table)); + t->size = size; + t->list = (struct node**)malloc(sizeof(struct node*)*size); + int i; + for(i=0;ilist[i] = NULL; + return t; +}; + +static struct zmq_connection* lookup(struct table *t,int key){ + int pos = hashCode(t,key); + struct node *list = t->list[pos]; + struct node *temp = list; + while(temp){ + if(temp->key==key){ + return temp->val; + } + temp = temp->next; + } + return NULL; +} + +static void md_input_zeromq_relay_handle_event(void *ptr, int32_t fd, uint32_t events) +{ + struct md_input_zeromq_relay *miz = ptr; + int zmq_events = 0; + size_t events_len = sizeof(zmq_events); + json_object *zmqh_obj = NULL; + const char *json_msg; + struct zmq_connection *zmq_con = lookup(miz->zmq_connections, fd); + void *zmq_socket; + + if (!zmq_con) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Could not lookup ZMQ connection\n"); + return; + } + + zmq_socket = zmq_con->zmq_socket; + if (!zmq_socket) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Not a valid ZMQ socket\n"); + return; + } + + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + + while (zmq_events & ZMQ_POLLIN) + { + char buf[2048] = {0}; + zmq_recv(zmq_socket, buf, 2048, 0); + + json_msg = strchr(buf, '{'); + // Sanity checks + // Do we even have a json object + if (json_msg == NULL) + { + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + continue; + } + + // Is the json object valid ? + zmqh_obj = json_tokener_parse(json_msg); + if (!zmqh_obj) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Received invalid JSON object on ZMQ socket\n"); + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + continue; + } + //TODO: Check so we also have a topic + + META_PRINT(miz->parent->logfile, "Got JSON %s\n", json_object_to_json_string(zmqh_obj)); + json_object_put(zmqh_obj); + + //Yay we have a valid object lets store that and publish it. + + //Create a zeromq event + + memset(miz->mse, 0, sizeof(struct md_zeromq_event)); + miz->mse->md_type = META_TYPE_ZEROMQ; + miz->mse->msg = buf; //This works as next line will call the consuming function sync, could mde_publish_event_obj ever be asynch? + mde_publish_event_obj(miz->parent, (struct md_event*) miz->mse); + + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + } +} + +static uint8_t md_input_zeromq_relay_config(struct md_input_zeromq_relay *miz) +{ + int zmq_fd = -1; + size_t len = 0; + struct zmq_connection *zmq_con; + miz->zmq_connections = createTable(miz->nr_of_connections); //len of miz->urls + + // Connect to ZMQ publisher(s) + for (int i = 0; i < miz->nr_of_connections; i++) { + zmq_con = calloc(1, sizeof(struct zmq_connection)); + zmq_con->zmq_ctx = zmq_ctx_new(); + if (zmq_con->zmq_ctx == NULL) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ context\n"); + return RETVAL_FAILURE; + } + + zmq_con->zmq_socket = zmq_socket(zmq_con->zmq_ctx, ZMQ_SUB); + if (zmq_con->zmq_socket == NULL) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ socket\n"); + return RETVAL_FAILURE; + } + + //Connect to user defined publiser $URL + if (zmq_connect(zmq_con->zmq_socket, miz->urls[i]) == -1) + { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't connect to %s ZMQ publisher\n", miz->urls[i]); + return RETVAL_FAILURE; + } + + // subscribe to all topics (of this publisher) + const char *topic = ""; + zmq_setsockopt(zmq_con->zmq_socket, ZMQ_SUBSCRIBE, topic, strlen(topic)); + + len = sizeof(zmq_fd); + if (zmq_getsockopt(zmq_con->zmq_socket, ZMQ_FD, &zmq_fd, &len) == -1) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't get ZMQ file descriptor\n"); + return RETVAL_FAILURE; + } + + if(!(miz->event_handle = backend_create_epoll_handle(miz, + zmq_fd, md_input_zeromq_relay_handle_event))) + return RETVAL_FAILURE; + + miz->mse = calloc(1, sizeof(struct md_zeromq_event)); + if (miz->mse == NULL) + return RETVAL_FAILURE; + + backend_event_loop_update(miz->parent->event_loop, EPOLLIN, EPOLL_CTL_ADD, + zmq_fd, miz->event_handle); + insert(miz->zmq_connections,zmq_fd,zmq_con); + } + + return RETVAL_SUCCESS; +} + +static uint8_t md_input_zeromq_relay_init(void *ptr, json_object* config) +{ + struct md_input_zeromq_relay *miz = ptr; + miz->nr_of_connections = 0; + + json_object* subconfig; + if (json_object_object_get_ex(config, "zmq_input_relay", &subconfig)) { + json_object_object_foreach(subconfig, key, val) { + if (!strcmp(key, "urls")) { + miz->nr_of_connections = json_object_array_length(val); + miz->urls= calloc(miz->nr_of_connections, sizeof(char*)); + for (int i=0; i< miz->nr_of_connections; i++) { + struct json_object* json_url = json_object_array_get_idx(val,i); + int url_len = json_object_get_string_len(json_url) + 1; + const char * url = json_object_get_string(json_url); + miz->urls[i] = calloc(url_len, sizeof(char)); + snprintf((char *)miz->urls[i],url_len, "%s", url); + } + } + } + } + + + if (miz->nr_of_connections <= 0) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "At least one publisher must be present\n"); + return RETVAL_FAILURE; + } + + return md_input_zeromq_relay_config(miz); +} + +void md_zeromq_relay_usage() +{ + fprintf(stderr, "\"zmq_input_relay\": {\tZeroMQ input (at least one url must be present)\n"); + fprintf(stderr, " \"urls\":\t\tArray of ZeroMQ URLs to listen to, \ +eg. [\"tcp://127.0.0.1:10001\", \"tcp://127.0.0.1:10002\"] \n"); + fprintf(stderr, "},\n"); +} + +void md_zeromq_relay_setup(struct md_exporter *mde, struct md_input_zeromq_relay *miz) +{ + miz->parent = mde; + miz->init = md_input_zeromq_relay_init; +} diff --git a/metadata_input_zeromq_relay.h b/metadata_input_zeromq_relay.h new file mode 100644 index 0000000..f63b39b --- /dev/null +++ b/metadata_input_zeromq_relay.h @@ -0,0 +1,60 @@ +/* Copyright (c) 2018, Karlstad Universitet, Jonas Karlsson + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#pragma once +#include "metadata_exporter.h" +#include +#include + +struct node{ + int key; //zmq_fd + struct zmq_connection *val; + struct node *next; +}; + +struct table{ + int size; + struct node **list; +}; + +struct backend_epoll_handle; + +struct zmq_connection { + void* zmq_ctx; + void* zmq_socket; +}; + +struct md_input_zeromq_relay { + MD_INPUT; + struct backend_epoll_handle *event_handle; + struct table *zmq_connections; + const char **urls; + int nr_of_connections; + struct md_zeromq_event *mse; +}; + +void md_zeromq_relay_usage(); +void md_zeromq_relay_setup(struct md_exporter *mde, struct md_input_zeromq_relay *miz); diff --git a/metadata_writer_zeromq.c b/metadata_writer_zeromq.c index 33bdd03..61ac052 100644 --- a/metadata_writer_zeromq.c +++ b/metadata_writer_zeromq.c @@ -219,6 +219,18 @@ static void md_zeromq_writer_handle_munin(struct md_writer_zeromq *mwz, } } +static void md_zeromq_writer_handle_zeromq(struct md_writer_zeromq *mwz, + struct md_zeromq_event *mge) +{ + char topic[8192]; //In reality this is the message not the topic + int retval; + + retval = snprintf(topic, sizeof(topic), "%s", mge->msg); + if (retval < sizeof(topic)) { + zmq_send(mwz->zmq_publisher, topic, strlen(topic), 0); + } +} + static void md_zeromq_writer_handle_sysevent(struct md_writer_zeromq *mwz, struct md_sysevent *mge) @@ -1024,6 +1036,9 @@ static void md_zeromq_writer_handle(struct md_writer *writer, struct md_event *e case META_TYPE_RADIO: md_zeromq_writer_handle_radio(mwz, (struct md_radio_event*) event); break; + case META_TYPE_ZEROMQ: + md_zeromq_writer_handle_zeromq(mwz, (struct md_zeromq_event*) event); + break; default: META_PRINT_SYSLOG(mwz->parent, LOG_INFO, "ZMQ writer does not support event %u\n", event->md_type);