Skip to content

Commit cb1839e

Browse files
committed
Add assertions to fail early on absent values using StreamMessageListenerContainer.
Closes #2472
1 parent 74cff0b commit cb1839e

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,10 @@ public int getPhase() {
211211
*/
212212
@Override
213213
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
214+
215+
Assert.notNull(streamRequest, "StreamReadRequest must not be null");
216+
Assert.notNull(listener, "StreamListener must not be null");
217+
214218
return doRegister(getReadTask(streamRequest, listener));
215219
}
216220

src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ class StreamReadRequestBuilder<K> {
346346
Predicate<Throwable> cancelSubscriptionOnError = t -> true;
347347

348348
StreamReadRequestBuilder(StreamOffset<K> streamOffset) {
349+
350+
Assert.notNull(streamOffset, "StreamOffset must not be null");
351+
349352
this.streamOffset = streamOffset;
350353
}
351354

@@ -364,6 +367,8 @@ class StreamReadRequestBuilder<K> {
364367
*/
365368
public StreamReadRequestBuilder<K> errorHandler(ErrorHandler errorHandler) {
366369

370+
Assert.notNull(errorHandler, "ErrorHandler must not be null");
371+
367372
this.errorHandler = errorHandler;
368373
return this;
369374
}
@@ -377,6 +382,7 @@ public StreamReadRequestBuilder<K> errorHandler(ErrorHandler errorHandler) {
377382
*/
378383
public StreamReadRequestBuilder<K> cancelOnError(Predicate<Throwable> cancelSubscriptionOnError) {
379384

385+
Assert.notNull(cancelSubscriptionOnError, "cancelSubscriptionOnError Predicate must not be null");
380386
this.cancelSubscriptionOnError = cancelSubscriptionOnError;
381387
return this;
382388
}
@@ -449,6 +455,8 @@ public ConsumerStreamReadRequestBuilder<K> cancelOnError(Predicate<Throwable> ca
449455
*/
450456
public ConsumerStreamReadRequestBuilder<K> consumer(Consumer consumer) {
451457

458+
Assert.notNull(consumer, "Consumer must not be null");
459+
452460
this.consumer = consumer;
453461
return this;
454462
}

0 commit comments

Comments
 (0)