8383
8484class BufferingSocket (object ):
8585
86- def __init__ (self , socket ):
87- self .address = socket .getpeername ()
88- self .socket = socket
86+ def __init__ (self , connection ):
87+ self .connection = connection
88+ self .socket = connection .socket
89+ self .address = self .socket .getpeername ()
8990 self .buffer = bytearray ()
9091
9192 def fill (self ):
@@ -96,6 +97,10 @@ def fill(self):
9697 self .buffer [len (self .buffer ):] = received
9798 else :
9899 if ready_to_read is not None :
100+ # If this connection fails, remove this address from the
101+ # connection pool to which this connection belongs.
102+ if self .connection .pool :
103+ self .connection .pool .remove (self .address )
99104 raise ServiceUnavailable ("Failed to read from connection %r" % (self .address ,))
100105
101106 def read_message (self ):
@@ -211,9 +216,12 @@ class Connection(object):
211216 .. note:: logs at INFO level
212217 """
213218
219+ #: The pool of which this connection is a member
220+ pool = None
221+
214222 def __init__ (self , sock , ** config ):
215223 self .socket = sock
216- self .buffering_socket = BufferingSocket (sock )
224+ self .buffering_socket = BufferingSocket (self )
217225 self .address = sock .getpeername ()
218226 self .channel = ChunkChannel (sock )
219227 self .packer = Packer (self .channel )
@@ -411,6 +419,7 @@ def acquire(self, address):
411419 connection .in_use = True
412420 return connection
413421 connection = self .connector (address )
422+ connection .pool = self
414423 connection .in_use = True
415424 connections .append (connection )
416425 return connection
0 commit comments