Skip to content

Commit ecb3111

Browse files
committed
Polishing.
Unify byte array/buffer/String conversion. Simplify tests. See #3204
1 parent 9e019fe commit ecb3111

File tree

2 files changed

+70
-89
lines changed

2 files changed

+70
-89
lines changed

src/main/java/org/springframework/data/redis/connection/stream/StreamRecords.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.jspecify.annotations.Nullable;
2424
import org.springframework.data.redis.util.ByteUtils;
25+
import org.springframework.lang.Contract;
2526
import org.springframework.util.Assert;
2627
import org.springframework.util.ClassUtils;
2728
import org.springframework.util.ObjectUtils;
@@ -178,7 +179,7 @@ public <K, V> MapRecord<S, K, V> ofMap(Map<K, V> map) {
178179
* @see MapRecord
179180
*/
180181
public StringRecord ofStrings(Map<String, String> map) {
181-
return new StringMapBackedRecord(ObjectUtils.nullSafeToString(stream), id, map);
182+
return new StringMapBackedRecord(toString(stream), id, map);
182183
}
183184

184185
/**
@@ -197,47 +198,55 @@ public <V> ObjectRecord<S, V> ofObject(V value) {
197198
* @return new instance of {@link ByteRecord}.
198199
*/
199200
public ByteRecord ofBytes(Map<byte[], byte[]> value) {
201+
return new ByteMapBackedRecord(toByteArray(stream), id, value);
202+
}
200203

201-
byte[] streamKey = convertStreamToByteArray(stream);
202-
return new ByteMapBackedRecord(streamKey, id, value);
204+
/**
205+
* @param value
206+
* @return new instance of {@link ByteBufferRecord}.
207+
*/
208+
public ByteBufferRecord ofBuffer(Map<ByteBuffer, ByteBuffer> value) {
209+
return new ByteBufferMapBackedRecord(toByteBuffer(stream), id, value);
203210
}
204211

205-
private byte[] convertStreamToByteArray(@Nullable Object stream) {
206-
if (stream instanceof byte[]) {
207-
return (byte[]) stream;
208-
} else if (stream instanceof String) {
209-
return ((String) stream).getBytes();
212+
@Contract("null -> null; !null -> !null")
213+
private static byte @Nullable [] toByteArray(@Nullable Object stream) {
214+
215+
if (stream instanceof byte[] bytes) {
216+
return bytes;
210217
} else if (stream instanceof ByteBuffer buffer) {
211-
byte[] result = new byte[buffer.remaining()];
212-
buffer.get(result);
213-
return result;
218+
return ByteUtils.getBytes(buffer);
219+
} else if (stream instanceof CharSequence s) {
220+
return s.toString().getBytes();
214221
} else if (stream == null) {
215222
return null;
216-
} else {
217-
throw new IllegalArgumentException("Stream key %s cannot be converted to byte array".formatted(stream));
218223
}
224+
225+
throw new IllegalArgumentException("Stream key '%s' cannot be converted to byte array".formatted(stream));
219226
}
220227

221-
/**
222-
* @param value
223-
* @return new instance of {@link ByteBufferRecord}.
224-
*/
225-
public ByteBufferRecord ofBuffer(Map<ByteBuffer, ByteBuffer> value) {
228+
private static ByteBuffer toByteBuffer(@Nullable Object stream) {
226229

227-
ByteBuffer streamKey;
230+
if (stream instanceof byte[] bytes) {
231+
return ByteBuffer.wrap(bytes);
232+
} else if (stream instanceof ByteBuffer bb) {
233+
return bb;
234+
} else if (stream instanceof CharSequence cs) {
235+
return ByteUtils.getByteBuffer(cs.toString());
236+
}
228237

229-
if (stream instanceof ByteBuffer) {
230-
streamKey = (ByteBuffer) stream;
231-
} else if (stream instanceof String) {
232-
streamKey = ByteUtils.getByteBuffer((String) stream);
233-
} else if (stream instanceof byte[]) {
234-
streamKey = ByteBuffer.wrap((byte[]) stream);
235-
} else {
236-
throw new IllegalArgumentException("Stream key %s cannot be converted to byte buffer".formatted(stream));
238+
throw new IllegalArgumentException("Stream key '%s' cannot be converted to byte buffer".formatted(stream));
239+
}
240+
241+
private static String toString(@Nullable Object stream) {
242+
243+
if (stream instanceof byte[] || stream instanceof ByteBuffer) {
244+
return new String(toByteArray(stream));
237245
}
238246

239-
return new ByteBufferMapBackedRecord(streamKey, id, value);
247+
return ObjectUtils.nullSafeToString(stream);
240248
}
249+
241250
}
242251

243252
/**
@@ -338,7 +347,7 @@ public int hashCode() {
338347
*/
339348
static class ByteMapBackedRecord extends MapBackedRecord<byte[], byte[], byte[]> implements ByteRecord {
340349

341-
ByteMapBackedRecord(byte @Nullable[] stream, RecordId recordId, Map<byte[], byte[]> map) {
350+
ByteMapBackedRecord(byte @Nullable [] stream, RecordId recordId, Map<byte[], byte[]> map) {
342351
super(stream, recordId, map);
343352
}
344353

@@ -359,8 +368,7 @@ public ByteMapBackedRecord withId(RecordId id) {
359368
static class ByteBufferMapBackedRecord extends MapBackedRecord<ByteBuffer, ByteBuffer, ByteBuffer>
360369
implements ByteBufferRecord {
361370

362-
ByteBufferMapBackedRecord(@Nullable ByteBuffer stream, RecordId recordId,
363-
Map<ByteBuffer, ByteBuffer> map) {
371+
ByteBufferMapBackedRecord(@Nullable ByteBuffer stream, RecordId recordId, Map<ByteBuffer, ByteBuffer> map) {
364372
super(stream, recordId, map);
365373
}
366374

@@ -420,7 +428,6 @@ public S getStream() {
420428
return stream;
421429
}
422430

423-
424431
@Override
425432
public RecordId getId() {
426433
return recordId;

src/test/java/org/springframework/data/redis/connection/StreamRecordsUnitTests.java

Lines changed: 31 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
import java.nio.ByteBuffer;
2222
import java.util.Collections;
2323
import java.util.Map;
24+
import java.util.stream.Stream;
2425

2526
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.Arguments;
29+
import org.junit.jupiter.params.provider.MethodSource;
2630

2731
import org.springframework.data.redis.connection.stream.ByteRecord;
2832
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -39,6 +43,7 @@
3943
*
4044
* @author Christoph Strobl
4145
* @author Romain Beghi
46+
* @author Seo Bo Gyeong
4247
*/
4348
class StreamRecordsUnitTests {
4449

@@ -99,9 +104,7 @@ void serializeMapRecordStringAsHashValue() {
99104
ByteRecord target = source.serialize(RedisSerializer.string());
100105

101106
assertThat(target.getId()).isEqualTo(RECORD_ID);
102-
assertThat(target.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
103-
assertThat(target.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
104-
assertThat(target.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
107+
assertByteRecord(target);
105108
}
106109

107110
@Test // DATAREDIS-993
@@ -137,14 +140,6 @@ static class StubValueReturningHashMapper<T, K, V> implements HashMapper<T, K, V
137140
final Map<K, V> to;
138141
final T from;
139142

140-
public StubValueReturningHashMapper(Map<K, V> to) {
141-
this(to, (T) new Object());
142-
}
143-
144-
public StubValueReturningHashMapper(T from) {
145-
this(Collections.emptyMap(), from);
146-
}
147-
148143
StubValueReturningHashMapper(Map<K, V> to, T from) {
149144
this.to = to;
150145
this.from = from;
@@ -165,72 +160,51 @@ static HashMapper<Object, String, String> simpleString(String value) {
165160
}
166161
}
167162

168-
@Test // Stream key auto conversion for ofBytes method
169-
void ofBytesWithStringStreamKey() {
163+
@Test // GH-3204
164+
void ofBytesWithNullStreamKey() {
170165

171166
ByteRecord record = StreamRecords.newRecord()
172-
.in(STRING_STREAM_KEY)
173167
.withId(RECORD_ID)
174168
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
175169

176170
assertThat(record.getId()).isEqualTo(RECORD_ID);
177-
assertThat(record.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
178-
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
179-
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
171+
assertThat(record.getStream()).isNull();
180172
}
181173

182-
@Test // Stream key auto conversion for ofBytes method with byte array
183-
void ofBytesWithByteArrayStreamKey() {
184-
185-
ByteRecord record = StreamRecords.newRecord()
186-
.in(SERIALIZED_STRING_STREAM_KEY)
187-
.withId(RECORD_ID)
188-
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
174+
@Test // GH-3204
175+
void ofBytesWithUnsupportedStreamKeyType() {
189176

190-
assertThat(record.getId()).isEqualTo(RECORD_ID);
191-
assertThat(record.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
192-
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
193-
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
177+
assertThatIllegalArgumentException().isThrownBy(() -> StreamRecords.newRecord().in(123L) // Unsupported type
178+
.withId(RECORD_ID).ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL)))
179+
.withMessageContaining("Stream key '123' cannot be converted to byte array");
194180
}
195181

196-
@Test // Stream key auto conversion for ofBytes method with null stream key
197-
void ofBytesWithNullStreamKey() {
182+
@ParameterizedTest // GH-3204
183+
@MethodSource("ofBytesInStreamArgs")
184+
void ofBytes(Object streamKey) {
198185

199-
ByteRecord record = StreamRecords.newRecord()
200-
.withId(RECORD_ID)
186+
ByteRecord record = StreamRecords.newRecord().in(streamKey).withId(RECORD_ID)
201187
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
202188

203189
assertThat(record.getId()).isEqualTo(RECORD_ID);
204-
assertThat(record.getStream()).isNull();
205-
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
206-
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
190+
assertByteRecord(record);
207191
}
208192

209-
@Test // Stream key auto conversion for ofBytes method with ByteBuffer stream key
210-
void ofBytesWithByteBufferStreamKey() {
211-
212-
ByteBuffer streamKeyBuffer = ByteBuffer.wrap(SERIALIZED_STRING_STREAM_KEY);
213-
214-
ByteRecord record = StreamRecords.newRecord()
215-
.in(streamKeyBuffer)
216-
.withId(RECORD_ID)
217-
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
218-
219-
assertThat(record.getId()).isEqualTo(RECORD_ID);
220-
assertThat(record.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
221-
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
222-
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
193+
static Stream<Arguments> ofBytesInStreamArgs() {
194+
return Stream.of(Arguments.argumentSet("ByteBuffer", ByteBuffer.wrap(SERIALIZED_STRING_STREAM_KEY)), //
195+
Arguments.argumentSet("byte[]", new Object[] { SERIALIZED_STRING_STREAM_KEY }), //
196+
Arguments.argumentSet("String", STRING_STREAM_KEY));
223197
}
224198

225-
@Test // Stream key auto conversion for ofBytes method with unsupported type
226-
void ofBytesWithUnsupportedStreamKeyType() {
199+
private void assertByteRecord(ByteRecord target) {
227200

228-
assertThatThrownBy(() -> StreamRecords.newRecord()
229-
.in(123L) // Unsupported type
230-
.withId(RECORD_ID)
231-
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL)))
232-
.isInstanceOf(IllegalArgumentException.class)
233-
.hasMessageContaining("Stream key 123 cannot be converted to byte array");
201+
assertThat(target.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
202+
assertThat(target.getValue()).hasSize(1);
203+
204+
target.getValue().forEach((k, v) -> {
205+
assertThat(k).isEqualTo(SERIALIZED_STRING_MAP_KEY);
206+
assertThat(v).isEqualTo(SERIALIZED_STRING_VAL);
207+
});
234208
}
235209

236210
}

0 commit comments

Comments
 (0)