Skip to content

Commit db76db4

Browse files
author
Aaron
committed
Fix reconnection issues
1 parent 62c0a5d commit db76db4

File tree

9 files changed

+74
-25
lines changed

9 files changed

+74
-25
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ subprojects {
44
apply plugin: 'maven'
55

66
group 'org.iot-dsa'
7-
version '0.32.0'
7+
version '0.32.8'
88

99
sourceCompatibility = 1.6
1010
targetCompatibility = 1.6
@@ -26,5 +26,5 @@ subprojects {
2626
}
2727

2828
task wrapper(type: Wrapper) {
29-
gradleVersion = '4.8.1'
29+
gradleVersion = '4.9'
3030
}

dslink-v2-websocket/src/main/java/org/iot/dsa/dslink/websocket/WsTextTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public DSTransport close() {
6060
return this;
6161
}
6262
open = false;
63-
debug(debug() ? "WsTextTransport.close()" : null, new Exception());
63+
debug(debug() ? "WsTextTransport.close()" : null);
6464
try {
6565
if (session != null) {
6666
session.close();

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/io/DSCharBuffer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ public synchronized void put(char[] msg, int off, int len) {
160160
offset = 0;
161161
}
162162
}
163-
//System.arraycopy(msg, off, buffer, length + offset, len);
164163
System.arraycopy(msg, off, buffer, offset, len);
165164
length += len;
166165
notifyAll();

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSSession.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -240,10 +240,7 @@ protected boolean hasAckToSend() {
240240
* Override point, this returns the result of hasMessagesToSend.
241241
*/
242242
protected boolean hasSomethingToSend() {
243-
if (ackToSend >= 0) {
244-
return true;
245-
}
246-
if (hasPingToSend()) {
243+
if (hasAckToSend() || hasPingToSend()) {
247244
return true;
248245
}
249246
if (waitingForAcks()) {
@@ -282,8 +279,9 @@ protected void onConnected() {
282279
connected = true;
283280
lastTimeRecv = lastTimeSend = System.currentTimeMillis();
284281
readThread = new ReadThread(getConnection().getLink().getLinkName() + " Reader");
285-
writeThread = new WriteThread(getConnection().getLink().getLinkName() + " Writer");
286282
readThread.start();
283+
Thread.yield();
284+
writeThread = new WriteThread(getConnection().getLink().getLinkName() + " Writer");
287285
writeThread.start();
288286
}
289287

@@ -296,11 +294,6 @@ protected void onDisconnected() {
296294
outgoingResponses.clear();
297295
outgoingMutex.notifyAll();
298296
}
299-
try {
300-
writeThread.join();
301-
} catch (Exception x) {
302-
debug(getPath(), x);
303-
}
304297
try {
305298
readThread.join();
306299
} catch (Exception x) {
@@ -319,6 +312,13 @@ protected void onDisconnecting() {
319312
}
320313
connected = false;
321314
notifyOutgoing();
315+
try {
316+
writeThread.join();
317+
} catch (Exception x) {
318+
debug(getPath(), x);
319+
}
320+
//Attempt to exit cleanly, try to get acks for sent messages.
321+
waitForAcks(1000);
322322
}
323323

324324
protected void requeueOutgoingRequest(OutboundMessage arg) {
@@ -385,6 +385,25 @@ private void verifyLastSend() throws IOException {
385385
}
386386
}
387387

388+
/* Try to exit cleanly, wait for all acks for sent messages. */
389+
private void waitForAcks(long timeout) {
390+
long start = System.currentTimeMillis();
391+
synchronized (outgoingMutex) {
392+
while (getMissingAcks() > 0) {
393+
try {
394+
outgoingMutex.wait(500);
395+
} catch (InterruptedException x) {
396+
warn(getPath(), x);
397+
}
398+
if ((System.currentTimeMillis() - start) > timeout) {
399+
debug(debug() ? String.format("witForAcks timeout (%s / %s)",ackRcvd,messageId)
400+
: null);
401+
break;
402+
}
403+
}
404+
}
405+
}
406+
388407
///////////////////////////////////////////////////////////////////////////
389408
// Inner Classes
390409
///////////////////////////////////////////////////////////////////////////
@@ -400,6 +419,7 @@ private class ReadThread extends Thread {
400419
}
401420

402421
public void run() {
422+
debug("Enter DSSession.ReadThread");
403423
DSLinkConnection conn = getConnection();
404424
try {
405425
while (connected) {
@@ -415,6 +435,7 @@ public void run() {
415435
conn.connDown(DSException.makeMessage(x));
416436
}
417437
}
438+
debug("Exit DSSession.ReadThread");
418439
}
419440
}
420441

@@ -430,6 +451,7 @@ private class WriteThread extends Thread {
430451

431452
public void run() {
432453
DSLinkConnection conn = getConnection();
454+
debug("Enter DSSession.WriteThread");
433455
try {
434456
while (connected) {
435457
verifyLastRead();
@@ -454,6 +476,7 @@ public void run() {
454476
conn.connDown(DSException.makeMessage(x));
455477
}
456478
}
479+
debug("Exit DSSession.WriteThread");
457480
}
458481
}
459482

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public void handleUpdate(int sid, String ts, String sts, DSElement value) {
9595
@Override
9696
public void write(DSSession session, MessageWriter writer) {
9797
if (!pendingSubscribe.isEmpty()) {
98+
debug(debug() ? "Sending subscribe requests" : null);
9899
doBeginSubscribe(writer);
99100
Iterator<DSOutboundSubscribeStubs> it = pendingSubscribe.iterator();
100101
while (it.hasNext() && !session.shouldEndMessage()) {
@@ -114,6 +115,7 @@ public void write(DSSession session, MessageWriter writer) {
114115
doEndMessage(writer);
115116
}
116117
if (!pendingUnsubscribe.isEmpty() && !session.shouldEndMessage()) {
118+
debug(debug() ? "Sending unsubscribe requests" : null);
117119
doBeginUnsubscribe(writer);
118120
Iterator<DSOutboundSubscribeStubs> it = pendingUnsubscribe.iterator();
119121
while (it.hasNext() && !session.shouldEndMessage()) {
@@ -129,9 +131,7 @@ public void write(DSSession session, MessageWriter writer) {
129131
}
130132
doEndMessage(writer);
131133
}
132-
synchronized (this) {
133-
enqueued = false;
134-
}
134+
enqueued = false;
135135
if (!pendingSubscribe.isEmpty() || !pendingUnsubscribe.isEmpty()) {
136136
sendMessage();
137137
}
@@ -208,6 +208,7 @@ protected void onDisconnected() {
208208
}
209209
sidMap.clear();
210210
pathMap.clear();
211+
enqueued = false;
211212
}
212213

213214
///////////////////////////////////////////////////////////////////////////
@@ -239,6 +240,7 @@ private void sendMessage() {
239240
* Create or update a subscription.
240241
*/
241242
OutboundSubscribeHandler subscribe(String path, int qos, OutboundSubscribeHandler req) {
243+
trace(trace() ? String.format("Subscribe (qos=%s) %s", qos, path) : null);
242244
DSOutboundSubscribeStub stub = new DSOutboundSubscribeStub(path, qos, req);
243245
DSOutboundSubscribeStubs stubs = null;
244246
synchronized (pathMap) {

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/DS1Session.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,19 +112,19 @@ protected void doRecvMessage() throws IOException {
112112
@Override
113113
protected void doSendMessage() {
114114
try {
115+
beginMessage();
115116
if (!waitingForAcks()) {
116117
requestsNext = !requestsNext;
117-
beginMessage();
118118
send(requestsNext);
119119
if (!shouldEndMessage()) {
120120
send(!requestsNext);
121121
}
122-
endMessage();
123122
lastMessageSent = System.currentTimeMillis();
124123
if (requestsBegun || responsesBegun) {
125124
setAckRequired();
126125
}
127126
}
127+
endMessage();
128128
} finally {
129129
requestsBegun = false;
130130
responsesBegun = false;
@@ -250,11 +250,9 @@ private DSIWriter getWriter() {
250250
* Decomposes and processes a complete envelope which can contain multiple requests and
251251
* responses.
252252
*
253-
* @param reader lastRun() will return BEGIN_MAP
253+
* @param reader last() must return BEGIN_MAP
254254
*/
255255
private void processEnvelope(DSIReader reader) {
256-
int msg = -1;
257-
Token next;
258256
switch (reader.next()) {
259257
case END_MAP:
260258
return;
@@ -263,6 +261,8 @@ private void processEnvelope(DSIReader reader) {
263261
default:
264262
throw new IllegalStateException("Poorly formatted request");
265263
}
264+
int msg = -1;
265+
Token next;
266266
boolean sendAck = false;
267267
do {
268268
String key = reader.getString();

dslink-v2/src/main/java/org/iot/dsa/node/DSInfo.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,13 @@ public DSIObject getDefaultObject() {
176176
}
177177

178178
/**
179-
* A convenience that casts getObject().
179+
* A convenience that casts the object. Will call DSIValue.toElement on values.
180180
*/
181181
public DSElement getElement() {
182-
return (DSElement) value;
182+
if (value instanceof DSElement) {
183+
return (DSElement) value;
184+
}
185+
return ((DSIValue)value).toElement();
183186
}
184187

185188
boolean getFlag(int position) {
@@ -375,6 +378,20 @@ public DSInfo next() {
375378
return next;
376379
}
377380

381+
/**
382+
* The next DSInfo in the parent whose is of the given type.
383+
*/
384+
public DSInfo next(Class is) {
385+
DSInfo cur = next;
386+
while (cur != null) {
387+
if (cur.is(is)) {
388+
return cur;
389+
}
390+
cur = cur.next();
391+
}
392+
return cur;
393+
}
394+
378395
/**
379396
* The next DSInfo in the parent that is an action, or null.
380397
*/

dslink-v2/src/main/java/org/iot/dsa/node/DSNode.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,14 @@ public DSInfo getFirstInfo() {
360360
return firstChild;
361361
}
362362

363+
public DSInfo getFirstInfo(Class type) {
364+
DSInfo info = getFirstInfo();
365+
if (info.is(type)) {
366+
return info;
367+
}
368+
return info.next(type);
369+
}
370+
363371
/**
364372
* The info for the first child node, or null.
365373
*/
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.8.1-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

0 commit comments

Comments
 (0)