Skip to content

Commit 1abf4b5

Browse files
authored
0.20.0 (#26)
- Multi-part messages. - Use info flags for action permissions. - Disconnection cleanup.
1 parent 9a5c4e2 commit 1abf4b5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1322
-835
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ apply plugin: 'java'
22
apply plugin: 'maven'
33

44
group 'org.iot.dsa'
5-
version '0.19.0'
5+
version '0.20.0'
66

77
sourceCompatibility = 1.6
88
targetCompatibility = 1.6

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/io/DSByteBuffer.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.acuity.iot.dsa.dslink.io;
22

33
import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport;
4-
import java.io.IOException;
54
import java.io.InputStream;
65
import java.io.OutputStream;
76
import java.io.PrintStream;
@@ -269,21 +268,14 @@ public DSByteBuffer put(int dest, byte[] msg, int off, int len) {
269268
}
270269

271270
public int put(InputStream in, int len) {
272-
int count = 0;
271+
growBuffer(offset + len);
273272
try {
274-
int ch;
275-
while (count < len) {
276-
ch = in.read();
277-
if (ch < 0) {
278-
return count;
279-
}
280-
put((byte) ch);
281-
count++;
282-
}
283-
} catch (IOException x) {
273+
len = in.read(buffer, offset, len);
274+
} catch (Exception x) {
284275
DSException.throwRuntime(x);
285276
}
286-
return count;
277+
length += len;
278+
return len;
287279
}
288280

289281
/**
@@ -554,6 +546,19 @@ public void sendTo(DSBinaryTransport transport, boolean isLast) {
554546
length = 0;
555547
}
556548

549+
/**
550+
* Push bytes from the internal buffer to the given.
551+
*/
552+
public void sendTo(DSByteBuffer buf, int len) {
553+
buf.put(buffer, offset, len);
554+
length -= len;
555+
if (length == 0) {
556+
offset = 0;
557+
} else {
558+
offset += len;
559+
}
560+
}
561+
557562
/**
558563
* Push bytes from the internal buffer to the stream.
559564
*/

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import org.iot.dsa.dslink.requester.ErrorType;
66
import org.iot.dsa.dslink.requester.OutboundRequestHandler;
77
import org.iot.dsa.dslink.requester.OutboundStream;
8-
import org.iot.dsa.node.DSElement;
98
import org.iot.dsa.node.DSMap;
109

1110
/**
@@ -79,49 +78,6 @@ public void handleClose() {
7978
getRequester().removeRequest(getRequestId());
8079
}
8180

82-
public void handleError(DSElement details) {
83-
if (!open) {
84-
return;
85-
}
86-
try {
87-
ErrorType type = ErrorType.internalError;
88-
String msg;
89-
if (details.isMap()) {
90-
String detail = null;
91-
DSMap map = details.toMap();
92-
String tmp = map.getString("type");
93-
if (tmp.equals("permissionDenied")) {
94-
type = ErrorType.permissionDenied;
95-
} else if (tmp.equals("invalidRequest")) {
96-
type = ErrorType.badRequest;
97-
} else if (tmp.equals("invalidPath")) {
98-
type = ErrorType.badRequest;
99-
} else if (tmp.equals("notSupported")) {
100-
type = ErrorType.notSupported;
101-
} else {
102-
type = ErrorType.internalError;
103-
}
104-
msg = map.getString("msg");
105-
detail = map.getString("detail");
106-
if (msg == null) {
107-
msg = detail;
108-
}
109-
if (msg == null) {
110-
msg = details.toString();
111-
}
112-
} else {
113-
type = ErrorType.internalError;
114-
msg = details.toString();
115-
}
116-
if (msg == null) {
117-
msg = "";
118-
}
119-
getHandler().onError(type, msg);
120-
} catch (Exception x) {
121-
getRequester().error(getRequester().getPath(), x);
122-
}
123-
}
124-
12581
public void handleError(ErrorType type, String message) {
12682
if (!open) {
12783
return;

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscribeStubs.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,6 @@ void add(DSOutboundSubscribeStub stub) {
7272
}
7373
}
7474

75-
public void close() {
76-
//TODO who calls this and for what purpose
77-
/*
78-
DSOutboundSubscribeStub cur = first;
79-
while (cur != null) {
80-
cur.close();
81-
cur = cur.getNext();
82-
}
83-
*/
84-
}
85-
8675
private boolean contains(DSOutboundSubscribeStub stub) {
8776
if (stub == first) {
8877
return true;
@@ -114,6 +103,14 @@ public boolean hasSid() {
114103
return sid != null;
115104
}
116105

106+
public void onDisconnect() {
107+
DSOutboundSubscribeStub cur = first;
108+
while (cur != null) {
109+
cur.closeStream();
110+
cur = cur.getNext();
111+
}
112+
}
113+
117114
/**
118115
* Null if the arg is the first in the list, last if stub is not contained.
119116
*/

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ public void onConnectFail() {
132132
}
133133

134134
public void onDisconnect() {
135+
for (DSOutboundSubscribeStubs stubs : pendingSubscribe) {
136+
stubs.onDisconnect();
137+
}
138+
pendingSubscribe.clear();
139+
pendingUnsubscribe.clear();
140+
for (DSOutboundSubscribeStubs stubs : pathMap.values()) {
141+
stubs.onDisconnect();
142+
}
143+
sidMap.clear();
144+
pathMap.clear();
135145
}
136146

137147
public void handleUpdate(int sid, String ts, String sts, DSElement value) {
@@ -217,8 +227,7 @@ public void write(MessageWriter writer) {
217227
if (!pendingSubscribe.isEmpty()) {
218228
doBeginSubscribe(writer);
219229
Iterator<DSOutboundSubscribeStubs> it = pendingSubscribe.iterator();
220-
//while (it.hasNext() && !session.shouldEndMessage()) {
221-
while (it.hasNext()) { //todo
230+
while (it.hasNext() && !session.shouldEndMessage()) {
222231
DSOutboundSubscribeStubs stubs = it.next();
223232
if (!stubs.hasSid()) {
224233
synchronized (pathMap) {

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSRequester.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import com.acuity.iot.dsa.dslink.protocol.DSSession;
44
import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage;
55
import com.acuity.iot.dsa.dslink.transport.DSTransport;
6+
import java.util.Iterator;
67
import java.util.Map;
8+
import java.util.Map.Entry;
79
import java.util.concurrent.ConcurrentHashMap;
810
import java.util.concurrent.atomic.AtomicInteger;
911
import org.iot.dsa.dslink.DSIRequester;
@@ -28,8 +30,7 @@ public abstract class DSRequester extends DSNode implements DSIRequester {
2830

2931
private AtomicInteger nextRid = new AtomicInteger();
3032
private DSSession session;
31-
private Map<Integer, DSOutboundStub> requests =
32-
new ConcurrentHashMap<Integer, DSOutboundStub>();
33+
private Map<Integer, DSOutboundStub> requests = new ConcurrentHashMap<Integer, DSOutboundStub>();
3334
private DSOutboundSubscriptions subscriptions = makeSubscriptions();
3435

3536
///////////////////////////////////////////////////////////////////////////
@@ -120,7 +121,6 @@ protected DSOutboundSetStub makeSet(String path, DSIValue value, OutboundRequest
120121

121122
public void onConnect() {
122123
subscriptions.onConnect();
123-
session.setRequesterAllowed();
124124
}
125125

126126
public void onConnectFail() {
@@ -129,6 +129,17 @@ public void onConnectFail() {
129129

130130
public void onDisconnect() {
131131
subscriptions.onDisconnect();
132+
Iterator<Entry<Integer, DSOutboundStub>> it = requests.entrySet().iterator();
133+
Map.Entry<Integer, DSOutboundStub> me;
134+
while (it.hasNext()) {
135+
me = it.next();
136+
try {
137+
me.getValue().getHandler().onClose();
138+
} catch (Exception x) {
139+
error(getPath(), x);
140+
}
141+
it.remove();
142+
}
132143
}
133144

134145
@Override

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Iterator;
88
import org.iot.dsa.DSRuntime;
99
import org.iot.dsa.dslink.DSIResponder;
10+
import org.iot.dsa.dslink.DSPermissionException;
1011
import org.iot.dsa.dslink.DSRequestException;
1112
import org.iot.dsa.dslink.responder.InboundInvokeRequest;
1213
import org.iot.dsa.io.DSIWriter;
@@ -79,7 +80,7 @@ public void close() {
7980
}
8081
state = STATE_CLOSE_PENDING;
8182
enqueueResponse();
82-
fine(fine() ? getPath() + " list closed locally" : null);
83+
fine(fine() ? getPath() + " invoke closed locally" : null);
8384
}
8485

8586
@Override
@@ -90,7 +91,7 @@ public void close(Exception reason) {
9091
closeReason = reason;
9192
state = STATE_CLOSE_PENDING;
9293
enqueueResponse();
93-
fine(fine() ? getPath() + " list closed locally" : null);
94+
fine(fine() ? getPath() + " invoke closed locally" : null);
9495
}
9596

9697
private synchronized Update dequeueUpdate() {
@@ -236,7 +237,19 @@ public void run() {
236237
if (!info.isAction()) {
237238
throw new DSRequestException("Not an action " + path.getPath());
238239
}
239-
//TODO verify incoming permission
240+
if (info.isAdmin()) {
241+
if (!permission.isConfig()) {
242+
throw new DSPermissionException("Config permission required");
243+
}
244+
} else if (!info.isReadOnly()) {
245+
if (DSPermission.WRITE.isGreaterThan(permission)) {
246+
throw new DSPermissionException("Write permission required");
247+
}
248+
} else {
249+
if (DSPermission.READ.isGreaterThan(permission)) {
250+
throw new DSPermissionException("Read permission required");
251+
}
252+
}
240253
DSAction action = info.getAction();
241254
result = action.invoke(info, this);
242255
} catch (Exception x) {

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.iot.dsa.node.event.DSISubscriber;
2828
import org.iot.dsa.node.event.DSInfoTopic;
2929
import org.iot.dsa.node.event.DSTopic;
30+
import org.iot.dsa.security.DSPermission;
3031

3132
/**
3233
* List implementation for a responder.
@@ -219,7 +220,11 @@ protected void encodeChild(ApiObject child, MessageWriter writer) {
219220
if (e != null) {
220221
map.put("$invokable", e);
221222
} else {
222-
map.put("$invokable", action.getPermission().toString());
223+
if (child.isAdmin()) {
224+
map.put("$invokable", DSPermission.CONFIG.toString());
225+
} else if (!child.isReadOnly()) {
226+
map.put("$invokable", DSPermission.WRITE.toString());
227+
}
223228
}
224229
} else if (child.isValue()) {
225230
e = cacheMap.remove("$type");
@@ -301,7 +306,11 @@ private void encodeTargetAction(ApiObject object, MessageWriter writer) {
301306
}
302307
DSElement e = cacheMap.remove("$invokable");
303308
if (e == null) {
304-
encode("$invokable", action.getPermission().toString(), writer);
309+
if (object.isAdmin()) {
310+
encode("$invokable", DSPermission.CONFIG.toString(), writer);
311+
} else if (!object.isReadOnly()) {
312+
encode("$invokable", DSPermission.WRITE.toString(), writer);
313+
}
305314
} else {
306315
encode("$invokable", e, writer);
307316
}

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSet.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.acuity.iot.dsa.dslink.protocol.message.RequestPath;
44
import org.iot.dsa.dslink.DSIResponder;
5+
import org.iot.dsa.dslink.DSPermissionException;
56
import org.iot.dsa.dslink.DSRequestException;
67
import org.iot.dsa.dslink.responder.InboundSetRequest;
78
import org.iot.dsa.node.DSElement;
@@ -44,7 +45,13 @@ public void run() {
4445
if (info.isReadOnly()) {
4546
throw new DSRequestException("Not writable: " + getPath());
4647
}
47-
//TODO verify incoming permission
48+
if (!permission.isConfig()) {
49+
if (info.isAdmin()) {
50+
throw new DSPermissionException("Config permission required");
51+
} else if (DSPermission.WRITE.isGreaterThan(permission)) {
52+
throw new DSPermissionException("Write permission required");
53+
}
54+
}
4855
if (info.isNode()) {
4956
info.getNode().onSet(value);
5057
} else {

dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,6 @@ public DSInboundSubscriptions(DSResponder responder) {
4747
// Methods in alphabetical order
4848
///////////////////////////////////////////////////////////////////////////
4949

50-
/**
51-
* Unsubscribes all.
52-
*/
53-
public void close() {
54-
for (Integer i : sidMap.keySet()) {
55-
unsubscribe(i);
56-
}
57-
}
58-
5950
/**
6051
* Add to the outbound queue if not already enqueued.
6152
*/
@@ -89,6 +80,21 @@ protected DSInboundSubscription makeSubscription(Integer sid, String path, int q
8980
return new DSInboundSubscription(this, sid, path, qos);
9081
}
9182

83+
public void onConnect() {
84+
}
85+
86+
public void onConnectFail() {
87+
}
88+
89+
/**
90+
* Unsubscribes all.
91+
*/
92+
public void onDisconnect() {
93+
for (Integer i : sidMap.keySet()) {
94+
unsubscribe(i);
95+
}
96+
}
97+
9298
/**
9399
* Create or update a subscription.
94100
*/

0 commit comments

Comments
 (0)