@@ -135,30 +135,21 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
135135
136136 def join [VO , VR ](otherStream : KStreamS [K , VO ],
137137 joiner : (V , VO ) => VR ,
138- windows : JoinWindows ): KStreamS [K , VR ] = {
138+ windows : JoinWindows )( implicit joined : Perhaps [ Joined [ K , V , VO ]]) : KStreamS [K , VR ] = {
139139
140- inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows)
141- }
142-
143- def join [VO , VR ](otherStream : KStreamS [K , VO ],
144- joiner : (V , VO ) => VR ,
145- windows : JoinWindows ,
146- joined : Joined [K , V , VO ]): KStreamS [K , VR ] = {
147-
148- inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, joined)
149- }
150-
151- def join [VT , VR ](table : KTableS [K , VT ],
152- joiner : (V , VT ) => VR ): KStreamS [K , VR ] = {
153-
154- inner.join[VT , VR ](table.inner, joiner.asValueJoiner)
140+ joined.fold[KStreamS [K , VR ]] {
141+ inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
142+ inner.join[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
143+ }
155144 }
156145
157146 def join [VT , VR ](table : KTableS [K , VT ],
158- joiner : (V , VT ) => VR ,
159- joined : Joined [K , V , VT ]): KStreamS [K , VR ] = {
147+ joiner : (V , VT ) => VR )(implicit joined : Perhaps [Joined [K , V , VT ]]): KStreamS [K , VR ] = {
160148
161- inner.join[VT , VR ](table.inner, joiner.asValueJoiner, joined)
149+ joined.fold[KStreamS [K , VR ]] {
150+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner) } { implicit ev =>
151+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner, ev)
152+ }
162153 }
163154
164155 def join [GK , GV , RV ](globalKTable : GlobalKTable [GK , GV ],
@@ -170,30 +161,21 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
170161
171162 def leftJoin [VO , VR ](otherStream : KStreamS [K , VO ],
172163 joiner : (V , VO ) => VR ,
173- windows : JoinWindows ): KStreamS [K , VR ] = {
174-
175- inner.leftJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows)
176- }
177-
178- def leftJoin [VO , VR ](otherStream : KStreamS [K , VO ],
179- joiner : (V , VO ) => VR ,
180- windows : JoinWindows ,
181- joined : Joined [K , V , VO ]): KStreamS [K , VR ] = {
182-
183- inner.leftJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, joined)
184- }
164+ windows : JoinWindows )(implicit joined : Perhaps [Joined [K , V , VO ]]): KStreamS [K , VR ] = {
185165
186- def leftJoin [ VT , VR ]( table : KTableS [ K , VT ],
187- joiner : ( V , VT ) => VR ) : KStreamS [ K , VR ] = {
188-
189- inner.leftJoin[ VT , VR ](table.inner, joiner.asValueJoiner)
166+ joined.fold[ KStreamS [ K , VR ]] {
167+ inner.leftJoin[ VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
168+ inner.leftJoin[ VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
169+ }
190170 }
191171
192172 def leftJoin [VT , VR ](table : KTableS [K , VT ],
193- joiner : (V , VT ) => VR ,
194- joined : Joined [K , V , VT ]): KStreamS [K , VR ] = {
173+ joiner : (V , VT ) => VR )(implicit joined : Perhaps [Joined [K , V , VT ]]): KStreamS [K , VR ] = {
195174
196- inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner, joined)
175+ joined.fold[KStreamS [K , VR ]] {
176+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner) } { implicit ev =>
177+ inner.leftJoin[VT , VR ](table.inner, joiner.asValueJoiner, ev)
178+ }
197179 }
198180
199181 def leftJoin [GK , GV , RV ](globalKTable : GlobalKTable [GK , GV ],
@@ -205,17 +187,12 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
205187
206188 def outerJoin [VO , VR ](otherStream : KStreamS [K , VO ],
207189 joiner : (V , VO ) => VR ,
208- windows : JoinWindows ): KStreamS [K , VR ] = {
209-
210- inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows)
211- }
190+ windows : JoinWindows )(implicit joined : Perhaps [Joined [K , V , VO ]]): KStreamS [K , VR ] = {
212191
213- def outerJoin [VO , VR ](otherStream : KStreamS [K , VO ],
214- joiner : (V , VO ) => VR ,
215- windows : JoinWindows ,
216- joined : Joined [K , V , VO ]): KStreamS [K , VR ] = {
217-
218- inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, joined)
192+ joined.fold[KStreamS [K , VR ]] {
193+ inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows) } { implicit ev =>
194+ inner.outerJoin[VO , VR ](otherStream.inner, joiner.asValueJoiner, windows, ev)
195+ }
219196 }
220197
221198 def merge (stream : KStreamS [K , V ]): KStreamS [K , V ] = inner.merge(stream)
0 commit comments