Skip to content

Commit a891038

Browse files
#13: Added Avro Logical Types Support (#14)
Co-authored-by: Anastasiia Sergienko <46891819+AnastasiiaSergienko@users.noreply.github.com>
1 parent 28f7c5a commit a891038

File tree

7 files changed

+374
-137
lines changed

7 files changed

+374
-137
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ These plugins help with project development.
6060
| [SBT PGP][sbt-pgp-link] | PGP plugin for `sbt` | BSD 3-Clause License |
6161
| [SBT Git][sbt-git-link] | A plugin for Git integration, used to version the release jars | BSD 2-Clause License |
6262

63-
[travis-badge]: https://img.shields.io/travis/exasol/import-export-udf-common-scala/master.svg?logo=travis
63+
[travis-badge]: https://img.shields.io/travis/com/exasol/import-export-udf-common-scala/master.svg?logo=travis
6464
[travis-link]: https://travis-ci.com/exasol/import-export-udf-common-scala
6565
[coveralls-badge]: https://coveralls.io/repos/github/exasol/import-export-udf-common-scala/badge.svg?branch=master
6666
[coveralls-link]: https://coveralls.io/github/exasol/import-export-udf-common-scala?branch=master

doc/changes/changes_0.1.1.md renamed to doc/changes/changes_0.2.0.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
# Import Export UDF Common Scala 0.1.1, released 2020-10-DD
1+
# Import Export UDF Common Scala 0.2.0, released 2020-10-DD
22

33
## Features
44

55
* #9: Added SLF4J Logging Library as Common Dependency (PR #10)
6-
* #11: Added Support for Complex Avro Types (Array, Map, Record) (PR #12)
6+
* #11: Added Support for Avro Complex Types (Array, Map, Record) (PR #12)
7+
* #13: Added Support for Avro Logical Types (BigDecimal, Date, Timestamp) (PR #14)
78

89
## Documentation
910

@@ -19,10 +20,10 @@
1920

2021
### Test Dependency Updates
2122

22-
* Updated `org.mockito:mockito-core` from `3.5.10` to `3.5.15`.
23+
* Updated `org.mockito:mockito-core` from `3.5.10` to `3.6.0`.
2324

2425
### Plugin Updates
2526

2627
* Updated `com.github.cb372:sbt-explicit-dependencies` from `0.2.13` to `0.2.15`.
27-
* Updated `org.wartremover:sbt-wartremover` from `2.4.10` to `2.4.11`.
28-
* Updated `org.wartremover:sbt-wartremover-contib` from `1.3.8` to `1.3.9`.
28+
* Updated `org.wartremover:sbt-wartremover` from `2.4.10` to `2.4.12`.
29+
* Updated `org.wartremover:sbt-wartremover-contrib` from `1.3.8` to `1.3.10`.

project/Dependencies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ object Dependencies {
1515
// Test dependencies versions
1616
private val ScalaTestVersion = "3.2.2"
1717
private val ScalaTestPlusVersion = "1.0.0-M2"
18-
private val MockitoCoreVersion = "3.5.15"
18+
private val MockitoCoreVersion = "3.6.0"
1919

2020
val ExasolResolvers: Seq[Resolver] = Seq(
2121
"Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases"

project/plugins.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
// Adds `wartremover`, a flexible Scala code linting tool
22
// http://github.com/puffnfresh/wartremover
3-
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.11")
3+
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.12")
44

55
// Adds Contrib Warts
66
// http://github.com/wartremover/wartremover-contrib/
7-
addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.9")
7+
addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.10")
88

99
// Adds most common doc api mappings
1010
// https://github.com/ThoughtWorksInc/sbt-api-mappings
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package com.exasol.common.avro
2+
3+
import java.nio.ByteBuffer
4+
import java.sql.Date
5+
import java.sql.Timestamp
6+
import java.time._
7+
import java.util.{Map => JMap}
8+
import java.util.Collection
9+
10+
import com.exasol.common.json.JsonMapper
11+
12+
import org.apache.avro.Conversions.DecimalConversion
13+
import org.apache.avro.LogicalTypes
14+
import org.apache.avro.Schema
15+
import org.apache.avro.data.TimeConversions.TimestampMicrosConversion
16+
import org.apache.avro.data.TimeConversions.TimestampMillisConversion
17+
import org.apache.avro.generic.GenericFixed
18+
import org.apache.avro.generic.IndexedRecord
19+
import org.apache.avro.util.Utf8
20+
21+
/**
 * Avro data type converter helper class.
 *
 * Turns values read from Avro records into plain Java values. Logical
 * types (date, timestamp-millis, timestamp-micros, decimal) are mapped
 * to `java.sql.Date`, `java.sql.Timestamp` and `java.math.BigDecimal`;
 * arrays, maps and records are rendered as JSON strings.
 */
final class AvroConverter {

  private[this] val EXASOL_DECIMAL_PRECISION = 36
  private[this] lazy val decimalConverter = new DecimalConversion()
  private[this] lazy val timestampMillisConverter = new TimestampMillisConversion()
  private[this] lazy val timestampMicrosConverter = new TimestampMicrosConversion()

  /**
   * Converts Avro schema field value into a Java datatype.
   *
   * If the schema type is an array, map or record, the converted value
   * is serialized to a JSON string. Every other schema type, including
   * a union, is returned as converted without JSON serialization.
   *
   * @param value Avro record field value, may be `null`
   * @param schema Avro record field schema
   * @return regular Java data type, or a JSON string for complex types
   * @throws java.lang.IllegalArgumentException if the value cannot be
   *         converted for the given schema
   */
  def convert(value: Any, schema: Schema): Any = {
    val fieldValue = getAvroValue(value, schema)
    if (isPrimitiveAvroType(schema.getType())) {
      fieldValue
    } else {
      JsonMapper.toJson(fieldValue)
    }
  }

  /** Returns `false` only for array, map and record types. */
  private[this] def isPrimitiveAvroType(avroType: Schema.Type): Boolean =
    avroType match {
      case Schema.Type.ARRAY  => false
      case Schema.Type.MAP    => false
      case Schema.Type.RECORD => false
      case _                  => true
    }

  /**
   * Dispatches on the schema type and converts the value accordingly.
   *
   * A `null` value is passed through untouched for every schema type.
   */
  @SuppressWarnings(Array("org.wartremover.warts.ToString"))
  private[this] def getAvroValue(value: Any, field: Schema): Any =
    if (value == null) {
      null
    } else {
      field.getType() match {
        case Schema.Type.NULL    => value
        case Schema.Type.BOOLEAN => value
        case Schema.Type.INT     => getIntValue(value, field)
        case Schema.Type.LONG    => getLongValue(value, field)
        case Schema.Type.FLOAT   => value
        case Schema.Type.DOUBLE  => value
        case Schema.Type.STRING  => getStringValue(value, field)
        case Schema.Type.FIXED   => getFixedValue(value, field)
        case Schema.Type.BYTES   => getBytesValue(value, field)
        case Schema.Type.ENUM    => value.toString
        case Schema.Type.UNION   => getUnionValue(value, field)
        case Schema.Type.ARRAY   => getArrayValue(value, field)
        case Schema.Type.MAP     => getMapValue(value, field)
        case Schema.Type.RECORD  => getRecordValue(value)
      }
    }

  /** Converts an INT value; the `date` logical type becomes a `java.sql.Date`. */
  private[this] def getIntValue(value: Any, field: Schema): Any =
    field.getLogicalType() match {
      case _: LogicalTypes.Date => dateFromSinceEpoch(value.asInstanceOf[Int].longValue())
      case _                    => value
    }

  /**
   * Builds a `java.sql.Date` for the given number of days since
   * 1970-01-01, positioned at midnight of the system default time zone.
   */
  private[this] def dateFromSinceEpoch(days: Long): Date = {
    // scalastyle:off magic.number
    val date = LocalDateTime.of(1970, 1, 1, 0, 0, 0).plusDays(days)
    // scalastyle:on
    val millis = date.atZone(ZoneId.systemDefault).toInstant().toEpochMilli()
    new Date(millis)
  }

  /** Converts a LONG value; timestamp logical types become `java.sql.Timestamp`. */
  private[this] def getLongValue(value: Any, field: Schema): Any =
    field.getLogicalType() match {
      case lt: LogicalTypes.TimestampMillis =>
        Timestamp.from(timestampMillisConverter.fromLong(value.asInstanceOf[Long], field, lt))
      case lt: LogicalTypes.TimestampMicros =>
        Timestamp.from(timestampMicrosConverter.fromLong(value.asInstanceOf[Long], field, lt))
      case _ => value
    }

  /** Converts a FIXED value; the `decimal` logical type becomes a `BigDecimal`. */
  private[this] def getFixedValue(value: Any, field: Schema): Any =
    field.getLogicalType() match {
      case lt: LogicalTypes.Decimal =>
        checkPrecision(lt)
        decimalConverter.fromFixed(value.asInstanceOf[GenericFixed], field, lt)
      case _ => getStringValue(value, field)
    }

  /** Converts a BYTES value; the `decimal` logical type becomes a `BigDecimal`. */
  private[this] def getBytesValue(value: Any, field: Schema): Any =
    field.getLogicalType() match {
      case lt: LogicalTypes.Decimal =>
        checkPrecision(lt)
        decimalConverter.fromBytes(value.asInstanceOf[ByteBuffer], field, lt)
      case _ => getStringValue(value, field)
    }

  /**
   * Rejects decimals whose declared precision exceeds the supported maximum.
   *
   * @throws java.lang.IllegalArgumentException if the precision is too large
   */
  private[this] def checkPrecision(logicalType: LogicalTypes.Decimal): Unit = {
    val precision = logicalType.getPrecision()
    if (precision > EXASOL_DECIMAL_PRECISION) {
      throw new IllegalArgumentException(
        s"Decimal precision ${precision.toString()} is larger than " +
          s"maximum allowed precision ${EXASOL_DECIMAL_PRECISION.toString()}."
      )
    }
  }

  /**
   * Renders string-like Avro values as a Java `String`.
   *
   * Byte based inputs are decoded as UTF-8 so that the result does not
   * depend on the platform default charset.
   *
   * @throws java.lang.IllegalArgumentException for any other value type
   */
  private[this] def getStringValue(value: Any, field: Schema): String = {
    val utf8 = java.nio.charset.StandardCharsets.UTF_8
    value match {
      case str: String => str
      case utf: Utf8   => utf.toString
      case byteBuffer: ByteBuffer =>
        // Decode only the bytes between position and limit. The backing
        // array can be larger than the content (or not accessible at all),
        // and the duplicate leaves the caller's buffer position untouched.
        utf8.decode(byteBuffer.duplicate()).toString
      case arrayByte: Array[Byte] => new String(arrayByte, utf8)
      case fixed: GenericFixed    => new String(fixed.bytes(), utf8)
      case _ =>
        throw new IllegalArgumentException(
          s"Avro ${field.getName} type cannot be converted to string!"
        )
    }
  }

  /**
   * Resolves a union value. Only single-type unions and two-type unions
   * where one branch is `null` are supported.
   *
   * @throws java.lang.IllegalArgumentException for any other union shape
   */
  private[this] def getUnionValue(value: Any, field: Schema): Any = {
    val types = field.getTypes()
    types.size() match {
      case 1 => getAvroValue(value, types.get(0))
      case 2 =>
        if (types.get(0).getType() == Schema.Type.NULL) {
          getAvroValue(value, types.get(1))
        } else if (types.get(1).getType() == Schema.Type.NULL) {
          getAvroValue(value, types.get(0))
        } else {
          throw new IllegalArgumentException(
            "Avro Union type should contain a primitive and null!"
          )
        }
      case _ =>
        throw new IllegalArgumentException("Avro Union type should contain a primitive and null!")
    }
  }

  /**
   * Converts every element of a Scala array or Java collection using the
   * array's element schema.
   *
   * @throws java.lang.IllegalArgumentException for any other container type
   */
  private[this] def getArrayValue(value: Any, field: Schema): Array[Any] = {
    val elementSchema = field.getElementType()
    value match {
      case array: Array[_] => array.map(getAvroValue(_, elementSchema))
      case list: Collection[_] =>
        val result = new Array[Any](list.size)
        val elements = list.iterator()
        var i = 0
        while (elements.hasNext()) {
          result(i) = getAvroValue(elements.next(), elementSchema)
          i += 1
        }
        result
      case other =>
        throw new IllegalArgumentException(
          s"Unsupported Avro Array type '${other.getClass.getName()}'."
        )
    }
  }

  /**
   * Converts every value of a Java map using the map's value schema.
   *
   * Keys are converted with `toString` because they are not guaranteed
   * to be `java.lang.String` instances (for example Avro's `Utf8`).
   *
   * @throws java.lang.IllegalArgumentException if the value is not a Java map
   */
  @SuppressWarnings(Array("org.wartremover.warts.ToString"))
  private[this] def getMapValue(map: Any, field: Schema): JMap[String, Any] = map match {
    case javaMap: JMap[_, _] =>
      val valueSchema = field.getValueType()
      val result = new java.util.HashMap[String, Any]()
      val entries = javaMap.entrySet().iterator()
      while (entries.hasNext()) {
        val entry = entries.next()
        val _ = result.put(entry.getKey().toString, getAvroValue(entry.getValue(), valueSchema))
      }
      result
    case other =>
      throw new IllegalArgumentException(
        s"Unsupported Avro Map type '${other.getClass.getName()}'."
      )
  }

  /**
   * Converts a record into a map from field name to converted field value.
   *
   * @throws java.lang.IllegalArgumentException if the value is not an
   *         `IndexedRecord`
   */
  private[this] def getRecordValue(value: Any): JMap[String, Any] = value match {
    case record: IndexedRecord =>
      val fields = record.getSchema().getFields()
      val size = fields.size
      val result = new java.util.HashMap[String, Any]()
      var i = 0
      while (i < size) {
        val recordField = fields.get(i)
        val _ = result.put(recordField.name, getAvroValue(record.get(i), recordField.schema))
        i += 1
      }
      result
    case other =>
      throw new IllegalArgumentException(
        s"Unsupported Avro Record type '${other.getClass.getName()}'."
      )
  }

}

0 commit comments

Comments
 (0)