Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static Result getResult(ServerError serverError, const std::string& message) {
case ServiceNotReady:
return (message.find("the broker do not have test listener") == std::string::npos)
? ResultRetryable
: ResultServiceUnitNotReady;
: ResultConnectError;

case ProducerBlockedQuotaExceededError:
return ResultProducerBlockedQuotaExceededError;
Expand Down Expand Up @@ -504,8 +504,13 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::

void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
if (err) {
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
close();
if (err.value() == ASIO::ssl::error::stream_truncated) {
LOG_WARN(cnxString_ << "Handshake failed: " << err.message());
close(ResultRetryable);
} else {
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
close();
}
return;
}

Expand Down
4 changes: 3 additions & 1 deletion lib/ClientConnectionAdaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ inline void checkServerError(Connection& connection, ServerError error, const st
// "Namespace bundle ... is being unloaded"
// "KeeperException$..."
// "Failed to acquire ownership for namespace bundle ..."
// "the broker do not have test listener"
// Before https://github.com/apache/pulsar/pull/21211, the error of the 1st and 2nd messages
// is ServiceNotReady. Before https://github.com/apache/pulsar/pull/21993, the error of the 3rd
// message is ServiceNotReady.
if (message.find("Failed to acquire ownership") == std::string::npos &&
message.find("KeeperException") == std::string::npos &&
message.find("is being unloaded") == std::string::npos) {
message.find("is being unloaded") == std::string::npos &&
message.find("the broker do not have test listener") == std::string::npos) {
connection.close(ResultDisconnected);
}
break;
Expand Down
4 changes: 2 additions & 2 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
}
}

Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
Promise<Result, ClientConnectionPtr> promise;

const auto topicNamePtr = TopicName::get(topic);
Expand Down Expand Up @@ -562,7 +562,7 @@ const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddr
}
}

Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& logicalAddress, size_t key) {
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
Promise<Result, ClientConnectionPtr> promise;
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
Expand Down
8 changes: 5 additions & 3 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ class TopicName;
using TopicNamePtr = std::shared_ptr<TopicName>;

using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
using GetConnectionFuture = Future<Result, ClientConnectionPtr>;

std::string generateRandomName();

class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
public:
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
~ClientImpl();
virtual ~ClientImpl();

/**
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
Expand All @@ -95,9 +96,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {

void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
// Declared virtual so that tests can override it with a mock implementation
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);

Future<Result, ClientConnectionPtr> connect(const std::string& logicalAddress, size_t key);
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);

void closeAsync(CloseCallback callback);
void shutdown();
Expand Down
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ void ConsumerImpl::connectionFailed(Result result) {
// Keep a reference to ensure object is kept alive
auto ptr = get_shared_this_ptr();

if (consumerCreatedPromise_.setFailed(result)) {
if (!isResultRetryable(result) && consumerCreatedPromise_.setFailed(result)) {
state_ = Failed;
}
}
Expand Down
9 changes: 9 additions & 0 deletions lib/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class Future {

Result get(Type &result) { return state_->get(result); }

static Future<Result, Type> failed(Result result);

private:
InternalStatePtr<Result, Type> state_;

Expand Down Expand Up @@ -144,6 +146,13 @@ class Promise {
InternalStatePtr<Result, Type> state_;
};

/**
 * Build a future that carries a failure.
 *
 * A temporary Promise is created, marked as failed with `result`, and the
 * future associated with that promise is returned to the caller.
 *
 * @param result the failure result to store in the promise
 * @return the future obtained from the failed promise
 */
template <typename Result, typename Type>
inline Future<Result, Type> Future<Result, Type>::failed(Result result) {
    Promise<Result, Type> failedPromise;
    failedPromise.setFailed(result);
    return failedPromise.getFuture();
}

} // namespace pulsar

#endif
21 changes: 18 additions & 3 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,31 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()),
creationTimer_(executor_->createDeadlineTimer()),
reconnectionPending_(false) {}

/**
 * Cancel both timers owned by the handler.
 *
 * The error_code overload of cancel() is used on purpose: any cancellation
 * failure is written to `ignored` instead of being raised as an exception,
 * which must never escape a destructor.
 */
HandlerBase::~HandlerBase() {
    ASIO_ERROR ignored;
    timer_->cancel(ignored);
    creationTimer_->cancel(ignored);
}

void HandlerBase::start() {
// guard against concurrent state changes such as closing
State state = NotStarted;
if (state_.compare_exchange_strong(state, Pending)) {
grabCnx();
}
creationTimer_->expires_from_now(operationTimeut_);
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
auto self = weakSelf.lock();
if (self && !error) {
connectionFailed(ResultTimeout);
ASIO_ERROR ignored;
timer_->cancel(ignored);
}
});
}

ClientConnectionWeakPtr HandlerBase::getCnx() const {
Expand Down Expand Up @@ -96,7 +111,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
ClientImplPtr client = client_.lock();
if (!client) {
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
connectionFailed(ResultConnectError);
connectionFailed(ResultAlreadyClosed);
reconnectionPending_ = false;
return;
}
Expand All @@ -108,7 +123,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
connectionOpened(cnx).addListener([this, self](Result result, bool) {
// Do not use bool, only Result.
reconnectionPending_ = false;
if (isResultRetryable(result)) {
if (result != ResultOk && isResultRetryable(result)) {
scheduleReconnection();
}
});
Expand Down
1 change: 1 addition & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {

private:
DeadlineTimerPtr timer_;
DeadlineTimerPtr creationTimer_;

mutable std::mutex connectionMutex_;
std::atomic<bool> reconnectionPending_;
Expand Down
2 changes: 1 addition & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void ProducerImpl::connectionFailed(Result result) {
// if producers are lazy, then they should always try to restart
// so don't change the state and allow reconnections
return;
} else if (producerCreatedPromise_.setFailed(result)) {
} else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) {
state_ = Failed;
}
}
Expand Down
29 changes: 28 additions & 1 deletion lib/ResultUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,39 @@
*/
#pragma once

#include <assert.h>
#include <pulsar/Result.h>

#include <unordered_set>

namespace pulsar {

/**
 * Tell whether an operation that failed with `result` should be retried.
 *
 * ResultRetryable and ResultDisconnected are always retryable. Every result
 * listed in `fatalResults` is not. Any other result is treated as retryable.
 *
 * @param result the failure result; must not be ResultOk (checked by assert)
 * @return true if the operation may be retried, false if the failure is fatal
 */
inline bool isResultRetryable(Result result) {
    assert(result != ResultOk);
    if (result == ResultRetryable || result == ResultDisconnected) {
        return true;
    }

    // Results that must be reported to the caller instead of triggering a retry.
    static const std::unordered_set<int> fatalResults{ResultConnectError,
                                                      ResultTimeout,
                                                      ResultAuthenticationError,
                                                      ResultAuthorizationError,
                                                      ResultInvalidUrl,
                                                      ResultInvalidConfiguration,
                                                      ResultIncompatibleSchema,
                                                      ResultTopicNotFound,
                                                      ResultOperationNotSupported,
                                                      ResultNotAllowedError,
                                                      ResultChecksumError,
                                                      ResultCryptoError,
                                                      ResultConsumerAssignError,
                                                      ResultProducerBusy,
                                                      ResultConsumerBusy,
                                                      ResultLookupError,
                                                      ResultTooManyLookupRequestException,
                                                      ResultProducerBlockedQuotaExceededException,
                                                      ResultProducerBlockedQuotaExceededError};
    return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
}

} // namespace pulsar
75 changes: 71 additions & 4 deletions tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <future>
#include <sstream>

#include "MockClientImpl.h"
#include "PulsarAdminHelper.h"
#include "PulsarFriend.h"
#include "WaitUtils.h"
Expand All @@ -36,6 +37,7 @@
DECLARE_LOG_OBJECT()

using namespace pulsar;
using testing::AtLeast;

static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
Expand Down Expand Up @@ -248,7 +250,7 @@ TEST(ClientTest, testWrongListener) {

Client client(lookupUrl, ClientConfiguration().setListenerName("test"));
Producer producer;
ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer));
ASSERT_EQ(ResultProducerNotInitialized, producer.close());
ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
ASSERT_EQ(ResultOk, client.close());
Expand All @@ -257,7 +259,7 @@ TEST(ClientTest, testWrongListener) {
// creation of Consumer or Reader could fail with ResultConnectError.
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
Consumer consumer;
ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer));
ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());

ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
Expand All @@ -266,7 +268,7 @@ TEST(ClientTest, testWrongListener) {
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));

Consumer multiTopicsConsumer;
ASSERT_EQ(ResultServiceUnitNotReady,
ASSERT_EQ(ResultConnectError,
client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"},
"sub", multiTopicsConsumer));

Expand All @@ -278,7 +280,7 @@ TEST(ClientTest, testWrongListener) {

// Currently Reader can only read a non-partitioned topic in C++ client
Reader reader;
ASSERT_EQ(ResultServiceUnitNotReady,
ASSERT_EQ(ResultConnectError,
client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader));
ASSERT_EQ(ResultConsumerNotInitialized, reader.close());
ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
Expand Down Expand Up @@ -434,3 +436,68 @@ TEST(ClientTest, testConnectionClose) {
client.close();
}
}

// Verifies that producer and consumer creation keep retrying while getConnection
// fails with ResultRetryable, and succeed once a real connection is returned.
TEST(ClientTest, testRetryUntilSucceed) {
auto clientImpl = std::make_shared<MockClientImpl>(lookupUrl);
constexpr int kFailCount = 3;
// Each of the two operations below (createProducer, subscribe) is expected to make
// kFailCount failing attempts followed by one attempt that reaches the real implementation.
EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2);
std::atomic_int count{0};
ON_CALL(*clientImpl, getConnection)
.WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) {
// Fail the first kFailCount calls with a retryable result, then delegate.
if (count++ < kFailCount) {
return GetConnectionFuture::failed(ResultRetryable);
}
return clientImpl->getConnectionReal(topic, index);
});

auto topic = "client-test-retry-until-succeed";
ASSERT_EQ(ResultOk, clientImpl->createProducer(topic).result);
// Reset the counter so the consumer goes through the same failure sequence.
count = 0;
ASSERT_EQ(ResultOk, clientImpl->subscribe(topic).result);
ASSERT_EQ(ResultOk, clientImpl->close());
}

// Verifies that when getConnection always fails with ResultRetryable, producer and
// consumer creation end with ResultTimeout after the configured operation timeout.
TEST(ClientTest, testRetryTimeout) {
auto clientImpl =
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2));
// At least two attempts are expected for each of the two operations.
EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2));
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) {
return GetConnectionFuture::failed(ResultRetryable);
});

auto topic = "client-test-retry-timeout";
{
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
ASSERT_EQ(ResultTimeout, result.result);
// The operation timeout is 2 seconds; the elapsed time must fall in [2000, 2100) ms.
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "producer: " << result.timeMs << " ms";
}
{
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
ASSERT_EQ(ResultTimeout, result.result);
// Same timing window as for the producer above.
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "consumer: " << result.timeMs << " ms";
}

ASSERT_EQ(ResultOk, clientImpl->close());
}

// Verifies that a non-retryable failure (ResultAuthenticationError) is reported
// immediately instead of being retried until the operation timeout.
TEST(ClientTest, testNoRetry) {
auto clientImpl =
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100));
// Exactly one getConnection call is expected for each of the two operations below.
EXPECT_CALL(*clientImpl, getConnection).Times(2);
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) {
return GetConnectionFuture::failed(ResultAuthenticationError);
});

auto topic = "client-test-no-retry";
{
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
ASSERT_EQ(ResultAuthenticationError, result.result);
// Far below the 100 second operation timeout, so no retry loop can have run to expiry.
ASSERT_TRUE(result.timeMs < 1000) << "producer: " << result.timeMs << " ms";
}
{
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
LOG_INFO("It takes " << result.timeMs << " ms to subscribe");
ASSERT_EQ(ResultAuthenticationError, result.result);
ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
}
}
Loading