Skip to content

Commit 6840fce

Browse files
authored
0.50.0 (#98)
Fixes and changes for broker implementation. Log names correspond to paths.
1 parent 16817cd commit 6840fce

33 files changed

+254
-261
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ the DSA architecture, please visit
2323

2424
Please read the [developer guide](https://github.com/iot-dsa-v2/sdk-dslink-java-v2/wiki/DSLink-Development-Guide).
2525

26+
## Version Control
27+
28+
TODO
29+
30+
## Dependency Management
31+
32+
TODO
33+
2634
## Acknowledgements
2735

2836
_Project Tyrus_

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ subprojects {
55
apply plugin: 'maven'
66

77
group 'org.iot-dsa'
8-
version '0.49.2'
8+
version '0.50.0'
99

1010
sourceCompatibility = 1.8
1111
targetCompatibility = 1.8

dslink-v2/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ artifacts {
44
}
55

66
dependencies {
7+
compileOnly 'javax.websocket:javax.websocket-api:1.1'
78
testImplementation 'org.testng:testng:6.14.3'
89
}
910

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSSession.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public abstract class DSSession extends DSNode implements DSIConnected {
3737
///////////////////////////////////////////////////////////////////////////
3838
// Instance Fields
3939
///////////////////////////////////////////////////////////////////////////
40+
4041
private final Object outgoingMutex = new Object();
4142
private int ackRcvd = -1;
4243
private int ackRequired = 0;
@@ -191,11 +192,6 @@ protected OutboundMessage dequeueOutgoingResponse() {
191192
*/
192193
protected abstract void doSendMessage() throws Exception;
193194

194-
@Override
195-
protected String getLogName() {
196-
return getLogName("session");
197-
}
198-
199195
protected int getMissingAcks() {
200196
if (ackRequired > 0) {
201197
return ackRequired - ackRcvd - 1;
@@ -285,8 +281,9 @@ protected void onDisconnected() {
285281
outgoingResponses.clear();
286282
notifyOutgoing();
287283
try {
288-
if (Thread.currentThread() != readThread) {
289-
readThread.join();
284+
Thread thread = readThread;
285+
if ((thread != null) && (Thread.currentThread() != thread)) {
286+
thread.join();
290287
}
291288
} catch (Exception x) {
292289
debug(getPath(), x);
@@ -305,8 +302,9 @@ protected void onDisconnecting() {
305302
connected = false;
306303
notifyOutgoing();
307304
try {
308-
if (Thread.currentThread() != writeThread) {
309-
writeThread.join();
305+
Thread thread = writeThread;
306+
if ((thread != null) && (Thread.currentThread() != thread)) {
307+
thread.join();
310308
}
311309
} catch (Exception x) {
312310
debug(getPath(), x);
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package com.acuity.iot.dsa.dslink.protocol;
2+
3+
import com.acuity.iot.dsa.dslink.transport.DSTransport;
4+
import org.iot.dsa.conn.DSConnection;
5+
import org.iot.dsa.dslink.DSIRequester;
6+
import org.iot.dsa.dslink.DSLink;
7+
import org.iot.dsa.dslink.DSLinkConnection;
8+
import org.iot.dsa.dslink.DSSysNode;
9+
import org.iot.dsa.node.DSInfo;
10+
import org.iot.dsa.node.DSNode;
11+
import org.iot.dsa.node.DSPath;
12+
import org.iot.dsa.node.DSString;
13+
import org.iot.dsa.node.action.ActionInvocation;
14+
import org.iot.dsa.node.action.ActionResult;
15+
import org.iot.dsa.node.action.DSAction;
16+
17+
/**
18+
* Represents an upstream connection to a broker.
19+
*
20+
* @author Aaron Hansen
21+
*/
22+
public abstract class DSUpstreamConnection extends DSLinkConnection {
23+
24+
///////////////////////////////////////////////////////////////////////////
25+
// Class Fields
26+
///////////////////////////////////////////////////////////////////////////
27+
28+
protected static final String BROKER_URI = "Broker URI";
29+
protected static final String BROKER_PATH = "Path In Broker";
30+
protected static final String RECONNECT = "Reconnect";
31+
32+
///////////////////////////////////////////////////////////////////////////
33+
// Instance Fields
34+
///////////////////////////////////////////////////////////////////////////
35+
36+
private DSInfo brokerPath = getInfo(BROKER_PATH);
37+
38+
///////////////////////////////////////////////////////////////////////////
39+
// Public Methods
40+
///////////////////////////////////////////////////////////////////////////
41+
42+
/**
43+
* The path representing the link node in the broker.
44+
*/
45+
public String getPathInBroker() {
46+
return brokerPath.getElement().toString();
47+
}
48+
49+
/**
50+
* Concatenates the path in broker with the path of the node.
51+
*/
52+
public String getPathInBroker(DSNode node) {
53+
StringBuilder buf = new StringBuilder();
54+
String localPath = DSPath.encodePath(node, buf).toString();
55+
buf.setLength(0);
56+
return DSPath.concat(getPathInBroker(), localPath, buf).toString();
57+
}
58+
59+
///////////////////////////////////////////////////////////////////////////
60+
// Protected Methods
61+
///////////////////////////////////////////////////////////////////////////
62+
63+
@Override
64+
protected void declareDefaults() {
65+
super.declareDefaults();
66+
declareDefault(BROKER_URI, DSString.NULL).setTransient(true).setReadOnly(true);
67+
declareDefault(BROKER_PATH, DSString.NULL).setTransient(true).setReadOnly(true);
68+
declareDefault(RECONNECT, new DSAction.Parameterless() {
69+
@Override
70+
public ActionResult invoke(DSInfo target, ActionInvocation invocation) {
71+
((DSUpstreamConnection) target.get()).disconnect();
72+
return null;
73+
}
74+
});
75+
}
76+
77+
protected DSSysNode getSys() {
78+
return (DSSysNode) getParent();
79+
}
80+
81+
/**
82+
* Creates and starts a thread for running the connection lifecycle.
83+
*/
84+
@Override
85+
protected void onStable() {
86+
super.onStable();
87+
Thread t = new Thread(this, "Connection " + getName() + " Runner");
88+
t.setDaemon(true);
89+
t.start();
90+
}
91+
92+
protected void setPathInBroker(String path) {
93+
put(brokerPath, DSString.valueOf(path));
94+
}
95+
96+
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundRequest.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,4 @@ public DSInboundRequest setSession(DSSession session) {
7474
return this;
7575
}
7676

77-
@Override
78-
protected String getLogName() {
79-
String pre = "";
80-
if (responder != null) {
81-
pre = responder.getLogName();
82-
}
83-
return pre + "." + getClass().getSimpleName();
84-
}
85-
8677
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscription.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,13 @@ protected void init() {
220220
DSIObject obj = path.getTarget();
221221
if (obj instanceof DSNode) {
222222
node = (DSNode) obj;
223-
this.subscription = node.subscribe(this);
223+
this.subscription = node.subscribe(this, DSNode.VALUE_CHANGED_EVENT, null);
224224
onEvent(DSNode.VALUE_CHANGED_EVENT, node, null, null);
225225
} else {
226226
DSInfo info = path.getTargetInfo();
227227
node = path.getNode();
228228
child = info;
229-
this.subscription = node.subscribe(this);
229+
this.subscription = node.subscribe(this, DSNode.VALUE_CHANGED_EVENT, info);
230230
onEvent(DSNode.VALUE_CHANGED_EVENT, node, info, null);
231231
}
232232
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,6 @@ protected void enqueue(DSInboundSubscription subscription) {
155155
responder.sendResponse(this);
156156
}
157157

158-
@Override
159-
protected String getLogName() {
160-
if (responder != null) {
161-
return responder.getLogName() + ".subscriptions";
162-
}
163-
return getClass().getName();
164-
}
165-
166158
/**
167159
* Returns a DSInboundSubscription for v1.
168160
*

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,6 @@ public boolean shouldEndMessage() {
115115
// Protected Methods
116116
/////////////////////////////////////////////////////////////////
117117

118-
@Override
119-
protected String getLogName() {
120-
return getLogName("responder");
121-
}
122-
123118
protected abstract DSInboundSubscriptions getSubscriptions();
124119

125120
protected DSStream putRequest(Integer rid, DSStream request) {

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/DS1LinkConnection.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
import com.acuity.iot.dsa.dslink.io.msgpack.MsgpackReader;
44
import com.acuity.iot.dsa.dslink.io.msgpack.MsgpackWriter;
5+
import com.acuity.iot.dsa.dslink.protocol.DSUpstreamConnection;
56
import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport;
67
import com.acuity.iot.dsa.dslink.transport.DSTextTransport;
78
import com.acuity.iot.dsa.dslink.transport.DSTransport;
89
import org.iot.dsa.dslink.DSIRequester;
9-
import org.iot.dsa.dslink.DSLinkConnection;
1010
import org.iot.dsa.dslink.DSLinkOptions;
1111
import org.iot.dsa.io.DSIReader;
1212
import org.iot.dsa.io.DSIWriter;
@@ -19,7 +19,7 @@
1919
*
2020
* @author Aaron Hansen
2121
*/
22-
public class DS1LinkConnection extends DSLinkConnection {
22+
public class DS1LinkConnection extends DSUpstreamConnection {
2323

2424
///////////////////////////////////////////////////////////////////////////
2525
// Constants
@@ -60,7 +60,6 @@ public DSIRequester getRequester() {
6060
return getSession().getRequester();
6161
}
6262

63-
@Override
6463
public DS1Session getSession() {
6564
if (session == null) {
6665
session = new DS1Session(this);

0 commit comments

Comments
 (0)