diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index ddd6071776..eeb5c31f8c 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1408,17 +1408,26 @@ public List build(Object data) { String entryIdString = SafeEncoder.encode((byte[]) res.get(0)); StreamEntryID entryID = new StreamEntryID(entryIdString); List hash = (List) res.get(1); - if (hash == null) { - responses.add(new StreamEntry(entryID, null)); - continue; + + Map fieldsMap = null; + + if (hash != null) { + Iterator hashIterator = hash.iterator(); + fieldsMap = new HashMap<>(hash.size() / 2, 1f); + + while (hashIterator.hasNext()) { + fieldsMap.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); + } } - Iterator hashIterator = hash.iterator(); - Map map = new HashMap<>(hash.size() / 2, 1f); - while (hashIterator.hasNext()) { - map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); + if (res.size() >= 4) { + Long millisElapsedFromDelivery = LONG.build(res.get(2)); + Long deliveredCount = LONG.build(res.get(3)); + responses.add(new StreamEntry(entryID, fieldsMap, millisElapsedFromDelivery, deliveredCount)); + continue; } - responses.add(new StreamEntry(entryID, map)); + + responses.add(new StreamEntry(entryID, fieldsMap)); } return responses; @@ -1959,16 +1968,25 @@ public List build(Object data) { String entryIdString = SafeEncoder.encode((byte[]) res.get(0)); StreamEntryID entryID = new StreamEntryID(entryIdString); List hash = (List) res.get(1); - if (hash == null) { - responses.add(new StreamEntryBinary(entryID, null)); - continue; + + Map map = null; + + if (hash != null) { + Iterator hashIterator = hash.iterator(); + map = new JedisByteHashMap(); + + while (hashIterator.hasNext()) { + map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next())); + } } - Iterator hashIterator = hash.iterator(); - Map map = new JedisByteHashMap(); - while (hashIterator.hasNext()) { - map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next())); + if (res.size() >= 4) { + Long millisElapsedFromDelivery = LONG.build(res.get(2)); + Long deliveredCount = LONG.build(res.get(3)); + responses.add(new StreamEntryBinary(entryID, map, millisElapsedFromDelivery, deliveredCount)); + continue; } + responses.add(new StreamEntryBinary(entryID, map)); } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 8f2f2248be..87bde9afdb 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -331,7 +331,7 @@ public static enum Keyword implements Rawable { AGGREGATE, ALPHA, BY, GET, LIMIT, NO, NOSORT, ONE, SET, STORE, WEIGHTS, WITHSCORE, WITHSCORES, RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, LEN, HELP, SCHEDULE, MATCH, COUNT, TYPE, KEYS, REFCOUNT, ENCODING, IDLETIME, FREQ, REPLACE, GETNAME, SETNAME, SETINFO, LIST, ID, KILL, PERSIST, - STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, + STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, CLAIM, RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER, SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2, NX, XX, EX, PX, EXAT, PXAT, ABSTTL, KEEPTTL, INCR, LT, GT, CH, INFO, PAUSE, UNPAUSE, UNBLOCK, diff --git a/src/main/java/redis/clients/jedis/params/XReadGroupParams.java b/src/main/java/redis/clients/jedis/params/XReadGroupParams.java index a761f166b2..8bddb1c699 100644 --- a/src/main/java/redis/clients/jedis/params/XReadGroupParams.java +++ b/src/main/java/redis/clients/jedis/params/XReadGroupParams.java @@ -10,6 +10,7 @@ public class XReadGroupParams implements IParams { private Integer count = null; private Integer block = null; private boolean noack = false; + private Long claim = null; public static XReadGroupParams xReadGroupParams() { return new XReadGroupParams(); @@ -30,6 +31,11 @@ public XReadGroupParams noAck() { return this; } + public XReadGroupParams claim(long minIdleMillis) { + this.claim = minIdleMillis; + return this; + } + @Override public void addParams(CommandArguments args) { if (count != null) { @@ -41,6 +47,9 @@ public void addParams(CommandArguments args) { if (noack) { args.add(Keyword.NOACK); } + if (claim != null) { + args.add(Keyword.CLAIM).add(claim); + } } @Override @@ -48,11 +57,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; XReadGroupParams that = (XReadGroupParams) o; - return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block); + return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block) + && Objects.equals(claim, that.claim); } @Override public int hashCode() { - return Objects.hash(count, block, noack); + return Objects.hash(count, block, noack, claim); } } diff --git a/src/main/java/redis/clients/jedis/resps/StreamEntry.java b/src/main/java/redis/clients/jedis/resps/StreamEntry.java index 0d76ad1e12..e91c99159c 100644 --- a/src/main/java/redis/clients/jedis/resps/StreamEntry.java +++ b/src/main/java/redis/clients/jedis/resps/StreamEntry.java @@ -11,12 +11,51 @@ public class StreamEntry implements Serializable { private StreamEntryID id; private Map fields; + private Long millisElapsedFromDelivery; + private Long deliveredCount; public StreamEntry(StreamEntryID id, Map fields) { this.id = id; this.fields = fields; } + public StreamEntry(StreamEntryID id, Map fields, Long millisElapsedFromDelivery, Long deliveredCount) { + this.id = id; + this.fields = fields; + this.millisElapsedFromDelivery = millisElapsedFromDelivery; + this.deliveredCount = deliveredCount; + } + + /** + * @return the milliseconds since the last delivery of this message when CLAIM was used. + *
    + *
  • {@code null} when not applicable
  • + *
  • {@code 0} means not claimed from the pending entries list (PEL)
  • + *
  • {@code > 0} means claimed from the PEL
  • + *
+ * @since 7.1 + */ + public Long getMillisElapsedFromDelivery() { + return millisElapsedFromDelivery; + } + + /** + * @return the number of prior deliveries of this message when CLAIM was used: + *
    + *
  • {@code null} when not applicable
  • + *
  • {@code 0} means not claimed from the pending entries list (PEL)
  • + *
  • {@code > 0} means claimed from the PEL
  • + *
+ * @since 7.1 + */ + public Long getDeliveredCount() { + return deliveredCount; + } + + public boolean isClaimed() { + return this.deliveredCount != null && this.deliveredCount > 0; + } + public StreamEntryID getID() { return id; } diff --git a/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java index 24f0b5f5fb..1aed7a9ac5 100644 --- a/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java +++ b/src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java @@ -11,12 +11,51 @@ public class StreamEntryBinary implements Serializable { private StreamEntryID id; private Map fields; + private Long millisElapsedFromDelivery; + private Long deliveredCount; public StreamEntryBinary(StreamEntryID id, Map fields) { this.id = id; this.fields = fields; } + public StreamEntryBinary(StreamEntryID id, Map fields, Long millisElapsedFromDelivery, Long deliveredCount) { + this.id = id; + this.fields = fields; + this.millisElapsedFromDelivery = millisElapsedFromDelivery; + this.deliveredCount = deliveredCount; + } + + /** + * @return the milliseconds since the last delivery of this message when CLAIM was used. + *
    + *
  • {@code null} when not applicable
  • + *
  • {@code 0} means not claimed from the pending entries list (PEL)
  • + *
  • {@code > 0} means claimed from the PEL
  • + *
+ * @since 7.1 + */ + public Long getMillisElapsedFromDelivery() { + return millisElapsedFromDelivery; + } + + /** + * @return the number of prior deliveries of this message when CLAIM was used: + *
    + *
  • {@code null} when not applicable
  • + *
  • {@code 0} means not claimed from the pending entries list (PEL)
  • + *
  • {@code > 0} means claimed from the PEL
  • + *
+ * @since 7.1 + */ + public Long getDeliveredCount() { + return deliveredCount; + } + + public boolean isClaimed() { + return this.deliveredCount != null && this.deliveredCount > 0; + } + public StreamEntryID getID() { return id; } @@ -39,4 +78,4 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN this.id = (StreamEntryID) in.readUnshared(); this.fields = (Map) in.readUnshared(); } -} \ No newline at end of file +} diff --git a/src/test/java/io/redis/test/utils/RedisVersion.java b/src/test/java/io/redis/test/utils/RedisVersion.java index bbaab9b4c0..1811ed1181 100644 --- a/src/test/java/io/redis/test/utils/RedisVersion.java +++ b/src/test/java/io/redis/test/utils/RedisVersion.java @@ -7,6 +7,7 @@ public class RedisVersion implements Comparable { public static final RedisVersion V7_4 = RedisVersion.of("7.4"); public static final RedisVersion V8_0_0_PRE = RedisVersion.of("7.9.0"); public static final RedisVersion V8_0_0 = RedisVersion.of("8.0.0"); + public static final String V8_4_0_STRING= "8.4.0"; public static final RedisVersion V8_4_0 = RedisVersion.of("8.4.0"); private final int major; @@ -91,4 +92,4 @@ public boolean isGreaterThan(RedisVersion other) { public static int compare(RedisVersion v1, RedisVersion v2) { return v1.compareTo(v2); } -} \ No newline at end of file +} diff --git a/src/test/java/redis/clients/jedis/BuilderTest.java b/src/test/java/redis/clients/jedis/BuilderTest.java index 8076a181a5..01596b8234 100644 --- a/src/test/java/redis/clients/jedis/BuilderTest.java +++ b/src/test/java/redis/clients/jedis/BuilderTest.java @@ -2,11 +2,71 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import redis.clients.jedis.resps.StreamEntry; +import redis.clients.jedis.resps.StreamEntryBinary; +import redis.clients.jedis.util.RedisInputStream; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class BuilderTest { + // Helper methods for building test data from RESP protocol strings + private static Object parseRespResponse(String respResponse) { + InputStream is = new ByteArrayInputStream(respResponse.getBytes()); + return Protocol.read(new RedisInputStream(is)); + } + + @SuppressWarnings("unchecked") + private static ArrayList createStreamEntryData(String id, String fieldKey, + String fieldValue, long millisElapsedFromDelivery, long deliveredCount) { + String respResponse = + "*4\r\n" + // Entry with 4 elements + "$" + id.length() + "\r\n" + id + "\r\n" + // Entry ID + "*2\r\n" + // 2 field-value pairs + "$" + fieldKey.length() + "\r\n" + fieldKey + "\r\n" + // Field key + "$" + fieldValue.length() + "\r\n" + fieldValue + "\r\n" + // Field value + ":" + millisElapsedFromDelivery + "\r\n" + // millisElapsedFromDelivery + ":" + deliveredCount + "\r\n"; // deliveredCount + + return (ArrayList) parseRespResponse(respResponse); + } + + @SuppressWarnings("unchecked") + private static ArrayList createStreamEntryBinaryData(String id, String fieldKey, + byte[] fieldValue, long millisElapsedFromDelivery, long deliveredCount) { + // For binary data, we need to construct the RESP response with the actual byte length + String respResponse = + "*4\r\n" + // Entry with 4 elements + "$" + id.length() + "\r\n" + id + "\r\n" + // Entry ID + "*2\r\n" + // 2 field-value pairs + "$" + fieldKey.length() + "\r\n" + fieldKey + "\r\n" + // Field key + "$" + fieldValue.length + "\r\n"; // Field value length + + // Manually construct the byte array with binary field value + byte[] respBytes = respResponse.getBytes(); + byte[] crLf = "\r\n".getBytes(); + byte[] metadataBytes = (":" + millisElapsedFromDelivery + "\r\n" + ":" + deliveredCount + + "\r\n").getBytes(); + + byte[] fullResponse = new byte[respBytes.length + fieldValue.length + crLf.length + + metadataBytes.length]; + System.arraycopy(respBytes, 0, fullResponse, 0, respBytes.length); + System.arraycopy(fieldValue, 0, fullResponse, respBytes.length, fieldValue.length); + System.arraycopy(crLf, 0, fullResponse, respBytes.length + fieldValue.length, crLf.length); + System.arraycopy(metadataBytes, 0, fullResponse, + respBytes.length + fieldValue.length + crLf.length, metadataBytes.length); + + InputStream is = new ByteArrayInputStream(fullResponse); + return (ArrayList) Protocol.read(new RedisInputStream(is)); + } + @Test public void buildDouble() { Double build = BuilderFactory.DOUBLE.build("1.0".getBytes()); @@ -25,4 +85,93 @@ public void buildDouble() { assertEquals("empty String", expected.getMessage()); } } + + @Test + public void buildStreamEntryListWithClaimedEntryMetadata() { + // Simulate Redis response for a single claimed entry with metadata + // Format: [[id, [field, value], msSinceLastDelivery, redeliveryCount]] + List data = new ArrayList<>(); + data.add(createStreamEntryData("1234-12", "key", "value", 5000L, 2L)); + + List result = BuilderFactory.STREAM_ENTRY_LIST.build(data); + + assertNotNull(result); + assertEquals(1, result.size()); + + StreamEntry streamEntry = result.get(0); + assertEquals("1234-12", streamEntry.getID().toString()); + assertEquals("value", streamEntry.getFields().get("key")); + assertEquals(Long.valueOf(5000), streamEntry.getMillisElapsedFromDelivery()); + assertEquals(Long.valueOf(2), streamEntry.getDeliveredCount()); + } + + @Test + public void buildStreamEntryListWithFreshEntryZeroRedeliveries() { + // Simulate Redis response for a fresh entry (not claimed from PEL) + // Format: [[id, [field, value], 0, 0]] + List data = new ArrayList<>(); + data.add(createStreamEntryData("1234-12", "key", "value", 1000L, 0L)); + + List result = BuilderFactory.STREAM_ENTRY_LIST.build(data); + + assertNotNull(result); + assertEquals(1, result.size()); + + StreamEntry streamEntry = result.get(0); + assertEquals("1234-12", streamEntry.getID().toString()); + assertEquals(Long.valueOf(1000), streamEntry.getMillisElapsedFromDelivery()); + assertEquals(Long.valueOf(0), streamEntry.getDeliveredCount()); + } + + @Test + public void buildStreamEntryListWithMixedBatchClaimedFirstThenFresh() { + // Simulate Redis response with mixed entries: claimed entries first, then fresh entries + List data = new ArrayList<>(); + + // Entry #1 (claimed, redeliveryCount=2) + data.add(createStreamEntryData("1-0", "f1", "v1", 1500L, 2L)); + + // Entry #2 (claimed, redeliveryCount=1) + data.add(createStreamEntryData("2-0", "f2", "v2", 1200L, 1L)); + + // Entry #3 (fresh, redeliveryCount=0) + data.add(createStreamEntryData("3-0", "f3", "v3", 10L, 0L)); + + List result = BuilderFactory.STREAM_ENTRY_LIST.build(data); + + assertNotNull(result); + assertEquals(3, result.size()); + + StreamEntry m1 = result.get(0); + StreamEntry m2 = result.get(1); + StreamEntry m3 = result.get(2); + + // Verify claimed entries + assertTrue(m1.getDeliveredCount() > 0); + assertTrue(m2.getDeliveredCount() > 0); + assertEquals(Long.valueOf(2), m1.getDeliveredCount()); + assertEquals(Long.valueOf(1), m2.getDeliveredCount()); + + // Verify fresh entry + assertEquals(Long.valueOf(0), m3.getDeliveredCount()); + } + + @Test + public void buildStreamEntryBinaryListWithClaimedEntryMetadata() { + // Test binary version with claimed entry metadata + List data = new ArrayList<>(); + data.add( + createStreamEntryBinaryData("1234-12", "key", new byte[] { 0x00, 0x01, 0x02 }, 5000L, 2L)); + + List result = BuilderFactory.STREAM_ENTRY_BINARY_LIST.build(data); + + assertNotNull(result); + assertEquals(1, result.size()); + + StreamEntryBinary streamEntry = result.get(0); + assertEquals("1234-12", streamEntry.getID().toString()); + assertEquals(Long.valueOf(5000), streamEntry.getMillisElapsedFromDelivery()); + assertEquals(Long.valueOf(2), streamEntry.getDeliveredCount()); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java index 85b1a11a44..e045b3a814 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java @@ -22,11 +22,13 @@ import java.util.List; import java.util.Map; +import static io.redis.test.utils.RedisVersion.V8_4_0_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -103,19 +105,21 @@ public static Map offsets(Object... streamOffsets) { @BeforeEach public void setUpTestStream() { + setUpTestStream(StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes()); + } + + private void setUpTestStream(byte[] startId) { jedis.del(STREAM_KEY_1); jedis.del(STREAM_KEY_2); try { - jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, - StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, startId, true); } catch (JedisDataException e) { if (!e.getMessage().contains("BUSYGROUP")) { throw e; } } try { - jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, - StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, startId, true); } catch (JedisDataException e) { if (!e.getMessage().contains("BUSYGROUP")) { throw e; @@ -527,4 +531,67 @@ public void testXtrimWithAcknowledged() { assertTrue(jedis.xlen(STREAM_KEY_1) <= 5); // Should not exceed original length } + // ========== XREADGROUP CLAIM Tests ========== + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimReturnsMetadataOrdered() throws InterruptedException { + setUpTestStream("0-0".getBytes()); + + final byte[] CONSUMER_1 = "consumer-1".getBytes(); + final byte[] CONSUMER_2 = "consumer-2".getBytes(); + final long IDLE_TIME_MS = 5; + + // Produce two entries + Map hash = singletonMap(FIELD_KEY_1, BINARY_VALUE_1); + jedis.xadd(STREAM_KEY_1, XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY), hash); + jedis.xadd(STREAM_KEY_1, XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY), hash); + Map streams = offsets(STREAM_KEY_1, XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_1, XReadGroupParams.xReadGroupParams().count(10), + streams); + + // Ensure idle time so entries are claimable + Thread.sleep(IDLE_TIME_MS); + + // Produce fresh entries that are NOT claimed (not pending) + jedis.xadd(STREAM_KEY_1, XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY), hash); + jedis.xadd(STREAM_KEY_1, XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY), hash); + + // Read with consumer-2 using CLAIM + List>> consumer2Result = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_2, XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS).count(10), + streams); + + assertNotNull(consumer2Result); + assertEquals(1, consumer2Result.size()); + + List entries = consumer2Result.get(0).getValue(); + assertEquals(4, entries.size()); + + long claimedCount = entries.stream().filter(StreamEntryBinary::isClaimed).count(); + long freshCount = entries.size() - claimedCount; + + assertEquals(2, claimedCount); + assertEquals(2, freshCount); + + // Assert order: pending entries are first + StreamEntryBinary first = entries.get(0); + StreamEntryBinary second = entries.get(1); + StreamEntryBinary third = entries.get(2); + StreamEntryBinary fourth = entries.get(3); + + // Claimed entries + assertTrue(first.isClaimed()); + assertTrue(second.isClaimed()); + assertTrue(first.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + assertTrue(second.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + + // Fresh entries + assertFalse(third.isClaimed()); + assertFalse(fourth.isClaimed()); + assertEquals(Long.valueOf(0), third.getDeliveredCount()); + assertEquals(Long.valueOf(0), fourth.getDeliveredCount()); + assertEquals(Long.valueOf(0), third.getMillisElapsedFromDelivery()); + assertEquals(Long.valueOf(0), fourth.getMillisElapsedFromDelivery()); + } } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 691b7fa427..b73443215f 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -1,5 +1,6 @@ package redis.clients.jedis.commands.jedis; +import static io.redis.test.utils.RedisVersion.V8_4_0_STRING; import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; @@ -880,6 +881,8 @@ public void xreadGroupAsMap() { assertEquals(1, list.size()); assertEquals(id1, list.get(0).getID()); assertEquals(map, list.get(0).getFields()); + assertNull(list.get(0).getMillisElapsedFromDelivery()); + assertNull(list.get(0).getDeliveredCount()); } @Test @@ -904,6 +907,8 @@ public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() { assertEquals(firstMessageEntryId, range.get(0).getValue().get(0).getID()); assertEquals(map1, range.get(0).getValue().get(0).getFields()); + assertNull(range.get(0).getValue().get(0).getMillisElapsedFromDelivery()); + assertNull(range.get(0).getValue().get(0).getDeliveredCount()); // Add third message, the fields of pending message1 will be discarded by redis-server Map map3 = new HashMap<>(); @@ -919,6 +924,8 @@ public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() { assertEquals(firstMessageEntryId, pendingMessages.get(0).getValue().get(0).getID()); assertNull(pendingMessages.get(0).getValue().get(0).getFields()); + assertNull(pendingMessages.get(0).getValue().get(0).getDeliveredCount()); + assertNull(pendingMessages.get(0).getValue().get(0).getMillisElapsedFromDelivery()); } @Test @@ -1426,4 +1433,160 @@ public void transaction() { assertEquals(id2.get(), entries.get(1).getID()); assertEquals(map, entries.get(1).getFields()); } + + // ========== XREADGROUP CLAIM Tests ========== + + private static final String STREAM_KEY = "test-stream-claim"; + private static final String GROUP_NAME = "test-group"; + private static final String CONSUMER_1 = "consumer-1"; + private static final String CONSUMER_2 = "consumer-2"; + private static final long IDLE_TIME_MS = 5; + private static final Map HASH = singletonMap("field", "value"); + + private Map beforeEachClaimTest() throws InterruptedException { + jedis.del(STREAM_KEY); + + // Produce two entries + Map hash = singletonMap("field", "value"); + jedis.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, hash); + jedis.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, hash); + + // Create group and consume with consumer-1 + try { + jedis.xgroupCreate(STREAM_KEY, GROUP_NAME, new StreamEntryID("0-0"), false); + } catch (JedisDataException e) { + if (!e.getMessage().contains("BUSYGROUP")) { + throw e; + } + } + Map streams = singletonMap(STREAM_KEY, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_1, XReadGroupParams.xReadGroupParams().count(10), + streams); + + // Ensure idle time + Thread.sleep(IDLE_TIME_MS); + return streams; + } + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimReturnsMetadataOrdered() throws InterruptedException { + Map streams = beforeEachClaimTest(); + + // Produce fresh entries + jedis.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, HASH); + jedis.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, HASH); + + // Read with consumer-2 using CLAIM + List>> consumer2Result = jedis.xreadGroup(GROUP_NAME, + CONSUMER_2, XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS - 1).count(10), streams); + + assertNotNull(consumer2Result); + assertEquals(1, consumer2Result.size()); + + List entries = consumer2Result.get(0).getValue(); + assertEquals(4, entries.size()); + + long claimedCount = entries.stream().filter(StreamEntry::isClaimed).count(); + long freshCount = entries.size() - claimedCount; + + assertEquals(2, claimedCount); + assertEquals(2, freshCount); + + // Assert order: pending entries are first + StreamEntry first = entries.get(0); + StreamEntry second = entries.get(1); + StreamEntry third = entries.get(2); + StreamEntry fourth = entries.get(3); + + // Claimed entries + assertTrue(first.isClaimed()); + assertTrue(second.isClaimed()); + assertTrue(first.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + assertTrue(second.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + + // Fresh entries + assertFalse(third.isClaimed()); + assertFalse(fourth.isClaimed()); + assertEquals(Long.valueOf(0), third.getDeliveredCount()); + assertEquals(Long.valueOf(0), fourth.getDeliveredCount()); + assertEquals(Long.valueOf(0), third.getMillisElapsedFromDelivery()); + assertEquals(Long.valueOf(0), fourth.getMillisElapsedFromDelivery()); + } + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimMovesPendingFromC1ToC2AndRemainsPendingUntilAck() + throws InterruptedException { + Map streams = beforeEachClaimTest(); + + // Verify pending belongs to consumer-1 + StreamPendingSummary before = jedis.xpending(STREAM_KEY, GROUP_NAME); + assertEquals(2L, before.getTotal()); + assertEquals(2L, before.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + + // Claim with consumer-2 + List>> res = jedis.xreadGroup(GROUP_NAME, CONSUMER_2, + XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS).count(10), streams); + + assertNotNull(res); + assertEquals(1, res.size()); + + List entries = res.get(0).getValue(); + long claimed = entries.stream().filter(StreamEntry::isClaimed).count(); + assertEquals(2, claimed); + + // After claim: entries are pending for consumer-2 + StreamPendingSummary afterClaim = jedis.xpending(STREAM_KEY, GROUP_NAME); + assertEquals(2L, afterClaim.getTotal()); + assertEquals(0L, afterClaim.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + assertEquals(2L, afterClaim.getConsumerMessageCount().getOrDefault(CONSUMER_2, 0L).longValue()); + + // XACK the claimed entries + long acked = jedis.xack(STREAM_KEY, GROUP_NAME, entries.get(0).getID(), entries.get(1).getID()); + assertEquals(2, acked); + + StreamPendingSummary afterAck = jedis.xpending(STREAM_KEY, GROUP_NAME); + assertEquals(0L, afterAck.getTotal()); + } + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimWithNoackDoesNotCreatePendingAndRemovesClaimedFromPel() + throws InterruptedException { + Map streams = beforeEachClaimTest(); + + // Verify pending belongs to consumer-1 + StreamPendingSummary before = jedis.xpending(STREAM_KEY, GROUP_NAME); + assertEquals(2L, before.getTotal()); + assertEquals(2L, before.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + assertEquals(0L, before.getConsumerMessageCount().getOrDefault(CONSUMER_2, 0L).longValue()); + + // Produce fresh entries + jedis.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, HASH); + jedis.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, HASH); + + // Claim with NOACK using consumer-2 + List>> res = jedis.xreadGroup(GROUP_NAME, CONSUMER_2, + XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS).noAck().count(10), streams); + + assertNotNull(res); + assertEquals(1, res.size()); + + List entries = res.get(0).getValue(); + long claimedCount = entries.stream().filter(StreamEntry::isClaimed).count(); + long freshCount = entries.size() - claimedCount; + + assertEquals(2, claimedCount); + assertEquals(2, freshCount); + + // After NOACK read, previously pending entries remain pending + StreamPendingSummary afterNoack = jedis.xpending(STREAM_KEY, GROUP_NAME); + assertEquals(2L, afterNoack.getTotal()); + + // Claimed entries are now owned by consumer-2 + assertEquals(0L, afterNoack.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + assertEquals(2L, afterNoack.getConsumerMessageCount().getOrDefault(CONSUMER_2, 0L).longValue()); + } } diff --git a/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java index a1ad0fd23d..bf6b873665 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java +++ b/src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java @@ -14,21 +14,25 @@ import redis.clients.jedis.params.XTrimParams; import redis.clients.jedis.resps.StreamEntryBinary; import redis.clients.jedis.resps.StreamEntryDeletionResult; +import redis.clients.jedis.util.AssertUtil; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static io.redis.test.utils.RedisVersion.V8_4_0_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static redis.clients.jedis.StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY; +import static redis.clients.jedis.util.ByteArrayMapMatcher.equalToByteArrayMap; import static redis.clients.jedis.util.StreamEntryBinaryListMatcher.equalsStreamEntries; @Tag("integration") @@ -94,19 +98,21 @@ public static Map offsets(Object... streamOffsets) { @BeforeEach public void setUpTestStream() { + setUpTestStream(StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes()); + } + + private void setUpTestStream(byte[] startId) { jedis.del(STREAM_KEY_1); jedis.del(STREAM_KEY_2); try { - jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, - StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, startId, true); } catch (JedisDataException e) { if (!e.getMessage().contains("BUSYGROUP")) { throw e; } } try { - jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, - StreamEntryID.XGROUP_LAST_ENTRY.toString().getBytes(), true); + jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, startId, true); } catch (JedisDataException e) { if (!e.getMessage().contains("BUSYGROUP")) { throw e; @@ -542,4 +548,69 @@ public void testXtrimWithAcknowledged() { assertTrue(jedis.xlen(STREAM_KEY_1) <= 5); // Should not exceed original length } + // ========== XREADGROUP CLAIM Tests ========== + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimReturnsMetadataOrdered() throws InterruptedException { + setUpTestStream("0-0".getBytes()); + + final byte[] CONSUMER_1 = "consumer-1".getBytes(); + final byte[] CONSUMER_2 = "consumer-2".getBytes(); + final long IDLE_TIME_MS = 5; + + // Produce two entries + jedis.xadd(STREAM_KEY_1, new XAddParams().id(StreamEntryID.NEW_ENTRY), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id(StreamEntryID.NEW_ENTRY), HASH_1); + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroupBinary(GROUP_NAME, CONSUMER_1, XReadGroupParams.xReadGroupParams().count(10), + streams); + + // Ensure idle time so entries are claimable + Thread.sleep(IDLE_TIME_MS); + + // Produce fresh entries that are NOT claimed (not pending) + jedis.xadd(STREAM_KEY_1, new XAddParams().id(StreamEntryID.NEW_ENTRY), HASH_1); + jedis.xadd(STREAM_KEY_1, new XAddParams().id(StreamEntryID.NEW_ENTRY), HASH_1); + + // Read with consumer-2 using CLAIM + List>> consumer2Result = jedis.xreadGroupBinary( + GROUP_NAME, CONSUMER_2, XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS).count(10), + streams); + + assertNotNull(consumer2Result); + assertEquals(1, consumer2Result.size()); + + List entries = consumer2Result.get(0).getValue(); + assertEquals(4, entries.size()); + + long claimedCount = entries.stream().filter(StreamEntryBinary::isClaimed).count(); + long freshCount = entries.size() - claimedCount; + + assertEquals(2, claimedCount); + assertEquals(2, freshCount); + + // Assert order: pending entries are first + StreamEntryBinary first = entries.get(0); + StreamEntryBinary second = entries.get(1); + StreamEntryBinary third = entries.get(2); + StreamEntryBinary fourth = entries.get(3); + + // Claimed entries + assertTrue(first.isClaimed()); + assertTrue(second.isClaimed()); + assertTrue(first.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + assertTrue(second.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + assertThat(first.getFields(), equalToByteArrayMap(HASH_1)); + + // Fresh entries + assertFalse(third.isClaimed()); + assertFalse(fourth.isClaimed()); + assertEquals(Long.valueOf(0), third.getDeliveredCount()); + assertEquals(Long.valueOf(0), fourth.getDeliveredCount()); + assertEquals(Long.valueOf(0), third.getMillisElapsedFromDelivery()); + assertEquals(Long.valueOf(0), fourth.getMillisElapsedFromDelivery()); + assertThat(fourth.getFields(), equalToByteArrayMap(HASH_1)); + } } diff --git a/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java index ecbd23020b..e4c82916a7 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java +++ b/src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java @@ -20,10 +20,12 @@ import java.util.List; import java.util.Map; +import static io.redis.test.utils.RedisVersion.V8_4_0_STRING; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -68,17 +70,21 @@ public void setUp() { } private void setUpTestStream() { + setUpTestStream(StreamEntryID.XGROUP_LAST_ENTRY); + } + + private void setUpTestStream(StreamEntryID startId) { jedis.del(STREAM_KEY_1); jedis.del(STREAM_KEY_2); try { - jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, StreamEntryID.XGROUP_LAST_ENTRY, true); + jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, startId, true); } catch (JedisDataException e) { if (!e.getMessage().contains("BUSYGROUP")) { throw e; } } try { - jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, StreamEntryID.XGROUP_LAST_ENTRY, true); + jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, startId, true); } catch (JedisDataException e) { if (!e.getMessage().contains("BUSYGROUP")) { throw e; @@ -815,4 +821,151 @@ public void xdelexNotAcknowledged() { assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED, result.get(0)); } + + // ========== XREADGROUP CLAIM Tests ========== + + private static final String CONSUMER_1 = "consumer-1"; + private static final String CONSUMER_2 = "consumer-2"; + private static final long IDLE_TIME_MS = 5; + + Map beforeEachClaimTest() throws InterruptedException { + setUpTestStream(new StreamEntryID("0-0")); + + // Produce two entries + jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + Map streams = singletonMap(STREAM_KEY_1, + StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + jedis.xreadGroup(GROUP_NAME, CONSUMER_1, XReadGroupParams.xReadGroupParams().count(10), + streams); + + // Ensure idle time so entries are claimable + Thread.sleep(IDLE_TIME_MS); + return streams; + } + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimReturnsMetadataOrdered() throws InterruptedException { + Map streams = beforeEachClaimTest(); + + // Produce fresh entries that are NOT claimed (not pending) + jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + + // Read with consumer-2 using CLAIM + List>> consumer2Result = jedis.xreadGroup(GROUP_NAME, + CONSUMER_2, XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS).count(10), streams); + + assertNotNull(consumer2Result); + assertEquals(1, consumer2Result.size()); + + List entries = consumer2Result.get(0).getValue(); + assertEquals(4, entries.size()); + + long claimedCount = entries.stream().filter(StreamEntry::isClaimed).count(); + long freshCount = entries.size() - claimedCount; + + assertEquals(2, claimedCount); + assertEquals(2, freshCount); + + // Assert order: pending entries are first + StreamEntry first = entries.get(0); + StreamEntry second = entries.get(1); + StreamEntry third = entries.get(2); + StreamEntry fourth = entries.get(3); + + // Claimed entries + assertTrue(first.isClaimed()); + assertTrue(second.isClaimed()); + assertTrue(first.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + assertTrue(second.getMillisElapsedFromDelivery() >= IDLE_TIME_MS); + assertEquals(HASH_1, first.getFields()); + + // Fresh entries + assertFalse(third.isClaimed()); + assertFalse(fourth.isClaimed()); + assertEquals(Long.valueOf(0), third.getDeliveredCount()); + assertEquals(Long.valueOf(0), fourth.getDeliveredCount()); + assertEquals(Long.valueOf(0), third.getMillisElapsedFromDelivery()); + assertEquals(Long.valueOf(0), fourth.getMillisElapsedFromDelivery()); + assertEquals(HASH_1, fourth.getFields()); + } + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimMovesPendingFromC1ToC2AndRemainsPendingUntilAck() + throws InterruptedException { + Map streams = beforeEachClaimTest(); + + // Verify pending belongs to consumer-1 + StreamPendingSummary before = jedis.xpending(STREAM_KEY_1, GROUP_NAME); + assertEquals(2L, before.getTotal()); + assertEquals(2L, before.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + + // Claim with consumer-2 + List>> res = jedis.xreadGroup(GROUP_NAME, CONSUMER_2, + XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS).count(10), streams); + + assertNotNull(res); + assertEquals(1, res.size()); + + List entries = res.get(0).getValue(); + long claimed = entries.stream().filter(StreamEntry::isClaimed).count(); + assertEquals(2, claimed); + + // After claim: entries are pending for consumer-2 (moved), not acked yet + StreamPendingSummary afterClaim = jedis.xpending(STREAM_KEY_1, GROUP_NAME); + assertEquals(2L, afterClaim.getTotal()); + assertEquals(0L, afterClaim.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + assertEquals(2L, afterClaim.getConsumerMessageCount().getOrDefault(CONSUMER_2, 0L).longValue()); + + // XACK the claimed entries -> PEL should become empty + long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, entries.get(0).getID(), + entries.get(1).getID()); + assertEquals(2, acked); + + StreamPendingSummary afterAck = jedis.xpending(STREAM_KEY_1, GROUP_NAME); + assertEquals(0L, afterAck.getTotal()); + } + + @Test + @SinceRedisVersion(V8_4_0_STRING) + public void xreadgroupClaimWithNoackDoesNotCreatePendingAndRemovesClaimedFromPel() + throws InterruptedException { + Map streams = beforeEachClaimTest(); + + // Verify pending belongs to consumer-1 + StreamPendingSummary before = jedis.xpending(STREAM_KEY_1, GROUP_NAME); + assertEquals(2L, before.getTotal()); + assertEquals(2L, before.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + assertEquals(0L, before.getConsumerMessageCount().getOrDefault(CONSUMER_2, 0L).longValue()); + + // Also produce fresh entries that should not be added to PEL when NOACK is set + jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1); + + // Claim with NOACK using consumer-2 + List>> res = jedis.xreadGroup(GROUP_NAME, CONSUMER_2, + XReadGroupParams.xReadGroupParams().claim(IDLE_TIME_MS).noAck().count(10), streams); + + assertNotNull(res); + assertEquals(1, res.size()); + + List entries = res.get(0).getValue(); + long claimedCount = entries.stream().filter(StreamEntry::isClaimed).count(); + long freshCount = entries.size() - claimedCount; + + assertEquals(2, claimedCount); + assertEquals(2, freshCount); + + // After NOACK read, previously pending entries remain pending (NOACK does not remove them) + StreamPendingSummary afterNoack = jedis.xpending(STREAM_KEY_1, GROUP_NAME); + assertEquals(2L, afterNoack.getTotal()); + + // Claimed entries remain pending and are now owned by consumer-2 (CLAIM reassigns ownership). + // Fresh entries were not added to PEL. + assertEquals(0L, afterNoack.getConsumerMessageCount().getOrDefault(CONSUMER_1, 0L).longValue()); + assertEquals(2L, afterNoack.getConsumerMessageCount().getOrDefault(CONSUMER_2, 0L).longValue()); + } }