3232#include "mqtt.h"
3333#include "server.h"
3434#include "sol_internal.h"
35+ #include "util.h"
3536#include <stdio.h>
3637
3738/* Prototype for a command handler */
@@ -148,10 +149,7 @@ int publish_message(struct mqtt_packet *pkt, const struct topic *t)
148149 size_t len = 0 ;
149150 unsigned short mid = 0 ;
150151 unsigned char qos = pkt -> header .bits .qos ;
151- #if THREADSNR > 0
152- pthread_mutex_lock (& mutex );
153- #endif
154- int count = HASH_COUNT (t -> subscribers );
152+ int count = HASH_COUNT (t -> subscribers );
155153
156154 if (count == 0 ) {
157155 INCREF (pkt , struct mqtt_packet );
@@ -205,9 +203,6 @@ int publish_message(struct mqtt_packet *pkt, const struct topic *t)
205203 }
206204 continue ;
207205 }
208- #if THREADSNR > 0
209- pthread_mutex_lock (& sc -> mutex );
210- #endif
211206 /*
212207 * The subscriber client is marked as online, so we proceed to
213208 * set the inflight messages according to the QoS level required
@@ -216,19 +211,10 @@ int publish_message(struct mqtt_packet *pkt, const struct topic *t)
216211 inflight_msg_init (& sc -> session -> i_msgs [mid ], pkt );
217212 sc -> session -> i_acks [mid ] = time (NULL );
218213 ++ sc -> session -> inflights ;
219- #if THREADSNR > 0
220- pthread_mutex_unlock (& sc -> mutex );
221- #endif
222214 all_at_most_once = false;
223215 }
224- #if THREADSNR > 0
225- pthread_mutex_lock (& sc -> mutex );
226- #endif
227216 mqtt_pack (pkt , sc -> wbuf + sc -> towrite );
228217 sc -> towrite += len ;
229- #if THREADSNR > 0
230- pthread_mutex_unlock (& sc -> mutex );
231- #endif
232218
233219 // Schedule a write for the current subscriber on the next event cycle
234220 enqueue_event_write (sc );
@@ -248,9 +234,6 @@ int publish_message(struct mqtt_packet *pkt, const struct topic *t)
248234
249235exit :
250236
251- #if THREADSNR > 0
252- pthread_mutex_unlock (& mutex );
253- #endif
254237 return count ;
255238}
256239
@@ -356,7 +339,7 @@ static int connect_handler(struct io_event *e)
356339 */
357340 log_info ("Received double CONNECT from %s, disconnecting client" ,
358341 c -> payload .client_id );
359- goto clientdc ;
342+ goto e_client_dc ;
360343 }
361344
362345 /*
@@ -365,12 +348,12 @@ static int connect_handler(struct io_event *e)
365348 */
366349 if (conf -> allow_anonymous == false) {
367350 if (c -> bits .username == 0 || c -> bits .password == 0 )
368- goto bad_auth ;
351+ goto e_bad_auth ;
369352 else {
370353 struct authentication * auth = NULL ;
371354 HASH_FIND_STR (server .auths , (char * )c -> payload .username , auth );
372355 if (!auth || !check_passwd ((char * )c -> payload .password , auth -> salt ))
373- goto bad_auth ;
356+ goto e_bad_auth ;
374357 }
375358 }
376359
@@ -379,7 +362,7 @@ static int connect_handler(struct io_event *e)
379362 * know who you are
380363 */
381364 if (!c -> payload .client_id [0 ] && c -> bits .clean_session == false)
382- goto not_authorized ;
365+ goto e_not_authorized ;
383366
384367 /*
385368 * Check for client ID, if not present generate a random ID, otherwise add
@@ -393,9 +376,6 @@ static int connect_handler(struct io_event *e)
393376 */
394377 snprintf (cc -> client_id , MQTT_CLIENT_ID_LEN , "%s" , c -> payload .client_id );
395378
396- #if THREADSNR > 0
397- pthread_mutex_lock (& mutex );
398- #endif
399379 // First we check if a session is present
400380 HASH_FIND_STR (server .sessions , cc -> client_id , cc -> session );
401381 if (cc -> session && c -> bits .clean_session == true)
@@ -423,9 +403,6 @@ static int connect_handler(struct io_event *e)
423403
424404 // Let's track client on the global map to be used on publish
425405 HASH_ADD_STR (server .clients_map , client_id , cc );
426- #if THREADSNR > 0
427- pthread_mutex_unlock (& mutex );
428- #endif
429406
430407 // Add LWT topic and message if present
431408 if (c -> bits .will ) {
@@ -475,18 +452,18 @@ static int connect_handler(struct io_event *e)
475452
476453 return REPLY ;
477454
478- clientdc :
455+ e_client_dc :
479456
480457 return - ERRCLIENTDC ;
481458
482- bad_auth :
459+ e_bad_auth :
483460 log_debug ("Sending CONNACK to %s (%u, %u)" , cc -> client_id , session_present ,
484461 MQTT_BAD_USERNAME_OR_PASSWORD );
485462 set_connack (cc , MQTT_BAD_USERNAME_OR_PASSWORD , session_present );
486463
487464 return MQTT_BAD_USERNAME_OR_PASSWORD ;
488465
489- not_authorized :
466+ e_not_authorized :
490467 log_debug ("Sending CONNACK to %s (%u, %u)" , cc -> client_id , session_present ,
491468 MQTT_NOT_AUTHORIZED );
492469 set_connack (cc , MQTT_NOT_AUTHORIZED , session_present );
@@ -576,10 +553,6 @@ static int subscribe_handler(struct io_event *e)
576553 * multilevel wildcard '#'
577554 * 2. A topic contaning one or more single level wildcard '+'
578555 */
579- #if THREADSNR > 0
580- pthread_mutex_lock (& c -> mutex );
581- pthread_mutex_lock (& mutex );
582- #endif
583556 if (!index (topic , '+' )) {
584557 struct subscriber * tmp ;
585558 HASH_FIND_STR (t -> subscribers , c -> client_id , tmp );
@@ -606,9 +579,6 @@ static int subscribe_handler(struct io_event *e)
606579 subscriber_new (e -> client -> session , s -> tuples [i ].qos );
607580 add_wildcard (topic , sub , wildcard );
608581 }
609- #if THREADSNR > 0
610- pthread_mutex_unlock (& mutex );
611- #endif
612582
613583 // Retained message? Publish it
614584 // TODO move after SUBACK response
@@ -617,24 +587,15 @@ static int subscribe_handler(struct io_event *e)
617587 memcpy (c -> wbuf + c -> towrite , t -> retained_msg , len );
618588 c -> towrite += len ;
619589 }
620- #if THREADSNR > 0
621- pthread_mutex_unlock (& c -> mutex );
622- #endif
623590 rcs [i ] = s -> tuples [i ].qos ;
624591 }
625592
626593 struct mqtt_packet pkt = {.header = (union mqtt_header ){.byte = SUBACK_B }};
627594 mqtt_suback (& pkt , s -> pkt_id , rcs , s -> tuples_len );
628595
629- #if THREADSNR > 0
630- pthread_mutex_lock (& c -> mutex );
631- #endif
632596 size_t len = mqtt_size (& pkt , NULL );
633597 mqtt_pack (& pkt , c -> wbuf + c -> towrite );
634598 c -> towrite += len ;
635- #if THREADSNR > 0
636- pthread_mutex_unlock (& c -> mutex );
637- #endif
638599
639600 log_debug ("Sending SUBACK to %s" , c -> client_id );
640601
@@ -650,26 +611,15 @@ static int unsubscribe_handler(struct io_event *e)
650611
651612 log_debug ("Received UNSUBSCRIBE from %s" , c -> client_id );
652613
653- #if THREADSNR > 0
654- pthread_mutex_lock (& c -> mutex );
655- pthread_mutex_lock (& mutex );
656- #endif
657614 struct topic * t = NULL ;
658615 for (int i = 0 ; i < e -> data .unsubscribe .tuples_len ; ++ i ) {
659616 t = topic_store_get (server .store ,
660617 (const char * )e -> data .unsubscribe .tuples [i ].topic );
661618 if (t )
662619 topic_del_subscriber (t , c );
663620 }
664- #if THREADSNR > 0
665- pthread_mutex_unlock (& mutex );
666- #endif
667-
668621 mqtt_pack_mono (c -> wbuf + c -> towrite , UNSUBACK , e -> data .unsubscribe .pkt_id );
669622 c -> towrite += MQTT_ACK_LEN ;
670- #if THREADSNR > 0
671- pthread_mutex_unlock (& c -> mutex );
672- #endif
673623
674624 log_debug ("Sending UNSUBACK to %s" , c -> client_id );
675625
@@ -705,10 +655,6 @@ static int publish_handler(struct io_event *e)
705655 else
706656 snprintf (topic , p -> topiclen + 1 , "%s" , (const char * )p -> topic );
707657
708- #if THREADSNR > 0
709- pthread_mutex_lock (& c -> mutex );
710- pthread_mutex_lock (& mutex );
711- #endif
712658 /*
713659 * Retrieve the topic from the global map, if it wasn't created before,
714660 * create a new one with the name selected
@@ -735,10 +681,6 @@ static int publish_handler(struct io_event *e)
735681 }
736682 }
737683 }
738- #if THREADSNR > 0
739- pthread_mutex_unlock (& mutex );
740- #endif
741-
742684 struct mqtt_packet * pkt = mqtt_packet_alloc (e -> data .header .byte );
743685 // TODO must perform a deep copy here
744686 pkt -> publish = e -> data .publish ;
@@ -747,9 +689,6 @@ static int publish_handler(struct io_event *e)
747689 t -> retained_msg = try_alloc (mqtt_size (& e -> data , NULL ));
748690 mqtt_pack (& e -> data , t -> retained_msg );
749691 }
750- #if THREADSNR > 0
751- pthread_mutex_unlock (& c -> mutex );
752- #endif
753692
754693 if (publish_message (pkt , t ) == 0 )
755694 DECREF (pkt , struct mqtt_packet );
@@ -760,15 +699,9 @@ static int publish_handler(struct io_event *e)
760699
761700 int ptype = qos == EXACTLY_ONCE ? PUBREC : PUBACK ;
762701
763- #if THREADSNR > 0
764- pthread_mutex_lock (& c -> mutex );
765- #endif
766702 mqtt_ack (& e -> data , ptype == PUBACK ? PUBACK_B : PUBREC_B );
767703 mqtt_pack_mono (c -> wbuf + c -> towrite , ptype , orig_mid );
768704 c -> towrite += MQTT_ACK_LEN ;
769- #if THREADSNR > 0
770- pthread_mutex_unlock (& c -> mutex );
771- #endif
772705 log_debug ("Sending %s to %s (m%u)" , ptype == PUBACK ? "PUBACK" : "PUBREC" ,
773706 c -> client_id , orig_mid );
774707 return REPLY ;
@@ -787,16 +720,10 @@ static int puback_handler(struct io_event *e)
787720 struct client * c = e -> client ;
788721 unsigned pkt_id = e -> data .ack .pkt_id ;
789722 log_debug ("Received PUBACK from %s (m%u)" , c -> client_id , pkt_id );
790- #if THREADSNR > 0
791- pthread_mutex_lock (& c -> mutex );
792- #endif
793723 inflight_msg_clear (& c -> session -> i_msgs [pkt_id ]);
794724 c -> session -> i_msgs [pkt_id ].packet = NULL ;
795725 c -> session -> i_acks [pkt_id ] = -1 ;
796726 -- c -> session -> inflights ;
797- #if THREADSNR > 0
798- pthread_mutex_unlock (& c -> mutex );
799- #endif
800727 return NOREPLY ;
801728}
802729
@@ -805,14 +732,8 @@ static int pubrec_handler(struct io_event *e)
805732 struct client * c = e -> client ;
806733 unsigned pkt_id = e -> data .ack .pkt_id ;
807734 log_debug ("Received PUBREC from %s (m%u)" , c -> client_id , pkt_id );
808- #if THREADSNR > 0
809- pthread_mutex_lock (& c -> mutex );
810- #endif
811735 mqtt_pack_mono (c -> wbuf + c -> towrite , PUBREL , pkt_id );
812736 c -> towrite += MQTT_ACK_LEN ;
813- #if THREADSNR > 0
814- pthread_mutex_unlock (& c -> mutex );
815- #endif
816737 // Update inflight acks table
817738 c -> session -> i_acks [pkt_id ] = time (NULL );
818739 log_debug ("Sending PUBREL to %s (m%u)" , c -> client_id , pkt_id );
@@ -824,14 +745,8 @@ static int pubrel_handler(struct io_event *e)
824745 struct client * c = e -> client ;
825746 unsigned pkt_id = e -> data .ack .pkt_id ;
826747 log_debug ("Received PUBREL from %s (m%u)" , c -> client_id , pkt_id );
827- #if THREADSNR > 0
828- pthread_mutex_lock (& c -> mutex );
829- #endif
830748 mqtt_pack_mono (c -> wbuf + c -> towrite , PUBCOMP , pkt_id );
831749 c -> towrite += MQTT_ACK_LEN ;
832- #if THREADSNR > 0
833- pthread_mutex_unlock (& c -> mutex );
834- #endif
835750 log_debug ("Sending PUBCOMP to %s (m%u)" , c -> client_id , pkt_id );
836751 return REPLY ;
837752}
@@ -841,31 +756,19 @@ static int pubcomp_handler(struct io_event *e)
841756 struct client * c = e -> client ;
842757 unsigned pkt_id = e -> data .ack .pkt_id ;
843758 log_debug ("Received PUBCOMP from %s (m%u)" , c -> client_id , pkt_id );
844- #if THREADSNR > 0
845- pthread_mutex_lock (& c -> mutex );
846- #endif
847759 c -> session -> i_acks [pkt_id ] = -1 ;
848760 inflight_msg_clear (& c -> session -> i_msgs [pkt_id ]);
849761 c -> session -> i_msgs [pkt_id ].packet = NULL ;
850762 -- c -> session -> inflights ;
851- #if THREADSNR > 0
852- pthread_mutex_unlock (& c -> mutex );
853- #endif
854763 return NOREPLY ;
855764}
856765
857766static int pingreq_handler (struct io_event * e )
858767{
859768 log_debug ("Received PINGREQ from %s" , e -> client -> client_id );
860769 e -> data .header .byte = PINGRESP_B ;
861- #if THREADSNR > 0
862- pthread_mutex_lock (& e -> client -> mutex );
863- #endif
864770 mqtt_pack (& e -> data , e -> client -> wbuf + e -> client -> towrite );
865771 e -> client -> towrite += MQTT_HEADER_LEN ;
866- #if THREADSNR > 0
867- pthread_mutex_unlock (& e -> client -> mutex );
868- #endif
869772 log_debug ("Sending PINGRESP to %s" , e -> client -> client_id );
870773 return REPLY ;
871774}
0 commit comments