Skip to content

Commit bf7affc

Browse files
committed
implement automatic purging of buffer
1 parent 15e02bb commit bf7affc

File tree

8 files changed

+169
-8
lines changed

8 files changed

+169
-8
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ dependencies {
2727
implementation 'com.github.iot-dsa-v2.sdk-dslink-java-v2:dslink-v2-websocket:v+'
2828
implementation 'commons-logging:commons-logging:+'
2929
implementation 'org.apache.commons:commons-lang3:3.6'
30+
implementation 'commons-io:commons-io:2.6'
3031
implementation 'org.apache.cxf:cxf-rt-rs-security-oauth2:3.1.7'
3132
implementation 'org.apache.cxf:cxf-rt-rs-client:3.0.0-milestone1'
3233
implementation 'javax.xml.bind:jaxb-api:2.3.0'
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package org.etsdb.util;
2+
3+
import org.apache.commons.io.FileUtils;
4+
import org.etsdb.Database;
5+
import org.etsdb.TimeRange;
6+
import java.io.File;
7+
import java.util.ArrayList;
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Map.Entry;
12+
import java.util.concurrent.ScheduledFuture;
13+
14+
15+
public class DbPurger {
16+
private static DbPurger singleton = null;
17+
18+
private Map<Database<?>, PurgeSettings> databases = new HashMap<Database<?>, PurgeSettings>();
19+
private ScheduledFuture<?> fut;
20+
private boolean running;
21+
22+
private DbPurger() {
23+
}
24+
25+
public static DbPurger getInstance() {
26+
if (singleton == null) {
27+
singleton = new DbPurger();
28+
}
29+
return singleton;
30+
}
31+
32+
public synchronized void addDb(Database<?> db, PurgeSettings purgeSettings) {
33+
if (!databases.containsKey(db)) {
34+
databases.put(db, purgeSettings);
35+
}
36+
}
37+
38+
public synchronized void removeDb(Database<?> db) {
39+
databases.remove(db);
40+
}
41+
42+
public void stop() {
43+
running = false;
44+
synchronized (this) {
45+
if (fut != null) {
46+
fut.cancel(true);
47+
}
48+
}
49+
}
50+
51+
public Runnable setupPurger() {
52+
running = true;
53+
Runnable runner = new Runnable() {
54+
@Override
55+
public void run() {
56+
for (Entry<Database<?>, PurgeSettings> entry : databases.entrySet()) {
57+
Database<?> db = entry.getKey();
58+
PurgeSettings settings = entry.getValue();
59+
if (!(settings.isPurgeEnabled() && running)) {
60+
continue;
61+
}
62+
63+
File path = db.getBaseDir();
64+
long currSize = FileUtils.sizeOf(path);
65+
long maxSize = settings.getMaxSizeInBytes();
66+
long delCount = 0;
67+
// LOGGER.info("Deciding whether to purge");
68+
// LOGGER.info("curr = " + curr + " , request = " + request);
69+
if (maxSize - currSize <= 0) {
70+
if (!running) {
71+
break;
72+
}
73+
// LOGGER.info("Going to purge");
74+
75+
List<String> series = db.getSeriesIds();
76+
if (File.separatorChar != '/') {
77+
List<String> corrected = new ArrayList<String>();
78+
for (String s: series) {
79+
corrected.add(s.replace(File.separatorChar, '/'));
80+
}
81+
series = corrected;
82+
}
83+
// LOGGER.info("Purge Step 1");
84+
while (maxSize - currSize <= 0) {
85+
// LOGGER.info("Purge Step 2");
86+
TimeRange range = db.getTimeRange(series);
87+
if (range == null || range.isUndefined()) {
88+
break;
89+
}
90+
// LOGGER.info("Purge Step 3");
91+
long from = range.getFrom();
92+
for (String s : series) {
93+
// LOGGER.info("Purge Step 4");
94+
delCount += db.delete(s, from, from + 3600000);
95+
}
96+
// LOGGER.info("Purge Step 5");
97+
if (delCount <= 0) {
98+
break;
99+
}
100+
// LOGGER.info("Purge Step 6");
101+
currSize = FileUtils.sizeOf(path);
102+
// LOGGER.info("Purge Step 7: curr = " + curr + " , request = " + request);
103+
}
104+
}
105+
if (delCount > 0) {
106+
// String p = path.getPath();
107+
// LOGGER.info("Deleted {} records from {}", delCount, p);
108+
}
109+
}
110+
}
111+
};
112+
return runner;
113+
}
114+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.etsdb.util;
2+
3+
public interface PurgeSettings {
4+
5+
public boolean isPurgeEnabled();
6+
7+
public long getMaxSizeInBytes();
8+
9+
}

src/main/java/org/iot/dsa/dslink/restadapter/AbstractRuleNode.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
import org.iot.dsa.node.DSIObject;
77
import org.iot.dsa.node.DSIValue;
88
import org.iot.dsa.node.DSInfo;
9+
import org.iot.dsa.node.DSLong;
910
import org.iot.dsa.node.DSNode;
1011

1112
public abstract class AbstractRuleNode extends DSNode {
1213
private DSInfo bufferEnabled = getInfo(Constants.USE_BUFFER);
14+
private DSInfo maxBatchSize = getInfo(Constants.MAX_BATCH_SIZE);
1315
private String id;
1416

1517
public WebClientProxy getWebClientProxy() {
@@ -22,12 +24,17 @@ public WebClientProxy getWebClientProxy() {
2224
protected void declareDefaults() {
2325
super.declareDefaults();
2426
declareDefault(Constants.USE_BUFFER, DSBool.FALSE, "Whether updates that failed to send should be stored and re-sent in the future");
27+
declareDefault(Constants.MAX_BATCH_SIZE, DSLong.valueOf(50), "Maximum number of updates to put in a single REST request");
2528
}
2629

2730
public boolean isBufferEnabled() {
2831
return bufferEnabled.getValue().toElement().toBoolean();
2932
}
3033

34+
public int getMaxBatchSize() {
35+
return maxBatchSize.getValue().toElement().toInt();
36+
}
37+
3138
@Override
3239
protected void onStarted() {
3340
super.onStarted();

src/main/java/org/iot/dsa/dslink/restadapter/Constants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ public class Constants {
2121
public static final String MIN_REFRESH_RATE = "Minimum Refresh Rate";
2222
public static final String MAX_REFRESH_RATE = "Maximum Refresh Rate";
2323
public static final String USE_BUFFER = "Buffer Enabled";
24+
public static final String MAX_BATCH_SIZE = "Maximum Batch Size";
25+
public static final String BUFFER_PURGE_ENABLED = "Enable Buffer Auto-Purge";
26+
public static final String BUFFER_MAX_SIZE = "Maximum Buffer Size";
2427

2528
//Actions
2629
public static final String ACT_ADD_BASIC_CONN = "Basic Connection";
@@ -48,5 +51,4 @@ public class Constants {
4851

4952
//MISC
5053
public static final String BUFFER_PATH = "buffer";
51-
public static final int MAX_BATCH_SIZE = 50;
5254
}

src/main/java/org/iot/dsa/dslink/restadapter/MainNode.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package org.iot.dsa.dslink.restadapter;
22

3+
import org.etsdb.util.PurgeSettings;
34
import org.iot.dsa.dslink.DSIRequester;
45
import org.iot.dsa.dslink.DSLinkConnection;
56
import org.iot.dsa.dslink.DSMainNode;
7+
import org.iot.dsa.node.DSBool;
68
import org.iot.dsa.node.DSInfo;
9+
import org.iot.dsa.node.DSLong;
710
import org.iot.dsa.node.DSMap;
811
import org.iot.dsa.node.DSNode;
912
import org.iot.dsa.node.DSString;
@@ -17,13 +20,15 @@
1720
import org.iot.dsa.util.DSException;
1821

1922
/**
20-
* The root and only node of this link.
23+
* The root node of this link.
2124
*/
22-
public class MainNode extends DSMainNode {
23-
24-
25+
public class MainNode extends DSMainNode implements PurgeSettings {
2526
private static final Object requesterLock = new Object();
2627
private static DSIRequester requester;
28+
public static MainNode instance;
29+
30+
private DSInfo purgeEnabled = getInfo(Constants.BUFFER_PURGE_ENABLED);
31+
private DSInfo maxBufferSize = getInfo(Constants.BUFFER_MAX_SIZE);
2732

2833
public MainNode() {
2934
}
@@ -58,10 +63,21 @@ protected void declareDefaults() {
5863
declareDefault(Constants.ACT_ADD_BASIC_CONN, makeAddBasicConnectionAction());
5964
declareDefault(Constants.ACT_ADD_OAUTH_CLIENT_CONN, makeAddOauthClientConnectionAction());
6065
declareDefault(Constants.ACT_ADD_OAUTH_PASSWORD_CONN, makeAddOauthPassConnectionAction());
66+
declareDefault(Constants.BUFFER_PURGE_ENABLED, DSBool.FALSE, "Whether old unsent records should automatically be purged from the buffer when the buffer gets too large");
67+
declareDefault(Constants.BUFFER_MAX_SIZE, DSLong.valueOf(1074000000), "Maximum size of buffer in bytes; only applies if auto-purge is enabled");
68+
}
69+
70+
public boolean isPurgeEnabled() {
71+
return purgeEnabled.getValue().toElement().toBoolean();
72+
}
73+
74+
public long getMaxSizeInBytes() {
75+
return maxBufferSize.getValue().toElement().toLong();
6176
}
6277

6378
@Override
6479
protected void onStarted() {
80+
instance = this;
6581
getLink().getConnection().subscribe(
6682
DSLinkConnection.CONNECTED, null, null,
6783
new DSISubscriber() {

src/main/java/org/iot/dsa/dslink/restadapter/SubscriptionRule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,9 @@ public void close() {
255255
}
256256
}
257257

258+
public int getMaxBatchSize() {
259+
return node.getMaxBatchSize();
260+
}
258261

259262
public WebClientProxy getWebClientProxy() {
260263
return node.getWebClientProxy();

src/main/java/org/iot/dsa/dslink/restadapter/Util.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import org.etsdb.Database;
88
import org.etsdb.DatabaseFactory;
99
import org.etsdb.QueryCallback;
10+
import org.etsdb.util.DbPurger;
11+
import org.iot.dsa.DSRuntime;
1012
import org.iot.dsa.io.json.JsonReader;
1113
import org.iot.dsa.node.DSElement;
1214
import org.iot.dsa.node.DSList;
@@ -81,6 +83,10 @@ public static double getDouble(DSMap map, String key, double def) {
8183
private static void initBuffer() {
8284
File f = new File(Constants.BUFFER_PATH);
8385
buffer = DatabaseFactory.createDatabase(f, new SubUpdateSerializer());
86+
DbPurger purger = DbPurger.getInstance();
87+
purger.addDb(buffer, MainNode.instance);
88+
Runnable purgeRunner = purger.setupPurger();
89+
DSRuntime.runAfterDelay(purgeRunner, 30000, 30000);
8490
}
8591

8692
public static void storeInBuffer(String subId, SubUpdate update) {
@@ -106,12 +112,14 @@ public static boolean processBuffer(String subId, SubscriptionRule subRule) {
106112
// if (range == null || range.isUndefined()) {
107113
// return;
108114
// }
115+
final int maxBatchSize = subRule.getMaxBatchSize();
109116
final LinkedList<SubUpdate> updates = new LinkedList<SubUpdate>();
117+
final AtomicLong firstTs = new AtomicLong();
110118
final AtomicLong lastTs = new AtomicLong();
111119
int count;
112120
do {
113121
updates.clear();
114-
buffer.query(subId, lastTs.get(), Long.MAX_VALUE, Constants.MAX_BATCH_SIZE, new QueryCallback<SubUpdate>() {
122+
buffer.query(subId, lastTs.get(), Long.MAX_VALUE, maxBatchSize, new QueryCallback<SubUpdate>() {
115123

116124
@Override
117125
public void sample(String seriesId, long ts, SubUpdate value) {
@@ -124,7 +132,8 @@ public void sample(String seriesId, long ts, SubUpdate value) {
124132
count = updates.size();
125133
Queue<SubUpdate> failedUpdates = subRule.sendBatchUpdate(updates);
126134
if (failedUpdates == null || failedUpdates.size() < count) {
127-
buffer.purge(subId, lastTs.get());
135+
buffer.delete(subId, firstTs.get(), lastTs.get());
136+
firstTs.set(lastTs.get());
128137
if (failedUpdates != null) {
129138
for (SubUpdate failedUpdate: failedUpdates) {
130139
storeInBuffer(subId, failedUpdate);
@@ -133,7 +142,7 @@ public void sample(String seriesId, long ts, SubUpdate value) {
133142
} else {
134143
return false;
135144
}
136-
} while (count >= 50);
145+
} while (count >= maxBatchSize);
137146

138147
return true;
139148
}

0 commit comments

Comments
 (0)