Skip to content

Commit f097f21

Browse files
committed
Fix reconnection logic
1 parent 2bc5520 commit f097f21

File tree

1 file changed

+114
-113
lines changed

1 file changed

+114
-113
lines changed

utils/arduino-connection-manager.js

Lines changed: 114 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const request = require("async-request");
22
const ArduinoClientHttp = require('./arduino-cloud-api-wrapper');
3-
const ArduinoClientMqtt = require ('../arduino-iot-client-mqtt/arduino-iot-client-mqtt');
3+
const ArduinoClientMqtt = require('../arduino-iot-client-mqtt/arduino-iot-client-mqtt');
44
const accessTokenUri = process.env.NODE_RED_ACCESS_TOKEN_URI || 'https://login.arduino.cc/oauth/token';
55
const accessTokenAudience = process.env.NODE_RED_ACCESS_TOKEN_AUDIENCE || 'https://api2.arduino.cc/iot';
66
const arduinoCloudHost = process.env.NODE_RED_MQTT_HOST || 'wss.iot.arduino.cc';
@@ -16,12 +16,12 @@ const Mutex = require('async-mutex').Mutex;
1616
* timeoutUpdateToken: timeout
1717
* }
1818
*/
19-
var connections=[];
19+
var connections = [];
2020
const getClientMutex = new Mutex();
2121

2222

2323

24-
async function getToken(connectionConfig){
24+
async function getToken(connectionConfig) {
2525
var options = {
2626
method: 'POST',
2727
headers: { 'content-type': 'application/x-www-form-urlencoded' },
@@ -43,120 +43,120 @@ async function getToken(connectionConfig){
4343
var token = data.access_token;
4444
expires_in = data.expires_in;
4545
if (token !== undefined) {
46-
return {token: token, expires_in: expires_in };
46+
return { token: token, expires_in: expires_in };
4747
}
4848
} catch (err) {
4949
console.log(err);
5050
}
5151
}
5252

53-
async function getClientMqtt(connectionConfig){
53+
async function getClientMqtt(connectionConfig) {
5454

5555
if (!connectionConfig || !connectionConfig.credentials) {
5656
throw new Error("Cannot find cooonection config or credentials.");
5757
}
5858
const releaseMutex = await getClientMutex.acquire();
59-
try{
60-
let user = findUser(connectionConfig.credentials.clientid);
61-
let clientMqtt;
62-
if(user === -1){
59+
try {
60+
let user = findUser(connectionConfig.credentials.clientid);
61+
let clientMqtt;
62+
if (user === -1) {
6363

64-
clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt();
65-
const tokenInfo = await getToken(connectionConfig);
66-
if(tokenInfo !==undefined){
67-
const ArduinoCloudOptions = {
68-
host: arduinoCloudHost,
69-
token: tokenInfo.token,
70-
onDisconnect: () => {
71-
reconnectMqtt(connectionConfig.credentials.clientid);
72-
console.log(`connection lost for ${connectionConfig.credentials.clientid}`);
73-
},
74-
useCloudProtocolV2: true
75-
};
76-
await clientMqtt.connect(ArduinoCloudOptions);
77-
const timeout = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000);
78-
connections.push({
79-
clientId: connectionConfig.credentials.clientid,
80-
connectionConfig: connectionConfig,
81-
token: tokenInfo.token,
82-
expires_token_ts: tokenInfo.expires_in,
83-
clientMqtt: clientMqtt,
84-
clientHttp: null,
85-
timeoutUpdateToken: timeout
86-
});
87-
} else {
88-
// TODO: what happens when token is undefined?
89-
clientMqtt = undefined;
90-
}
91-
} else{
92-
if(connections[user].clientMqtt !== null){
93-
clientMqtt = connections[user].clientMqtt;
94-
}else{
9564
clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt();
96-
const ArduinoCloudOptions = {
97-
host: "wss.iot.oniudra.cc",
98-
token: connections[user].token,
99-
onDisconnect: () => {
100-
reconnectMqtt(connectionConfig.credentials.clientid);
101-
console.log(`connection lost for ${connectionConfig.credentials.clientid}`);
102-
},
103-
useCloudProtocolV2: true
104-
};
105-
await clientMqtt.connect(ArduinoCloudOptions);
106-
connections[user].clientMqtt=clientMqtt;
65+
const tokenInfo = await getToken(connectionConfig);
66+
if (tokenInfo !== undefined) {
67+
const ArduinoCloudOptions = {
68+
host: arduinoCloudHost,
69+
token: tokenInfo.token,
70+
onDisconnect: () => {
71+
reconnectMqtt(connectionConfig.credentials.clientid);
72+
console.log(`connection lost for ${connectionConfig.credentials.clientid}`);
73+
},
74+
useCloudProtocolV2: true
75+
};
76+
await clientMqtt.connect(ArduinoCloudOptions);
77+
const timeout = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000);
78+
connections.push({
79+
clientId: connectionConfig.credentials.clientid,
80+
connectionConfig: connectionConfig,
81+
token: tokenInfo.token,
82+
expires_token_ts: tokenInfo.expires_in,
83+
clientMqtt: clientMqtt,
84+
clientHttp: null,
85+
timeoutUpdateToken: timeout
86+
});
87+
} else {
88+
// TODO: what happens when token is undefined?
89+
clientMqtt = undefined;
90+
}
91+
} else {
92+
if (connections[user].clientMqtt !== null) {
93+
clientMqtt = connections[user].clientMqtt;
94+
} else {
95+
clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt();
96+
const ArduinoCloudOptions = {
97+
host: "wss.iot.oniudra.cc",
98+
token: connections[user].token,
99+
onDisconnect: () => {
100+
reconnectMqtt(connectionConfig.credentials.clientid);
101+
console.log(`connection lost for ${connectionConfig.credentials.clientid}`);
102+
},
103+
useCloudProtocolV2: true
104+
};
105+
await clientMqtt.connect(ArduinoCloudOptions);
106+
connections[user].clientMqtt = clientMqtt;
107+
}
107108
}
108-
}
109-
releaseMutex();
109+
releaseMutex();
110110

111-
return clientMqtt;
112-
}catch(err){
111+
return clientMqtt;
112+
} catch (err) {
113113
console.log(err);
114114
releaseMutex();
115115
}
116116

117117
}
118118

119-
async function getClientHttp(connectionConfig){
119+
async function getClientHttp(connectionConfig) {
120120

121121
if (!connectionConfig || !connectionConfig.credentials) {
122122
throw new Error("Cannot find cooonection config or credentials.");
123123
}
124124
const releaseMutex = await getClientMutex.acquire();
125-
try{
126-
var user = findUser(connectionConfig.credentials.clientid);
127-
var clientHttp;
128-
if(user === -1){
125+
try {
126+
var user = findUser(connectionConfig.credentials.clientid);
127+
var clientHttp;
128+
if (user === -1) {
129129

130-
var tokenInfo = await getToken(connectionConfig);
131-
if(tokenInfo !==undefined){
132-
clientHttp= new ArduinoClientHttp.ArduinoClientHttp(tokenInfo.token);
130+
var tokenInfo = await getToken(connectionConfig);
131+
if (tokenInfo !== undefined) {
132+
clientHttp = new ArduinoClientHttp.ArduinoClientHttp(tokenInfo.token);
133133

134-
var timeout = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000);
135-
connections.push({
136-
clientId: connectionConfig.credentials.clientid,
137-
connectionConfig: connectionConfig,
138-
token: tokenInfo.token,
139-
expires_token_ts: tokenInfo.expires_in,
140-
clientMqtt: null,
141-
clientHttp: clientHttp,
142-
timeoutUpdateToken: timeout
143-
});
134+
var timeout = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000);
135+
connections.push({
136+
clientId: connectionConfig.credentials.clientid,
137+
connectionConfig: connectionConfig,
138+
token: tokenInfo.token,
139+
expires_token_ts: tokenInfo.expires_in,
140+
clientMqtt: null,
141+
clientHttp: clientHttp,
142+
timeoutUpdateToken: timeout
143+
});
144144

145-
}
145+
}
146146

147-
} else{
148-
if(connections[user].clientHttp !== null){
149-
clientHttp = connections[user].clientHttp;
150-
}else{
151-
clientHttp = new ArduinoClientHttp.ArduinoClientHttp(connections[user].token);
147+
} else {
148+
if (connections[user].clientHttp !== null) {
149+
clientHttp = connections[user].clientHttp;
150+
} else {
151+
clientHttp = new ArduinoClientHttp.ArduinoClientHttp(connections[user].token);
152152

153-
connections[user].clientHttp=clientHttp;
153+
connections[user].clientHttp = clientHttp;
154+
}
154155
}
155-
}
156156

157-
releaseMutex();
158-
return clientHttp;
159-
}catch(err){
157+
releaseMutex();
158+
return clientHttp;
159+
} catch (err) {
160160
console.log(err);
161161
releaseMutex();
162162

@@ -174,76 +174,77 @@ function findUser(clientId) {
174174

175175
}
176176

177-
async function updateToken(connectionConfig){
178-
try{
177+
async function updateToken(connectionConfig) {
178+
try {
179179
var user = findUser(connectionConfig.credentials.clientid);
180-
if(user !== -1){
181-
var tokenInfo= await getToken(connectionConfig);
182-
if(tokenInfo !==undefined){
183-
connections[user].token= tokenInfo.token;
184-
connections[user].expires_token_ts=tokenInfo.expires_in;
180+
if (user !== -1) {
181+
var tokenInfo = await getToken(connectionConfig);
182+
if (tokenInfo !== undefined) {
183+
connections[user].token = tokenInfo.token;
184+
connections[user].expires_token_ts = tokenInfo.expires_in;
185185
connections[user].clientMqtt.updateToken(tokenInfo.token);
186186
connections[user].clientHttp.updateToken(tokenInfo.token);
187187
connections[user].timeoutUpdateToken = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000);
188-
}else{
188+
} else {
189189
connections[user].timeoutUpdateToken = setTimeout(() => { updateToken(connectionConfig) }, 1000);
190190
}
191191
}
192-
}catch(err){
192+
} catch (err) {
193193
console.log(err);
194194
}
195195
}
196196

197-
async function deleteClientMqtt(clientId, thing, propertyName ){
197+
async function deleteClientMqtt(clientId, thing, propertyName) {
198198
const releaseMutex = await getClientMutex.acquire();
199199
var user = findUser(clientId);
200-
if(user !== -1){
201-
if(connections[user].clientMqtt !== null){
200+
if (user !== -1) {
201+
if (connections[user].clientMqtt !== null) {
202202
var ret = await connections[user].clientMqtt.removePropertyValueCallback(thing, propertyName);
203203

204-
if(ret === 0){
204+
if (ret === 0) {
205205
await connections[user].clientMqtt.disconnect();
206206
delete connections[user].clientMqtt;
207207
connections[user].clientMqtt = null;
208-
if(connections[user].clientHttp === null){
209-
if(connections[user].timeoutUpdateToken)
208+
if (connections[user].clientHttp === null) {
209+
if (connections[user].timeoutUpdateToken)
210210
clearTimeout(connections[user].timeoutUpdateToken);
211-
connections.splice(user,1);
211+
connections.splice(user, 1);
212212
}
213213
}
214214
}
215215
}
216216
releaseMutex();
217217
}
218218

219-
async function deleteClientHttp(clientId){
219+
async function deleteClientHttp(clientId) {
220220
const releaseMutex = await getClientMutex.acquire();
221221
var user = findUser(clientId);
222-
if(user !== -1){
223-
if(connections[user].clientHttp !== null){
222+
if (user !== -1) {
223+
if (connections[user].clientHttp !== null) {
224224
connections[user].clientHttp.openConnections--;
225-
if(connections[user].clientHttp.openConnections === 0){
226-
connections[user].clientHttp= null;
225+
if (connections[user].clientHttp.openConnections === 0) {
226+
connections[user].clientHttp = null;
227227
}
228228
}
229-
if(connections[user].clientMqtt === null){
230-
if(connections[user].timeoutUpdateToken)
229+
if (connections[user].clientMqtt === null) {
230+
if (connections[user].timeoutUpdateToken)
231231
clearTimeout(connections[user].timeoutUpdateToken);
232-
connections.splice(user,1);
232+
connections.splice(user, 1);
233233
}
234234
}
235235
releaseMutex();
236236
}
237237

238-
async function reconnectMqtt(clientId){
238+
async function reconnectMqtt(clientId) {
239239
var user = findUser(clientId);
240-
if(user !== -1){
240+
if (user !== -1) {
241+
await connections[user].clientMqtt.disconnect();
241242
const ArduinoCloudOptions = {
242243
host: "wss.iot.oniudra.cc",
243244
token: connections[user].token,
244-
reconnectMqtt: () => {
245-
disconnected(clientId);
246-
console.log(`connection lost for ${clientId}`);
245+
onDisconnect: () => {
246+
reconnectMqtt(connectionConfig.credentials.clientid);
247+
console.log(`connection lost for ${connectionConfig.credentials.clientid}`);
247248
},
248249
useCloudProtocolV2: true
249250
};

0 commit comments

Comments
 (0)