Skip to content

Commit aeb2000

Browse files
committed
Reduce enqueue overhead
1 parent 15d16aa commit aeb2000

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

src/sample.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class factory {
5454
/// Reclaim a sample that's no longer used.
5555
void reclaim_sample(sample *s);
5656

57+
std::size_t datasize() const { return format_sizes[fmt_] * static_cast<std::size_t>(num_chans_); }
58+
5759
private:
5860
/// Pop a sample from the freelist (multi-producer/single-consumer queue by Dmitry Vjukov)
5961
sample *pop_freelist();

src/stream_outlet_impl.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -169,38 +169,43 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) {
169169
return send_buffer_->wait_for_consumers(timeout);
170170
}
171171

172-
void stream_outlet_impl::push_timestamp_sync(const double &timestamp) {
172+
void stream_outlet_impl::push_timestamp_sync(double timestamp) {
173+
static_assert(TAG_TRANSMITTED_TIMESTAMP == 2, "Unexpected TAG_TRANSMITTED_TIMESTAMP");
174+
const uint64_t ENDIAN_SAFE_TAG_TRANSMITTED = (2LL << 28) | 2LL;
173175
if (timestamp == DEDUCED_TIMESTAMP) {
174-
sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
176+
sync_buffs_.emplace_back(&TAG_DEDUCED_TIMESTAMP, 1);
175177
} else {
176-
sync_buffs_.emplace_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
177-
sync_buffs_.emplace_back(asio::buffer(&timestamp, sizeof(timestamp)));
178+
sync_timestamps_.emplace_back(ENDIAN_SAFE_TAG_TRANSMITTED, timestamp);
179+
// add a pointer to the memory region containing |TAG_TRANSMITTED_TIMESTAMP|timestamp
180+
// one byte for the tag, 8 for the timestamp
181+
sync_buffs_.emplace_back(reinterpret_cast<const char*>(&sync_timestamps_.back()) + 7, 9);
178182
}
179183
}
180184

181185
void stream_outlet_impl::pushthrough_sync() {
182186
// LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size());
183187
tcp_server_->write_all_blocking(sync_buffs_);
184188
sync_buffs_.clear();
189+
sync_timestamps_.clear();
185190
}
186191

187192
void stream_outlet_impl::enqueue_sync(
188-
asio::const_buffer buff, const double &timestamp, bool pushthrough) {
193+
asio::const_buffer buff, double timestamp, bool pushthrough) {
189194
push_timestamp_sync(timestamp);
190195
sync_buffs_.push_back(buff);
191196
if (pushthrough) pushthrough_sync();
192197
}
193198

194199
template <class T>
195200
void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) {
196-
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
197-
sample_p smp(
198-
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
201+
if (timestamp == 0.0 || lsl::api_config::get_instance()->force_default_timestamps()) timestamp = lsl_local_clock();
199202
if (!do_sync_) {
203+
sample_p smp(
204+
sample_factory_->new_sample(timestamp, pushthrough));
200205
smp->assign_typed(data);
201206
send_buffer_->push_sample(smp);
202207
} else {
203-
enqueue_sync(asio::buffer(data, smp->datasize()), smp->timestamp, smp->pushthrough);
208+
enqueue_sync(asio::buffer(data, sample_factory_->datasize()), timestamp, pushthrough);
204209
}
205210
}
206211

src/stream_outlet_impl.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,6 @@ class stream_outlet_impl {
256256
throw std::runtime_error("The number of buffer elements to send is not a multiple of "
257257
"the stream's channel count.");
258258
if (num_samples > 0) {
259-
if (timestamp == 0.0) timestamp = lsl_clock();
260259
if (info().nominal_srate() != IRREGULAR_RATE)
261260
timestamp = timestamp - (num_samples - 1) / info().nominal_srate();
262261
push_sample(buffer, timestamp, pushthrough && (num_samples == 1));
@@ -311,14 +310,14 @@ class stream_outlet_impl {
311310

312311
/// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single
313312
/// timestamp.
314-
void push_timestamp_sync(const double &timestamp);
313+
void push_timestamp_sync(double timestamp);
315314

316315
/// push sync_buffs_ through each tcp server.
317316
void pushthrough_sync();
318317

319318
/// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the
320319
/// server.
321-
void enqueue_sync(asio::const_buffer buff, const double &timestamp, bool pushthrough);
320+
void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough);
322321

323322
/**
324323
* Append a single timestamp and multiple within-sample buffers to sync_buffs_.
@@ -362,6 +361,8 @@ class stream_outlet_impl {
362361
std::vector<thread_p> io_threads_;
363362
/// buffers used in synchronous call to gather-write data directly to the socket.
364363
std::vector<asio::const_buffer> sync_buffs_;
364+
/// timestamp buffer for sync transfers
365+
std::vector<std::pair<uint64_t, double>> sync_timestamps_;
365366
};
366367

367368
} // namespace lsl

0 commit comments

Comments
 (0)