@@ -18,9 +18,10 @@ const xtend = require('xtend')
1818const debug = require ( 'debug' ) ( 'mqttjs:client' )
1919const nextTick = process ? process . nextTick : function ( callback ) { setTimeout ( callback , 0 ) }
2020const setImmediate = global . setImmediate || function ( callback ) {
21- // works in node v0.8
22- nextTick ( callback )
21+ const args = arguments . slice ( 1 )
22+ process . nextTick ( callback . bind ( null , ... args ) )
2323}
24+
2425const defaultConnectOptions = {
2526 keepalive : 60 ,
2627 reschedulePings : true ,
@@ -89,11 +90,11 @@ const errors = {
8990 162 : 'Wildcard Subscriptions not supported'
9091}
9192
92- function defaultId ( ) {
93+ function defaultId ( ) {
9394 return 'mqttjs_' + Math . random ( ) . toString ( 16 ) . substr ( 2 , 8 )
9495}
9596
96- function applyTopicAlias ( client , packet ) {
97+ function applyTopicAlias ( client , packet ) {
9798 if ( client . options . protocolVersion === 5 ) {
9899 if ( packet . cmd === 'publish' ) {
99100 let alias
@@ -143,7 +144,7 @@ function applyTopicAlias (client, packet) {
143144 }
144145}
145146
146- function removeTopicAliasAndRecoverTopicName ( client , packet ) {
147+ function removeTopicAliasAndRecoverTopicName ( client , packet ) {
147148 let alias
148149 if ( packet . properties ) {
149150 alias = packet . properties . topicAlias
@@ -168,7 +169,7 @@ function removeTopicAliasAndRecoverTopicName (client, packet) {
168169 }
169170}
170171
171- function sendPacket ( client , packet , cb ) {
172+ function sendPacket ( client , packet , cb ) {
172173 debug ( 'sendPacket :: packet: %O' , packet )
173174 debug ( 'sendPacket :: emitting `packetsend`' )
174175
@@ -186,7 +187,7 @@ function sendPacket (client, packet, cb) {
186187 }
187188}
188189
189- function flush ( queue ) {
190+ function flush ( queue ) {
190191 if ( queue ) {
191192 debug ( 'flush: queue exists? %b' , ! ! ( queue ) )
192193 Object . keys ( queue ) . forEach ( function ( messageId ) {
@@ -200,7 +201,7 @@ function flush (queue) {
200201 }
201202}
202203
203- function flushVolatile ( queue ) {
204+ function flushVolatile ( queue ) {
204205 if ( queue ) {
205206 debug ( 'flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function' )
206207 Object . keys ( queue ) . forEach ( function ( messageId ) {
@@ -212,7 +213,7 @@ function flushVolatile (queue) {
212213 }
213214}
214215
215- function storeAndSend ( client , packet , cb , cbStorePut ) {
216+ function storeAndSend ( client , packet , cb , cbStorePut ) {
216217 debug ( 'storeAndSend :: store packet with cmd %s to outgoingStore' , packet . cmd )
217218 let storePacket = packet
218219 let err
@@ -226,7 +227,7 @@ function storeAndSend (client, packet, cb, cbStorePut) {
226227 return cb && cb ( err )
227228 }
228229 }
229- client . outgoingStore . put ( storePacket , function storedPacket ( err ) {
230+ client . outgoingStore . put ( storePacket , function storedPacket ( err ) {
230231 if ( err ) {
231232 return cb && cb ( err )
232233 }
@@ -235,7 +236,7 @@ function storeAndSend (client, packet, cb, cbStorePut) {
235236 } )
236237}
237238
238- function nop ( error ) {
239+ function nop ( error ) {
239240 debug ( 'nop ::' , error )
240241}
241242
@@ -246,7 +247,7 @@ function nop (error) {
246247 * @param {Object } [options] - connection options
247248 * (see Connection#connect)
248249 */
249- function MqttClient ( streamBuilder , options ) {
250+ function MqttClient ( streamBuilder , options ) {
250251 let k
251252 const that = this
252253
@@ -338,7 +339,7 @@ function MqttClient (streamBuilder, options) {
338339 this . on ( 'connect' , function ( ) {
339340 const queue = that . queue
340341
341- function deliver ( ) {
342+ function deliver ( ) {
342343 const entry = queue . shift ( )
343344 debug ( 'deliver :: entry %o' , entry )
344345 let packet = null
@@ -426,7 +427,7 @@ MqttClient.prototype._setupStream = function () {
426427 packets . push ( packet )
427428 } )
428429
429- function nextTickWork ( ) {
430+ function nextTickWork ( ) {
430431 if ( packets . length ) {
431432 nextTick ( work )
432433 } else {
@@ -436,7 +437,7 @@ MqttClient.prototype._setupStream = function () {
436437 }
437438 }
438439
439- function work ( ) {
440+ function work ( ) {
440441 debug ( 'work :: getting next packet in queue' )
441442 const packet = packets . shift ( )
442443
@@ -459,7 +460,7 @@ MqttClient.prototype._setupStream = function () {
459460 work ( )
460461 }
461462
462- function streamErrorHandler ( error ) {
463+ function streamErrorHandler ( error ) {
463464 debug ( 'streamErrorHandler :: error' , error . message )
464465 if ( socketErrors . includes ( error . code ) ) {
465466 // handle error
@@ -747,7 +748,7 @@ MqttClient.prototype.subscribe = function () {
747748 debug ( 'subscribe: array topic %s' , topic )
748749 if ( ! Object . prototype . hasOwnProperty . call ( that . _resubscribeTopics , topic ) ||
749750 that . _resubscribeTopics [ topic ] . qos < opts . qos ||
750- resubscribe ) {
751+ resubscribe ) {
751752 const currentOpts = {
752753 topic : topic ,
753754 qos : opts . qos
@@ -769,7 +770,7 @@ MqttClient.prototype.subscribe = function () {
769770 debug ( 'subscribe: object topic %s' , k )
770771 if ( ! Object . prototype . hasOwnProperty . call ( that . _resubscribeTopics , k ) ||
771772 that . _resubscribeTopics [ k ] . qos < obj [ k ] . qos ||
772- resubscribe ) {
773+ resubscribe ) {
773774 const currentOpts = {
774775 topic : k ,
775776 qos : obj [ k ] . qos
@@ -988,7 +989,7 @@ MqttClient.prototype.end = function (force, opts, cb) {
988989 debug ( 'end :: cb? %s' , ! ! cb )
989990 cb = cb || nop
990991
991- function closeStores ( ) {
992+ function closeStores ( ) {
992993 debug ( 'end :: closeStores: closing incoming and outgoing stores' )
993994 that . disconnected = true
994995 that . incomingStore . close ( function ( e1 ) {
@@ -1007,7 +1008,7 @@ MqttClient.prototype.end = function (force, opts, cb) {
10071008 }
10081009 }
10091010
1010- function finish ( ) {
1011+ function finish ( ) {
10111012 // defer closesStores of an I/O cycle,
10121013 // just to make sure things are
10131014 // ok for websockets
@@ -1263,7 +1264,7 @@ MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) {
12631264 * anyway it will result in -1 evaluation
12641265 */
12651266 case 0 :
1266- /* falls through */
1267+ /* falls through */
12671268 default :
12681269 sendPacket ( this , packet , cb )
12691270 break
@@ -1622,7 +1623,7 @@ MqttClient.prototype._handleAck = function (packet) {
16221623 debug ( '_handleAck :: packet type' , type )
16231624 switch ( type ) {
16241625 case 'pubcomp' :
1625- // same thing as puback for QoS 2
1626+ // same thing as puback for QoS 2
16261627 case 'puback' : {
16271628 const pubackRC = packet . reasonCode
16281629 // Callback - we're done
@@ -1685,7 +1686,7 @@ MqttClient.prototype._handleAck = function (packet) {
16851686 }
16861687
16871688 if ( this . disconnecting &&
1688- Object . keys ( this . outgoing ) . length === 0 ) {
1689+ Object . keys ( this . outgoing ) . length === 0 ) {
16891690 this . emit ( 'outgoingEmpty' )
16901691 }
16911692}
@@ -1754,8 +1755,8 @@ MqttClient.prototype._resubscribe = function () {
17541755 debug ( '_resubscribe' )
17551756 const _resubscribeTopicsKeys = Object . keys ( this . _resubscribeTopics )
17561757 if ( ! this . _firstConnection &&
1757- ( this . options . clean || ( this . options . protocolVersion === 5 && ! this . connackPacket . sessionPresent ) ) &&
1758- _resubscribeTopicsKeys . length > 0 ) {
1758+ ( this . options . clean || ( this . options . protocolVersion === 5 && ! this . connackPacket . sessionPresent ) ) &&
1759+ _resubscribeTopicsKeys . length > 0 ) {
17591760 if ( this . options . resubscribe ) {
17601761 if ( this . options . protocolVersion === 5 ) {
17611762 debug ( '_resubscribe: protocolVersion 5' )
@@ -1796,10 +1797,10 @@ MqttClient.prototype._onConnect = function (packet) {
17961797
17971798 this . connected = true
17981799
1799- function startStreamProcess ( ) {
1800+ function startStreamProcess ( ) {
18001801 let outStore = that . outgoingStore . createStream ( )
18011802
1802- function clearStoreProcessing ( ) {
1803+ function clearStoreProcessing ( ) {
18031804 that . _storeProcessing = false
18041805 that . _packetIdsDuringStoreProcessing = { }
18051806 }
@@ -1812,14 +1813,14 @@ MqttClient.prototype._onConnect = function (packet) {
18121813 that . emit ( 'error' , err )
18131814 } )
18141815
1815- function remove ( ) {
1816+ function remove ( ) {
18161817 outStore . destroy ( )
18171818 outStore = null
18181819 that . _flushStoreProcessingQueue ( )
18191820 clearStoreProcessing ( )
18201821 }
18211822
1822- function storeDeliver ( ) {
1823+ function storeDeliver ( ) {
18231824 // edge case, we wrapped this twice
18241825 if ( ! outStore ) {
18251826 return
0 commit comments