Skip to content

Commit 8a1975b

Browse files
committed
Revamp tests.
1 parent 51a7923 commit 8a1975b

File tree

9 files changed

+792
-852
lines changed

9 files changed

+792
-852
lines changed

src/test/java/redis/clients/jedis/BuilderTest.java

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,71 @@
22

33
import org.junit.jupiter.api.Assertions;
44
import org.junit.jupiter.api.Test;
5+
import redis.clients.jedis.resps.StreamEntry;
6+
import redis.clients.jedis.resps.StreamEntryBinary;
7+
import redis.clients.jedis.util.RedisInputStream;
8+
9+
import java.io.ByteArrayInputStream;
10+
import java.io.InputStream;
11+
import java.util.ArrayList;
12+
import java.util.List;
513

614
import static org.junit.jupiter.api.Assertions.assertEquals;
15+
import static org.junit.jupiter.api.Assertions.assertNotNull;
16+
import static org.junit.jupiter.api.Assertions.assertTrue;
717

818
public class BuilderTest {
919

20+
// Helper methods for building test data from RESP protocol strings
21+
private static Object parseRespResponse(String respResponse) {
22+
InputStream is = new ByteArrayInputStream(respResponse.getBytes());
23+
return Protocol.read(new RedisInputStream(is));
24+
}
25+
26+
@SuppressWarnings("unchecked")
27+
private static ArrayList<Object> createStreamEntryData(String id, String fieldKey,
28+
String fieldValue, long millisElapsedFromDelivery, long deliveredCount) {
29+
String respResponse =
30+
"*4\r\n" + // Entry with 4 elements
31+
"$" + id.length() + "\r\n" + id + "\r\n" + // Entry ID
32+
"*2\r\n" + // 2 field-value pairs
33+
"$" + fieldKey.length() + "\r\n" + fieldKey + "\r\n" + // Field key
34+
"$" + fieldValue.length() + "\r\n" + fieldValue + "\r\n" + // Field value
35+
":" + millisElapsedFromDelivery + "\r\n" + // millisElapsedFromDelivery
36+
":" + deliveredCount + "\r\n"; // deliveredCount
37+
38+
return (ArrayList<Object>) parseRespResponse(respResponse);
39+
}
40+
41+
@SuppressWarnings("unchecked")
42+
private static ArrayList<Object> createStreamEntryBinaryData(String id, String fieldKey,
43+
byte[] fieldValue, long millisElapsedFromDelivery, long deliveredCount) {
44+
// For binary data, we need to construct the RESP response with the actual byte length
45+
String respResponse =
46+
"*4\r\n" + // Entry with 4 elements
47+
"$" + id.length() + "\r\n" + id + "\r\n" + // Entry ID
48+
"*2\r\n" + // 2 field-value pairs
49+
"$" + fieldKey.length() + "\r\n" + fieldKey + "\r\n" + // Field key
50+
"$" + fieldValue.length + "\r\n"; // Field value length
51+
52+
// Manually construct the byte array with binary field value
53+
byte[] respBytes = respResponse.getBytes();
54+
byte[] crLf = "\r\n".getBytes();
55+
byte[] metadataBytes = (":" + millisElapsedFromDelivery + "\r\n" + ":" + deliveredCount
56+
+ "\r\n").getBytes();
57+
58+
byte[] fullResponse = new byte[respBytes.length + fieldValue.length + crLf.length
59+
+ metadataBytes.length];
60+
System.arraycopy(respBytes, 0, fullResponse, 0, respBytes.length);
61+
System.arraycopy(fieldValue, 0, fullResponse, respBytes.length, fieldValue.length);
62+
System.arraycopy(crLf, 0, fullResponse, respBytes.length + fieldValue.length, crLf.length);
63+
System.arraycopy(metadataBytes, 0, fullResponse,
64+
respBytes.length + fieldValue.length + crLf.length, metadataBytes.length);
65+
66+
InputStream is = new ByteArrayInputStream(fullResponse);
67+
return (ArrayList<Object>) Protocol.read(new RedisInputStream(is));
68+
}
69+
1070
@Test
1171
public void buildDouble() {
1272
Double build = BuilderFactory.DOUBLE.build("1.0".getBytes());
@@ -25,4 +85,93 @@ public void buildDouble() {
2585
assertEquals("empty String", expected.getMessage());
2686
}
2787
}
88+
89+
@Test
90+
public void buildStreamEntryListWithClaimedEntryMetadata() {
91+
// Simulate Redis response for a single claimed entry with metadata
92+
// Format: [[id, [field, value], msSinceLastDelivery, redeliveryCount]]
93+
List<Object> data = new ArrayList<>();
94+
data.add(createStreamEntryData("1234-12", "key", "value", 5000L, 2L));
95+
96+
List<StreamEntry> result = BuilderFactory.STREAM_ENTRY_LIST.build(data);
97+
98+
assertNotNull(result);
99+
assertEquals(1, result.size());
100+
101+
StreamEntry streamEntry = result.get(0);
102+
assertEquals("1234-12", streamEntry.getID().toString());
103+
assertEquals("value", streamEntry.getFields().get("key"));
104+
assertEquals(Long.valueOf(5000), streamEntry.getMillisElapsedFromDelivery());
105+
assertEquals(Long.valueOf(2), streamEntry.getDeliveredCount());
106+
}
107+
108+
@Test
109+
public void buildStreamEntryListWithFreshEntryZeroRedeliveries() {
110+
// Simulate Redis response for a fresh entry (not claimed from PEL)
111+
// Format: [[id, [field, value], 0, 0]]
112+
List<Object> data = new ArrayList<>();
113+
data.add(createStreamEntryData("1234-12", "key", "value", 1000L, 0L));
114+
115+
List<StreamEntry> result = BuilderFactory.STREAM_ENTRY_LIST.build(data);
116+
117+
assertNotNull(result);
118+
assertEquals(1, result.size());
119+
120+
StreamEntry streamEntry = result.get(0);
121+
assertEquals("1234-12", streamEntry.getID().toString());
122+
assertEquals(Long.valueOf(1000), streamEntry.getMillisElapsedFromDelivery());
123+
assertEquals(Long.valueOf(0), streamEntry.getDeliveredCount());
124+
}
125+
126+
@Test
127+
public void buildStreamEntryListWithMixedBatchClaimedFirstThenFresh() {
128+
// Simulate Redis response with mixed entries: claimed entries first, then fresh entries
129+
List<Object> data = new ArrayList<>();
130+
131+
// Entry #1 (claimed, redeliveryCount=2)
132+
data.add(createStreamEntryData("1-0", "f1", "v1", 1500L, 2L));
133+
134+
// Entry #2 (claimed, redeliveryCount=1)
135+
data.add(createStreamEntryData("2-0", "f2", "v2", 1200L, 1L));
136+
137+
// Entry #3 (fresh, redeliveryCount=0)
138+
data.add(createStreamEntryData("3-0", "f3", "v3", 10L, 0L));
139+
140+
List<StreamEntry> result = BuilderFactory.STREAM_ENTRY_LIST.build(data);
141+
142+
assertNotNull(result);
143+
assertEquals(3, result.size());
144+
145+
StreamEntry m1 = result.get(0);
146+
StreamEntry m2 = result.get(1);
147+
StreamEntry m3 = result.get(2);
148+
149+
// Verify claimed entries
150+
assertTrue(m1.getDeliveredCount() > 0);
151+
assertTrue(m2.getDeliveredCount() > 0);
152+
assertEquals(Long.valueOf(2), m1.getDeliveredCount());
153+
assertEquals(Long.valueOf(1), m2.getDeliveredCount());
154+
155+
// Verify fresh entry
156+
assertEquals(Long.valueOf(0), m3.getDeliveredCount());
157+
}
158+
159+
@Test
160+
public void buildStreamEntryBinaryListWithClaimedEntryMetadata() {
161+
// Test binary version with claimed entry metadata
162+
List<Object> data = new ArrayList<>();
163+
data.add(
164+
createStreamEntryBinaryData("1234-12", "key", new byte[] { 0x00, 0x01, 0x02 }, 5000L, 2L));
165+
166+
List<StreamEntryBinary> result = BuilderFactory.STREAM_ENTRY_BINARY_LIST.build(data);
167+
168+
assertNotNull(result);
169+
assertEquals(1, result.size());
170+
171+
StreamEntryBinary streamEntry = result.get(0);
172+
assertEquals("1234-12", streamEntry.getID().toString());
173+
assertEquals(Long.valueOf(5000), streamEntry.getMillisElapsedFromDelivery());
174+
assertEquals(Long.valueOf(2), streamEntry.getDeliveredCount());
175+
}
176+
28177
}

src/test/java/redis/clients/jedis/commands/commandobjects/CommandObjectsStreamCommandsTest.java

Lines changed: 0 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package redis.clients.jedis.commands.commandobjects;
22

3-
4-
import io.redis.test.annotations.SinceRedisVersion;
5-
6-
import static io.redis.test.utils.RedisVersion.V8_4_RC1_STRING;
73
import static org.hamcrest.MatcherAssert.assertThat;
84
import static org.hamcrest.Matchers.empty;
95
import static org.hamcrest.Matchers.equalTo;
@@ -1106,121 +1102,4 @@ public void testXReadGroupAsMap() {
11061102
assertThat(xreadGroupConsumer2.get(streamKey).get(0).getID(), equalTo(secondMessageId));
11071103
assertThat(xreadGroupConsumer2.get(streamKey).get(0).getFields(), equalTo(messageBody));
11081104
}
1109-
1110-
1111-
@Test
1112-
@SinceRedisVersion(V8_4_RC1_STRING)
1113-
public void xreadGroupWithClaimReturnsPendingThenNewEntries_commandObjects() throws InterruptedException {
1114-
String key = "co-claim-stream";
1115-
String group = "co-claim-group";
1116-
String consumer = "c1";
1117-
1118-
exec(commandObjects.del(key));
1119-
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));
1120-
1121-
// Make 3 entries pending
1122-
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v")));
1123-
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v")));
1124-
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v")));
1125-
1126-
Map<String, StreamEntryID> stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
1127-
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream));
1128-
1129-
Thread.sleep(60);
1130-
1131-
// Add two fresh entries
1132-
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v")));
1133-
exec(commandObjects.xadd(key, new StreamEntryID("5-0"), Collections.singletonMap("f", "v")));
1134-
1135-
List<Map.Entry<String, List<StreamEntry>>> messages = exec(
1136-
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50), stream));
1137-
1138-
assertThat(messages, hasSize(1));
1139-
List<StreamEntry> entries = messages.get(0).getValue();
1140-
org.junit.jupiter.api.Assertions.assertEquals(5, entries.size());
1141-
1142-
for (int i = 0; i < 3; i++) {
1143-
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime());
1144-
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes());
1145-
}
1146-
for (int i = 3; i < 5; i++) {
1147-
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime());
1148-
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes());
1149-
}
1150-
}
1151-
1152-
@Test
1153-
@SinceRedisVersion(V8_4_RC1_STRING)
1154-
public void xreadGroupWithClaimNoEligiblePendingReturnsOnlyNewEntries_commandObjects() {
1155-
String key = "co-claim-noeligible";
1156-
String group = "co-claim-group";
1157-
String consumer = "c1";
1158-
1159-
exec(commandObjects.del(key));
1160-
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));
1161-
1162-
// Put 2 entries in PEL
1163-
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), Collections.singletonMap("f", "v")));
1164-
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), Collections.singletonMap("f", "v")));
1165-
Map<String, StreamEntryID> stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
1166-
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(2), stream));
1167-
1168-
// Add two new entries that should be returned
1169-
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), Collections.singletonMap("f", "v")));
1170-
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), Collections.singletonMap("f", "v")));
1171-
1172-
List<Map.Entry<String, List<StreamEntry>>> messages = exec(
1173-
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(4).claim(500), stream));
1174-
1175-
assertThat(messages, hasSize(1));
1176-
List<StreamEntry> entries = messages.get(0).getValue();
1177-
org.junit.jupiter.api.Assertions.assertEquals(2, entries.size());
1178-
for (StreamEntry e : entries) {
1179-
org.junit.jupiter.api.Assertions.assertNull(e.getIdleTime());
1180-
org.junit.jupiter.api.Assertions.assertNull(e.getDeliveredTimes());
1181-
}
1182-
}
1183-
1184-
@Test
1185-
@SinceRedisVersion(V8_4_RC1_STRING)
1186-
public void xreadGroupWithClaimAndNoAckDoesNotAddNewEntriesToPEL_commandObjects() throws InterruptedException {
1187-
String key = "co-claim-noack";
1188-
String group = "co-claim-group";
1189-
String consumer = "c1";
1190-
1191-
exec(commandObjects.del(key));
1192-
exec(commandObjects.xgroupCreate(key, group, StreamEntryID.XGROUP_LAST_ENTRY, true));
1193-
1194-
// Make 3 entries pending
1195-
exec(commandObjects.xadd(key, new StreamEntryID("1-0"), java.util.Collections.singletonMap("f", "v")));
1196-
exec(commandObjects.xadd(key, new StreamEntryID("2-0"), java.util.Collections.singletonMap("f", "v")));
1197-
exec(commandObjects.xadd(key, new StreamEntryID("3-0"), java.util.Collections.singletonMap("f", "v")));
1198-
java.util.Map<String, StreamEntryID> stream = java.util.Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
1199-
exec(commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(3), stream));
1200-
1201-
// Wait then add fresh entries
1202-
Thread.sleep(60);
1203-
exec(commandObjects.xadd(key, new StreamEntryID("4-0"), java.util.Collections.singletonMap("f", "v")));
1204-
exec(commandObjects.xadd(key, new StreamEntryID("5-0"), java.util.Collections.singletonMap("f", "v")));
1205-
1206-
// Read with CLAIM and NOACK
1207-
java.util.List<java.util.Map.Entry<String, java.util.List<StreamEntry>>> messages = exec(
1208-
commandObjects.xreadGroup(group, consumer, new XReadGroupParams().count(5).claim(50).noAck(), stream));
1209-
1210-
assertThat(messages, hasSize(1));
1211-
java.util.List<StreamEntry> entries = messages.get(0).getValue();
1212-
org.junit.jupiter.api.Assertions.assertEquals(5, entries.size());
1213-
for (int i = 0; i < 3; i++) {
1214-
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getIdleTime());
1215-
org.junit.jupiter.api.Assertions.assertNotNull(entries.get(i).getDeliveredTimes());
1216-
}
1217-
for (int i = 3; i < 5; i++) {
1218-
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getIdleTime());
1219-
org.junit.jupiter.api.Assertions.assertNull(entries.get(i).getDeliveredTimes());
1220-
}
1221-
1222-
Long acked = exec(commandObjects.xack(key, group, new StreamEntryID("4-0"), new StreamEntryID("5-0")));
1223-
org.junit.jupiter.api.Assertions.assertEquals(0L, acked);
1224-
}
1225-
12261105
}

0 commit comments

Comments
 (0)