Skip to content

Commit 6efa59d

Browse files
committed
Introduce optional external.version.header config to use for ES external version instead of kafka offset for non-datastream indices
1 parent 4d5c9d7 commit 6efa59d

File tree

3 files changed

+96
-1
lines changed

3 files changed

+96
-1
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,14 @@
2525
import org.apache.kafka.connect.data.Struct;
2626
import org.apache.kafka.connect.data.Time;
2727
import org.apache.kafka.connect.data.Timestamp;
28+
import org.apache.kafka.connect.errors.ConnectException;
2829
import org.apache.kafka.connect.errors.DataException;
30+
import org.apache.kafka.connect.header.Header;
2931
import org.apache.kafka.connect.json.JsonConverter;
3032
import org.apache.kafka.connect.sink.SinkRecord;
3133
import org.apache.kafka.connect.storage.Converter;
34+
import org.apache.kafka.connect.storage.HeaderConverter;
35+
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
3236
import org.elasticsearch.action.DocWriteRequest;
3337
import org.elasticsearch.action.delete.DeleteRequest;
3438
import org.elasticsearch.action.index.IndexRequest;
@@ -52,6 +56,7 @@ public class DataConverter {
5256
private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
5357

5458
private static final Converter JSON_CONVERTER;
59+
private static final HeaderConverter HEADER_CONVERTER = new SimpleHeaderConverter();
5560
protected static final String MAP_KEY = "key";
5661
protected static final String MAP_VALUE = "value";
5762

@@ -202,7 +207,24 @@ private DocWriteRequest<?> maybeAddExternalVersioning(
202207
) {
203208
if (!config.shouldIgnoreKey(record.topic())) {
204209
request.versionType(VersionType.EXTERNAL);
205-
request.version(record.kafkaOffset());
210+
if (config.hasExternalVersionHeader()) {
211+
final Header versionHeader = record.headers().lastWithName(config.externalVersionHeader());
212+
final byte[] versionValue = HEADER_CONVERTER.fromConnectHeader(
213+
record.topic(),
214+
versionHeader.key(),
215+
versionHeader.schema(),
216+
versionHeader.value()
217+
);
218+
try {
219+
//fromConnectHeader byte output is UTF_8
220+
request.version(Long.parseLong(new String(versionValue, StandardCharsets.UTF_8)));
221+
} catch (NumberFormatException e) {
222+
throw new ConnectException("Error converting to long: "
223+
+ new String(versionValue, StandardCharsets.UTF_8), e);
224+
}
225+
} else {
226+
request.version(record.kafkaOffset());
227+
}
206228
}
207229

208230
return request;

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
226226
private static final BehaviorOnMalformedDoc BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT =
227227
BehaviorOnMalformedDoc.FAIL;
228228

229+
public static final String EXTERNAL_VERSION_HEADER_CONFIG = "external.version.header";
230+
private static final String EXTERNAL_VERSION_HEADER_DOC = "Header name to pull value for"
231+
+ " external versioning, defaults to using the kafka record offset. Must have a numeric"
232+
+ " value.";
233+
private static final String EXTERNAL_VERSION_HEADER_DISPLAY = "External Version Header Name";
234+
private static final String EXTERNAL_VERSION_HEADER_DEFAULT = "";
235+
229236
public static final String WRITE_METHOD_CONFIG = "write.method";
230237
private static final String WRITE_METHOD_DOC = String.format(
231238
"Method used for writing data to Elasticsearch, and one of %s or %s. The default method is"
@@ -592,6 +599,16 @@ private static void addConversionConfigs(ConfigDef configDef) {
592599
Width.SHORT,
593600
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
594601
new EnumRecommender<>(BehaviorOnMalformedDoc.class)
602+
).define(
603+
EXTERNAL_VERSION_HEADER_CONFIG,
604+
Type.STRING,
605+
EXTERNAL_VERSION_HEADER_DEFAULT,
606+
Importance.LOW,
607+
EXTERNAL_VERSION_HEADER_DOC,
608+
DATA_CONVERSION_GROUP,
609+
++order,
610+
Width.SHORT,
611+
EXTERNAL_VERSION_HEADER_DISPLAY
595612
).define(
596613
WRITE_METHOD_CONFIG,
597614
Type.STRING,
@@ -874,6 +891,14 @@ public boolean useCompactMapEntries() {
874891
return getBoolean(COMPACT_MAP_ENTRIES_CONFIG);
875892
}
876893

894+
public boolean hasExternalVersionHeader() {
895+
return !getString(EXTERNAL_VERSION_HEADER_CONFIG).isEmpty();
896+
}
897+
898+
public String externalVersionHeader() {
899+
return getString(EXTERNAL_VERSION_HEADER_CONFIG);
900+
}
901+
877902
public WriteMethod writeMethod() {
878903
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
879904
}

src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,54 @@ public void deleteOnNullValue() {
325325

326326
assertEquals(key, actualRecord.id());
327327
assertEquals(index, actualRecord.index());
328+
assertEquals(sinkRecord.kafkaOffset(), actualRecord.version());
329+
}
330+
331+
@Test
332+
public void externalVersionHeaderOnDelete() {
333+
String externalVersionHeader = "version";
334+
long expectedExternalVersion = 123l;
335+
336+
props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
337+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
338+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
339+
props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
340+
props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
341+
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
342+
343+
SinkRecord sinkRecord = createSinkRecordWithValue(null);
344+
sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);
345+
346+
DeleteRequest actualRecord = (DeleteRequest) converter.convertRecord(sinkRecord, index);
347+
348+
assertEquals(key, actualRecord.id());
349+
assertEquals(index, actualRecord.index());
350+
assertEquals(expectedExternalVersion, actualRecord.version());
351+
}
352+
353+
@Test
354+
public void externalVersionHeaderOnIndex() {
355+
String externalVersionHeader = "version";
356+
long expectedExternalVersion = 123l;
357+
358+
props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
359+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
360+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
361+
props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
362+
props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
363+
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
364+
365+
366+
Schema preProcessedSchema = converter.preProcessSchema(schema);
367+
Struct struct = new Struct(preProcessedSchema).put("string", "myValue");
368+
SinkRecord sinkRecord = createSinkRecordWithValue(struct);
369+
sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);
370+
371+
IndexRequest actualRecord = (IndexRequest) converter.convertRecord(sinkRecord, index);
372+
373+
assertEquals(key, actualRecord.id());
374+
assertEquals(index, actualRecord.index());
375+
assertEquals(expectedExternalVersion, actualRecord.version());
328376
}
329377

330378
@Test

0 commit comments

Comments
 (0)