Skip to content

Commit 53926fe

Browse files
committed
Add assertions to fail early on absent values using StreamMessageListenerContainer.
Closes #2472
1 parent 45621dd commit 53926fe

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ public int getPhase() {
183183

184184
@Override
185185
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
186+
187+
Assert.notNull(streamRequest, "StreamReadRequest must not be null");
188+
Assert.notNull(listener, "StreamListener must not be null");
189+
186190
return doRegister(getReadTask(streamRequest, listener));
187191
}
188192

src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,9 @@ class StreamReadRequestBuilder<K> {
337337
Predicate<Throwable> cancelSubscriptionOnError = t -> true;
338338

339339
StreamReadRequestBuilder(StreamOffset<K> streamOffset) {
340+
341+
Assert.notNull(streamOffset, "StreamOffset must not be null");
342+
340343
this.streamOffset = streamOffset;
341344
}
342345

@@ -355,6 +358,8 @@ class StreamReadRequestBuilder<K> {
355358
*/
356359
public StreamReadRequestBuilder<K> errorHandler(ErrorHandler errorHandler) {
357360

361+
Assert.notNull(errorHandler, "ErrorHandler must not be null");
362+
358363
this.errorHandler = errorHandler;
359364
return this;
360365
}
@@ -368,6 +373,7 @@ public StreamReadRequestBuilder<K> errorHandler(ErrorHandler errorHandler) {
368373
*/
369374
public StreamReadRequestBuilder<K> cancelOnError(Predicate<Throwable> cancelSubscriptionOnError) {
370375

376+
Assert.notNull(cancelSubscriptionOnError, "cancelSubscriptionOnError Predicate must not be null");
371377
this.cancelSubscriptionOnError = cancelSubscriptionOnError;
372378
return this;
373379
}
@@ -440,6 +446,8 @@ public ConsumerStreamReadRequestBuilder<K> cancelOnError(Predicate<Throwable> ca
440446
*/
441447
public ConsumerStreamReadRequestBuilder<K> consumer(Consumer consumer) {
442448

449+
Assert.notNull(consumer, "Consumer must not be null");
450+
443451
this.consumer = consumer;
444452
return this;
445453
}

0 commit comments

Comments
 (0)