1- # Copyright (c) 2014 Adafruit Industries
2- # Author: Tony DiCola
1+ # Copyright (c) 2020 Adafruit Industries
2+ # Author: Tony DiCola, Brent Rubell
33
44# Permission is hereby granted, free of charge, to any person obtaining a copy
55# of this software and associated documentation files (the "Software"), to deal
@@ -43,9 +43,10 @@ class MQTTClient(object):
4343 def __init__ (self , username , key , service_host = 'io.adafruit.com' , secure = True ):
4444 """Create instance of MQTT client.
4545
46- :param username: Adafruit.IO Username for your account.
47- :param key: Adafruit IO access key (AIO Key) for your account.
48- :param secure: (optional, boolean) Switches secure/insecure connections
46+ :param username: Adafruit.IO Username for your account.
47+ :param key: Adafruit IO access key (AIO Key) for your account.
48+ :param secure: (optional, boolean) Switches secure/insecure connections
49+
4950 """
5051 self ._username = username
5152 self ._service_host = service_host
@@ -70,6 +71,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
7071 self ._client .on_connect = self ._mqtt_connect
7172 self ._client .on_disconnect = self ._mqtt_disconnect
7273 self ._client .on_message = self ._mqtt_message
74+ self ._client .on_subscribe = self ._mqtt_subscribe
7375 self ._connected = False
7476
7577
@@ -95,7 +97,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
9597 # log the RC as an error. Continue on to call any disconnect handler
9698 # so clients can potentially recover gracefully.
9799 if rc != 0 :
98- print (" Unexpected disconnection." )
100+ print (' Unexpected disconnection.' )
99101 raise MQTTError (rc )
100102 print ('Disconnected from Adafruit IO!' )
101103 # Call the on_disconnect callback if available.
@@ -105,6 +107,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
105107 def _mqtt_message (self , client , userdata , msg ):
106108 """Parse out the topic and call on_message callback
107109 assume topic looks like `username/topic/id`
110+
108111 """
109112 logger .debug ('Client on_message called.' )
110113 parsed_topic = msg .topic .split ('/' )
@@ -124,15 +127,19 @@ def _mqtt_message(self, client, userdata, msg):
124127 else :
125128 raise ValueError ('on_message not defined' )
126129 self .on_message (self , topic , payload )
127-
128- def _mqtt_subscribe (client , userdata , mid , granted_qos ):
130+
131+ def _mqtt_subscribe (self , client , userdata , mid , granted_qos ):
129132 """Called when broker responds to a subscribe request."""
133+ logger .debug ('Client called on_subscribe' )
134+ if self .on_subscribe is not None :
135+ self .on_subscribe (self , userdata , mid , granted_qos )
130136
131137 def connect (self , ** kwargs ):
132138 """Connect to the Adafruit.IO service. Must be called before any loop
133139 or publish operations are called. Will raise an exception if a
134140 connection cannot be made. Optional keyword arguments will be passed
135141 to paho-mqtt client connect function.
142+
136143 """
137144 # Skip calling connect if already connected.
138145 if self ._connected :
@@ -145,6 +152,7 @@ def connect(self, **kwargs):
145152
146153 def is_connected (self ):
147154 """Returns True if connected to Adafruit.IO and False if not connected.
155+
148156 """
149157 return self ._connected
150158
@@ -157,9 +165,9 @@ def loop_background(self, stop=None):
157165 """Starts a background thread to listen for messages from Adafruit.IO
158166 and call the appropriate callbacks when feed events occur. Will return
159167 immediately and will not block execution. Should only be called once.
160-
161- Params:
162- - stop: boolean, stops the execution of the background loop.
168+
169+ :param bool stop: Stops the execution of the background loop.
170+
163171 """
164172 if stop :
165173 self ._client .loop_stop ()
@@ -174,6 +182,7 @@ def loop_blocking(self):
174182 listen and respond to Adafruit.IO feed events. If you need to do other
175183 processing, consider using the loop_background function to run a loop
176184 in the background.
185+
177186 """
178187 self ._client .loop_forever ()
179188
@@ -185,28 +194,36 @@ def loop(self, timeout_sec=1.0):
185194 The optional timeout_sec parameter specifies at most how long to block
186195 execution waiting for messages when this function is called. The default
187196 is one second.
197+
188198 """
189199 self ._client .loop (timeout = timeout_sec )
190200
191- def subscribe (self , feed_id , feed_user = None ):
201+ def subscribe (self , feed_id , feed_user = None , qos = 0 ):
192202 """Subscribe to changes on the specified feed. When the feed is updated
193203 the on_message function will be called with the feed_id and new value.
194204
195- Params:
196- - feed_id: The id of the feed to subscribe to.
197- - feed_user (optional): The user id of the feed. Used for feed sharing functionality.
205+ :param str feed_id: The key of the feed to subscribe to.
206+ :param str feed_user: Optional, identifies feed owner. Used for feed sharing.
207+ :param int qos: The QoS to use when subscribing. Defaults to 0.
208+
198209 """
210+ if qos > 1 :
211+ raise MQTTError ("Adafruit IO only supports a QoS level of 0 or 1." )
199212 if feed_user is not None :
200- (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (feed_user , feed_id ))
213+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (feed_user , feed_id , qos = qos ))
201214 else :
202- (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
215+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ), qos = qos )
203216 return res , mid
204217
205- def subscribe_group (self , group_id ):
218+ def subscribe_group (self , group_id , qos = 0 ):
206219 """Subscribe to changes on the specified group. When the group is updated
207220 the on_message function will be called with the group_id and the new value.
221+
222+ :param str feed_id: The key of the feed to subscribe to.
223+ :param int qos: The QoS to use when subscribing. Defaults to 0.
224+
208225 """
209- self ._client .subscribe ('{0}/groups/{1}' .format (self ._username , group_id ))
226+ self ._client .subscribe ('{0}/groups/{1}' .format (self ._username , group_id ), qos = qos )
210227
211228 def subscribe_randomizer (self , randomizer_id ):
212229 """Subscribe to changes on a specified random data stream from
@@ -216,6 +233,7 @@ def subscribe_randomizer(self, randomizer_id):
216233 every client that is subscribed to the same topic.
217234
218235 :param int randomizer_id: ID of the random word record you want data for.
236+
219237 """
220238 self ._client .subscribe ('{0}/integration/words/{1}' .format (self ._username , randomizer_id ))
221239
0 commit comments