Skip to content

Commit 831d75d

Browse files
authored
Merge pull request #316 from swisspost/DequeueStatistic-deserialize-failed-between-version
DequeueStatistic changes will causes InvalidClassException
2 parents 809a9e2 + 574cf76 commit 831d75d

File tree

4 files changed

+342
-21
lines changed

4 files changed

+342
-21
lines changed

src/main/java/org/swisspush/redisques/QueueStatsService.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,16 @@ private <CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> req, BiConsumer<
207207
dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> {
208208
for (Queue queue : req.queues) {
209209
if (event.containsKey(queue.name)) {
210-
DequeueStatistic sharedDequeueStatisticCopy = event.get(queue.name);
211-
// Attach value of shared data
212-
queue.lastDequeueAttemptEpochMs = sharedDequeueStatisticCopy.getLastDequeueAttemptTimestamp();
213-
queue.lastDequeueSuccessEpochMs = sharedDequeueStatisticCopy.getLastDequeueSuccessTimestamp();
214-
queue.nextDequeueDueTimestampEpochMs = sharedDequeueStatisticCopy.getNextDequeueDueTimestamp();
215-
queue.failedReason = sharedDequeueStatisticCopy.getFailedReason();
210+
if (event.get(queue.name) instanceof JsonObject) {
211+
DequeueStatistic sharedDequeueStatisticCopy = DequeueStatistic.fromJson(event.get(queue.name));
212+
// Attach value of shared data
213+
queue.lastDequeueAttemptEpochMs = sharedDequeueStatisticCopy.getLastDequeueAttemptTimestamp();
214+
queue.lastDequeueSuccessEpochMs = sharedDequeueStatisticCopy.getLastDequeueSuccessTimestamp();
215+
queue.nextDequeueDueTimestampEpochMs = sharedDequeueStatisticCopy.getNextDequeueDueTimestamp();
216+
queue.failedReason = sharedDequeueStatisticCopy.getFailedReason();
217+
} else {
218+
log.warn("DequeueStatistics for {} have a wrong type, skip it for now.", queue.name);
219+
}
216220
}
217221
}
218222
onDone.accept(null, req);

src/main/java/org/swisspush/redisques/util/DequeueStatistic.java

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,45 @@
11
package org.swisspush.redisques.util;
22

3-
import java.io.Serializable;
3+
import io.vertx.core.json.JsonObject;
4+
5+
import javax.annotation.Nullable;
6+
7+
public class DequeueStatistic {
8+
9+
public static final String KEY_LAST_DEQUEUE_ATTEMPT_TS = "lastDequeueAttemptTimestamp";
10+
public static final String KEY_LAST_DEQUEUE_SUCCESS_TS = "lastDequeueSuccessTimestamp";
11+
public static final String KEY_NEXT_DEQUEUE_DUE_TS = "nextDequeueDueTimestamp";
12+
public static final String KEY_LAST_UPDATED_TS = "lastUpdatedTimestamp";
13+
public static final String KEY_FAILED_REASON = "failedReason";
14+
public static final String KEY_MARK_FOR_REMOVAL = "markForRemoval";
415

5-
public class DequeueStatistic implements Serializable {
616
private Long lastDequeueAttemptTimestamp = null;
717
private Long lastDequeueSuccessTimestamp = null;
818
private Long nextDequeueDueTimestamp = null;
919
private Long lastUpdatedTimestamp = null;
1020
private String failedReason = null;
1121
private boolean markForRemoval = false;
1222

23+
public DequeueStatistic() {
24+
// default
25+
}
26+
27+
public DequeueStatistic(
28+
Long lastDequeueAttemptTimestamp,
29+
Long lastDequeueSuccessTimestamp,
30+
Long nextDequeueDueTimestamp,
31+
Long lastUpdatedTimestamp,
32+
String failedReason,
33+
boolean markForRemoval
34+
) {
35+
this.lastDequeueAttemptTimestamp = lastDequeueAttemptTimestamp;
36+
this.lastDequeueSuccessTimestamp = lastDequeueSuccessTimestamp;
37+
this.nextDequeueDueTimestamp = nextDequeueDueTimestamp;
38+
this.lastUpdatedTimestamp = lastUpdatedTimestamp;
39+
this.failedReason = failedReason;
40+
this.markForRemoval = markForRemoval;
41+
}
42+
1343
private void updateLastUpdatedTimestamp() {
1444
this.lastUpdatedTimestamp = System.currentTimeMillis();
1545
}
@@ -64,4 +94,56 @@ public void setMarkedForRemoval() {
6494
public boolean isMarkedForRemoval() {
6595
return this.markForRemoval;
6696
}
97+
98+
99+
private static void putIfNotNull(JsonObject json, String key, Object value) {
100+
if (value != null) {
101+
json.put(key, value);
102+
}
103+
}
104+
/**
105+
* Convert this object to Vert.x JsonObject
106+
*/
107+
public JsonObject asJson() {
108+
JsonObject json = new JsonObject();
109+
putIfNotNull(json, KEY_LAST_DEQUEUE_ATTEMPT_TS, lastDequeueAttemptTimestamp);
110+
putIfNotNull(json, KEY_LAST_DEQUEUE_SUCCESS_TS, lastDequeueSuccessTimestamp);
111+
putIfNotNull(json, KEY_NEXT_DEQUEUE_DUE_TS, nextDequeueDueTimestamp);
112+
putIfNotNull(json, KEY_LAST_UPDATED_TS, lastUpdatedTimestamp);
113+
putIfNotNull(json, KEY_FAILED_REASON, failedReason);
114+
115+
// always include boolean
116+
json.put(KEY_MARK_FOR_REMOVAL, markForRemoval);
117+
118+
return json;
119+
}
120+
121+
/**
122+
* Reconstruct object from JsonObject, safe for Redis / Vert.x storage.
123+
* @param json
124+
* @return a DequeueStatistic, if param is null, will return null
125+
*/
126+
@Nullable
127+
public static DequeueStatistic fromJson(@Nullable JsonObject json) {
128+
if (json == null)
129+
{
130+
return null;
131+
}
132+
Long lastAtt = json.getLong(KEY_LAST_DEQUEUE_ATTEMPT_TS);
133+
Long lastSuc = json.getLong(KEY_LAST_DEQUEUE_SUCCESS_TS);
134+
Long nextDue = json.getLong(KEY_NEXT_DEQUEUE_DUE_TS);
135+
Long lastUpd = json.getLong(KEY_LAST_UPDATED_TS);
136+
String reason = json.getString(KEY_FAILED_REASON);
137+
138+
Boolean flag = json.getBoolean(KEY_MARK_FOR_REMOVAL, false);
139+
140+
return new DequeueStatistic(
141+
lastAtt,
142+
lastSuc,
143+
nextDue,
144+
lastUpd,
145+
reason,
146+
flag
147+
);
148+
}
67149
}

src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.vertx.core.Handler;
66
import io.vertx.core.Promise;
77
import io.vertx.core.Vertx;
8+
import io.vertx.core.json.JsonObject;
89
import io.vertx.core.shareddata.AsyncMap;
910
import io.vertx.core.shareddata.Lock;
1011
import io.vertx.core.shareddata.SharedData;
@@ -49,26 +50,44 @@ public Future<Void> setDequeueStatistic(final String queueName, final DequeueSta
4950
promise.complete();
5051
};
5152

52-
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, DequeueStatistic>>>) asyncResult -> {
53+
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, JsonObject>>>) asyncResult -> {
5354
if (asyncResult.failed()) {
5455
log.error("Failed to get shared dequeue statistic data map.", asyncResult.cause());
5556
releaseAndCompleteHandler.handle(null);
5657
return;
5758
}
58-
59-
AsyncMap<String, DequeueStatistic> asyncMap = asyncResult.result();
59+
AsyncMap<String, JsonObject> asyncMap = asyncResult.result();
6060
asyncMap.size().onComplete(mapSizeResult -> {
6161
log.debug("shared dequeue statistic map size: {}", mapSizeResult.result());
6262
asyncMap.get(queueName).onComplete(dequeueStatisticAsyncResult -> {
6363
if (dequeueStatisticAsyncResult.failed()) {
6464
log.error("Failed to get shared dequeue statistic data for queue {}.", queueName, dequeueStatisticAsyncResult.cause());
65-
releaseAndCompleteHandler.handle(null);
65+
if (dequeueStatisticAsyncResult.cause().getClass().getName().contains("HazelcastSerializationException")) {
66+
asyncMap.remove(queueName).onComplete(new Handler<AsyncResult<JsonObject>>() {
67+
@Override
68+
public void handle(AsyncResult<JsonObject> event) {
69+
if (event.failed()) {
70+
log.error("failed to clean broken dequeue statistic data.", event.cause());
71+
} else {
72+
log.info("broken dequeue statistic for {} removed", queueName);
73+
}
74+
releaseAndCompleteHandler.handle(null);
75+
}
76+
});
77+
} else {
78+
releaseAndCompleteHandler.handle(null);
79+
}
6680
return;
6781
}
6882

69-
final DequeueStatistic sharedDequeueStatistic = dequeueStatisticAsyncResult.result();
83+
DequeueStatistic sharedDequeueStatistic = null;
84+
85+
// check does it is a JsonObject, if not assume it in not exist.
86+
if (dequeueStatisticAsyncResult.result() instanceof JsonObject) {
87+
sharedDequeueStatistic = DequeueStatistic.fromJson(dequeueStatisticAsyncResult.result());
88+
}
7089
if (sharedDequeueStatistic == null) {
71-
asyncMap.put(queueName, dequeueStatistic).onComplete(voidAsyncResult -> {
90+
asyncMap.put(queueName, dequeueStatistic.asJson()).onComplete(voidAsyncResult -> {
7291
if (voidAsyncResult.failed()) {
7392
log.error("shared dequeue statistic for queue {} failed to add.", queueName, voidAsyncResult.cause());
7493
} else {
@@ -89,7 +108,7 @@ public Future<Void> setDequeueStatistic(final String queueName, final DequeueSta
89108
});
90109
} else {
91110
// update
92-
asyncMap.put(queueName, dequeueStatistic).onComplete(event -> {
111+
asyncMap.put(queueName, dequeueStatistic.asJson()).onComplete(event -> {
93112
if (event.failed()) {
94113
log.error("shared dequeue statistic for queue {} failed to update.", queueName, event.cause());
95114
} else {
@@ -109,25 +128,42 @@ public Future<Void> setDequeueStatistic(final String queueName, final DequeueSta
109128
return promise.future();
110129
}
111130

112-
public Future<Map<String, DequeueStatistic>> getAllDequeueStatistics() {
131+
public Future<Map<String, JsonObject>> getAllDequeueStatistics() {
113132
// Check if dequeue statistics are enabled
114133
if (!dequeueStatisticEnabled) {
115134
return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions
116135
}
117-
Promise<Map<String, DequeueStatistic>> promise = Promise.promise();
118-
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, DequeueStatistic>>>) asyncResult -> {
136+
Promise<Map<String, JsonObject>> promise = Promise.promise();
137+
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, JsonObject>>>) asyncResult -> {
119138
if (asyncResult.failed()) {
120139
log.error("Failed to get dequeue statistic data map.", asyncResult.cause());
121140
promise.fail(asyncResult.cause());
122141
return;
123142
}
124-
AsyncMap<String, DequeueStatistic> asyncMap = asyncResult.result();
143+
AsyncMap<String, JsonObject> asyncMap = asyncResult.result();
125144
asyncMap.entries().onSuccess(promise::complete).onFailure(throwable -> {
126145
log.error("Failed to get dequeue statistic map", throwable);
127-
promise.fail(throwable);
146+
cleanAsyncMapIfBroken(asyncMap, throwable).onComplete(e -> promise.fail(throwable));
128147
});
129-
130148
});
131149
return promise.future();
132150
}
151+
152+
Future<Void> cleanAsyncMapIfBroken(AsyncMap<String, JsonObject> asyncMap, Throwable throwable) {
153+
Promise<Void> promise = Promise.promise();
154+
if (throwable.getClass().getName().contains("HazelcastSerializationException")) {
155+
asyncMap.clear().onComplete(event -> {
156+
if (event.failed()) {
157+
log.error("failed to clean dequeue statistic map.", throwable);
158+
} else {
159+
log.info("Cleaned up broken dequeue statistic AsyncMap.");
160+
}
161+
promise.complete();
162+
}
163+
);
164+
} else {
165+
promise.complete();
166+
}
167+
return promise.future();
168+
}
133169
}

0 commit comments

Comments
 (0)