From d0a4d6e0af311a3c91e8e7368b6ec65b11433424 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Tue, 30 Nov 2021 16:39:49 +0800 Subject: [PATCH 1/3] fix incorrect method getUseEarliestWhenDataLossAndRemoveKey --- .../connectors/pulsar/internal/SourceSinkUtils.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java index 9db11f1f..618dbf49 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java @@ -235,12 +235,15 @@ public static boolean getFailOnDataLossAndRemoveKey(Map readerCo } public static boolean getUseEarliestWhenDataLossAndRemoveKey(Map readerConf) { - String failOnDataLossVal = - readerConf - .getOrDefault(PulsarOptions.USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY, "false") - .toString(); + String key = PulsarOptions.USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY; + if (!readerConf.containsKey(key)) { + key = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_HYPHEN, key); + } + String failOnDataLossVal = readerConf + .getOrDefault(key, "false") + .toString(); final boolean value = Boolean.parseBoolean(failOnDataLossVal); - readerConf.remove(PulsarOptions.USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY); + readerConf.remove(key); return value; } } From aae11aaacbf2b74a63f3b414b923689178cca729 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Tue, 30 Nov 2021 16:43:01 +0800 Subject: [PATCH 2/3] fix-useEarliestWhenDataLoss --- .../streaming/connectors/pulsar/internal/PulsarOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java index 76f4f677..c8b10bd0 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java @@ -67,7 +67,7 @@ public class PulsarOptions { public static final String OLD_STATE_VERSION = "old-state-version"; public static final String FAIL_ON_DATA_LOSS_OPTION_KEY = "failOnDataLoss"; public static final String USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY = - "use-earliest-when-data-loss"; + "useEarliestWhenDataLoss"; public static final String SEND_DELAY_MILLISECONDS = "send-delay-millisecond"; public static final String INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = From 522bdd40aca8cd8a94c9aa8ac0c85d52fae130e9 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Tue, 30 Nov 2021 16:48:43 +0800 Subject: [PATCH 3/3] fix unit test error --- .../streaming/connectors/pulsar/internal/PulsarOptions.java | 3 +-- .../streaming/connectors/pulsar/internal/SourceSinkUtils.java | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java index c8b10bd0..7c341b9c 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java @@ -66,8 +66,7 @@ public class PulsarOptions { public static final String KEY_DISABLED_METRICS = "key-disable-metrics"; public static final String OLD_STATE_VERSION = "old-state-version"; public static final String FAIL_ON_DATA_LOSS_OPTION_KEY = "failOnDataLoss"; - public static final String USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY = - "useEarliestWhenDataLoss"; + public static final String USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY = "useEarliestWhenDataLoss"; public static final String SEND_DELAY_MILLISECONDS = "send-delay-millisecond"; public static final String INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java index 618dbf49..f7b10e0e 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java @@ -239,9 +239,7 @@ public static boolean getUseEarliestWhenDataLossAndRemoveKey(Map if (!readerConf.containsKey(key)) { key = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_HYPHEN, key); } - String failOnDataLossVal = readerConf - .getOrDefault(key, "false") - .toString(); + String failOnDataLossVal = readerConf.getOrDefault(key, "false").toString(); final boolean value = Boolean.parseBoolean(failOnDataLossVal); readerConf.remove(key); return value;