@@ -3,12 +3,20 @@ package io.github.trueangle.knative.lambda.runtime
33import io.github.trueangle.knative.lambda.runtime.LambdaEnvironmentException.NonRecoverableStateException
44import io.github.trueangle.knative.lambda.runtime.api.Context
55import io.github.trueangle.knative.lambda.runtime.api.LambdaClient
6+ import io.github.trueangle.knative.lambda.runtime.api.LambdaClientImpl
67import io.github.trueangle.knative.lambda.runtime.handler.LambdaBufferedHandler
78import io.github.trueangle.knative.lambda.runtime.handler.LambdaHandler
89import io.github.trueangle.knative.lambda.runtime.handler.LambdaStreamHandler
910import io.github.trueangle.knative.lambda.runtime.log.KtorLogger
11+ import io.github.trueangle.knative.lambda.runtime.log.LambdaLogger
1012import io.github.trueangle.knative.lambda.runtime.log.Log
13+ import io.github.trueangle.knative.lambda.runtime.log.debug
14+ import io.github.trueangle.knative.lambda.runtime.log.error
15+ import io.github.trueangle.knative.lambda.runtime.log.fatal
16+ import io.github.trueangle.knative.lambda.runtime.log.info
17+ import io.github.trueangle.knative.lambda.runtime.log.warn
1118import io.ktor.client.HttpClient
19+ import io.ktor.client.engine.HttpClientEngine
1220import io.ktor.client.engine.curl.Curl
1321import io.ktor.client.plugins.HttpTimeout
1422import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
@@ -22,13 +30,20 @@ import io.ktor.utils.io.writeStringUtf8
2230import kotlinx.coroutines.runBlocking
2331import kotlinx.serialization.ExperimentalSerializationApi
2432import kotlinx.serialization.json.Json
25- import kotlin.system.exitProcess
2633
2734object LambdaRuntime {
2835 @OptIn(ExperimentalSerializationApi ::class )
2936 internal val json = Json { explicitNulls = false }
3037
31- private val httpClient = HttpClient (Curl ) {
38+ inline fun <reified I , reified O > run (crossinline initHandler : () -> LambdaHandler <I , O >) = runBlocking {
39+ val curlHttpClient = createHttpClient(Curl .create())
40+ val lambdaClient = LambdaClientImpl (curlHttpClient)
41+
42+ Runner (client = lambdaClient, log = Log ).run (false , initHandler)
43+ }
44+
45+ @PublishedApi
46+ internal fun createHttpClient (engine : HttpClientEngine ) = HttpClient (engine) {
3247 install(HttpTimeout )
3348 install(ContentNegotiation ) { json(json) }
3449 install(Logging ) {
@@ -39,86 +54,93 @@ object LambdaRuntime {
3954 filter { ! it.headers.contains(" Lambda-Runtime-Function-Response-Mode" , " streaming" ) }
4055 }
4156 }
57+ }
4258
43- @PublishedApi
44- internal val client = LambdaClient (httpClient)
45-
46- inline fun <reified I , reified O > run (crossinline initHandler : () -> LambdaHandler <I , O >) = runBlocking {
59+ @PublishedApi
60+ internal class Runner (
61+ val client : LambdaClient ,
62+ val log : LambdaLogger ,
63+ val env : LambdaEnvironment = LambdaEnvironment ()
64+ ) {
65+ suspend inline fun <reified I , reified O > run (singleEventMode : Boolean = false, crossinline initHandler : () -> LambdaHandler <I , O >) {
4766 val handler = try {
48- Log .info(" Initializing Kotlin Native Lambda Runtime" )
67+ log .info(" Initializing Kotlin Native Lambda Runtime" )
4968
5069 initHandler()
5170 } catch (e: Exception ) {
52- Log .fatal(e)
71+ log .fatal(e)
5372
5473 client.reportError(e.asInitError())
55- exitProcess(1 )
74+
75+ env.terminate()
5676 }
5777
5878 val handlerName = handler::class .simpleName
5979 val inputTypeInfo = typeInfo<I >()
6080 val outputTypeInfo = typeInfo<O >()
6181
62- while (true ) {
82+ var shouldExit = false
83+ while (! shouldExit) {
6384 try {
64- Log .info(" Runtime is ready for a new event" )
85+ log .info(" Runtime is ready for a new event" )
6586
6687 val (event, context) = client.retrieveNextEvent<I >(inputTypeInfo)
6788
68- with (Log ) {
89+ with (log ) {
6990 setContext(context)
7091
7192 debug(event)
7293 debug(context)
94+ info(" $handlerName invocation started" )
7395 }
7496
75- Log .info(" $handlerName invocation started" )
76-
7797 if (handler is LambdaStreamHandler <I , * >) {
7898 val response = streamingResponse { handler.handleRequest(event, it, context) }
7999
80- Log .info(" $handlerName started response streaming" )
100+ log .info(" $handlerName started response streaming" )
81101
82102 client.streamResponse(context, response)
83103 } else {
84104 handler as LambdaBufferedHandler <I , O >
85105 val response = bufferedResponse(context) { handler.handleRequest(event, context) }
86106
87- Log .info(" $handlerName invocation completed" )
88- Log .debug(response)
107+ log .info(" $handlerName invocation completed" )
108+ log .debug(response)
89109
90110 client.sendResponse(context, response, outputTypeInfo)
91111 }
92112 } catch (e: LambdaRuntimeException ) {
93- Log .error(e)
113+ log.error(e)
114+
94115 client.reportError(e)
95116 } catch (e: LambdaEnvironmentException ) {
96117 when (e) {
97118 is NonRecoverableStateException -> {
98- Log .fatal(e)
119+ log .fatal(e)
99120
100- exitProcess( 1 )
121+ env.terminate( )
101122 }
102123
103- else -> Log .error(e)
124+ else -> log .error(e)
104125 }
105126 } catch (e: Throwable ) {
106- Log .fatal(e)
127+ log.fatal(e)
128+
129+ env.terminate()
130+ }
107131
108- exitProcess(1 )
132+ if (singleEventMode) {
133+ shouldExit = singleEventMode
109134 }
110135 }
111136 }
112- }
113137
114- @PublishedApi
115- internal inline fun streamingResponse (crossinline handler : suspend (ByteWriteChannel ) -> Unit ) =
116- object : WriteChannelContent () {
138+ inline fun streamingResponse (crossinline handler : suspend (ByteWriteChannel ) -> Unit ) = object : WriteChannelContent () {
117139 override suspend fun writeTo (channel : ByteWriteChannel ) {
118140 try {
119141 handler(channel)
120142 } catch (e: Exception ) {
121- Log .warn(" Exception occurred on streaming: " + e.message)
143+ log .warn(" Exception occurred on streaming: " + e.message)
122144
123145 channel.writeStringUtf8(e.toTrailer())
124146 }
@@ -128,9 +150,9 @@ internal inline fun streamingResponse(crossinline handler: suspend (ByteWriteCha
128150 " Lambda-Runtime-Function-Error-Type: Runtime.StreamError\r\n Lambda-Runtime-Function-Error-Body: ${stackTraceToString().encodeBase64()} \r\n "
129151 }
130152
131- @PublishedApi
132- internal inline fun < T , R > T. bufferedResponse ( context : Context , block : T .() -> R ): R = try {
133- block()
134- } catch (e : Exception ) {
135- throw e.asHandlerError(context)
153+ inline fun < T , R > T. bufferedResponse ( context : Context , block : T .() -> R ): R = try {
154+ block()
155+ } catch (e : Exception ) {
156+ throw e.asHandlerError(context)
157+ }
136158}
0 commit comments