Skip to content

Commit a227738

Browse files
authored
Implementation for new SMT ExtractTopicFromValueSchema and tests (#93)
* Implementation for new SMT ExtractTopicFromValueSchema and tests
1 parent 02e1d15 commit a227738

File tree

7 files changed

+591
-10
lines changed

7 files changed

+591
-10
lines changed

README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,40 @@ Either `field.value` or `field.value.pattern` must be defined to apply filter.
162162

163163
Only, `string`, `numeric` and `boolean` types are considered for matching purposes, other types are ignored.
164164

165+
### `ExtractTopicFromSchemaName`
166+
167+
This transformation checks the record's schema name and, if it exists, uses it as the topic name.
168+
169+
- `io.aiven.kafka.connect.transforms.ExtractTopicFromSchemaName$Value` - works on value schema name.
170+
171+
Currently this transformation only has implementation for record value schema name. Key schema name is not implemented.
172+
173+
By default (if `schema.name.topic-map` or `schema.name.regex` is not set) the transformation uses the value of schema.name as-is for the topic name.
174+
175+
The transformation defines the following optional configurations which can be used to modify the schema.name before it is used as the topic name:
176+
177+
- `schema.name.topic-map` - Map that contains a schema.name value and the corresponding new topic name that should be used instead. The format is a comma-separated list of key:value pairs, for example "SchemaValue1:NewValue1,SchemaValue2:NewValue2".
178+
- `schema.name.regex` - RegEx that should be used to parse the schema.name to the desired value. For example `(?:[.]|^)([^.]*)$`, which captures the part of the name after the last dot.
179+
180+
Here is an example of this transformation configuration (using `schema.name.topic-map`):
181+
182+
```properties
183+
transforms=ExtractTopicFromSchemaName
184+
transforms.ExtractTopicFromSchemaName.type=io.aiven.kafka.connect.transforms.ExtractTopicFromSchemaName$Value
185+
transforms.ExtractTopicFromSchemaName.schema.name.topic-map=com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,com.acme.schema.SchemaNameToTopic2:TheNameToReplace2
186+
187+
```
188+
And here is an example of this transformation configuration (using `schema.name.regex`):
189+
```properties
190+
transforms=ExtractTopicFromValueSchema
191+
transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.ExtractTopicFromSchemaName$Value
192+
transforms.ExtractTopicFromValueSchema.schema.name.regex=(?:[.]|^)([^.]*)$
193+
165194
## License
166195

167196
This project is licensed under the [Apache License, Version 2.0](LICENSE).
168197

169198
## Trademarks
170199

171200
Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
201+

src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ final class IntegrationTest {
5656
private final TopicPartition newTopicPartition0 =
5757
new TopicPartition(TestSourceConnector.NEW_TOPIC, 0);
5858

59+
private final TopicPartition originalTopicValueFromSchema =
60+
new TopicPartition(TopicFromValueSchemaConnector.TOPIC, 0);
61+
62+
private final TopicPartition newTopicValueFromSchema =
63+
new TopicPartition(TopicFromValueSchemaConnector.NAME, 0);
5964
private static File pluginsDir;
6065

6166
@Container
@@ -92,7 +97,9 @@ static void setUpAll() throws IOException, InterruptedException {
9297
assert integrationTestClassesPath.exists();
9398

9499
final Class<?>[] testConnectorClasses = new Class[]{
95-
TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
100+
TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class,
101+
TopicFromValueSchemaConnector.class,
102+
TopicFromValueSchemaConnector.TopicFromValueSchemaConnectorTask.class
96103
};
97104
for (final Class<?> clazz : testConnectorClasses) {
98105
final String packageName = clazz.getPackage().getName();
@@ -127,7 +134,12 @@ void setUp() throws ExecutionException, InterruptedException {
127134

128135
final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
129136
final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
130-
adminClient.createTopics(Arrays.asList(originalTopic, newTopic)).all().get();
137+
final NewTopic originalTopicForExtractTopicFromValue =
138+
new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
139+
final NewTopic newTopicForExtractTopicFromValue =
140+
new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
141+
adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue,
142+
newTopicForExtractTopicFromValue)).all().get();
131143

132144
connectRunner = new ConnectRunner(pluginsDir, kafka.getBootstrapServers());
133145
connectRunner.start();
@@ -159,19 +171,42 @@ final void testExtractTopic() throws ExecutionException, InterruptedException, I
159171
connectorConfig.put("tasks.max", "1");
160172
connectRunner.createConnector(connectorConfig);
161173

174+
checkMessageTopics(originalTopicPartition0, newTopicPartition0);
175+
}
176+
177+
@Test
178+
@Timeout(10)
179+
final void testExtractTopicFromValueSchemaName() throws ExecutionException, InterruptedException, IOException {
180+
final Map<String, String> connectorConfig = new HashMap<>();
181+
connectorConfig.put("name", "test-source-connector");
182+
connectorConfig.put("connector.class", TopicFromValueSchemaConnector.class.getName());
183+
connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
184+
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
185+
connectorConfig.put("value.converter.value.subject.name.strategy",
186+
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
187+
connectorConfig.put("transforms",
188+
"ExtractTopicFromSchemaName");
189+
connectorConfig.put("transforms.ExtractTopicFromSchemaName.type",
190+
"io.aiven.kafka.connect.transforms.ExtractTopicFromSchemaName$Value");
191+
connectorConfig.put("tasks.max", "1");
192+
connectRunner.createConnector(connectorConfig);
193+
checkMessageTopics(originalTopicValueFromSchema, newTopicValueFromSchema);
194+
195+
}
196+
197+
final void checkMessageTopics(final TopicPartition originalTopicPartition, final TopicPartition newTopicPartition)
198+
throws InterruptedException {
162199
waitForCondition(
163-
() -> consumer
164-
.endOffsets(Arrays.asList(originalTopicPartition0, newTopicPartition0))
165-
.values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
166-
.orElse(false),
167-
5000, "Messages appear in any topic"
200+
() -> consumer.endOffsets(Arrays.asList(originalTopicPartition, newTopicPartition))
201+
.values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
202+
.orElse(false), 5000, "Messages appear in any topic"
168203
);
169204
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(
170-
Arrays.asList(originalTopicPartition0, newTopicPartition0));
205+
Arrays.asList(originalTopicPartition, newTopicPartition));
171206
// The original topic should be empty.
172-
assertEquals(0, endOffsets.get(originalTopicPartition0));
207+
assertEquals(0, endOffsets.get(originalTopicPartition));
173208
// The new topic should be non-empty.
174-
assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition0));
209+
assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition));
175210
}
176211

177212
private void waitForCondition(final Supplier<Boolean> conditionChecker,
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Collections;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.common.config.ConfigDef;
25+
import org.apache.kafka.connect.connector.Task;
26+
import org.apache.kafka.connect.data.Schema;
27+
import org.apache.kafka.connect.data.SchemaBuilder;
28+
import org.apache.kafka.connect.data.Struct;
29+
import org.apache.kafka.connect.source.SourceConnector;
30+
import org.apache.kafka.connect.source.SourceRecord;
31+
import org.apache.kafka.connect.source.SourceTask;
32+
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
/**
37+
* A connector needed for testing of ExtractTopicFromValueSchema.
38+
*
39+
* <p>It just produces a fixed number of struct records with value schema name set.
40+
*/
41+
public class TopicFromValueSchemaConnector extends SourceConnector {
42+
static final int MESSAGES_TO_PRODUCE = 10;
43+
44+
private static final Logger log = LoggerFactory.getLogger(TopicFromValueSchemaConnector.class);
45+
static final String TOPIC = "topic-for-value-schema-connector-test";
46+
static final String FIELD = "field-0";
47+
48+
static final String NAME = "com.acme.schema.SchemaNameToTopic";
49+
50+
@Override
51+
public void start(final Map<String, String> props) {
52+
}
53+
54+
@Override
55+
public Class<? extends Task> taskClass() {
56+
return TopicFromValueSchemaConnectorTask.class;
57+
}
58+
59+
@Override
60+
public List<Map<String, String>> taskConfigs(final int maxTasks) {
61+
return Collections.singletonList(Collections.emptyMap());
62+
}
63+
64+
@Override
65+
public void stop() {
66+
}
67+
68+
@Override
69+
public ConfigDef config() {
70+
return new ConfigDef();
71+
}
72+
73+
@Override
74+
public String version() {
75+
return null;
76+
}
77+
78+
public static class TopicFromValueSchemaConnectorTask extends SourceTask {
79+
private int counter = 0;
80+
81+
private final Schema valueSchema = SchemaBuilder.struct()
82+
.field(FIELD, SchemaBuilder.STRING_SCHEMA)
83+
.name(NAME)
84+
.schema();
85+
private final Struct value = new Struct(valueSchema).put(FIELD, "Data");
86+
87+
@Override
88+
public void start(final Map<String, String> props) {
89+
log.info("Started TopicFromValueSchemaConnector!!!");
90+
}
91+
92+
@Override
93+
public List<SourceRecord> poll() throws InterruptedException {
94+
if (counter >= MESSAGES_TO_PRODUCE) {
95+
return null; // indicate pause
96+
}
97+
98+
final Map<String, String> sourcePartition = new HashMap<>();
99+
sourcePartition.put("partition", "0");
100+
final Map<String, String> sourceOffset = new HashMap<>();
101+
sourceOffset.put("offset", Integer.toString(counter));
102+
103+
counter += 1;
104+
105+
return Collections.singletonList(
106+
new SourceRecord(sourcePartition, sourceOffset,
107+
TOPIC,
108+
valueSchema, value)
109+
);
110+
}
111+
112+
@Override
113+
public void stop() {
114+
}
115+
116+
@Override
117+
public String version() {
118+
return null;
119+
}
120+
}
121+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
20+
import java.util.Optional;
21+
import java.util.regex.Matcher;
22+
import java.util.regex.Pattern;
23+
24+
import org.apache.kafka.common.config.ConfigDef;
25+
import org.apache.kafka.connect.connector.ConnectRecord;
26+
import org.apache.kafka.connect.errors.DataException;
27+
import org.apache.kafka.connect.transforms.Transformation;
28+
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
public abstract class ExtractTopicFromSchemaName<R extends ConnectRecord<R>> implements Transformation<R> {
33+
34+
private static final Logger log = LoggerFactory.getLogger(ExtractTopicFromSchemaName.class);
35+
36+
private Map<String, String> schemaNameToTopicMap;
37+
private Pattern pattern;
38+
39+
@Override
40+
public ConfigDef config() {
41+
return ExtractTopicFromSchemaNameConfig.config();
42+
}
43+
44+
@Override
45+
public void configure(final Map<String, ?> configs) {
46+
final ExtractTopicFromSchemaNameConfig config = new ExtractTopicFromSchemaNameConfig(configs);
47+
schemaNameToTopicMap = config.schemaNameToTopicMap();
48+
final Optional<String> regex = config.regEx();
49+
regex.ifPresent(s -> pattern = Pattern.compile(s));
50+
}
51+
52+
public abstract String schemaName(R record);
53+
54+
@Override
55+
public R apply(final R record) {
56+
57+
final String schemaName = schemaName(record);
58+
// First check schema value name -> desired topic name mapping and use that if it is set.
59+
if (schemaNameToTopicMap.containsKey(schemaName)) {
60+
return createConnectRecord(record, schemaNameToTopicMap.get(schemaName));
61+
}
62+
// Secondly check if regex parsing from schema value name is set and use that.
63+
if (pattern != null) {
64+
final Matcher matcher = pattern.matcher(schemaName);
65+
if (matcher.find() && matcher.groupCount() == 1) {
66+
return createConnectRecord(record, matcher.group(1));
67+
}
68+
log.trace("No match with pattern {} from schema name {}", pattern.pattern(), schemaName);
69+
}
70+
// If no other configurations are set use value schema name as new topic name.
71+
return createConnectRecord(record, schemaName);
72+
}
73+
74+
private R createConnectRecord(final R record, final String newTopicName) {
75+
return record.newRecord(
76+
newTopicName,
77+
record.kafkaPartition(),
78+
record.keySchema(),
79+
record.key(),
80+
record.valueSchema(),
81+
record.value(),
82+
record.timestamp(),
83+
record.headers()
84+
);
85+
}
86+
87+
@Override
88+
public void close() {
89+
}
90+
91+
public static class Value<R extends ConnectRecord<R>> extends ExtractTopicFromSchemaName<R> {
92+
@Override
93+
public String schemaName(final R record) {
94+
if (null == record.valueSchema() || null == record.valueSchema().name()) {
95+
throw new DataException(" value schema name can't be null: " + record);
96+
}
97+
return record.valueSchema().name();
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)