From 4bfec09587a756d5f3dbffaa986defdae75a3263 Mon Sep 17 00:00:00 2001 From: jfrey Date: Sat, 28 Mar 2020 15:54:15 +0100 Subject: [PATCH 1/3] improve efficiency while waiting for new samples --- src/consumer_queue.cpp | 13 ++++++------- src/consumer_queue.h | 5 +++++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 6789828ee..66613c49b 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -2,7 +2,7 @@ #include "common.h" #include "sample.h" #include "send_buffer.h" -#include +#include using namespace lsl; @@ -26,6 +26,7 @@ void consumer_queue::push_sample(const sample_p &sample) { sample_p dummy; buffer_.pop(dummy); } + cv_.notify_one(); } sample_p consumer_queue::pop_sample(double timeout) { @@ -34,12 +35,10 @@ sample_p consumer_queue::pop_sample(double timeout) { buffer_.pop(result); } else { if (!buffer_.pop(result)) { - // turn timeout into the point in time at which we give up - timeout += lsl::lsl_clock(); - do { - if (lsl::lsl_clock() >= timeout) break; - lslboost::this_thread::sleep_for(lslboost::chrono::milliseconds(1)); - } while (!buffer_.pop(result)); + // wait untill for a new sample until the thread calling push_sample delivers one, or until timeout + std::unique_lock lk(lock_); + std::chrono::duration sec(timeout); + cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); } } return result; diff --git a/src/consumer_queue.h b/src/consumer_queue.h index ac0f24cd1..49b364877 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -4,6 +4,8 @@ #include "common.h" #include "forward.h" #include +#include +#include namespace lsl { /** @@ -43,6 +45,9 @@ class consumer_queue : private lslboost::noncopyable { private: send_buffer_p registry_; // optional consumer registry buffer_type buffer_; // the sample buffer + // used to wait for new samples + std::mutex lock_; + std::condition_variable cv_; }; } // namespace lsl From 4ad3896baa6f430629cd764dfd079479d47193b0 Mon Sep 17 00:00:00 2001 From: jfrey Date: Mon, 6 Apr 2020 21:02:14 +0200 Subject: [PATCH 2/3] consumer_queue: acquire lock in push_sample() for more predictable behavior --- src/consumer_queue.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index d70cffa02..712a405f8 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -22,6 +22,8 @@ consumer_queue::~consumer_queue() { } void consumer_queue::push_sample(const sample_p &sample) { + // acquire lock for more predictable behavior in regards to pop_sample() + std::unique_lock lk(lock_); while (!buffer_.push(sample)) { sample_p dummy; buffer_.pop(dummy); From 926d2bab96a8d997a3822327721857ddffe926fa Mon Sep 17 00:00:00 2001 From: jfrey Date: Fri, 30 Oct 2020 09:46:49 +0100 Subject: [PATCH 3/3] fix variable name --- src/consumer_queue.cpp | 6 +++--- src/consumer_queue.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 712a405f8..0201be79b 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -23,7 +23,7 @@ consumer_queue::~consumer_queue() { void consumer_queue::push_sample(const sample_p &sample) { // acquire lock for more predictable behavior in regards to pop_sample() - std::unique_lock lk(lock_); + std::unique_lock lk(mut_); while (!buffer_.push(sample)) { sample_p dummy; buffer_.pop(dummy); @@ -37,8 +37,8 @@ sample_p consumer_queue::pop_sample(double timeout) { buffer_.pop(result); } else { if (!buffer_.pop(result)) { - // wait untill for a new sample until the thread calling push_sample delivers one, or until timeout - std::unique_lock lk(lock_); + // wait for a new sample until the thread calling push_sample delivers one, or until timeout + std::unique_lock lk(mut_); std::chrono::duration sec(timeout); cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); } diff --git a/src/consumer_queue.h b/src/consumer_queue.h index 545a6e870..6c5c2ea21 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -55,7 +55,7 @@ class consumer_queue { send_buffer_p registry_; // optional consumer registry buffer_type buffer_; // the sample buffer // used to wait for new samples - std::mutex lock_; + std::mutex mut_; std::condition_variable cv_; };