Skip to content

Commit 4ce466f

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents b6a3d03 + 4e8f18b commit 4ce466f

File tree

4 files changed

+181
-6
lines changed

4 files changed

+181
-6
lines changed

azmq/detail/receive_op.hpp

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,18 @@
1515
#include "socket_ops.hpp"
1616
#include "reactor_op.hpp"
1717

18+
#include <boost/version.hpp>
19+
#include <boost/asio/dispatch.hpp>
20+
#include <boost/asio/executor_work_guard.hpp>
21+
#if BOOST_VERSION >= 107900
22+
#include <boost/asio/recycling_allocator.hpp>
23+
#include <boost/asio/bind_allocator.hpp>
24+
#endif
25+
1826
#include <zmq.h>
1927

2028
#include <iterator>
29+
#include <type_traits>
2130

2231
namespace azmq {
2332
namespace detail {
@@ -30,6 +39,23 @@ class receive_buffer_op_base : public reactor_op {
3039
{ }
3140

3241
virtual bool do_perform(socket_type& socket) override {
42+
return do_perform_impl(socket);
43+
}
44+
45+
private:
46+
template<typename Buff = MutableBufferSequence>
47+
typename std::enable_if<std::is_same<Buff, azmq::message>::value, bool>::type do_perform_impl(socket_type& socket)
48+
{
49+
ec_ = boost::system::error_code();
50+
bytes_transferred_ += socket_ops::receive(const_cast<azmq::message&>(buffers_), socket, flags_ | ZMQ_DONTWAIT, ec_);
51+
if (ec_)
52+
return !try_again();
53+
return true;
54+
}
55+
56+
template<typename Buff = MutableBufferSequence>
57+
typename std::enable_if<!std::is_same<Buff, azmq::message>::value, bool>::type do_perform_impl(socket_type& socket)
58+
{
3359
ec_ = boost::system::error_code();
3460
bytes_transferred_ += socket_ops::receive(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
3561
if (ec_)
@@ -43,7 +69,7 @@ class receive_buffer_op_base : public reactor_op {
4369
}
4470

4571
private:
46-
MutableBufferSequence buffers_;
72+
typename std::conditional<std::is_same<MutableBufferSequence, azmq::message>::value, MutableBufferSequence const&, MutableBufferSequence>::type buffers_;
4773
flags_type flags_;
4874
};
4975

@@ -56,14 +82,30 @@ class receive_buffer_op : public receive_buffer_op_base<MutableBufferSequence> {
5682
socket_ops::flags_type flags)
5783
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
5884
, handler_(std::move(handler))
85+
, work_guard(boost::asio::make_work_guard(handler_))
5986
{ }
6087

6188
virtual void do_complete() override {
62-
handler_(this->ec_, this->bytes_transferred_);
89+
#if BOOST_VERSION >= 107900
90+
auto alloc = boost::asio::get_associated_allocator(
91+
handler_, boost::asio::recycling_allocator<void>());
92+
#endif
93+
boost::asio::dispatch(work_guard.get_executor(),
94+
#if BOOST_VERSION >= 107900
95+
boost::asio::bind_allocator(alloc,
96+
#endif
97+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
98+
handler_(ec_, bytes_transferred_);
99+
})
100+
#if BOOST_VERSION >= 107900
101+
)
102+
#endif
103+
;
63104
}
64105

65106
private:
66107
Handler handler_;
108+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
67109
};
68110

69111
template<typename MutableBufferSequence,
@@ -75,14 +117,30 @@ class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequen
75117
socket_ops::flags_type flags)
76118
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
77119
, handler_(std::move(handler))
120+
, work_guard(boost::asio::make_work_guard(handler_))
78121
{ }
79122

80123
virtual void do_complete() override {
81-
handler_(this->ec_, std::make_pair(this->bytes_transferred_, this->more()));
124+
#if BOOST_VERSION >= 107900
125+
auto alloc = boost::asio::get_associated_allocator(
126+
handler_, boost::asio::recycling_allocator<void>());
127+
#endif
128+
boost::asio::dispatch(work_guard.get_executor(),
129+
#if BOOST_VERSION >= 107900
130+
boost::asio::bind_allocator(alloc,
131+
#endif
132+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_, more = this->more()]() mutable {
133+
handler_(ec_, std::make_pair(bytes_transferred_, more));
134+
})
135+
#if BOOST_VERSION >= 107900
136+
)
137+
#endif
138+
;
82139
}
83140

84141
private:
85142
Handler handler_;
143+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
86144
};
87145

88146
class receive_op_base : public reactor_op {
@@ -111,14 +169,30 @@ class receive_op : public receive_op_base {
111169
socket_ops::flags_type flags)
112170
: receive_op_base(flags)
113171
, handler_(std::move(handler))
172+
, work_guard(boost::asio::make_work_guard(handler_))
114173
{ }
115174

116175
virtual void do_complete() override {
117-
handler_(ec_, msg_, bytes_transferred_);
176+
#if BOOST_VERSION >= 107900
177+
auto alloc = boost::asio::get_associated_allocator(
178+
handler_, boost::asio::recycling_allocator<void>());
179+
#endif
180+
boost::asio::dispatch(work_guard.get_executor(),
181+
#if BOOST_VERSION >= 107900
182+
boost::asio::bind_allocator(alloc,
183+
#endif
184+
[ec_ = this->ec_, handler_ = std::move(handler_), msg_ = std::move(msg_), bytes_transferred_ = this->bytes_transferred_]() mutable {
185+
handler_(ec_, msg_, bytes_transferred_);
186+
})
187+
#if BOOST_VERSION >= 107900
188+
)
189+
#endif
190+
;
118191
}
119192

120193
private:
121194
Handler handler_;
195+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
122196
};
123197
} // namespace detail
124198
} // namespace azmq

azmq/detail/send_op.hpp

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@
1414
#include "socket_ops.hpp"
1515
#include "reactor_op.hpp"
1616

17+
#include <boost/version.hpp>
18+
#include <boost/asio/dispatch.hpp>
19+
#include <boost/asio/executor_work_guard.hpp>
20+
#if BOOST_VERSION >= 107900
21+
#include <boost/asio/recycling_allocator.hpp>
22+
#include <boost/asio/bind_allocator.hpp>
23+
#endif
24+
1725
#include <zmq.h>
1826
#include <iterator>
1927

@@ -51,14 +59,30 @@ class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {
5159
reactor_op::flags_type flags)
5260
: send_buffer_op_base<ConstBufferSequence>(buffers, flags)
5361
, handler_(std::move(handler))
62+
, work_guard(boost::asio::make_work_guard(handler_))
5463
{ }
5564

5665
virtual void do_complete() override {
57-
handler_(this->ec_, this->bytes_transferred_);
66+
#if BOOST_VERSION >= 107900
67+
auto alloc = boost::asio::get_associated_allocator(
68+
handler_, boost::asio::recycling_allocator<void>());
69+
#endif
70+
boost::asio::dispatch(work_guard.get_executor(),
71+
#if BOOST_VERSION >= 107900
72+
boost::asio::bind_allocator(alloc,
73+
#endif
74+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
75+
handler_(ec_, bytes_transferred_);
76+
})
77+
#if BOOST_VERSION >= 107900
78+
)
79+
#endif
80+
;
5881
}
5982

6083
private:
6184
Handler handler_;
85+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
6286
};
6387

6488
class send_op_base : public reactor_op {
@@ -89,14 +113,31 @@ class send_op : public send_op_base {
89113
flags_type flags)
90114
: send_op_base(std::move(msg), flags)
91115
, handler_(std::move(handler))
116+
, work_guard(boost::asio::make_work_guard(handler_))
92117
{ }
93118

94119
virtual void do_complete() override {
95-
handler_(ec_, bytes_transferred_);
120+
#if BOOST_VERSION >= 107900
121+
auto alloc = boost::asio::get_associated_allocator(
122+
handler_, boost::asio::recycling_allocator<void>());
123+
#endif
124+
boost::asio::dispatch(work_guard.get_executor(),
125+
#if BOOST_VERSION >= 107900
126+
boost::asio::bind_allocator(alloc,
127+
#endif
128+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
129+
handler_(ec_, bytes_transferred_);
130+
})
131+
#if BOOST_VERSION >= 107900
132+
)
133+
#endif
134+
;
135+
96136
}
97137

98138
private:
99139
Handler handler_;
140+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
100141
};
101142

102143
} // namespace detail

test/cpp20/socket/main.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <boost/asio/use_awaitable.hpp>
1616
#include <boost/current_function.hpp>
1717

18+
#include <boost/current_function.hpp>
19+
1820
#include <coroutine>
1921
#include <array>
2022
#include <string>

test/socket/main.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include <boost/asio/use_future.hpp>
2222
#include <boost/asio/spawn.hpp>
2323
#include <boost/optional.hpp>
24+
#if BOOST_VERSION >= 107400
25+
#include <boost/asio/any_io_executor.hpp>
26+
#endif
2427
#endif
2528

2629
#include <array>
@@ -898,14 +901,19 @@ TEST_CASE("Async Operation Send/Receive single message, stackful coroutine, one
898901
auto frame1 = azmq::message{};
899902
auto const btb1 = azmq::async_receive(sb, frame1, yield);
900903
REQUIRE(btb1 == 5);
904+
REQUIRE(frame1.more());
901905

902906
auto frame2 = azmq::message{};
903907
auto const btb2 = azmq::async_receive(sb, frame2, yield);
904908
REQUIRE(btb2 == 2);
909+
REQUIRE(frame2.more());
910+
REQUIRE(message_ref(snd_bufs.at(0)) == message_ref(frame2));
905911

906912
auto frame3 = azmq::message{};
907913
auto const btb3 = azmq::async_receive(sb, frame3, yield);
908914
REQUIRE(btb3 == 2);
915+
REQUIRE(!frame3.more());
916+
REQUIRE(message_ref(snd_bufs.at(1)) == message_ref(frame3));
909917
}
910918
#if BOOST_VERSION >= 108000
911919
, boost::asio::use_future
@@ -915,4 +923,54 @@ TEST_CASE("Async Operation Send/Receive single message, stackful coroutine, one
915923
ios.run();
916924
}
917925

926+
927+
TEST_CASE("Async Operation Send/Receive single message, check thread safety", "[socket_ops]") {
928+
boost::asio::io_service ios;
929+
#if BOOST_VERSION >= 107400
930+
boost::asio::strand<boost::asio::any_io_executor> strand{ios.get_executor()};
931+
#else
932+
boost::asio::strand<boost::asio::executor> strand{ios.get_executor()};
933+
#endif
934+
935+
azmq::socket sb(ios, ZMQ_ROUTER);
936+
sb.bind(subj(BOOST_CURRENT_FUNCTION));
937+
938+
azmq::socket sc(ios, ZMQ_DEALER);
939+
sc.connect(subj(BOOST_CURRENT_FUNCTION));
940+
941+
//send coroutine task
942+
boost::asio::spawn(strand, [&](boost::asio::yield_context yield) {
943+
REQUIRE(strand.running_in_this_thread());
944+
boost::system::error_code ecc;
945+
auto const btc = azmq::async_send(sc, snd_bufs, yield[ecc]);
946+
REQUIRE(strand.running_in_this_thread());
947+
REQUIRE(!ecc);
948+
REQUIRE(btc == 4);
949+
});
950+
951+
//receive coroutine task
952+
boost::asio::spawn(strand, [&](boost::asio::yield_context yield) {
953+
std::array<char, 5> ident;
954+
std::array<char, 2> a;
955+
std::array<char, 2> b;
956+
957+
std::array<boost::asio::mutable_buffer, 3> rcv_bufs = { {boost::asio::buffer(ident),
958+
boost::asio::buffer(a),
959+
boost::asio::buffer(b)}};
960+
961+
boost::system::error_code ecc;
962+
963+
REQUIRE(strand.running_in_this_thread());
964+
auto const btb = azmq::async_receive(sb, rcv_bufs, yield[ecc]);
965+
REQUIRE(strand.running_in_this_thread());
966+
REQUIRE(!ecc);
967+
REQUIRE(btb == 9);
968+
969+
REQUIRE(message_ref(snd_bufs.at(0)) == boost::string_ref(a.data(), 2));
970+
REQUIRE(message_ref(snd_bufs.at(1)) == boost::string_ref(b.data(), 2));
971+
});
972+
973+
ios.run();
974+
}
975+
918976
#endif // BOOST_VERSION >= 107000

0 commit comments

Comments
 (0)