Skip to content

Commit a412a00

Browse files
jkbeMartijnVisser
authored andcommitted
[FLINK-20625][pubsub,e2e] Add basic Pub/Sub source using new interface
* WIP * WIP * Working WIP * Clean up * Place new Pub/Sub source into existing Pub/Sub connector module * Clean up * Apply Spotless code formatting [FLINK-20625][pubsub,e2e] Attempt to support stopping the reader when stopmark is encountered [FLINK-20625][pubsub,e2e] Add checkpointing and do some refactorings * Simplify fetching from Pub/Sub in SplitReader * Allow Pub/Sub source to be only continuous unbounded * Add basic PubSubSource builder * Add configuration options for SubscriberFactory to PubSubSource, remove unused collector * Add checkpointing [FLINK-20625][pubsub,e2e] Allow multiple records inside single Pub/Sub message for deserialization [FLINK-20625][pubsub,e2e] Add Javadocs, README and clean up [FLINK-20625][pubsub,e2e] Reduce visibility of classes and their members [FLINK-20625][pubsub,e2e] Propagate Pub/Sub subscriber creation errors from SplitReader [FLINK-20625][pubsub,e2e] Use constants for default Pub/Sub subscriber parameters [FLINK-20625][pubsub,e2e] Fix dynamic Scala version in artifact example [FLINK-20625][pubsub,e2e] Rename PubSubEnumeratorCheckpoint -> PubSubEnumeratorState [FLINK-20625][pubsub,e2e] Add version checks for deserialization [FLINK-20625][pubsub,e2e] Remove unnecessary declaration of exception-throwing [FLINK-20625][pubsub,e2e] Remove disfunctional end-of-stream logic [FLINK-20625][pubsub,e2e] Avoid concurrency issues with list of Pub/Sub messages to acknowledge [FLINK-20625][pubsub,e2e] Refactor PubSubSourceBuilder [FLINK-20625][pubsub,e2e] Clarify consistency guarantee description [FLINK-20625][pubsub,e2e] Clarify Pub/Sub request timeout [FLINK-20625][pubsub,e2e] Restructure and extend readme, add basic architecture info to docstring [FLINK-20625][pubsub,e2e] Attempt to solve concurrency issues with checkpointing
1 parent d7ed02b commit a412a00

File tree

16 files changed

+1288
-4
lines changed

16 files changed

+1288
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.gcp.pubsub;
20+
21+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
22+
import org.apache.flink.api.common.functions.RichMapFunction;
23+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
24+
import org.apache.flink.api.common.serialization.SimpleStringSchema;
25+
import org.apache.flink.streaming.api.datastream.DataStream;
26+
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
27+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
28+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
29+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
30+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
31+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
32+
import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
33+
34+
import com.google.cloud.pubsub.v1.Publisher;
35+
import com.google.protobuf.ByteString;
36+
import com.google.pubsub.v1.PubsubMessage;
37+
import org.junit.After;
38+
import org.junit.Before;
39+
import org.junit.Test;
40+
41+
import java.time.Duration;
42+
import java.util.ArrayList;
43+
import java.util.Arrays;
44+
import java.util.List;
45+
import java.util.concurrent.ExecutionException;
46+
47+
import static org.junit.Assert.assertEquals;
48+
import static org.junit.Assert.assertTrue;
49+
50+
/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
51+
public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase {
52+
private static final String PROJECT_NAME = "FLProject";
53+
private static final String TOPIC_NAME = "FLTopic";
54+
private static final String SUBSCRIPTION_NAME = "FLSubscription";
55+
56+
private static PubsubHelper pubsubHelper;
57+
58+
@Before
59+
public void setUp() throws Exception {
60+
pubsubHelper = getPubsubHelper();
61+
pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
62+
pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
63+
}
64+
65+
@After
66+
public void tearDown() throws Exception {
67+
pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
68+
pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
69+
}
70+
71+
public void testFlinkSource(boolean testWithFailure) throws Exception {
72+
// Create some messages and put them into pubsub
73+
List<String> input =
74+
Arrays.asList(
75+
"One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine",
76+
"Ten");
77+
78+
List<String> messagesToSend = new ArrayList<>(input);
79+
80+
// Publish the messages into PubSub
81+
Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
82+
messagesToSend.forEach(
83+
s -> {
84+
try {
85+
publisher
86+
.publish(
87+
PubsubMessage.newBuilder()
88+
.setData(ByteString.copyFromUtf8(s))
89+
.build())
90+
.get();
91+
} catch (InterruptedException | ExecutionException e) {
92+
e.printStackTrace();
93+
}
94+
});
95+
96+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
97+
env.enableCheckpointing(100);
98+
env.setParallelism(1);
99+
if (testWithFailure) {
100+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000));
101+
} else {
102+
env.setRestartStrategy(RestartStrategies.noRestart());
103+
}
104+
105+
PubSubSource<String> source =
106+
PubSubSource.<String>builder()
107+
.setDeserializationSchema(new SimpleStringSchema())
108+
.setProjectName(PROJECT_NAME)
109+
.setSubscriptionName(SUBSCRIPTION_NAME)
110+
.setCredentials(EmulatorCredentials.getInstance())
111+
.setPubSubSubscriberFactory(
112+
new PubSubSubscriberFactoryForEmulator(
113+
getPubSubHostPort(),
114+
PROJECT_NAME,
115+
SUBSCRIPTION_NAME,
116+
10,
117+
Duration.ofSeconds(1),
118+
3))
119+
.build();
120+
121+
DataStream<String> fromPubSub =
122+
env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source");
123+
124+
if (testWithFailure) {
125+
fromPubSub = fromPubSub.map(new FailureMapFunction<>(3));
126+
}
127+
128+
List<String> output = new ArrayList<>();
129+
DataStreamUtils.collect(fromPubSub).forEachRemaining(output::add);
130+
131+
assertEquals("Wrong number of elements", input.size(), output.size());
132+
for (String test : input) {
133+
assertTrue("Missing " + test, output.contains(test));
134+
}
135+
}
136+
137+
private class FailureMapFunction<T> extends RichMapFunction<T, T> {
138+
private final long numberOfRecordsUntilFailure;
139+
private long numberOfRecordsProcessed;
140+
141+
private FailureMapFunction(long numberOfRecordsBeforeFailure) {
142+
this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure;
143+
}
144+
145+
@Override
146+
public T map(T value) throws Exception {
147+
numberOfRecordsProcessed++;
148+
149+
if (shouldThrowException()) {
150+
throw new Exception(
151+
"Deliberately thrown exception to induce crash for failure recovery testing.");
152+
}
153+
return value;
154+
}
155+
156+
private boolean shouldThrowException() {
157+
return getRuntimeContext().getAttemptNumber() <= 1
158+
&& (numberOfRecordsProcessed >= numberOfRecordsUntilFailure);
159+
}
160+
}
161+
162+
// IMPORTANT: This test makes use of things that happen in the emulated PubSub that
163+
// are GUARANTEED to be different in the real Google hosted PubSub.
164+
// So running these tests against the real thing will have a very high probability of
165+
// failing.
166+
// The assumptions:
167+
// 1) The ordering of the messages is maintained.
168+
// 2) Exactly once: We assume that every message we put in comes out exactly once.
169+
// In the real PubSub there are a lot of situations (mostly failure/retry) where this is not
170+
// true.
171+
@Test
172+
public void testFlinkSourceOk() throws Exception {
173+
testFlinkSource(false);
174+
}
175+
176+
@Test
177+
public void testFlinkSourceFailure() throws Exception {
178+
testFlinkSource(true);
179+
}
180+
}

flink-connector-gcp-pubsub/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ under the License.
3535
<packaging>jar</packaging>
3636

3737
<dependencies>
38+
<dependency>
39+
<groupId>org.apache.flink</groupId>
40+
<artifactId>flink-connector-base</artifactId>
41+
<version>${project.version}</version>
42+
</dependency>
43+
3844
<dependency>
3945
<groupId>org.apache.flink</groupId>
4046
<artifactId>flink-streaming-java</artifactId>

flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DefaultPubSubSubscriberFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
2222
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
23+
import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
2324

2425
import com.google.auth.Credentials;
2526
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
@@ -34,13 +35,26 @@
3435
import java.io.IOException;
3536
import java.time.Duration;
3637

37-
class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
38+
/**
39+
* A default {@link PubSubSubscriberFactory} used by the {@link PubSubSource.PubSubSourceBuilder} to
40+
* obtain a subscriber with which messages can be pulled from GCP Pub/Sub.
41+
*/
42+
public class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
3843
private final int retries;
3944
private final Duration timeout;
4045
private final int maxMessagesPerPull;
4146
private final String projectSubscriptionName;
4247

43-
DefaultPubSubSubscriberFactory(
48+
/**
49+
* @param projectSubscriptionName The formatted name of the Pub/Sub project and subscription to
50+
* pull messages from. Can be easily obtained through {@link
51+
* com.google.pubsub.v1.ProjectSubscriptionName}.
52+
* @param retries The number of times the reception of a message should be retried in case of
53+
* failure.
54+
* @param pullTimeout The timeout after which a message pull request is deemed a failure
55+
* @param maxMessagesPerPull The maximum number of messages that should be pulled in one go.
56+
*/
57+
public DefaultPubSubSubscriberFactory(
4458
String projectSubscriptionName,
4559
int retries,
4660
Duration pullTimeout,

flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/DeserializationSchemaWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
* This class wraps a {@link DeserializationSchema} so it can be used in a {@link PubSubSource} as a
3030
* {@link PubSubDeserializationSchema}.
3131
*/
32-
class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {
32+
public class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {
3333
private final DeserializationSchema<T> deserializationSchema;
3434

35-
DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
35+
public DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
3636
this.deserializationSchema = deserializationSchema;
3737
}
3838

0 commit comments

Comments
 (0)