3838import org .apache .kafka .common .TopicPartition ;
3939import org .apache .kafka .common .header .Headers ;
4040import org .apache .kafka .common .header .internals .RecordHeaders ;
41+ import org .assertj .core .api .InstanceOfAssertFactories ;
4142import org .junit .jupiter .api .Test ;
4243
4344import org .springframework .core .retry .RetryListener ;
@@ -396,6 +397,8 @@ protected boolean doSend(Message<?> message, long timeout) {
396397 void testInboundBatch (EmbeddedKafkaBroker embeddedKafka ) throws Exception {
397398 Map <String , Object > props = KafkaTestUtils .consumerProps (embeddedKafka , "test2" , true );
398399 props .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
400+ props .put (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , 12 );
401+
399402 DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
400403 ContainerProperties containerProps = new ContainerProperties (topic2 );
401404 containerProps .setIdleEventInterval (100L );
@@ -411,7 +414,6 @@ void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception {
411414 adapter .setOnPartitionsAssignedSeekCallback ((map , consumer ) -> onPartitionsAssignedCalledLatch .countDown ());
412415 adapter .setBeanFactory (TEST_INTEGRATION_CONTEXT );
413416 adapter .afterPropertiesSet ();
414- adapter .setBeanFactory (TEST_INTEGRATION_CONTEXT );
415417 adapter .setBatchMessageConverter (new BatchMessagingMessageConverter () {
416418
417419 @ Override
@@ -436,9 +438,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
436438 Message <?> received = out .receive (10000 );
437439 assertThat (received ).isNotNull ();
438440 Object payload = received .getPayload ();
439- assertThat (payload ).isInstanceOf (List .class );
440- List <?> list = (List <?>) payload ;
441- assertThat (list .size ()).isGreaterThan (0 );
441+ assertThat (payload ).asInstanceOf (InstanceOfAssertFactories .LIST ).hasSize (2 );
442442
443443 MessageHeaders headers = received .getHeaders ();
444444 assertThat (headers .get (KafkaHeaders .RECEIVED_KEY )).isEqualTo (Arrays .asList (1 , 1 ));
@@ -632,52 +632,7 @@ void testPauseResume() throws Exception {
632632 adapter .stop ();
633633 }
634634
635- static class Foo {
636-
637- private String bar ;
638-
639- Foo () {
640- }
641-
642- Foo (String bar ) {
643- this .bar = bar ;
644- }
645-
646- protected String getBar () {
647- return this .bar ;
648- }
649-
650- protected void setBar (String bar ) {
651- this .bar = bar ;
652- }
653-
654- @ Override
655- public int hashCode () {
656- final int prime = 31 ;
657- int result = 1 ;
658- result = prime * result + ((this .bar == null ) ? 0 : this .bar .hashCode ());
659- return result ;
660- }
661-
662- @ Override
663- public boolean equals (Object obj ) {
664- if (this == obj ) {
665- return true ;
666- }
667- if (obj == null ) {
668- return false ;
669- }
670- if (getClass () != obj .getClass ()) {
671- return false ;
672- }
673- Foo other = (Foo ) obj ;
674- if (this .bar == null ) {
675- return other .bar == null ;
676- }
677- else {
678- return this .bar .equals (other .bar );
679- }
680- }
635+ record Foo (String bar ) {
681636
682637 }
683638
0 commit comments