Skip to content

Commit 31b1f02

Browse files
xunnanxufacebook-github-bot
authored andcommitted
Use a single listening socket per device (#361)
Summary: Pull Request resolved: #361 For listening port, the mesh connection currently leads to O(mn) port usage where m is the num of ranks per host and n is the total num of ranks. Even when `SO_REUSEADDR` is set, this only allows those used by sockets in `ESTABLISHED` or `TIME-WAIT` state to be reused. Hence in large training jobs, or even testing env where a lot of processes are packed on the same machine, we would soon run out of ephemeral ports (e.g. a local 200-process would need 40k ephemeral ports just for listening which is obviously very inefficient and most likely outside the range of allowed ephemeral ports in linux systems, which is typically around 32K). We fix this by using a single listening socket per device instance instead of using one per pair. Connections to all pair instances are multiplexed on a single listening socket by adding a sequence number to the address struct. For ranks packed on the same host with the same interface address, we use a seq number to differentiate between those so each would have a unique `Address` object assoc. During actual connection, each pair would have one side as `Initiator` and the other as `Listener`. We assign the roles purely based on arbitrary address comparison logic. The exact result doesn't matter since TCP is bidirectional, so long as they are consistent for a pair. The initiator will connect to the listed address and write a few bytes containing the sequence number. The listener waits for a connection to the shared listening socket where it can read that same sequence number. Once the listener side establishes the connection, that `Pair` would get promoted via the deferred callback to handle the actual connection post rendezvous. Credit to original author: Pieter Noordhuis pietern This diff cleans up a few things and resolves conflicts. Reviewed By: bmaurer Differential Revision: D45437709 fbshipit-source-id: 72446f7765b701bf5553ed3a73d46facde0a537f
1 parent d96897b commit 31b1f02

File tree

9 files changed

+596
-254
lines changed

9 files changed

+596
-254
lines changed

gloo/transport/tcp/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ else()
77
"${CMAKE_CURRENT_SOURCE_DIR}/context.cc"
88
"${CMAKE_CURRENT_SOURCE_DIR}/device.cc"
99
"${CMAKE_CURRENT_SOURCE_DIR}/error.cc"
10+
"${CMAKE_CURRENT_SOURCE_DIR}/listener.cc"
1011
"${CMAKE_CURRENT_SOURCE_DIR}/loop.cc"
1112
"${CMAKE_CURRENT_SOURCE_DIR}/pair.cc"
1213
"${CMAKE_CURRENT_SOURCE_DIR}/socket.cc"
@@ -19,6 +20,8 @@ else()
1920
"${CMAKE_CURRENT_SOURCE_DIR}/context.h"
2021
"${CMAKE_CURRENT_SOURCE_DIR}/device.h"
2122
"${CMAKE_CURRENT_SOURCE_DIR}/error.h"
23+
"${CMAKE_CURRENT_SOURCE_DIR}/helpers.h"
24+
"${CMAKE_CURRENT_SOURCE_DIR}/listener.h"
2225
"${CMAKE_CURRENT_SOURCE_DIR}/loop.h"
2326
"${CMAKE_CURRENT_SOURCE_DIR}/pair.h"
2427
"${CMAKE_CURRENT_SOURCE_DIR}/socket.h"

gloo/transport/tcp/device.cc

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "gloo/common/logging.h"
2020
#include "gloo/common/error.h"
2121
#include "gloo/transport/tcp/context.h"
22+
#include "gloo/transport/tcp/helpers.h"
2223
#include "gloo/transport/tcp/pair.h"
2324

2425
namespace gloo {
@@ -217,6 +218,7 @@ const std::string sockaddrToInterfaceName(const struct attr& attr) {
217218
Device::Device(const struct attr& attr)
218219
: attr_(attr),
219220
loop_(std::make_shared<Loop>()),
221+
listener_(std::make_shared<Listener>(loop_, attr)),
220222
interfaceName_(sockaddrToInterfaceName(attr_)),
221223
interfaceSpeedMbps_(getInterfaceSpeedByName(interfaceName_)),
222224
pciBusID_(interfaceToBusID(interfaceName_)) {
@@ -257,6 +259,105 @@ void Device::unregisterDescriptor(int fd, Handler* h) {
257259
loop_->unregisterDescriptor(fd, h);
258260
}
259261

262+
Address Device::nextAddress() {
263+
return listener_->nextAddress();
264+
}
265+
266+
bool Device::isInitiator(
267+
const Address& local,
268+
const Address& remote) const {
269+
int rv = 0;
270+
// The remote side of a pair will be called with the same
271+
// addresses, but in reverse. There should only be a single
272+
// connection between the two, so we pick one side as the listener
273+
// and the other side as the connector.
274+
const auto& ss1 = local.getSockaddr();
275+
const auto& ss2 = remote.getSockaddr();
276+
GLOO_ENFORCE_EQ(ss1.ss_family, ss2.ss_family);
277+
const int family = ss1.ss_family;
278+
if (family == AF_INET) {
279+
const struct sockaddr_in* sa = (struct sockaddr_in*)&ss1;
280+
const struct sockaddr_in* sb = (struct sockaddr_in*)&ss2;
281+
rv = memcmp(&sa->sin_addr, &sb->sin_addr, sizeof(struct in_addr));
282+
if (rv == 0) {
283+
rv = sa->sin_port - sb->sin_port;
284+
}
285+
} else if (family == AF_INET6) {
286+
const struct sockaddr_in6* sa = (struct sockaddr_in6*)&ss1;
287+
const struct sockaddr_in6* sb = (struct sockaddr_in6*)&ss2;
288+
rv = memcmp(&sa->sin6_addr, &sb->sin6_addr, sizeof(struct in6_addr));
289+
if (rv == 0) {
290+
rv = sa->sin6_port - sb->sin6_port;
291+
}
292+
} else {
293+
GLOO_ENFORCE(false, "Unknown address family: ", family);
294+
}
295+
296+
// If both sides of the pair use the same address and port, they are
297+
// sharing the same device instance. This happens in tests. Compare
298+
// sequence number to allow pairs to connect.
299+
if (rv == 0) {
300+
rv = local.getSeq() - remote.getSeq();
301+
}
302+
GLOO_ENFORCE_NE(rv, 0, "Cannot connect to self");
303+
return rv > 0;
304+
}
305+
306+
void Device::connect(
307+
const Address& local,
308+
const Address& remote,
309+
std::chrono::milliseconds timeout,
310+
connect_callback_t fn) {
311+
auto initiator = isInitiator(local, remote);
312+
313+
if (initiator) {
314+
connectAsInitiator(remote, timeout, std::move(fn));
315+
return;
316+
}
317+
connectAsListener(local, timeout, std::move(fn));
318+
}
319+
320+
// Connecting as listener is passive.
321+
//
322+
// Register the connect callback to be executed when the other side of
323+
// the pair has connected and identified itself as destined for this
324+
// address. To do so, we register the callback for the sequence number
325+
// associated with the address. If this connection already exists,
326+
// deal with it here.
327+
//
328+
void Device::connectAsListener(
329+
const Address& local,
330+
std::chrono::milliseconds /* unused */,
331+
connect_callback_t fn) {
332+
// TODO(pietern): Use timeout.
333+
listener_->waitForConnection(local.getSeq(), std::move(fn));
334+
}
335+
336+
// Connecting as initiator is active.
337+
//
338+
// The connect callback is fired when the connection to the other side
339+
// of the pair has been made, and the sequence number for this
340+
// connection has been written. If an error occurs at any time, the
341+
// callback is called with an associated error event.
342+
//
343+
void Device::connectAsInitiator(
344+
const Address& remote,
345+
std::chrono::milliseconds /* unused */,
346+
connect_callback_t fn) {
347+
const auto& sockaddr = remote.getSockaddr();
348+
349+
// Create new socket to connect to peer.
350+
auto socket = Socket::createForFamily(sockaddr.ss_family);
351+
socket->reuseAddr(true);
352+
socket->noDelay(true);
353+
socket->connect(sockaddr);
354+
355+
// Write sequence number for peer to new socket.
356+
// TODO(pietern): Use timeout.
357+
write<sequence_number_t>(
358+
loop_, std::move(socket), remote.getSeq(), std::move(fn));
359+
}
360+
260361
} // namespace tcp
261362
} // namespace transport
262363
} // namespace gloo

gloo/transport/tcp/device.h

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
#include <gloo/transport/device.h>
1919
#include <gloo/transport/tcp/attr.h>
20+
#include <gloo/transport/tcp/error.h>
21+
#include <gloo/transport/tcp/listener.h>
2022
#include <gloo/transport/tcp/loop.h>
23+
#include <gloo/transport/tcp/socket.h>
2124

2225
namespace gloo {
2326
namespace transport {
@@ -50,14 +53,61 @@ class Device : public ::gloo::transport::Device,
5053
void registerDescriptor(int fd, int events, Handler* h);
5154
void unregisterDescriptor(int fd, Handler* h);
5255

56+
// TCP is bidirectional so when we connect two ends of a pair,
57+
// one side is the connection initiator and the other is the listener.
58+
bool isInitiator(
59+
const Address& local,
60+
const Address& remote) const;
61+
5362
protected:
5463
const struct attr attr_;
5564

65+
// Return a new `Address` instance.
66+
//
67+
// This is called by the constructor of the `Pair` class. It gives
68+
// the pair a uniquely identifying address even though the device
69+
// uses a shared listening socket.
70+
//
71+
Address nextAddress();
72+
73+
// Connect a pair to a remote.
74+
//
75+
// This is performed by the device instance because we use a single
76+
// listening socket for all inbound pair connections.
77+
//
78+
// Matching these connections with pairs is done with a handshake.
79+
// The remote side of the connection writes a sequence number (see
80+
// `Address::sequence_t`) to the stream that identifies the pair
81+
// it wants to connect to. On the local side, this sequence number
82+
// is read and used as key in a map with callbacks. If the callback
83+
// is found, it is called. If the callback is not found, the
84+
// connection is cached in a map, using the sequence number.
85+
//
86+
using connect_callback_t =
87+
std::function<void(std::shared_ptr<Socket> socket, Error error)>;
88+
89+
void connect(
90+
const Address& local,
91+
const Address& remote,
92+
std::chrono::milliseconds timeout,
93+
connect_callback_t fn);
94+
95+
void connectAsListener(
96+
const Address& local,
97+
std::chrono::milliseconds timeout,
98+
connect_callback_t fn);
99+
100+
void connectAsInitiator(
101+
const Address& remote,
102+
std::chrono::milliseconds timeout,
103+
connect_callback_t fn);
104+
56105
friend class Pair;
57106
friend class Buffer;
58107

59108
private:
60109
std::shared_ptr<Loop> loop_;
110+
std::shared_ptr<Listener> listener_;
61111

62112
std::string interfaceName_;
63113
int interfaceSpeedMbps_;

gloo/transport/tcp/helpers.h

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/**
2+
* Copyright (c) 2017-present, Facebook, Inc.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#pragma once
10+
11+
#include <functional>
12+
#include <memory>
13+
14+
#include <gloo/transport/tcp/error.h>
15+
#include <gloo/transport/tcp/loop.h>
16+
#include <gloo/transport/tcp/socket.h>
17+
18+
namespace gloo {
19+
namespace transport {
20+
namespace tcp {
21+
22+
// ReadValueOperation asynchronously reads a value of type T from the
23+
// socket specified at construction. Upon completion or error, the
24+
// callback is called. Its lifetime is coupled with completion of the
25+
// operation, so the called doesn't need to hold on to the instance.
26+
// It does so by storing a shared_ptr to itself (effectively a leak)
27+
// until the event loop calls back.
28+
template <typename T>
29+
class ReadValueOperation final
30+
: public Handler,
31+
public std::enable_shared_from_this<ReadValueOperation<T>> {
32+
public:
33+
using callback_t =
34+
std::function<void(std::shared_ptr<Socket>, const Error& error, T&& t)>;
35+
36+
ReadValueOperation(
37+
std::shared_ptr<Loop> loop,
38+
std::shared_ptr<Socket> socket,
39+
callback_t fn)
40+
: loop_(std::move(loop)),
41+
socket_(std::move(socket)),
42+
fn_(std::move(fn)) {}
43+
44+
void run() {
45+
// Cannot initialize leak until after the object has been
46+
// constructed, because the std::make_shared initialization
47+
// doesn't run after construction of the underlying object.
48+
leak_ = this->shared_from_this();
49+
// Register with loop only after we've leaked the shared_ptr,
50+
// because we unleak it when the event loop thread calls.
51+
loop_->registerDescriptor(socket_->fd(), EPOLLIN | EPOLLONESHOT, this);
52+
}
53+
54+
void handleEvents(int events) override {
55+
// Move leaked shared_ptr to the stack so that this object
56+
// destroys itself once this function returns.
57+
auto self = std::move(this->leak_);
58+
59+
// Read T.
60+
auto rv = socket_->read(&t_, sizeof(t_));
61+
if (rv == -1) {
62+
fn_(socket_, SystemError("read", errno), std::move(t_));
63+
return;
64+
}
65+
66+
// Check for short read (assume we can read in a single call).
67+
if (rv < sizeof(t_)) {
68+
fn_(socket_, ShortReadError(rv, sizeof(t_)), std::move(t_));
69+
return;
70+
}
71+
72+
fn_(socket_, Error::kSuccess, std::move(t_));
73+
}
74+
75+
private:
76+
std::shared_ptr<Loop> loop_;
77+
std::shared_ptr<Socket> socket_;
78+
callback_t fn_;
79+
std::shared_ptr<ReadValueOperation<T>> leak_;
80+
81+
T t_;
82+
};
83+
84+
template <typename T>
85+
void read(
86+
std::shared_ptr<Loop> loop,
87+
std::shared_ptr<Socket> socket,
88+
typename ReadValueOperation<T>::callback_t fn) {
89+
auto x = std::make_shared<ReadValueOperation<T>>(
90+
std::move(loop), std::move(socket), std::move(fn));
91+
x->run();
92+
}
93+
94+
// WriteValueOperation asynchronously writes a value of type T to the
95+
// socket specified at construction. Upon completion or error, the
96+
// callback is called. Its lifetime is coupled with completion of the
97+
// operation, so the called doesn't need to hold on to the instance.
98+
// It does so by storing a shared_ptr to itself (effectively a leak)
99+
// until the event loop calls back.
100+
template <typename T>
101+
class WriteValueOperation final
102+
: public Handler,
103+
public std::enable_shared_from_this<WriteValueOperation<T>> {
104+
public:
105+
using callback_t =
106+
std::function<void(std::shared_ptr<Socket>, const Error& error)>;
107+
108+
WriteValueOperation(
109+
std::shared_ptr<Loop> loop,
110+
std::shared_ptr<Socket> socket,
111+
T t,
112+
callback_t fn)
113+
: loop_(std::move(loop)),
114+
socket_(std::move(socket)),
115+
fn_(std::move(fn)),
116+
t_(std::move(t)) {}
117+
118+
void run() {
119+
// Cannot initialize leak until after the object has been
120+
// constructed, because the std::make_shared initialization
121+
// doesn't run after construction of the underlying object.
122+
leak_ = this->shared_from_this();
123+
// Register with loop only after we've leaked the shared_ptr,
124+
// because we unleak it when the event loop thread calls.
125+
loop_->registerDescriptor(socket_->fd(), EPOLLOUT | EPOLLONESHOT, this);
126+
}
127+
128+
void handleEvents(int events) override {
129+
// Move leaked shared_ptr to the stack so that this object
130+
// destroys itself once this function returns.
131+
auto leak = std::move(this->leak_);
132+
133+
// Write T.
134+
auto rv = socket_->write(&t_, sizeof(t_));
135+
if (rv == -1) {
136+
fn_(socket_, SystemError("write", errno));
137+
return;
138+
}
139+
140+
// Check for short write (assume we can write in a single call).
141+
if (rv < sizeof(t_)) {
142+
fn_(socket_, ShortWriteError(rv, sizeof(t_)));
143+
return;
144+
}
145+
146+
fn_(socket_, Error::kSuccess);
147+
}
148+
149+
private:
150+
std::shared_ptr<Loop> loop_;
151+
std::shared_ptr<Socket> socket_;
152+
callback_t fn_;
153+
std::shared_ptr<WriteValueOperation<T>> leak_;
154+
155+
T t_;
156+
};
157+
158+
template <typename T>
159+
void write(
160+
std::shared_ptr<Loop> loop,
161+
std::shared_ptr<Socket> socket,
162+
T t,
163+
typename WriteValueOperation<T>::callback_t fn) {
164+
auto x = std::make_shared<WriteValueOperation<T>>(
165+
std::move(loop), std::move(socket), std::move(t), std::move(fn));
166+
x->run();
167+
}
168+
169+
} // namespace tcp
170+
} // namespace transport
171+
} // namespace gloo

0 commit comments

Comments
 (0)