Skip to content

Commit 92421f7

Browse files
authored
Add Kotlin wrappers for router specs (#3218)
* Add Kotlin wrappers for router specs To avoid casting and extra logic logic in the end-user code, it is better to provide Kotlin-specific API to let end-users to do whatever is really dictated by API and don't think about specific types to cast * * Fix typos; code clean up
1 parent 71e6b27 commit 92421f7

File tree

6 files changed

+266
-35
lines changed

6 files changed

+266
-35
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl
18+
19+
import org.springframework.integration.router.AbstractMessageRouter
20+
import org.springframework.messaging.MessageChannel
21+
22+
/**
23+
* An [AbstractRouterSpec] wrapped for Kotlin DSL.
24+
*
25+
* @property delegate the [AbstractRouterSpec] this instance is delegating to.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.3
30+
*/
31+
abstract class AbstractKotlinRouterSpec<S : AbstractRouterSpec<S, R>, R : AbstractMessageRouter>(
32+
open val delegate: AbstractRouterSpec<S, R>) {
33+
34+
fun ignoreSendFailures(ignoreSendFailures: Boolean) {
35+
this.delegate.ignoreSendFailures(ignoreSendFailures)
36+
}
37+
38+
fun applySequence(applySequence: Boolean) {
39+
this.delegate.applySequence(applySequence)
40+
}
41+
42+
fun defaultOutputChannel(channelName: String) {
43+
this.delegate.defaultOutputChannel(channelName)
44+
}
45+
46+
fun defaultOutputChannel(channel: MessageChannel) {
47+
this.delegate.defaultOutputChannel(channel)
48+
}
49+
50+
fun defaultSubFlowMapping(subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
51+
this.delegate.defaultSubFlowMapping { subFlow(KotlinIntegrationFlowDefinition(it)) }
52+
}
53+
54+
fun defaultOutputToParentFlow() {
55+
this.delegate.defaultOutputToParentFlow()
56+
}
57+
58+
}

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,9 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
151151
*/
152152
inline fun <reified P, T> route(
153153
crossinline function: (P) -> T,
154-
crossinline configurer: RouterSpec<T, MethodInvokingRouter>.() -> Unit) {
154+
crossinline configurer: KotlinRouterSpec<T, MethodInvokingRouter>.() -> Unit) {
155155

156-
this.delegate.route(P::class.java, { function(it) }) { configurer(it) }
156+
this.delegate.route(P::class.java, { function(it) }) { configurer(KotlinRouterSpec(it)) }
157157
}
158158

159159
/**
@@ -711,10 +711,12 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
711711

712712
/**
713713
* Populate the [MethodInvokingRouter] for provided bean and its method
714-
* with provided options from [RouterSpec].
714+
* with provided options from [KotlinRouterSpec].
715715
*/
716-
fun route(beanName: String, method: String?, routerConfigurer: RouterSpec<Any, MethodInvokingRouter>.() -> Unit) {
717-
this.delegate.route(beanName, method, routerConfigurer)
716+
fun route(beanName: String, method: String?,
717+
routerConfigurer: KotlinRouterSpec<Any, MethodInvokingRouter>.() -> Unit) {
718+
719+
this.delegate.route(beanName, method) { routerConfigurer(KotlinRouterSpec(it)) }
718720
}
719721

720722
/**
@@ -727,18 +729,22 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
727729

728730
/**
729731
* Populate the [MethodInvokingRouter] for the method
730-
* of the provided service and its method with provided options from [RouterSpec].
732+
* of the provided service and its method with provided options from [KotlinRouterSpec].
731733
*/
732-
fun route(service: Any, methodName: String?, routerConfigurer: RouterSpec<Any, MethodInvokingRouter>.() -> Unit) {
733-
this.delegate.route(service, methodName, routerConfigurer)
734+
fun route(service: Any, methodName: String?,
735+
routerConfigurer: KotlinRouterSpec<Any, MethodInvokingRouter>.() -> Unit) {
736+
737+
this.delegate.route(service, methodName) { routerConfigurer(KotlinRouterSpec(it)) }
734738
}
735739

736740
/**
737741
* Populate the [ExpressionEvaluatingRouter] for provided SpEL expression
738-
* with provided options from [RouterSpec].
742+
* with provided options from [KotlinRouterSpec].
739743
*/
740-
fun <T> route(expression: String, routerConfigurer: RouterSpec<T, ExpressionEvaluatingRouter>.() -> Unit = {}) {
741-
this.delegate.route(expression, routerConfigurer)
744+
fun <T> route(expression: String,
745+
routerConfigurer: KotlinRouterSpec<T, ExpressionEvaluatingRouter>.() -> Unit = {}) {
746+
747+
this.delegate.route<T>(expression) { routerConfigurer(KotlinRouterSpec(it)) }
742748
}
743749

744750
/**
@@ -747,25 +753,25 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
747753
* from the provided [MessageProcessorSpec] with default options.
748754
*/
749755
fun route(messageProcessorSpec: MessageProcessorSpec<*>,
750-
routerConfigurer: RouterSpec<Any, MethodInvokingRouter>.() -> Unit = {}) {
756+
routerConfigurer: KotlinRouterSpec<Any, MethodInvokingRouter>.() -> Unit = {}) {
751757

752-
this.delegate.route(messageProcessorSpec, routerConfigurer)
758+
this.delegate.route(messageProcessorSpec) { routerConfigurer(KotlinRouterSpec(it)) }
753759
}
754760

755761
/**
756-
* Populate the [RecipientListRouter] with options from the [RecipientListRouterSpec].
762+
* Populate the [RecipientListRouter] with options from the [KotlinRecipientListRouterSpec].
757763
*/
758-
fun routeToRecipients(routerConfigurer: RecipientListRouterSpec.() -> Unit) {
759-
this.delegate.routeToRecipients(routerConfigurer)
764+
fun routeToRecipients(routerConfigurer: KotlinRecipientListRouterSpec.() -> Unit) {
765+
this.delegate.routeToRecipients { routerConfigurer(KotlinRecipientListRouterSpec(it)) }
760766
}
761767

762768
/**
763-
* Populate the [ErrorMessageExceptionTypeRouter] with options from the [RouterSpec].
769+
* Populate the [ErrorMessageExceptionTypeRouter] with options from the [KotlinRouterSpec].
764770
*/
765771
fun routeByException(
766-
routerConfigurer: RouterSpec<Class<out Throwable>, ErrorMessageExceptionTypeRouter>.() -> Unit) {
772+
routerConfigurer: KotlinRouterSpec<Class<out Throwable>, ErrorMessageExceptionTypeRouter>.() -> Unit) {
767773

768-
this.delegate.routeByException(routerConfigurer)
774+
this.delegate.routeByException { routerConfigurer(KotlinRouterSpec(it)) }
769775
}
770776

771777
/**
@@ -971,31 +977,33 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
971977

972978
/**
973979
* Populate a [ScatterGatherHandler] to the current integration flow position
974-
* based on the provided [RecipientListRouterSpec] for scattering function
980+
* based on the provided [KotlinRecipientListRouterSpec] for scattering function
975981
* and default [AggregatorSpec] for gathering function.
976982
*/
977-
fun scatterGather(scatterer: RecipientListRouterSpec.() -> Unit) {
978-
this.delegate.scatterGather(scatterer)
983+
fun scatterGather(scatterer: KotlinRecipientListRouterSpec.() -> Unit) {
984+
this.delegate.scatterGather(Consumer { scatterer(KotlinRecipientListRouterSpec(it)) })
979985
}
980986

981987
/**
982988
* Populate a [ScatterGatherHandler] to the current integration flow position
983-
* based on the provided [RecipientListRouterSpec] for scattering function
989+
* based on the provided [KotlinRecipientListRouterSpec] for scattering function
984990
* and [AggregatorSpec] for gathering function.
985991
*/
986-
fun scatterGather(scatterer: RecipientListRouterSpec.() -> Unit, gatherer: AggregatorSpec.() -> Unit) {
987-
this.delegate.scatterGather(scatterer, gatherer)
992+
fun scatterGather(scatterer: KotlinRecipientListRouterSpec.() -> Unit, gatherer: AggregatorSpec.() -> Unit) {
993+
this.delegate.scatterGather(Consumer { scatterer(KotlinRecipientListRouterSpec(it)) },
994+
Consumer { gatherer(it) })
988995
}
989996

990997
/**
991998
* Populate a [ScatterGatherHandler] to the current integration flow position
992-
* based on the provided [RecipientListRouterSpec] for scattering function
999+
* based on the provided [KotlinRecipientListRouterSpec] for scattering function
9931000
* and [AggregatorSpec] for gathering function.
9941001
*/
995-
fun scatterGather(scatterer: RecipientListRouterSpec.() -> Unit, gatherer: AggregatorSpec.() -> Unit,
1002+
fun scatterGather(scatterer: KotlinRecipientListRouterSpec.() -> Unit, gatherer: AggregatorSpec.() -> Unit,
9961003
scatterGather: ScatterGatherSpec.() -> Unit) {
9971004

998-
this.delegate.scatterGather(scatterer, gatherer, scatterGather)
1005+
this.delegate.scatterGather(Consumer { scatterer(KotlinRecipientListRouterSpec(it)) },
1006+
Consumer { gatherer(it) }, Consumer { scatterGather(it) })
9991007
}
10001008

10011009
/**
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl
18+
19+
import org.springframework.expression.Expression
20+
import org.springframework.integration.core.GenericSelector
21+
import org.springframework.integration.core.MessageSelector
22+
import org.springframework.integration.router.RecipientListRouter
23+
import org.springframework.messaging.Message
24+
import org.springframework.messaging.MessageChannel
25+
26+
/**
27+
* A [RecipientListRouterSpec] wrapped for Kotlin DSL.
28+
*
29+
* @property delegate the [RecipientListRouterSpec] this instance is delegating to.
30+
*
31+
* @author Artem Bilan
32+
*
33+
* @since 5.3
34+
*/
35+
class KotlinRecipientListRouterSpec(override val delegate: RecipientListRouterSpec)
36+
: AbstractKotlinRouterSpec<RecipientListRouterSpec, RecipientListRouter>(delegate) {
37+
38+
fun recipient(channelName: String) {
39+
this.delegate.recipient(channelName)
40+
}
41+
42+
fun recipient(channelName: String, expression: String) {
43+
this.delegate.recipient(channelName, expression)
44+
}
45+
46+
fun recipient(channelName: String, expression: Expression) {
47+
this.delegate.recipient(channelName, expression)
48+
}
49+
50+
inline fun <reified P> recipient(channelName: String, crossinline selector: (P) -> Boolean) {
51+
if (Message::class.java.isAssignableFrom(P::class.java))
52+
this.delegate.recipientMessageSelector(channelName) { selector(it as P) }
53+
else
54+
this.delegate.recipient<P>(channelName) { selector(it) }
55+
}
56+
57+
fun recipient(channel: MessageChannel) {
58+
this.delegate.recipient(channel)
59+
}
60+
61+
fun recipient(channel: MessageChannel, expression: String) {
62+
this.delegate.recipient(channel, expression)
63+
}
64+
65+
fun recipient(channel: MessageChannel, expression: Expression) {
66+
this.delegate.recipient(channel, expression)
67+
}
68+
69+
inline fun <reified P> recipient(channel: MessageChannel, crossinline selector: (P) -> Boolean) {
70+
if (Message::class.java.isAssignableFrom(P::class.java))
71+
this.delegate.recipientMessageSelector(channel, MessageSelector { selector(it as P) })
72+
else
73+
this.delegate.recipient<P>(channel, GenericSelector { selector(it) })
74+
}
75+
76+
inline fun <reified P> recipientFlow(crossinline selector: (P) -> Boolean,
77+
crossinline subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
78+
79+
if (Message::class.java.isAssignableFrom(P::class.java))
80+
this.delegate.recipientMessageSelectorFlow({ selector(it as P) })
81+
{ subFlow(KotlinIntegrationFlowDefinition(it)) }
82+
else
83+
this.delegate.recipientFlow<P>({ selector(it) }) { subFlow(KotlinIntegrationFlowDefinition(it)) }
84+
85+
}
86+
87+
fun recipientFlow(subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
88+
this.delegate.recipientFlow { subFlow(KotlinIntegrationFlowDefinition(it)) }
89+
}
90+
91+
fun recipientFlow(expression: String, subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
92+
this.delegate.recipientFlow(expression) { subFlow(KotlinIntegrationFlowDefinition(it)) }
93+
}
94+
95+
fun recipientFlow(expression: Expression, subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
96+
this.delegate.recipientFlow(expression) { subFlow(KotlinIntegrationFlowDefinition(it)) }
97+
}
98+
99+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl
18+
19+
import org.springframework.integration.router.AbstractMappingMessageRouter
20+
import org.springframework.messaging.MessageChannel
21+
22+
/**
23+
* A [RouterSpec] wrapped for Kotlin DSL.
24+
*
25+
* @property delegate the [RouterSpec] this instance is delegating to.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.3
30+
*/
31+
class KotlinRouterSpec<K, R : AbstractMappingMessageRouter>(override val delegate: RouterSpec<K, R>)
32+
: AbstractKotlinRouterSpec<RouterSpec<K, R>, R>(delegate) {
33+
34+
fun resolutionRequired(resolutionRequired: Boolean) {
35+
this.delegate.resolutionRequired(resolutionRequired)
36+
}
37+
38+
fun dynamicChannelLimit(dynamicChannelLimit: Int) {
39+
this.delegate.dynamicChannelLimit(dynamicChannelLimit)
40+
}
41+
42+
fun prefix(prefix: String) {
43+
this.delegate.prefix(prefix)
44+
}
45+
46+
fun suffix(suffix: String) {
47+
this.delegate.suffix(suffix)
48+
}
49+
50+
fun noChannelKeyFallback() {
51+
this.delegate.noChannelKeyFallback()
52+
}
53+
54+
fun channelMapping(key: K, channelName: String) {
55+
this.delegate.channelMapping(key, channelName)
56+
}
57+
58+
fun channelMapping(key: K, channel: MessageChannel) {
59+
this.delegate.channelMapping(key, channel)
60+
}
61+
62+
fun subFlowMapping(key: K, subFlow: KotlinIntegrationFlowDefinition.() -> Unit) {
63+
this.delegate.subFlowMapping(key) { subFlow(KotlinIntegrationFlowDefinition(it)) }
64+
}
65+
66+
}

spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,9 @@ class KotlinDslTests {
315315
scatterGather(
316316
{
317317
applySequence(true)
318-
recipientFlow(GenericSelector<Any> { true }, integrationFlow { handle<Any> { _, _ -> Math.random() * 10 } })
319-
recipientFlow(GenericSelector<Any> { true }, integrationFlow { handle<Any> { _, _ -> Math.random() * 10 } })
320-
recipientFlow(GenericSelector<Any> { true }, integrationFlow { handle<Any> { _, _ -> Math.random() * 10 } })
318+
recipientFlow<Any>({ true }) { handle<Any> { _, _ -> Math.random() * 10 } }
319+
recipientFlow<Any>({ true }) { handle<Any> { _, _ -> Math.random() * 10 } }
320+
recipientFlow<Any>({ true }) { handle<Any> { _, _ -> Math.random() * 10 } }
321321
},
322322
{
323323
releaseStrategy {

spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/routers/RouterDslTests.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ class RouterDslTests {
102102
integrationFlow {
103103
split()
104104
route<Int, Boolean>({ it % 2 == 0 }) {
105-
subFlowMapping(true) { sf -> sf.handle<Int> { p, _ -> p * 2 } }
106-
subFlowMapping(false) { sf -> sf.handle<Int> { p, _ -> p * 3 } }
105+
subFlowMapping(true) { handle<Int> { p, _ -> p * 2 } }
106+
subFlowMapping(false) { handle<Int> { p, _ -> p * 3 } }
107107
}
108108
aggregate()
109109
channel { queue("routerTwoSubFlowsOutput") }
@@ -114,8 +114,8 @@ class RouterDslTests {
114114
integrationFlow {
115115
split()
116116
route<Int, Boolean>({ it % 2 == 0 }) {
117-
subFlowMapping(true) { sf -> sf.gateway(oddFlow()) }
118-
subFlowMapping(false) { sf -> sf.gateway(evenFlow()) }
117+
subFlowMapping(true) { gateway(oddFlow().inputChannel) }
118+
subFlowMapping(false) { gateway(evenFlow().inputChannel) }
119119
}
120120
aggregate()
121121
}

0 commit comments

Comments
 (0)