From f15bab08adf9db9148e71f60fa0fefba0679a86a Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 7 Jan 2022 20:39:04 +0800 Subject: [PATCH] enable pulsar topic offset auto commit --- .../connectors/pulsar/FlinkPulsarSource.java | 53 +++++++++++++++++++ .../pulsar/table/PulsarTableOptions.java | 13 +++++ 2 files changed, 66 insertions(+) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 44eb5f1d..5d396b84 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -58,11 +58,13 @@ import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscription; import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer; import org.apache.flink.streaming.connectors.pulsar.serialization.PulsarDeserializationSchema; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter; import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TimeUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; @@ -73,6 +75,7 @@ import org.apache.pulsar.shade.com.google.common.collect.Maps; import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -85,6 +88,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -238,6 +244,12 @@ public class FlinkPulsarSource extends RichParallelSourceFunction private long startupOffsetsTimestamp = -1L; + private boolean enableOffsetAutoCommit; + + private Duration offsetAutoCommitInterval; + + private ScheduledExecutorService offsetCommitScheduler; + public FlinkPulsarSource( String adminUrl, ClientConfigurationData clientConf, @@ -264,6 +276,22 @@ public FlinkPulsarSource( } this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion); + + this.enableOffsetAutoCommit = + Boolean.parseBoolean( + properties + .getOrDefault( + PulsarTableOptions.ENABLE_OFFSET_AUTO_COMMIT.key(), "true") + .toString()); + if (enableOffsetAutoCommit) { + this.offsetAutoCommitInterval = + TimeUtils.parseDuration( + properties + .getOrDefault( + PulsarTableOptions.OFFSET_AUTO_COMMIT_INTERVAL.key(), + "60 s") + .toString()); + } } public FlinkPulsarSource( @@ -551,6 +579,27 @@ public void open(Configuration parameters) throws Exception { ownedTopicStarts); } } + + if (!((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled() + && enableOffsetAutoCommit) { + this.offsetCommitScheduler = Executors.newScheduledThreadPool(1); + this.offsetCommitScheduler.scheduleAtFixedRate( + () -> { + if (pulsarFetcher != null) { + Map consumedOffsets = + pulsarFetcher.snapshotCurrentState(); + try { + pulsarFetcher.commitOffsetToPulsar( + consumedOffsets, offsetCommitCallback); + } catch (InterruptedException e) { + + } + } + }, + 0, + offsetAutoCommitInterval != null ? offsetAutoCommitInterval.getSeconds() : 60, + TimeUnit.SECONDS); + } } protected String getSubscriptionName() { @@ -742,6 +791,10 @@ public void close() throws Exception { } } + if (offsetCommitScheduler != null) { + offsetCommitScheduler.shutdown(); + } + try { super.close(); } catch (Exception e) { diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java index df17860d..a2356ef0 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.impl.MessageIdImpl; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -245,6 +246,18 @@ public class PulsarTableOptions { .defaultValue(Collections.emptyMap()) .withDescription("Optional pulsar config."); + public static final ConfigOption ENABLE_OFFSET_AUTO_COMMIT = + ConfigOptions.key("enable.auto.commit") + .booleanType() + .defaultValue(true) + .withDescription("enable offset auto commit by pulsar connector"); + + public static final ConfigOption OFFSET_AUTO_COMMIT_INTERVAL = + ConfigOptions.key("offset.auto.commit.interval") + .durationType() + .defaultValue(Duration.ofSeconds(60)) + .withDescription("offset auto commit interval for pulsar source."); + // -------------------------------------------------------------------------------------------- // Option enumerations // --------------------------------------------------------------------------------------------