 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER;
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITTED_OFFSET_METRIC_GAUGE;
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.PARTITION_GROUP;
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.TOPIC_GROUP;
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
+import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
+import static org.apache.flink.core.io.InputStatus.NOTHING_AVAILABLE;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.never;
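
Note: the new RecordEvaluator import (from flink-connector-base) is the hook the rest of this diff threads through. As a rough sketch of its contract: it is a serializable functional interface whose isEndOfStream(record) result, when true, marks the current record as the end of its split; the marker record is swallowed rather than emitted downstream.

    // Sketch of the contract the new tests rely on; the lambda compiles
    // because RecordEvaluator has a single abstract method.
    RecordEvaluator<Integer> stopOnNegative = record -> record < 0;
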
@@ -144,7 +152,7 @@ void testCommitOffsetsWithoutAliveFetchers() throws Exception {
         InputStatus status;
         do {
             status = reader.pollNext(output);
-        } while (status != InputStatus.NOTHING_AVAILABLE);
+        } while (status != NOTHING_AVAILABLE);
         pollUntil(
                 reader,
                 output,
@@ -296,6 +304,7 @@ void testDisableOffsetCommit() throws Exception {
                                 new TestingReaderContext(),
                                 (ignore) -> {},
                                 properties,
+                                null,
                                 null)) {
             reader.addSplits(
                     getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED));
@@ -517,7 +526,8 @@ public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception {
                                 new TestingReaderContext(),
                                 (ignore) -> {},
                                 new Properties(),
-                                rackIdSupplier)) {
+                                rackIdSupplier,
+                                null)) {
             // Do nothing here
         }
 
@@ -530,13 +540,15 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Exception {
         Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");
 
         try (KafkaSourceReader<Integer> reader =
-                (KafkaSourceReader<Integer>)
-                        createReader(
-                                Boundedness.CONTINUOUS_UNBOUNDED,
-                                new TestingReaderContext(),
-                                (ignore) -> {},
-                                new Properties(),
-                                rackIdSupplier)) {
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {
+                                },
+                                new Properties(),
+                                rackIdSupplier,
+                                null)) {
             reader.addSplits(
                     Collections.singletonList(
                             new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L)));
@@ -545,6 +557,111 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Exception {
         verify(rackIdSupplier).get();
     }
 
+    @Test
+    public void testReadingWithRecordEvaluatorAndAllSplitsFinished() throws Exception {
+        final int readRecordNumPerSplit = 9;
+        final int readSplits = 2;
+        final Set<String> finishedSplits = new HashSet<>();
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "groupId",
+                                new TestingReaderContext(),
+                                finishedSplits::addAll,
+                                r -> (r % NUM_RECORDS_PER_SPLIT) == readRecordNumPerSplit)) {
+            List<KafkaPartitionSplit> splits = new ArrayList<>();
+            List<Integer> expected = new ArrayList<>();
+            for (int i = 0; i < readSplits; i++) {
+                splits.add(
+                        new KafkaPartitionSplit(
+                                new TopicPartition(TOPIC, i), 0, Integer.MAX_VALUE));
+                expected.addAll(
+                        IntStream.range(
+                                        i * NUM_RECORDS_PER_SPLIT,
+                                        (i + 1) * NUM_RECORDS_PER_SPLIT - 1)
+                                .boxed()
+                                .collect(Collectors.toList()));
+            }
+
+            reader.addSplits(splits);
+            reader.notifyNoMoreSplits();
+
+            TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+            pollUntil(
+                    reader,
+                    output,
+                    () -> finishedSplits.size() == splits.size(),
+                    "The reader cannot get the expected result before timeout.");
+            InputStatus status = reader.pollNext(output);
+            assertThat(output.getEmittedRecords().size())
+                    .isEqualTo(readRecordNumPerSplit * readSplits);
+            assertThat(finishedSplits)
+                    .containsExactlyInAnyOrder(
+                            splits.stream()
+                                    .map(s -> s.getTopicPartition().toString())
+                                    .toArray(String[]::new));
+            assertThat(output.getEmittedRecords())
+                    .containsExactlyInAnyOrder(expected.toArray(new Integer[0]));
+            assertThat(status).isEqualTo(END_OF_INPUT);
+        }
+    }
+
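Note: a worked sketch of the EOF rule the test above exercises, assuming NUM_RECORDS_PER_SPLIT is 10 (its concrete value in the test base class is an assumption of this sketch). Each split i carries the values [i*10, i*10+9], and the evaluator fires on the last of them, which is consumed as the EOF marker rather than emitted.

    // readRecordNumPerSplit == 9, so the evaluator matches values 9, 19, ...
    // Those marker records end their splits and are not emitted, leaving
    // 9 emitted records per split; once every split is finished the reader
    // reports END_OF_INPUT.
    RecordEvaluator<Integer> eof = r -> (r % 10) == 9;
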
+    @Test
+    public void testReadingWithRecordEvaluatorAndSomeSplitsFinished() throws Exception {
+        final int finishPartitionIndex = 1;
+        final int readRecordNumInFinishedSplit = 7;
+        final int readSplits = 2;
+        final Set<String> finishedSplits = new HashSet<>();
+
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "groupId",
+                                new TestingReaderContext(),
+                                finishedSplits::addAll,
+                                r ->
+                                        r
+                                                == (finishPartitionIndex * NUM_RECORDS_PER_PARTITION
+                                                        + readRecordNumInFinishedSplit))) {
+            List<KafkaPartitionSplit> splits = new ArrayList<>();
+            List<Integer> expected = new ArrayList<>();
+            for (int i = 0; i < readSplits; i++) {
+                splits.add(
+                        new KafkaPartitionSplit(
+                                new TopicPartition(TOPIC, i), 0, Integer.MAX_VALUE));
+                expected.addAll(
+                        IntStream.range(
+                                        i * NUM_RECORDS_PER_SPLIT,
+                                        i * NUM_RECORDS_PER_SPLIT
+                                                + (i == finishPartitionIndex
+                                                        ? readRecordNumInFinishedSplit
+                                                        : NUM_RECORDS_PER_SPLIT))
+                                .boxed()
+                                .collect(Collectors.toList()));
+            }
+
+            reader.addSplits(splits);
+            reader.notifyNoMoreSplits();
+
+            TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+            pollUntil(
+                    reader,
+                    output,
+                    () -> output.getEmittedRecords().size() == expected.size(),
+                    "The reader cannot get the expected result before timeout.");
+            assertThat(finishedSplits)
+                    .containsExactly(new TopicPartition(TOPIC, finishPartitionIndex).toString());
+            assertThat(output.getEmittedRecords())
+                    .containsExactlyInAnyOrder(expected.toArray(new Integer[0]));
+
+            InputStatus status = reader.pollNext(output);
+            assertThat(output.getEmittedRecords().size()).isEqualTo(expected.size());
+            assertThat(status).isEqualTo(NOTHING_AVAILABLE);
+        }
+    }
+
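Note: the counterpart sketch for the partial case above, under the same assumption of 10 records per partition. The evaluator matches only one value (partition 1, record index 7), so only that split is finished; partition 0 never produces an EOF marker, its split stays open, and the final pollNext reports NOTHING_AVAILABLE rather than END_OF_INPUT.

    // finishPartitionIndex == 1, readRecordNumInFinishedSplit == 7:
    // fires on 1 * 10 + 7 == 17 and on nothing else, so only the
    // partition-1 split reaches its end of stream.
    RecordEvaluator<Integer> eof = r -> r == 17;
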
     // ------------------------------------------
 
     @Override
@@ -599,17 +716,28 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
             SourceReaderContext context,
             Consumer<Collection<String>> splitFinishedHook)
             throws Exception {
+        return createReader(boundedness, groupId, context, splitFinishedHook, null);
+    }
+
+    private SourceReader<Integer, KafkaPartitionSplit> createReader(
+            Boundedness boundedness,
+            String groupId,
+            SourceReaderContext context,
+            Consumer<Collection<String>> splitFinishedHook,
+            @Nullable RecordEvaluator<Integer> recordEvaluator)
+            throws Exception {
         Properties properties = new Properties();
         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        return createReader(boundedness, context, splitFinishedHook, properties, null);
+        return createReader(boundedness, context, splitFinishedHook, properties, null, recordEvaluator);
     }
 
     private SourceReader<Integer, KafkaPartitionSplit> createReader(
             Boundedness boundedness,
             SourceReaderContext context,
             Consumer<Collection<String>> splitFinishedHook,
             Properties props,
-            SerializableSupplier<String> rackIdSupplier)
+            SerializableSupplier<String> rackIdSupplier,
+            @Nullable RecordEvaluator<Integer> recordEvaluator)
             throws Exception {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
@@ -622,7 +750,8 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
                                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                 KafkaSourceTestEnv.brokerConnectionStrings)
                         .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-                        .setProperties(props);
+                        .setProperties(props)
+                        .setEofRecordEvaluator(recordEvaluator);
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }
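
Note: outside the test helpers, the new builder option is wired the same way. A minimal sketch of end-user usage, assuming a local broker, topic name, and group id (all placeholders, not taken from this change):

    // Hypothetical wiring: each split ends at the first negative value read;
    // the sentinel record itself is swallowed, not emitted downstream.
    KafkaSource<Integer> source =
            KafkaSource.<Integer>builder()
                    .setBootstrapServers("localhost:9092") // placeholder
                    .setTopics("input-topic") // placeholder
                    .setGroupId("my-group") // placeholder
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(
                                    IntegerDeserializer.class))
                    .setEofRecordEvaluator(record -> record < 0)
                    .build();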