@@ -23,7 +23,7 @@ stream_outlet_impl::stream_outlet_impl(
2323 send_buffer_(std::make_shared<send_buffer>(max_capacity)),
2424 do_sync_(flags & transp_sync_blocking) {
2525
26- if ((info.channel_format () == cft_string) && (flags & transp_sync_blocking) ) {
26+ if ((info.channel_format () == cft_string) && do_sync_ ) {
2727 LOG_F (WARNING, " sync push not supported for string-formatted streams. Reverting to async." );
2828 do_sync_ = false ;
2929 }
@@ -154,20 +154,8 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
154154 smp->assign_untyped (data); // Note: Makes a copy!
155155 send_buffer_->push_sample (smp);
156156 } else {
157- if (timestamp == DEDUCED_TIMESTAMP) {
158- sync_buffs_.push_back (asio::buffer (&TAG_DEDUCED_TIMESTAMP, 1 ));
159- } else {
160- sync_buffs_.push_back (asio::buffer (&TAG_TRANSMITTED_TIMESTAMP, 1 ));
161- sync_buffs_.push_back (asio::buffer (×tamp, sizeof (timestamp)));
162- }
163- sync_buffs_.push_back (asio::buffer (data, smp->datasize ()));
164- if (pushthrough) {
165- for (auto &tcp_server : tcp_servers_)
166- tcp_server->write_all_blocking (sync_buffs_);
167- sync_buffs_.clear ();
168- }
157+ enqueue_sync (asio::buffer (data, smp->datasize ()), timestamp, pushthrough);
169158 }
170-
171159}
172160
173161bool stream_outlet_impl::have_consumers () { return send_buffer_->have_consumers (); }
@@ -176,6 +164,28 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) {
176164 return send_buffer_->wait_for_consumers (timeout);
177165}
178166
167+ void stream_outlet_impl::push_timestamp_sync (double timestamp) {
168+ if (timestamp == DEDUCED_TIMESTAMP) {
169+ sync_buffs_.emplace_back (asio::buffer (&TAG_DEDUCED_TIMESTAMP, 1 ));
170+ } else {
171+ sync_buffs_.emplace_back (asio::buffer (&TAG_TRANSMITTED_TIMESTAMP, 1 ));
172+ sync_buffs_.emplace_back (asio::buffer (×tamp, sizeof (timestamp)));
173+ }
174+ }
175+
176+ void stream_outlet_impl::pushthrough_sync () {
177+ // LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size());
178+ for (auto &tcp_server : tcp_servers_)
179+ tcp_server->write_all_blocking (sync_buffs_);
180+ sync_buffs_.clear ();
181+ }
182+
183+ void stream_outlet_impl::enqueue_sync (asio::const_buffer buff, double timestamp, bool pushthrough) {
184+ push_timestamp_sync (timestamp);
185+ sync_buffs_.push_back (buff);
186+ if (pushthrough) pushthrough_sync ();
187+ }
188+
179189template <class T >
180190void stream_outlet_impl::enqueue (const T *data, double timestamp, bool pushthrough) {
181191 if (lsl::api_config::get_instance ()->force_default_timestamps ()) timestamp = 0.0 ;
@@ -185,18 +195,7 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou
185195 smp->assign_typed (data);
186196 send_buffer_->push_sample (smp);
187197 } else {
188- if (timestamp == DEDUCED_TIMESTAMP) {
189- sync_buffs_.push_back (asio::buffer (&TAG_DEDUCED_TIMESTAMP, 1 ));
190- } else {
191- sync_buffs_.push_back (asio::buffer (&TAG_TRANSMITTED_TIMESTAMP, 1 ));
192- sync_buffs_.push_back (asio::buffer (×tamp, sizeof (timestamp)));
193- }
194- sync_buffs_.push_back (asio::buffer (data, smp->datasize ()));
195- if (pushthrough) {
196- for (auto &tcp_server : tcp_servers_)
197- tcp_server->write_all_blocking (sync_buffs_);
198- sync_buffs_.clear ();
199- }
198+ enqueue_sync (asio::buffer (data, smp->datasize ()), timestamp, pushthrough);
200199 }
201200}
202201
@@ -208,4 +207,10 @@ template void stream_outlet_impl::enqueue<float>(const float *data, double, bool
208207template void stream_outlet_impl::enqueue<double >(const double *data, double , bool );
209208template void stream_outlet_impl::enqueue<std::string>(const std::string *data, double , bool );
210209
210+ void stream_outlet_impl::enqueue_sync_multi (std::vector<asio::const_buffer> buffs, double timestamp, bool pushthrough) {
211+ push_timestamp_sync (timestamp);
212+ sync_buffs_.insert ( sync_buffs_.end (), buffs.begin (), buffs.end () );
213+ if (pushthrough) pushthrough_sync ();
214+ }
215+
211216} // namespace lsl
0 commit comments