Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 9805988

Browse files
committed
Made implementation type safe, no more serde using default config. Added an Avro based serialization example.
1 parent d53b56d commit 9805988

18 files changed

+390
-458
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ jdk: oraclejdk8
44
scala:
55
- 2.11.11
66
- 2.12.4
7-
sbt_args: -mem 1500
7+
sbt_args: -mem 2000
88
script:
99
- sbt "++ ${TRAVIS_SCALA_VERSION}!" test
1010
cache:

README.md

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,46 @@ Also, the explicit conversion `asJava` from a Scala `Iterable` to a Java `Iterab
9191

9292
## Implicit Serdes
9393

94-
One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library implementation offers implicit serdes to provide the serializers and de-serializers but at the same time also *allows the opt-in to use the default serializers registered in the Kafka Streams config*.
94+
One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library implementation offers type safe implicit serdes to provide the serializers and de-serializers. In doing so, the Scala library **does not use configuration based default serdes** which is not type safe and prone to runtime errors.
9595

96-
The optional implicit pattern is implemented with the usual null-default-value trick, but with a difference. The technique used is adopted from [this blog post](http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html).
96+
The implementation allows implicits for the `Serde`s or for `Serialized`, `Consumed`, `Produced` and `Joined`. The test examples demonstrate both, though the implicits for Serdes make a cleaner implementation.
9797

98-
The standard way to implement the null-default-value trick could not be applied as Scala [does not allow](https://stackoverflow.com/questions/4652095/why-does-the-scala-compiler-disallow-overloaded-methods-with-default-arguments/4652681#4652681) a mix of default values and function overloads. And we have quite a few examples of such overloaded functions in the Kafka Streams API set.
98+
### Default Serdes
9999

100-
The implementation allows implicits for the `Serde`s or for `Serialized`, `Consumed` and `Produced`. The test examples demonstrate both, though the implicits for Serdes make a cleaner implementation.
100+
The library offers a module that contains all the default serdes for the primitives. Importing the object will bring in scope all such primitives and helps reduce implicit hell.
101101

102-
The implementation does a trade-off in using the null-default-value trick as it moves some of the compile time errors to runtime.
102+
```scala
103+
object DefaultSerdes {
104+
implicit val stringSerde: Serde[String] = Serdes.String()
105+
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
106+
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
107+
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
108+
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
109+
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
110+
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
111+
}
112+
```
113+
114+
### Compile time typesafe
115+
116+
Not only the serdes, but `DefaultSerdes` also brings into scope implicit `Serialized`, `Produced`, `Consumed` and `Joined` instances. So all APIs that accept `Serialized`, `Produced`, `Consumed` or `Joined` will get these instances automatically with an `import DefaultSerdes._`.
103117

104-
### Examples
118+
Just one import of `DefaultSerdes._` and the following code does not need a bit of `Serialized`, `Produced`, `Consumed` or `Joined` to be specified explicitly or through the default config. **And the best part is that for any missing instances of these you get a compilation error.** ..
119+
120+
```scala
121+
val clicksPerRegion: KTableS[String, Long] =
122+
userClicksStream
123+
124+
// Join the stream against the table.
125+
.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
126+
127+
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
128+
.map((_, regionWithClicks) => regionWithClicks)
129+
130+
// Compute the total per region by summing the individual click counts per region.
131+
.groupByKey
132+
.reduce(_ + _)
105133

106-
1. The example [StreamToTableJoinScalaIntegrationTestImplicitSerdes](https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala) demonstrates how to use the technique of implicit `Serde`s
107-
2. The example [StreamToTableJoinScalaIntegrationTestImplicitSerialized](https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerialized.scala) demonstrates how to use the technique of implicit `Serialized`, `Consumed` and `Produced`.
108-
3. The example [StreamToTableJoinScalaIntegrationTestMixImplicitSerialized](https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestMixImplicitSerialized.scala) demonstrates how to use the technique of how to use default serdes along with implicit `Serialized`, `Consumed` and `Produced`.
134+
// Write the (continuously updating) results to the output topic.
135+
clicksPerRegion.toStream.to(outputTopic)
136+
```

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ libraryDependencies ++= Seq(
2323
minitest % "test",
2424
minitestLaws % "test",
2525
algebird % "test",
26-
chill % "test"
26+
chill % "test",
27+
avro4s % "test"
2728
)
2829

2930
testFrameworks += new TestFramework("minitest.runner.Framework")

project/Dependencies.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ object Dependencies {
2121
val minitestLaws = "io.monix" %% "minitest-laws" % MinitestVersion
2222
val algebird = "com.twitter" %% "algebird-core" % AlgebirdVersion
2323
val chill = "com.twitter" %% "chill" % ChillVersion
24+
val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % Avro4sVersion
2425
}

project/Versions.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ object Versions {
99
val JDKVersion = "1.8"
1010
val Scala_2_12_Version = "2.12.4"
1111
val Scala_2_11_Version = "2.11.11"
12+
val Avro4sVersion = "1.8.3"
1213
val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version )
1314
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package com.lightbend.kafka.scala.streams
6+
7+
import org.apache.kafka.common.serialization.{Serde, Serdes}
8+
9+
10+
/**
11+
* Implicit values for default serdes
12+
*/
13+
object DefaultSerdes {
14+
implicit val stringSerde: Serde[String] = Serdes.String()
15+
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
16+
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
17+
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
18+
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
19+
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
20+
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
21+
}

src/main/scala/com/lightbend/kafka/scala/streams/ImplicitConversions.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,6 @@ object ImplicitConversions {
3636

3737
implicit def tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
3838

39-
// technique for optional implicits adopted from
40-
// http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html
41-
42-
case class Perhaps[E](value: Option[E]) {
43-
def fold[F](ifAbsent: => F)(ifPresent: E => F): F = {
44-
value.fold(ifAbsent)(ifPresent)
45-
}
46-
}
47-
48-
//scalastyle:off null
49-
implicit def perhaps[E](implicit ev: E = null): Perhaps[E] = Perhaps(Option(ev))
5039
//scalastyle:on null
5140
// we would also like to allow users implicit serdes
5241
// and these implicits will convert them to `Serialized`, `Produced` or `Consumed`

src/main/scala/com/lightbend/kafka/scala/streams/KGroupedStreamS.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@ class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
4242
}
4343

4444
def reduce(reducer: (V, V) => V,
45-
storeName: String): KTableS[K, V] = {
45+
storeName: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): KTableS[K, V] = {
4646

4747
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
4848
// works perfectly with Scala 2.12 though
49-
inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName))
49+
inner.reduce(((v1: V, v2: V) =>
50+
reducer(v1, v2)).asReducer,
51+
Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
52+
.withKeySerde(keySerde)
53+
.withValueSerde(valueSerde)
54+
)
5055
}
5156

5257
def aggregate[VR](initializer: () => VR,

src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala

Lines changed: 21 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
5757
inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
5858
}
5959

60-
def through(topic: String)(implicit produced: Perhaps[Produced[K, V]]): KStreamS[K, V] =
61-
produced.fold[KStreamS[K, V]] { inner.through(topic) } { ev => inner.through(topic, ev) }
60+
def through(topic: String)(implicit produced: Produced[K, V]): KStreamS[K, V] =
61+
inner.through(topic, produced)
6262

63-
def to(topic: String)(implicit produced: Perhaps[Produced[K, V]]): Unit =
64-
produced.fold[Unit] { inner.to(topic) } { implicit ev => inner.to(topic, ev) }
63+
def to(topic: String)(implicit produced: Produced[K, V]): Unit =
64+
inner.to(topic, produced)
6565

6666
//scalastyle:off null
6767
def transform[K1, V1](transformerSupplier: () => Transformer[K, V, (K1, V1)],
@@ -124,61 +124,34 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
124124
* implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
125125
* - .groupByKey
126126
*/
127-
def groupByKey(implicit serialized: Perhaps[Serialized[K, V]]): KGroupedStreamS[K, V] =
128-
serialized.fold[KGroupedStreamS[K, V]] { inner.groupByKey } { implicit ev => inner.groupByKey(ev) }
129-
130-
def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Perhaps[Serialized[KR, V]]): KGroupedStreamS[KR, V] = {
131-
serialized.fold[KGroupedStreamS[KR, V]] {
132-
inner.groupBy(selector.asKeyValueMapper)
133-
} { implicit ev =>
134-
inner.groupBy(selector.asKeyValueMapper, ev)
135-
}
136-
}
127+
def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStreamS[K, V] =
128+
inner.groupByKey(serialized)
129+
130+
def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStreamS[KR, V] =
131+
inner.groupBy(selector.asKeyValueMapper, serialized)
137132

138133
def join[VO, VR](otherStream: KStreamS[K, VO],
139134
joiner: (V, VO) => VR,
140-
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
141-
142-
joined.fold[KStreamS[K, VR]] {
143-
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
144-
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
145-
}
146-
}
135+
windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
136+
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
147137

148138
def join[VT, VR](table: KTableS[K, VT],
149-
joiner: (V, VT) => VR)(implicit joined: Perhaps[Joined[K, V, VT]]): KStreamS[K, VR] = {
150-
151-
joined.fold[KStreamS[K, VR]] {
152-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner) } { implicit ev =>
153-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, ev)
154-
}
155-
}
139+
joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStreamS[K, VR] =
140+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
156141

157142
def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
158143
keyValueMapper: (K, V) => GK,
159-
joiner: (V, GV) => RV): KStreamS[K, RV] = {
160-
161-
inner.join[GK, GV, RV](globalKTable, keyValueMapper(_,_), joiner(_,_))
162-
}
144+
joiner: (V, GV) => RV): KStreamS[K, RV] =
145+
inner.join[GK, GV, RV](globalKTable, keyValueMapper(_,_), joiner(_,_))
163146

164147
def leftJoin[VO, VR](otherStream: KStreamS[K, VO],
165148
joiner: (V, VO) => VR,
166-
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
167-
168-
joined.fold[KStreamS[K, VR]] {
169-
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
170-
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
171-
}
172-
}
149+
windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
150+
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
173151

174152
def leftJoin[VT, VR](table: KTableS[K, VT],
175-
joiner: (V, VT) => VR)(implicit joined: Perhaps[Joined[K, V, VT]]): KStreamS[K, VR] = {
176-
177-
joined.fold[KStreamS[K, VR]] {
178-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner) } { implicit ev =>
179-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, ev)
180-
}
181-
}
153+
joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStreamS[K, VR] =
154+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
182155

183156
def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
184157
keyValueMapper: (K, V) => GK,
@@ -189,13 +162,8 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
189162

190163
def outerJoin[VO, VR](otherStream: KStreamS[K, VO],
191164
joiner: (V, VO) => VR,
192-
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
193-
194-
joined.fold[KStreamS[K, VR]] {
195-
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
196-
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
197-
}
198-
}
165+
windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
166+
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
199167

200168
def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)
201169

src/main/scala/com/lightbend/kafka/scala/streams/KTableS.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,8 @@ class KTableS[K, V](val inner: KTable[K, V]) {
4848
inner.toStream[KR](mapper.asKeyValueMapper)
4949
}
5050

51-
def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Perhaps[Serialized[KR, VR]]): KGroupedTableS[KR, VR] = {
52-
serialized.fold[KGroupedTableS[KR, VR]] {
53-
inner.groupBy(selector.asKeyValueMapper)
54-
} { implicit ev =>
55-
inner.groupBy(selector.asKeyValueMapper, ev)
56-
}
51+
def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTableS[KR, VR] = {
52+
inner.groupBy(selector.asKeyValueMapper, serialized)
5753
}
5854

5955
def join[VO, VR](other: KTableS[K, VO],

0 commit comments

Comments
 (0)