@@ -59,6 +59,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -305,6 +306,14 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
return validSchema;
}

Copilot AI Oct 27, 2025

The BASES array lacks documentation explaining the mapping between array indices and precision levels. Add a comment describing how each index corresponds to a specific precision value (e.g., index 3 = precision 3, value 1 = no scaling).

/**
* BASES array maps precision levels to scaling factors for date/time values.
* The index corresponds to the precision (e.g., index 3 = precision 3).
* Value at each index is the scaling factor (e.g., value 1 = no scaling).
* Example: BASES[3] == 1 means precision 3 uses no scaling.
*/
private int[] BASES = new int[] { 1_000, 100, 10, 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000 };

protected void doWriteDates(Type type, OutputStream stream, Data value, int precision, String columnName) throws IOException {
// TODO: develop more specific tests to have better coverage
if (value.getObject() == null) {
@@ -362,13 +371,14 @@ protected void doWriteDates(Type type, OutputStream stream, Data value, int prec
}
break;
case DateTime64:
if (value.getFieldType().equals(Schema.Type.INT64)) {
if (value.getObject().getClass().getName().endsWith(".Date")) {
Date date = (Date) value.getObject();
BinaryStreamUtils.writeInt64(stream, date.getTime());
if ( value.getFieldType().equals(Schema.Type.INT64)) {
if (value.getObject() instanceof Date) {
doWriteDate(stream, (Date) value.getObject(), precision);
} else {
BinaryStreamUtils.writeInt64(stream, (Long) value.getObject());
}
} else if (value.getFieldType().equals(Schema.Type.INT32) && value.getObject() instanceof Date) {
doWriteDate(stream, (Date) value.getObject(), precision);
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
long seconds;
@@ -422,6 +432,16 @@ protected void doWriteDates(Type type, OutputStream stream, Data value, int prec
}
}

private void doWriteDate(OutputStream stream, Date date, int precision ) throws IOException {
long ts = date.getTime();
if (precision > 3) {
ts *= BASES[precision];
} else if (precision < 3) {
ts /= BASES[precision];
}
BinaryStreamUtils.writeInt64(stream, ts);
}
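
A quick worked example of the scaling above (the factors come from the BASES array in this diff; the timestamp itself is arbitrary): java.util.Date.getTime() returns milliseconds, i.e. DateTime64 precision 3, so precision 3 passes through unchanged, lower precisions divide, and higher precisions multiply.

```java
// Illustrative sketch only; mirrors doWriteDate above with example values.
long ms = 1_700_000_000_123L;   // Date.getTime(): milliseconds since the epoch

long p0 = ms / 1_000;           // DateTime64(0) -> seconds:      1_700_000_000       (BASES[0] == 1_000)
long p3 = ms;                   // DateTime64(3) -> milliseconds: 1_700_000_000_123   (BASES[3] == 1)
long p6 = ms * 1_000;           // DateTime64(6) -> microseconds: 1_700_000_000_123_000
long p9 = ms * 1_000_000;       // DateTime64(9) -> nanoseconds:  1_700_000_000_123_000_000
```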

protected void doWriteColValue(Column col, OutputStream stream, Data value, boolean defaultsSupport) throws IOException {
Type columnType = col.getType();

@@ -1,6 +1,7 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.api.ClientConfigProperties;
import com.clickhouse.kafka.connect.avro.test.Event;
import com.clickhouse.kafka.connect.avro.test.Image;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
@@ -13,6 +14,7 @@
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.connect.protobuf.ProtobufConverterConfig;
import io.confluent.connect.protobuf.ProtobufDataConfig;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
@@ -42,10 +44,21 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -227,7 +240,9 @@ public void supportDatesTest() {

String topic = createTopicName("support-dates-table-test");
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Nullable(Date), date32_number Nullable(Date32), datetime_number DateTime, datetime64_number DateTime64, timestamp_int64 Int64, timestamp_date DateTime64, time_int32 Int32, time_date32 Date32, date_date Date, datetime_date DateTime ) Engine = MergeTree ORDER BY off16");
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` " +
" ( `off16` Int16, date_number Nullable(Date), date32_number Nullable(Date32), datetime_number DateTime, " +
" datetime64_number DateTime64, datetime64_3_number DateTime64(3), datetime64_6_number DateTime64(6), datetime64_9_number DateTime64(9), timestamp_int64 Int64, timestamp_date DateTime64, time_int32 Int32, time_date32 Date32, date_date Date, datetime_date DateTime ) Engine = MergeTree ORDER BY off16");
// https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98
Collection<SinkRecord> sr = SchemaTestData.createDateType(topic, 1);

@@ -237,6 +252,16 @@ public void supportDatesTest() {
chst.stop();

assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));

List<JSONObject> rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic);
for (JSONObject row : rows) {
String dateTime64_9 = row.getString("datetime64_9_number");
String dateTime64_6 = row.getString("datetime64_6_number");
String dateTime64_3 = row.getString("datetime64_3_number");

assertTrue(dateTime64_9.contains(dateTime64_3), dateTime64_3 + " is not a substring of " + dateTime64_9);
assertTrue(dateTime64_9.contains(dateTime64_6), dateTime64_6 + " is not a substring of " + dateTime64_9);
}
}

@Test
@@ -1228,38 +1253,14 @@ public void testAvroWithUnion() throws Exception {
.setName("image1")
.setContent("content")
.build();


Image image2 = Image.newBuilder()
.setName("image2")
.setContent(ByteBuffer.wrap("content2".getBytes()))
.setDescription("description")
.build();
String topic = createTopicName("test_avro_union_string");

MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
// Register your test schema
String subject = topic + "-value";
schemaRegistry.register(subject, image1.getSchema());

AvroConverter converter = new AvroConverter(schemaRegistry);
Map<String, Object> converterConfig = new HashMap<>();
converterConfig.put(ProtobufConverterConfig.AUTO_REGISTER_SCHEMAS, true);
converterConfig.put(ProtobufDataConfig.GENERATE_INDEX_FOR_UNIONS_CONFIG, false);
converterConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://test-url");

converter.configure(converterConfig, false);
KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistry);

SchemaAndValue image1ConnectData = converter.toConnectData(topic, serializer.serialize(topic, image1));
SchemaAndValue image2ConnectData = converter.toConnectData(topic, serializer.serialize(topic, image2));

List<SinkRecord> records = Arrays.asList(
new SinkRecord(topic, 0, null, null,
image1ConnectData.schema(), image1ConnectData.value(), 0),
new SinkRecord(topic, 0, null, null,
image2ConnectData.schema(), image2ConnectData.value(), 1)
);
List<SinkRecord> records = SchemaTestData.convertAvroToSinkRecord(topic, new AvroSchema(Image.getClassSchema()), Arrays.asList(image1, image2));
Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -1289,4 +1290,48 @@ public void testAvroWithUnion() throws Exception {
assertEquals("description", row.getString("description"));
assertEquals("content2", row.getString("content"));
}

@Test
public void testAvroDateAndTimeTypes() throws Exception {

final String topic = createTopicName("test_avro_timestamps");
final ZoneId tz = ZoneId.of("UTC");
final Instant now = Instant.now();

List<Object> events = IntStream.range(0, 3).mapToObj(i -> {
Instant time = now.plus(i * 1000, ChronoUnit.MILLIS);
Event event = Event.newBuilder()
.setId(i)
.setTime1(time)
.setTime2(LocalTime.ofInstant(time, tz))
.build();
return event;
}).collect(Collectors.toList());

List<SinkRecord> records = SchemaTestData.convertAvroToSinkRecord(topic, new AvroSchema(Event.getClassSchema()), events);

Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic,
"CREATE TABLE `%s` (`id` Int64, `time1` DateTime64(3), `time2` DateTime64(3)) Engine = MergeTree ORDER BY ()");

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(records);
chst.stop();
Collaborator

I do not see any assertion to validate the test

Contributor Author

Sorry, missed it :-(
Will add.

List<JSONObject> rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic);
assertEquals(events.size(), rows.size());
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(tz);
DateTimeFormatter localFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
for (int i = 0; i < events.size(); i++) {
Event event = (Event) events.get(i);
JSONObject row = rows.get(i);
assertEquals(event.getId(), row.getLong("id"));
assertEquals(formatter.format(event.getTime1()), row.get("time1"));
assertEquals(event.getTime2().atDate(LocalDate.of(1970, 1, 1)).format(localFormatter), row.get("time2"));
}
}
}
10 changes: 10 additions & 0 deletions src/testFixtures/avro/README.md
@@ -0,0 +1,10 @@

To generate the Avro schema from the IDL:
```shell
java -jar ~/tools/avro-tools-1.12.0.jar idl event.idl > event.avro
```

To generate the Java classes from the schema:
```shell
java -jar ~/tools/avro-tools-1.12.0.jar compile schema event.avro ../java/
```
13 changes: 13 additions & 0 deletions src/testFixtures/avro/event.idl
@@ -0,0 +1,13 @@
namespace com.clickhouse.kafka.connect.avro.test;
schema Event;

record Event {
// Just ID
long id;

// The timestamp-millis logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond.
timestamp_ms time1;

// The time-millis logical type represents a time of day, with no reference to a particular calendar, time zone or date, with a precision of one millisecond.
time_ms time2;
}
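
For context on how the logical types above reach the sink: with the standard Avro converter, timestamp-millis and time-millis typically arrive in Kafka Connect as the Timestamp (INT64-based) and Time (INT32-based) logical types, both backed by java.util.Date values, which is what the new instanceof Date branches in doWriteDates handle. A minimal sketch using standard Kafka Connect classes (field names follow event.idl; the class name and values are illustrative):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

import java.util.Date;

public class EventStructSketch {
    public static Struct sampleEvent() {
        // Avro timestamp-millis -> Connect Timestamp (schema type INT64, value is java.util.Date)
        // Avro time-millis      -> Connect Time      (schema type INT32, value is java.util.Date)
        Schema schema = SchemaBuilder.struct().name("Event")
                .field("id", Schema.INT64_SCHEMA)
                .field("time1", Timestamp.SCHEMA)
                .field("time2", Time.SCHEMA)
                .build();

        return new Struct(schema)
                .put("id", 1L)
                .put("time1", new Date())              // INT64 + Date -> doWriteDate(stream, date, precision)
                .put("time2", new Date(45_296_000L));  // INT32 + Date -> 12:34:56.000 as time of day
    }
}
```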