Skip to content

Commit 1881b33

Browse files
authored
0.60.0
- Remove default broker url. - Requester caches list requests for reconnection. - Requester caches subscriptions for reconnection. - Unit tests. - Main node name is overridable. - DSPath.resolve
2 parents 4d934f5 + 326391d commit 1881b33

File tree

18 files changed

+471
-120
lines changed

18 files changed

+471
-120
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ subprojects {
55
apply plugin: 'maven'
66

77
group 'org.iot-dsa'
8-
version '0.59.1'
8+
version '0.60.0'
99

1010
sourceCompatibility = 1.8
1111
targetCompatibility = 1.8

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSBrokerConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ protected void doConnect() {
8989
getTransport().open();
9090
connOk();
9191
} catch (Exception x) {
92+
debug(null, debug() ? x : null);
9293
connDown(DSException.makeMessage(x));
9394
}
9495
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSRootLink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,13 @@ protected void declareDefaults() {
6666
@Override
6767
protected DSLink init(DSLinkOptions config) {
6868
super.init(config);
69-
main = getInfo(MAIN);
69+
main = getInfo(getMainName());
7070
if (main == null) {
7171
String type = getOptions().getMainType();
7272
if (type != null) {
7373
debug("Main node type: " + type);
7474
DSNode node = (DSNode) DSUtil.newInstance(type);
75-
main = put(MAIN, node);
75+
main = put(getMainName(), node);
7676
}
7777
}
7878
String ver = config.getDsaVersion();

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundListStub.java

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.acuity.iot.dsa.dslink.protocol.DSSession;
44
import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter;
5+
import java.io.OutputStream;
56
import java.util.concurrent.ConcurrentLinkedQueue;
67
import org.iot.dsa.dslink.DSRequestException;
78
import org.iot.dsa.dslink.requester.ErrorType;
@@ -13,6 +14,7 @@
1314
import org.iot.dsa.node.DSList;
1415
import org.iot.dsa.node.DSMap;
1516
import org.iot.dsa.node.DSPath;
17+
import org.iot.dsa.time.DSDateTime;
1618

1719
/**
1820
* Manages the lifecycle of an list request and is also the outbound stream passed to the
@@ -26,7 +28,7 @@ public class DSOutboundListStub extends DSOutboundStub {
2628
// Instance Fields
2729
///////////////////////////////////////////////////////////////////////////
2830

29-
private OutboundListHandler handler;
31+
private HandlerAdapter adapter = new HandlerAdapter();
3032
private boolean initialized = false;
3133
private DSMap state = new DSMap();
3234

@@ -39,28 +41,20 @@ protected DSOutboundListStub(DSRequester requester,
3941
String path,
4042
OutboundListHandler handler) {
4143
super(requester, requestId, path);
42-
this.handler = handler;
43-
handler.onInit(path, null, this);
44+
adapter.add(handler);
4445
}
4546

4647
///////////////////////////////////////////////////////////////////////////
4748
// Public Methods
4849
///////////////////////////////////////////////////////////////////////////
4950

50-
public synchronized void addHandler(OutboundListHandler handler) {
51-
HandlerAdapter adapter;
52-
if (this.handler instanceof HandlerAdapter) {
53-
adapter = (HandlerAdapter) this.handler;
54-
} else {
55-
adapter = new HandlerAdapter(this.handler);
56-
this.handler = adapter;
57-
}
51+
public void addHandler(OutboundListHandler handler) {
5852
adapter.add(handler);
5953
}
6054

6155
@Override
6256
public OutboundListHandler getHandler() {
63-
return handler;
57+
return adapter;
6458
}
6559

6660
public DSMap getState() {
@@ -87,14 +81,14 @@ public synchronized void handleResponse(DSMap response) {
8781
}
8882
DSElement e = list.remove(1);
8983
state.put(name, e);
90-
handler.onUpdate(name, e);
84+
adapter.onUpdate(name, e);
9185
} else if (elem.isMap()) {
9286
map = (DSMap) elem;
9387
name = DSPath.decodeName(map.getString("name"));
9488
String change = map.getString("change");
9589
if ("remove".equals(change)) {
9690
state.remove(name);
97-
handler.onRemove(name);
91+
adapter.onRemove(name);
9892
}
9993
} else {
10094
throw new DSRequestException(
@@ -104,7 +98,7 @@ public synchronized void handleResponse(DSMap response) {
10498
}
10599
if ("open".equals(response.getString("stream"))) {
106100
initialized = true;
107-
handler.onInitialized();
101+
adapter.onInitialized();
108102
}
109103
} catch (Exception x) {
110104
getRequester().error(getRequester().getPath(), x);
@@ -125,29 +119,55 @@ public boolean write(DSSession session, MessageWriter writer) {
125119
return true;
126120
}
127121

122+
///////////////////////////////////////////////////////////////////////////
123+
// Package Methods
124+
///////////////////////////////////////////////////////////////////////////
125+
126+
synchronized void disconnected() {
127+
initialized = false;
128+
String name = "$disconnectedTs";
129+
DSElement value = DSDateTime.currentTime().toElement();
130+
state.put(name, value);
131+
try {
132+
adapter.onUpdate(name, value);
133+
} catch (Exception x) {
134+
getRequester().error(getRequester().getPath(), x);
135+
}
136+
}
137+
128138
///////////////////////////////////////////////////////////////////////////
129139
// Inner Classes
130140
///////////////////////////////////////////////////////////////////////////
131141

142+
public interface HandlerAdapterStream extends OutboundStream {
143+
144+
public DSMap getState();
145+
146+
}
147+
132148
class HandlerAdapter implements OutboundListHandler {
133149

134150
private ConcurrentLinkedQueue<OutboundListHandler> handlers = new ConcurrentLinkedQueue<>();
135151

136-
public HandlerAdapter(OutboundListHandler first) {
137-
handlers.add(first);
152+
public HandlerAdapter() {
138153
}
139154

140155
public void add(final OutboundListHandler handler) {
141156
handlers.add(handler);
142-
handler.onInit(getPath(), null, new OutboundStream() {
157+
handler.onInit(getPath(), null, new HandlerAdapterStream() {
158+
boolean open = true;
143159
@Override
144160
public void closeStream() {
161+
open = false;
145162
remove(handler);
146163
}
147-
164+
@Override
165+
public DSMap getState() {
166+
return state;
167+
}
148168
@Override
149169
public boolean isStreamOpen() {
150-
return DSOutboundListStub.this.isStreamOpen();
170+
return open;
151171
}
152172
});
153173
for (DSMap.Entry e : state) {

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,8 @@ public boolean isStreamOpen() {
9393
return open;
9494
}
9595

96+
public void setRequestId(Integer rid) {
97+
this.requestId = rid;
98+
}
99+
96100
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscription.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class DSOutboundSubscription {
2525
private int qos = -1;
2626
private Integer sid;
2727
private int size;
28-
private State state = State.PENDING_SUBSCRIBE;
28+
private State state = State.INIT;
2929
private DSOutboundSubscriptions subscriptions;
3030

3131
///////////////////////////////////////////////////////////////////////////
@@ -106,13 +106,12 @@ void add(DSOutboundSubscribeStub stub) {
106106
last.setNext(stub);
107107
last = stub;
108108
}
109-
if (++size > 1) {
110-
if (lastValue != null) {
111-
try {
112-
stub.update(lastTs, lastValue, lastStatus);
113-
} catch (Exception x) {
114-
subscriptions.error(path, x);
115-
}
109+
size++;
110+
if (lastValue != null) {
111+
try {
112+
stub.update(lastTs, lastValue, lastStatus);
113+
} catch (Exception x) {
114+
subscriptions.error(path, x);
116115
}
117116
}
118117
if (qos > prevQos) { //need to resubscribe for new qos
@@ -178,16 +177,17 @@ void update(DSDateTime ts, DSElement value, DSStatus status) {
178177
}
179178

180179
void updateDisconnected() {
181-
if (lastStatus == DSStatus.unknown) {
182-
return;
180+
if (lastStatus == null) {
181+
lastStatus = DSStatus.down;
182+
} else {
183+
lastStatus = lastStatus.add(DSStatus.DOWN);
183184
}
184-
lastStatus = DSStatus.unknown;
185185
lastTs = DSDateTime.currentTime();
186186
if (lastValue == null) {
187187
lastValue = DSNull.NULL;
188188
}
189189
DSOutboundSubscribeStub cur = first;
190-
while (cur.getNext() != null) {
190+
while (cur != null) {
191191
cur.update(lastTs, lastValue, lastStatus);
192192
cur = cur.getNext();
193193
}
@@ -225,6 +225,7 @@ private DSOutboundSubscribeStub predecessor(DSOutboundSubscribeStub stub) {
225225
///////////////////////////////////////////////////////////////////////////
226226

227227
public enum State {
228+
INIT,
228229
PENDING_SUBSCRIBE,
229230
PENDING_UNSUBSCRIBE,
230231
SUBSCRIBED,

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@
88
import java.util.Map;
99
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.concurrent.ConcurrentLinkedQueue;
11-
import org.iot.dsa.DSRuntime;
1211
import org.iot.dsa.dslink.requester.OutboundSubscribeHandler;
1312
import org.iot.dsa.io.DSIWriter;
14-
import org.iot.dsa.logging.DSLogger;
1513
import org.iot.dsa.node.DSElement;
1614
import org.iot.dsa.node.DSIValue;
17-
import org.iot.dsa.node.DSLong;
15+
import org.iot.dsa.node.DSNode;
1816
import org.iot.dsa.node.DSNull;
1917
import org.iot.dsa.node.DSStatus;
2018
import org.iot.dsa.time.DSDateTime;
@@ -24,19 +22,19 @@
2422
*
2523
* @author Aaron Hansen
2624
*/
27-
public class DSOutboundSubscriptions extends DSLogger implements OutboundMessage {
25+
public class DSOutboundSubscriptions extends DSNode implements OutboundMessage {
2826

2927
///////////////////////////////////////////////////////////////////////////
3028
// Class Fields
3129
///////////////////////////////////////////////////////////////////////////
3230

33-
static final int MAX_SID = 2147483647;
31+
static final int MAX_SID = 2147483646;
3432

3533
///////////////////////////////////////////////////////////////////////////
3634
// Instance Fields
3735
///////////////////////////////////////////////////////////////////////////
36+
3837
private boolean connected = false;
39-
private DSRuntime.Timer disconnectedTimer;
4038
private boolean enqueued = false;
4139
private int nextSid = 1;
4240
private final Map<String, DSOutboundSubscription> pathMap =
@@ -204,10 +202,6 @@ protected void doWriteUnsubscribe(MessageWriter writer, Integer sid) {
204202

205203
protected void onConnected() {
206204
connected = true;
207-
if (disconnectedTimer != null) {
208-
disconnectedTimer.cancel();
209-
disconnectedTimer = null;
210-
}
211205
for (DSOutboundSubscription sub : pathMap.values()) {
212206
sub.setState(State.PENDING_SUBSCRIBE);
213207
pendingSubscribe.add(sub);
@@ -228,23 +222,7 @@ protected void onDisconnected() {
228222
pendingUnsubscribe.clear();
229223
}
230224
enqueued = false;
231-
if (disconnectedTimer == null) {
232-
disconnectedTimer = DSRuntime.runDelayed(new Runnable() {
233-
@Override
234-
public void run() {
235-
updateDisconnected();
236-
}
237-
}, 15000);
238-
}
239-
}
240-
241-
protected void updateDisconnected() {
242-
disconnectedTimer = null;
243-
if (!connected) {
244-
for (Map.Entry<String, DSOutboundSubscription> entry : pathMap.entrySet()) {
245-
entry.getValue().updateDisconnected();
246-
}
247-
}
225+
updateDisconnected();
248226
}
249227

250228
///////////////////////////////////////////////////////////////////////////
@@ -256,18 +234,27 @@ protected void updateDisconnected() {
256234
* it is a new subscription or there change in qos.
257235
*/
258236
void sendSubscribe(DSOutboundSubscription sub) {
259-
if (connected) {
237+
if (connected && (sub.getState() != State.PENDING_SUBSCRIBE)) {
260238
sub.setState(State.PENDING_SUBSCRIBE);
261239
pendingSubscribe.add(sub);
262240
sendMessage();
263241
}
264242
}
265243

244+
/**
245+
* Number of subscriptions.
246+
*/
247+
int size() {
248+
synchronized (pathMap) {
249+
return pathMap.size();
250+
}
251+
}
252+
266253
/**
267254
* Create or update a subscription.
268255
*/
269256
OutboundSubscribeHandler subscribe(String path, DSIValue qos, OutboundSubscribeHandler req) {
270-
trace(trace() ? String.format("Subscribe (qos=%s) %s", qos, path) : null);
257+
debug(debug() ? String.format("Subscribe (qos=%s) %s", qos, path) : null);
271258
DSOutboundSubscribeStub stub = new DSOutboundSubscribeStub(path, qos, req);
272259
DSOutboundSubscription sub = null;
273260
synchronized (pathMap) {
@@ -325,4 +312,12 @@ private void sendMessage() {
325312
requester.sendRequest(this);
326313
}
327314

315+
private void updateDisconnected() {
316+
for (Map.Entry<String, DSOutboundSubscription> entry : pathMap.entrySet()) {
317+
entry.getValue().updateDisconnected();
318+
}
319+
}
320+
321+
322+
328323
}

0 commit comments

Comments
 (0)