Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions docs/opensearch-sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,28 @@ Data Conversion
* Valid Values: [ignore, warn, fail, report]
* Importance: low

``routing.enabled``
Whether to enable routing for documents. If set to true, the connector will use routing when sending documents to OpenSearch. If set to false, no routing will be used. Default is false for backward compatibility.

* Type: boolean
* Default: false
* Importance: low

``routing.field.path``
The path of the field to pull from the payload to use as the routing value for the document. Supports nested fields using dot notation (e.g., 'customer.id'). If set, then that field from either the key or value will be used as the routing value. If not set, then the entire key or value will be used as the routing value. The value will be added to the PUT request to OpenSearch as the "routing=..." argument. Only used if routing.enabled is true.

* Type: string
* Default: null
* Importance: low


``routing.key``
Whether to use the Kafka key for routing instead of the value. If set to true, the key will be used for routing. If set to false, the value will be used for routing. Default is false. Only used if routing.enabled is true.

* Type: boolean
* Default: false
* Importance: low

Data Stream
^^^^^^^^^^^

Expand Down Expand Up @@ -237,5 +259,3 @@ Authentication
* Type: password
* Default: null
* Importance: medium


Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,24 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
+ "- ``report`` - report to errant record reporter and keep the existing record\n"
+ "- ``fail`` - fail the task.\n\n";

/** Config key that switches document routing on or off. */
public static final String ROUTING_ENABLED_CONFIG = "routing.enabled";
private static final String ROUTING_ENABLED_DOC = "Whether to enable routing for documents. "
        + "If set to true, the connector will use routing when sending documents to OpenSearch. "
        + "If set to false, no routing will be used. Default is false.";

/** Config key naming the (optionally dot-separated) field whose value becomes the routing value. */
public static final String ROUTING_FIELD_PATH_CONFIG = "routing.field.path";
private static final String ROUTING_FIELD_PATH_DOC = "The path of the field to pull from the payload "
        + "to use as the routing value for the document. Supports nested fields using dot notation (e.g., 'customer.id'). "
        + "If set, then that field from either the key or value will be used as the routing value. "
        + "If not set, then the entire key or value will be used as the routing value. "
        + "The value will be added to the PUT request to OpenSearch as the \"routing=...\" argument. "
        + "Only used if routing.enabled is true.";

/** Config key selecting the Kafka record key (true) or the record value (false) as the routing source. */
public static final String ROUTING_KEY_CONFIG = "routing.key";
// The sentences are separated by ". " so the rendered documentation does not run
// "Default is false" straight into "Only used if ...".
private static final String ROUTING_KEY_DOC = "Whether to use the Kafka key for routing instead of the value. "
        + "If set to true, the key will be used for routing. If set to false, the value will be used for routing. "
        + "Default is false. Only used if routing.enabled is true.";

public static final String INDEX_WRITE_METHOD = "index.write.method";

public static final String INDEX_WRITE_METHOD_DOC = String.format(
Expand Down Expand Up @@ -291,7 +309,13 @@ private static void addConversionConfigs(final ConfigDef configDef) {
BulkProcessor.BehaviorOnVersionConflict.DEFAULT.toString(),
BulkProcessor.BehaviorOnVersionConflict.VALIDATOR, Importance.LOW,
BEHAVIOR_ON_VERSION_CONFLICT_DOC, DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT,
"Behavior on document's version conflict (optimistic locking)");
"Behavior on document's version conflict (optimistic locking)")
.define(ROUTING_ENABLED_CONFIG, Type.BOOLEAN, false, Importance.LOW, ROUTING_ENABLED_DOC,
DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Enable routing")
.define(ROUTING_FIELD_PATH_CONFIG, Type.STRING, null, Importance.LOW, ROUTING_FIELD_PATH_DOC,
DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Routing field path")
.define(ROUTING_KEY_CONFIG, Type.BOOLEAN, false, Importance.LOW, ROUTING_KEY_DOC,
DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Use Kafka key for routing");
}

private static void addDataStreamConfig(final ConfigDef configDef) {
Expand Down Expand Up @@ -492,6 +516,18 @@ public BulkProcessor.BehaviorOnVersionConflict behaviorOnVersionConflict() {
.forValue(getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG));
}

/**
 * Reports whether document routing is switched on.
 *
 * @return the boolean value configured under {@code routing.enabled}
 */
public boolean isRoutingEnabled() {
    final boolean routingEnabled = getBoolean(ROUTING_ENABLED_CONFIG);
    return routingEnabled;
}

/**
 * Returns the configured routing field path, if any.
 *
 * @return the value configured under {@code routing.field.path}, or an empty
 *         {@link Optional} when the setting is {@code null}
 */
public Optional<String> routingFieldPath() {
    final String configuredPath = getString(ROUTING_FIELD_PATH_CONFIG);
    return Optional.ofNullable(configuredPath);
}

/**
 * Reports whether the Kafka record key, rather than the value, is the routing source.
 *
 * @return the boolean value configured under {@code routing.key}
 */
public boolean useRoutingKey() {
    final boolean routeByKey = getBoolean(ROUTING_KEY_CONFIG);
    return routeByKey;
}

public static void main(final String[] args) {
System.out.println("=========================================");
System.out.println("OpenSearch Sink Connector Configuration Options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,32 @@ static SetOpenSearchSinkConnectorConfig builder() {
? deleteRequest
: addVersionIfAny(documentIDStrategy, record, deleteRequest);
}

// Extract routing field value if configured
final String routingValue = extractRoutingFieldValue(config, record, payload);

if (config.indexWriteMethod() == IndexWriteMethod.UPSERT) {
return new UpdateRequest().id(documentId)
final var updateRequest = new UpdateRequest().id(documentId)
.index(index)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.docAsUpsert(true)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 3));

// Add routing if available
if (routingValue != null) {
updateRequest.routing(routingValue);
}

return updateRequest;
} else {
final var indexRequest = new IndexRequest().id(documentId).index(index);

// Add routing if available
if (routingValue != null) {
indexRequest.routing(routingValue);
}

if (config.dataStreamEnabled()) {
return indexRequest.opType(DocWriteRequest.OpType.CREATE)
.source(addTimestampToPayload(config, record, payload), XContentType.JSON);
Expand Down Expand Up @@ -130,4 +147,85 @@ private static String addTimestampToPayload(final OpensearchSinkConnectorConfig
}
}

private static String extractRoutingFieldValue(final OpensearchSinkConnectorConfig config, final SinkRecord record,
final String payload) {
// If routing is not enabled, don't use routing
if (!config.isRoutingEnabled()) {
return null;
}

// Determine which payload to use based on routing.key setting
// If routing.key is not set, use the value (default behavior)
String payloadToUse = null;
if (config.useRoutingKey()) {
if (config.useRoutingKey() && record.key() != null) {
try {
payloadToUse = OBJECT_MAPPER.writeValueAsString(record.key());
} catch (final IOException e) {
LOGGER.warn("Could not convert key to JSON string", e);
}
}
} else {
payloadToUse = payload;
}

// If the payload to use is null, we can't extract a routing value
if (payloadToUse == null) {
return null;
}

// If routing.field.path is not set, use the entire key or value as routing
final var routingFieldPath = config.routingFieldPath();
if (routingFieldPath.isEmpty()) {
return payloadToUse;
}

// If routing.field.path is set, extract the field from the payload
try {
final var json = OBJECT_MAPPER.readTree(payloadToUse);
if (!json.isObject()) {
LOGGER.warn(
"JSON payload is a type of {}. Required is JSON Object. Routing field value will not be extracted.",
json.getNodeType());
return null;
}

// Get the field path and split it into segments for nested field access
final var fieldPath = routingFieldPath.get();
final var pathSegments = fieldPath.split("\\.");

Comment on lines +194 to +196
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if fieldPath is an empty string? I think the config definition needs a non-empty validation check.

// Start with the root object
var currentNode = json;

// Traverse the path segments
for (int i = 0; i < pathSegments.length - 1; i++) {
if (currentNode.isObject() && currentNode.has(pathSegments[i])) {
currentNode = currentNode.get(pathSegments[i]);
} else {
LOGGER.warn(
"Path segment '{}' in routing field path '{}' not found in payload or not an object. Routing field value will not be extracted.",
pathSegments[i], fieldPath);
return null;
}
}

// Get the final field value
final var lastSegment = pathSegments[pathSegments.length - 1];
if (currentNode.isObject() && currentNode.has(lastSegment)) {
final var fieldValue = currentNode.get(lastSegment);
if (fieldValue.isValueNode()) {
return fieldValue.asText();
} else {
LOGGER.warn("Routing field '{}' is not a value node. Routing field value will not be extracted.",
fieldPath);
}
} else {
LOGGER.warn("Routing field '{}' not found in payload. Routing field value will not be extracted.",
fieldPath);
}
} catch (final IOException e) {
LOGGER.warn("Could not parse payload to extract routing field value", e);
}
return null;
}
}
Loading
Loading