Skip to content

Commit c4d1473

Browse files
committed
Fix creating producer or consumer is not retried for connection failure
Fixes #391 ### Motivation When `connectionFailed` is called, no matter if the result is retryable the creation of producer or consumer will fail without retry. ### Modifications Check if the result is retryable in `connectionFailed` for `ProducerImpl` and `ConsumerImpl` and only fail for non-retryable errors or the timeout error. Register another timer in `HandlerBase` to propagate the timeout error to `connectionFailed`. Add `testRetryUntilSucceed`, `testRetryTimeout`, `testNoRetry` to verify client could retry according to the result returned by `ClientImpl::getConnection`. On the other handle, check all `close()` calls in `ClientConnection` and pass the correct result. For example, a handshake error should be retryable so we must call `close(ResultRetryable)`.
1 parent c7e53ac commit c4d1473

File tree

12 files changed

+196
-32
lines changed

12 files changed

+196
-32
lines changed

lib/ClientConnection.cc

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ ClientConnection::~ClientConnection() {
278278
void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
279279
if (!cmdConnected.has_server_version()) {
280280
LOG_ERROR(cnxString_ << "Server version is not set");
281-
close();
281+
close(ResultUnknownError);
282282
return;
283283
}
284284

@@ -451,7 +451,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
451451
Url service_url;
452452
if (!Url::parse(physicalAddress_, service_url)) {
453453
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
454-
close();
454+
close(ResultInvalidUrl);
455455
return;
456456
}
457457
}
@@ -489,12 +489,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
489489
}
490490
});
491491
} else {
492-
if (err == ASIO::error::operation_aborted) {
493-
// TCP connect timeout, which is not retryable
494-
close();
495-
} else {
496-
close(ResultRetryable);
497-
}
492+
close(ResultRetryable);
498493
}
499494
} else {
500495
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
@@ -504,8 +499,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
504499

505500
void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
506501
if (err) {
507-
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
508-
close();
502+
LOG_WARN(cnxString_ << "Handshake failed: " << err.message());
503+
close(ResultRetryable);
509504
return;
510505
}
511506

@@ -532,7 +527,7 @@ void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const Shar
532527
}
533528
if (err) {
534529
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
535-
close();
530+
close(ResultRetryable);
536531
return;
537532
}
538533

@@ -546,7 +541,7 @@ void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const Share
546541
}
547542
if (err) {
548543
LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
549-
close();
544+
close(ResultRetryable);
550545
return;
551546
}
552547
}
@@ -567,14 +562,14 @@ void ClientConnection::tcpConnectAsync() {
567562
std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
568563
if (!Url::parse(hostUrl, service_url)) {
569564
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
570-
close();
565+
close(ResultInvalidUrl);
571566
return;
572567
}
573568

574569
if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") {
575570
LOG_ERROR(cnxString_ << "Invalid Url protocol '" << service_url.protocol()
576571
<< "'. Valid values are 'pulsar' and 'pulsar+ssl'");
577-
close();
572+
close(ResultInvalidUrl);
578573
return;
579574
}
580575

@@ -593,7 +588,7 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::itera
593588
if (err) {
594589
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
595590
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
596-
close();
591+
close(ResultConnectError);
597592
return;
598593
}
599594

@@ -629,8 +624,8 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::itera
629624
}
630625
});
631626
} else {
632-
LOG_WARN(cnxString_ << "No IP address found");
633-
close();
627+
LOG_ERROR(cnxString_ << "No IP address found");
628+
close(ResultConnectError);
634629
return;
635630
}
636631
}
@@ -885,7 +880,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
885880
// Handle Pulsar Connected
886881
if (incomingCmd.type() != BaseCommand::CONNECTED) {
887882
// Wrong cmd
888-
close();
883+
close(ResultRetryable);
889884
} else {
890885
handlePulsarConnected(incomingCmd.connected());
891886
}

lib/ClientConnection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
157157
*
158158
* `detach` should only be false when the connection pool is closed.
159159
*/
160-
void close(Result result = ResultConnectError, bool detach = true);
160+
void close(Result result, bool detach = true);
161161

162162
bool isClosed() const;
163163

lib/ClientImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517517
}
518518
}
519519

520-
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
520+
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
521521
Promise<Result, ClientConnectionPtr> promise;
522522

523523
const auto topicNamePtr = TopicName::get(topic);
@@ -562,7 +562,7 @@ const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddr
562562
}
563563
}
564564

565-
Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& logicalAddress, size_t key) {
565+
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
566566
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
567567
Promise<Result, ClientConnectionPtr> promise;
568568
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)

lib/ClientImpl.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,14 @@ class TopicName;
6363
using TopicNamePtr = std::shared_ptr<TopicName>;
6464

6565
using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
66+
using GetConnectionFuture = Future<Result, ClientConnectionPtr>;
6667

6768
std::string generateRandomName();
6869

6970
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7071
public:
7172
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
72-
~ClientImpl();
73+
virtual ~ClientImpl();
7374

7475
/**
7576
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
@@ -95,9 +96,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9596

9697
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9798

98-
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
99+
// Use virtual method to test
100+
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
99101

100-
Future<Result, ClientConnectionPtr> connect(const std::string& logicalAddress, size_t key);
102+
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
101103

102104
void closeAsync(CloseCallback callback);
103105
void shutdown();

lib/ConsumerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ void ConsumerImpl::connectionFailed(Result result) {
277277
// Keep a reference to ensure object is kept alive
278278
auto ptr = get_shared_this_ptr();
279279

280-
if (consumerCreatedPromise_.setFailed(result)) {
280+
if (!isResultRetryable(result) && consumerCreatedPromise_.setFailed(result)) {
281281
state_ = Failed;
282282
}
283283
}

lib/Future.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ class Future {
116116

117117
Result get(Type &result) { return state_->get(result); }
118118

119+
static Future<Result, Type> failed(Result result);
120+
119121
private:
120122
InternalStatePtr<Result, Type> state_;
121123

@@ -144,6 +146,13 @@ class Promise {
144146
InternalStatePtr<Result, Type> state_;
145147
};
146148

149+
template <typename Result, typename Type>
150+
inline Future<Result, Type> Future<Result, Type>::failed(Result result) {
151+
Promise<Result, Type> promise;
152+
promise.setFailed(result);
153+
return promise.getFuture();
154+
}
155+
147156
} // namespace pulsar
148157

149158
#endif

lib/HandlerBase.cc

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,31 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
4242
backoff_(backoff),
4343
epoch_(0),
4444
timer_(executor_->createDeadlineTimer()),
45+
creationTimer_(executor_->createDeadlineTimer()),
4546
reconnectionPending_(false) {}
4647

47-
HandlerBase::~HandlerBase() { timer_->cancel(); }
48+
HandlerBase::~HandlerBase() {
49+
ASIO_ERROR ignored;
50+
timer_->cancel(ignored);
51+
creationTimer_->cancel(ignored);
52+
}
4853

4954
void HandlerBase::start() {
5055
// guard against concurrent state changes such as closing
5156
State state = NotStarted;
5257
if (state_.compare_exchange_strong(state, Pending)) {
5358
grabCnx();
5459
}
60+
creationTimer_->expires_from_now(operationTimeut_);
61+
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
62+
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
63+
auto self = weakSelf.lock();
64+
if (self && !error) {
65+
connectionFailed(ResultTimeout);
66+
ASIO_ERROR ignored;
67+
timer_->cancel(ignored);
68+
}
69+
});
5570
}
5671

5772
ClientConnectionWeakPtr HandlerBase::getCnx() const {
@@ -96,7 +111,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
96111
ClientImplPtr client = client_.lock();
97112
if (!client) {
98113
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
99-
connectionFailed(ResultConnectError);
114+
connectionFailed(ResultAlreadyClosed);
100115
reconnectionPending_ = false;
101116
return;
102117
}

lib/HandlerBase.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
140140

141141
private:
142142
DeadlineTimerPtr timer_;
143+
DeadlineTimerPtr creationTimer_;
143144

144145
mutable std::mutex connectionMutex_;
145146
std::atomic<bool> reconnectionPending_;

lib/ProducerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ void ProducerImpl::connectionFailed(Result result) {
176176
// if producers are lazy, then they should always try to restart
177177
// so don't change the state and allow reconnections
178178
return;
179-
} else if (producerCreatedPromise_.setFailed(result)) {
179+
} else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) {
180180
state_ = Failed;
181181
}
182182
}

tests/BasicEndToEndTest.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,7 +1245,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
12451245
std::this_thread::sleep_for(std::chrono::seconds(1));
12461246
} while (!clientConnectionPtr);
12471247
oldConnections.push_back(clientConnectionPtr);
1248-
clientConnectionPtr->close();
1248+
clientConnectionPtr->close(ResultConnectError);
12491249
}
12501250
LOG_INFO("checking message " << i);
12511251
ASSERT_EQ(producer.send(msg), ResultOk);
@@ -1264,7 +1264,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
12641264
ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(cImpl);
12651265
ClientConnectionPtr clientConnectionPtr = clientConnectionWeakPtr.lock();
12661266
oldConnections.push_back(clientConnectionPtr);
1267-
clientConnectionPtr->close();
1267+
clientConnectionPtr->close(ResultConnectError);
12681268

12691269
while (consumer.receive(msg, 30000) == ResultOk) {
12701270
consumer.acknowledge(msg);
@@ -1316,7 +1316,7 @@ void testHandlerReconnectionPartitionProducers(bool lazyStartPartitionedProducer
13161316
std::this_thread::sleep_for(std::chrono::seconds(1));
13171317
} while (!clientConnectionPtr);
13181318
oldConnections.push_back(clientConnectionPtr);
1319-
clientConnectionPtr->close();
1319+
clientConnectionPtr->close(ResultConnectError);
13201320
}
13211321
ASSERT_EQ(producer.send(msg), ResultOk);
13221322
}

0 commit comments

Comments
 (0)