Skip to content

Commit a719cc5

Browse files
Merge branch '10.2.x' into master
2 parents 1cc56e9 + 1786d57 commit a719cc5

File tree

7 files changed

+58
-21
lines changed

7 files changed

+58
-21
lines changed

src/main/java/io/confluent/connect/hdfs/FileUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package io.confluent.connect.hdfs;
1717

18+
import com.google.re2j.Matcher;
19+
1820
import org.apache.hadoop.fs.FileStatus;
1921
import org.apache.hadoop.fs.FileSystem;
2022
import org.apache.hadoop.fs.Path;
@@ -27,7 +29,6 @@
2729
import java.util.ArrayList;
2830
import java.util.List;
2931
import java.util.UUID;
30-
import java.util.regex.Matcher;
3132

3233
import io.confluent.connect.hdfs.filter.CommittedFileFilter;
3334
import io.confluent.connect.hdfs.storage.Storage;

src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515

1616
package io.confluent.connect.hdfs;
1717

18+
import com.google.re2j.Matcher;
19+
import com.google.re2j.Pattern;
20+
import com.google.re2j.PatternSyntaxException;
1821
import io.confluent.connect.hdfs.orc.OrcFormat;
1922
import java.util.ArrayList;
2023
import java.util.Collections;
21-
import java.util.regex.Matcher;
22-
import java.util.regex.Pattern;
23-
import java.util.regex.PatternSyntaxException;
2424
import io.confluent.connect.hdfs.parquet.ParquetFormat;
2525
import io.confluent.connect.hdfs.string.StringFormat;
2626
import org.apache.commons.lang.StringUtils;
@@ -402,10 +402,14 @@ protected HdfsSinkConnectorConfig(ConfigDef configDef, Map<String, String> props
402402
this.url = extractUrl();
403403
try {
404404
String topicRegex = getString(TOPIC_CAPTURE_GROUPS_REGEX_CONFIG);
405-
this.topicRegexCaptureGroup = topicRegex != null ? Pattern.compile(topicRegex) : null;
405+
if (topicRegex != null) {
406+
this.topicRegexCaptureGroup = Pattern.compile(topicRegex);
407+
} else {
408+
this.topicRegexCaptureGroup = null;
409+
}
406410
} catch (PatternSyntaxException e) {
407411
throw new ConfigException(
408-
TOPIC_CAPTURE_GROUPS_REGEX_CONFIG + " is an invalid regex pattern: ",
412+
TOPIC_CAPTURE_GROUPS_REGEX_CONFIG + " is an invalid regex pattern: " + e.getMessage(),
409413
e
410414
);
411415
}

src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
package io.confluent.connect.hdfs;
1717

18-
import java.util.regex.Pattern;
18+
import com.google.re2j.Pattern;
1919

2020
public class HdfsSinkConnectorConstants {
2121

src/main/java/io/confluent/connect/hdfs/filter/CommittedFileFilter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717

1818
import io.confluent.connect.hdfs.HdfsSinkConnectorConstants;
1919

20+
import com.google.re2j.Matcher;
2021
import org.apache.hadoop.fs.Path;
2122
import org.apache.hadoop.fs.PathFilter;
2223

23-
import java.util.regex.Matcher;
24-
25-
2624
public class CommittedFileFilter implements PathFilter {
2725
@Override
2826
public boolean accept(Path path) {

src/main/java/io/confluent/connect/hdfs/filter/TopicCommittedFileFilter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515

1616
package io.confluent.connect.hdfs.filter;
1717

18+
import com.google.re2j.Matcher;
1819
import org.apache.hadoop.fs.Path;
1920

2021
import io.confluent.connect.hdfs.HdfsSinkConnectorConstants;
2122

22-
import java.util.regex.Matcher;
23-
2423
public class TopicCommittedFileFilter extends CommittedFileFilter {
2524
private String topic;
2625

src/main/java/io/confluent/connect/hdfs/filter/TopicPartitionCommittedFileFilter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515

1616
package io.confluent.connect.hdfs.filter;
1717

18+
import com.google.re2j.Matcher;
1819
import org.apache.hadoop.fs.Path;
1920
import org.apache.kafka.common.TopicPartition;
2021

21-
import java.util.regex.Matcher;
22-
2322
import io.confluent.connect.hdfs.HdfsSinkConnectorConstants;
2423

2524
public class TopicPartitionCommittedFileFilter extends CommittedFileFilter {

src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ public void testHiveTableNameValidation() {
8282
configException.getMessage());
8383
}
8484

85-
@Test
8685
public void testValidRegexCaptureGroup() {
8786
String topic = "topica";
8887
String topicDir = "topic.another.${topic}.again";
@@ -172,15 +171,52 @@ public void testInvalidRegexCaptureGroup() {
172171
connectorConfig = new HdfsSinkConnectorConfig(properties);
173172
}
174173

175-
@Test(expected = ConfigException.class)
176-
public void testInvalidRegexCaptureGroupDoesntMatchTopic() {
174+
@Test
175+
public void testRegexDoesNotFullyMatchButTopicDirUsesOnlyTopicPlaceholder() {
177176
String topic = "topica";
178-
String topicDir = "topic.another.${topic}.again";
177+
String topicDirPattern = "logs/${topic}/data";
178+
properties.put(HdfsSinkConnectorConfig.TOPIC_CAPTURE_GROUPS_REGEX_CONFIG, "[a-z]+");
179+
properties.put(StorageCommonConfig.TOPICS_DIR_CONFIG, topicDirPattern);
180+
connectorConfig = new HdfsSinkConnectorConfig(properties);
179181

180-
properties.put(HdfsSinkConnectorConfig.TOPIC_CAPTURE_GROUPS_REGEX_CONFIG, "[a-z]");
181-
properties.put(StorageCommonConfig.TOPICS_DIR_CONFIG, topicDir);
182+
String expectedDir = "logs/topica/data";
183+
assertEquals(expectedDir, connectorConfig.getTopicsDirFromTopic(topic));
184+
}
185+
186+
@Test
187+
public void testRegexDoesNotMatchAndTopicDirUsesCaptureGroupsShouldThrow() {
188+
String topic = "this_topic_will_not_match_pattern";
189+
String topicDirPattern = "groups/${1}/data";
190+
properties.put(HdfsSinkConnectorConfig.TOPIC_CAPTURE_GROUPS_REGEX_CONFIG, "(\\d+)");
191+
properties.put(StorageCommonConfig.TOPICS_DIR_CONFIG, topicDirPattern);
182192
connectorConfig = new HdfsSinkConnectorConfig(properties);
183-
connectorConfig.getTopicsDirFromTopic(topic);
193+
ConfigException ex = assertThrows(ConfigException.class, () -> {
194+
connectorConfig.getTopicsDirFromTopic(topic);
195+
});
196+
197+
assertTrue(
198+
"Exception message should indicate topic-regex mismatch",
199+
ex.getMessage().contains("does not fully match the specified regex")
200+
|| ex.getMessage().contains("Requested regex group 1 is not available")
201+
);
202+
}
203+
204+
@Test
205+
public void testRegexMatchesButTopicDirUsesNonExistentCaptureGroupShouldThrow() {
206+
String topic = "data-set-alpha";
207+
String topicDirPattern = "output/${1}/${2}";
208+
209+
properties.put(HdfsSinkConnectorConfig.TOPIC_CAPTURE_GROUPS_REGEX_CONFIG, "data-set-([a-z]+)");
210+
properties.put(StorageCommonConfig.TOPICS_DIR_CONFIG, topicDirPattern);
211+
connectorConfig = new HdfsSinkConnectorConfig(properties);
212+
213+
ConfigException ex = assertThrows(ConfigException.class, () -> {
214+
connectorConfig.getTopicsDirFromTopic(topic);
215+
});
216+
assertTrue(
217+
"Exception message should indicate missing regex group",
218+
ex.getMessage().contains("actually had 1 capture groups")
219+
);
184220
}
185221

186222
@Test(expected = ConfigException.class)

0 commit comments

Comments
 (0)