@@ -274,7 +274,7 @@ def _refresh_controller_id(self):
274274 version = self ._matching_api_version (MetadataRequest )
275275 if 1 <= version <= 6 :
276276 request = MetadataRequest [version ]()
277- future = self ._send_request_to_node ( self . _client . least_loaded_node (), request )
277+ future = self ._send_request_to_least_loaded_node ( request )
278278
279279 self ._wait_for_futures ([future ])
280280
@@ -312,7 +312,7 @@ def _find_coordinator_id_send_request(self, group_id):
312312 raise NotImplementedError (
313313 "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
314314 .format (version ))
315- return self ._send_request_to_node ( self . _client . least_loaded_node (), request )
315+ return self ._send_request_to_least_loaded_node ( request )
316316
317317 def _find_coordinator_id_process_response (self , response ):
318318 """Process a FindCoordinatorResponse.
@@ -357,9 +357,36 @@ def _find_coordinator_ids(self, group_ids):
357357 }
358358 return groups_coordinators
359359
360+ def _send_request_to_least_loaded_node (self , request ):
361+ """Send a Kafka protocol message to the least loaded broker.
362+
363+ Returns a future that may be polled for status and results.
364+
365+ :param request: The message to send.
366+ :return: A future object that may be polled for status and results.
367+ :exception: The exception if the message could not be sent.
368+ """
369+ node_id = self ._client .least_loaded_node ()
370+ while not self ._client .ready (node_id ):
371+ # poll until the connection to broker is ready, otherwise send()
372+ # will fail with NodeNotReadyError
373+ self ._client .poll ()
374+
375+ # node_id is not part of the cluster anymore, choose a new broker
376+ # to connect to
377+ if self ._client .cluster .broker_metadata (node_id ) is None :
378+ node_id = self ._client .least_loaded_node ()
379+
380+ return self ._client .send (node_id , request )
381+
360382 def _send_request_to_node (self , node_id , request ):
361383 """Send a Kafka protocol message to a specific broker.
362384
385+ .. note::
386+
387+ This function will enter in an infinite loop if `node_id` is
388+ removed from the cluster.
389+
363390 Returns a future that may be polled for status and results.
364391
365392 :param node_id: The broker id to which to send the message.
@@ -384,10 +411,23 @@ def _send_request_to_controller(self, request):
384411 tries = 2 # in case our cached self._controller_id is outdated
385412 while tries :
386413 tries -= 1
387- future = self ._send_request_to_node (self ._controller_id , request )
414+ future = self ._client . send (self ._controller_id , request )
388415
389416 self ._wait_for_futures ([future ])
390417
418+ if future .exception is not None :
419+ log .error (
420+ "Sending request to controller_id %s failed with %s" ,
421+ self ._controller_id ,
422+ future .exception ,
423+ )
424+ is_outdated_controler = (
425+ self ._client .cluster .broker_metadata (self ._controller_id ) is None
426+ )
427+ if is_outdated_controler :
428+ self ._refresh_controller_id ()
429+ continue
430+
391431 response = future .value
392432 # In Java, the error field name is inconsistent:
393433 # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
@@ -508,10 +548,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
508548 allow_auto_topic_creation = auto_topic_creation
509549 )
510550
511- future = self ._send_request_to_node (
512- self ._client .least_loaded_node (),
513- request
514- )
551+ future = self ._send_request_to_least_loaded_node (request )
515552 self ._wait_for_futures ([future ])
516553 return future .value
517554
@@ -603,7 +640,7 @@ def describe_acls(self, acl_filter):
603640 .format (version )
604641 )
605642
606- future = self ._send_request_to_node ( self . _client . least_loaded_node (), request )
643+ future = self ._send_request_to_least_loaded_node ( request )
607644 self ._wait_for_futures ([future ])
608645 response = future .value
609646
@@ -694,7 +731,7 @@ def create_acls(self, acls):
694731 .format (version )
695732 )
696733
697- future = self ._send_request_to_node ( self . _client . least_loaded_node (), request )
734+ future = self ._send_request_to_least_loaded_node ( request )
698735 self ._wait_for_futures ([future ])
699736 response = future .value
700737
@@ -788,7 +825,7 @@ def delete_acls(self, acl_filters):
788825 .format (version )
789826 )
790827
791- future = self ._send_request_to_node ( self . _client . least_loaded_node (), request )
828+ future = self ._send_request_to_least_loaded_node ( request )
792829 self ._wait_for_futures ([future ])
793830 response = future .value
794831
@@ -848,8 +885,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
848885 ))
849886
850887 if len (topic_resources ) > 0 :
851- futures .append (self ._send_request_to_node (
852- self ._client .least_loaded_node (),
888+ futures .append (self ._send_request_to_least_loaded_node (
853889 DescribeConfigsRequest [version ](resources = topic_resources )
854890 ))
855891
@@ -869,8 +905,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
869905 ))
870906
871907 if len (topic_resources ) > 0 :
872- futures .append (self ._send_request_to_node (
873- self ._client .least_loaded_node (),
908+ futures .append (self ._send_request_to_least_loaded_node (
874909 DescribeConfigsRequest [version ](resources = topic_resources , include_synonyms = include_synonyms )
875910 ))
876911 else :
@@ -917,7 +952,7 @@ def alter_configs(self, config_resources):
917952 # // a single request that may be sent to any broker.
918953 #
919954 # So this is currently broken as it always sends to the least_loaded_node()
920- future = self ._send_request_to_node ( self . _client . least_loaded_node (), request )
955+ future = self ._send_request_to_least_loaded_node ( request )
921956
922957 self ._wait_for_futures ([future ])
923958 response = future .value
0 commit comments