diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt index 0e0426d6c..a5f939f36 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt @@ -8,9 +8,9 @@ import kotlinx.coroutines.flow.Flow import kotlinx.rpc.RpcCall import kotlinx.rpc.RpcClient import kotlinx.rpc.grpc.GrpcMetadata -import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.client.internal.ManagedChannel import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder +import kotlinx.rpc.grpc.client.internal.applyConfig import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc import kotlinx.rpc.grpc.client.internal.buildChannel import kotlinx.rpc.grpc.client.internal.clientStreamingRpc @@ -27,6 +27,7 @@ import kotlinx.rpc.grpc.descriptor.MethodType import kotlinx.rpc.grpc.descriptor.methodType import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds private typealias RequestClient = Any @@ -180,9 +181,7 @@ private fun GrpcClient( builder: ManagedChannelBuilder<*>, config: GrpcClientConfiguration, ): GrpcClient { - val channel = builder.apply { - config.overrideAuthority?.let { overrideAuthority(it) } - }.buildChannel() + val channel = builder.applyConfig(config).buildChannel() return GrpcClient(channel, config.messageCodecResolver, config.interceptors) } @@ -198,6 +197,7 @@ private fun GrpcClient( */ public class GrpcClientConfiguration internal constructor() { internal val interceptors: MutableList = mutableListOf() + internal var keepAlive: KeepAlive? = null /** * Configurable resolver used to determine the appropriate codec for a given Kotlin type @@ -294,4 +294,55 @@ public class GrpcClientConfiguration internal constructor() { public fun tls(configure: TlsClientCredentialsBuilder.() -> Unit): ClientCredentials = TlsClientCredentials(configure) -} \ No newline at end of file + /** + * Configures keep-alive settings for the gRPC client. + * + * Keep-alive allows you to fine-tune the behavior of the client to ensure the connection + * between the client and server remains active according to specific parameters. + * + * By default, keep-alive is disabled. + * + * @param configure A lambda to apply custom configurations to the [KeepAlive] instance. + * The [KeepAlive] settings include: + * - `time`: The maximum amount of time that the channel can be idle before a keep-alive + * ping is sent. + * - `timeout`: The time allowed for a keep-alive ping to complete. + * - `withoutCalls`: Whether to send keep-alive pings even when there are no outstanding + * RPCs on the connection. + * + * @see KeepAlive + */ + public fun keepAlive(configure: KeepAlive.() -> Unit) { + keepAlive = KeepAlive().apply(configure) + } + + /** + * Represents keep-alive settings for a gRPC client connection. + * + * Keep-alive ensures that the connection between the client and the server remains active. + * It helps detect connection issues proactively before a request is made and facilitates + * maintaining long-lived idle connections. + * + * Client authors must coordinate with service owners for whether a particular client-side + * setting is acceptable. + * + * @property time Specifies the maximum amount of time the channel can remain idle before a + * keep-alive ping is sent to the server to check the connection state. + * The default value is `Duration.INFINITE`, which disables keep-alive pings when idle. + * + * @property timeout Sets the amount of time to wait for a keep-alive ping response. + * If the server does not respond within this timeout, the connection will be considered broken. + * The default value is 20 seconds. + * + * @property withoutCalls Defines whether keep-alive pings will be sent even when there + * are no active RPCs on the connection. If set to `true`, pings will be sent regardless + * of ongoing calls; otherwise, pings are only sent during active RPCs. + * The default value is `false`. + */ + public class KeepAlive internal constructor() { + public var time: Duration = Duration.INFINITE + public var timeout: Duration = 20.seconds + public var withoutCalls: Boolean = false + } +} + diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt index c6239c781..82b782985 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt @@ -7,6 +7,7 @@ package kotlinx.rpc.grpc.client.internal import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.internal.utils.InternalRpcApi import kotlin.time.Duration @@ -71,9 +72,7 @@ public interface ManagedChannel { * Builder class for [ManagedChannel]. */ @InternalRpcApi -public expect abstract class ManagedChannelBuilder> { - public abstract fun overrideAuthority(authority: String): T -} +public expect abstract class ManagedChannelBuilder> @InternalRpcApi public expect fun ManagedChannelBuilder( @@ -88,5 +87,7 @@ public expect fun ManagedChannelBuilder( credentials: ClientCredentials? = null, ): ManagedChannelBuilder<*> +internal expect fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> + @InternalRpcApi public expect fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel diff --git a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt index 71d2d4345..509f467bb 100644 --- a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt +++ b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt @@ -10,6 +10,7 @@ import io.grpc.Grpc import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.internal.utils.InternalRpcApi import java.util.concurrent.TimeUnit import kotlin.time.Duration @@ -80,3 +81,14 @@ private class JvmManagedChannel(private val channel: io.grpc.ManagedChannel) : M override val platformApi: ManagedChannelPlatform get() = channel } + +internal actual fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> { + config.keepAlive?.let { + keepAliveTime(it.time.inWholeMilliseconds, TimeUnit.MILLISECONDS) + keepAliveTimeout(it.timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS) + keepAliveWithoutCalls(it.withoutCalls) + } + + config.overrideAuthority?.let { overrideAuthority(it) } + return this +} \ No newline at end of file diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt index a1eb2c987..10741e1c0 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt @@ -7,6 +7,7 @@ package kotlinx.rpc.grpc.client.internal import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.grpc.client.TlsClientCredentials import kotlinx.rpc.grpc.internal.internalError import kotlinx.rpc.internal.utils.InternalRpcApi @@ -22,25 +23,23 @@ public actual abstract class ManagedChannelPlatform : GrpcChannel() */ @InternalRpcApi public actual abstract class ManagedChannelBuilder> { - public actual abstract fun overrideAuthority(authority: String): T + internal var config: GrpcClientConfiguration? = null } internal class NativeManagedChannelBuilder( private val target: String, private var credentials: Lazy, ) : ManagedChannelBuilder() { - - private var authority: String? = null - - override fun overrideAuthority(authority: String): NativeManagedChannelBuilder { - this.authority = authority - return this - } - fun buildChannel(): NativeManagedChannel { + val keepAlive = config?.keepAlive + keepAlive?.run { + require(time.isPositive()) { "keepalive time must be positive" } + require(timeout.isPositive()) { "keepalive timeout must be positive" } + } return NativeManagedChannel( target, - authority = authority, + authority = config?.overrideAuthority, + keepAlive = config?.keepAlive, credentials = credentials.value, ) } @@ -70,3 +69,7 @@ public actual fun ManagedChannelBuilder(target: String, credentials: ClientCrede } +internal actual fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> { + this.config = config + return this +} \ No newline at end of file diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt index 5cf6563b9..fad1b9371 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt @@ -10,7 +10,10 @@ import cnames.structs.grpc_channel import kotlinx.atomicfu.atomic import kotlinx.cinterop.CPointer import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.MemScope import kotlinx.cinterop.alloc +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.convert import kotlinx.cinterop.cstr import kotlinx.cinterop.memScoped import kotlinx.cinterop.ptr @@ -21,6 +24,7 @@ import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.withTimeoutOrNull import kotlinx.rpc.grpc.client.ClientCredentials import kotlinx.rpc.grpc.client.GrpcCallOptions +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.grpc.client.rawDeadline import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.internal.CompletionQueue @@ -50,6 +54,7 @@ import kotlin.time.Duration internal class NativeManagedChannel( target: String, val authority: String?, + val keepAlive: GrpcClientConfiguration.KeepAlive?, // we must store them, otherwise the credentials are getting released credentials: ClientCredentials, ) : ManagedChannel, ManagedChannelPlatform() { @@ -66,22 +71,36 @@ internal class NativeManagedChannel( private val cq = CompletionQueue() internal val raw: CPointer = memScoped { - val args = authority?.let { + val args = mutableListOf() + + authority?.let { // the C Core API doesn't have a way to override the authority (used for TLS SNI) as it // is available in the Java gRPC implementation. // instead, it can be done by setting the "grpc.ssl_target_name_override" argument. - val authorityOverride = alloc { - type = grpc_arg_type.GRPC_ARG_STRING - key = "grpc.ssl_target_name_override".cstr.ptr - value.string = authority.cstr.ptr - } + args.add(GrpcArg.Str( + key = "grpc.ssl_target_name_override", + value = it + )) + } - alloc { - num_args = 1u - args = authorityOverride.ptr - } + keepAlive?.let { + args.add(GrpcArg.Integer( + key = "grpc.keepalive_time_ms", + value = it.time.inWholeMilliseconds.convert() + )) + args.add(GrpcArg.Integer( + key = "grpc.keepalive_timeout_ms", + value = it.timeout.inWholeMilliseconds.convert() + )) + args.add(GrpcArg.Integer( + key = "grpc.keepalive_permit_without_calls", + value = if (it.withoutCalls) 1 else 0 + )) } - grpc_channel_create(target, credentials.raw, args?.ptr) + + var rawArgs = if (args.isNotEmpty()) args.toRaw(this) else null + + grpc_channel_create(target, credentials.raw, rawArgs?.ptr) ?: error("Failed to create channel") } @@ -170,3 +189,33 @@ internal class NativeManagedChannel( } } + +internal sealed class GrpcArg(val key: String) { + internal class Str(key: String, val value: String) : GrpcArg(key) + internal class Integer(key: String, val value: Int) : GrpcArg(key) + + internal val rawType: grpc_arg_type + get() = when (this) { + is Str -> grpc_arg_type.GRPC_ARG_STRING + is Integer -> grpc_arg_type.GRPC_ARG_INTEGER + } +} + +private fun List.toRaw(memScope: MemScope): grpc_channel_args { + with(memScope) { + val arr = allocArray(size) { + val arg = get(it) + type = arg.rawType + key = arg.key.cstr.ptr + when (arg) { + is GrpcArg.Str -> value.string = arg.value.cstr.ptr + is GrpcArg.Integer -> value.integer = arg.value.convert() + } + } + + return alloc { + num_args = size.convert() + args = arr + } + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt index e14276919..ec04c156f 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt @@ -156,7 +156,7 @@ class GrpcCompressionTest : GrpcProtoTest() { block() } } finally { - clearNativeEnv("GRPC_GRACE") + clearNativeEnv("GRPC_TRACE") } } diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.kt new file mode 100644 index 000000000..9f7ab2df6 --- /dev/null +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.rpc.RpcServer +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.EchoServiceImpl +import kotlinx.rpc.registerService +import kotlin.test.Test +import kotlin.test.assertContains +import kotlin.test.assertFailsWith +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +/** + * Tests that the client can configure the compression of requests. + * + * This test is hard to realize on native, as the gRPC-Core doesn't expose internal headers like + * `grpc-encoding` to the user application. This means we cannot verify that the client or sever + * actually sent those headers on native. Instead, we capture the grpc trace output (written to stderr) + * and verify that the client and server actually used the compression algorithm. + */ +class GrpcKeepAliveTest : GrpcProtoTest() { + override fun RpcServer.registerServices() { + return registerService { EchoServiceImpl() } + } + + @Test + fun `test keepalive set - should propagate settings to core libraries`() = testKeepAlive( + time = 15.seconds, + timeout = 5.seconds, + withoutCalls = true, + ) + + @Test + fun `test keepalive negative time - should fail`() { + val error = assertFailsWith { + runGrpcTest( + configure = { + keepAlive { + this.time = (-1).seconds + } + } + ) { + // not reached + } + } + assertContains(error.message!!, "keepalive time must be positive") + } + + @Test + fun `test keepalive negative timeout - should fail`() { + val error = assertFailsWith { + runGrpcTest( + configure = { + keepAlive { + this.timeout = (-1).seconds + } + } + ) { + // not reached + } + } + assertContains(error.message!!, "keepalive timeout must be positive") + } +} + +expect fun GrpcProtoTest.testKeepAlive( + time: Duration, + timeout: Duration, + withoutCalls: Boolean, +) diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt index 231d4cdaf..94948ec03 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt @@ -5,6 +5,7 @@ package kotlinx.rpc.grpc.test.proto import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.test.runTest @@ -13,6 +14,7 @@ import kotlinx.rpc.grpc.client.ClientCallScope import kotlinx.rpc.grpc.client.ClientCredentials import kotlinx.rpc.grpc.client.ClientInterceptor import kotlinx.rpc.grpc.client.GrpcClient +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.grpc.server.GrpcServer import kotlinx.rpc.grpc.server.ServerCallScope import kotlinx.rpc.grpc.server.ServerCredentials @@ -23,19 +25,21 @@ abstract class GrpcProtoTest { abstract fun RpcServer.registerServices() - protected fun runGrpcTest( + fun runGrpcTest( serverCreds: ServerCredentials? = null, clientCreds: ClientCredentials? = null, overrideAuthority: String? = null, clientInterceptors: List = emptyList(), serverInterceptors: List = emptyList(), + configure: GrpcClientConfiguration.() -> Unit = {}, test: suspend (GrpcClient) -> Unit, - ) = runTest { + ) = runBlocking { serverMutex.withLock { val grpcClient = GrpcClient("localhost", PORT) { credentials = clientCreds ?: plaintext() if (overrideAuthority != null) this.overrideAuthority = overrideAuthority clientInterceptors.forEach { intercept(it) } + configure() } val grpcServer = GrpcServer( diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt index f4b0ea9cf..7f3af27c2 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt @@ -45,3 +45,10 @@ expect fun clearNativeEnv(key: String) */ expect suspend fun captureStdErr(block: suspend () -> Unit): String +expect suspend fun captureGrpcLogs( + jvmLogLevel: String = "DEBUG", + jvmLoggers: List = listOf("io.grpc"), + nativeVerbosity: String = "DEBUG", + nativeTracers: List = listOf("all"), + block: suspend () -> Unit +): String \ No newline at end of file diff --git a/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.jvm.kt b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.jvm.kt new file mode 100644 index 000000000..f2bc226e6 --- /dev/null +++ b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.jvm.kt @@ -0,0 +1,70 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.rpc.grpc.client.internal.ManagedChannel +import kotlinx.rpc.grpc.test.EchoRequest +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.invoke +import kotlinx.rpc.withService +import java.lang.reflect.Field +import kotlin.test.assertEquals +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds + +actual fun GrpcProtoTest.testKeepAlive( + time: Duration, + timeout: Duration, + withoutCalls: Boolean +) { + runGrpcTest( + configure = { + keepAlive { + this.time = time + this.timeout = timeout + this.withoutCalls = withoutCalls + } + } + ) { + it.withService().UnaryEcho(EchoRequest { message = "Hello" }) + val nettyClientTransport = it.getField("channel") + .platformApi + .getField>("delegate", "subchannels") + .first() + .getField>("transports").first() + .getField("delegate", "delegate") + + val keepAliveTime = nettyClientTransport.getField("keepAliveTimeNanos").nanoseconds + val keepAliveTimeout = nettyClientTransport.getField("keepAliveTimeoutNanos").nanoseconds + val keepAliveWithoutCalls = nettyClientTransport.getField("keepAliveWithoutCalls") + + assertEquals(time, keepAliveTime) + assertEquals(timeout, keepAliveTimeout) + assertEquals(withoutCalls, keepAliveWithoutCalls) + } +} + +private inline fun Any.getField(vararg names: String): R { + var curr: Any = this + for (name in names) { + val field = findFieldInHierarchy(curr::class.java, name) + ?: throw NoSuchFieldException("Field '$name' not found in ${curr::class.java}") + field.isAccessible = true + curr = field.get(curr) as Any + } + return curr as R +} + +private fun findFieldInHierarchy(clazz: Class<*>, name: String): Field? { + var c: Class<*>? = clazz + while (c != null) { + try { + return c.getDeclaredField(name) + } catch (_: NoSuchFieldException) { + c = c.superclass + } + } + return null +} \ No newline at end of file diff --git a/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt index ef56db05d..c06444859 100644 --- a/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt +++ b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt @@ -6,6 +6,10 @@ package kotlinx.rpc.grpc.test import java.io.ByteArrayOutputStream import java.io.PrintStream +import java.util.logging.Handler +import java.util.logging.Level +import java.util.logging.LogRecord +import java.util.logging.Logger actual val runtime: Runtime get() = Runtime.JVM @@ -30,3 +34,41 @@ actual suspend fun captureStdErr(block: suspend () -> Unit): String { } } +actual suspend fun captureGrpcLogs( + jvmLogLevel: String, + jvmLoggers: List, + nativeVerbosity: String, + nativeTracers: List, + block: suspend () -> Unit +): String { + val sb = StringBuilder() + val handler = object : Handler() { + override fun publish(record: LogRecord) { + sb.append('[').append(record.loggerName).append("] ") + .append(record.level).append(": ") + .append(record.message).append('\n') + } + override fun flush() {} + override fun close() {} + } + handler.level = Level.ALL + + val saved = mutableListOf>() + try { + for (name in jvmLoggers) { + val logger = Logger.getLogger(name) + saved += logger to logger.level + logger.level = Level.ALL + logger.useParentHandlers = false + logger.addHandler(handler) + } + block() + return sb.toString() + } finally { + saved.forEach { (lg, lvl) -> + lg.level = lvl + lg.handlers.filterIsInstance() + .forEach { if (it === handler) lg.removeHandler(it) } + } + } +} diff --git a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.native.kt b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.native.kt new file mode 100644 index 000000000..ce30e151c --- /dev/null +++ b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.native.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.coroutines.test.runTest +import kotlinx.rpc.grpc.test.EchoRequest +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.captureGrpcLogs +import kotlinx.rpc.grpc.test.invoke +import kotlinx.rpc.withService +import kotlin.test.assertEquals +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + + +actual fun GrpcProtoTest.testKeepAlive( + time: Duration, + timeout: Duration, + withoutCalls: Boolean, +) = runTest { + val logs = captureGrpcLogs( + nativeTracers = listOf("pick_first") + ) { + runGrpcTest( + configure = { + keepAlive { + this.time = time + this.timeout = timeout + this.withoutCalls = withoutCalls + } + } + ) { + it.withService().UnaryEcho(EchoRequest { message = "Hello" }) + } + } + + val keepAliveSettings = extractKeepAliveSettings(logs) + assertEquals(time, keepAliveSettings.time) + assertEquals(timeout, keepAliveSettings.timeout) + assertEquals(withoutCalls, keepAliveSettings.permitWithoutCalls) +} + +private data class KeepAliveSettings( + val permitWithoutCalls: Boolean, + val time: Duration, + val timeout: Duration +) + +private fun extractKeepAliveSettings(logs: String): KeepAliveSettings { + val channelArgsPattern = Regex("""channel args: \{([^}]+)\}""") + val channelArgsMatch = channelArgsPattern.find(logs) + ?: error("Could not find channel args in logs") + + val argsText = channelArgsMatch.groupValues[1] + + val permitWithoutCalls = Regex("""grpc\.keepalive_permit_without_calls=(\d+)""") + .find(argsText)?.groupValues?.get(1)?.toInt() == 1 + + val timeMs = Regex("""grpc\.keepalive_time_ms=(\d+)""") + .find(argsText)?.groupValues?.get(1)?.toInt() + ?: error("Could not find grpc.keepalive_time_ms in logs") + + val timeoutMs = Regex("""grpc\.keepalive_timeout_ms=(\d+)""") + .find(argsText)?.groupValues?.get(1)?.toInt() + ?: error("Could not find grpc.keepalive_timeout_ms in logs") + + return KeepAliveSettings( + permitWithoutCalls = permitWithoutCalls, + time = timeMs.milliseconds, + timeout = timeoutMs.milliseconds + ) +} diff --git a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt index 17565693a..d3f46e3a1 100644 --- a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt +++ b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt @@ -22,7 +22,6 @@ import platform.posix.close import platform.posix.dup import platform.posix.dup2 import platform.posix.fflush -import platform.posix.fprintf import platform.posix.pipe import platform.posix.read import platform.posix.stderr @@ -75,3 +74,21 @@ actual suspend fun captureStdErr(block: suspend () -> Unit): String = coroutineS } } +actual suspend fun captureGrpcLogs( + jvmLogLevel: String, + jvmLoggers: List, + nativeVerbosity: String, + nativeTracers: List, + block: suspend () -> Unit +): String { + try { + return captureStdErr { + setNativeEnv("GRPC_TRACE", nativeTracers.joinToString(",")) + setNativeEnv("GRPC_VERBOSITY", nativeVerbosity) + block() + } + } finally { + clearNativeEnv("GRPC_TRACE") + clearNativeEnv("GRPC_VERBOSITY") + } +} \ No newline at end of file