Skip to content

Commit 39a1608

Browse files
committed
✨ Initial IP multicast support
1 parent 8b25976 commit 39a1608

File tree

6 files changed

+279
-4
lines changed

6 files changed

+279
-4
lines changed

include/asyncpp/io/detail/io_engine.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ namespace asyncpp::io::detail {
7979
virtual endpoint socket_local_endpoint(socket_handle_t socket) = 0;
8080
virtual endpoint socket_remote_endpoint(socket_handle_t socket) = 0;
8181
virtual void socket_enable_broadcast(socket_handle_t socket, bool enable) = 0;
82+
virtual void socket_multicast_join(socket_handle_t socket, address group, address interface) = 0;
83+
virtual void socket_multicast_drop(socket_handle_t socket, address group, address interface) = 0;
84+
virtual void socket_multicast_set_send_interface(socket_handle_t socket, address interface) = 0;
85+
virtual void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) = 0;
86+
virtual void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) = 0;
8287
virtual void socket_shutdown(socket_handle_t socket, bool receive, bool send) = 0;
8388
virtual bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) = 0;
8489
virtual bool enqueue_accept(socket_handle_t socket, completion_data* cd) = 0;

include/asyncpp/io/socket.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ namespace asyncpp::io {
9797
void listen(std::uint32_t backlog = 0);
9898

9999
void allow_broadcast(bool enable);
100+
void multicast_join(address group, address interface);
101+
void multicast_join(address group);
102+
void multicast_drop(address group, address interface);
103+
void multicast_drop(address group);
104+
void multicast_set_send_interface(address interface);
105+
void multicast_set_ttl(size_t ttl);
106+
void multicast_set_loopback(bool enabled);
100107

101108
[[nodiscard]] detail::io_engine::socket_handle_t native_handle() const noexcept { return m_fd; }
102109
[[nodiscard]] detail::io_engine::socket_handle_t release() noexcept {
@@ -620,7 +627,7 @@ namespace asyncpp::io {
620627
if (that->result)
621628
that->real_cb(that->result);
622629
else
623-
that->real_cb(socket::from_fd(that->service(), that->result_handle));
630+
that->real_cb(socket::from_fd(that->service, that->result_handle));
624631

625632
delete that;
626633
};

src/io_engine_generic_unix.cpp

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,99 @@ namespace asyncpp::io::detail {
137137
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
138138
}
139139

140+
void io_engine_generic_unix::socket_multicast_join(socket_handle_t socket, address group, address interface) {
141+
if (group.type() != interface.type())
142+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
143+
"group and interface need to be of the same type");
144+
if (group.is_ipv4()) {
145+
struct ip_mreq mc_req{};
146+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
147+
mc_req.imr_interface = interface.ipv4().to_sockaddr_in().first.sin_addr;
148+
auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mc_req, sizeof(mc_req));
149+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
150+
} else if (group.is_ipv6()) {
151+
struct ipv6_mreq mc_req{};
152+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
153+
mc_req.ipv6mr_interface = interface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
154+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mc_req, sizeof(mc_req));
155+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
156+
} else {
157+
throw std::system_error(std::make_error_code(std::errc::not_supported),
158+
"multicast is only supported on IPv4/IPv6");
159+
}
160+
}
161+
162+
void io_engine_generic_unix::socket_multicast_drop(socket_handle_t socket, address group, address interface) {
163+
if (group.type() != interface.type())
164+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
165+
"group and interface need to be of the same type");
166+
if (group.is_ipv4()) {
167+
struct ip_mreq mc_req{};
168+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
169+
mc_req.imr_interface = interface.ipv4().to_sockaddr_in().first.sin_addr;
170+
auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mc_req, sizeof(mc_req));
171+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
172+
} else if (group.is_ipv6()) {
173+
struct ipv6_mreq mc_req{};
174+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
175+
mc_req.ipv6mr_interface = interface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
176+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mc_req, sizeof(mc_req));
177+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
178+
} else {
179+
throw std::system_error(std::make_error_code(std::errc::not_supported),
180+
"multicast is only supported on IPv4/IPv6");
181+
}
182+
}
183+
184+
void io_engine_generic_unix::socket_multicast_set_send_interface(socket_handle_t socket, address interface) {
185+
if (interface.is_ipv4()) {
186+
auto addr = interface.ipv4().to_sockaddr_in().first.sin_addr.s_addr;
187+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<char*>(&addr), sizeof(addr));
188+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
189+
} else if (interface.is_ipv6()) {
190+
auto scope = interface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
191+
auto res =
192+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast<char*>(&scope), sizeof(scope));
193+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
194+
} else {
195+
throw std::system_error(std::make_error_code(std::errc::not_supported),
196+
"multicast is only supported on IPv4/IPv6");
197+
}
198+
}
199+
200+
void io_engine_generic_unix::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) {
201+
auto type = get_handle_type(socket);
202+
if (ttl > std::numeric_limits<int>::max()) throw std::invalid_argument("ttl value out of range");
203+
int ittl = ttl;
204+
if (type == address_type::ipv4) {
205+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast<char*>(&ittl), sizeof(ittl));
206+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
207+
} else if (type == address_type::ipv6) {
208+
auto res =
209+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast<char*>(&ittl), sizeof(ittl));
210+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
211+
} else {
212+
throw std::system_error(std::make_error_code(std::errc::not_supported),
213+
"multicast is only supported on IPv4/IPv6");
214+
}
215+
}
216+
217+
void io_engine_generic_unix::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) {
218+
auto type = get_handle_type(socket);
219+
int val = enabled ? 1 : 0;
220+
if (type == address_type::ipv4) {
221+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
222+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
223+
} else if (type == address_type::ipv6) {
224+
auto res =
225+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
226+
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
227+
} else {
228+
throw std::system_error(std::make_error_code(std::errc::not_supported),
229+
"multicast is only supported on IPv4/IPv6");
230+
}
231+
}
232+
140233
void io_engine_generic_unix::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
141234
int mode = 0;
142235
if (receive && send)
@@ -151,6 +244,19 @@ namespace asyncpp::io::detail {
151244
if (res < 0 && errno != ENOTCONN) throw std::system_error(errno, std::system_category(), "shutdown failed");
152245
}
153246

247+
address_type io_engine_generic_unix::get_handle_type(socket_handle_t socket) {
248+
int type = -1;
249+
socklen_t length = sizeof(int);
250+
auto res = getsockopt(socket, SOL_SOCKET, SO_TYPE, &type, &length);
251+
if (res < 0) throw std::system_error(errno, std::system_category(), "getsockopt failed");
252+
switch (type) {
253+
case AF_INET: return address_type::ipv4;
254+
case AF_INET6: return address_type::ipv6;
255+
case AF_UNIX: return address_type::uds;
256+
default: throw std::logic_error("unknown socket type");
257+
}
258+
}
259+
154260
io_engine::file_handle_t io_engine_generic_unix::file_open(const char* filename, std::ios_base::openmode mode) {
155261
if ((mode & std::ios_base::ate) == std::ios_base::ate) throw std::logic_error("unsupported flag");
156262
int m = 0;
@@ -173,12 +279,12 @@ namespace asyncpp::io::detail {
173279

174280
uint64_t io_engine_generic_unix::file_size(file_handle_t fd) {
175281
#ifdef __APPLE__
176-
struct stat info {};
282+
struct stat info{};
177283
auto res = fstat(fd, &info);
178284
if (res < 0) throw std::system_error(errno, std::system_category());
179285
return info.st_size;
180286
#else
181-
struct stat64 info {};
287+
struct stat64 info{};
182288
auto res = fstat64(fd, &info);
183289
if (res < 0) throw std::system_error(errno, std::system_category());
184290
return info.st_size;

src/io_engine_generic_unix.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,20 @@ namespace asyncpp::io::detail {
1414
endpoint socket_local_endpoint(socket_handle_t socket) override;
1515
endpoint socket_remote_endpoint(socket_handle_t socket) override;
1616
void socket_enable_broadcast(socket_handle_t socket, bool enable) override;
17+
void socket_multicast_join(socket_handle_t socket, address group, address interface) override;
18+
void socket_multicast_drop(socket_handle_t socket, address group, address interface) override;
19+
void socket_multicast_set_send_interface(socket_handle_t socket, address interface) override;
20+
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
21+
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
1722
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;
1823

1924
file_handle_t file_open(const char* filename, std::ios_base::openmode mode) override;
2025
void file_close(file_handle_t fd) override;
2126
uint64_t file_size(file_handle_t fd) override;
2227

28+
protected:
29+
address_type get_handle_type(socket_handle_t socket);
30+
2331
private:
2432
};
2533

src/io_engine_iocp.cpp

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ namespace asyncpp::io::detail {
8484
endpoint socket_local_endpoint(socket_handle_t socket) override;
8585
endpoint socket_remote_endpoint(socket_handle_t socket) override;
8686
void socket_enable_broadcast(socket_handle_t socket, bool enable) override;
87+
void socket_multicast_join(socket_handle_t socket, address group, address interface) override;
88+
void socket_multicast_drop(socket_handle_t socket, address group, address interface) override;
89+
void socket_multicast_set_send_interface(socket_handle_t socket, address interface) override;
90+
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
91+
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
8792
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;
8893
bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) override;
8994
bool enqueue_accept(socket_handle_t socket, completion_data* cd) override;
@@ -109,6 +114,8 @@ namespace asyncpp::io::detail {
109114
private:
110115
HANDLE m_completion_port = INVALID_HANDLE_VALUE;
111116
std::atomic<size_t> m_inflight_count{};
117+
118+
address_type get_handle_type(socket_handle_t socket);
112119
};
113120

114121
std::unique_ptr<io_engine> create_io_engine_iocp() { return std::make_unique<io_engine_iocp>(); }
@@ -245,7 +252,7 @@ namespace asyncpp::io::detail {
245252
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (char*)&reuse, (socklen_t)sizeof(reuse)) == -1)
246253
close_and_throw("setsockopt", listener);
247254

248-
struct sockaddr_in inaddr {};
255+
struct sockaddr_in inaddr{};
249256
inaddr.sin_family = AF_INET;
250257
inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
251258
if (bind(listener, reinterpret_cast<sockaddr*>(&inaddr), sizeof(inaddr)) == SOCKET_ERROR)
@@ -345,6 +352,99 @@ namespace asyncpp::io::detail {
345352
if (res == SOCKET_ERROR) throw std::system_error(WSAGetLastError(), std::system_category());
346353
}
347354

355+
void io_engine_iocp::socket_multicast_join(socket_handle_t socket, address group, address interface) {
356+
if (group.type() != interface.type())
357+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
358+
"group and interface need to be of the same type");
359+
if (group.is_ipv4()) {
360+
struct ip_mreq mc_req{};
361+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
362+
mc_req.imr_interface = interface.ipv4().to_sockaddr_in().first.sin_addr;
363+
auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mc_req, sizeof(mc_req));
364+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
365+
} else if (group.is_ipv6()) {
366+
struct ipv6_mreq mc_req{};
367+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
368+
mc_req.ipv6mr_interface = interface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
369+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mc_req, sizeof(mc_req));
370+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
371+
} else {
372+
throw std::system_error(std::make_error_code(std::errc::not_supported),
373+
"multicast is only supported on IPv4/IPv6");
374+
}
375+
}
376+
377+
void io_engine_iocp::socket_multicast_drop(socket_handle_t socket, address group, address interface) {
378+
if (group.type() != interface.type())
379+
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
380+
"group and interface need to be of the same type");
381+
if (group.is_ipv4()) {
382+
struct ip_mreq mc_req{};
383+
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
384+
mc_req.imr_interface = interface.ipv4().to_sockaddr_in().first.sin_addr;
385+
auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mc_req, sizeof(mc_req));
386+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
387+
} else if (group.is_ipv6()) {
388+
struct ipv6_mreq mc_req{};
389+
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
390+
mc_req.ipv6mr_interface = interface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
391+
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mc_req, sizeof(mc_req));
392+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
393+
} else {
394+
throw std::system_error(std::make_error_code(std::errc::not_supported),
395+
"multicast is only supported on IPv4/IPv6");
396+
}
397+
}
398+
399+
void io_engine_iocp::socket_multicast_set_send_interface(socket_handle_t socket, address interface) {
400+
if (interface.is_ipv4()) {
401+
auto addr = interface.ipv4().to_sockaddr_in().first.sin_addr.s_addr;
402+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<char*>(&addr), sizeof(addr));
403+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
404+
} else if (interface.is_ipv6()) {
405+
auto scope = interface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
406+
auto res =
407+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast<char*>(&scope), sizeof(scope));
408+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
409+
} else {
410+
throw std::system_error(std::make_error_code(std::errc::not_supported),
411+
"multicast is only supported on IPv4/IPv6");
412+
}
413+
}
414+
415+
void io_engine_iocp::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) {
416+
auto type = get_handle_type(socket);
417+
if (ttl > std::numeric_limits<int>::max()) throw std::invalid_argument("ttl value out of range");
418+
int ittl = ttl;
419+
if (type == address_type::ipv4) {
420+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast<char*>(&ittl), sizeof(ittl));
421+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
422+
} else if (type == address_type::ipv6) {
423+
auto res =
424+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast<char*>(&ittl), sizeof(ittl));
425+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
426+
} else {
427+
throw std::system_error(std::make_error_code(std::errc::not_supported),
428+
"multicast is only supported on IPv4/IPv6");
429+
}
430+
}
431+
432+
void io_engine_iocp::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) {
433+
auto type = get_handle_type(socket);
434+
int val = enabled ? 1 : 0;
435+
if (type == address_type::ipv4) {
436+
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
437+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
438+
} else if (type == address_type::ipv6) {
439+
auto res =
440+
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
441+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
442+
} else {
443+
throw std::system_error(std::make_error_code(std::errc::not_supported),
444+
"multicast is only supported on IPv4/IPv6");
445+
}
446+
}
447+
348448
void io_engine_iocp::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
349449
int mode = 0;
350450
if (receive && send)
@@ -532,6 +632,18 @@ namespace asyncpp::io::detail {
532632
}
533633
}
534634

635+
address_type io_engine_iocp::get_handle_type(socket_handle_t socket) {
636+
WSAPROTOCOL_INFO info{};
637+
socklen_t length = sizeof(info);
638+
auto res = getsockopt(socket, SOL_SOCKET, SO_PROTOCOL_INFO, &info, &length);
639+
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "getsockopt failed");
640+
switch (info.iAddressFamily) {
641+
case AF_INET: return address_type::ipv4;
642+
case AF_INET6: return address_type::ipv6;
643+
default: throw std::logic_error("unknown socket type");
644+
}
645+
}
646+
535647
io_engine::file_handle_t io_engine_iocp::file_open(const char* filename, std::ios_base::openmode mode) {
536648
DWORD access_mode = 0;
537649
if ((mode & std::ios_base::in) == std::ios_base::in) access_mode |= GENERIC_READ;

src/socket.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,43 @@ namespace asyncpp::io {
110110
m_io->engine()->socket_enable_broadcast(m_fd, enable);
111111
}
112112

113+
void socket::multicast_join(address group, address interface) {
114+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
115+
if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type");
116+
m_io->engine()->socket_multicast_join(m_fd, group, interface);
117+
}
118+
119+
void socket::multicast_join(address group) {
120+
auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()};
121+
return multicast_join(group, iface);
122+
}
123+
124+
void socket::multicast_drop(address group, address interface) {
125+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
126+
if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type");
127+
m_io->engine()->socket_multicast_drop(m_fd, group, interface);
128+
}
129+
130+
void socket::multicast_drop(address group) {
131+
auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()};
132+
return multicast_drop(group, iface);
133+
}
134+
135+
void socket::multicast_set_send_interface(address interface) {
136+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
137+
m_io->engine()->socket_multicast_set_send_interface(m_fd, interface);
138+
}
139+
140+
void socket::multicast_set_ttl(size_t ttl) {
141+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
142+
m_io->engine()->socket_multicast_set_ttl(m_fd, ttl);
143+
}
144+
145+
void socket::multicast_set_loopback(bool enabled) {
146+
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
147+
m_io->engine()->socket_multicast_set_loopback(m_fd, enabled);
148+
}
149+
113150
void socket::close_send() {
114151
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
115152
m_io->engine()->socket_shutdown(m_fd, false, true);

0 commit comments

Comments
 (0)