Skip to content

Commit 67655d3

Browse files
c-p-i-ofacebook-github-bot
authored andcommitted
Back out "fbcode/gloo/transport/uv"
Summary: This code isn't used internally at Meta but is possibly used by open source community. Reviewed By: fduwjj Differential Revision: D65232750 fbshipit-source-id: 03a9c34288c8872ad94a94ce82e703f558e976af
1 parent 846c029 commit 67655d3

File tree

13 files changed

+2981
-0
lines changed

13 files changed

+2981
-0
lines changed

gloo/transport/uv/address.cc

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Copyright (c) 2019-present, Facebook, Inc.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#include <gloo/transport/uv/address.h>
10+
11+
#include <string.h>
12+
13+
#include <uv.h>
14+
15+
#include <gloo/common/logging.h>
16+
17+
namespace gloo {
18+
namespace transport {
19+
namespace uv {
20+
21+
Address::Address(struct sockaddr_storage ss, sequence_type seq) {
22+
impl_.ss = std::move(ss);
23+
impl_.seq = seq;
24+
}
25+
26+
Address::Address(const std::vector<char>& bytes) {
27+
GLOO_ENFORCE_EQ(sizeof(impl_), bytes.size());
28+
memcpy(&impl_, bytes.data(), sizeof(impl_));
29+
}
30+
31+
Address::Address(const Address& other)
32+
: Address(other.impl_.ss, other.impl_.seq) {}
33+
34+
std::vector<char> Address::bytes() const {
35+
std::lock_guard<std::mutex> lock(m_);
36+
std::vector<char> bytes(sizeof(impl_));
37+
memcpy(bytes.data(), &impl_, sizeof(impl_));
38+
return bytes;
39+
}
40+
41+
Address& Address::operator=(Address&& other) {
42+
std::lock_guard<std::mutex> lock(m_);
43+
impl_.ss = std::move(other.impl_.ss);
44+
impl_.seq = other.impl_.seq;
45+
return *this;
46+
}
47+
48+
Address& Address::operator=(const Address& other) {
49+
std::lock_guard<std::mutex> lock(m_);
50+
impl_.ss = other.impl_.ss;
51+
impl_.seq = other.impl_.seq;
52+
return *this;
53+
}
54+
55+
std::string Address::str() const {
56+
char str[INET6_ADDRSTRLEN + 128];
57+
int port = 0;
58+
59+
str[0] = '[';
60+
if (impl_.ss.ss_family == AF_INET) {
61+
auto in = (struct sockaddr_in*)&impl_.ss;
62+
uv_ip4_name(in, str + 1, sizeof(str) - 1);
63+
port = in->sin_port;
64+
} else if (impl_.ss.ss_family == AF_INET6) {
65+
auto in6 = (struct sockaddr_in6*)&impl_.ss;
66+
uv_ip6_name(in6, str + 1, sizeof(str) - 1);
67+
port = in6->sin6_port;
68+
} else {
69+
snprintf(str + 1, sizeof(str) - 1, "none");
70+
}
71+
72+
size_t len = strlen(str);
73+
if (port > 0) {
74+
len += snprintf(str + len, sizeof(str) - len, "]:%d", port);
75+
} else {
76+
len += snprintf(str + len, sizeof(str) - len, "]");
77+
}
78+
79+
// Append sequence number if one is set.
80+
if (impl_.seq != SIZE_MAX) {
81+
len += snprintf(str + len, sizeof(str) - len, "$%d", impl_.seq);
82+
}
83+
84+
return str;
85+
}
86+
87+
} // namespace uv
88+
} // namespace transport
89+
} // namespace gloo

gloo/transport/uv/address.h

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* Copyright (c) 2019-present, Facebook, Inc.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#pragma once
10+
11+
#ifdef _WIN32
12+
#include "gloo/common/win.h"
13+
#else
14+
#include <sys/socket.h>
15+
#endif
16+
17+
#include <mutex>
18+
19+
#include <gloo/transport/address.h>
20+
21+
namespace gloo {
22+
namespace transport {
23+
namespace uv {
24+
25+
class Address : public ::gloo::transport::Address {
26+
public:
27+
using sequence_type = int;
28+
29+
Address() {}
30+
31+
Address(struct sockaddr_storage ss, sequence_type seq = -1);
32+
33+
explicit Address(const std::vector<char>&);
34+
35+
Address& operator=(Address&& other);
36+
Address& operator=(const Address& other);
37+
Address(const Address& other);
38+
39+
virtual std::vector<char> bytes() const override;
40+
41+
virtual std::string str() const override;
42+
43+
const struct sockaddr_storage& getSockaddr() const {
44+
return impl_.ss;
45+
}
46+
47+
sequence_type getSeq() const {
48+
return impl_.seq;
49+
}
50+
51+
Address withSeq(sequence_type seq) const {
52+
return Address(impl_.ss, seq);
53+
}
54+
55+
protected:
56+
// Encapsulate fields such that it is trivially copyable. This class
57+
// is not trivially copyable itself (because it is a subclass?).
58+
struct Impl {
59+
// IP address of the listening socket.
60+
struct sockaddr_storage ss;
61+
62+
// Sequence number of this address.
63+
// If this is equal to -1, the address is assumed to
64+
// represent the listening socket of a device. The sequence number
65+
// must be set before it can be used by a pair.
66+
sequence_type seq = -1;
67+
};
68+
69+
static_assert(std::is_trivially_copyable<Impl>::value, "!");
70+
71+
Impl impl_;
72+
mutable std::mutex m_;
73+
};
74+
75+
} // namespace uv
76+
} // namespace transport
77+
} // namespace gloo

gloo/transport/uv/common.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright (c) 2019-present, Facebook, Inc.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#pragma once
10+
11+
#include <stdio.h>
12+
13+
#include <uv.h>
14+
15+
#define UV_CHECK(rv, prefix) \
16+
{ \
17+
if ((rv) != 0) { \
18+
fprintf( \
19+
stderr, \
20+
"[%s:%d] %s: %s\n", \
21+
__FILE__, \
22+
__LINE__, \
23+
prefix, \
24+
uv_strerror(rv)); \
25+
} \
26+
} \
27+
while (0) \
28+
;
29+
30+
#define FAIL(...) \
31+
{ \
32+
const auto __str = ::gloo::MakeString(__VA_ARGS__); \
33+
fprintf(stderr, "[%s:%d] %s\n", __FILE__, __LINE__, __str.c_str()); \
34+
abort(); \
35+
} \
36+
while (0) \
37+
;

gloo/transport/uv/context.cc

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/**
2+
* Copyright (c) 2019-present, Facebook, Inc.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#include <gloo/transport/uv/context.h>
10+
11+
#include <gloo/common/error.h>
12+
#include <gloo/common/logging.h>
13+
#include <gloo/transport/uv/device.h>
14+
#include <gloo/transport/uv/pair.h>
15+
#include <gloo/transport/uv/unbound_buffer.h>
16+
17+
namespace gloo {
18+
namespace transport {
19+
namespace uv {
20+
21+
Context::Context(std::shared_ptr<Device> device, int rank, int size)
22+
: ::gloo::transport::Context(rank, size), device_(std::move(device)) {}
23+
24+
Context::~Context() {
25+
// Pairs refer to device by raw pointer.
26+
// Ensure they are destructed before the device.
27+
pairs_.clear();
28+
device_.reset();
29+
}
30+
31+
std::unique_ptr<transport::Pair>& Context::createPair(int rank) {
32+
pairs_[rank] = std::unique_ptr<transport::Pair>(
33+
new uv::Pair(this, device_.get(), rank, getTimeout()));
34+
return pairs_[rank];
35+
}
36+
37+
std::unique_ptr<transport::UnboundBuffer> Context::createUnboundBuffer(
38+
void* ptr,
39+
size_t size) {
40+
auto buf = new uv::UnboundBuffer(shared_from_this(), ptr, size);
41+
return std::unique_ptr<transport::UnboundBuffer>(buf);
42+
}
43+
44+
void Context::recvFromAny(
45+
UnboundBuffer* buf,
46+
uint64_t slot,
47+
size_t offset,
48+
size_t nbytes,
49+
std::vector<int> srcRanks) {
50+
for (;;) {
51+
// Find rank of pair we can attempt a recv from
52+
auto rank = recvFromAnyFindRank(buf, slot, offset, nbytes, srcRanks);
53+
if (rank == -1) {
54+
return;
55+
}
56+
// Try recv from returned rank
57+
auto ptr = pairs_[rank].get();
58+
GLOO_ENFORCE(ptr != nullptr);
59+
auto pair = dynamic_cast<Pair*>(ptr);
60+
GLOO_ENFORCE(pair != nullptr);
61+
if (pair->tryRecv(buf, slot, offset, nbytes)) {
62+
return;
63+
}
64+
}
65+
}
66+
67+
int Context::recvFromAnyFindRank(
68+
UnboundBuffer* buf,
69+
uint64_t slot,
70+
size_t offset,
71+
size_t nbytes,
72+
const std::vector<int>& srcRanks) {
73+
std::unique_lock<std::mutex> lock(mutex_);
74+
75+
// See if there is a remote pending send that can fulfill this recv.
76+
auto it = findPendingOperations(slot);
77+
if (it != pendingOperations_.end()) {
78+
auto& pendingOperation = *it;
79+
80+
// Out of all remote pending sends, find the first one
81+
// that exists in the set of eligible ranks.
82+
for (const auto rank : pendingOperation.getSendList()) {
83+
for (const auto srcRank : srcRanks) {
84+
if (rank == srcRank) {
85+
// We've found a rank that could fulfill this recv.
86+
//
87+
// The caller of this function will try and attempt a recv,
88+
// which will remove this remote pending send operation,
89+
// if it's still around.
90+
//
91+
return rank;
92+
}
93+
}
94+
}
95+
}
96+
97+
// No candidates; register buffer for recv
98+
pendingRecv_[slot].emplace_back(
99+
buf->getWeakNonOwningPtr(),
100+
offset,
101+
nbytes,
102+
std::unordered_set<int>(srcRanks.begin(), srcRanks.end()));
103+
return -1;
104+
}
105+
106+
// Allowed to be called only by ContextMutator::findRecvFromAny,
107+
// where the context lock is already held.
108+
bool Context::findRecvFromAny(
109+
uint64_t slot,
110+
int rank,
111+
WeakNonOwningPtr<UnboundBuffer>* buf,
112+
size_t* offset,
113+
size_t* nbytes) {
114+
// See if there is a pending recv for this slot.
115+
auto pit = pendingRecv_.find(slot);
116+
if (pit != pendingRecv_.end()) {
117+
auto& recvs = pit->second;
118+
119+
// Iterate over available buffers to find a match.
120+
for (auto rit = recvs.begin(); rit != recvs.end(); rit++) {
121+
const auto& ranks = std::get<3>(*rit);
122+
123+
// If the rank of this peer is in the set of acceptable ranks for
124+
// this slot we can proceed and return the buffer to recv into.
125+
if (ranks.count(rank) > 0) {
126+
// Capture values to return.
127+
*buf = std::get<0>(*rit);
128+
*offset = std::get<1>(*rit);
129+
*nbytes = std::get<2>(*rit);
130+
// Cleanup.
131+
recvs.erase(rit);
132+
if (recvs.empty()) {
133+
pendingRecv_.erase(pit);
134+
}
135+
return true;
136+
}
137+
}
138+
}
139+
140+
return false;
141+
}
142+
143+
} // namespace uv
144+
} // namespace transport
145+
} // namespace gloo

0 commit comments

Comments
 (0)