Skip to content

Commit 7d097df

Browse files
authored
Merge pull request mfontanini#235 from accelerated/promise_bug
Fix tracker promise from throwing when set multiple times
2 parents f1de729 + fbbd5bc commit 7d097df

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,16 @@ void BufferedProducer<BufferType, Allocator>::clear() {
771771

772772
template <typename BufferType, typename Allocator>
773773
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
774-
return messages_.size() + retry_messages_.size();
774+
size_t size = 0;
775+
{
776+
std::lock_guard<std::mutex> lock(mutex_);
777+
size += messages_.size();
778+
}
779+
{
780+
std::lock_guard<std::mutex> lock(retry_mutex_);
781+
size += retry_messages_.size();
782+
}
783+
return size;
775784
}
776785

777786
template <typename BufferType, typename Allocator>
@@ -1025,7 +1034,12 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
10251034
}
10261035
// Signal producers
10271036
if (tracker) {
1028-
tracker->should_retry_.set_value(should_retry);
1037+
try {
1038+
tracker->should_retry_.set_value(should_retry);
1039+
}
1040+
catch (const std::future_error& ex) {
1041+
//This is an async retry and future is not being read
1042+
}
10291043
}
10301044
// Decrement the expected acks and check to prevent underflow
10311045
if (pending_acks_ > 0) {

0 commit comments

Comments
 (0)