diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
index fcbaf98a..bb5f00d2 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
@@ -51,6 +51,7 @@ public class ClickHouseSinkConfig {
     public static final String BYPASS_SCHEMA_VALIDATION = "bypassSchemaValidation";
     public static final String BYPASS_FIELD_CLEANUP = "bypassFieldCleanup";
     public static final String IGNORE_PARTITIONS_WHEN_BATCHING = "ignorePartitionsWhenBatching";
+    public static final String ENABLE_NEW_SCHEMA_VALIDATION = "enableNewSchemaValidation";
 
     public static final int MILLI_IN_A_SEC = 1000;
     private static final String databaseDefault = "default";
@@ -64,6 +65,7 @@ public class ClickHouseSinkConfig {
     public static final Integer tableRefreshIntervalDefault = 0;
     public static final Boolean exactlyOnceDefault = Boolean.FALSE;
     public static final Boolean customInsertFormatDefault = Boolean.FALSE;
+    public static final boolean enableNewSchemaValidationDefault = false;
 
     private final String hostname;
     private final int port;
@@ -95,6 +97,7 @@ public class ClickHouseSinkConfig {
     private final boolean bypassFieldCleanup;
     private final boolean ignorePartitionsWhenBatching;
     private final boolean binaryFormatWrtiteJsonAsString;
+    private final boolean enableNewSchemaValidation;
 
     public enum InsertFormats {
         NONE,
@@ -270,6 +273,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
         this.bypassSchemaValidation = Boolean.parseBoolean(props.getOrDefault(BYPASS_SCHEMA_VALIDATION, "false"));
         this.bypassFieldCleanup = Boolean.parseBoolean(props.getOrDefault(BYPASS_FIELD_CLEANUP, "false"));
         this.ignorePartitionsWhenBatching = Boolean.parseBoolean(props.getOrDefault(IGNORE_PARTITIONS_WHEN_BATCHING, "false"));
+        this.enableNewSchemaValidation = Boolean.parseBoolean(props.getOrDefault(ENABLE_NEW_SCHEMA_VALIDATION, String.valueOf(enableNewSchemaValidationDefault)));
 
         String jsonAsString = getClickhouseSettings().get("input_format_binary_read_json_as_string");
@@ -610,6 +614,15 @@ private static ConfigDef createConfigDef() {
                 ConfigDef.Width.SHORT,
                 "Ignore partitions when batching."
         );
+        configDef.define(ENABLE_NEW_SCHEMA_VALIDATION,
+                ConfigDef.Type.BOOLEAN,
+                enableNewSchemaValidationDefault,
+                ConfigDef.Importance.LOW,
+                "Enable the new schema validation logic. Default: false",
+                group,
+                ++orderInGroup,
+                ConfigDef.Width.SHORT,
+                "Enable New Schema Validation");
         return configDef;
     }
 }
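Not part of the patch: a minimal sketch of how the new flag could be switched on through connector properties. Only ENABLE_NEW_SCHEMA_VALIDATION comes from this change; the connection keys ("hostname", "port") and the class name below are assumptions for illustration, and the connector's other required settings would apply as usual.

import java.util.HashMap;
import java.util.Map;

import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

public class EnableNewSchemaValidationSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("hostname", "localhost"); // assumed connection keys for illustration
        props.put("port", "8123");
        // New in this change; parsed with Boolean.parseBoolean and defaulting to false when absent.
        props.put(ClickHouseSinkConfig.ENABLE_NEW_SCHEMA_VALIDATION, "true");
        ClickHouseSinkConfig config = new ClickHouseSinkConfig(props);
    }
}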
Default: false", + group, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Enable New Schema Validation"); return configDef; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java index 906ef256..be67436c 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java @@ -431,6 +431,59 @@ public Table describeTableV2(String database, String tableName) { } return table; } + + public ClickHouseTableSchema getTableSchema(String tableName) { + Map columnTypes = new HashMap<>(); + + if (useClientV2) { + return getTableSchemaV2(tableName, columnTypes); + } else { + return getTableSchemaV1(tableName, columnTypes); + } + } + + private ClickHouseTableSchema getTableSchemaV1(String tableName, Map columnTypes) { + Table table = describeTable(database, tableName); + if (table == null) { + return null; + } + + // Get the full type description from describe table + String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", database, tableName); + + try (ClickHouseClient client = ClickHouseClient.builder() + .options(getDefaultClientOptions()) + .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP)) + .build(); + ClickHouseResponse response = client.read(server) + .set("describe_include_subcolumns", true) + .format(ClickHouseFormat.JSONEachRow) + .query(describeQuery) + .executeAndWait()) { + + for (ClickHouseRecord r : response.records()) { + ClickHouseValue v = r.getValue(0); + String rowJson = v.asString(); + ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(rowJson); + + if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized() || fieldDescriptor.isEphemeral()) { + continue; + } + + columnTypes.put(fieldDescriptor.getName(), fieldDescriptor.getType()); + } + } catch (ClickHouseException | JsonProcessingException e) { + LOGGER.error(String.format("Exception when getting table schema %s", describeQuery), e); + return null; + } + + return new ClickHouseTableSchema(columnTypes); + } + + private ClickHouseTableSchema getTableSchemaV2(String tableName, Map columnTypes) { + return getTableSchemaV1(tableName, columnTypes); + } + public List extractTablesMapping(String database, Map cache) { List
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseTableSchema.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseTableSchema.java
new file mode 100644
index 00000000..8b002187
--- /dev/null
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseTableSchema.java
@@ -0,0 +1,20 @@
+package com.clickhouse.kafka.connect.sink.db.helper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClickHouseTableSchema {
+    private final Map<String, String> columnTypes;
+
+    public ClickHouseTableSchema(Map<String, String> columnTypes) {
+        this.columnTypes = new HashMap<>(columnTypes);
+    }
+
+    public String getColumnType(String columnName) {
+        return columnTypes.get(columnName);
+    }
+
+    public Map<String, String> getAllColumnTypes() {
+        return new HashMap<>(columnTypes);
+    }
+}
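Not part of the patch: a sketch of how the two pieces above could be consumed together, assuming a fully configured ClickHouseHelperClient named chc; the class and method names are illustrative. getTableSchema returns null when the table cannot be described, so callers need to guard for that.

import java.util.Map;

import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseTableSchema;

public class TableSchemaLookupSketch {
    // chc is assumed to be a fully configured helper client.
    static void printColumnTypes(ClickHouseHelperClient chc, String tableName) {
        ClickHouseTableSchema schema = chc.getTableSchema(tableName);
        if (schema == null) { // describe failed or the table does not exist
            System.err.println("Table could not be described: " + tableName);
            return;
        }
        // getAllColumnTypes() hands back a defensive copy of the internal map.
        for (Map.Entry<String, String> entry : schema.getAllColumnTypes().entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}

Because ClickHouseTableSchema copies the map both in its constructor and in getAllColumnTypes, instances are effectively immutable and can be cached and shared safely.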
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
index 443565b9..58e74aab 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
@@ -1,14 +1,50 @@
 package com.clickhouse.kafka.connect.sink;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
+
 import com.clickhouse.client.api.ClientConfigProperties;
 import com.clickhouse.kafka.connect.avro.test.Image;
 import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
+import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseTableSchema;
 import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
+import com.clickhouse.kafka.connect.util.Utils;
 import com.clickhouse.kafka.connect.sink.helper.SchemaTestData;
+import com.clickhouse.kafka.connect.test.TestProtos;
 import com.clickhouse.kafka.connect.test.junit.extension.FromVersionConditionExtension;
 import com.clickhouse.kafka.connect.test.junit.extension.SinceClickHouseVersion;
-import com.clickhouse.kafka.connect.test.TestProtos;
-import com.clickhouse.kafka.connect.util.Utils;
+
 import io.confluent.connect.avro.AvroConverter;
 import io.confluent.connect.protobuf.ProtobufConverter;
 import io.confluent.connect.protobuf.ProtobufConverterConfig;
@@ -19,32 +55,6 @@
 import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.errors.DataException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.json.JSONObject;
-import org.json.JSONArray;
-import org.junit.jupiter.api.Assumptions;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(FromVersionConditionExtension.class)
 public class ClickHouseSinkTaskWithSchemaTest extends ClickHouseBase {
@@ -75,6 +85,7 @@ public void arrayTypesTest() {
     @Test
     public void arrayNullableSubtypesTest() {
         Map<String, String> props = createProps();
+        props.put(ClickHouseSinkConfig.ENABLE_NEW_SCHEMA_VALIDATION, "true"); // Feature flag
         ClickHouseHelperClient chc = createClient(props);
 
         String topic = "array_nullable_subtypes_table_test";
@@ -82,12 +93,29 @@ public void arrayNullableSubtypesTest() {
         ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr_nullable_str` Array(Nullable(String)), `arr_empty_nullable_str` Array(Nullable(String)), `arr_nullable_int8` Array(Nullable(Int8)), `arr_nullable_int16` Array(Nullable(Int16)), `arr_nullable_int32` Array(Nullable(Int32)), `arr_nullable_int64` Array(Nullable(Int64)), `arr_nullable_float32` Array(Nullable(Float32)), `arr_nullable_float64` Array(Nullable(Float64)), `arr_nullable_bool` Array(Nullable(Bool)) ) Engine = MergeTree ORDER BY off16");
-        Collection<SinkRecord> sr = SchemaTestData.createArrayNullableSubtypes(topic, 1);
+        Collection<SinkRecord> validRecords = SchemaTestData.createArrayNullableSubtypes(topic, 1);
+        Collection<SinkRecord> invalidRecords = SchemaTestData.createInvalidArrayData(topic);
 
         ClickHouseSinkTask chst = new ClickHouseSinkTask();
         chst.start(props);
-        chst.put(sr);
+        chst.put(validRecords);
+        assertEquals(validRecords.size(), ClickHouseTestHelpers.countRows(chc, topic),
+                "All valid records should be inserted correctly");
+
+        assertThrows(RuntimeException.class, () -> chst.put(invalidRecords),
+                "Invalid records should cause a failure");
         chst.stop();
 
-        assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
+        ClickHouseTableSchema schema = chc.getTableSchema(topic);
+
+        assertEquals("Array(Nullable(String))", schema.getColumnType("arr_nullable_str"));
+        assertEquals("Array(Nullable(String))", schema.getColumnType("arr_empty_nullable_str"));
+        assertEquals("Array(Nullable(Int8))", schema.getColumnType("arr_nullable_int8"));
+        assertEquals("Array(Nullable(Int16))", schema.getColumnType("arr_nullable_int16"));
+        assertEquals("Array(Nullable(Int32))", schema.getColumnType("arr_nullable_int32"));
+        assertEquals("Array(Nullable(Int64))", schema.getColumnType("arr_nullable_int64"));
+        assertEquals("Array(Nullable(Float32))", schema.getColumnType("arr_nullable_float32"));
+        assertEquals("Array(Nullable(Float64))", schema.getColumnType("arr_nullable_float64"));
+        assertEquals("Array(Nullable(Bool))", schema.getColumnType("arr_nullable_bool"));
     }
 
     @Test
diff --git a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
index 8b041d4f..0ac70bb2 100644
--- a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
+++ b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
@@ -1544,4 +1544,54 @@ public static TestProtos.TestMessage createProductMessage() {
                 )
                 .build();
     }
+
+    public static Collection<SinkRecord> createInvalidArrayData(String topic) {
+        // Create a schema that declares the wrong inner type for a column of Array(Nullable(String))
+        Schema ARRAY_INT32_SCHEMA = SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).build();
+        Schema ARRAY_OPTIONAL_STRING_SCHEMA = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build();
+
+        // This schema has the WRONG type for the arr_nullable_str field
+        Schema INVALID_SCHEMA = SchemaBuilder.struct()
+                .field("off16", Schema.INT16_SCHEMA)
+                .field("arr_nullable_str", ARRAY_INT32_SCHEMA) // Wrong: INT32 instead of STRING
+                .field("arr_nullable_int32", SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).build())
+                .field("arr_nullable_int8", SchemaBuilder.array(Schema.OPTIONAL_INT8_SCHEMA).build())
+                .field("arr_nullable_int16", SchemaBuilder.array(Schema.OPTIONAL_INT16_SCHEMA).build())
+                .field("arr_nullable_int64", SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).build())
+                .field("arr_nullable_float32", SchemaBuilder.array(Schema.OPTIONAL_FLOAT32_SCHEMA).build())
+                .field("arr_nullable_float64", SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).build())
+                .field("arr_nullable_bool", SchemaBuilder.array(Schema.OPTIONAL_BOOLEAN_SCHEMA).build())
+                .field("arr_empty_nullable_str", ARRAY_OPTIONAL_STRING_SCHEMA)
+                .build();
+
+        List<SinkRecord> array = new ArrayList<>();
+
+        Struct invalidStruct = new Struct(INVALID_SCHEMA)
+                .put("off16", (short) 1)
+                .put("arr_nullable_str", Arrays.asList(1, 2, 3)) // Wrong: Integer list for a String column
+                .put("arr_nullable_int32", Arrays.asList(1, 2))
+                .put("arr_nullable_int8", Arrays.asList((byte) 1, null))
+                .put("arr_nullable_int16", Arrays.asList((short) 1, null))
+                .put("arr_nullable_int64", Arrays.asList((long) 1, null))
+                .put("arr_nullable_float32", Arrays.asList((float) 1, null))
+                .put("arr_nullable_float64", Arrays.asList((double) 1, null))
+                .put("arr_nullable_bool", Arrays.asList(true, null))
+                .put("arr_empty_nullable_str", new ArrayList<String>());
+
+        SinkRecord sr = new SinkRecord(
+                topic,
+                0,
+                null,
+                null,
+                INVALID_SCHEMA,
+                invalidStruct,
+                0,
+                System.currentTimeMillis(),
+                TimestampType.CREATE_TIME
+        );
+
+        array.add(sr);
+        return array;
+    }
 }
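Not part of the patch: the validation logic gated by the new flag is not included in this diff, so the following is only a hypothetical illustration of the kind of mismatch createInvalidArrayData is built to trigger — a Connect ARRAY field whose element type disagrees with the element type of the target Array(...) column. The class and method names are invented for the sketch; only getTableSchema, getColumnType, and the Kafka Connect schema API come from the source.

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseTableSchema;

public class ArrayElementCheckSketch {
    // Hypothetical check, not the connector's actual implementation: for ARRAY
    // fields, compare the Connect element type against the ClickHouse Array(...)
    // element type. createInvalidArrayData pairs an INT32 element schema with an
    // Array(Nullable(String)) column, which a check like this would reject.
    static void checkArrayElementTypes(Schema recordSchema, ClickHouseTableSchema tableSchema) {
        for (Field field : recordSchema.fields()) {
            String columnType = tableSchema.getColumnType(field.name());
            if (columnType == null || field.schema().type() != Schema.Type.ARRAY
                    || !columnType.startsWith("Array(")) {
                continue; // only Connect arrays mapped to Array columns are in scope here
            }
            // Strip the Array( ... ) wrapper and an optional Nullable( ... ) wrapper.
            String element = columnType.substring("Array(".length(), columnType.length() - 1);
            if (element.startsWith("Nullable(")) {
                element = element.substring("Nullable(".length(), element.length() - 1);
            }
            boolean columnIsString = element.equals("String");
            boolean fieldIsString = field.schema().valueSchema().type() == Schema.Type.STRING;
            if (columnIsString != fieldIsString) {
                throw new RuntimeException(String.format(
                        "Field '%s': element type %s does not match column type %s",
                        field.name(), field.schema().valueSchema().type(), columnType));
            }
        }
    }
}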