2929import org .apache .kafka .streams .kstream .KTable ;
3030import org .apache .kafka .streams .kstream .Materialized ;
3131import org .assertj .core .api .InstanceOfAssertFactories ;
32+ import org .jspecify .annotations .Nullable ;
3233import org .junit .jupiter .api .AfterEach ;
3334import org .junit .jupiter .api .Test ;
3435import org .junit .jupiter .api .condition .DisabledOnOs ;
4849import org .springframework .kafka .retrytopic .DestinationTopic ;
4950import org .springframework .kafka .retrytopic .RetryTopicConfiguration ;
5051import org .springframework .kafka .support .KafkaHeaders ;
52+ import org .springframework .kafka .test .EmbeddedKafkaBroker ;
5153import org .springframework .kafka .test .condition .EmbeddedKafkaCondition ;
5254import org .springframework .kafka .test .context .EmbeddedKafka ;
5355import org .springframework .messaging .handler .annotation .Header ;
@@ -71,7 +73,7 @@ class KafkaAutoConfigurationIntegrationTests {
7173
7274 private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic" ;
7375
74- private AnnotationConfigApplicationContext context ;
76+ private @ Nullable AnnotationConfigApplicationContext context ;
7577
7678 @ AfterEach
7779 void close () {
@@ -85,14 +87,14 @@ void close() {
8587 void testEndToEnd () throws Exception {
8688 load (KafkaConfig .class , "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString (),
8789 "spring.kafka.consumer.group-id=testGroup" , "spring.kafka.consumer.auto-offset-reset=earliest" );
88- KafkaTemplate <String , String > template = this . context .getBean (KafkaTemplate .class );
90+ KafkaTemplate <String , String > template = getContext () .getBean (KafkaTemplate .class );
8991 template .send (TEST_TOPIC , "foo" , "bar" );
90- Listener listener = this . context .getBean (Listener .class );
92+ Listener listener = getContext () .getBean (Listener .class );
9193 assertThat (listener .latch .await (30 , TimeUnit .SECONDS )).isTrue ();
9294 assertThat (listener .key ).isEqualTo ("foo" );
9395 assertThat (listener .received ).isEqualTo ("bar" );
9496
95- DefaultKafkaProducerFactory producerFactory = this . context .getBean (DefaultKafkaProducerFactory .class );
97+ DefaultKafkaProducerFactory producerFactory = getContext () .getBean (DefaultKafkaProducerFactory .class );
9698 Producer producer = producerFactory .createProducer ();
9799 assertThat (producer .partitionsFor (ADMIN_CREATED_TOPIC )).hasSize (10 );
98100 producer .close ();
@@ -106,12 +108,12 @@ void testEndToEndWithRetryTopics() throws Exception {
106108 "spring.kafka.retry.topic.attempts=5" , "spring.kafka.retry.topic.backoff.delay=100ms" ,
107109 "spring.kafka.retry.topic.backoff.multiplier=2" , "spring.kafka.retry.topic.backoff.max-delay=300ms" ,
108110 "spring.kafka.consumer.auto-offset-reset=earliest" );
109- RetryTopicConfiguration configuration = this . context .getBean (RetryTopicConfiguration .class );
111+ RetryTopicConfiguration configuration = getContext () .getBean (RetryTopicConfiguration .class );
110112 assertThat (configuration .getDestinationTopicProperties ()).extracting (DestinationTopic .Properties ::delay )
111113 .containsExactly (0L , 100L , 200L , 300L , 0L );
112- KafkaTemplate <String , String > template = this . context .getBean (KafkaTemplate .class );
114+ KafkaTemplate <String , String > template = getContext () .getBean (KafkaTemplate .class );
113115 template .send (TEST_RETRY_TOPIC , "foo" , "bar" );
114- RetryListener listener = this . context .getBean (RetryListener .class );
116+ RetryListener listener = getContext () .getBean (RetryListener .class );
115117 assertThat (listener .latch .await (30 , TimeUnit .SECONDS )).isTrue ();
116118 assertThat (listener ).extracting (RetryListener ::getKey , RetryListener ::getReceived )
117119 .containsExactly ("foo" , "bar" );
@@ -126,7 +128,7 @@ void testEndToEndWithRetryTopics() throws Exception {
126128 void testStreams () {
127129 load (KafkaStreamsConfig .class , "spring.application.name:my-app" ,
128130 "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString ());
129- assertThat (this . context .getBean (StreamsBuilderFactoryBean .class ).isAutoStartup ()).isTrue ();
131+ assertThat (getContext () .getBean (StreamsBuilderFactoryBean .class ).isAutoStartup ()).isTrue ();
130132 }
131133
132134 private void load (Class <?> config , String ... environment ) {
@@ -144,7 +146,15 @@ private AnnotationConfigApplicationContext doLoad(Class<?>[] configs, String...
144146 }
145147
146148 private String getEmbeddedKafkaBrokersAsString () {
147- return EmbeddedKafkaCondition .getBroker ().getBrokersAsString ();
149+ EmbeddedKafkaBroker broker = EmbeddedKafkaCondition .getBroker ();
150+ assertThat (broker ).isNotNull ();
151+ return broker .getBrokersAsString ();
152+ }
153+
154+ private AnnotationConfigApplicationContext getContext () {
155+ AnnotationConfigApplicationContext context = this .context ;
156+ assertThat (context ).isNotNull ();
157+ return context ;
148158 }
149159
150160 @ Configuration (proxyBeanMethods = false )
@@ -183,9 +193,9 @@ static class Listener {
183193
184194 private final CountDownLatch latch = new CountDownLatch (1 );
185195
186- private volatile String received ;
196+ private volatile @ Nullable String received ;
187197
188- private volatile String key ;
198+ private volatile @ Nullable String key ;
189199
190200 @ KafkaListener (topics = TEST_TOPIC )
191201 void listen (String foo , @ Header (KafkaHeaders .RECEIVED_KEY ) String key ) {
@@ -202,9 +212,9 @@ static class RetryListener {
202212
203213 private final List <String > topics = new ArrayList <>();
204214
205- private volatile String received ;
215+ private volatile @ Nullable String received ;
206216
207- private volatile String key ;
217+ private volatile @ Nullable String key ;
208218
209219 @ KafkaListener (topics = TEST_RETRY_TOPIC )
210220 void listen (String foo , @ Header (KafkaHeaders .RECEIVED_KEY ) String key ,
@@ -220,11 +230,11 @@ private List<String> getTopics() {
220230 return this .topics ;
221231 }
222232
223- private String getReceived () {
233+ private @ Nullable String getReceived () {
224234 return this .received ;
225235 }
226236
227- private String getKey () {
237+ private @ Nullable String getKey () {
228238 return this .key ;
229239 }
230240
0 commit comments