Skip to content

Commit cf1939c

Browse files
committed
Add support for CLAIM arg in XREADGROUP
1 parent 0bcac5c commit cf1939c

File tree

14 files changed

+993
-10
lines changed

14 files changed

+993
-10
lines changed

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,7 +1377,13 @@ public StreamEntry build(Object data) {
13771377
while (hashIterator.hasNext()) {
13781378
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
13791379
}
1380-
return new StreamEntry(entryID, map);
1380+
Long idle = null;
1381+
Long times = null;
1382+
if (objectList.size() >= 4) {
1383+
idle = LONG.build(objectList.get(2));
1384+
times = LONG.build(objectList.get(3));
1385+
}
1386+
return new StreamEntry(entryID, map, idle, times);
13811387
}
13821388

13831389
@Override
@@ -1418,7 +1424,19 @@ public List<StreamEntry> build(Object data) {
14181424
while (hashIterator.hasNext()) {
14191425
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
14201426
}
1421-
responses.add(new StreamEntry(entryID, map));
1427+
Long idle = null;
1428+
Long times = null;
1429+
if (res.size() >= 4) {
1430+
Object idleObj = res.get(2);
1431+
Object timesObj = res.get(3);
1432+
idle = (idleObj instanceof Long) ? (Long) idleObj : Long.valueOf(STRING.build(idleObj));
1433+
times = (timesObj instanceof Long) ? (Long) timesObj : Long.valueOf(STRING.build(timesObj));
1434+
if (idle != null && times != null && idle == 0L && times == 0L) {
1435+
idle = null;
1436+
times = null;
1437+
}
1438+
}
1439+
responses.add(new StreamEntry(entryID, map, idle, times));
14221440
}
14231441

14241442
return responses;
@@ -1969,7 +1987,19 @@ public List<StreamEntryBinary> build(Object data) {
19691987
while (hashIterator.hasNext()) {
19701988
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
19711989
}
1972-
responses.add(new StreamEntryBinary(entryID, map));
1990+
Long idle = null;
1991+
Long times = null;
1992+
if (res.size() >= 4) {
1993+
Object idleObj = res.get(2);
1994+
Object timesObj = res.get(3);
1995+
idle = (idleObj instanceof Long) ? (Long) idleObj : Long.valueOf(STRING.build(idleObj));
1996+
times = (timesObj instanceof Long) ? (Long) timesObj : Long.valueOf(STRING.build(timesObj));
1997+
if (idle != null && times != null && idle == 0L && times == 0L) {
1998+
idle = null;
1999+
times = null;
2000+
}
2001+
}
2002+
responses.add(new StreamEntryBinary(entryID, map, idle, times));
19732003
}
19742004

19752005
return responses;

src/main/java/redis/clients/jedis/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public static enum Keyword implements Rawable {
331331
AGGREGATE, ALPHA, BY, GET, LIMIT, NO, NOSORT, ONE, SET, STORE, WEIGHTS, WITHSCORE, WITHSCORES,
332332
RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, LEN, HELP, SCHEDULE, MATCH, COUNT, TYPE, KEYS,
333333
REFCOUNT, ENCODING, IDLETIME, FREQ, REPLACE, GETNAME, SETNAME, SETINFO, LIST, ID, KILL, PERSIST,
334-
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK,
334+
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, CLAIM,
335335
RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER,
336336
SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2,
337337
NX, XX, EX, PX, EXAT, PXAT, ABSTTL, KEEPTTL, INCR, LT, GT, CH, INFO, PAUSE, UNPAUSE, UNBLOCK,

src/main/java/redis/clients/jedis/params/XReadGroupParams.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class XReadGroupParams implements IParams {
1010
private Integer count = null;
1111
private Integer block = null;
1212
private boolean noack = false;
13+
private Long claim = null;
1314

1415
public static XReadGroupParams xReadGroupParams() {
1516
return new XReadGroupParams();
@@ -30,6 +31,12 @@ public XReadGroupParams noAck() {
3031
return this;
3132
}
3233

34+
public XReadGroupParams claim(long minIdleMillis) {
35+
this.claim = minIdleMillis;
36+
return this;
37+
}
38+
39+
3340
@Override
3441
public void addParams(CommandArguments args) {
3542
if (count != null) {
@@ -41,18 +48,22 @@ public void addParams(CommandArguments args) {
4148
if (noack) {
4249
args.add(Keyword.NOACK);
4350
}
51+
if (claim != null) {
52+
args.add(Keyword.CLAIM).add(claim);
53+
}
4454
}
4555

4656
@Override
4757
public boolean equals(Object o) {
4858
if (this == o) return true;
4959
if (o == null || getClass() != o.getClass()) return false;
5060
XReadGroupParams that = (XReadGroupParams) o;
51-
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block);
61+
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block)
62+
&& Objects.equals(claim, that.claim);
5263
}
5364

5465
@Override
5566
public int hashCode() {
56-
return Objects.hash(count, block, noack);
67+
return Objects.hash(count, block, noack, claim);
5768
}
5869
}

src/main/java/redis/clients/jedis/resps/StreamEntry.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,28 @@ public class StreamEntry implements Serializable {
1212
private StreamEntryID id;
1313
private Map<String, String> fields;
1414

15+
private Long idleTime; // milliseconds since last delivery (claimed entries)
16+
private Long deliveredTimes; // delivery count (claimed entries)
17+
1518
public StreamEntry(StreamEntryID id, Map<String, String> fields) {
1619
this.id = id;
1720
this.fields = fields;
1821
}
22+
public StreamEntry(StreamEntryID id, Map<String, String> fields, Long idleTime, Long deliveredTimes) {
23+
this.id = id;
24+
this.fields = fields;
25+
this.idleTime = idleTime;
26+
this.deliveredTimes = deliveredTimes;
27+
}
28+
29+
public Long getIdleTime() {
30+
return idleTime;
31+
}
32+
33+
public Long getDeliveredTimes() {
34+
return deliveredTimes;
35+
}
36+
1937

2038
public StreamEntryID getID() {
2139
return id;

src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,28 @@ public class StreamEntryBinary implements Serializable {
1212
private StreamEntryID id;
1313
private Map<byte[], byte[]> fields;
1414

15+
private Long idleTime; // milliseconds since last delivery (claimed entries)
16+
private Long deliveredTimes; // delivery count (claimed entries)
17+
1518
public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields) {
1619
this.id = id;
1720
this.fields = fields;
1821
}
22+
public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields, Long idleTime, Long deliveredTimes) {
23+
this.id = id;
24+
this.fields = fields;
25+
this.idleTime = idleTime;
26+
this.deliveredTimes = deliveredTimes;
27+
}
28+
29+
public Long getIdleTime() {
30+
return idleTime;
31+
}
32+
33+
public Long getDeliveredTimes() {
34+
return deliveredTimes;
35+
}
36+
1937

2038
public StreamEntryID getID() {
2139
return id;
@@ -39,4 +57,4 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
3957
this.id = (StreamEntryID) in.readUnshared();
4058
this.fields = (Map<byte[], byte[]>) in.readUnshared();
4159
}
42-
}
60+
}

src/test/java/io/redis/test/utils/RedisVersion.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ public class RedisVersion implements Comparable<RedisVersion> {
77
public static final RedisVersion V7_4 = RedisVersion.of("7.4");
88
public static final RedisVersion V8_0_0_PRE = RedisVersion.of("7.9.0");
99
public static final RedisVersion V8_0_0 = RedisVersion.of("8.0.0");
10+
public static final String V8_4_RC1_STRING = "8.3.224";
11+
public static final RedisVersion V8_4_RC1 = RedisVersion.of(V8_4_RC1_STRING);
1012
public static final RedisVersion V8_4_0 = RedisVersion.of("8.4.0");
1113

1214
private final int major;
@@ -91,4 +93,4 @@ public boolean isGreaterThan(RedisVersion other) {
9193
public static int compare(RedisVersion v1, RedisVersion v2) {
9294
return v1.compareTo(v2);
9395
}
94-
}
96+
}

src/test/java/redis/clients/jedis/commands/commandobjects/CommandObjectsStreamCommandsTest.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package redis.clients.jedis.commands.commandobjects;
22

3+
4+
import io.redis.test.annotations.SinceRedisVersion;
5+
6+
import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING;
37
import static org.hamcrest.MatcherAssert.assertThat;
48
import static org.hamcrest.Matchers.empty;
59
import static org.hamcrest.Matchers.equalTo;
@@ -1102,4 +1106,121 @@ public void testXReadGroupAsMap() {
11021106
assertThat(xreadGroupConsumer2.get(streamKey).get(0).getID(), equalTo(secondMessageId));
11031107
assertThat(xreadGroupConsumer2.get(streamKey).get(0).getFields(), equalTo(messageBody));
11041108
}
1109+
1110+
1111+
@Test
1112+
@SinceRedisVersion(V8_4_RC1_STRING)
1113+
public void xreadGroupWithClaimReturnsPendingThenNewEntries_commandObjects() throws InterruptedException {
1114+
String key = "co-claim-stream";
1115+
String group = "co-claim-group";
1116+
String consumer = "c1";
1117+
1118+
exec(commandObjects.del(key));
1119+
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));
1120+
1121+
// Make 3 entries pending
1122+
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v")));
1123+
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v")));
1124+
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v")));
1125+
1126+
Map<String, StreamEntryID> stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
1127+
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream));
1128+
1129+
Thread.sleep(60);
1130+
1131+
// Add two fresh entries
1132+
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v")));
1133+
exec(commandObjects.xadd(key, new StreamEntryID("5-0"), Collections.singletonMap("f", "v")));
1134+
1135+
List<Map.Entry<String, List<StreamEntry>>> messages = exec(
1136+
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50), stream));
1137+
1138+
assertThat(messages, hasSize(1));
1139+
List<StreamEntry> entries = messages.get(0).getValue();
1140+
org.junit.jupiter.api.Assertions.assertEquals(5, entries.size());
1141+
1142+
for (int i = 0; i < 3; i++) {
1143+
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime());
1144+
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes());
1145+
}
1146+
for (int i = 3; i < 5; i++) {
1147+
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime());
1148+
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes());
1149+
}
1150+
}
1151+
1152+
@Test
1153+
@SinceRedisVersion(V8_4_RC1_STRING)
1154+
public void xreadGroupWithClaimNoEligiblePendingReturnsOnlyNewEntries_commandObjects() {
1155+
String key = "co-claim-noeligible";
1156+
String group = "co-claim-group";
1157+
String consumer = "c1";
1158+
1159+
exec(commandObjects.del(key));
1160+
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));
1161+
1162+
// Put 2 entries in PEL
1163+
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v")));
1164+
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v")));
1165+
Map<String, StreamEntryID> stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
1166+
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(2), stream));
1167+
1168+
// Add two new entries that should be returned
1169+
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v")));
1170+
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v")));
1171+
1172+
List<Map.Entry<String, List<StreamEntry>>> messages = exec(
1173+
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(4).claim(500), stream));
1174+
1175+
assertThat(messages, hasSize(1));
1176+
List<StreamEntry> entries = messages.get(0).getValue();
1177+
org.junit.jupiter.api.Assertions.assertEquals(2, entries.size());
1178+
for (StreamEntry e : entries) {
1179+
org.junit.jupiter.api.Assertions.assertNull(e.getIdleTime());
1180+
org.junit.jupiter.api.Assertions.assertNull(e.getDeliveredTimes());
1181+
}
1182+
}
1183+
1184+
@Test
1185+
@SinceRedisVersion(V8_4_RC1_STRING)
1186+
public void xreadGroupWithClaimAndNoAckDoesNotAddNewEntriesToPEL_commandObjects() throws InterruptedException {
1187+
String key = "co-claim-noack";
1188+
String group = "co-claim-group";
1189+
String consumer = "c1";
1190+
1191+
exec(commandObjects.del(key));
1192+
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));
1193+
1194+
// Make 3 entries pending
1195+
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), java.util.Collections.singletonMap("f", "v")));
1196+
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), java.util.Collections.singletonMap("f", "v")));
1197+
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), java.util.Collections.singletonMap("f", "v")));
1198+
java.util.Map<String, StreamEntryID> stream = java.util.Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
1199+
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream));
1200+
1201+
// Wait then add fresh entries
1202+
Thread.sleep(60);
1203+
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), java.util.Collections.singletonMap("f", "v")));
1204+
exec(commandObjects.xadd(key, new StreamEntryID("5-0"), java.util.Collections.singletonMap("f", "v")));
1205+
1206+
// Read with CLAIM and NOACK
1207+
java.util.List<java.util.Map.Entry<String, java.util.List<StreamEntry>>> messages = exec(
1208+
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50).noAck(), stream));
1209+
1210+
assertThat(messages, hasSize(1));
1211+
java.util.List<StreamEntry> entries = messages.get(0).getValue();
1212+
org.junit.jupiter.api.Assertions.assertEquals(5, entries.size());
1213+
for (int i = 0; i < 3; i++) {
1214+
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime());
1215+
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes());
1216+
}
1217+
for (int i = 3; i < 5; i++) {
1218+
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime());
1219+
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes());
1220+
}
1221+
1222+
Long acked = exec(commandObjects.xack(key, group, new StreamEntryID("4-0"), new StreamEntryID("5-0")));
1223+
org.junit.jupiter.api.Assertions.assertEquals(0L, acked);
1224+
}
1225+
11051226
}

0 commit comments

Comments
 (0)