1919# limitations under the License.
2020
2121from collections import deque
22+ from logging import getLogger
2223from ssl import SSLSocket
2324from time import perf_counter
25+
26+ from neo4j ._exceptions import (
27+ BoltError ,
28+ BoltProtocolError ,
29+ )
30+ from neo4j .addressing import Address
2431from neo4j .api import (
25- Version ,
2632 READ_ACCESS ,
33+ ServerInfo ,
34+ Version ,
2735)
28- from neo4j .io ._common import (
29- Inbox ,
30- Outbox ,
31- Response ,
32- InitResponse ,
33- CommitResponse ,
34- )
35- from neo4j .meta import get_user_agent
3636from neo4j .exceptions import (
3737 AuthError ,
38- DatabaseUnavailable ,
3938 ConfigurationError ,
39+ DatabaseUnavailable ,
40+ DriverError ,
4041 ForbiddenOnReadOnlyDatabase ,
4142 IncompleteCommit ,
4243 NotALeader ,
4344 ServiceUnavailable ,
4445 SessionExpired ,
4546)
46- from neo4j ._exceptions import BoltProtocolError
47- from neo4j .packstream import (
48- Unpacker ,
49- Packer ,
50- )
5147from neo4j .io import (
48+ check_supported_server_product ,
5249 Bolt ,
5350 BoltPool ,
54- check_supported_server_product ,
5551)
56- from neo4j .api import ServerInfo
57- from neo4j .addressing import Address
52+ from neo4j .io ._common import (
53+ CommitResponse ,
54+ Inbox ,
55+ InitResponse ,
56+ Outbox ,
57+ Response ,
58+ )
59+ from neo4j .meta import get_user_agent
60+ from neo4j .packstream import (
61+ Packer ,
62+ Unpacker ,
63+ )
5864
59- from logging import getLogger
6065log = getLogger ("neo4j" )
6166
6267
@@ -85,7 +90,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8590 self .socket = sock
8691 self .server_info = ServerInfo (Address (sock .getpeername ()), self .PROTOCOL_VERSION )
8792 self .outbox = Outbox ()
88- self .inbox = Inbox (self .socket , on_error = self ._set_defunct )
93+ self .inbox = Inbox (self .socket , on_error = self ._set_defunct_read )
8994 self .packer = Packer (self .outbox )
9095 self .unpacker = Unpacker (self .inbox )
9196 self .responses = deque ()
@@ -135,7 +140,7 @@ def der_encoded_server_certificate(self):
135140 def local_port (self ):
136141 try :
137142 return self .socket .getsockname ()[1 ]
138- except IOError :
143+ except OSError :
139144 return 0
140145
141146 def get_base_headers (self ):
@@ -292,7 +297,10 @@ def fail(metadata):
292297 def _send_all (self ):
293298 data = self .outbox .view ()
294299 if data :
295- self .socket .sendall (data )
300+ try :
301+ self .socket .sendall (data )
302+ except OSError as error :
303+ self ._set_defunct_write (error )
296304 self .outbox .clear ()
297305
298306 def send_all (self ):
@@ -306,17 +314,7 @@ def send_all(self):
306314 raise ServiceUnavailable ("Failed to write to defunct connection {!r} ({!r})" .format (
307315 self .unresolved_address , self .server_info .address ))
308316
309- try :
310- self ._send_all ()
311- except (IOError , OSError ) as error :
312- log .error ("Failed to write data to connection "
313- "{!r} ({!r}); ({!r})" .
314- format (self .unresolved_address ,
315- self .server_info .address ,
316- "; " .join (map (repr , error .args ))))
317- if self .pool :
318- self .pool .deactivate (address = self .unresolved_address )
319- raise
317+ self ._send_all ()
320318
321319 def fetch_message (self ):
322320 """ Receive at least one message from the server, if available.
@@ -336,17 +334,7 @@ def fetch_message(self):
336334 return 0 , 0
337335
338336 # Receive exactly one message
339- try :
340- details , summary_signature , summary_metadata = next (self .inbox )
341- except (IOError , OSError ) as error :
342- log .error ("Failed to read data from connection "
343- "{!r} ({!r}); ({!r})" .
344- format (self .unresolved_address ,
345- self .server_info .address ,
346- "; " .join (map (repr , error .args ))))
347- if self .pool :
348- self .pool .deactivate (address = self .unresolved_address )
349- raise
337+ details , summary_signature , summary_metadata = next (self .inbox )
350338
351339 if details :
352340 log .debug ("[#%04X] S: RECORD * %d" , self .local_port , len (details )) # Do not log any data
@@ -380,11 +368,20 @@ def fetch_message(self):
380368
381369 return len (details ), 1
382370
383- def _set_defunct (self , error = None ):
384- direct_driver = isinstance (self .pool , BoltPool )
371+ def _set_defunct_read (self , error = None ):
372+ message = "Failed to read from defunct connection {!r} ({!r})" .format (
373+ self .unresolved_address , self .server_info .address
374+ )
375+ self ._set_defunct (message , error = error )
385376
386- message = ("Failed to read from defunct connection {!r} ({!r})" .format (
387- self .unresolved_address , self .server_info .address ))
377+ def _set_defunct_write (self , error = None ):
378+ message = "Failed to write data to connection {!r} ({!r})" .format (
379+ self .unresolved_address , self .server_info .address
380+ )
381+ self ._set_defunct (message , error = error )
382+
383+ def _set_defunct (self , message , error = None ):
384+ direct_driver = isinstance (self .pool , BoltPool )
388385
389386 if error :
390387 log .error (str (error ))
@@ -445,12 +442,12 @@ def close(self):
445442 self ._append (b"\x02 " , ())
446443 try :
447444 self ._send_all ()
448- except :
445+ except ( OSError , BoltError , DriverError ) :
449446 pass
450447 log .debug ("[#%04X] C: <CLOSE>" , self .local_port )
451448 try :
452449 self .socket .close ()
453- except IOError :
450+ except OSError :
454451 pass
455452 finally :
456453 self ._closed = True
0 commit comments