diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 1e60a467..81bc4d78 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -59,6 +59,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -305,6 +306,14 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie return validSchema; } + /** + * BASES array maps precision levels to scaling factors for date/time values. + * The index corresponds to the precision (e.g., index 3 = precision 3). + * Value at each index is the scaling factor (e.g., value 1 = no scaling). + * Example: BASES[3] == 1 means precision 3 uses no scaling. + */ + private int[] BASES = new int[] { 1_000, 100, 10, 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000 }; + protected void doWriteDates(Type type, OutputStream stream, Data value, int precision, String columnName) throws IOException { // TODO: develop more specific tests to have better coverage if (value.getObject() == null) { @@ -362,13 +371,14 @@ protected void doWriteDates(Type type, OutputStream stream, Data value, int prec } break; case DateTime64: - if (value.getFieldType().equals(Schema.Type.INT64)) { - if (value.getObject().getClass().getName().endsWith(".Date")) { - Date date = (Date) value.getObject(); - BinaryStreamUtils.writeInt64(stream, date.getTime()); + if ( value.getFieldType().equals(Schema.Type.INT64)) { + if (value.getObject() instanceof Date) { + doWriteDate(stream, (Date) value.getObject(), precision); } else { BinaryStreamUtils.writeInt64(stream, (Long) value.getObject()); } + } else if (value.getFieldType().equals(Schema.Type.INT32) && value.getObject() instanceof Date) { + doWriteDate(stream, (Date) value.getObject(), precision); } else if (value.getFieldType().equals(Schema.Type.STRING)) { try { long seconds; @@ -422,6 +432,16 @@ protected void doWriteDates(Type type, OutputStream stream, Data value, int prec } } + private void doWriteDate(OutputStream stream, Date date, int precision ) throws IOException { + long ts = date.getTime(); + if (precision > 3) { + ts *= BASES[precision]; + } else if (precision < 3) { + ts /= BASES[precision]; + } + BinaryStreamUtils.writeInt64(stream, ts); + } + protected void doWriteColValue(Column col, OutputStream stream, Data value, boolean defaultsSupport) throws IOException { Type columnType = col.getType(); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java index 0e999b9b..6440f3da 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java @@ -1,6 +1,7 @@ package com.clickhouse.kafka.connect.sink; import com.clickhouse.client.api.ClientConfigProperties; +import com.clickhouse.kafka.connect.avro.test.Event; import com.clickhouse.kafka.connect.avro.test.Image; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; @@ -13,6 +14,7 @@ import io.confluent.connect.protobuf.ProtobufConverter; 
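
A quick illustration of what the new `BASES` table and `doWriteDate` helper do with a `java.util.Date`: the incoming value carries epoch milliseconds (precision 3), so the writer divides for DateTime64 columns below precision 3 and multiplies for columns above it. The sketch below is a minimal standalone version of that arithmetic, not the connector code itself; the class and method names (`DateTime64Scaling`, `rescaleMillis`) and the sample timestamp are illustrative only.

```java
import java.util.Date;

public class DateTime64Scaling {
    // Mirrors the BASES table in ClickHouseWriter: index = DateTime64 precision,
    // value = factor between epoch millis (precision 3) and that precision.
    private static final int[] BASES = {1_000, 100, 10, 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000};

    // Hypothetical helper mirroring doWriteDate's math: rescale epoch millis to the target precision.
    static long rescaleMillis(Date date, int precision) {
        long ts = date.getTime();       // epoch milliseconds
        if (precision > 3) {
            ts *= BASES[precision];     // e.g. precision 6 -> multiply by 1_000 (micros)
        } else if (precision < 3) {
            ts /= BASES[precision];     // e.g. precision 0 -> divide by 1_000 (seconds)
        }
        return ts;                      // precision 3 passes through unchanged
    }

    public static void main(String[] args) {
        Date d = new Date(1_700_000_000_123L);   // 2023-11-14T22:13:20.123Z
        System.out.println(rescaleMillis(d, 0)); // 1700000000          (seconds)
        System.out.println(rescaleMillis(d, 3)); // 1700000000123       (millis, unchanged)
        System.out.println(rescaleMillis(d, 6)); // 1700000000123000    (micros)
        System.out.println(rescaleMillis(d, 9)); // 1700000000123000000 (nanos)
    }
}
```
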
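For context on the new `Schema.Type.INT32` branch in the DateTime64 case: Kafka Connect's `Time` logical type is declared as INT32 on the wire, but converters surface the field value as a `java.util.Date`, which is why a `Date` can now arrive with an INT32 field type and also be routed through `doWriteDate`. A small self-contained check of how the Connect API exposes these logical types (the class name below is illustrative):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class ConnectTimeTypesNote {
    public static void main(String[] args) {
        // Timestamp is INT64 + java.util.Date values; Time is INT32 + java.util.Date values.
        Schema timestampSchema = Timestamp.SCHEMA;
        Schema timeSchema = Time.SCHEMA;
        System.out.println(timestampSchema.type()); // INT64
        System.out.println(timestampSchema.name()); // org.apache.kafka.connect.data.Timestamp
        System.out.println(timeSchema.type());      // INT32
        System.out.println(timeSchema.name());      // org.apache.kafka.connect.data.Time
    }
}
```
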
import io.confluent.connect.protobuf.ProtobufConverterConfig; import io.confluent.connect.protobuf.ProtobufDataConfig; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.confluent.kafka.serializers.KafkaAvroSerializer; @@ -42,10 +44,21 @@ import java.io.Serializable; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.LongStream; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -227,7 +240,9 @@ public void supportDatesTest() { String topic = createTopicName("support-dates-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); - ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Nullable(Date), date32_number Nullable(Date32), datetime_number DateTime, datetime64_number DateTime64, timestamp_int64 Int64, timestamp_date DateTime64, time_int32 Int32, time_date32 Date32, date_date Date, datetime_date DateTime ) Engine = MergeTree ORDER BY off16"); + ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` " + + " ( `off16` Int16, date_number Nullable(Date), date32_number Nullable(Date32), datetime_number DateTime, " + + " datetime64_number DateTime64, datetime64_3_number DateTime64(3), datetime64_6_number DateTime64(6), datetime64_9_number DateTime64(9), timestamp_int64 Int64, timestamp_date DateTime64, time_int32 Int32, time_date32 Date32, date_date Date, datetime_date DateTime ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 Collection sr = SchemaTestData.createDateType(topic, 1); @@ -237,6 +252,16 @@ public void supportDatesTest() { chst.stop(); assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic)); + + List rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic); + for (JSONObject row : rows) { + String dateTime64_9 = row.getString("datetime64_9_number"); + String dateTime64_6 = row.getString("datetime64_6_number"); + String dateTime64_3 = row.getString("datetime64_3_number"); + + assertTrue(dateTime64_9.contains(dateTime64_3), dateTime64_3 + " is not a substring of " + dateTime64_9); + assertTrue(dateTime64_9.contains(dateTime64_6), dateTime64_6 + " is not a substring of " + dateTime64_9); + } } @Test @@ -1228,8 +1253,6 @@ public void testAvroWithUnion() throws Exception { .setName("image1") .setContent("content") .build(); - - Image image2 = Image.newBuilder() .setName("image2") .setContent(ByteBuffer.wrap("content2".getBytes())) @@ -1237,29 +1260,7 @@ public void testAvroWithUnion() throws Exception { .build(); String topic = createTopicName("test_avro_union_string"); - MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient(); - // Register your test schema - String subject = topic + "-value"; - schemaRegistry.register(subject, 
image1.getSchema()); - - AvroConverter converter = new AvroConverter(schemaRegistry); - Map converterConfig = new HashMap<>(); - converterConfig.put(ProtobufConverterConfig.AUTO_REGISTER_SCHEMAS, true); - converterConfig.put(ProtobufDataConfig.GENERATE_INDEX_FOR_UNIONS_CONFIG, false); - converterConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://test-url"); - - converter.configure(converterConfig, false); - KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistry); - - SchemaAndValue image1ConnectData = converter.toConnectData(topic, serializer.serialize(topic, image1)); - SchemaAndValue image2ConnectData = converter.toConnectData(topic, serializer.serialize(topic, image2)); - - List records = Arrays.asList( - new SinkRecord(topic, 0, null, null, - image1ConnectData.schema(), image1ConnectData.value(), 0), - new SinkRecord(topic, 0, null, null, - image2ConnectData.schema(), image2ConnectData.value(), 1) - ); + List records = SchemaTestData.convertAvroToSinkRecord(topic, new AvroSchema(Image.getClassSchema()), Arrays.asList(image1, image2)); Map props = createProps(); ClickHouseHelperClient chc = createClient(props); ClickHouseTestHelpers.dropTable(chc, topic); @@ -1289,4 +1290,48 @@ public void testAvroWithUnion() throws Exception { assertEquals("description", row.getString("description")); assertEquals("content2", row.getString("content")); } + + @Test + public void testAvroDateAndTimeTypes() throws Exception { + + final String topic = createTopicName("test_avro_timestamps"); + final ZoneId tz = ZoneId.of("UTC"); + final Instant now = Instant.now(); + + List events = IntStream.range(0, 3).mapToObj(i -> { + Instant time = now.plus(i * 1000, ChronoUnit.MILLIS); + Event event = Event.newBuilder() + .setId(i) + .setTime1(time) + .setTime2(LocalTime.ofInstant(time, tz)) + .build(); + return event; + }).collect(Collectors.toList()); + + List records = SchemaTestData.convertAvroToSinkRecord(topic, new AvroSchema(Event.getClassSchema()), events); + + Map props = createProps(); + ClickHouseHelperClient chc = createClient(props); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.createTable(chc, topic, + "CREATE TABLE `%s` (`id` Int64, `time1` DateTime64(3), `time2` DateTime64(3)) Engine = MergeTree ORDER BY ()"); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(records); + chst.stop(); + + + List rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic); + assertEquals(events.size(), rows.size()); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(tz); + DateTimeFormatter localFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); + for (int i = 0; i < events.size(); i++) { + Event event = (Event) events.get(i); + JSONObject row = rows.get(i); + assertEquals(event.getId(), row.getLong("id")); + assertEquals(formatter.format(event.getTime1()), row.get("time1")); + assertEquals(event.getTime2().atDate(LocalDate.of(1970, 1, 1)).format(localFormatter), row.get("time2")); + } + } } diff --git a/src/testFixtures/avro/README.md b/src/testFixtures/avro/README.md new file mode 100644 index 00000000..573c41e5 --- /dev/null +++ b/src/testFixtures/avro/README.md @@ -0,0 +1,10 @@ + +To generate avro +```shell +java -jar ~/tools/avro-tools-1.12.0.jar idl event.idl > even.avro +``` + +To generate code +```shell +java -jar ~/tools/avro-tools-1.12.0.jar compile schema event.avro ../java/ +``` diff --git a/src/testFixtures/avro/event.idl 
b/src/testFixtures/avro/event.idl new file mode 100644 index 00000000..fdceedee --- /dev/null +++ b/src/testFixtures/avro/event.idl @@ -0,0 +1,13 @@ +namespace com.clickhouse.kafka.connect.avro.test; +schema Event; + +record Event { + // Just ID + long id; + + // The timestamp-millis logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond. + timestamp_ms time1; + + // The time-millis logical type represents a time of day, with no reference to a particular calendar, time zone or date, with a precision of one millisecond. + time_ms time2; +} \ No newline at end of file diff --git a/src/testFixtures/java/com/clickhouse/kafka/connect/avro/test/Event.java b/src/testFixtures/java/com/clickhouse/kafka/connect/avro/test/Event.java new file mode 100644 index 00000000..b7371d43 --- /dev/null +++ b/src/testFixtures/java/com/clickhouse/kafka/connect/avro/test/Event.java @@ -0,0 +1,447 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package com.clickhouse.kafka.connect.avro.test; + +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -3455267125182295158L; + + + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"com.clickhouse.kafka.connect.avro.test\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"time1\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"time2\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static final SpecificData MODEL$ = new SpecificData(); + static { + MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion()); + MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimeMillisConversion()); + } + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder<>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder<>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this Event to a ByteBuffer. 
+ * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a Event from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a Event instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static Event fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private long id; + private java.time.Instant time1; + private java.time.LocalTime time2; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public Event() {} + + /** + * All-args constructor. + * @param id The new value for id + * @param time1 The new value for time1 + * @param time2 The new value for time2 + */ + public Event(java.lang.Long id, java.time.Instant time1, java.time.LocalTime time2) { + this.id = id; + this.time1 = time1.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + this.time2 = time2.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + } + + @Override + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + + @Override + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + + // Used by DatumWriter. Applications should not call. + @Override + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return id; + case 1: return time1; + case 2: return time2; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + private static final org.apache.avro.Conversion[] conversions = + new org.apache.avro.Conversion[] { + null, + new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), + new org.apache.avro.data.TimeConversions.TimeMillisConversion(), + null + }; + + @Override + public org.apache.avro.Conversion getConversion(int field) { + return conversions[field]; + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: id = (java.lang.Long)value$; break; + case 1: time1 = (java.time.Instant)value$; break; + case 2: time2 = (java.time.LocalTime)value$; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'id' field. + * @return The value of the 'id' field. + */ + public long getId() { + return id; + } + + + /** + * Sets the value of the 'id' field. + * @param value the value to set. + */ + public void setId(long value) { + this.id = value; + } + + /** + * Gets the value of the 'time1' field. + * @return The value of the 'time1' field. + */ + public java.time.Instant getTime1() { + return time1; + } + + + /** + * Sets the value of the 'time1' field. + * @param value the value to set. + */ + public void setTime1(java.time.Instant value) { + this.time1 = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + } + + /** + * Gets the value of the 'time2' field. + * @return The value of the 'time2' field. + */ + public java.time.LocalTime getTime2() { + return time2; + } + + + /** + * Sets the value of the 'time2' field. + * @param value the value to set. 
+ */ + public void setTime2(java.time.LocalTime value) { + this.time2 = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + } + + /** + * Creates a new Event RecordBuilder. + * @return A new Event RecordBuilder + */ + public static com.clickhouse.kafka.connect.avro.test.Event.Builder newBuilder() { + return new com.clickhouse.kafka.connect.avro.test.Event.Builder(); + } + + /** + * Creates a new Event RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new Event RecordBuilder + */ + public static com.clickhouse.kafka.connect.avro.test.Event.Builder newBuilder(com.clickhouse.kafka.connect.avro.test.Event.Builder other) { + if (other == null) { + return new com.clickhouse.kafka.connect.avro.test.Event.Builder(); + } else { + return new com.clickhouse.kafka.connect.avro.test.Event.Builder(other); + } + } + + /** + * Creates a new Event RecordBuilder by copying an existing Event instance. + * @param other The existing instance to copy. + * @return A new Event RecordBuilder + */ + public static com.clickhouse.kafka.connect.avro.test.Event.Builder newBuilder(com.clickhouse.kafka.connect.avro.test.Event other) { + if (other == null) { + return new com.clickhouse.kafka.connect.avro.test.Event.Builder(); + } else { + return new com.clickhouse.kafka.connect.avro.test.Event.Builder(other); + } + } + + /** + * RecordBuilder for Event instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private long id; + private java.time.Instant time1; + private java.time.LocalTime time2; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(com.clickhouse.kafka.connect.avro.test.Event.Builder other) { + super(other); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.time1)) { + this.time1 = data().deepCopy(fields()[1].schema(), other.time1); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (isValidValue(fields()[2], other.time2)) { + this.time2 = data().deepCopy(fields()[2].schema(), other.time2); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + } + + /** + * Creates a Builder by copying an existing Event instance + * @param other The existing instance to copy. + */ + private Builder(com.clickhouse.kafka.connect.avro.test.Event other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.time1)) { + this.time1 = data().deepCopy(fields()[1].schema(), other.time1); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.time2)) { + this.time2 = data().deepCopy(fields()[2].schema(), other.time2); + fieldSetFlags()[2] = true; + } + } + + /** + * Gets the value of the 'id' field. + * @return The value. + */ + public long getId() { + return id; + } + + + /** + * Sets the value of the 'id' field. + * @param value The value of 'id'. + * @return This builder. 
+ */ + public com.clickhouse.kafka.connect.avro.test.Event.Builder setId(long value) { + validate(fields()[0], value); + this.id = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'id' field has been set. + * @return True if the 'id' field has been set, false otherwise. + */ + public boolean hasId() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'id' field. + * @return This builder. + */ + public com.clickhouse.kafka.connect.avro.test.Event.Builder clearId() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'time1' field. + * @return The value. + */ + public java.time.Instant getTime1() { + return time1; + } + + + /** + * Sets the value of the 'time1' field. + * @param value The value of 'time1'. + * @return This builder. + */ + public com.clickhouse.kafka.connect.avro.test.Event.Builder setTime1(java.time.Instant value) { + validate(fields()[1], value); + this.time1 = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'time1' field has been set. + * @return True if the 'time1' field has been set, false otherwise. + */ + public boolean hasTime1() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'time1' field. + * @return This builder. + */ + public com.clickhouse.kafka.connect.avro.test.Event.Builder clearTime1() { + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'time2' field. + * @return The value. + */ + public java.time.LocalTime getTime2() { + return time2; + } + + + /** + * Sets the value of the 'time2' field. + * @param value The value of 'time2'. + * @return This builder. + */ + public com.clickhouse.kafka.connect.avro.test.Event.Builder setTime2(java.time.LocalTime value) { + validate(fields()[2], value); + this.time2 = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'time2' field has been set. + * @return True if the 'time2' field has been set, false otherwise. + */ + public boolean hasTime2() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'time2' field. + * @return This builder. + */ + public com.clickhouse.kafka.connect.avro.test.Event.Builder clearTime2() { + fieldSetFlags()[2] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Event build() { + try { + Event record = new Event(); + record.id = fieldSetFlags()[0] ? this.id : (java.lang.Long) defaultValue(fields()[0]); + record.time1 = fieldSetFlags()[1] ? this.time1 : (java.time.Instant) defaultValue(fields()[1]); + record.time2 = fieldSetFlags()[2] ? 
this.time2 : (java.time.LocalTime) defaultValue(fields()[2]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + +} + + + + + + + + + + diff --git a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java index 8b041d4f..94006b8c 100644 --- a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java +++ b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java @@ -1,10 +1,18 @@ package com.clickhouse.kafka.connect.sink.helper; import com.clickhouse.kafka.connect.test.TestProtos; +import io.confluent.connect.avro.AvroConverter; +import io.confluent.connect.protobuf.ProtobufConverterConfig; +import io.confluent.connect.protobuf.ProtobufDataConfig; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; @@ -23,10 +31,12 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import java.util.stream.LongStream; public class SchemaTestData { @@ -724,6 +734,9 @@ public static Collection createDateType(String topic, int partition, .field("date32_number", Schema.OPTIONAL_INT32_SCHEMA) .field("datetime_int", Schema.INT32_SCHEMA) .field("datetime_number", Schema.INT64_SCHEMA) + .field("datetime64_3_number", Timestamp.SCHEMA) + .field("datetime64_6_number", Timestamp.SCHEMA) + .field("datetime64_9_number", Timestamp.SCHEMA) .field("datetime64_number", Schema.INT64_SCHEMA) .field("timestamp_int64", Timestamp.SCHEMA) .field("timestamp_date", Timestamp.SCHEMA) @@ -746,7 +759,7 @@ public static Collection createDateType(String topic, int partition, LocalDateTime localDateTime = LocalDateTime.now(); long localDateTimeLong = localDateTime.toEpochSecond(ZoneOffset.UTC); int localDateTimeInt = (int)localDateTime.toEpochSecond(ZoneOffset.UTC); - + Date localDateTimeDate = Date.from(localDateTime.atZone(ZoneOffset.UTC).toInstant()); Struct value_struct = new Struct(NESTED_SCHEMA) .put("off16", (short)n) 
.put("date_number", localDateInt) @@ -754,6 +767,9 @@ public static Collection createDateType(String topic, int partition, .put("datetime_int", localDateTimeInt) .put("datetime_number", localDateTimeLong) .put("datetime64_number", currentTime) + .put("datetime64_3_number", localDateTimeDate) + .put("datetime64_6_number", localDateTimeDate) + .put("datetime64_9_number", localDateTimeDate) .put("timestamp_int64", new Date(System.currentTimeMillis())) .put("timestamp_date", new Date(System.currentTimeMillis())) .put("time_int32", new Date(System.currentTimeMillis())) @@ -784,6 +800,8 @@ public static Collection createArrayDateTime64Type(String topic, int Schema NESTED_SCHEMA = SchemaBuilder.struct() .field("off16", Schema.INT16_SCHEMA) .field("arr_datetime64_number", SchemaBuilder.array(Schema.INT64_SCHEMA).build()) + .field("arr_datetime64_3_number", SchemaBuilder.array(Schema.INT64_SCHEMA).build()) + .field("arr_datetime64_6_number", SchemaBuilder.array(Schema.INT64_SCHEMA).build()) .field("arr_timestamp_date", SchemaBuilder.array(Timestamp.SCHEMA).build()) .build(); @@ -800,6 +818,8 @@ public static Collection createArrayDateTime64Type(String topic, int Struct value_struct = new Struct(NESTED_SCHEMA) .put("off16", (short)n) .put("arr_datetime64_number", arrayDateTime64Number) + .put("arr_datetime64_3_number", arrayDateTime64Number) + .put("arr_datetime64_6_number", arrayDateTime64Number) .put("arr_timestamp_date", arrayTimestamps) ; @@ -1544,4 +1564,25 @@ public static TestProtos.TestMessage createProductMessage() { ) .build(); } + + public static List convertAvroToSinkRecord(String topic, ParsedSchema schema, List records) throws Exception { + MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient(); + // Register your test schema + String subject = topic + "-value"; + schemaRegistry.register(subject, schema); + + AvroConverter converter = new AvroConverter(schemaRegistry); + Map converterConfig = new HashMap<>(); + converterConfig.put(ProtobufConverterConfig.AUTO_REGISTER_SCHEMAS, true); + converterConfig.put(ProtobufDataConfig.GENERATE_INDEX_FOR_UNIONS_CONFIG, false); + converterConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://test-url"); + + converter.configure(converterConfig, false); + KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistry); + + return records.stream().map(r -> converter.toConnectData(topic, serializer.serialize(topic, r))) + .collect(ArrayList::new, (schemaAndValues, schemaAndValue) -> + schemaAndValues.add(new SinkRecord(topic, 0, null, null, schemaAndValue.schema(), schemaAndValue.value(), schemaAndValues.size())), + ArrayList::addAll); + } } diff --git a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java index 8d1719bf..2599ba81 100644 --- a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java +++ b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemalessTestData.java @@ -10,6 +10,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -363,4 +364,34 @@ public static Collection createDecimalTypes(String topic, int partit }); return array; } + + public static Collection createDateTimeTypes(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + LongStream.range(0, 
totalRecords).forEachOrdered(n -> { + Map value_struct = new HashMap<>(); + value_struct.put("str", "num" + n); + value_struct.put("date", new Date()); + value_struct.put("datetime", new Date()); + value_struct.put("datetime64_3", new Date()); + value_struct.put("datetime64_6", new Date()); + value_struct.put("datetime64_9", new Date()); + value_struct.put("datetime64_12", new Date()); + value_struct.put("datetime64_15", new Date()); + value_struct.put("datetime64_18", new Date()); + + SinkRecord sr = new SinkRecord( + topic, + partition, + null, + null, null, + value_struct, + n, + System.currentTimeMillis(), + TimestampType.CREATE_TIME + ); + + array.add(sr); + }); + return array; + } }
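
A closing note on the substring assertions added to `supportDatesTest`: ClickHouse renders a `DateTime64(p)` value with exactly `p` fractional digits, so when the same instant is written at precisions 3, 6 and 9 the shorter renderings are prefixes of the longer ones. The sketch below reproduces that prefix relationship with plain `java.time` formatting; it is an illustration of the expected output shape under that rendering assumption, not a call into ClickHouse.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateTime64Rendering {
    public static void main(String[] args) {
        Instant now = Instant.ofEpochMilli(1_700_000_000_123L); // millisecond input, as in the test data

        // DateTime64-style renderings at precision 3, 6 and 9 (trailing digits are zero
        // because the source value only has millisecond precision).
        String p3 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC).format(now);
        String p6 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC).format(now);
        String p9 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").withZone(ZoneOffset.UTC).format(now);

        System.out.println(p3); // 2023-11-14 22:13:20.123
        System.out.println(p6); // 2023-11-14 22:13:20.123000
        System.out.println(p9); // 2023-11-14 22:13:20.123000000

        // The same relationship the test asserts:
        System.out.println(p9.contains(p3)); // true
        System.out.println(p9.contains(p6)); // true
    }
}
```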