File tree Expand file tree Collapse file tree 2 files changed +26
-4
lines changed
flink-connector-kafka/src
main/java/org/apache/flink/connector/kafka/source
test/java/org/apache/flink/connector/kafka/source Expand file tree Collapse file tree 2 files changed +26
-4
lines changed Original file line number Diff line number Diff line change @@ -474,10 +474,9 @@ private void parseAndSetRequiredProperties() {
474474 true );
475475
476476 // If the source is bounded, do not run periodic partition discovery.
477- maybeOverride (
478- KafkaSourceOptions .PARTITION_DISCOVERY_INTERVAL_MS .key (),
479- "-1" ,
480- boundedness == Boundedness .BOUNDED );
477+ if (boundedness == Boundedness .BOUNDED ) {
478+ maybeOverride (KafkaSourceOptions .PARTITION_DISCOVERY_INTERVAL_MS .key (), "-1" , true );
479+ }
481480
482481 // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
483482 // or generate a random string if consumer group id is not specified.
Original file line number Diff line number Diff line change @@ -217,6 +217,29 @@ public void testSettingInvalidCustomDeserializers(
217217 .hasMessageContaining (expectedError );
218218 }
219219
220+ @ Test
221+ public void testDefaultPartitionDiscovery () {
222+ final KafkaSource <String > kafkaSource = getBasicBuilder ().build ();
223+ // Commit on checkpoint and auto commit should be disabled because group.id is not specified
224+ assertThat (
225+ kafkaSource
226+ .getConfiguration ()
227+ .get (KafkaSourceOptions .PARTITION_DISCOVERY_INTERVAL_MS ))
228+ .isEqualTo (KafkaSourceOptions .PARTITION_DISCOVERY_INTERVAL_MS .defaultValue ());
229+ }
230+
231+ @ Test
232+ public void testPeriodPartitionDiscovery () {
233+ final KafkaSource <String > kafkaSource =
234+ getBasicBuilder ().setBounded (OffsetsInitializer .latest ()).build ();
235+ // Commit on checkpoint and auto commit should be disabled because group.id is not specified
236+ assertThat (
237+ kafkaSource
238+ .getConfiguration ()
239+ .get (KafkaSourceOptions .PARTITION_DISCOVERY_INTERVAL_MS ))
240+ .isEqualTo (-1L );
241+ }
242+
220243 private KafkaSourceBuilder <String > getBasicBuilder () {
221244 return new KafkaSourceBuilder <String >()
222245 .setBootstrapServers ("testServer" )
You can’t perform that action at this time.
0 commit comments