Skip to content

Commit 495d5e8

Browse files
committed
Moved topic related APIs to its own module, refactored server topic store
1 parent 04f64e4 commit 495d5e8

File tree

8 files changed

+285
-158
lines changed

8 files changed

+285
-158
lines changed

src/handlers.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -371,11 +371,11 @@ static int connect_handler(struct io_event *e) {
371371
const char *will_topic = (const char *) c->payload.will_topic;
372372
const char *will_message = (const char *) c->payload.will_message;
373373
// TODO check for will_topic != NULL
374-
struct topic *t = topic_get_or_create(&server, will_topic);
374+
struct topic *t = topic_store_get_or_put(server.store, will_topic);
375375
if (!t)
376376
log_fatal("connect_handler failed: Out of memory");
377-
if (!topic_exists(&server, t->name))
378-
topic_put(&server, t);
377+
if (!topic_store_contains(server.store, t->name))
378+
topic_store_put(server.store, t);
379379
// I'm sure that the string will be NUL terminated by unpack function
380380
size_t msg_len = strlen(will_message);
381381
size_t tpc_len = strlen(will_topic);
@@ -451,7 +451,7 @@ static inline void add_wildcard(const char *topic, struct subscriber *s,
451451
subscription->topic = try_strdup(topic);
452452
subscription->multilevel = wildcard;
453453
INCREF(s, struct subscriber);
454-
server.wildcards = list_push(server.wildcards, subscription);
454+
topic_store_add_wildcard(server.store, subscription);
455455
}
456456

457457
static void recursive_sub(struct trie_node *node, void *arg) {
@@ -509,7 +509,7 @@ static int subscribe_handler(struct io_event *e) {
509509
topic[s->tuples[i].topic_len + 1] = '\0';
510510
}
511511

512-
struct topic *t = topic_get_or_create(&server, topic);
512+
struct topic *t = topic_store_get_or_put(server.store, topic);
513513
if (!t)
514514
log_fatal("subscribe_handler failed: Out of memory");
515515
/*
@@ -535,7 +535,7 @@ static int subscribe_handler(struct io_event *e) {
535535
list_push(e->client->session->subscriptions, t);
536536
if (wildcard == true) {
537537
add_wildcard(topic, tmp, wildcard);
538-
trie_prefix_map(server.topics.root, topic, recursive_sub, tmp);
538+
topic_store_map(server.store, topic, recursive_sub, tmp);
539539
}
540540
}
541541
} else {
@@ -599,8 +599,8 @@ static int unsubscribe_handler(struct io_event *e) {
599599
#endif
600600
struct topic *t = NULL;
601601
for (int i = 0; i < e->data.unsubscribe.tuples_len; ++i) {
602-
t = topic_get(&server,
603-
(const char *) e->data.unsubscribe.tuples[i].topic);
602+
t = topic_store_get(server.store,
603+
(const char *) e->data.unsubscribe.tuples[i].topic);
604604
if (t)
605605
topic_del_subscriber(t, c);
606606
}
@@ -659,13 +659,13 @@ static int publish_handler(struct io_event *e) {
659659
* Retrieve the topic from the global map, if it wasn't created before,
660660
* create a new one with the name selected
661661
*/
662-
struct topic *t = topic_get_or_create(&server, topic);
662+
struct topic *t = topic_store_get_or_put(server.store, topic);
663663
if (!t)
664664
log_fatal("publish_handler failed: Out of memory");
665665

666666
/* Check for # wildcards subscriptions */
667-
if (list_size(server.wildcards) > 0) {
668-
list_foreach(item, server.wildcards) {
667+
if (topic_store_wildcards_empty(server.store)) {
668+
topic_store_wildcards_foreach(item, server.store) {
669669
struct subscription *s = item->data;
670670
int matched = match_subscription(topic, s->topic, s->multilevel);
671671
if (matched == SOL_OK && !is_subscribed(t, s->subscriber->session)) {

src/memory.c

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,15 @@
3333
static atomic_size_t memory = ATOMIC_VAR_INIT(0);
3434

3535
/*
36-
* Custom malloc function, allocate a defined size of bytes plus 8, the size
37-
* of an unsigned long long, and append the length choosen at the beginning of
38-
* the memory chunk as an unsigned long long, returning the memory chunk
39-
* allocated just 8 bytes after the start; this way it is possible to track
40-
* the memory usage at every allocation
36+
* Custom malloc function, allocate a defined size of bytes plus
37+
* sizeof(size_t), the size of an unsigned long long, and append the length
38+
* choosen at the beginning of the memory chunk as an unsigned long long,
39+
* returning the memory chunk allocated just sizeof(size_t) bytes after the
40+
* start; this way it is possible to track the memory usage at every
41+
* allocation.
42+
*
43+
* This function can fail if not memory is available, interrupting the
44+
* execution of the program and exiting, hence the prefix "try".
4145
*/
4246
void *try_alloc(size_t size) {
4347
void *ptr = malloc(size + sizeof(size_t));
@@ -54,6 +58,9 @@ void *try_alloc(size_t size) {
5458
/*
5559
* Same as xmalloc, but with calloc, creating chunk o zero'ed memory.
5660
* TODO: still a suboptimal solution
61+
*
62+
* This function can fail if not memory is available, interrupting the
63+
* execution of the program and exiting, hence the prefix "try".
5764
*/
5865
void *try_calloc(size_t len, size_t size) {
5966
void *ptr = try_alloc(len * size);
@@ -64,6 +71,9 @@ void *try_calloc(size_t len, size_t size) {
6471
/*
6572
* Same of xmalloc but with realloc, resize a chunk of memory pointed by a
6673
* given pointer, again appends the new size in front of the byte array
74+
*
75+
* This function can fail if not memory is available, interrupting the
76+
* execution of the program and exiting, hence the prefix "try".
6777
*/
6878
void *try_realloc(void *ptr, size_t size) {
6979
if (!ptr)
@@ -126,6 +136,9 @@ size_t alloc_size(void *ptr) {
126136
* As strdup but using xmalloc instead of malloc, to track the number of bytes
127137
* allocated and to enable use of xfree on duplicated strings without having
128138
* to care when to use a normal free or a xfree
139+
*
140+
* This function can fail if not memory is available, interrupting the
141+
* execution of the program and exiting, hence the prefix "try".
129142
*/
130143
char *try_strdup(const char *s) {
131144
size_t len = strlen(s);

0 commit comments

Comments
 (0)