Skip to content

Commit 6ad36a1

Browse files
committed
Moved code where it should belong, session handling and inflight in handlers
1 parent 495d5e8 commit 6ad36a1

File tree

6 files changed

+127
-88
lines changed

6 files changed

+127
-88
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cmake_minimum_required(VERSION 2.8)
22

33
project(sol)
4-
set (VERSION 0.18.2)
4+
set (VERSION 0.18.3)
55
set (CMAKE_EXPORT_COMPILE_COMMANDS ON)
66

77
OPTION(DEBUG "add debug flags" OFF)

src/config.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363

6464
// Default parameters
6565

66-
#define VERSION "0.18.2"
66+
#define VERSION "0.18.3"
6767
#define DEFAULT_SOCKET_FAMILY INET
6868
#define DEFAULT_LOG_LEVEL DEBUG
6969
#define DEFAULT_CONF_PATH "/etc/sol/sol.conf"

src/handlers.c

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ static int pubrel_handler(struct io_event *);
4949
static int pubcomp_handler(struct io_event *);
5050
static int pingreq_handler(struct io_event *);
5151

52+
static void session_init(struct client_session *, const char *);
53+
54+
static struct client_session *client_session_alloc(const char *);
55+
56+
static unsigned next_free_mid(struct client_session *);
57+
58+
static void inflight_msg_init(struct inflight_msg *, struct mqtt_packet *);
59+
5260
/* Command handler mapped usign their position paired with their type */
5361
static handler *handlers[15] = {
5462
NULL,
@@ -68,6 +76,58 @@ static handler *handlers[15] = {
6876
disconnect_handler
6977
};
7078

79+
/*
80+
* =========================
81+
* Internal module helpers
82+
* =========================
83+
*/
84+
85+
static void session_free(const struct ref *refcount) {
86+
struct client_session *session =
87+
container_of(refcount, struct client_session, refcount);
88+
list_destroy(session->subscriptions, 0);
89+
list_destroy(session->outgoing_msgs, 0);
90+
if (has_inflight(session)) {
91+
for (int i = 0; i < MAX_INFLIGHT_MSGS; ++i) {
92+
if (session->i_msgs[i].packet)
93+
DECREF(session->i_msgs[i].packet, struct mqtt_packet);
94+
}
95+
}
96+
free_memory(session->i_acks);
97+
free_memory(session->i_msgs);
98+
free_memory(session);
99+
}
100+
101+
static void session_init(struct client_session *session, const char *session_id) {
102+
session->inflights = ATOMIC_VAR_INIT(0);
103+
session->next_free_mid = 1;
104+
session->subscriptions = list_new(NULL);
105+
session->outgoing_msgs = list_new(NULL);
106+
snprintf(session->session_id, MQTT_CLIENT_ID_LEN, "%s", session_id);
107+
session->i_acks = try_calloc(MAX_INFLIGHT_MSGS, sizeof(time_t));
108+
session->i_msgs = try_calloc(MAX_INFLIGHT_MSGS, sizeof(struct inflight_msg));
109+
session->refcount = (struct ref) { session_free, 0 };
110+
}
111+
112+
static struct client_session *client_session_alloc(const char *session_id) {
113+
struct client_session *session = try_alloc(sizeof(*session));
114+
session_init(session, session_id);
115+
return session;
116+
}
117+
118+
static inline unsigned next_free_mid(struct client_session *session) {
119+
if (session->next_free_mid == MAX_INFLIGHT_MSGS)
120+
session->next_free_mid = 1;
121+
return session->next_free_mid++;
122+
}
123+
124+
static inline void inflight_msg_init(struct inflight_msg *imsg,
125+
struct mqtt_packet *p) {
126+
imsg->seen = time(NULL);
127+
imsg->packet = p;
128+
imsg->qos = p->header.bits.qos;
129+
}
130+
71131
/*
72132
* One of the two exposed functions of the module, it's also needed on server
73133
* module to publish periodic messages (e.g. $SOL stats). It's responsible

src/server.c

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,85 +1014,3 @@ void daemonize(void) {
10141014
if (fd > STDERR_FILENO) close(fd);
10151015
}
10161016
}
1017-
1018-
/*
1019-
* =================================
1020-
* Internal module implementations
1021-
* =================================
1022-
*/
1023-
1024-
static void subscriber_free(const struct ref *r) {
1025-
struct subscriber *sub = container_of(r, struct subscriber, refcount);
1026-
free_memory(sub);
1027-
}
1028-
1029-
static void session_free(const struct ref *refcount) {
1030-
struct client_session *session =
1031-
container_of(refcount, struct client_session, refcount);
1032-
list_destroy(session->subscriptions, 0);
1033-
list_destroy(session->outgoing_msgs, 0);
1034-
if (has_inflight(session)) {
1035-
for (int i = 0; i < MAX_INFLIGHT_MSGS; ++i) {
1036-
if (session->i_msgs[i].packet)
1037-
DECREF(session->i_msgs[i].packet, struct mqtt_packet);
1038-
}
1039-
}
1040-
free_memory(session->i_acks);
1041-
free_memory(session->i_msgs);
1042-
free_memory(session);
1043-
}
1044-
1045-
void session_init(struct client_session *session, const char *session_id) {
1046-
session->inflights = ATOMIC_VAR_INIT(0);
1047-
session->next_free_mid = 1;
1048-
session->subscriptions = list_new(NULL);
1049-
session->outgoing_msgs = list_new(NULL);
1050-
snprintf(session->session_id, MQTT_CLIENT_ID_LEN, "%s", session_id);
1051-
session->i_acks = try_calloc(MAX_INFLIGHT_MSGS, sizeof(time_t));
1052-
session->i_msgs = try_calloc(MAX_INFLIGHT_MSGS, sizeof(struct inflight_msg));
1053-
session->refcount = (struct ref) { session_free, 0 };
1054-
}
1055-
1056-
struct client_session *client_session_alloc(const char *session_id) {
1057-
struct client_session *session = try_alloc(sizeof(*session));
1058-
session_init(session, session_id);
1059-
return session;
1060-
}
1061-
1062-
bool is_subscribed(const struct topic *t, const struct client_session *s) {
1063-
struct subscriber *dummy = NULL;
1064-
HASH_FIND_STR(t->subscribers, s->session_id, dummy);
1065-
return dummy != NULL;
1066-
}
1067-
1068-
struct subscriber *subscriber_new(struct topic *t,
1069-
struct client_session * s,
1070-
unsigned char qos) {
1071-
struct subscriber *sub = try_alloc(sizeof(*sub));
1072-
sub->session = s;
1073-
sub->granted_qos = qos;
1074-
sub->refcount = (struct ref) { .count = 0, .free = subscriber_free };
1075-
memcpy(sub->id, s->session_id, MQTT_CLIENT_ID_LEN);
1076-
return sub;
1077-
}
1078-
1079-
struct subscriber *subscriber_clone(const struct subscriber *s) {
1080-
struct subscriber *sub = try_alloc(sizeof(*sub));
1081-
sub->session = s->session;
1082-
sub->granted_qos = s->granted_qos;
1083-
sub->refcount = (struct ref) { .count = 0, .free = subscriber_free };
1084-
memcpy(sub->id, s->id, MQTT_CLIENT_ID_LEN);
1085-
return sub;
1086-
}
1087-
1088-
void inflight_msg_init(struct inflight_msg *imsg, struct mqtt_packet *p) {
1089-
imsg->seen = time(NULL);
1090-
imsg->packet = p;
1091-
imsg->qos = p->header.bits.qos;
1092-
}
1093-
1094-
unsigned next_free_mid(struct client_session *session) {
1095-
if (session->next_free_mid == MAX_INFLIGHT_MSGS)
1096-
session->next_free_mid = 1;
1097-
return session->next_free_mid++;
1098-
}

src/sol_internal.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ extern pthread_mutex_t mutex;
237237

238238
struct server;
239239

240-
void inflight_msg_init(struct inflight_msg *, struct mqtt_packet *);
241240
bool is_subscribed(const struct topic *, const struct client_session *);
242241
struct subscriber *subscriber_new(struct topic *,
243242
struct client_session *, unsigned char);
@@ -265,9 +264,9 @@ bool topic_store_wildcards_empty(const struct topic_store *);
265264
#define topic_store_wildcards_foreach(item, store) \
266265
list_foreach(item, store->wildcards)
267266

268-
unsigned next_free_mid(struct client_session *);
269-
void session_init(struct client_session *, const char *);
270-
struct client_session *client_session_alloc(const char *);
267+
/* unsigned next_free_mid(struct client_session *); */
268+
/* void session_init(struct client_session *, const char *); */
269+
/* struct client_session *client_session_alloc(const char *); */
271270

272271
#define has_inflight(session) ((session)->inflights > 0)
273272
#define inflight_msg_clear(msg) DECREF((msg)->packet, struct mqtt_packet)

src/subscriber.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/* BSD 2-Clause License
2+
*
3+
* Copyright (c) 2020, Andrea Giacomo Baldan All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
*
11+
* * Redistributions in binary form must reproduce the above copyright notice,
12+
* this list of conditions and the following disclaimer in the documentation
13+
* and/or other materials provided with the distribution.
14+
*
15+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
19+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25+
* POSSIBILITY OF SUCH DAMAGE.
26+
*/
27+
28+
#include "memory.h"
29+
#include "sol_internal.h"
30+
31+
static void subscriber_destroy(const struct ref *);
32+
33+
struct subscriber *subscriber_new(struct topic *t,
34+
struct client_session * s,
35+
unsigned char qos) {
36+
struct subscriber *sub = try_alloc(sizeof(*sub));
37+
sub->session = s;
38+
sub->granted_qos = qos;
39+
sub->refcount = (struct ref) { .count = 0, .free = subscriber_destroy };
40+
memcpy(sub->id, s->session_id, MQTT_CLIENT_ID_LEN);
41+
return sub;
42+
}
43+
44+
struct subscriber *subscriber_clone(const struct subscriber *s) {
45+
struct subscriber *sub = try_alloc(sizeof(*sub));
46+
sub->session = s->session;
47+
sub->granted_qos = s->granted_qos;
48+
sub->refcount = (struct ref) { .count = 0, .free = subscriber_destroy };
49+
memcpy(sub->id, s->id, MQTT_CLIENT_ID_LEN);
50+
return sub;
51+
}
52+
53+
static void subscriber_destroy(const struct ref *r) {
54+
struct subscriber *sub = container_of(r, struct subscriber, refcount);
55+
free_memory(sub);
56+
}
57+
58+
bool is_subscribed(const struct topic *t, const struct client_session *s) {
59+
struct subscriber *dummy = NULL;
60+
HASH_FIND_STR(t->subscribers, s->session_id, dummy);
61+
return dummy != NULL;
62+
}

0 commit comments

Comments
 (0)