Skip to content

Commit 847924c

Browse files
xunnanxufacebook-github-bot
authored andcommitted
Allow deferring functions to the epoll(2) thread
Summary: In the current gloo design, `Loop` uses `epoll` via file descriptors to handle events. For listening port, the mesh connection currently leads to O(mn) port usage where m is the num of ranks per host and n is the total num of ranks. In later diffs of the stack we'll introduce a listener role to remove this quadratic port usage problem as TCP is a 4-tuple so we can reuse the same local port as long as the remote address is different. To do that we'll introduce a listener role in the mesh connection. The listener will accept all incoming connections, and then allow the caller (pair) to handle the post rendezvous socket events. We would still use `Pair` to abstract away the mesh conn details but in order for the listening side of a pair to register a callback after connection establishment, we introduce the `defer()` function such that `Pair` can register the callback to be fired in next epoll tick. Credit to original author Pieter Noordhuis (pietern) This diff basically just resolves the conflicts. Differential Revision: D45434482 fbshipit-source-id: c2bb99653820b0e5ad7403ec9bbdd1d616e2d50f
1 parent c774e9e commit 847924c

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

gloo/transport/tcp/loop.cc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <gloo/transport/tcp/loop.h>
1010

11+
#include <fcntl.h>
1112
#include <string.h>
1213
#include <unistd.h>
1314

@@ -40,10 +41,69 @@ namespace gloo {
4041
namespace transport {
4142
namespace tcp {
4243

44+
Deferrables::Deferrables() {
45+
std::array<int, 2> fds;
46+
auto rv = pipe2(fds.data(), O_NONBLOCK);
47+
GLOO_ENFORCE_NE(rv, -1, "pipe: ", strerror(errno));
48+
rfd_ = fds[0];
49+
wfd_ = fds[1];
50+
}
51+
52+
Deferrables::~Deferrables() {
53+
close(rfd_);
54+
close(wfd_);
55+
}
56+
57+
void Deferrables::defer(function_t fn) {
58+
std::lock_guard<std::mutex> guard(mutex_);
59+
functions_.push_back(std::move(fn));
60+
61+
// Write byte to pipe to make epoll(2) wake up.
62+
if (!triggered_) {
63+
for (;;) {
64+
char byte = 0;
65+
auto rv = write(wfd_, &byte, sizeof(byte));
66+
if (rv == -1 && errno == EINTR) {
67+
continue;
68+
}
69+
GLOO_ENFORCE_NE(rv, -1, "write: ", strerror(errno));
70+
break;
71+
}
72+
triggered_ = true;
73+
}
74+
}
75+
76+
void Deferrables::handleEvents(int /* unused */) {
77+
decltype(functions_) localFunctions;
78+
79+
{
80+
std::lock_guard<std::mutex> guard(mutex_);
81+
std::swap(localFunctions, functions_);
82+
83+
// Read byte from pipe to drain it.
84+
for (;;) {
85+
char byte = 0;
86+
auto rv = read(rfd_, &byte, sizeof(byte));
87+
if (rv == -1 && errno == EINTR) {
88+
continue;
89+
}
90+
GLOO_ENFORCE_NE(rv, -1, "read: ", strerror(errno));
91+
break;
92+
}
93+
triggered_ = false;
94+
}
95+
96+
// Execute deferred functions.
97+
for (auto fn : localFunctions) {
98+
fn();
99+
}
100+
}
101+
43102
Loop::Loop() {
44103
fd_ = epoll_create(1);
45104
GLOO_ENFORCE_NE(fd_, -1, "epoll_create: ", strerror(errno));
46105
loop_.reset(new std::thread(&Loop::run, this));
106+
registerDescriptor(deferrables_.rfd_, EPOLLIN, &deferrables_);
47107
}
48108

49109
Loop::~Loop() {
@@ -81,6 +141,10 @@ void Loop::unregisterDescriptor(int fd, Handler* h) {
81141
}
82142
}
83143

144+
void Loop::defer(std::function<void()> fn) {
145+
deferrables_.defer(std::move(fn));
146+
}
147+
84148
void Loop::run() {
85149
std::array<struct epoll_event, capacity_> events;
86150
int nfds;

gloo/transport/tcp/loop.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
#include <atomic>
1212
#include <condition_variable>
13+
#include <functional>
14+
#include <list>
1315
#include <memory>
1416
#include <mutex>
1517
#include <thread>
@@ -32,6 +34,33 @@ class Handler {
3234
virtual void handleEvents(int events) = 0;
3335
};
3436

37+
class Loop;
38+
39+
// Functions can be deferred to the epoll(2) thread through the this
40+
// class. It uses readability of a pipe to wake up the event loop.
41+
class Deferrables final : public Handler {
42+
public:
43+
using function_t = std::function<void()>;
44+
45+
Deferrables();
46+
47+
~Deferrables() override;
48+
49+
void defer(function_t fn);
50+
51+
void handleEvents(int events) override;
52+
53+
private:
54+
int rfd_;
55+
int wfd_;
56+
57+
std::mutex mutex_;
58+
std::list<function_t> functions_;
59+
bool triggered_{false};
60+
61+
friend class Loop;
62+
};
63+
3564
class Loop final : public std::enable_shared_from_this<Loop> {
3665
public:
3766
explicit Loop();
@@ -42,13 +71,16 @@ class Loop final : public std::enable_shared_from_this<Loop> {
4271

4372
void unregisterDescriptor(int fd, Handler *h);
4473

74+
void defer(std::function<void()> fn);
75+
4576
void run();
4677

4778
private:
4879
static constexpr auto capacity_ = 64;
4980

5081
int fd_{-1};
5182
std::atomic<bool> done_{false};
83+
Deferrables deferrables_;
5284
std::unique_ptr<std::thread> loop_;
5385

5486
std::mutex m_;

0 commit comments

Comments
 (0)