@@ -190,29 +190,6 @@ def __init__(self, connection):
190190 self .connection = connection
191191 self .complete = False
192192
193- def deliver (self , messages ):
194- for message in messages :
195- signature = message .signature
196- fields = tuple (message )
197- if __debug__ :
198- log_info ("S: %s %s" , message_names [signature ], " " .join (map (repr , fields )))
199- handler_name = "on_%s" % message_names [signature ].lower ()
200- try :
201- handler = getattr (self , handler_name )
202- except AttributeError :
203- pass
204- else :
205- handler (* fields )
206- if signature in SUMMARY :
207- self .complete = True
208- if signature == FAILURE :
209- def on_failure (metadata ):
210- raise ProtocolError ("Could not acknowledge failure" )
211-
212- subscriber = Response (self )
213- subscriber .on_failure = on_failure
214- self .connection .append (ACK_FAILURE , response = subscriber )
215-
216193 def on_record (self , values ):
217194 pass
218195
@@ -226,6 +203,12 @@ def on_ignored(self, metadata):
226203 pass
227204
228205
206+ class AckFailureResponse (Response ):
207+
208+ def on_failure (self , metadata ):
209+ raise ProtocolError ("Could not acknowledge failure" )
210+
211+
229212class Connection (object ):
230213 """ Server connection through which all protocol messages
231214 are sent and received. This class is designed for protocol
@@ -278,12 +261,26 @@ def fetch_next(self):
278261 unpack = Unpacker (raw ).unpack
279262 raw .writelines (self .channel .chunk_reader ())
280263
281- # Unpack the message from the raw byte stream into the inbox
264+ # Unpack from the raw byte stream and call the relevant message handler(s)
282265 raw .seek (0 )
283266 response = self .responses [0 ]
284- response .deliver (unpack ())
285- if response .complete :
286- self .responses .popleft ()
267+ for message in unpack ():
268+ signature = message .signature
269+ fields = tuple (message )
270+ if __debug__ :
271+ log_info ("S: %s %s" , message_names [signature ], " " .join (map (repr , fields )))
272+ handler_name = "on_%s" % message_names [signature ].lower ()
273+ try :
274+ handler = getattr (response , handler_name )
275+ except AttributeError :
276+ pass
277+ else :
278+ handler (* fields )
279+ if signature in SUMMARY :
280+ response .complete = True
281+ self .responses .popleft ()
282+ if signature == FAILURE :
283+ self .append (ACK_FAILURE , response = AckFailureResponse (self ))
287284 raw .close ()
288285
289286 def fetch_all (self , response ):
0 commit comments