diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index a799d5c8..54e96c84 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -92,10 +92,12 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const { return getNumPartitions(); } -ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) { +ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy, + bool retryOnCreationError) { using namespace std::placeholders; auto client = client_.lock(); - auto producer = std::make_shared(client, *topicName_, conf_, interceptors_, partition); + auto producer = std::make_shared(client, *topicName_, conf_, interceptors_, partition, + retryOnCreationError); if (!client) { return producer; } @@ -127,13 +129,13 @@ void PartitionedProducerImpl::start() { for (unsigned int i = 0; i < getNumPartitions(); i++) { bool lazy = (short)i != partition; - producers_.push_back(newInternalProducer(i, lazy)); + producers_.push_back(newInternalProducer(i, lazy, false)); } producers_[partition]->start(); } else { for (unsigned int i = 0; i < getNumPartitions(); i++) { - producers_.push_back(newInternalProducer(i, false)); + producers_.push_back(newInternalProducer(i, false, false)); } for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) { @@ -461,7 +463,7 @@ void PartitionedProducerImpl::handleGetPartitions(Result result, for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) { ProducerImplPtr producer; try { - producer = newInternalProducer(i, lazy); + producer = newInternalProducer(i, lazy, true); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create producer for partition " << i << ": " << e.what()); producers.clear(); diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 25ba9c33..2d07a81a 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -135,7 +135,7 @@ class PartitionedProducerImpl : public ProducerImplBase, unsigned int getNumPartitions() const; unsigned int getNumPartitionsWithLock() const; - ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy); + ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy, bool retryOnCreationError); MessageRoutingPolicyPtr getMessageRouter(); void runPartitionUpdateTask(); void getPartitionMetadata(); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 61b95bf0..0a129259 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -48,7 +48,7 @@ DECLARE_LOG_OBJECT() ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors, - int32_t partition) + int32_t partition, bool retryOnCreationError) : HandlerBase(client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition), Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()), milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()), @@ -67,7 +67,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000), memoryLimitController_(client->getMemoryLimitController()), chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()), - interceptors_(interceptors) { + interceptors_(interceptors), + retryOnCreationError_(retryOnCreationError) { LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic() << " id: " << producerId_); if (!producerName_.empty()) { @@ -273,7 +274,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result lock.unlock(); producerCreatedPromise_.setFailed(result); handleResult = result; - } else if (producerCreatedPromise_.isComplete()) { + } else if (producerCreatedPromise_.isComplete() || retryOnCreationError_) { if (result == ResultProducerBlockedQuotaExceededException) { LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer"); failPendingMessages(ResultProducerBlockedQuotaExceededException, false); diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 2fb0b886..b467458d 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -66,7 +66,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { public: ProducerImpl(ClientImplPtr client, const TopicName& topic, const ProducerConfiguration& producerConfiguration, - const ProducerInterceptorsPtr& interceptors, int32_t partition = -1); + const ProducerInterceptorsPtr& interceptors, int32_t partition = -1, + bool retryOnCreationError = false); ~ProducerImpl(); // overrided methods from ProducerImplBase @@ -202,6 +203,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { boost::optional topicEpoch; ProducerInterceptorsPtr interceptors_; + + bool retryOnCreationError_; }; struct ProducerImplCmp { diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 45bb3aae..bb58a4ef 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -649,4 +649,36 @@ TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) { client.close(); } +TEST(ProducerTest, testFailedToCreateNewPartitionProducer) { + const std::string topic = + "public/default/testFailedToCreateNewPartitionProducer" + std::to_string(time(nullptr)); + std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions"; + + int res = makePutRequest(topicOperateUrl, "2"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + ClientConfiguration clientConf; + clientConf.setPartititionsUpdateInterval(1); + Client client(serviceUrl, clientConf); + ProducerConfiguration conf; + Producer producer; + client.createProducer(topic, conf, producer); + ASSERT_TRUE(waitUntil(std::chrono::seconds(1), [&producer]() -> bool { return producer.isConnected(); })); + + PartitionedProducerImpl& partitionedProducer = PulsarFriend::getPartitionedProducerImpl(producer); + PulsarFriend::updatePartitions(partitionedProducer, 3); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + auto& newProducer = PulsarFriend::getInternalProducerImpl(producer, 2); + ASSERT_FALSE(newProducer.isConnected()); // should fail with topic not found + + res = makePostRequest(topicOperateUrl, "3"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + ASSERT_TRUE( + waitUntil(std::chrono::seconds(5), [&newProducer]() -> bool { return newProducer.isConnected(); })); + + producer.close(); + client.close(); +} + INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));