Skip to content

Commit 15e02bb

Browse files
committed
implement serializer. The buffer feature is now functional.
1 parent 9f335db commit 15e02bb

File tree

7 files changed

+138
-51
lines changed

7 files changed

+138
-51
lines changed

src/main/java/org/iot/dsa/dslink/restadapter/RuleNode.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,18 @@ public double getMaxRefreshRate() {
134134

135135
@Override
136136
public void responseRecieved(Response resp, int rowNum) {
137-
int status = resp.getStatus();
138-
String data = resp.readEntity(String.class);
139-
140-
put(lastRespCode, DSInt.valueOf(status));
141-
put(lastRespData, DSString.valueOf(data));
142-
put(lastRespTs, DSString.valueOf(resp.getDate() != null ? DSDateTime.valueOf(resp.getDate().getTime()) : DSDateTime.currentTime()));
137+
if (resp == null) {
138+
put(lastRespCode, DSInt.valueOf(-1));
139+
put(lastRespData, DSString.valueOf("Failed to send update"));
140+
put(lastRespTs, DSString.valueOf(DSDateTime.currentTime()));
141+
} else {
142+
int status = resp.getStatus();
143+
String data = resp.readEntity(String.class);
144+
145+
put(lastRespCode, DSInt.valueOf(status));
146+
put(lastRespData, DSString.valueOf(data));
147+
put(lastRespTs, DSString.valueOf(resp.getDate() != null ? DSDateTime.valueOf(resp.getDate().getTime()) : DSDateTime.currentTime()));
148+
}
143149
}
144150

145151
}

src/main/java/org/iot/dsa/dslink/restadapter/RuleTableNode.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,23 @@ public RuleTableNode(DSList table) {
2828

2929
@Override
3030
public void responseRecieved(Response resp, int rowNum) {
31-
int status = resp.getStatus();
32-
String data = resp.readEntity(String.class);
33-
3431
DSList respTable = lastResponses.getElement().toList();
3532
DSMap respMap = respTable.getMap(rowNum);
36-
respMap.put(Constants.LAST_RESPONSE_CODE, status);
37-
respMap.put(Constants.LAST_RESPONSE_DATA, data);
38-
respMap.put(Constants.LAST_RESPONSE_TS,
39-
(resp.getDate() != null ? DSDateTime.valueOf(resp.getDate().getTime())
40-
: DSDateTime.currentTime()).toString());
33+
34+
if (resp == null) {
35+
respMap.put(Constants.LAST_RESPONSE_CODE, -1);
36+
respMap.put(Constants.LAST_RESPONSE_DATA, "Failed to send update");
37+
respMap.put(Constants.LAST_RESPONSE_TS, DSDateTime.currentTime().toString());
38+
} else {
39+
int status = resp.getStatus();
40+
String data = resp.readEntity(String.class);
41+
42+
respMap.put(Constants.LAST_RESPONSE_CODE, status);
43+
respMap.put(Constants.LAST_RESPONSE_DATA, data);
44+
respMap.put(Constants.LAST_RESPONSE_TS,
45+
(resp.getDate() != null ? DSDateTime.valueOf(resp.getDate().getTime())
46+
: DSDateTime.currentTime()).toString());
47+
}
4148
fire(VALUE_CHANGED, lastResponses);
4249
}
4350

src/main/java/org/iot/dsa/dslink/restadapter/SubUpdate.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,20 @@
55
import org.iot.dsa.time.DSDateTime;
66

77
public class SubUpdate {
8-
final DSDateTime dateTime;
9-
final DSElement value;
10-
final DSStatus status;
118

12-
public SubUpdate(DSDateTime dateTime, DSElement value, DSStatus status) {
9+
public final String dateTime;
10+
public final String value;
11+
public final String status;
12+
public final long ts;
13+
14+
public SubUpdate(String dateTime, String value, String status, long ts) {
1315
this.dateTime = dateTime;
1416
this.value = value;
1517
this.status = status;
18+
this.ts = ts;
19+
}
20+
21+
public SubUpdate(DSDateTime dateTime, DSElement value, DSStatus status) {
22+
this(dateTime.toString(), value.toString(), status.toString(), dateTime.timeInMillis());
1623
}
1724
}
Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,48 @@
11
package org.iot.dsa.dslink.restadapter;
22

3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
36
import org.etsdb.ByteArrayBuilder;
47
import org.etsdb.Serializer;
8+
import org.iot.dsa.io.json.JsonReader;
9+
import org.iot.dsa.io.json.JsonWriter;
10+
import org.iot.dsa.node.DSList;
11+
import org.iot.dsa.node.DSString;
512

613
public class SubUpdateSerializer extends Serializer<SubUpdate> {
714

815
@Override
9-
public void toByteArray(ByteArrayBuilder b, SubUpdate obj, long ts) {
10-
// TODO Auto-generated method stub
11-
16+
public void toByteArray(final ByteArrayBuilder builder, SubUpdate obj, long ts) {
17+
JsonWriter jw = new JsonWriter(new OutputStream() {
18+
@Override
19+
public void write(int b) throws IOException {
20+
builder.put((byte) b);
21+
}
22+
});
23+
jw.beginList();
24+
jw.value(obj.dateTime);
25+
jw.value(obj.value);
26+
jw.value(obj.status);
27+
jw.endList();
28+
jw.close();
1229
}
1330

1431
@Override
15-
public SubUpdate fromByteArray(ByteArrayBuilder b, long ts) {
16-
// TODO Auto-generated method stub
17-
return null;
32+
public SubUpdate fromByteArray(ByteArrayBuilder builder, long ts) {
33+
JsonReader jr = new JsonReader(new InputStream() {
34+
@Override
35+
public int read() throws IOException {
36+
return builder.getAvailable() > 0 ? builder.getByte() : -1;
37+
}
38+
}, DSString.UTF8.name());
39+
jr.next();
40+
DSList l = jr.getList();
41+
jr.close();
42+
if (l.size() < 3) {
43+
throw new RuntimeException("Failed to deserialize subscription update");
44+
}
45+
return new SubUpdate(l.getString(0), l.getString(1), l.getString(2), ts);
1846
}
1947

20-
}
48+
}

src/main/java/org/iot/dsa/dslink/restadapter/SubscriptionRule.java

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -121,33 +121,33 @@ public void onInit(String path, int qos, OutboundStream stream) {
121121
@Override
122122
public void onUpdate(DSDateTime dateTime, DSElement value, DSStatus status) {
123123
info("Rule with sub path " + subPath + ": onUpdate called with value " + (value!=null ? value : "Null"));
124-
storedUpdate = new SubUpdate(dateTime, value, status);
124+
storedUpdate = new SubUpdate(dateTime.toString(), value.toString(), status.toString(), dateTime.timeInMillis());
125125
if (lastUpdateTime < 0 || System.currentTimeMillis() - lastUpdateTime >= minRefreshRate) {
126126
if (future != null) {
127127
future.cancel();
128128
}
129-
trySendUpdate(dateTime, value, status);
129+
trySendUpdate(new SubUpdate(dateTime.toString(), value.toString(), status.toString(), dateTime.timeInMillis()));
130130
}
131131
}
132132

133133
private void sendStoredUpdate() {
134134
if (storedUpdate != null) {
135-
trySendUpdate(storedUpdate.dateTime, storedUpdate.value, storedUpdate.status);
135+
trySendUpdate(storedUpdate);
136136
}
137137
}
138138

139139
private String getSubId() {
140140
return node.getId() + "_" + subPath;
141141
}
142142

143-
private void trySendUpdate(final DSDateTime dateTime, final DSElement value, final DSStatus status) {
144-
if (sendUpdate(dateTime, value, status)) {
143+
private void trySendUpdate(final SubUpdate update) {
144+
if (sendUpdate(update)) {
145145
if (unsentInBuffer) {
146146
unsentInBuffer = !Util.processBuffer(getSubId(), this);
147147
}
148148
} else {
149149
if (node.isBufferEnabled()) {
150-
Util.storeInBuffer(getSubId(), new SubUpdate(dateTime, value, status));
150+
Util.storeInBuffer(getSubId(), update);
151151
unsentInBuffer = true;
152152
}
153153
}
@@ -163,41 +163,42 @@ public void run() {
163163
}
164164
}
165165

166-
private boolean sendUpdate(final DSDateTime dateTime, final DSElement value, final DSStatus status) {
166+
private boolean sendUpdate(final SubUpdate update) {
167167

168168
DSMap urlParams = urlParameters.copy();
169169
String body = this.body;
170170
for (String key: urlParamsWithValues) {
171171
String pattern = urlParams.getString(key);
172172
if (Constants.PLACEHOLDER_VALUE.equals(pattern)) {
173-
urlParams.put(key, value);
173+
urlParams.put(key, update.value);
174174
} else {
175-
pattern = pattern.replaceAll(Constants.PLACEHOLDER_VALUE, value.toString());
176-
pattern = pattern.replaceAll(Constants.PLACEHOLDER_TS, dateTime.toString());
177-
pattern = pattern.replaceAll(Constants.PLACEHOLDER_STATUS, status.toString());
175+
pattern = pattern.replaceAll(Constants.PLACEHOLDER_VALUE, update.value);
176+
pattern = pattern.replaceAll(Constants.PLACEHOLDER_TS, update.dateTime);
177+
pattern = pattern.replaceAll(Constants.PLACEHOLDER_STATUS, update.status);
178178
urlParams.put(key, pattern);
179179
}
180180
}
181181

182182
if (valuesInBody) {
183-
body = body.replaceAll(Constants.PLACEHOLDER_VALUE, value.toString());
184-
body = body.replaceAll(Constants.PLACEHOLDER_TS, dateTime.toString());
185-
body = body.replaceAll(Constants.PLACEHOLDER_STATUS, status.toString());
183+
body = body.replaceAll(Constants.PLACEHOLDER_VALUE, update.value);
184+
body = body.replaceAll(Constants.PLACEHOLDER_TS, update.dateTime);
185+
body = body.replaceAll(Constants.PLACEHOLDER_STATUS, update.status);
186+
body = body.replaceAll(Constants.PLACEHOLDER_BLOCK_START, "");
187+
body = body.replaceAll(Constants.PLACEHOLDER_BLOCK_END, "");
186188
}
187189

188-
info("Rule with sub path " + subPath + ": sending Update with value " + (value!=null ? value : "Null"));
190+
info("Rule with sub path " + subPath + ": sending Update with value " + (update.value!=null ? update.value : "Null"));
189191

190-
Response resp = getWebClientProxy().invoke(method, restUrl, urlParams, body);
191-
node.responseRecieved(resp, rowNum);
192-
return resp.getStatus() == 200;
192+
Response resp = restInvoke(urlParams, body);
193+
return resp != null && resp.getStatus() == 200;
193194
}
194195

195196
public Queue<SubUpdate> sendBatchUpdate(Queue<SubUpdate> updates) {
196197
if (!batchable) {
197198
Queue<SubUpdate> failed = new LinkedList<SubUpdate>();
198199
while (!updates.isEmpty()) {
199200
SubUpdate update = updates.poll();
200-
if (!sendUpdate(update.dateTime, update.value, update.status)) {
201+
if (!sendUpdate(update)) {
201202
failed.add(update);
202203
}
203204
}
@@ -215,9 +216,9 @@ public Queue<SubUpdate> sendBatchUpdate(Queue<SubUpdate> updates) {
215216
while (!updates.isEmpty()) {
216217
SubUpdate update = updates.poll();
217218
updatesCopy.add(update);
218-
String temp = block.replaceAll(Constants.PLACEHOLDER_VALUE, update.value.toString())
219-
.replaceAll(Constants.PLACEHOLDER_TS, update.dateTime.toString())
220-
.replaceAll(Constants.PLACEHOLDER_STATUS, update.status.toString());
219+
String temp = block.replaceAll(Constants.PLACEHOLDER_VALUE, update.value)
220+
.replaceAll(Constants.PLACEHOLDER_TS, update.dateTime)
221+
.replaceAll(Constants.PLACEHOLDER_STATUS, update.status);
221222
sb.append(temp);
222223
if (!updates.isEmpty()) {
223224
sb.append(',');
@@ -227,16 +228,26 @@ public Queue<SubUpdate> sendBatchUpdate(Queue<SubUpdate> updates) {
227228
String body = sb.toString();
228229
info("Rule with sub path " + subPath + ": sending batch update");
229230

230-
Response resp = getWebClientProxy().invoke(method, restUrl, urlParams, body);
231-
node.responseRecieved(resp, rowNum);
232-
if (resp.getStatus() == 200) {
231+
Response resp = restInvoke(urlParams, body);
232+
if (resp != null && resp.getStatus() == 200) {
233233
return null;
234234
} else {
235235
return updatesCopy;
236236
}
237237

238238
}
239239

240+
private Response restInvoke(DSMap urlParams, String body) {
241+
Response resp = null;
242+
try {
243+
resp = getWebClientProxy().invoke(method, restUrl, urlParams, body);
244+
} catch (Exception e) {
245+
warn("", e);
246+
}
247+
node.responseRecieved(resp, rowNum);
248+
return resp;
249+
}
250+
240251
public void close() {
241252
if (stream != null && stream.isStreamOpen()) {
242253
info("Rule with sub path " + subPath + ": closing Stream");

src/main/java/org/iot/dsa/dslink/restadapter/Util.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public static void storeInBuffer(String subId, SubUpdate update) {
8787
if (buffer == null) {
8888
initBuffer();
8989
}
90-
buffer.write(subId, update.dateTime.timeInMillis(), update);
90+
buffer.write(subId, update.ts, update);
9191
}
9292

9393
// public static boolean isBufferEmpty(String subPath) {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.iot.dsa.dslink.restadapter;
2+
3+
import org.junit.Test;
4+
import org.etsdb.ByteArrayBuilder;
5+
import org.iot.dsa.dslink.restadapter.SubUpdate;
6+
import org.iot.dsa.node.DSLong;
7+
import org.iot.dsa.node.DSStatus;
8+
import org.iot.dsa.time.DSDateTime;
9+
import static org.junit.Assert.*;
10+
11+
public class SubUpdateSerializerTests {
12+
13+
@Test
14+
public void testSerializer() {
15+
DSDateTime ts = DSDateTime.valueOf(2016, 10, 1);
16+
SubUpdate before = new SubUpdate(ts, DSLong.valueOf(42), DSStatus.ok);
17+
SubUpdateSerializer serializer = new SubUpdateSerializer();
18+
ByteArrayBuilder builder = new ByteArrayBuilder();
19+
serializer.toByteArray(builder, before, ts.timeInMillis());
20+
SubUpdate after = serializer.fromByteArray(builder, ts.timeInMillis());
21+
assertEquals(before.dateTime, after.dateTime);
22+
assertEquals(before.value, after.value);
23+
assertEquals(before.status, after.status);
24+
assertEquals(before.ts, after.ts);
25+
}
26+
27+
28+
}

0 commit comments

Comments
 (0)