|
17 | 17 | #include "gloo/common/error.h" |
18 | 18 | #include "gloo/common/linux.h" |
19 | 19 | #include "gloo/common/logging.h" |
| 20 | +#include "gloo/common/utils.h" |
20 | 21 | #include "gloo/transport/tcp/context.h" |
21 | 22 | #include "gloo/transport/tcp/helpers.h" |
22 | 23 | #include "gloo/transport/tcp/pair.h" |
@@ -334,20 +335,39 @@ void Device::connectAsListener( |
334 | 335 | // |
335 | 336 | void Device::connectAsInitiator( |
336 | 337 | const Address& remote, |
337 | | - std::chrono::milliseconds /* unused */, |
| 338 | + std::chrono::milliseconds timeout, |
338 | 339 | connect_callback_t fn) { |
339 | | - const auto& sockaddr = remote.getSockaddr(); |
340 | | - |
341 | | - // Create new socket to connect to peer. |
342 | | - auto socket = Socket::createForFamily(sockaddr.ss_family); |
343 | | - socket->reuseAddr(true); |
344 | | - socket->noDelay(true); |
345 | | - socket->connect(sockaddr); |
346 | | - |
347 | | - // Write sequence number for peer to new socket. |
348 | | - // TODO(pietern): Use timeout. |
349 | | - write<sequence_number_t>( |
350 | | - loop_, std::move(socket), remote.getSeq(), std::move(fn)); |
| 340 | + auto writeSeq = [loop = loop_, seq = remote.getSeq()]( |
| 341 | + std::shared_ptr<Socket> socket, connect_callback_t fn) { |
| 342 | + // Write sequence number for peer to new socket. |
| 343 | + write<sequence_number_t>(loop, std::move(socket), seq, std::move(fn)); |
| 344 | + }; |
| 345 | + |
| 346 | + if (disableConnectionRetries()) { |
| 347 | + const auto& sockaddr = remote.getSockaddr(); |
| 348 | + |
| 349 | + // Create new socket to connect to peer. |
| 350 | + auto socket = Socket::createForFamily(sockaddr.ss_family); |
| 351 | + socket->reuseAddr(true); |
| 352 | + socket->noDelay(true); |
| 353 | + socket->connect(sockaddr); |
| 354 | + |
| 355 | + writeSeq(std::move(socket), std::move(fn)); |
| 356 | + } else { |
| 357 | + connectLoop( |
| 358 | + loop_, |
| 359 | + remote, |
| 360 | + timeout, |
| 361 | + [loop = loop_, fn = std::move(fn), writeSeq = std::move(writeSeq)]( |
| 362 | + std::shared_ptr<Socket> socket, const Error& error) { |
| 363 | + if (error) { |
| 364 | + fn(socket, error); |
| 365 | + return; |
| 366 | + } |
| 367 | + |
| 368 | + writeSeq(std::move(socket), std::move(fn)); |
| 369 | + }); |
| 370 | + } |
351 | 371 | } |
352 | 372 |
|
353 | 373 | } // namespace tcp |
|
0 commit comments