Skip to content

Commit a9e41f3

Browse files
committed
add timeouts
1 parent d2952a0 commit a9e41f3

File tree

1 file changed

+28
-15
lines changed

1 file changed

+28
-15
lines changed

src/main/kotlin/io/github/viartemev/rabbitmq/publisher/ConfirmPublisher.kt

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.github.viartemev.rabbitmq.publisher
33
import com.rabbitmq.client.Channel
44
import kotlinx.coroutines.suspendCancellableCoroutine
55
import kotlinx.coroutines.sync.Semaphore
6+
import kotlinx.coroutines.withTimeout
67
import mu.KotlinLogging
78
import java.util.concurrent.ConcurrentHashMap
89
import kotlin.coroutines.Continuation
@@ -24,6 +25,9 @@ class ConfirmPublisher internal constructor(
2425
internal val continuations = ConcurrentHashMap<Long, Continuation<Boolean>>()
2526
private val inFlightSemaphore = Semaphore(maxInFlightMessages)
2627

28+
@Volatile
29+
private var isClosed = false
30+
2731
init {
2832
channel.addConfirmListener(AckListener(continuations, inFlightSemaphore))
2933
}
@@ -32,26 +36,35 @@ class ConfirmPublisher internal constructor(
3236
* Publishes a message with confirmation to the specified exchange and routing key.
3337
*
3438
* @param message The {@link OutboundMessage} to publish.
35-
* @return True if the message was published successfully, false otherwise.
39+
* @param timeoutMillis Optional timeout in milliseconds for ожидания подтверждения. Если null — ждать бесконечно.
40+
* @return True if the message was published успешно, false otherwise.
3641
*/
37-
suspend fun publishWithConfirm(message: OutboundMessage): Boolean {
42+
suspend fun publishWithConfirm(message: OutboundMessage, timeoutMillis: Long? = null): Boolean {
43+
if (isClosed) throw IllegalStateException("Publisher is closed")
3844
val messageSequenceNumber = channel.nextPublishSeqNo
3945
logger.debug { "Generated message Sequence Number: $messageSequenceNumber" }
4046
inFlightSemaphore.acquire()
41-
return suspendCancellableCoroutine { continuation ->
42-
continuation.invokeOnCancellation {
43-
continuations.remove(messageSequenceNumber)
44-
inFlightSemaphore.release()
45-
}
46-
continuations[messageSequenceNumber] = continuation
47-
try {
48-
message.apply { channel.basicPublish(exchange, routingKey, properties, msg) }
49-
logger.debug { "Message successfully published" }
50-
} catch (e: Exception) {
51-
continuations.remove(messageSequenceNumber)
52-
inFlightSemaphore.release()
53-
continuation.resumeWithException(e)
47+
val block: suspend () -> Boolean = {
48+
suspendCancellableCoroutine { continuation ->
49+
continuation.invokeOnCancellation {
50+
continuations.remove(messageSequenceNumber)
51+
inFlightSemaphore.release()
52+
}
53+
continuations[messageSequenceNumber] = continuation
54+
try {
55+
message.apply { channel.basicPublish(exchange, routingKey, properties, msg) }
56+
logger.debug { "Message successfully published" }
57+
} catch (e: Exception) {
58+
continuations.remove(messageSequenceNumber)
59+
inFlightSemaphore.release()
60+
continuation.resumeWithException(e)
61+
}
5462
}
5563
}
64+
return if (timeoutMillis != null) {
65+
withTimeout(timeoutMillis) { block() }
66+
} else {
67+
block()
68+
}
5669
}
5770
}

0 commit comments

Comments
 (0)