diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 5ff2fd67..ec2f91e2 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -1,26 +1,63 @@ +#include #include #include #include +#include -// define a packed sample struct (here a stereo sample). -#pragma pack(1) -struct stereo_sample { - int16_t l, r; -}; +int main(int argc, char **argv) { + std::cout << "ReceiveDataInChunks" << std::endl; + std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl; -int main(int, char *[]) { try { + std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; + int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360; + bool flush = argc > 3; // resolve the stream of interest & make an inlet - lsl::stream_inlet inlet(lsl::resolve_stream("name", "MyAudioStream").at(0)); + lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen); + + // Use set_postprocessing to get the timestamps in a common base clock. + // Do not use if this application will record timestamps to disk -- it is better to + // do posthoc synchronization. + inlet.set_postprocessing(lsl::post_ALL); + + // Inlet opening is implicit when doing pull_sample or pull_chunk. + // Here we open the stream explicitly because we might be doing + // `flush` only. 
+ inlet.open_stream(); + + double starttime = lsl::local_clock(), next_display = starttime + 1, + next_reset = starttime + 10; + + // and retrieve the chunks + uint64_t k = 0, num_samples = 0; + std::vector<std::vector<int16_t>> result; + auto fetch_interval = std::chrono::milliseconds(20); + auto next_fetch = std::chrono::steady_clock::now() + fetch_interval; + - // and retrieve the chunks (note: this can of course also be done with pure std::vectors - // instead of stereo_samples) while (true) { - std::vector<stereo_sample> result; - if (double timestamp = inlet.pull_chunk_numeric_structs(result)) - std::cout << timestamp << std::endl; // only showing the time stamps here + std::this_thread::sleep_until(next_fetch); + if (flush) { + // You almost certainly don't want to use flush. This is here so we + // can test maximum outlet throughput. + num_samples += inlet.flush(); + } else { + if (double timestamp = inlet.pull_chunk(result)) num_samples += result.size(); + } + k++; + next_fetch += fetch_interval; + if (k % 50 == 0) { + double now = lsl::local_clock(); + std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl; + if (now > next_reset) { + std::cout << "Resetting counters..." << std::endl; + starttime = now; + next_reset = now + 10; + num_samples = 0; + } + } } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 2ffce9a4..6e2747f4 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -1,7 +1,14 @@ #include #include +#include +#include #include #include +#include +#include +#ifndef M_PI +#define M_PI 3.14159265358979323846 +#endif // define a packed sample struct (here: a 16 bit stereo sample). @@ -10,31 +17,128 @@ struct stereo_sample { int16_t l, r; };
The inner details are not + so important because typically it will be up to the real data source + SDK + to provide a way to get data. + */ + std::size_t n_channels; + double srate; + int64_t pattern_samples; + int64_t head; + std::vector pattern; + std::chrono::steady_clock::time_point last_time; + + fake_device(const int16_t n_channels, const float srate) + : n_channels(n_channels), srate(srate), head(0) { + pattern_samples = (int64_t)(srate - 0.5) + 1; // truncate OK. + + // Pre-allocate entire test pattern. The data _could_ be generated on the fly + // for a much smaller memory hit, but we also use this example application + // to test LSL Outlet performance so we want to reduce out-of-LSL CPU + // utilization. + int64_t magnitude = std::numeric_limits::max(); + int64_t offset_0 = magnitude / 2; + int64_t offset_step = magnitude / n_channels; + pattern.reserve(pattern_samples * n_channels); + for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) { + for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) { + pattern.emplace_back( + offset_0 + chan_ix * offset_step + + magnitude * static_cast(sin(M_PI * chan_ix * sample_ix / n_channels))); + } + } + last_time = std::chrono::steady_clock::now(); + } + + std::vector get_data() { + auto now = std::chrono::steady_clock::now(); + auto elapsed_nano = + std::chrono::duration_cast(now - last_time).count(); + std::size_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK. + std::vector result; + result.resize(elapsed_samples * n_channels); + int64_t ret_samples = get_data(result); + std::vector output(result.begin(), result.begin() + ret_samples); + return output; + } + + std::size_t get_data(std::vector &buffer) { + auto now = std::chrono::steady_clock::now(); + auto elapsed_nano = + std::chrono::duration_cast(now - last_time).count(); + int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK. 
+ elapsed_samples = std::min(elapsed_samples, (int64_t)buffer.size()); + if (false) { + // The fastest but no patterns. + memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]); + } else { + std::size_t end_sample = head + elapsed_samples; + std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples); + memcpy(&buffer[0], &(pattern[head]), nowrap_samples); + if (end_sample > pattern_samples) { + memcpy(&buffer[nowrap_samples], &(pattern[0]), elapsed_samples - nowrap_samples); + } + } + head = (head + elapsed_samples) % pattern_samples; + last_time += std::chrono::nanoseconds(int64_t(1e9 * elapsed_samples / srate)); + return elapsed_samples; + } +}; + int main(int argc, char **argv) { + std::cout << "SendDataInChunks" << std::endl; + std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl; + std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl; + std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl; + std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"}; - int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; + int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device. + int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device. + int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360; + int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. + int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. 
+ int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk + try { - // make a new stream_info (44.1Khz, 16bit, audio, 2 channels) and open an outlet with it - lsl::stream_info info(name, type, 2, samplingrate, lsl::cf_int16); - lsl::stream_outlet outlet(info); + // Prepare the LSL stream. + lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16); + lsl::stream_outlet outlet(info, 0, max_buffered); + lsl::xml_element desc = info.desc(); + desc.append_child_value("manufacturer", "LSL"); + lsl::xml_element chns = desc.append_child("channels"); + for (int c = 0; c < n_channels; c++) { + lsl::xml_element chn = chns.append_child("channel"); + chn.append_child_value("label", "Chan-" + std::to_string(c)); + chn.append_child_value("unit", "microvolts"); + chn.append_child_value("type", "EEG"); + } + + // Create a connection to our device. + fake_device my_device(n_channels, (float)samplingrate); + + // Prepare buffer to get data from 'device'. + // The buffer should be larger than you think you need. Here we make it twice as large. + std::vector<int16_t> chunk_buffer(2 * chunk_samples * n_channels); std::cout << "Now sending data..." << std::endl; + + // Your device might have its own timer. Or you can decide how often to poll + // your device, as we do here. 
auto nextsample = std::chrono::high_resolution_clock::now(); - std::vector mychunk(info.nominal_srate() / 10); - int phase = 0; + uint64_t sample_counter = 0; for (unsigned c = 0;; c++) { - // wait a bit and generate a chunk of random data - nextsample += std::chrono::milliseconds(100); + // wait a bit + nextsample += std::chrono::milliseconds(chunk_duration); std::this_thread::sleep_until(nextsample); - for (stereo_sample &sample : mychunk) { - sample.l = static_cast(100 * sin(phase / 200.)); - sample.r = static_cast(120 * sin(phase / 400.)); - phase++; - } + // Get data from device + std::size_t returned_samples = my_device.get_data(chunk_buffer); - // send it - outlet.push_chunk_numeric_structs(mychunk); + // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. + // other push_chunk methods are easier but slightly slower. + outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true); } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } diff --git a/src/common.h b/src/common.h index 5e44e7d1..5d6614b3 100644 --- a/src/common.h +++ b/src/common.h @@ -30,6 +30,16 @@ extern "C" { "Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions." 
#endif +// compiler hint that the given expression is likely or unlikely +// (e.g., in conditional statements) +#if defined(__clang__) || defined(__GNUC__) +#define LIKELY(x) __builtin_expect(!!(x), 1) +#define UNLIKELY(x) __builtin_expect(!!(x), 0) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif + // the highest supported protocol version // * 100 is the original version, supported by library versions 1.00+ // * 110 is an alternative protocol that improves throughput, supported by library versions 1.10+ diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 6869de5e..53d13891 100644 --- a/src/consumer_queue.cpp +++ b/src/consumer_queue.cpp @@ -1,5 +1,5 @@ #include "consumer_queue.h" -#include "sample.h" +#include "common.h" #include "send_buffer.h" #include #include @@ -7,8 +7,17 @@ using namespace lsl; -consumer_queue::consumer_queue(std::size_t max_capacity, send_buffer_p registry) - : registry_(std::move(registry)), buffer_(max_capacity) { +consumer_queue::consumer_queue(std::size_t size, send_buffer_p registry) + : registry_(std::move(registry)), buffer_(new item_t[size]), size_(size), + // largest integer at which we can wrap correctly + wrap_at_(std::numeric_limits::max() - size - + std::numeric_limits::max() % size) { + assert(size_ > 1); + for (std::size_t i = 0; i < size_; ++i) + buffer_[i].seq_state.store(i, std::memory_order_release); + write_idx_.store(0, std::memory_order_release); + read_idx_.store(0, std::memory_order_release); + done_sync_.store(false, std::memory_order_release); if (registry_) registry_->register_consumer(this); } @@ -20,41 +29,23 @@ consumer_queue::~consumer_queue() { "Unexpected error while trying to unregister a consumer queue from its registry: %s", e.what()); } -} - -void consumer_queue::push_sample(const sample_p &sample) { - // push a sample, dropping the oldest sample if the queue ist already full. 
- // During this operation the producer becomes a second consumer, i.e., a case - // where the underlying spsc queue isn't thread-safe) so the mutex is locked. - std::lock_guard lk(mut_); - while (!buffer_.push(sample)) { - buffer_.pop(); - } - cv_.notify_one(); -} - -sample_p consumer_queue::pop_sample(double timeout) { - sample_p result; - if (timeout <= 0.0) { - std::lock_guard lk(mut_); - buffer_.pop(result); - } else { - std::unique_lock lk(mut_); - if (!buffer_.pop(result)) { - // wait for a new sample until the thread calling push_sample delivers one and sends a - // notification, or until timeout - std::chrono::duration sec(timeout); - cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); - } - } - return result; + delete[] buffer_; } uint32_t consumer_queue::flush() noexcept { - std::lock_guard lk(mut_); uint32_t n = 0; - while (buffer_.pop()) n++; + while (try_pop()) n++; return n; } -bool consumer_queue::empty() { return buffer_.empty(); } +std::size_t consumer_queue::read_available() const { + std::size_t write_index = write_idx_.load(std::memory_order_acquire); + std::size_t read_index = read_idx_.load(std::memory_order_relaxed); + if (write_index >= read_index) return write_index - read_index; + const std::size_t ret = write_index + size_ - read_index; + return ret; +} + +bool consumer_queue::empty() const { + return write_idx_.load(std::memory_order_acquire) == read_idx_.load(std::memory_order_relaxed); +} diff --git a/src/consumer_queue.h b/src/consumer_queue.h index a84b26b2..0f83ae7b 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -2,60 +2,181 @@ #define CONSUMER_QUEUE_H #include "common.h" -#include "forward.h" -#include +#include "sample.h" +#include #include #include +#include namespace lsl { + +// size of a cache line +#if defined(__s390__) || defined(__s390x__) +constexpr int CACHELINE_BYTES = 256; +#elif defined(powerpc) || defined(__powerpc__) || defined(__ppc__) +constexpr int CACHELINE_BYTES = 128; +#else 
+constexpr int CACHELINE_BYTES = 64; +#endif + /** - * A thread-safe producer-consumer queue of unread samples. + * A thread-safe producer/consumer queue of unread samples. * - * Erases the oldest samples if max capacity is exceeded. Implemented as a circular buffer. + * Erases the oldest samples if max capacity is exceeded. Implemented as a ring buffer (wait-free + * unless the buffer is full or empty). */ class consumer_queue { - using buffer_type = lslboost::lockfree::spsc_queue; - public: /** * Create a new queue with a given capacity. - * @param max_capacity The maximum number of samples that can be held by the queue. Beyond that, + * @param size The maximum number of samples that can be held by the queue. Beyond that, * the oldest samples are dropped. * @param registry Optionally a pointer to a registration facility, for multiple-reader * arrangements. */ - consumer_queue(std::size_t max_capacity, send_buffer_p registry = send_buffer_p()); + explicit consumer_queue(std::size_t size, send_buffer_p registry = send_buffer_p()); /// Destructor. Unregisters from the send buffer, if any. ~consumer_queue(); - /// Push a new sample onto the queue. - void push_sample(const sample_p &sample); + /** + * Push a new sample onto the queue. Can only be called by one thread (single-producer). + * This deletes the oldest sample if the max capacity is exceeded. + */ + template void push_sample(T &&sample) { + while (!try_push(std::forward(sample))) { + // buffer full, drop oldest sample + if (!done_sync_.load(std::memory_order_acquire)) { + // synchronizes-with store to done_sync_ in ctor + std::atomic_thread_fence(std::memory_order_acquire); + done_sync_.store(true, std::memory_order_release); + } + try_pop(); + } + { + // ensure that notify_one doesn't happen in between try_pop and wait_for + std::lock_guard lk(mut_); + cv_.notify_one(); + } + } /** - * Pop a sample from the queue. - * Blocks if empty. + * Pop a sample from the queue. 
Can be called by multiple threads (multi-consumer). + * Blocks if empty and if a nonzero timeout is used. * @param timeout Timeout for the blocking, in seconds. If expired, an empty sample is returned. */ - sample_p pop_sample(double timeout = FOREVER); + sample_p pop_sample(double timeout = FOREVER) { + sample_p result; + bool success = try_pop(result); + if (!success && timeout > 0.0) { + // only acquire mutex if we have to do a blocking wait with timeout + std::chrono::duration sec(timeout); + std::unique_lock lk(mut_); + if (!try_pop(result)) cv_.wait_for(lk, sec, [&] { return this->try_pop(result); }); + } + return result; + } - /// Number of available samples - std::size_t read_available() const { return buffer_.read_available(); } + /// Number of available samples. This is approximate unless called by the thread calling the + /// pop_sample(). + std::size_t read_available() const; - /// Flush the queue, return the number of dropped samples + /// Flush the queue, return the number of dropped samples. uint32_t flush() noexcept; - /// Check whether the buffer is empty. - bool empty(); + /// Check whether the buffer is empty. This is approximate unless called by the thread calling + /// the pop_sample(). + bool empty() const; consumer_queue(const consumer_queue&) = delete; + consumer_queue(consumer_queue &&) = delete; consumer_queue& operator=(const consumer_queue&) = delete; + consumer_queue &operator=(consumer_queue &&) = delete; private: - send_buffer_p registry_; // optional consumer registry - buffer_type buffer_; // the sample buffer - std::mutex mut_; // mutex for cond var (also to protect queue at buffer overflow) - std::condition_variable cv_; // to allow for blocking wait by consumer + // an item stored in the queue + struct item_t { + std::atomic seq_state; + sample_p value; + }; + + // Push a new element to the queue. + // Returns true if successful or false if queue full. 
+ template bool try_push(T &&sample) { + std::size_t write_index = write_idx_.load(std::memory_order_acquire); + std::size_t next_idx = add1_wrap(write_index); + item_t &item = buffer_[write_index % size_]; + if (UNLIKELY(write_index != item.seq_state.load(std::memory_order_acquire))) + return false; // item currently occupied, queue full + write_idx_.store(next_idx, std::memory_order_release); + copy_or_move(item.value, std::forward(sample)); + item.seq_state.store(next_idx, std::memory_order_release); + return true; + } + + // Pop an element from the queue (can be called with zero or one argument). Returns true if + // successful or false if queue is empty. Uses the same method as Vyukov's bounded MPMC queue. + template bool try_pop(T &...result) { + item_t *item; + std::size_t read_index = read_idx_.load(std::memory_order_relaxed); + for (;;) { + item = &buffer_[read_index % size_]; + const std::size_t seq_state = item->seq_state.load(std::memory_order_acquire); + const std::size_t next_idx = add1_wrap(read_index); + // check if the item is ok to pop + if (LIKELY(seq_state == next_idx)) { + // yes, try to claim slot using CAS + if (LIKELY(read_idx_.compare_exchange_weak( + read_index, next_idx, std::memory_order_relaxed))) + break; + } else if (LIKELY(seq_state == read_index)) + return false; // queue empty + else + // we're behind or ahead of another pop, try again + read_index = read_idx_.load(std::memory_order_relaxed); + } + move_or_drop(item->value, result...); + // mark item as free for next pass + item->seq_state.store(add_wrap(read_index, size_), std::memory_order_release); + return true; + } + + // helper to either copy or move a value, depending on whether it's an rvalue ref + inline static void copy_or_move(sample_p &dst, const sample_p &src) { dst = src; } + inline static void copy_or_move(sample_p &dst, sample_p &&src) { dst = std::move(src); } + // helper to either move or drop a value, depending on whether a dst argument is given + inline static 
void move_or_drop(sample_p &src) { src.~sample_p(); } + inline static void move_or_drop(sample_p &src, sample_p &dst) { dst = std::move(src); } + + /// helper to add a delta to the given index and wrap correctly + inline std::size_t add_wrap(std::size_t x, std::size_t delta) const noexcept { + const std::size_t xp = x + delta; + return xp >= wrap_at_ ? xp - wrap_at_ : xp; + } + + /// helper to increment the given index, wrapping it if necessary + inline std::size_t add1_wrap(std::size_t x) const noexcept { + return ++x == wrap_at_ ? 0 : x; + } + + /// optional consumer registry + send_buffer_p registry_; + /// the sample buffer + item_t *buffer_; + /// max number of elements in the queue + const std::size_t size_; + /// threshold at which to wrap read/write indices + const std::size_t wrap_at_; + // whether we have performed a sync on the data stored by the constructor + alignas(CACHELINE_BYTES) std::atomic<bool> done_sync_; + /// current write position + alignas(CACHELINE_BYTES) std::atomic<std::size_t> write_idx_; + /// current read position + alignas(CACHELINE_BYTES) std::atomic<std::size_t> read_idx_; + /// for use with the condition variable + std::mutex mut_; + /// condition for waiting with timeout + std::condition_variable cv_; }; } // namespace lsl diff --git a/src/send_buffer.cpp b/src/send_buffer.cpp index 2de12ffd..0c4c0430 100644 --- a/src/send_buffer.cpp +++ b/src/send_buffer.cpp @@ -1,5 +1,6 @@ #include "send_buffer.h" #include "consumer_queue.h" +#include #include #include #include