Skip to content

Commit 3cbd012

Browse files
fabik111ilcato
authored andcommitted
add mutex and fix bug
1 parent 2448e42 commit 3cbd012

File tree

5 files changed

+25
-4
lines changed

5 files changed

+25
-4
lines changed

arduino-cloud.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ module.exports = function (RED) {
3030
else
3131
this.status({});
3232
});
33+
}else{
34+
this.status({ fill: "red", shape: "ring", text: "Connection Error" });
3335
}
3436
this.on('close', function (done) {
3537
connectionManager.deleteClientMqtt(connectionConfig.credentials.clientid, this.thing, this.propertyName).then(() => { done(); });

arduino-iot-client-mqtt/arduino-iot-client-mqtt.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class ArduinoClientMqtt {
116116
new ArduinoCloudError(errorCode, errorMessage),
117117
);
118118
});
119-
119+
120120
client.on("message", (topic, msg) => {
121121
if (topic.indexOf('/s/o') > -1) {
122122
client.topics[topic].forEach((cb) => {
@@ -562,7 +562,7 @@ class ArduinoClientMqtt {
562562
topic: propOutputTopic,
563563
cb,
564564
};
565-
565+
this.numSubscriptions++;
566566
if (!this.propertyCallback[propOutputTopic]) {
567567
this.propertyCallback[propOutputTopic] = {};
568568
this.propertyCallback[propOutputTopic][name] = cb;
@@ -571,7 +571,6 @@ class ArduinoClientMqtt {
571571

572572
if (this.propertyCallback[propOutputTopic] && !this.propertyCallback[propOutputTopic][name]) {
573573
this.propertyCallback[propOutputTopic][name] = cb;
574-
this.numSubscriptions++;
575574
}
576575
return Promise.resolve(propOutputTopic);
577576
};

package-lock.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"dependencies": {
2121
"@arduino/arduino-iot-client": "^1.0.1",
2222
"@arduino/cbor-js": "github:arduino/cbor-js",
23+
"async-mutex": "^0.1.4",
2324
"async-request": "^1.2.0",
2425
"jws": "^3.2.2",
2526
"lodash": "^4.17.15",

utils/arduino-connection-manager.js

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const ArduinoClientMqtt = require ('../arduino-iot-client-mqtt/arduino-iot-clien
44
const accessTokenUri = process.env.NODE_RED_ACCESS_TOKEN_URI || 'https://login.arduino.cc/oauth/token';
55
const accessTokenAudience = process.env.NODE_RED_ACCESS_TOKEN_AUDIENCE || 'https://api2.arduino.cc/iot';
66
const arduinoCloudHost = process.env.NODE_RED_MQTT_HOST || 'wss.iot.arduino.cc';
7-
7+
const Mutex = require('async-mutex').Mutex;
88
/**
99
* {
1010
* clientId: clientId,
@@ -17,6 +17,9 @@ const arduinoCloudHost = process.env.NODE_RED_MQTT_HOST || 'wss.iot.arduino.cc';
1717
* }
1818
*/
1919
var connections=[];
20+
const getClientMutex = new Mutex();
21+
22+
2023

2124
async function getToken(connectionConfig){
2225
var options = {
@@ -48,9 +51,11 @@ async function getToken(connectionConfig){
4851
}
4952

5053
async function getClientMqtt(connectionConfig){
54+
5155
if (!connectionConfig || !connectionConfig.credentials) {
5256
throw new Error("Cannot find cooonection config or credentials.");
5357
}
58+
const releaseMutex = await getClientMutex.acquire();
5459
try{
5560
let user = findUser(connectionConfig.credentials.clientid);
5661
let clientMqtt;
@@ -102,17 +107,22 @@ async function getClientMqtt(connectionConfig){
102107
connections[user].clientMqtt=clientMqtt;
103108
}
104109
}
110+
releaseMutex();
111+
105112
return clientMqtt;
106113
}catch(err){
107114
console.log(err);
115+
releaseMutex();
108116
}
109117

110118
}
111119

112120
async function getClientHttp(connectionConfig){
121+
113122
if (!connectionConfig || !connectionConfig.credentials) {
114123
throw new Error("Cannot find cooonection config or credentials.");
115124
}
125+
const releaseMutex = await getClientMutex.acquire();
116126
try{
117127
var user = findUser(connectionConfig.credentials.clientid);
118128
var clientHttp;
@@ -143,10 +153,14 @@ async function getClientHttp(connectionConfig){
143153
connections[user].clientHttp=clientHttp;
144154
}
145155
}
156+
releaseMutex();
146157
return clientHttp;
147158
}catch(err){
148159
console.log(err);
160+
releaseMutex();
161+
149162
}
163+
150164
}
151165

152166
function findUser(clientId) {

0 commit comments

Comments
 (0)