Skip to content

Commit db07abf

Browse files
authored
✨ IPv4/IPv6 Multicast support (#3)
Implement multicast support for networking
1 parent 8b25976 commit db07abf

File tree

7 files changed

+295
-1
lines changed

7 files changed

+295
-1
lines changed

include/asyncpp/io/detail/io_engine.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ namespace asyncpp::io::detail {
7979
virtual endpoint socket_local_endpoint(socket_handle_t socket) = 0;
8080
virtual endpoint socket_remote_endpoint(socket_handle_t socket) = 0;
8181
virtual void socket_enable_broadcast(socket_handle_t socket, bool enable) = 0;
82+
virtual void socket_multicast_join(socket_handle_t socket, address group, address iface) = 0;
83+
virtual void socket_multicast_drop(socket_handle_t socket, address group, address iface) = 0;
84+
virtual void socket_multicast_set_send_interface(socket_handle_t socket, address iface) = 0;
85+
virtual void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) = 0;
86+
virtual void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) = 0;
8287
virtual void socket_shutdown(socket_handle_t socket, bool receive, bool send) = 0;
8388
virtual bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) = 0;
8489
virtual bool enqueue_accept(socket_handle_t socket, completion_data* cd) = 0;

include/asyncpp/io/socket.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ namespace asyncpp::io {
9797
void listen(std::uint32_t backlog = 0);
9898

9999
void allow_broadcast(bool enable);
100+
void multicast_join(address group, address iface);
101+
void multicast_join(address group);
102+
void multicast_drop(address group, address iface);
103+
void multicast_drop(address group);
104+
void multicast_set_send_interface(address iface);
105+
void multicast_set_ttl(size_t ttl);
106+
void multicast_set_loopback(bool enabled);
100107

101108
[[nodiscard]] detail::io_engine::socket_handle_t native_handle() const noexcept { return m_fd; }
102109
[[nodiscard]] detail::io_engine::socket_handle_t release() noexcept {
@@ -620,7 +627,7 @@ namespace asyncpp::io {
620627
if (that->result)
621628
that->real_cb(that->result);
622629
else
623-
that->real_cb(socket::from_fd(that->service(), that->result_handle));
630+
that->real_cb(socket::from_fd(that->service, that->result_handle));
624631

625632
delete that;
626633
};

src/io_engine_generic_unix.cpp

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
#ifndef _WIN32
22
#include "io_engine_generic_unix.h"
33

4+
#ifdef __APPLE__
5+
#define _DARWIN_C_SOURCE
6+
#endif
7+
48
#include <cstring>
59

610
#include <fcntl.h>
@@ -137,6 +141,99 @@ namespace asyncpp::io::detail {
137141
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
138142
}
139143

144+
void io_engine_generic_unix::socket_multicast_join(socket_handle_t socket, address group, address iface) {
145+
if (group.type() != iface.type())
146+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
147+
"group and interface need to be of the same type");
148+
if (group.is_ipv4()) {
149+
struct ip_mreq mc_req {};
150+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
151+
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
152+
auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mc_req, sizeof(mc_req));
153+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
154+
} else if (group.is_ipv6()) {
155+
struct ipv6_mreq mc_req {};
156+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
157+
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
158+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mc_req, sizeof(mc_req));
159+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
160+
} else {
161+
throw std::system_error(std::make_error_code(std::errc::not_supported),
162+
"multicast is only supported on IPv4/IPv6");
163+
}
164+
}
165+
166+
void io_engine_generic_unix::socket_multicast_drop(socket_handle_t socket, address group, address iface) {
167+
if (group.type() != iface.type())
168+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
169+
"group and interface need to be of the same type");
170+
if (group.is_ipv4()) {
171+
struct ip_mreq mc_req {};
172+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
173+
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
174+
auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mc_req, sizeof(mc_req));
175+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
176+
} else if (group.is_ipv6()) {
177+
struct ipv6_mreq mc_req {};
178+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
179+
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
180+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_LEAVE_GROUP, &mc_req, sizeof(mc_req));
181+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
182+
} else {
183+
throw std::system_error(std::make_error_code(std::errc::not_supported),
184+
"multicast is only supported on IPv4/IPv6");
185+
}
186+
}
187+
188+
void io_engine_generic_unix::socket_multicast_set_send_interface(socket_handle_t socket, address iface) {
189+
if (iface.is_ipv4()) {
190+
auto addr = iface.ipv4().to_sockaddr_in().first.sin_addr.s_addr;
191+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<char*>(&addr), sizeof(addr));
192+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
193+
} else if (iface.is_ipv6()) {
194+
auto scope = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
195+
auto res =
196+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast<char*>(&scope), sizeof(scope));
197+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
198+
} else {
199+
throw std::system_error(std::make_error_code(std::errc::not_supported),
200+
"multicast is only supported on IPv4/IPv6");
201+
}
202+
}
203+
204+
void io_engine_generic_unix::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) {
205+
auto type = get_handle_type(socket);
206+
if (ttl > std::numeric_limits<int>::max()) throw std::invalid_argument("ttl value out of range");
207+
int ittl = ttl;
208+
if (type == address_type::ipv4) {
209+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast<char*>(&ittl), sizeof(ittl));
210+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
211+
} else if (type == address_type::ipv6) {
212+
auto res =
213+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast<char*>(&ittl), sizeof(ittl));
214+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
215+
} else {
216+
throw std::system_error(std::make_error_code(std::errc::not_supported),
217+
"multicast is only supported on IPv4/IPv6");
218+
}
219+
}
220+
221+
void io_engine_generic_unix::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) {
222+
auto type = get_handle_type(socket);
223+
int val = enabled ? 1 : 0;
224+
if (type == address_type::ipv4) {
225+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
226+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
227+
} else if (type == address_type::ipv6) {
228+
auto res =
229+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
230+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
231+
} else {
232+
throw std::system_error(std::make_error_code(std::errc::not_supported),
233+
"multicast is only supported on IPv4/IPv6");
234+
}
235+
}
236+
140237
void io_engine_generic_unix::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
141238
int mode = 0;
142239
if (receive && send)
@@ -151,6 +248,19 @@ namespace asyncpp::io::detail {
151248
if (res < 0 && errno != ENOTCONN) throw std::system_error(errno, std::system_category(), "shutdown failed");
152249
}
153250

251+
address_type io_engine_generic_unix::get_handle_type(socket_handle_t socket) {
252+
int type = -1;
253+
socklen_t length = sizeof(int);
254+
auto res = getsockopt(socket, SOL_SOCKET, SO_TYPE, &type, &length);
255+
if (res < 0) throw std::system_error(errno, std::system_category(), "getsockopt failed");
256+
switch (type) {
257+
case AF_INET: return address_type::ipv4;
258+
case AF_INET6: return address_type::ipv6;
259+
case AF_UNIX: return address_type::uds;
260+
default: throw std::logic_error("unknown socket type");
261+
}
262+
}
263+
154264
io_engine::file_handle_t io_engine_generic_unix::file_open(const char* filename, std::ios_base::openmode mode) {
155265
if ((mode & std::ios_base::ate) == std::ios_base::ate) throw std::logic_error("unsupported flag");
156266
int m = 0;

src/io_engine_generic_unix.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,20 @@ namespace asyncpp::io::detail {
1414
endpoint socket_local_endpoint(socket_handle_t socket) override;
1515
endpoint socket_remote_endpoint(socket_handle_t socket) override;
1616
void socket_enable_broadcast(socket_handle_t socket, bool enable) override;
17+
void socket_multicast_join(socket_handle_t socket, address group, address iface) override;
18+
void socket_multicast_drop(socket_handle_t socket, address group, address iface) override;
19+
void socket_multicast_set_send_interface(socket_handle_t socket, address iface) override;
20+
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
21+
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
1722
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;
1823

1924
file_handle_t file_open(const char* filename, std::ios_base::openmode mode) override;
2025
void file_close(file_handle_t fd) override;
2126
uint64_t file_size(file_handle_t fd) override;
2227

28+
protected:
29+
address_type get_handle_type(socket_handle_t socket);
30+
2331
private:
2432
};
2533

src/io_engine_iocp.cpp

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ namespace asyncpp::io::detail {
8484
endpoint socket_local_endpoint(socket_handle_t socket) override;
8585
endpoint socket_remote_endpoint(socket_handle_t socket) override;
8686
void socket_enable_broadcast(socket_handle_t socket, bool enable) override;
87+
void socket_multicast_join(socket_handle_t socket, address group, address iface) override;
88+
void socket_multicast_drop(socket_handle_t socket, address group, address iface) override;
89+
void socket_multicast_set_send_interface(socket_handle_t socket, address iface) override;
90+
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
91+
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
8792
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;
8893
bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) override;
8994
bool enqueue_accept(socket_handle_t socket, completion_data* cd) override;
@@ -109,6 +114,8 @@ namespace asyncpp::io::detail {
109114
private:
110115
HANDLE m_completion_port = INVALID_HANDLE_VALUE;
111116
std::atomic<size_t> m_inflight_count{};
117+
118+
address_type get_handle_type(socket_handle_t socket);
112119
};
113120

114121
std::unique_ptr<io_engine> create_io_engine_iocp() { return std::make_unique<io_engine_iocp>(); }
@@ -345,6 +352,106 @@ namespace asyncpp::io::detail {
345352
if (res == SOCKET_ERROR) throw std::system_error(WSAGetLastError(), std::system_category());
346353
}
347354

355+
void io_engine_iocp::socket_multicast_join(socket_handle_t socket, address group, address iface) {
356+
if (group.type() != iface.type())
357+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
358+
"group and interface need to be of the same type");
359+
if (group.is_ipv4()) {
360+
struct ip_mreq mc_req {};
361+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
362+
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
363+
auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
364+
sizeof(mc_req));
365+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
366+
} else if (group.is_ipv6()) {
367+
struct ipv6_mreq mc_req {};
368+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
369+
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
370+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
371+
sizeof(mc_req));
372+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
373+
} else {
374+
throw std::system_error(std::make_error_code(std::errc::not_supported),
375+
"multicast is only supported on IPv4/IPv6");
376+
}
377+
}
378+
379+
void io_engine_iocp::socket_multicast_drop(socket_handle_t socket, address group, address iface) {
380+
if (group.type() != iface.type())
381+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
382+
"group and interface need to be of the same type");
383+
if (group.is_ipv4()) {
384+
struct ip_mreq mc_req {};
385+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
386+
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
387+
auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
388+
sizeof(mc_req));
389+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
390+
} else if (group.is_ipv6()) {
391+
struct ipv6_mreq mc_req {};
392+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
393+
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
394+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
395+
sizeof(mc_req));
396+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
397+
} else {
398+
throw std::system_error(std::make_error_code(std::errc::not_supported),
399+
"multicast is only supported on IPv4/IPv6");
400+
}
401+
}
402+
403+
void io_engine_iocp::socket_multicast_set_send_interface(socket_handle_t socket, address iface) {
404+
if (iface.is_ipv4()) {
405+
auto addr = iface.ipv4().to_sockaddr_in().first.sin_addr.s_addr;
406+
auto res =
407+
setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<const char*>(&addr), sizeof(addr));
408+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
409+
} else if (iface.is_ipv6()) {
410+
auto scope = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
411+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast<const char*>(&scope),
412+
sizeof(scope));
413+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
414+
} else {
415+
throw std::system_error(std::make_error_code(std::errc::not_supported),
416+
"multicast is only supported on IPv4/IPv6");
417+
}
418+
}
419+
420+
void io_engine_iocp::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) {
421+
auto type = get_handle_type(socket);
422+
if (ttl > (std::numeric_limits<int>::max)()) throw std::invalid_argument("ttl value out of range");
423+
int ittl = ttl;
424+
if (type == address_type::ipv4) {
425+
auto res =
426+
setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast<const char*>(&ittl), sizeof(ittl));
427+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
428+
} else if (type == address_type::ipv6) {
429+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast<const char*>(&ittl),
430+
sizeof(ittl));
431+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
432+
} else {
433+
throw std::system_error(std::make_error_code(std::errc::not_supported),
434+
"multicast is only supported on IPv4/IPv6");
435+
}
436+
}
437+
438+
void io_engine_iocp::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) {
439+
auto type = get_handle_type(socket);
440+
int val = enabled ? 1 : 0;
441+
if (type == address_type::ipv4) {
442+
auto res =
443+
setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast<const char*>(&val), sizeof(val));
444+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
445+
} else if (type == address_type::ipv6) {
446+
auto res =
447+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast<const char*>(&val), sizeof(val));
448+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
449+
} else {
450+
throw std::system_error(std::make_error_code(std::errc::not_supported),
451+
"multicast is only supported on IPv4/IPv6");
452+
}
453+
}
454+
348455
void io_engine_iocp::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
349456
int mode = 0;
350457
if (receive && send)
@@ -532,6 +639,18 @@ namespace asyncpp::io::detail {
532639
}
533640
}
534641

642+
address_type io_engine_iocp::get_handle_type(socket_handle_t socket) {
643+
WSAPROTOCOL_INFO info{};
644+
socklen_t length = sizeof(info);
645+
auto res = getsockopt(socket, SOL_SOCKET, SO_PROTOCOL_INFO, reinterpret_cast<char*>(&info), &length);
646+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "getsockopt failed");
647+
switch (info.iAddressFamily) {
648+
case AF_INET: return address_type::ipv4;
649+
case AF_INET6: return address_type::ipv6;
650+
default: throw std::logic_error("unknown socket type");
651+
}
652+
}
653+
535654
io_engine::file_handle_t io_engine_iocp::file_open(const char* filename, std::ios_base::openmode mode) {
536655
DWORD access_mode = 0;
537656
if ((mode & std::ios_base::in) == std::ios_base::in) access_mode |= GENERIC_READ;

src/io_engine_uring.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ namespace asyncpp::io::detail {
1717
#include <sys/time.h>
1818
#include <unistd.h>
1919

20+
#ifndef __NR_io_uring_register
21+
#ifdef __alpha__
22+
#define __NR_io_uring_register 537
23+
#else
24+
#define __NR_io_uring_register 427
25+
#endif
26+
#endif
27+
2028
namespace asyncpp::io::detail {
2129

2230
class io_engine_uring : public io_engine_generic_unix {

src/socket.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,43 @@ namespace asyncpp::io {
110110
m_io->engine()->socket_enable_broadcast(m_fd, enable);
111111
}
112112

113+
void socket::multicast_join(address group, address interface) {
114+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
115+
if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type");
116+
m_io->engine()->socket_multicast_join(m_fd, group, interface);
117+
}
118+
119+
void socket::multicast_join(address group) {
120+
auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()};
121+
return multicast_join(group, iface);
122+
}
123+
124+
void socket::multicast_drop(address group, address interface) {
125+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
126+
if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type");
127+
m_io->engine()->socket_multicast_drop(m_fd, group, interface);
128+
}
129+
130+
void socket::multicast_drop(address group) {
131+
auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()};
132+
return multicast_drop(group, iface);
133+
}
134+
135+
void socket::multicast_set_send_interface(address interface) {
136+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
137+
m_io->engine()->socket_multicast_set_send_interface(m_fd, interface);
138+
}
139+
140+
void socket::multicast_set_ttl(size_t ttl) {
141+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
142+
m_io->engine()->socket_multicast_set_ttl(m_fd, ttl);
143+
}
144+
145+
void socket::multicast_set_loopback(bool enabled) {
146+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
147+
m_io->engine()->socket_multicast_set_loopback(m_fd, enabled);
148+
}
149+
113150
void socket::close_send() {
114151
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
115152
m_io->engine()->socket_shutdown(m_fd, false, true);

0 commit comments

Comments
 (0)