diff --git a/docs/opensearch-sink-connector-config-options.rst b/docs/opensearch-sink-connector-config-options.rst index 235bd0be..7aca1cdb 100644 --- a/docs/opensearch-sink-connector-config-options.rst +++ b/docs/opensearch-sink-connector-config-options.rst @@ -188,6 +188,28 @@ Data Conversion * Valid Values: [ignore, warn, fail, report] * Importance: low +``routing.enabled`` + Whether to enable routing for documents. If set to true, the connector will use routing when sending documents to OpenSearch. If set to false, no routing will be used. Default is false for backward compatibility. + + * Type: boolean + * Default: false + * Importance: low + +``routing.field.path`` + The path of the field to pull from the payload to use as the routing value for the document. Supports nested fields using dot notation (e.g., 'customer.id'). If set, then that field from either the key or value will be used as the routing value. If not set, then the entire key or value will be used as the routing value. The value will be added to the PUT request to OpenSearch as the \"routing=...\" argument. Only used if routing.enabled is true. + + * Type: string + * Default: null + * Importance: low + + +``routing.key`` + Whether to use the Kafka key for routing instead of the value. If set to true, the key will be used for routing. If set to false, the value will be used for routing. Default is false. Only used if routing.enabled is true. + + * Type: boolean + * Default: false + * Importance: low + Data Stream ^^^^^^^^^^^ @@ -237,5 +259,3 @@ Authentication * Type: password * Default: null * Importance: medium - - diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java index eb51cdac..87e659aa 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java @@ -153,6 +153,24 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig { + "- ``report`` - report to errant record reporter and keep the existing record\n" + "- ``fail`` - fail the task.\n\n"; + public static final String ROUTING_ENABLED_CONFIG = "routing.enabled"; + private static final String ROUTING_ENABLED_DOC = "Whether to enable routing for documents. " + + "If set to true, the connector will use routing when sending documents to OpenSearch. " + + "If set to false, no routing will be used. Default is false."; + + public static final String ROUTING_FIELD_PATH_CONFIG = "routing.field.path"; + private static final String ROUTING_FIELD_PATH_DOC = "The path of the field to pull from the payload " + + "to use as the routing value for the document. Supports nested fields using dot notation (e.g., 'customer.id'). " + + "If set, then that field from either the key or value will be used as the routing value. " + + "If not set, then the entire key or value will be used as the routing value. " + + "The value will be added to the PUT request to OpenSearch as the \"routing=...\" argument. " + + "Only used if routing.enabled is true."; + + public static final String ROUTING_KEY_CONFIG = "routing.key"; + private static final String ROUTING_KEY_DOC = "Whether to use the Kafka key for routing instead of the value. " + + "If set to true, the key will be used for routing. If set to false, the value will be used for routing. " + + "Default is false" + "Only used if routing.enabled is true."; + public static final String INDEX_WRITE_METHOD = "index.write.method"; public static final String INDEX_WRITE_METHOD_DOC = String.format( @@ -291,7 +309,13 @@ private static void addConversionConfigs(final ConfigDef configDef) { BulkProcessor.BehaviorOnVersionConflict.DEFAULT.toString(), BulkProcessor.BehaviorOnVersionConflict.VALIDATOR, Importance.LOW, BEHAVIOR_ON_VERSION_CONFLICT_DOC, DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, - "Behavior on document's version conflict (optimistic locking)"); + "Behavior on document's version conflict (optimistic locking)") + .define(ROUTING_ENABLED_CONFIG, Type.BOOLEAN, false, Importance.LOW, ROUTING_ENABLED_DOC, + DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Enable routing") + .define(ROUTING_FIELD_PATH_CONFIG, Type.STRING, null, Importance.LOW, ROUTING_FIELD_PATH_DOC, + DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Routing field path") + .define(ROUTING_KEY_CONFIG, Type.BOOLEAN, false, Importance.LOW, ROUTING_KEY_DOC, + DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Use Kafka key for routing"); } private static void addDataStreamConfig(final ConfigDef configDef) { @@ -492,6 +516,18 @@ public BulkProcessor.BehaviorOnVersionConflict behaviorOnVersionConflict() { .forValue(getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG)); } + public boolean isRoutingEnabled() { + return getBoolean(OpensearchSinkConnectorConfig.ROUTING_ENABLED_CONFIG); + } + + public Optional routingFieldPath() { + return Optional.ofNullable(getString(OpensearchSinkConnectorConfig.ROUTING_FIELD_PATH_CONFIG)); + } + + public boolean useRoutingKey() { + return getBoolean(OpensearchSinkConnectorConfig.ROUTING_KEY_CONFIG); + } + public static void main(final String[] args) { System.out.println("========================================="); System.out.println("OpenSearch Sink Connector Configuration Options"); diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/RequestBuilder.java b/src/main/java/io/aiven/kafka/connect/opensearch/RequestBuilder.java index 89fbe2d0..be43f294 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/RequestBuilder.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/RequestBuilder.java @@ -76,15 +76,32 @@ static SetOpenSearchSinkConnectorConfig builder() { ? deleteRequest : addVersionIfAny(documentIDStrategy, record, deleteRequest); } + + // Extract routing field value if configured + final String routingValue = extractRoutingFieldValue(config, record, payload); + if (config.indexWriteMethod() == IndexWriteMethod.UPSERT) { - return new UpdateRequest().id(documentId) + final var updateRequest = new UpdateRequest().id(documentId) .index(index) .doc(payload, XContentType.JSON) .upsert(payload, XContentType.JSON) .docAsUpsert(true) .retryOnConflict(Math.min(config.maxInFlightRequests(), 3)); + + // Add routing if available + if (routingValue != null) { + updateRequest.routing(routingValue); + } + + return updateRequest; } else { final var indexRequest = new IndexRequest().id(documentId).index(index); + + // Add routing if available + if (routingValue != null) { + indexRequest.routing(routingValue); + } + if (config.dataStreamEnabled()) { return indexRequest.opType(DocWriteRequest.OpType.CREATE) .source(addTimestampToPayload(config, record, payload), XContentType.JSON); @@ -130,4 +147,85 @@ private static String addTimestampToPayload(final OpensearchSinkConnectorConfig } } + private static String extractRoutingFieldValue(final OpensearchSinkConnectorConfig config, final SinkRecord record, + final String payload) { + // If routing is not enabled, don't use routing + if (!config.isRoutingEnabled()) { + return null; + } + + // Determine which payload to use based on routing.key setting + // If routing.key is not set, use the value (default behavior) + String payloadToUse = null; + if (config.useRoutingKey()) { + if (config.useRoutingKey() && record.key() != null) { + try { + payloadToUse = OBJECT_MAPPER.writeValueAsString(record.key()); + } catch (final IOException e) { + LOGGER.warn("Could not convert key to JSON string", e); + } + } + } else { + payloadToUse = payload; + } + + // If the payload to use is null, we can't extract a routing value + if (payloadToUse == null) { + return null; + } + + // If routing.field.path is not set, use the entire key or value as routing + final var routingFieldPath = config.routingFieldPath(); + if (routingFieldPath.isEmpty()) { + return payloadToUse; + } + + // If routing.field.path is set, extract the field from the payload + try { + final var json = OBJECT_MAPPER.readTree(payloadToUse); + if (!json.isObject()) { + LOGGER.warn( + "JSON payload is a type of {}. Required is JSON Object. Routing field value will not be extracted.", + json.getNodeType()); + return null; + } + + // Get the field path and split it into segments for nested field access + final var fieldPath = routingFieldPath.get(); + final var pathSegments = fieldPath.split("\\."); + + // Start with the root object + var currentNode = json; + + // Traverse the path segments + for (int i = 0; i < pathSegments.length - 1; i++) { + if (currentNode.isObject() && currentNode.has(pathSegments[i])) { + currentNode = currentNode.get(pathSegments[i]); + } else { + LOGGER.warn( + "Path segment '{}' in routing field path '{}' not found in payload or not an object. Routing field value will not be extracted.", + pathSegments[i], fieldPath); + return null; + } + } + + // Get the final field value + final var lastSegment = pathSegments[pathSegments.length - 1]; + if (currentNode.isObject() && currentNode.has(lastSegment)) { + final var fieldValue = currentNode.get(lastSegment); + if (fieldValue.isValueNode()) { + return fieldValue.asText(); + } else { + LOGGER.warn("Routing field '{}' is not a value node. Routing field value will not be extracted.", + fieldPath); + } + } else { + LOGGER.warn("Routing field '{}' not found in payload. Routing field value will not be extracted.", + fieldPath); + } + } catch (final IOException e) { + LOGGER.warn("Could not parse payload to extract routing field value", e); + } + return null; + } } diff --git a/src/test/java/io/aiven/kafka/connect/opensearch/RequestBuilderTest.java b/src/test/java/io/aiven/kafka/connect/opensearch/RequestBuilderTest.java index b5dbf3a9..250a4906 100644 --- a/src/test/java/io/aiven/kafka/connect/opensearch/RequestBuilderTest.java +++ b/src/test/java/io/aiven/kafka/connect/opensearch/RequestBuilderTest.java @@ -257,6 +257,215 @@ void dataStreamRequestWithEmptyTimestamp() throws Exception { assertNotNull(requestPayload.get(OpensearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_FIELD_DEFAULT)); } + @Test + void testRoutingFieldExtraction() throws Exception { + final var objectMapper = RequestBuilder.OBJECT_MAPPER; + final var routingFieldName = "customer_id"; + final var routingFieldValue = "12345"; + + // Test with IndexRequest + final var configForIndex = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_ENABLED_CONFIG, "true", + OpensearchSinkConnectorConfig.ROUTING_FIELD_PATH_CONFIG, routingFieldName)); + + final var payloadWithRoutingField = objectMapper.writeValueAsString( + objectMapper.createObjectNode().put("a", "b").put("c", "d").put(routingFieldName, routingFieldValue)); + + final var indexRequest = RequestBuilder.builder() + .withConfig(configForIndex) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(VALUE)) + .withPayload(payloadWithRoutingField) + .build(); + + assertTrue(indexRequest instanceof IndexRequest); + assertEquals(routingFieldValue, ((IndexRequest) indexRequest).routing()); + + // Test with UpdateRequest (upsert) + final var configForUpsert = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_ENABLED_CONFIG, "true", + OpensearchSinkConnectorConfig.ROUTING_FIELD_PATH_CONFIG, routingFieldName, + OpensearchSinkConnectorConfig.INDEX_WRITE_METHOD, + IndexWriteMethod.UPSERT.name().toLowerCase(Locale.ROOT))); + + final var updateRequest = RequestBuilder.builder() + .withConfig(configForUpsert) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(VALUE)) + .withPayload(payloadWithRoutingField) + .build(); + + assertTrue(updateRequest instanceof UpdateRequest); + assertEquals(routingFieldValue, ((UpdateRequest) updateRequest).routing()); + } + + @Test + void testRoutingDisabledByDefault() throws Exception { + final var objectMapper = RequestBuilder.OBJECT_MAPPER; + final var routingFieldName = "customer_id"; + final var routingFieldValue = "12345"; + + // Test 1: Routing is disabled by default, even if routing.field.path is set + final var configWithRoutingFieldOnly = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_FIELD_PATH_CONFIG, routingFieldName)); + + final var payloadWithRoutingField = objectMapper.writeValueAsString( + objectMapper.createObjectNode().put("a", "b").put("c", "d").put(routingFieldName, routingFieldValue)); + + final var requestWithRoutingDisabled = RequestBuilder.builder() + .withConfig(configWithRoutingFieldOnly) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(VALUE)) + .withPayload(payloadWithRoutingField) + .build(); + + assertTrue(requestWithRoutingDisabled instanceof IndexRequest); + // Routing should be null because routing.enabled is false by default + assertNull(((IndexRequest) requestWithRoutingDisabled).routing()); + + // Test 2: Routing is disabled by default, even if routing.key is set to true + final var configWithRoutingKeyOnly = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_KEY_CONFIG, "true")); + + final var valueJson = objectMapper + .writeValueAsString(objectMapper.createObjectNode().put("a", "b").put("c", "d")); + + final var requestWithRoutingKeyDisabled = RequestBuilder.builder() + .withConfig(configWithRoutingKeyOnly) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(valueJson)) + .withPayload(valueJson) + .build(); + + assertTrue(requestWithRoutingKeyDisabled instanceof IndexRequest); + // Routing should be null because routing.enabled is false by default + assertNull(((IndexRequest) requestWithRoutingKeyDisabled).routing()); + } + + @Test + void testNestedRoutingFieldPathExtraction() throws Exception { + final var objectMapper = RequestBuilder.OBJECT_MAPPER; + + // Test with nested routing field path + final var nestedRoutingFieldPath = "customer.id"; + final var routingFieldValue = "12345"; + + // Create a configuration with the new routing.field.path parameter + final var configWithNestedPath = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_ENABLED_CONFIG, "true", + OpensearchSinkConnectorConfig.ROUTING_FIELD_PATH_CONFIG, nestedRoutingFieldPath)); + + // Create a payload with nested fields + final var customerObject = objectMapper.createObjectNode() + .put("id", routingFieldValue) + .put("name", "John Doe") + .put("email", "john@example.com"); + + final var payloadWithNestedField = objectMapper.writeValueAsString( + objectMapper.createObjectNode().put("a", "b").put("c", "d").set("customer", customerObject)); + + final var requestWithNestedPath = RequestBuilder.builder() + .withConfig(configWithNestedPath) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(VALUE)) + .withPayload(payloadWithNestedField) + .build(); + + assertTrue(requestWithNestedPath instanceof IndexRequest); + assertEquals(routingFieldValue, ((IndexRequest) requestWithNestedPath).routing()); + + // Test with deeply nested routing field path + final var deeplyNestedPath = "user.address.zip"; + final var zipValue = "10001"; + + // Create a configuration with a deeply nested path + final var configWithDeeplyNestedPath = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_ENABLED_CONFIG, "true", + OpensearchSinkConnectorConfig.ROUTING_FIELD_PATH_CONFIG, deeplyNestedPath)); + + // Create an address object + final var addressObject = objectMapper.createObjectNode() + .put("street", "123 Main St") + .put("city", "New York") + .put("state", "NY") + .put("zip", zipValue); + + // Create a user object with the address + final var userObject = objectMapper.createObjectNode() + .put("name", "Jane Smith") + .put("email", "jane@example.com") + .set("address", addressObject); + + // Create the full payload + final var payloadWithDeeplyNestedField = objectMapper.writeValueAsString( + objectMapper.createObjectNode().put("a", "b").put("c", "d").set("user", userObject)); + + final var requestWithDeeplyNestedPath = RequestBuilder.builder() + .withConfig(configWithDeeplyNestedPath) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(VALUE)) + .withPayload(payloadWithDeeplyNestedField) + .build(); + + assertTrue(requestWithDeeplyNestedPath instanceof IndexRequest); + assertEquals(zipValue, ((IndexRequest) requestWithDeeplyNestedPath).routing()); + } + + @Test + void testRoutingKeyConfig() throws Exception { + final var objectMapper = RequestBuilder.OBJECT_MAPPER; + + // Test 1: routing.enabled=true, routing.key=true, routing.field.name=null + // Should use the entire key as routing + final var configKeyOnly = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_ENABLED_CONFIG, "true", + OpensearchSinkConnectorConfig.ROUTING_KEY_CONFIG, "true")); + + final var valueJson = objectMapper + .writeValueAsString(objectMapper.createObjectNode().put("a", "b").put("c", "d")); + + final var requestKeyOnly = RequestBuilder.builder() + .withConfig(configKeyOnly) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(valueJson)) + .withPayload(valueJson) + .build(); + + assertTrue(requestKeyOnly instanceof IndexRequest); + // The key is serialized as a JSON string, so it's wrapped in quotes + assertEquals("\"" + KEY + "\"", ((IndexRequest) requestKeyOnly).routing()); + + // Test 2: routing.enabled=true, routing.key=false (default), routing.field.name=customer_id + // Should extract the field from the value (existing behavior) + final var routingFieldName = "customer_id"; + final var routingFieldValue = "12345"; + + final var configValueWithField = new OpensearchSinkConnectorConfig( + Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost", + OpensearchSinkConnectorConfig.ROUTING_ENABLED_CONFIG, "true", + OpensearchSinkConnectorConfig.ROUTING_FIELD_PATH_CONFIG, routingFieldName)); + + final var valueWithRoutingField = objectMapper.writeValueAsString( + objectMapper.createObjectNode().put("a", "b").put("c", "d").put(routingFieldName, routingFieldValue)); + + final var requestValueWithField = RequestBuilder.builder() + .withConfig(configValueWithField) + .withIndex(INDEX) + .withSinkRecord(createSinkRecord(valueWithRoutingField)) + .withPayload(valueWithRoutingField) + .build(); + + assertTrue(requestValueWithField instanceof IndexRequest); + assertEquals(routingFieldValue, ((IndexRequest) requestValueWithField).routing()); + } + SinkRecord createSinkRecord(final Object value) { return new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, KEY, SchemaBuilder.struct().name("struct").field("string", Schema.STRING_SCHEMA).build(), value, OFFSET,