@@ -20,7 +20,9 @@ import io.github.trueangle.knative.lambda.runtime.api.LambdaClient
2020import io.github.trueangle.knative.lambda.runtime.api.LambdaClientImpl
2121import io.github.trueangle.knative.lambda.runtime.events.apigateway.APIGatewayRequest
2222import io.github.trueangle.knative.lambda.runtime.handler.LambdaBufferedHandler
23+ import io.github.trueangle.knative.lambda.runtime.handler.LambdaStreamHandler
2324import io.github.trueangle.knative.lambda.runtime.log.LambdaLogger
25+ import io.github.trueangle.knative.lambda.runtime.log.Log
2426import io.github.trueangle.knative.lambda.runtime.log.LogLevel.ERROR
2527import io.github.trueangle.knative.lambda.runtime.log.LogLevel.FATAL
2628import io.ktor.client.engine.HttpClientEngine
@@ -29,15 +31,24 @@ import io.ktor.client.engine.mock.MockRequestHandleScope
2931import io.ktor.client.engine.mock.respond
3032import io.ktor.client.engine.mock.respondBadRequest
3133import io.ktor.client.engine.mock.respondError
34+ import io.ktor.client.engine.mock.respondOk
35+ import io.ktor.client.request.HttpRequestData
3236import io.ktor.http.HttpHeaders
3337import io.ktor.http.HttpStatusCode
38+ import io.ktor.http.content.ChannelWriterContent
39+ import io.ktor.http.content.OutgoingContent
3440import io.ktor.http.headers
3541import io.ktor.http.headersOf
3642import io.ktor.util.reflect.typeInfo
3743import io.ktor.utils.io.ByteReadChannel
44+ import io.ktor.utils.io.ByteWriteChannel
45+ import io.ktor.utils.io.copyTo
3846import kotlinx.cinterop.ExperimentalForeignApi
3947import kotlinx.cinterop.toKString
4048import kotlinx.coroutines.test.runTest
49+ import kotlinx.io.Buffer
50+ import kotlinx.io.RawSource
51+ import kotlinx.io.Source
4152import kotlinx.serialization.json.Json
4253import platform.posix.getenv
4354import platform.posix.setenv
@@ -49,7 +60,7 @@ import kotlin.test.assertTrue
4960internal const val RESOURCES_PATH = " src/nativeTest/resources"
5061
5162class LambdaRuntimeTest {
52- private val log = mock <LambdaLogger >()
63+ private val log = spy <LambdaLogger >(Log )
5364 private val context = Context (
5465 awsRequestId = " 156cb537-e2d4-11e8-9b34-d36013741fb9" ,
5566 deadlineTimeInMs = 1542409706888L ,
@@ -203,7 +214,7 @@ class LambdaRuntimeTest {
203214 }
204215
205216 @Test
206- fun `GIVEN EventBodyParseException WHEN retrieveNextEvent THEN report error AND continue working ` () = runTest {
217+ fun `GIVEN EventBodyParseException WHEN retrieveNextEvent THEN report error AND skip event ` () = runTest {
207218 val event = NonSerialObject (" " )
208219 val lambdaRunner = createRunner(MockEngine { request ->
209220 val path = request.url.encodedPath
@@ -227,7 +238,7 @@ class LambdaRuntimeTest {
227238 }
228239
229240 @Test
230- fun `GIVEN Handler exception WHEN handleRequest THEN report error AND continue working ` () = runTest {
241+ fun `GIVEN Handler exception WHEN handleRequest THEN report error AND skip event ` () = runTest {
231242 val lambdaRunner = createRunner(MockEngine { request ->
232243 val path = request.url.encodedPath
233244 when {
@@ -249,6 +260,43 @@ class LambdaRuntimeTest {
249260 verify(not ) { lambdaRunner.env.terminate() }
250261 }
251262
263+ @Test
264+ fun `GIVEN Handler exception WHEN streamingResponse THEN consume error` () = runTest {
265+ val lambdaRunner = createRunner(MockEngine { request ->
266+ val path = request.url.encodedPath
267+ when {
268+ path.contains(" invocation/next" ) -> respondNextEventSuccess(" " )
269+ path.contains(" ${context.awsRequestId} /response" ) -> {
270+ println (request.body.contentLength)
271+ println (request.body.status)
272+ respond(" " , HttpStatusCode .Accepted )
273+ }
274+
275+ else -> respondBadRequest()
276+ }
277+ })
278+ val client = lambdaRunner.client
279+
280+ val handler = object : LambdaStreamHandler <String , ByteWriteChannel > {
281+ override suspend fun handleRequest (input : String , output : ByteWriteChannel , context : Context ) {
282+ output.writeMidstreamError(RuntimeException ())
283+ ByteReadChannel (" " ).copyTo(output)
284+ }
285+ }
286+
287+ lambdaRunner.run (singleEventMode = true ) { handler }
288+
289+ // assertTrue(request.body is ByteWriteChannel)
290+
291+ // val condition = body?.trailers()?.contains(RuntimeException().toTrailer())
292+
293+ // assertTrue(condition == true)
294+
295+ verifySuspend { client.streamResponse(any(), any()) }
296+ verify(not ) { log.log(FATAL , any<Any >(), any()) }
297+ verify(not ) { lambdaRunner.env.terminate() }
298+ }
299+
252300 @OptIn(ExperimentalForeignApi ::class )
253301 private fun mockEnvironment () {
254302 if (getenv(AWS_LAMBDA_FUNCTION_NAME )?.toKString().isNullOrEmpty()) {
0 commit comments