|
17 | 17 | from kafka.errors import ( |
18 | 18 | IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError, |
19 | 19 | UnrecognizedBrokerVersion, IllegalArgumentError) |
| 20 | +from kafka.future import Future |
20 | 21 | from kafka.metrics import MetricConfig, Metrics |
21 | 22 | from kafka.protocol.admin import ( |
22 | 23 | CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, |
@@ -358,14 +359,11 @@ def _send_request_to_node(self, node_id, request, wakeup=True): |
358 | 359 |
|
359 | 360 | Returns: |
360 | 361 | A future object that may be polled for status and results. |
361 | | -
|
362 | | - Raises: |
363 | | - The exception if the message could not be sent. |
364 | 362 | """ |
365 | | - while not self._client.ready(node_id): |
366 | | - # poll until the connection to broker is ready, otherwise send() |
367 | | - # will fail with NodeNotReadyError |
368 | | - self._client.poll(timeout_ms=200) |
| 363 | + try: |
| 364 | + self._client.await_ready(node_id) |
| 365 | + except Errors.KafkaConnectionError as e: |
| 366 | + return Future().failure(e) |
369 | 367 | return self._client.send(node_id, request, wakeup) |
370 | 368 |
|
371 | 369 | def _send_request_to_controller(self, request): |
|
0 commit comments