diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp
index 44579922b..676839904 100644
--- a/src/consumer_queue.cpp
+++ b/src/consumer_queue.cpp
@@ -3,7 +3,6 @@
 #include "sample.h"
 #include "send_buffer.h"
 #include <chrono>
-#include <thread>
 
 using namespace lsl;
 
@@ -23,30 +22,34 @@ consumer_queue::~consumer_queue() {
 }
 
 void consumer_queue::push_sample(const sample_p &sample) {
+	// acquire the lock to synchronize with pop_sample() and avoid a lost-wakeup race
+	std::unique_lock<std::mutex> lk(mut_);
+	// if the buffer is full, drop the oldest samples to make room
 	while (!buffer_.push(sample)) {
 		sample_p dummy;
 		buffer_.pop(dummy);
 	}
+	cv_.notify_one();
 }
 
 sample_p consumer_queue::pop_sample(double timeout) {
 	sample_p result;
 	if (timeout <= 0.0) {
+		std::unique_lock<std::mutex> lk(mut_);
 		buffer_.pop(result);
 	} else {
+		std::unique_lock<std::mutex> lk(mut_);
 		if (!buffer_.pop(result)) {
-			// turn timeout into the point in time at which we give up
-			timeout += lsl::lsl_clock();
-			do {
-				if (lsl::lsl_clock() >= timeout) break;
-				std::this_thread::sleep_for(std::chrono::milliseconds(1));
-			} while (!buffer_.pop(result));
+			// release the lock and wait until push_sample() delivers a sample or the timeout expires
+			std::chrono::duration<double> sec(timeout);
+			cv_.wait_for(lk, sec, [&] { return this->buffer_.pop(result); });
 		}
 	}
 	return result;
 }
 
 uint32_t consumer_queue::flush() noexcept {
+	std::unique_lock<std::mutex> lk(mut_);
 	uint32_t n = 0;
 	while (buffer_.pop()) n++;
 	return n;
diff --git a/src/consumer_queue.h b/src/consumer_queue.h
index 8a47f21bd..6c5c2ea21 100644
--- a/src/consumer_queue.h
+++ b/src/consumer_queue.h
@@ -4,6 +4,8 @@
 #include "common.h"
 #include "forward.h"
 #include <boost/lockfree/spsc_queue.hpp>
+#include <condition_variable>
+#include <mutex>
 
 namespace lsl {
 /**
@@ -52,6 +54,9 @@ class consumer_queue {
 private:
 	send_buffer_p registry_; // optional consumer registry
 	buffer_type buffer_;     // the sample buffer
+	// used to wait for new samples
+	std::mutex mut_;
+	std::condition_variable cv_;
 };
 
 } // namespace lsl
diff --git a/src/sample.h b/src/sample.h
index 32f105621..23b417eef 100644
--- a/src/sample.h
+++ b/src/sample.h
@@ -146,12 +146,10 @@ class sample {
 	void operator delete(void *x) {
 		// delete the underlying memory only if it wasn't allocated in the factory's storage area
 		sample *s = (sample *)x;
-		if (s && !(s->factory_ &&
-				   (((char *)s) >= s->factory_->storage_ &&
-					((char *)s) <= s->factory_->storage_ + s->factory_->storage_size_)))
+		if (s && !s->is_from_factory())
			delete[](char *) x;
 	}
-
+
 	/// Test for equality with another sample.
 	bool operator==(const sample &rhs) const noexcept;
 
@@ -368,16 +366,26 @@ class sample {
 		;
 	}
 
+	/// Test whether this sample was allocated in the factory's storage area.
+	bool is_from_factory() {
+		return (factory_ && (((char *)this) >= factory_->storage_ &&
+							 ((char *)this) <= factory_->storage_ + factory_->storage_size_));
+	}
+
 	/// Increment ref count.
 	friend void intrusive_ptr_add_ref(sample *s) {
 		s->refcount_.fetch_add(1, std::memory_order_relaxed);
 	}
 
-	/// Decrement ref count and reclaim if unreferenced.
+	/// Decrement ref count; reclaim the sample into the factory if it came from its storage, delete it otherwise to avoid a memory leak.
 	friend void intrusive_ptr_release(sample *s) {
 		if (s->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
 			std::atomic_thread_fence(std::memory_order_acquire);
-			s->factory_->reclaim_sample(s);
+			if (s->is_from_factory()) {
+				s->factory_->reclaim_sample(s);
+			} else {
+				delete s;
+			}
 		}
 	}
 };
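
Note for reviewers: the core of the consumer_queue change is the standard condition-variable wait-with-predicate idiom, replacing the old 1 ms sleep/poll loop. The sketch below shows that idiom in a self-contained form using only the standard library; timed_queue, its members, and the std::queue backing store are illustrative stand-ins, not liblsl's actual classes.

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

template <typename T> class timed_queue {
public:
	void push(T value) {
		{
			std::unique_lock<std::mutex> lk(mut_);
			items_.push(std::move(value));
		}
		cv_.notify_one(); // wake one waiting consumer
	}

	// Block for up to `timeout` seconds; returns std::nullopt on timeout.
	std::optional<T> pop(double timeout) {
		std::unique_lock<std::mutex> lk(mut_);
		std::chrono::duration<double> sec(timeout);
		// wait_for releases the lock while sleeping and reacquires it before
		// every predicate evaluation, so the emptiness check is race-free
		if (!cv_.wait_for(lk, sec, [&] { return !items_.empty(); }))
			return std::nullopt;
		T value = std::move(items_.front());
		items_.pop();
		return value;
	}

private:
	std::mutex mut_;
	std::condition_variable cv_;
	std::queue<T> items_;
};

Two deliberate differences from the patch: the patch runs buffer_.pop(result) inside the wait_for predicate (legal, since the lock is held during predicate evaluation), and it notifies while still holding the lock; the sketch separates the check from the pop and notifies after unlocking. Both are common stylistic variations with the same semantics.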
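push_sample() keeps the existing drop-oldest overflow policy: when the ring buffer is full, the oldest sample is discarded to make room for the new one. A minimal sketch of that policy against a boost::lockfree::spsc_queue, relying only on push() returning false when the buffer is full:

#include <boost/lockfree/spsc_queue.hpp>

// Push `value`, evicting the oldest entries until a slot is free.
void push_drop_oldest(boost::lockfree::spsc_queue<int> &q, int value) {
	while (!q.push(value)) { // push() fails only when the buffer is full
		int oldest;
		q.pop(oldest); // discard the oldest element to free a slot
	}
}

With the new mutex held around both push_sample() and pop_sample(), this eviction loop no longer races against a concurrent pop.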
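On the sample.h side, the release path now distinguishes pool samples from heap samples so the latter are deleted rather than handed back to the factory. Below is a minimal sketch of the same intrusive-refcount pattern; node and its pooled flag are hypothetical stand-ins for lsl::sample and is_from_factory():

#include <atomic>

struct node {
	std::atomic<int> refcount{0};
	bool pooled = false; // stand-in for is_from_factory()
};

void intrusive_add_ref(node *n) {
	// relaxed suffices: acquiring a reference imposes no ordering requirements
	n->refcount.fetch_add(1, std::memory_order_relaxed);
}

void intrusive_release(node *n) {
	// the release decrement publishes this thread's writes to the object; the
	// acquire fence, executed only by the last owner, makes all of them visible
	// before the object is reused or destroyed
	if (n->refcount.fetch_sub(1, std::memory_order_release) == 1) {
		std::atomic_thread_fence(std::memory_order_acquire);
		if (n->pooled) {
			// return to the owning pool for reuse, as reclaim_sample() does
		} else {
			delete n; // heap-allocated: deleting here is what plugs the leak
		}
	}
}

The release/acquire split is the classic optimization over a plain acq_rel decrement: every owner pays only for release, and the single thread that drops the count to zero pays for acquire once.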