3131import org .apache .flink .table .types .logical .TimestampType ;
3232import org .apache .flink .util .Preconditions ;
3333
34+ import com .clickhouse .data .value .UnsignedByte ;
35+ import com .clickhouse .data .value .UnsignedInteger ;
36+ import com .clickhouse .data .value .UnsignedLong ;
37+ import com .clickhouse .data .value .UnsignedShort ;
3438import com .clickhouse .jdbc .ClickHousePreparedStatement ;
3539import com .clickhouse .jdbc .ClickHouseResultSet ;
3640
3741import java .io .Serializable ;
3842import java .math .BigDecimal ;
3943import java .math .BigInteger ;
44+ import java .net .InetAddress ;
4045import java .sql .Date ;
4146import java .sql .ResultSet ;
4247import java .sql .SQLException ;
4550import java .time .LocalDate ;
4651import java .time .LocalDateTime ;
4752import java .time .LocalTime ;
53+ import java .time .OffsetDateTime ;
4854import java .util .UUID ;
4955
5056import static org .apache .flink .connector .clickhouse .internal .converter .ClickHouseConverterUtils .BOOL_TRUE ;
@@ -109,30 +115,41 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
109115 case DOUBLE :
110116 case INTERVAL_YEAR_MONTH :
111117 case INTERVAL_DAY_TIME :
112- case INTEGER :
113- case BIGINT :
118+ case TINYINT :
114119 case BINARY :
115120 case VARBINARY :
116121 return val -> val ;
117- case TINYINT :
118- return val -> ((Integer ) val ).byteValue ();
119122 case SMALLINT :
120- return val -> val instanceof Integer ? ((Integer ) val ).shortValue () : val ;
123+ return val -> val instanceof UnsignedByte ? ((UnsignedByte ) val ).shortValue () : val ;
124+ case INTEGER :
125+ return val -> val instanceof UnsignedShort ? ((UnsignedShort ) val ).intValue () : val ;
126+ case BIGINT :
127+ return val ->
128+ val instanceof UnsignedInteger ? ((UnsignedInteger ) val ).longValue () : val ;
121129 case DECIMAL :
122130 final int precision = ((DecimalType ) type ).getPrecision ();
123131 final int scale = ((DecimalType ) type ).getScale ();
124- return val ->
125- val instanceof BigInteger
126- ? DecimalData .fromBigDecimal (
127- new BigDecimal ((BigInteger ) val , 0 ), precision , scale )
128- : DecimalData .fromBigDecimal ((BigDecimal ) val , precision , scale );
132+ return val -> {
133+ BigDecimal decimalValue =
134+ val instanceof BigDecimal
135+ ? (BigDecimal ) val
136+ : new BigDecimal (
137+ val instanceof UnsignedLong
138+ ? ((UnsignedLong ) val ).bigIntegerValue ()
139+ : (BigInteger ) val );
140+ return DecimalData .fromBigDecimal (decimalValue , precision , scale );
141+ };
129142 case DATE :
130- return val -> (int ) ((Date ) val ). toLocalDate ( ).toEpochDay ();
143+ return val -> (int ) ((LocalDate ) val ).toEpochDay ();
131144 case TIME_WITHOUT_TIME_ZONE :
132145 return val -> (int ) (((Time ) val ).toLocalTime ().toNanoOfDay () / 1_000_000L );
133146 case TIMESTAMP_WITH_TIME_ZONE :
134147 case TIMESTAMP_WITHOUT_TIME_ZONE :
135- return val -> TimestampData .fromLocalDateTime ((LocalDateTime ) val );
148+ return val ->
149+ TimestampData .fromLocalDateTime (
150+ val instanceof OffsetDateTime
151+ ? ((OffsetDateTime ) val ).toLocalDateTime ()
152+ : (LocalDateTime ) val );
136153 case TIMESTAMP_WITH_LOCAL_TIME_ZONE :
137154 return val ->
138155 TimestampData .fromInstant (
@@ -141,10 +158,15 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
141158 .toInstant ());
142159 case CHAR :
143160 case VARCHAR :
144- return val ->
145- val instanceof UUID
146- ? StringData .fromString (val .toString ())
147- : StringData .fromString ((String ) val );
161+ return val -> {
162+ if (val instanceof UUID ) {
163+ return StringData .fromString (val .toString ());
164+ } else if (val instanceof InetAddress ) {
165+ return StringData .fromString (((InetAddress ) val ).getHostAddress ());
166+ } else {
167+ return StringData .fromString ((String ) val );
168+ }
169+ };
148170 case ARRAY :
149171 case MAP :
150172 return val -> ClickHouseConverterUtils .toInternal (val , type );
@@ -242,6 +264,7 @@ private SerializationConverter createToExternalConverter(LogicalType type) {
242264
243265 @ FunctionalInterface
244266 interface SerializationConverter extends Serializable {
267+
245268 /**
246269 * Convert an internal field to java object and fill into the {@link
247270 * ClickHousePreparedStatement}.
@@ -252,6 +275,7 @@ void serialize(RowData rowData, int index, ClickHouseStatementWrapper statement)
252275
253276 @ FunctionalInterface
254277 interface DeserializationConverter extends Serializable {
278+
255279 /**
256280 * Convert an object of {@link ClickHouseResultSet} to the internal data structure object.
257281 */
0 commit comments