1+ #pragma once
12//
23// cancellable_streambuf.hpp
34// ~~~~~~~~~~~~~~~~~~~~~~~~~~
1213// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
1314//
1415
15- #ifndef CANCELLABLE_STREAMBUF_HPP
16- #define CANCELLABLE_STREAMBUF_HPP
17-
18- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
19- #pragma once
20- #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
21-
16+ #define BOOST_ASIO_NO_DEPRECATED
2217#include " cancellation.h"
23- #include < boost/asio/basic_socket.hpp>
24- #include < boost/asio/detail/array.hpp>
25- #include < boost/asio/detail/config.hpp>
26- #include < boost/asio/detail/throw_error.hpp>
18+ #include < boost/asio/basic_stream_socket.hpp>
2719#include < boost/asio/io_context.hpp>
2820#include < boost/asio/ip/tcp.hpp>
29- #include < boost/asio/stream_socket_service.hpp>
30- #include < boost/utility/base_from_member.hpp>
3121#include < exception>
32- #include < set>
3322#include < streambuf>
3423
3524namespace asio = lslboost::asio;
3625using lslboost::system::error_code;
37- using asio::basic_socket;
3826using asio::io_context;
39- using asio::ip::tcp;
4027
4128namespace lsl {
42- typedef tcp Protocol;
29+ using Protocol = asio::ip::tcp;
30+ using Socket = asio::basic_stream_socket<Protocol>;
4331// / Iostream streambuf for a socket.
4432class cancellable_streambuf : public std ::streambuf,
45- private lslboost::base_from_member< io_context> ,
46- public basic_socket<Protocol> ,
33+ private asio:: io_context,
34+ private Socket ,
4735 public lsl::cancellable_obj {
4836public:
4937 // / Construct a cancellable_streambuf without establishing a connection.
50- cancellable_streambuf ()
51- : basic_socket<Protocol>(lslboost::base_from_member<asio::io_context>::member),
52- cancel_issued_ (false ), cancel_started_(false ) {
53- init_buffers ();
54- }
38+ cancellable_streambuf () : io_context(), Socket(as_context()) { init_buffers (); }
5539
5640 // / Destructor flushes buffered data.
57- virtual ~cancellable_streambuf () {
41+ virtual ~cancellable_streambuf () override {
5842 // no cancel() can fire after this call
5943 unregister_from_all ();
6044 if (pptr () != pbase ()) overflow (traits_type::eof ());
@@ -69,7 +53,7 @@ class cancellable_streambuf : public std::streambuf,
6953 cancel_issued_ = true ;
7054 std::lock_guard<std::recursive_mutex> lock (cancel_mut_);
7155 cancel_started_ = false ;
72- this ->get_service (). get_io_context (). post ( [this ]() { close_if_open (); });
56+ asio::post ( this ->as_context (), [this ]() { close_if_open (); });
7357 }
7458
7559
@@ -88,15 +72,12 @@ class cancellable_streambuf : public std::streambuf,
8872 " Attempt to connect() a cancellable_streambuf after it has been cancelled." );
8973
9074 init_buffers ();
91- this ->basic_socket <Protocol>::close (ec_);
92-
93- io_handler handler = {this };
94- this ->basic_socket <Protocol>::async_connect (endpoint, handler);
95- this ->get_service ().get_io_context ().reset ();
75+ socket ().close (ec_);
76+ socket ().async_connect (endpoint, [this ](const error_code &ec) { this ->ec_ = ec; });
77+ this ->as_context ().restart ();
9678 }
9779 ec_ = asio::error::would_block;
98- do
99- this ->get_service ().get_io_context ().run_one ();
80+ do as_context ().run_one ();
10081 while (!cancel_issued_ && ec_ == asio::error::would_block);
10182 return !ec_ ? this : nullptr ;
10283 }
@@ -108,7 +89,7 @@ class cancellable_streambuf : public std::streambuf,
10889 */
10990 cancellable_streambuf *close () {
11091 sync ();
111- this -> basic_socket <Protocol>:: close (ec_);
92+ socket (). close (ec_);
11293 if (!ec_) init_buffers ();
11394 return !ec_ ? this : nullptr ;
11495 }
@@ -122,36 +103,43 @@ class cancellable_streambuf : public std::streambuf,
122103protected:
123104 // / Close the socket if it's open.
124105 void close_if_open () {
125- if (!cancel_started_ && this -> is_open ()) {
106+ if (!cancel_started_ && socket (). is_open ()) {
126107 cancel_started_ = true ;
127- close ();
108+ socket (). close ();
128109 }
129110 }
130111
131- // / This function makes sure that a cancellation, if issued, is not being eaten by the
132- // / io_context reset()
112+ // / Convenience method to call methods inherited from `Socket`
113+ Socket &socket () { return *this ; }
114+
115+ // / Convenience method to call methods inherited from `io_context`
116+ asio::io_context &as_context () { return *this ; }
117+
118+ // / Make sure that a cancellation, if issued, is not being eaten by `io_context::reset()`
133119 void protected_reset () {
134120 std::lock_guard<std::recursive_mutex> lock (cancel_mut_);
135121 // if the cancel() comes between completion of a run_one() and this call, close will be
136122 // issued right here at the next opportunity
137123 if (cancel_issued_) close_if_open ();
138- this ->get_service ().get_io_context (). reset ();
124+ this ->as_context ().restart ();
139125 // if the cancel() comes between this call and a completion of run_one(), the posted close
140126 // will be processed by the run_one
141127 }
142128
143129 int_type underflow () override {
144130 if (gptr () == egptr ()) {
145- io_handler handler = {this };
146- this ->get_service ().async_receive (this ->get_implementation (),
147- asio::buffer (asio::buffer (get_buffer_) + putback_max), 0 ,
148- handler);
131+ std::size_t bytes_transferred_;
132+ socket ().async_receive (asio::buffer (asio::buffer (get_buffer_) + putback_max),
133+ [this , &bytes_transferred_](
134+ const error_code &ec, std::size_t bytes_transferred = 0 ) {
135+ this ->ec_ = ec;
136+ bytes_transferred_ = bytes_transferred;
137+ });
149138
150139 ec_ = asio::error::would_block;
151140 protected_reset (); // line changed for lsl
152- do
153- this ->get_service ().get_io_context ().run_one ();
154- while (ec_ == asio::error::would_block);
141+ do as_context ().run_one ();
142+ while (!cancel_issued_ && ec_ == asio::error::would_block);
155143 if (ec_) return traits_type::eof ();
156144
157145 setg (&get_buffer_[0 ], &get_buffer_[0 ] + putback_max,
@@ -165,18 +153,20 @@ class cancellable_streambuf : public std::streambuf,
165153 // Send all data in the output buffer.
166154 asio::const_buffer buffer = asio::buffer (pbase (), pptr () - pbase ());
167155 while (asio::buffer_size (buffer) > 0 ) {
168- io_handler handler = {this };
169- this ->get_service ().async_send (
170- this ->get_implementation (), asio::buffer (buffer), 0 , handler);
156+ std::size_t bytes_transferred_;
157+ socket ().async_send (asio::buffer (buffer),
158+ [this , &bytes_transferred_](const error_code &ec, std::size_t bytes_transferred) {
159+ this ->ec_ = ec;
160+ bytes_transferred_ = bytes_transferred;
161+ });
171162 ec_ = asio::error::would_block;
172163 protected_reset (); // line changed for lsl
173- do
174- this ->get_service ().get_io_context ().run_one ();
175- while (ec_ == asio::error::would_block);
164+ do as_context ().run_one ();
165+ while (!cancel_issued_ && ec_ == asio::error::would_block);
176166 if (ec_) return traits_type::eof ();
177167 buffer = buffer + bytes_transferred_;
178168 }
179- setp (&put_buffer_[0 ], &put_buffer_[0 ] + put_buffer_. size ( ));
169+ setp (&put_buffer_[0 ], &put_buffer_[0 ] + sizeof (put_buffer_ ));
180170
181171 // If the new character is eof then our work here is done.
182172 if (traits_type::eq_int_type (c, traits_type::eof ())) return traits_type::not_eof (c);
@@ -194,32 +184,17 @@ class cancellable_streambuf : public std::streambuf,
194184 return nullptr ;
195185 }
196186
197- // private:
198187 void init_buffers () {
199188 setg (&get_buffer_[0 ], &get_buffer_[0 ] + putback_max, &get_buffer_[0 ] + putback_max);
200- setp (&put_buffer_[0 ], &put_buffer_[0 ] + put_buffer_. size ( ));
189+ setp (&put_buffer_[0 ], &put_buffer_[0 ] + sizeof (put_buffer_ ));
201190 }
202191
203- struct io_handler ;
204- friend struct io_handler ;
205- struct io_handler {
206- cancellable_streambuf *this_;
207- void operator ()(const error_code & ec, std::size_t bytes_transferred = 0 ) {
208- this_->ec_ = ec;
209- this_->bytes_transferred_ = bytes_transferred;
210- }
211- };
212-
213192 enum { putback_max = 8 };
214193 enum { buffer_size = 512 };
215- asio::detail::array<char , buffer_size> get_buffer_;
216- asio::detail::array<char , buffer_size> put_buffer_;
194+ char get_buffer_[buffer_size], put_buffer_[buffer_size];
217195 error_code ec_;
218- std::size_t bytes_transferred_;
219- std::atomic<bool > cancel_issued_;
220- bool cancel_started_;
196+ std::atomic<bool > cancel_issued_{false };
197+ bool cancel_started_{false };
221198 std::recursive_mutex cancel_mut_;
222199};
223200} // namespace lsl
224-
225- #endif // CANCELLABLE_STREAMBUF_HPP
0 commit comments