Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 91320eb

Browse files
committed
Implement implicit serde for join and leftJoin - part of Issue #46
1 parent 675ce54 commit 91320eb

File tree

3 files changed

+50
-51
lines changed

3 files changed

+50
-51
lines changed

src/main/scala/com/lightbend/kafka/scala/streams/ImplicitConversions.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,17 @@ object ImplicitConversions {
5252
// we would also like to allow users implicit serdes
5353
// and these implicits will convert them to `Serialized`, `Produced` or `Consumed`
5454

55-
implicit def SerializedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K,V] =
55+
implicit def SerializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
5656
Serialized.`with`(keySerde, valueSerde)
5757

58-
implicit def ConsumedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K,V] =
58+
implicit def ConsumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
5959
Consumed.`with`(keySerde, valueSerde)
6060

61-
implicit def ProducedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K,V] =
61+
implicit def ProducedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
6262
Produced.`with`(keySerde, valueSerde)
63+
64+
implicit def JoinedFromKVOSerde[K, V, VO](implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] = {
65+
println(s"ks: $keySerde vs: $valueSerde ovs: $otherValueSerde")
66+
Joined.`with`(keySerde, valueSerde, otherValueSerde)
67+
}
6368
}

src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -135,30 +135,21 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
135135

136136
def join[VO, VR](otherStream: KStreamS[K, VO],
137137
joiner: (V, VO) => VR,
138-
windows: JoinWindows): KStreamS[K, VR] = {
138+
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
139139

140-
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows)
141-
}
142-
143-
def join[VO, VR](otherStream: KStreamS[K, VO],
144-
joiner: (V, VO) => VR,
145-
windows: JoinWindows,
146-
joined: Joined[K, V, VO]): KStreamS[K, VR] = {
147-
148-
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
149-
}
150-
151-
def join[VT, VR](table: KTableS[K, VT],
152-
joiner: (V, VT) => VR): KStreamS[K, VR] = {
153-
154-
inner.join[VT, VR](table.inner, joiner.asValueJoiner)
140+
joined.fold[KStreamS[K, VR]] {
141+
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
142+
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
143+
}
155144
}
156145

157146
def join[VT, VR](table: KTableS[K, VT],
158-
joiner: (V, VT) => VR,
159-
joined: Joined[K, V, VT]): KStreamS[K, VR] = {
147+
joiner: (V, VT) => VR)(implicit joined: Perhaps[Joined[K, V, VT]]): KStreamS[K, VR] = {
160148

161-
inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
149+
joined.fold[KStreamS[K, VR]] {
150+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner) } { implicit ev =>
151+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, ev)
152+
}
162153
}
163154

164155
def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
@@ -170,30 +161,21 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
170161

171162
def leftJoin[VO, VR](otherStream: KStreamS[K, VO],
172163
joiner: (V, VO) => VR,
173-
windows: JoinWindows): KStreamS[K, VR] = {
174-
175-
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows)
176-
}
177-
178-
def leftJoin[VO, VR](otherStream: KStreamS[K, VO],
179-
joiner: (V, VO) => VR,
180-
windows: JoinWindows,
181-
joined: Joined[K, V, VO]): KStreamS[K, VR] = {
182-
183-
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
184-
}
164+
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
185165

186-
def leftJoin[VT, VR](table: KTableS[K, VT],
187-
joiner: (V, VT) => VR): KStreamS[K, VR] = {
188-
189-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner)
166+
joined.fold[KStreamS[K, VR]] {
167+
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
168+
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
169+
}
190170
}
191171

192172
def leftJoin[VT, VR](table: KTableS[K, VT],
193-
joiner: (V, VT) => VR,
194-
joined: Joined[K, V, VT]): KStreamS[K, VR] = {
173+
joiner: (V, VT) => VR)(implicit joined: Perhaps[Joined[K, V, VT]]): KStreamS[K, VR] = {
195174

196-
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
175+
joined.fold[KStreamS[K, VR]] {
176+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner) } { implicit ev =>
177+
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, ev)
178+
}
197179
}
198180

199181
def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
@@ -205,17 +187,12 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
205187

206188
def outerJoin[VO, VR](otherStream: KStreamS[K, VO],
207189
joiner: (V, VO) => VR,
208-
windows: JoinWindows): KStreamS[K, VR] = {
209-
210-
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows)
211-
}
190+
windows: JoinWindows)(implicit joined: Perhaps[Joined[K, V, VO]]): KStreamS[K, VR] = {
212191

213-
def outerJoin[VO, VR](otherStream: KStreamS[K, VO],
214-
joiner: (V, VO) => VR,
215-
windows: JoinWindows,
216-
joined: Joined[K, V, VO]): KStreamS[K, VR] = {
217-
218-
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
192+
joined.fold[KStreamS[K, VR]] {
193+
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
194+
inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, ev)
195+
}
219196
}
220197

221198
def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)

src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import com.lightbend.kafka.scala.server.{ KafkaLocalServer, MessageSender, Messa
2626

2727
import org.apache.kafka.common.serialization._
2828
import org.apache.kafka.streams._
29+
import org.apache.kafka.streams.kstream.Joined
2930

3031
import org.apache.kafka.clients.consumer.ConsumerRecord
3132
import ImplicitConversions._
@@ -79,6 +80,22 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes extends TestSuite[Kaf
7980
p
8081
}
8182

83+
// this will make the leftJoin use the key and value serde from this implicit, while use the default
84+
// from config for otherValueSerde
85+
implicit val joined: Joined[String, Long, String] = Joined.keySerde(stringSerde).withValueSerde(longSerde)
86+
87+
/**
88+
* Patterns for handling serdes in leftJoin (similar will be the handling of all join functions that accept a Joined argument)
89+
*
90+
* a. For `Joined[K, V, VO`], just make the implicit serdes available for `K`, `V` and `VO`. In the following
91+
* example of `leftJoin`, we have `Joined[String, Long, String]` and have the implicits `stringSerde` and
92+
* `longSerdes` available in scope. This should be enough to make an implicit `Joined` for `leftJoin`.
93+
*
94+
* b. Want to use default serdes from config for key and otherValue. Add the implicit `Joined` in scope as:
95+
* `implicit val joined: Joined[String, Long, String] = Joined.valueSerde(longSerde)`. The other serdes will
96+
* be picked up as `null` and used from the config.
97+
*/
98+
8299
val builder = new StreamsBuilderS()
83100

84101
val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)

0 commit comments

Comments
 (0)