Skip to content

Commit d2952a0

Browse files
committed
fix multiple edge-cases
1 parent 4327125 commit d2952a0

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

src/main/kotlin/io/github/viartemev/rabbitmq/publisher/AckListener.kt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ private val logger = KotlinLogging.logger {}
1919
*/
2020
internal class AckListener(
2121
private val continuations: ConcurrentHashMap<Long, Continuation<Boolean>>,
22-
inFlightSemaphore: Semaphore
22+
private val inFlightSemaphore: Semaphore
2323
) : ConfirmListener {
2424

2525
private val lowerBoundOfMultiple = AtomicLong(1)
@@ -54,16 +54,25 @@ internal class AckListener(
5454
*/
5555
private fun handle(deliveryTag: Long, multiple: Boolean, ack: Boolean) {
5656
logger.debug { "deliveryTag = [$deliveryTag], multiple = [$multiple], positive = [$ack]" }
57-
val lowerBound = lowerBoundOfMultiple.get()
5857
if (multiple) {
59-
for (tag in lowerBound..deliveryTag) {
60-
continuations.remove(tag)?.resume(ack)
58+
// Итерируемся по ключам map, которые <= deliveryTag
59+
val tagsToAck = continuations.keys.filter { it <= deliveryTag }
60+
for (tag in tagsToAck) {
61+
val cont = continuations.remove(tag)
62+
if (cont != null) {
63+
cont.resume(ack)
64+
inFlightSemaphore.release()
65+
} else {
66+
logger.warn { "Continuation for $tag not found (maybe already cancelled)" }
67+
}
6168
}
62-
lowerBoundOfMultiple.compareAndSet(lowerBound, deliveryTag)
6369
} else {
64-
continuations.remove(deliveryTag)?.resume(ack)
65-
if (deliveryTag == lowerBound + 1) {
66-
lowerBoundOfMultiple.compareAndSet(lowerBound, deliveryTag)
70+
val cont = continuations.remove(deliveryTag)
71+
if (cont != null) {
72+
cont.resume(ack)
73+
inFlightSemaphore.release()
74+
} else {
75+
logger.warn { "Continuation for $deliveryTag not found (maybe already cancelled)" }
6776
}
6877
}
6978
}

0 commit comments

Comments
 (0)