11import copy
22import logging
3+ import collections
4+
5+ import kafka .common
36
4- from collections import defaultdict
57from functools import partial
68from itertools import count
7-
8- from kafka .common import (ErrorMapping , ErrorStrings , TopicAndPartition ,
9+ from kafka .common import (TopicAndPartition ,
910 ConnectionError , FailedPayloadsError ,
10- BrokerResponseError , PartitionUnavailableError ,
11- LeaderUnavailableError ,
12- KafkaUnavailableError )
11+ PartitionUnavailableError ,
12+ LeaderUnavailableError , KafkaUnavailableError ,
13+ UnknownTopicOrPartitionError , NotLeaderForPartitionError )
1314
1415from kafka .conn import collect_hosts , KafkaConnection , DEFAULT_SOCKET_TIMEOUT_SECONDS
1516from kafka .protocol import KafkaProtocol
@@ -39,29 +40,23 @@ def __init__(self, hosts, client_id=CLIENT_ID,
3940 self .topic_partitions = {} # topic_id -> [0, 1, 2, ...]
4041 self .load_metadata_for_topics () # bootstrap with all metadata
4142
43+
4244 ##################
4345 # Private API #
4446 ##################
4547
4648 def _get_conn (self , host , port ):
4749 "Get or create a connection to a broker using host and port"
48-
4950 host_key = (host , port )
5051 if host_key not in self .conns :
51- self .conns [host_key ] = KafkaConnection (host , port , timeout = self .timeout )
52+ self .conns [host_key ] = KafkaConnection (
53+ host ,
54+ port ,
55+ timeout = self .timeout
56+ )
5257
5358 return self .conns [host_key ]
5459
55- def _get_conn_for_broker (self , broker ):
56- """
57- Get or create a connection to a broker
58- """
59- if (broker .host , broker .port ) not in self .conns :
60- self .conns [(broker .host , broker .port )] = \
61- KafkaConnection (broker .host , broker .port , timeout = self .timeout )
62-
63- return self ._get_conn (broker .host , broker .port )
64-
6560 def _get_leader_for_partition (self , topic , partition ):
6661 """
6762 Returns the leader for a partition or None if the partition exists
@@ -99,10 +94,9 @@ def _send_broker_unaware_request(self, requestId, request):
9994 conn .send (requestId , request )
10095 response = conn .recv (requestId )
10196 return response
102- except Exception , e :
97+ except Exception as e :
10398 log .warning ("Could not send request [%r] to server %s:%i, "
10499 "trying next server: %s" % (request , host , port , e ))
105- continue
106100
107101 raise KafkaUnavailableError ("All servers failed to process request" )
108102
@@ -130,7 +124,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
130124
131125 # Group the requests by topic+partition
132126 original_keys = []
133- payloads_by_broker = defaultdict (list )
127+ payloads_by_broker = collections . defaultdict (list )
134128
135129 for payload in payloads :
136130 leader = self ._get_leader_for_partition (payload .topic ,
@@ -151,7 +145,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
151145
152146 # For each broker, send the list of request payloads
153147 for broker , payloads in payloads_by_broker .items ():
154- conn = self ._get_conn_for_broker (broker )
148+ conn = self ._get_conn (broker . host , broker . port )
155149 requestId = self ._next_id ()
156150 request = encoder_fn (client_id = self .client_id ,
157151 correlation_id = requestId , payloads = payloads )
@@ -164,11 +158,11 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
164158 continue
165159 try :
166160 response = conn .recv (requestId )
167- except ConnectionError , e :
161+ except ConnectionError as e :
168162 log .warning ("Could not receive response to request [%s] "
169163 "from server %s: %s" , request , conn , e )
170164 failed = True
171- except ConnectionError , e :
165+ except ConnectionError as e :
172166 log .warning ("Could not send request [%s] to server %s: %s" ,
173167 request , conn , e )
174168 failed = True
@@ -191,16 +185,11 @@ def __repr__(self):
191185 return '<KafkaClient client_id=%s>' % (self .client_id )
192186
193187 def _raise_on_response_error (self , resp ):
194- if resp .error == ErrorMapping .NO_ERROR :
195- return
196-
197- if resp .error in (ErrorMapping .UNKNOWN_TOPIC_OR_PARTITON ,
198- ErrorMapping .NOT_LEADER_FOR_PARTITION ):
188+ try :
189+ kafka .common .check_error (resp )
190+ except (UnknownTopicOrPartitionError , NotLeaderForPartitionError ) as e :
199191 self .reset_topic_metadata (resp .topic )
200-
201- raise BrokerResponseError (
202- "Request for %s failed with errorcode=%d (%s)" %
203- (TopicAndPartition (resp .topic , resp .partition ), resp .error , ErrorStrings [resp .error ]))
192+ raise
204193
205194 #################
206195 # Public API #
0 commit comments