@@ -79,8 +79,8 @@ typedef std::vector<std::string> StringList;
7979ClientImpl::ClientImpl (const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
8080 : mutex_ (),
8181 state_ (Open),
82- serviceNameResolver_ (serviceUrl),
83- clientConfiguration_ ( ClientConfiguration (clientConfiguration) .setUseTls (serviceNameResolver_. useTls ())),
82+ clientConfiguration_ ( ClientConfiguration (clientConfiguration)
83+ .setUseTls (ServiceNameResolver:: useTls (ServiceURI (serviceUrl) ))),
8484 memoryLimitController_ (clientConfiguration.getMemoryLimit ()),
8585 ioExecutorProvider_ (std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads ())),
8686 listenerExecutorProvider_ (
@@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
9898 if (loggerFactory) {
9999 LogUtils::setLoggerFactory (std::move (loggerFactory));
100100 }
101+ lookupServicePtr_ = createLookup (serviceUrl);
102+ }
103+
104+ ClientImpl::~ClientImpl () { shutdown (); }
101105
106+ LookupServicePtr ClientImpl::createLookup (const std::string& serviceUrl) {
102107 LookupServicePtr underlyingLookupServicePtr;
103- if (serviceNameResolver_. useHttp ()) {
108+ if (ServiceNameResolver:: useHttp (ServiceURI (serviceUrl) )) {
104109 LOG_DEBUG (" Using HTTP Lookup" );
105110 underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
106- std::ref (serviceNameResolver_), std::cref (clientConfiguration_),
107- std::cref (clientConfiguration_.getAuthPtr ()));
111+ serviceUrl, std::cref (clientConfiguration_), std::cref (clientConfiguration_.getAuthPtr ()));
108112 } else {
109113 LOG_DEBUG (" Using Binary Lookup" );
110114 underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
111- std::ref (serviceNameResolver_) , std::ref (pool_), std::cref (clientConfiguration_));
115+ serviceUrl , std::ref (pool_), std::cref (clientConfiguration_));
112116 }
113117
114- lookupServicePtr_ = RetryableLookupService::create (
118+ auto lookupServicePtr = RetryableLookupService::create (
115119 underlyingLookupServicePtr, clientConfiguration_.impl_ ->operationTimeout , ioExecutorProvider_);
120+ return lookupServicePtr;
116121}
117122
118- ClientImpl::~ClientImpl () { shutdown (); }
119-
120123const ClientConfiguration& ClientImpl::conf () const { return clientConfiguration_; }
121124
122125MemoryLimitController& ClientImpl::getMemoryLimitController () { return memoryLimitController_; }
@@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
129132 return partitionListenerExecutorProvider_;
130133}
131134
132- LookupServicePtr ClientImpl::getLookup () { return lookupServicePtr_; }
135+ LookupServicePtr ClientImpl::getLookup (const std::string& redirectedClusterURI) {
136+ if (redirectedClusterURI.empty ()) {
137+ return lookupServicePtr_;
138+ }
139+
140+ Lock lock (mutex_);
141+ auto it = redirectedClusterLookupServicePtrs_.find (redirectedClusterURI);
142+ if (it == redirectedClusterLookupServicePtrs_.end ()) {
143+ auto lookup = createLookup (redirectedClusterURI);
144+ redirectedClusterLookupServicePtrs_.emplace (redirectedClusterURI, lookup);
145+ return lookup;
146+ }
147+
148+ return it->second ;
149+ }
133150
134151void ClientImpl::createProducerAsync (const std::string& topic, ProducerConfiguration conf,
135152 CreateProducerCallback callback, bool autoDownloadSchema) {
@@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517534 }
518535}
519536
520- GetConnectionFuture ClientImpl::getConnection (const std::string& topic, size_t key) {
537+ GetConnectionFuture ClientImpl::getConnection (const std::string& redirectedClusterURI,
538+ const std::string& topic, size_t key) {
521539 Promise<Result, ClientConnectionPtr> promise;
522540
523541 const auto topicNamePtr = TopicName::get (topic);
@@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
528546 }
529547
530548 auto self = shared_from_this ();
531- lookupServicePtr_->getBroker (*topicNamePtr)
549+ getLookup (redirectedClusterURI)
550+ ->getBroker (*topicNamePtr)
532551 .addListener ([this , self, promise, key](Result result, const LookupService::LookupResult& data) {
533552 if (result != ResultOk) {
534553 promise.setFailed (result);
@@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
554573 return promise.getFuture ();
555574}
556575
557- const std::string& ClientImpl::getPhysicalAddress (const std::string& logicalAddress) {
576+ const std::string& ClientImpl::getPhysicalAddress (const std::string& redirectedClusterURI,
577+ const std::string& logicalAddress) {
558578 if (useProxy_) {
559- return serviceNameResolver_ .resolveHost ();
579+ return getLookup (redirectedClusterURI)-> getServiceNameResolver () .resolveHost ();
560580 } else {
561581 return logicalAddress;
562582 }
563583}
564584
565- GetConnectionFuture ClientImpl::connect (const std::string& logicalAddress, size_t key) {
566- const auto & physicalAddress = getPhysicalAddress (logicalAddress);
585+ GetConnectionFuture ClientImpl::connect (const std::string& redirectedClusterURI,
586+ const std::string& logicalAddress, size_t key) {
587+ const auto & physicalAddress = getPhysicalAddress (redirectedClusterURI, logicalAddress);
567588 Promise<Result, ClientConnectionPtr> promise;
568589 pool_.getConnectionAsync (logicalAddress, physicalAddress, key)
569590 .addListener ([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
@@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
633654
634655 memoryLimitController_.close ();
635656 lookupServicePtr_->close ();
657+ for (const auto & it : redirectedClusterLookupServicePtrs_) {
658+ it.second ->close ();
659+ }
636660
637661 auto producers = producers_.move ();
638662 auto consumers = consumers_.move ();
0 commit comments