Skip to content

Commit bd1b1ba

Browse files
authored
Update Ktor integration to Ktor 2.x (#391)
The Ktor 2.x version made some breaking changes, this updates the ktor-core module to use the new interfaces. This also bumps up the Kotlin version to a compatible version.
1 parent 5b944fc commit bd1b1ba

File tree

5 files changed

+46
-48
lines changed

5 files changed

+46
-48
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ plugins {
88
`java-library`
99
checkstyle
1010
jacoco
11-
kotlin("jvm") version "1.4.32"
11+
kotlin("jvm") version "1.6.21"
1212
id("com.github.spotbugs")
1313
id("com.jashmore.gradle.github.release")
1414
id("org.jlleitschuh.gradle.ktlint") apply false

examples/ktor-example/src/main/kotlin/com/jashmore/sqs/examples/KtorApplicationExample.kt

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ import com.jashmore.sqs.elasticmq.ElasticMqSqsAsyncClient
77
import com.jashmore.sqs.ktor.container.batchingMessageListener
88
import com.jashmore.sqs.ktor.container.messageListener
99
import com.jashmore.sqs.ktor.container.prefetchingMessageListener
10-
import io.ktor.application.call
11-
import io.ktor.application.log
1210
import io.ktor.http.ContentType
1311
import io.ktor.http.HttpStatusCode
14-
import io.ktor.response.respond
15-
import io.ktor.response.respondText
16-
import io.ktor.routing.get
17-
import io.ktor.routing.routing
12+
import io.ktor.server.application.call
13+
import io.ktor.server.application.log
1814
import io.ktor.server.engine.embeddedServer
1915
import io.ktor.server.netty.Netty
16+
import io.ktor.server.response.respond
17+
import io.ktor.server.response.respondText
18+
import io.ktor.server.routing.get
19+
import io.ktor.server.routing.routing
2020
import kotlinx.coroutines.GlobalScope
2121
import kotlinx.coroutines.delay
2222
import kotlinx.coroutines.future.await
@@ -141,13 +141,7 @@ fun main() {
141141
}
142142
}
143143
}
144-
server.start()
145-
Runtime.getRuntime().addShutdownHook(
146-
Thread {
147-
server.stop(1, 30_000)
148-
}
149-
)
150-
Thread.currentThread().join()
144+
server.start(wait = true)
151145
}
152146

153147
suspend fun something(@Suppress("UNUSED_PARAMETER") message: Message) = GlobalScope.launch {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ guavaVersion=29.0-jre
1111
immutablesVersion=2.8.9-ea-1
1212
jacksonVersion=2.12.7
1313
junitJupiterVersion=5.7.2
14-
ktorVersion=1.6.0
14+
ktorVersion=2.2.3
1515
logbackVersion=1.2.11
1616
lombokVersion=1.18.26
1717
mockitoVersion=3.11.2

ktor/core/src/main/kotlin/com/jashmore/sqs/ktor/container/KtorCoreExtension.kt

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ import com.jashmore.sqs.core.kotlin.dsl.container.CoreMessageListenerContainerDs
66
import com.jashmore.sqs.core.kotlin.dsl.container.FifoMessageListenerContainerDslBuilder
77
import com.jashmore.sqs.core.kotlin.dsl.container.PrefetchingMessageListenerContainerDslBuilder
88
import com.jashmore.sqs.core.kotlin.dsl.container.coreMessageListener
9-
import io.ktor.application.Application
10-
import io.ktor.application.ApplicationEnvironment
11-
import io.ktor.application.ApplicationStarted
12-
import io.ktor.application.ApplicationStopped
13-
import io.ktor.util.pipeline.ContextDsl
9+
import io.ktor.server.application.Application
10+
import io.ktor.server.application.ApplicationEnvironment
11+
import io.ktor.server.application.ApplicationStarted
12+
import io.ktor.server.application.ApplicationStopped
13+
import io.ktor.util.KtorDsl
1414
import software.amazon.awssdk.services.sqs.SqsAsyncClient
1515

16-
@ContextDsl
16+
@KtorDsl
1717
fun Application.messageListener(
1818
identifier: String,
1919
sqsAsyncClient: SqsAsyncClient,
@@ -23,7 +23,7 @@ fun Application.messageListener(
2323
return initMessageListener(environment, coreMessageListener(identifier, sqsAsyncClient, queueUrl, config))
2424
}
2525

26-
@ContextDsl
26+
@KtorDsl
2727
fun Application.prefetchingMessageListener(
2828
identifier: String,
2929
sqsAsyncClient: SqsAsyncClient,
@@ -33,7 +33,7 @@ fun Application.prefetchingMessageListener(
3333
return initMessageListener(environment, com.jashmore.sqs.core.kotlin.dsl.container.prefetchingMessageListener(identifier, sqsAsyncClient, queueUrl, init))
3434
}
3535

36-
@ContextDsl
36+
@KtorDsl
3737
fun Application.batchingMessageListener(
3838
identifier: String,
3939
sqsAsyncClient: SqsAsyncClient,
@@ -43,7 +43,7 @@ fun Application.batchingMessageListener(
4343
return initMessageListener(environment, com.jashmore.sqs.core.kotlin.dsl.container.batchingMessageListener(identifier, sqsAsyncClient, queueUrl, init))
4444
}
4545

46-
@ContextDsl
46+
@KtorDsl
4747
fun Application.fifoMessageListener(
4848
identifier: String,
4949
sqsAsyncClient: SqsAsyncClient,
@@ -57,13 +57,19 @@ fun initMessageListener(
5757
environment: ApplicationEnvironment,
5858
container: MessageListenerContainer
5959
): MessageListenerContainer {
60+
val hook = Thread { container.stop() }
6061
environment.monitor.subscribe(ApplicationStarted) {
62+
environment.monitor.subscribe(ApplicationStopped) {
63+
try {
64+
Runtime.getRuntime().removeShutdownHook(hook)
65+
} catch (alreadyShuttingDown: IllegalStateException) {
66+
// ignore
67+
}
68+
container.stop()
69+
}
70+
Runtime.getRuntime().addShutdownHook(hook)
6171
container.start()
6272
}
6373

64-
environment.monitor.subscribe(ApplicationStopped) {
65-
container.stop()
66-
}
67-
6874
return container
6975
}

ktor/core/src/test/kotlin/com/jashmore/sqs/ktor/container/KtorCoreExtensionTest.kt

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@ package com.jashmore.sqs.ktor.container
22

33
import com.jashmore.sqs.elasticmq.ElasticMqSqsAsyncClient
44
import com.jashmore.sqs.util.LocalSqsAsyncClient
5-
import io.ktor.application.log
6-
import io.ktor.server.engine.embeddedServer
7-
import io.ktor.server.netty.Netty
8-
import io.ktor.server.testing.withTestApplication
5+
import io.ktor.server.application.log
6+
import io.ktor.server.testing.testApplication
97
import org.assertj.core.api.Assertions.assertThat
108
import org.junit.jupiter.api.AfterAll
119
import org.junit.jupiter.api.Test
@@ -25,8 +23,8 @@ class KtorCoreExtensionTest {
2523
fun `message listener can be registered`() {
2624
val queueUrl = sqsClient.createRandomQueue().get().queueUrl()
2725
val countDownLatch = CountDownLatch(1)
28-
withTestApplication({
29-
val server = embeddedServer(Netty, 8080) {
26+
testApplication {
27+
application {
3028
messageListener("core-listener", sqsClient, queueUrl) {
3129
processor = lambdaProcessor {
3230
method { message ->
@@ -48,8 +46,8 @@ class KtorCoreExtensionTest {
4846
}
4947
}
5048
}
51-
server.start()
52-
}) {
49+
startApplication()
50+
5351
sqsClient.sendMessage { it.queueUrl(queueUrl).messageBody("body") }.get()
5452

5553
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue()
@@ -60,8 +58,8 @@ class KtorCoreExtensionTest {
6058
fun `prefetching message listener can be registered`() {
6159
val queueUrl = sqsClient.createRandomQueue().get().queueUrl()
6260
val countDownLatch = CountDownLatch(1)
63-
withTestApplication({
64-
val server = embeddedServer(Netty, 8080) {
61+
testApplication {
62+
application {
6563
prefetchingMessageListener("prefetching-listener", sqsClient, queueUrl) {
6664
concurrencyLevel = { 5 }
6765
desiredPrefetchedMessages = 1
@@ -74,8 +72,8 @@ class KtorCoreExtensionTest {
7472
}
7573
}
7674
}
77-
server.start()
78-
}) {
75+
startApplication()
76+
7977
sqsClient.sendMessage { it.queueUrl(queueUrl).messageBody("body") }.get()
8078

8179
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue()
@@ -86,8 +84,8 @@ class KtorCoreExtensionTest {
8684
fun `batching message listener can be registered`() {
8785
val queueUrl = sqsClient.createRandomQueue().get().queueUrl()
8886
val countDownLatch = CountDownLatch(1)
89-
withTestApplication({
90-
val server = embeddedServer(Netty, 8080) {
87+
testApplication {
88+
application {
9189
batchingMessageListener("batching-listener", sqsClient, queueUrl) {
9290
concurrencyLevel = { 5 }
9391
processor = lambdaProcessor {
@@ -98,8 +96,8 @@ class KtorCoreExtensionTest {
9896
}
9997
}
10098
}
101-
server.start()
102-
}) {
99+
startApplication()
100+
103101
sqsClient.sendMessage { it.queueUrl(queueUrl).messageBody("body") }.get()
104102

105103
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue()
@@ -110,8 +108,8 @@ class KtorCoreExtensionTest {
110108
fun `fifo message listener can be registered`() {
111109
val queueUrl = sqsClient.createRandomFifoQueue().get().response.queueUrl()
112110
val countDownLatch = CountDownLatch(1)
113-
withTestApplication({
114-
val server = embeddedServer(Netty, 8080) {
111+
testApplication {
112+
application {
115113
fifoMessageListener("fifo-listener", sqsClient, queueUrl) {
116114
concurrencyLevel = { 5 }
117115
processor = lambdaProcessor {
@@ -122,8 +120,8 @@ class KtorCoreExtensionTest {
122120
}
123121
}
124122
}
125-
server.start()
126-
}) {
123+
startApplication()
124+
127125
sqsClient.sendMessage {
128126
it.queueUrl(queueUrl)
129127
.messageBody("body")

0 commit comments

Comments
 (0)