Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 74aabcd

Browse files
authored
Merge pull request #59 from lightbend/no-config-serde
Made implementation completely typesafe - no more default serde with config
2 parents 6f49e62 + 0cd5df6 commit 74aabcd

16 files changed

+384
-455
lines changed

README.md

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,48 @@ Also, the explicit conversion `asJava` from a Scala `Iterable` to a Java `Iterab
9191

9292
## Implicit Serdes
9393

94-
One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library implementation offers implicit serdes to provide the serializers and de-serializers but at the same time also *allows the opt-in to use the default serializers registered in the Kafka Streams config*.
94+
One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library implementation offers type safe implicit serdes to provide the serializers and de-serializers. In doing so, the Scala library **does not use configuration based default serdes** which is not type safe and prone to runtime errors.
9595

96-
The optional implicit pattern is implemented with the usual null-default-value trick, but with a difference. The technique used is adopted from [this blog post](http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html).
96+
The implementation allows implicits for the `Serde`s or for `Serialized`, `Consumed`, `Produced` and `Joined`. The test examples demonstrate both, though the implicits for Serdes make a cleaner implementation.
9797

98-
The standard way to implement the null-default-value trick could not be applied as Scala [does not allow](https://stackoverflow.com/questions/4652095/why-does-the-scala-compiler-disallow-overloaded-methods-with-default-arguments/4652681#4652681) a mix of default values and function overloads. And we have quite a few examples of such overloaded functions in the Kafka Streams API set.
98+
### Default Serdes
9999

100-
The implementation allows implicits for the `Serde`s or for `Serialized`, `Consumed` and `Produced`. The test examples demonstrate both, though the implicits for Serdes make a cleaner implementation.
100+
The library offers a module that contains all the default serdes for the primitives. Importing the object will bring in scope all such primitives and helps reduce implicit hell.
101101

102-
The implementation does a trade-off in using the null-default-value trick as it moves some of the compile time errors to runtime.
102+
```scala
103+
object DefaultSerdes {
104+
implicit val stringSerde: Serde[String] = Serdes.String()
105+
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
106+
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
107+
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
108+
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
109+
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
110+
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
111+
}
112+
```
113+
114+
### Compile time typesafe
115+
116+
Not only the serdes, but `DefaultSerdes` also brings into scope implicit `Serialized`, `Produced`, `Consumed` and `Joined` instances. So all APIs that accept `Serialized`, `Produced`, `Consumed` or `Joined` will get these instances automatically with an `import DefaultSerdes._`.
117+
118+
Just one import of `DefaultSerdes._` and the following code does not need a bit of `Serialized`, `Produced`, `Consumed` or `Joined` to be specified explicitly or through the default config. **And the best part is that for any missing instances of these you get a compilation error.** ..
119+
120+
```scala
121+
import DefaultSerdes._
122+
123+
val clicksPerRegion: KTableS[String, Long] =
124+
userClicksStream
103125

104-
### Examples
126+
// Join the stream against the table.
127+
.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
128+
129+
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
130+
.map((_, regionWithClicks) => regionWithClicks)
131+
132+
// Compute the total per region by summing the individual click counts per region.
133+
.groupByKey
134+
.reduce(_ + _)
105135

106-
1. The example [StreamToTableJoinScalaIntegrationTestImplicitSerdes](https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala) demonstrates how to use the technique of implicit `Serde`s
107-
2. The example [StreamToTableJoinScalaIntegrationTestImplicitSerialized](https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerialized.scala) demonstrates how to use the technique of implicit `Serialized`, `Consumed` and `Produced`.
108-
3. The example [StreamToTableJoinScalaIntegrationTestMixImplicitSerialized](https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestMixImplicitSerialized.scala) demonstrates how to use the technique of how to use default serdes along with implicit `Serialized`, `Consumed` and `Produced`.
136+
// Write the (continuously updating) results to the output topic.
137+
clicksPerRegion.toStream.to(outputTopic)
138+
```

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ libraryDependencies ++= Seq(
2323
minitest % "test",
2424
minitestLaws % "test",
2525
algebird % "test",
26-
chill % "test"
26+
chill % "test",
27+
avro4s % "test"
2728
)
2829

2930
testFrameworks += new TestFramework("minitest.runner.Framework")

project/Dependencies.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ object Dependencies {
2121
val minitestLaws = "io.monix" %% "minitest-laws" % MinitestVersion
2222
val algebird = "com.twitter" %% "algebird-core" % AlgebirdVersion
2323
val chill = "com.twitter" %% "chill" % ChillVersion
24+
val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % Avro4sVersion
2425
}

project/Versions.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ object Versions {
99
val JDKVersion = "1.8"
1010
val Scala_2_12_Version = "2.12.4"
1111
val Scala_2_11_Version = "2.11.11"
12+
val Avro4sVersion = "1.8.3"
1213
val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version )
1314
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package com.lightbend.kafka.scala.streams
6+
7+
import org.apache.kafka.common.serialization.{Serde, Serdes}
8+
9+
10+
/**
11+
* Implicit values for default serdes
12+
*/
13+
object DefaultSerdes {
14+
implicit val stringSerde: Serde[String] = Serdes.String()
15+
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
16+
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
17+
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
18+
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
19+
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
20+
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
21+
}

src/main/scala/com/lightbend/kafka/scala/streams/ImplicitConversions.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,6 @@ object ImplicitConversions {
3636

3737
implicit def tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
3838

39-
// technique for optional implicits adopted from
40-
// http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html
41-
42-
case class Perhaps[E](value: Option[E]) {
43-
def fold[F](ifAbsent: => F)(ifPresent: E => F): F = {
44-
value.fold(ifAbsent)(ifPresent)
45-
}
46-
}
47-
48-
//scalastyle:off null
49-
implicit def perhaps[E](implicit ev: E = null): Perhaps[E] = Perhaps(Option(ev))
5039
//scalastyle:on null
5140
// we would also like to allow users implicit serdes
5241
// and these implicits will convert them to `Serialized`, `Produced` or `Consumed`

src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala

Lines changed: 21 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
5757
inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
5858
}
5959

60-
def through(topic: String)(implicit produced: Perhaps[Produced[K, V]]): KStreamS[K, V] =
61-
produced.fold[KStreamS[K, V]] { inner.through(topic) } { ev => inner.through(topic, ev) }
60+
def through(topic: String)(implicit produced: Produced[K, V]): KStreamS[K, V] =
61+
inner.through(topic, produced)
6262

63-
def to(topic: String)(implicit produced: Perhaps[Produced[K, V]]): Unit =
64-
produced.fold[Unit] { inner.to(topic) } { implicit ev => inner.to(topic, ev) }
63+
def to(topic: String)(implicit produced: Produced[K, V]): Unit =
64+
inner.to(topic, produced)
6565

6666
//scalastyle:off null
6767
def transform[K1, V1](transformerSupplier: () => Transformer[K, V, (K1, V1)],
@@ -124,61 +124,34 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
124124
* implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
125125
* - .groupByKey
126126
*/
127-
def groupByKey(implicit serialized: Perhaps[Serialized[K, V]]): KGroupedStreamS[K, V] =
128-
serialized.fold[KGroupedStreamS[K, V]] { inner.groupByKey } { implicit ev => inner.groupByKey(ev) }
129-
130-
def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Perhaps[Serialized[KR, V]]): KGroupedStreamS[KR, V] = {
131-
serialized.fold[KGroupedStreamS[KR, V]] {
132-
inner.groupBy(selector.asKeyValueMapper)
133-
} { implicit ev =>
134-
inner.groupBy(selector.asKeyValueMapper, ev)
135-
}
136-
}
127+
def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStreamS[K, V] =
128+
inner.groupByKey(serialized)
129+
130+
def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStreamS[KR, V] =
131+
inner.groupBy(selector.asKeyValueMapper, serialized)
137132

138133
def join[VO, VR](otherStream: KStreamS[K, VO],
139134
joiner: (V, VO) => VR,
140-
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
141-
142-
joined.fold[KStreamS[K, VR]] {
143-
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
144-
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
145-
}
146-
}
135+
windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
136+
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
147137

148138
def join[VT, VR](table: KTableS[K, VT],
149-
joiner: (V, VT) => VR)(implicit joined: Perhaps[Joined[K, V, VT]]): KStreamS[K, VR] = {
150-
151-
joined.fold[KStreamS[K, VR]] {
152-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner) } { implicit ev =>
153-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, ev)
154-
}
155-
}
139+
joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStreamS[K, VR] =
140+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
156141

157142
def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
158143
keyValueMapper: (K, V) => GK,
159-
joiner: (V, GV) => RV): KStreamS[K, RV] = {
160-
161-
inner.join[GK, GV, RV](globalKTable, keyValueMapper(_,_), joiner(_,_))
162-
}
144+
joiner: (V, GV) => RV): KStreamS[K, RV] =
145+
inner.join[GK, GV, RV](globalKTable, keyValueMapper(_,_), joiner(_,_))
163146

164147
def leftJoin[VO, VR](otherStream: KStreamS[K, VO],
165148
joiner: (V, VO) => VR,
166-
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
167-
168-
joined.fold[KStreamS[K, VR]] {
169-
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
170-
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
171-
}
172-
}
149+
windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
150+
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
173151

174152
def leftJoin[VT, VR](table: KTableS[K, VT],
175-
joiner: (V, VT) => VR)(implicit joined: Perhaps[Joined[K, V, VT]]): KStreamS[K, VR] = {
176-
177-
joined.fold[KStreamS[K, VR]] {
178-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner) } { implicit ev =>
179-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, ev)
180-
}
181-
}
153+
joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStreamS[K, VR] =
154+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
182155

183156
def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
184157
keyValueMapper: (K, V) => GK,
@@ -189,13 +162,8 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
189162

190163
def outerJoin[VO, VR](otherStream: KStreamS[K, VO],
191164
joiner: (V, VO) => VR,
192-
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
193-
194-
joined.fold[KStreamS[K, VR]] {
195-
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
196-
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
197-
}
198-
}
165+
windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
166+
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
199167

200168
def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)
201169

src/main/scala/com/lightbend/kafka/scala/streams/KTableS.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,8 @@ class KTableS[K, V](val inner: KTable[K, V]) {
4848
inner.toStream[KR](mapper.asKeyValueMapper)
4949
}
5050

51-
def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Perhaps[Serialized[KR, VR]]): KGroupedTableS[KR, VR] = {
52-
serialized.fold[KGroupedTableS[KR, VR]] {
53-
inner.groupBy(selector.asKeyValueMapper)
54-
} { implicit ev =>
55-
inner.groupBy(selector.asKeyValueMapper, ev)
56-
}
51+
def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTableS[KR, VR] = {
52+
inner.groupBy(selector.asKeyValueMapper, serialized)
5753
}
5854

5955
def join[VO, VR](other: KTableS[K, VO],
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// adopted from Openshine implementation
2+
package com.lightbend.kafka.scala.streams
3+
4+
import org.apache.kafka.common.serialization.{Serde, Deserializer => JDeserializer, Serializer => JSerializer}
5+
6+
trait ScalaSerde[T] extends Serde[T] {
7+
override def deserializer(): JDeserializer[T]
8+
9+
override def serializer(): JSerializer[T]
10+
11+
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
12+
13+
override def close(): Unit = ()
14+
}
15+
16+
trait StatelessScalaSerde[T >: Null] extends Serde[T] with ScalaSerde[T] {
17+
def serialize(data: T): Array[Byte]
18+
def deserialize(data: Array[Byte]): Option[T]
19+
20+
override def deserializer(): Deserializer[T] =
21+
(data: Array[Byte]) => deserialize(data)
22+
23+
override def serializer(): Serializer[T] =
24+
(data: T) => serialize(data)
25+
}
26+
27+
trait Deserializer[T >: Null] extends JDeserializer[T] {
28+
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
29+
30+
override def close(): Unit = ()
31+
32+
override def deserialize(topic: String, data: Array[Byte]): T =
33+
Option(data).flatMap(deserialize).orNull
34+
35+
def deserialize(data: Array[Byte]): Option[T]
36+
}
37+
38+
trait Serializer[T] extends JSerializer[T] {
39+
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
40+
41+
override def close(): Unit = ()
42+
43+
override def serialize(topic: String, data: T): Array[Byte] =
44+
Option(data).map(serialize).orNull
45+
46+
def serialize(data: T): Array[Byte]
47+
}

src/main/scala/com/lightbend/kafka/scala/streams/StreamsBuilderS.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,28 @@ import scala.collection.JavaConverters._
2020
*/
2121
class StreamsBuilderS(inner: StreamsBuilder = new StreamsBuilder) {
2222

23-
def stream[K, V](topic: String)(implicit consumed: Perhaps[Consumed[K, V]]): KStreamS[K, V] =
24-
consumed.fold[KStreamS[K, V]] { inner.stream[K, V](topic) } { implicit ev => inner.stream[K, V](topic, ev) }
23+
def stream[K, V](topic: String)(implicit consumed: Consumed[K, V]): KStreamS[K, V] =
24+
inner.stream[K, V](topic, consumed)
2525

26-
def stream[K, V](topics: List[String])(implicit consumed: Perhaps[Consumed[K, V]]): KStreamS[K, V] =
27-
consumed.fold[KStreamS[K, V]] { inner.stream[K, V](topics.asJava) } { implicit ev => inner.stream[K, V](topics.asJava, ev) }
26+
def stream[K, V](topics: List[String])(implicit consumed: Consumed[K, V]): KStreamS[K, V] =
27+
inner.stream[K, V](topics.asJava, consumed)
2828

29-
def stream[K, V](topicPattern: Pattern)(implicit consumed: Perhaps[Consumed[K, V]]): KStreamS[K, V] =
30-
consumed.fold[KStreamS[K, V]] { inner.stream[K, V](topicPattern) } { implicit ev => inner.stream[K, V](topicPattern, ev) }
29+
def stream[K, V](topicPattern: Pattern)(implicit consumed: Consumed[K, V]): KStreamS[K, V] =
30+
inner.stream[K, V](topicPattern, consumed)
3131

32-
def table[K, V](topic: String)(implicit consumed: Perhaps[Consumed[K, V]]): KTableS[K, V] =
33-
consumed.fold[KTableS[K, V]] { inner.table[K, V](topic) } { implicit ev => inner.table[K, V](topic, ev) }
32+
def table[K, V](topic: String)(implicit consumed: Consumed[K, V]): KTableS[K, V] =
33+
inner.table[K, V](topic, consumed)
3434

3535
def table[K, V](topic: String, materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]])
36-
(implicit consumed: Perhaps[Consumed[K, V]]): KTableS[K, V] =
37-
consumed.fold[KTableS[K, V]] { inner.table(topic, materialized) } { implicit ev => inner.table[K, V](topic, ev, materialized) }
36+
(implicit consumed: Consumed[K, V]): KTableS[K, V] =
37+
inner.table[K, V](topic, consumed, materialized)
3838

39-
def globalTable[K, V](topic: String)(implicit consumed: Perhaps[Consumed[K, V]]): GlobalKTable[K, V] =
40-
consumed.fold[GlobalKTable[K, V]] { inner.globalTable(topic) } { implicit ev => inner.globalTable(topic, ev) }
39+
def globalTable[K, V](topic: String)(implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
40+
inner.globalTable(topic, consumed)
4141

4242
def globalTable[K, V](topic: String, materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]])
43-
(implicit consumed: Perhaps[Consumed[K, V]]): GlobalKTable[K, V] =
44-
consumed.fold[GlobalKTable[K, V]] { inner.globalTable(topic, materialized) } { implicit ev => inner.globalTable(topic, ev, materialized) }
43+
(implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
44+
inner.globalTable(topic, consumed, materialized)
4545

4646
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilder = inner.addStateStore(builder)
4747

0 commit comments

Comments
 (0)