ClickHouseSinkConfig.java
@@ -51,6 +51,7 @@ public class ClickHouseSinkConfig {
public static final String BYPASS_SCHEMA_VALIDATION = "bypassSchemaValidation";
public static final String BYPASS_FIELD_CLEANUP = "bypassFieldCleanup";
public static final String IGNORE_PARTITIONS_WHEN_BATCHING = "ignorePartitionsWhenBatching";
public static final String ENABLE_NEW_SCHEMA_VALIDATION = "enableNewSchemaValidation";

public static final int MILLI_IN_A_SEC = 1000;
private static final String databaseDefault = "default";
@@ -64,6 +65,7 @@ public class ClickHouseSinkConfig {
public static final Integer tableRefreshIntervalDefault = 0;
public static final Boolean exactlyOnceDefault = Boolean.FALSE;
public static final Boolean customInsertFormatDefault = Boolean.FALSE;
public static final Boolean enableNewSchemaValidationDefault = Boolean.FALSE;

private final String hostname;
private final int port;
@@ -95,6 +97,7 @@ public class ClickHouseSinkConfig {
private final boolean bypassFieldCleanup;
private final boolean ignorePartitionsWhenBatching;
private final boolean binaryFormatWrtiteJsonAsString;
private final boolean enableNewSchemaValidation;

public enum InsertFormats {
NONE,
@@ -270,6 +273,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
this.bypassSchemaValidation = Boolean.parseBoolean(props.getOrDefault(BYPASS_SCHEMA_VALIDATION, "false"));
this.bypassFieldCleanup = Boolean.parseBoolean(props.getOrDefault(BYPASS_FIELD_CLEANUP, "false"));
this.ignorePartitionsWhenBatching = Boolean.parseBoolean(props.getOrDefault(IGNORE_PARTITIONS_WHEN_BATCHING, "false"));
this.enableNewSchemaValidation = Boolean.parseBoolean(props.getOrDefault(ENABLE_NEW_SCHEMA_VALIDATION, String.valueOf(enableNewSchemaValidationDefault)));


String jsonAsString = getClickhouseSettings().get("input_format_binary_read_json_as_string");
@@ -610,6 +614,15 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.SHORT,
"Ignore partitions when batching."
);
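// Feature flag gating the new schema validation code path; off by default so existing
// pipelines keep their current behavior.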
configDef.define(ENABLE_NEW_SCHEMA_VALIDATION,
ConfigDef.Type.BOOLEAN,
enableNewSchemaValidationDefault,
ConfigDef.Importance.LOW,
"Enable the new schema validation logic. Default: false",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Enable New Schema Validation");
return configDef;
}
}
ClickHouseHelperClient.java
@@ -431,6 +431,59 @@ public Table describeTableV2(String database, String tableName) {
}
return table;
}

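/**
 * Builds a column name -> ClickHouse type mapping for the given table in the configured
 * database, routing through the client v1 or v2 code path. Returns null if the table
 * cannot be described.
 */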
public ClickHouseTableSchema getTableSchema(String tableName) {
Map<String, String> columnTypes = new HashMap<>();

if (useClientV2) {
return getTableSchemaV2(tableName, columnTypes);
} else {
return getTableSchemaV1(tableName, columnTypes);
}
}

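// V1 (HTTP client) implementation: confirms the table exists, then runs DESCRIBE TABLE with
// describe_include_subcolumns and skips ALIAS, MATERIALIZED and EPHEMERAL columns, which are
// excluded from the insert schema.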
private ClickHouseTableSchema getTableSchemaV1(String tableName, Map<String, String> columnTypes) {
Table table = describeTable(database, tableName);
if (table == null) {
return null;
}

// Get the full type description from describe table
String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", database, tableName);

try (ClickHouseClient client = ClickHouseClient.builder()
.options(getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(server)
.set("describe_include_subcolumns", true)
.format(ClickHouseFormat.JSONEachRow)
.query(describeQuery)
.executeAndWait()) {

for (ClickHouseRecord r : response.records()) {
ClickHouseValue v = r.getValue(0);
String rowJson = v.asString();
ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(rowJson);

if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized() || fieldDescriptor.isEphemeral()) {
continue;
}

columnTypes.put(fieldDescriptor.getName(), fieldDescriptor.getType());
}
} catch (ClickHouseException | JsonProcessingException e) {
LOGGER.error(String.format("Exception when getting table schema (query: %s)", describeQuery), e);
return null;
}

return new ClickHouseTableSchema(columnTypes);
}

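// The client-v2 path currently reuses the V1 DESCRIBE-based implementation.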
private ClickHouseTableSchema getTableSchemaV2(String tableName, Map<String, String> columnTypes) {
return getTableSchemaV1(tableName, columnTypes);
}

public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
List<Table> tableList = new ArrayList<>();
for (Table table : showTables(database) ) {
ClickHouseTableSchema.java
@@ -0,0 +1,20 @@
package com.clickhouse.kafka.connect.sink.db.helper;

import java.util.HashMap;
import java.util.Map;

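/**
 * Immutable view of a table's schema: a plain column name -> ClickHouse type mapping.
 * The backing map is defensively copied on construction and again by getAllColumnTypes().
 */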
public class ClickHouseTableSchema {
private final Map<String, String> columnTypes;

public ClickHouseTableSchema(Map<String, String> columnTypes) {
this.columnTypes = new HashMap<>(columnTypes);
}

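/** Returns the declared ClickHouse type for the given column, or null if the column is unknown. */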
public String getColumnType(String columnName) {
return columnTypes.get(columnName);
}

public Map<String, String> getAllColumnTypes() {
return new HashMap<>(columnTypes);
}
}
ClickHouseSinkTaskWithSchemaTest.java
@@ -1,14 +1,50 @@
package com.clickhouse.kafka.connect.sink;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;

import com.clickhouse.client.api.ClientConfigProperties;
import com.clickhouse.kafka.connect.avro.test.Image;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseTableSchema;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.helper.SchemaTestData;
import com.clickhouse.kafka.connect.test.TestProtos;
import com.clickhouse.kafka.connect.test.junit.extension.FromVersionConditionExtension;
import com.clickhouse.kafka.connect.test.junit.extension.SinceClickHouseVersion;
import com.clickhouse.kafka.connect.util.Utils;

import io.confluent.connect.avro.AvroConverter;
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.connect.protobuf.ProtobufConverterConfig;
@@ -19,32 +55,6 @@
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;

@ExtendWith(FromVersionConditionExtension.class)
public class ClickHouseSinkTaskWithSchemaTest extends ClickHouseBase {
@@ -75,19 +85,37 @@ public void arrayTypesTest() {
@Test
public void arrayNullableSubtypesTest() {
Map<String, String> props = createProps();
props.put("enable.new.schema.validation", "true"); // Feature flag
ClickHouseHelperClient chc = createClient(props);

String topic = "array_nullable_subtypes_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr_nullable_str` Array(Nullable(String)), `arr_empty_nullable_str` Array(Nullable(String)), `arr_nullable_int8` Array(Nullable(Int8)), `arr_nullable_int16` Array(Nullable(Int16)), `arr_nullable_int32` Array(Nullable(Int32)), `arr_nullable_int64` Array(Nullable(Int64)), `arr_nullable_float32` Array(Nullable(Float32)), `arr_nullable_float64` Array(Nullable(Float64)), `arr_nullable_bool` Array(Nullable(Bool)) ) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> validRecords = SchemaTestData.createArrayNullableSubtypes(topic, 1);
Collection<SinkRecord> invalidRecords = SchemaTestData.createInvalidArrayData(topic);
ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(validRecords);
assertEquals(validRecords.size(), ClickHouseTestHelpers.countRows(chc, topic),
"All valid records should be inserted correctly");

assertThrows(RuntimeException.class, () -> chst.put(invalidRecords),
"Invalid records should cause a failure");
chst.stop();

ClickHouseTableSchema schema = chc.getTableSchema(topic);

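// Spot-check that the schema helper reports the exact types declared in the CREATE TABLE above.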
assertEquals("Array(Nullable(String))", schema.getColumnType("arr_nullable_str"));
assertEquals("Array(Nullable(String))", schema.getColumnType("arr_empty_nullable_str"));
assertEquals("Array(Nullable(Int8))", schema.getColumnType("arr_nullable_int8"));
assertEquals("Array(Nullable(Int16))", schema.getColumnType("arr_nullable_int16"));
assertEquals("Array(Nullable(Int32))", schema.getColumnType("arr_nullable_int32"));
assertEquals("Array(Nullable(Int64))", schema.getColumnType("arr_nullable_int64"));
assertEquals("Array(Nullable(Float32))", schema.getColumnType("arr_nullable_float32"));
assertEquals("Array(Nullable(Float64))", schema.getColumnType("arr_nullable_float64"));
assertEquals("Array(Nullable(Bool))", schema.getColumnType("arr_nullable_bool"));
}

@Test
SchemaTestData.java
@@ -1544,4 +1544,54 @@ public static TestProtos.TestMessage createProductMessage() {
)
.build();
}

public static Collection<SinkRecord> createInvalidArrayData(String topic) {
// Create schema that expects Array(Nullable(String)) but we'll provide wrong inner type
Schema ARRAY_INT32_SCHEMA = SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).build();
Schema ARRAY_OPTIONAL_STRING_SCHEMA = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build();

// This schema has the WRONG type for arr_nullable_str field
Schema INVALID_SCHEMA = SchemaBuilder.struct()
.field("off16", Schema.INT16_SCHEMA)
.field("arr_nullable_str", ARRAY_INT32_SCHEMA) // Wrong: INT32 instead of STRING
.field("arr_nullable_int32", SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).build())
.field("arr_nullable_int8", SchemaBuilder.array(Schema.OPTIONAL_INT8_SCHEMA).build())
.field("arr_nullable_int16", SchemaBuilder.array(Schema.OPTIONAL_INT16_SCHEMA).build())
.field("arr_nullable_int64", SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).build())
.field("arr_nullable_float32", SchemaBuilder.array(Schema.OPTIONAL_FLOAT32_SCHEMA).build())
.field("arr_nullable_float64", SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).build())
.field("arr_nullable_bool", SchemaBuilder.array(Schema.OPTIONAL_BOOLEAN_SCHEMA).build())
.field("arr_empty_nullable_str", ARRAY_OPTIONAL_STRING_SCHEMA)
.build();

List<SinkRecord> array = new ArrayList<>();

Struct invalidStruct = new Struct(INVALID_SCHEMA)
.put("off16", (short) 1)
.put("arr_nullable_str", Arrays.asList(1, 2, 3)) // Wrong: Integer list for String field
.put("arr_nullable_int32", Arrays.asList(1, 2))
.put("arr_nullable_int8", Arrays.asList((byte)1, null))
.put("arr_nullable_int16", Arrays.asList((short)1, null))
.put("arr_nullable_int64", Arrays.asList((long)1, null))
.put("arr_nullable_float32", Arrays.asList((float)1, null))
.put("arr_nullable_float64", Arrays.asList((double)1, null))
.put("arr_nullable_bool", Arrays.asList(true, null))
.put("arr_empty_nullable_str", new ArrayList<String>())
;

SinkRecord sr = new SinkRecord(
topic,
0,
null,
null,
INVALID_SCHEMA,
invalidStruct,
0,
System.currentTimeMillis(),
TimestampType.CREATE_TIME
);

array.add(sr);
return array;
}
}