File tree Expand file tree Collapse file tree 1 file changed +14
-1
lines changed Expand file tree Collapse file tree 1 file changed +14
-1
lines changed Original file line number Diff line number Diff line change @@ -409,10 +409,23 @@ def _send_request_to_controller(self, request):
409409 tries = 2 # in case our cached self._controller_id is outdated
410410 while tries :
411411 tries -= 1
412- future = self ._send_request_to_node (self ._controller_id , request )
412+ future = self ._client . send (self ._controller_id , request )
413413
414414 self ._wait_for_futures ([future ])
415415
416+ if future .exception is not None :
417+ log .error (
418+ "Sending request to controller_id %s failed with %s" ,
419+ self ._controller_id ,
420+ future .exception ,
421+ )
422+ is_outdated_controler = (
423+ self ._client .cluster .broker_metadata (self ._controller_id ) is None
424+ )
425+ if is_outdated_controler :
426+ self ._refresh_controller_id ()
427+ continue
428+
416429 response = future .value
417430 # In Java, the error field name is inconsistent:
418431 # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
You can’t perform that action at this time.
0 commit comments