Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1408,17 +1408,26 @@ public List<StreamEntry> build(Object data) {
String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
StreamEntryID entryID = new StreamEntryID(entryIdString);
List<byte[]> hash = (List<byte[]>) res.get(1);
if (hash == null) {
responses.add(new StreamEntry(entryID, null));
continue;

Map<String, String> fieldsMap = null;

if (hash != null) {
Iterator<byte[]> hashIterator = hash.iterator();
fieldsMap = new HashMap<>(hash.size() / 2, 1f);

while (hashIterator.hasNext()) {
fieldsMap.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
}
}

Iterator<byte[]> hashIterator = hash.iterator();
Map<String, String> map = new HashMap<>(hash.size() / 2, 1f);
while (hashIterator.hasNext()) {
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
if (res.size() >= 4) {
Long millisElapsedFromDelivery = LONG.build(res.get(2));
Long deliveredCount = LONG.build(res.get(3));
responses.add(new StreamEntry(entryID, fieldsMap, millisElapsedFromDelivery, deliveredCount));
continue;
}
responses.add(new StreamEntry(entryID, map));

responses.add(new StreamEntry(entryID, fieldsMap));
}

return responses;
Expand Down Expand Up @@ -1959,16 +1968,25 @@ public List<StreamEntryBinary> build(Object data) {
String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
StreamEntryID entryID = new StreamEntryID(entryIdString);
List<byte[]> hash = (List<byte[]>) res.get(1);
if (hash == null) {
responses.add(new StreamEntryBinary(entryID, null));
continue;

Map<byte[], byte[]> map = null;

if (hash != null) {
Iterator<byte[]> hashIterator = hash.iterator();
map = new JedisByteHashMap();

while (hashIterator.hasNext()) {
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
}
}

Iterator<byte[]> hashIterator = hash.iterator();
Map<byte[], byte[]> map = new JedisByteHashMap();
while (hashIterator.hasNext()) {
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
if (res.size() >= 4) {
Long millisElapsedFromDelivery = LONG.build(res.get(2));
Long deliveredCount = LONG.build(res.get(3));
responses.add(new StreamEntryBinary(entryID, map, millisElapsedFromDelivery, deliveredCount));
continue;
}

responses.add(new StreamEntryBinary(entryID, map));
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public static enum Keyword implements Rawable {
AGGREGATE, ALPHA, BY, GET, LIMIT, NO, NOSORT, ONE, SET, STORE, WEIGHTS, WITHSCORE, WITHSCORES,
RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, LEN, HELP, SCHEDULE, MATCH, COUNT, TYPE, KEYS,
REFCOUNT, ENCODING, IDLETIME, FREQ, REPLACE, GETNAME, SETNAME, SETINFO, LIST, ID, KILL, PERSIST,
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK,
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, CLAIM,
RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER,
SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2,
NX, XX, EX, PX, EXAT, PXAT, ABSTTL, KEEPTTL, INCR, LT, GT, CH, INFO, PAUSE, UNPAUSE, UNBLOCK,
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/redis/clients/jedis/params/XReadGroupParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class XReadGroupParams implements IParams {
private Integer count = null;
private Integer block = null;
private boolean noack = false;
private Long claim = null;

public static XReadGroupParams xReadGroupParams() {
return new XReadGroupParams();
Expand All @@ -30,6 +31,11 @@ public XReadGroupParams noAck() {
return this;
}

public XReadGroupParams claim(long minIdleMillis) {
this.claim = minIdleMillis;
return this;
}

@Override
public void addParams(CommandArguments args) {
if (count != null) {
Expand All @@ -41,18 +47,22 @@ public void addParams(CommandArguments args) {
if (noack) {
args.add(Keyword.NOACK);
}
if (claim != null) {
args.add(Keyword.CLAIM).add(claim);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
XReadGroupParams that = (XReadGroupParams) o;
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block);
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block)
&& Objects.equals(claim, that.claim);
}

@Override
public int hashCode() {
return Objects.hash(count, block, noack);
return Objects.hash(count, block, noack, claim);
}
}
39 changes: 39 additions & 0 deletions src/main/java/redis/clients/jedis/resps/StreamEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,51 @@ public class StreamEntry implements Serializable {

private StreamEntryID id;
private Map<String, String> fields;
private Long millisElapsedFromDelivery;
private Long deliveredCount;

public StreamEntry(StreamEntryID id, Map<String, String> fields) {
this.id = id;
this.fields = fields;
}

public StreamEntry(StreamEntryID id, Map<String, String> fields, Long millisElapsedFromDelivery, Long deliveredCount) {
this.id = id;
this.fields = fields;
this.millisElapsedFromDelivery = millisElapsedFromDelivery;
this.deliveredCount = deliveredCount;
}

/**
* @return the milliseconds since the last delivery of this message when CLAIM was used.
* <ul>
* <li>{@code null} when not applicable</li>
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
* <li>{@code > 0} means claimed from the PEL</li>
* </ul>
* @since 7.1
*/
public Long getMillisElapsedFromDelivery() {
return millisElapsedFromDelivery;
}

/**
* @return the number of prior deliveries of this message when CLAIM was used:
* <ul>
* <li>{@code null} when not applicable</li>
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
* <li>{@code > 0} means claimed from the PEL</li>
* </ul>
* @since 7.1
*/
public Long getDeliveredCount() {
return deliveredCount;
}

public boolean isClaimed() {
return this.deliveredCount != null && this.deliveredCount > 0;
}

public StreamEntryID getID() {
return id;
}
Expand Down
41 changes: 40 additions & 1 deletion src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,51 @@ public class StreamEntryBinary implements Serializable {

private StreamEntryID id;
private Map<byte[], byte[]> fields;
private Long millisElapsedFromDelivery;
private Long deliveredCount;

public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields) {
this.id = id;
this.fields = fields;
}

public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields, Long millisElapsedFromDelivery, Long deliveredCount) {
this.id = id;
this.fields = fields;
this.millisElapsedFromDelivery = millisElapsedFromDelivery;
this.deliveredCount = deliveredCount;
}

/**
* @return the milliseconds since the last delivery of this message when CLAIM was used.
* <ul>
* <li>{@code null} when not applicable</li>
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
* <li>{@code > 0} means claimed from the PEL</li>
* </ul>
* @since 7.1
*/
public Long getMillisElapsedFromDelivery() {
return millisElapsedFromDelivery;
}

/**
* @return the number of prior deliveries of this message when CLAIM was used:
* <ul>
* <li>{@code null} when not applicable</li>
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
* <li>{@code > 0} means claimed from the PEL</li>
* </ul>
* @since 7.1
*/
public Long getDeliveredCount() {
return deliveredCount;
}

public boolean isClaimed() {
return this.deliveredCount != null && this.deliveredCount > 0;
}

public StreamEntryID getID() {
return id;
}
Expand All @@ -39,4 +78,4 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
this.id = (StreamEntryID) in.readUnshared();
this.fields = (Map<byte[], byte[]>) in.readUnshared();
}
}
}
3 changes: 2 additions & 1 deletion src/test/java/io/redis/test/utils/RedisVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class RedisVersion implements Comparable<RedisVersion> {
public static final RedisVersion V7_4 = RedisVersion.of("7.4");
public static final RedisVersion V8_0_0_PRE = RedisVersion.of("7.9.0");
public static final RedisVersion V8_0_0 = RedisVersion.of("8.0.0");
public static final String V8_4_0_STRING= "8.4.0";
public static final RedisVersion V8_4_0 = RedisVersion.of("8.4.0");

private final int major;
Expand Down Expand Up @@ -91,4 +92,4 @@ public boolean isGreaterThan(RedisVersion other) {
public static int compare(RedisVersion v1, RedisVersion v2) {
return v1.compareTo(v2);
}
}
}
Loading
Loading