diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 44579922b..0201be79b 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -3,7 +3,6 @@ #include "sample.h" #include "send_buffer.h" #include -#include using namespace lsl; @@ -23,10 +22,13 @@ consumer_queue::~consumer_queue() { } void consumer_queue::push_sample(const sample_p &sample) { + // acquire lock for more predictable behavior in regards to pop_sample() + std::unique_lock lk(mut_); while (!buffer_.push(sample)) { sample_p dummy; buffer_.pop(dummy); } + cv_.notify_one(); } sample_p consumer_queue::pop_sample(double timeout) { @@ -35,12 +37,10 @@ sample_p consumer_queue::pop_sample(double timeout) { buffer_.pop(result); } else { if (!buffer_.pop(result)) { - // turn timeout into the point in time at which we give up - timeout += lsl::lsl_clock(); - do { - if (lsl::lsl_clock() >= timeout) break; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } while (!buffer_.pop(result)); + // wait for a new sample until the thread calling push_sample delivers one, or until timeout + std::unique_lock lk(mut_); + std::chrono::duration sec(timeout); + cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); } } return result; diff --git a/src/consumer_queue.h b/src/consumer_queue.h index 8a47f21bd..6c5c2ea21 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -4,6 +4,8 @@ #include "common.h" #include "forward.h" #include +#include +#include namespace lsl { /** @@ -52,6 +54,9 @@ class consumer_queue { private: send_buffer_p registry_; // optional consumer registry buffer_type buffer_; // the sample buffer + // used to wait for new samples + std::mutex mut_; + std::condition_variable cv_; }; } // namespace lsl