Skip to content

Commit 4a77f28

Browse files
committed
Refactored out shared code across protocol versions
1 parent 4036511 commit 4a77f28

File tree

6 files changed

+227
-479
lines changed

6 files changed

+227
-479
lines changed

neo4j/io/_bolt3.py

Lines changed: 7 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
# limitations under the License.
2020

2121
from collections import deque
22-
from select import select
2322
from ssl import SSLSocket
24-
from struct import pack as struct_pack
2523
from time import perf_counter
2624
from neo4j.api import (
2725
Version,
2826
READ_ACCESS,
29-
WRITE_ACCESS,
3027
DEFAULT_DATABASE,
3128
)
32-
from neo4j.io._courier import MessageInbox
29+
from neo4j.io._common import (
30+
Inbox,
31+
Outbox,
32+
Response,
33+
InitResponse,
34+
CommitResponse,
35+
)
3336
from neo4j.meta import get_user_agent
3437
from neo4j.exceptions import (
35-
Neo4jError,
3638
AuthError,
3739
ServiceUnavailable,
3840
DatabaseUnavailable,
@@ -53,7 +55,6 @@
5355
Bolt,
5456
BoltPool,
5557
)
56-
from neo4j.conf import PoolConfig
5758
from neo4j.api import ServerInfo
5859
from neo4j.addressing import Address
5960

@@ -430,133 +431,3 @@ def closed(self):
430431

431432
def defunct(self):
432433
return self._defunct
433-
434-
435-
class Outbox:
436-
437-
def __init__(self, capacity=8192, max_chunk_size=16384):
438-
self._max_chunk_size = max_chunk_size
439-
self._header = 0
440-
self._start = 2
441-
self._end = 2
442-
self._data = bytearray(capacity)
443-
444-
def max_chunk_size(self):
445-
return self._max_chunk_size
446-
447-
def clear(self):
448-
self._header = 0
449-
self._start = 2
450-
self._end = 2
451-
self._data[0:2] = b"\x00\x00"
452-
453-
def write(self, b):
454-
to_write = len(b)
455-
max_chunk_size = self._max_chunk_size
456-
pos = 0
457-
while to_write > 0:
458-
chunk_size = self._end - self._start
459-
remaining = max_chunk_size - chunk_size
460-
if remaining == 0 or remaining < to_write <= max_chunk_size:
461-
self.chunk()
462-
else:
463-
wrote = min(to_write, remaining)
464-
new_end = self._end + wrote
465-
self._data[self._end:new_end] = b[pos:pos+wrote]
466-
self._end = new_end
467-
pos += wrote
468-
new_chunk_size = self._end - self._start
469-
self._data[self._header:(self._header + 2)] = struct_pack(">H", new_chunk_size)
470-
to_write -= wrote
471-
472-
def chunk(self):
473-
self._header = self._end
474-
self._start = self._header + 2
475-
self._end = self._start
476-
self._data[self._header:self._start] = b"\x00\x00"
477-
478-
def view(self):
479-
end = self._end
480-
chunk_size = end - self._start
481-
if chunk_size == 0:
482-
return memoryview(self._data[:self._header])
483-
else:
484-
return memoryview(self._data[:end])
485-
486-
487-
class Inbox(MessageInbox):
488-
489-
def __next__(self):
490-
tag, fields = self.pop()
491-
if tag == b"\x71":
492-
return fields, None, None
493-
elif fields:
494-
return [], tag, fields[0]
495-
else:
496-
return [], tag, None
497-
498-
499-
class Response:
500-
""" Subscriber object for a full response (zero or
501-
more detail messages followed by one summary message).
502-
"""
503-
504-
def __init__(self, connection, **handlers):
505-
self.connection = connection
506-
self.handlers = handlers
507-
self.complete = False
508-
509-
def on_records(self, records):
510-
""" Called when one or more RECORD messages have been received.
511-
"""
512-
handler = self.handlers.get("on_records")
513-
if callable(handler):
514-
handler(records)
515-
516-
def on_success(self, metadata):
517-
""" Called when a SUCCESS message has been received.
518-
"""
519-
handler = self.handlers.get("on_success")
520-
if callable(handler):
521-
handler(metadata)
522-
handler = self.handlers.get("on_summary")
523-
if callable(handler):
524-
handler()
525-
526-
def on_failure(self, metadata):
527-
""" Called when a FAILURE message has been received.
528-
"""
529-
self.connection.reset()
530-
handler = self.handlers.get("on_failure")
531-
if callable(handler):
532-
handler(metadata)
533-
handler = self.handlers.get("on_summary")
534-
if callable(handler):
535-
handler()
536-
raise Neo4jError.hydrate(**metadata)
537-
538-
def on_ignored(self, metadata=None):
539-
""" Called when an IGNORED message has been received.
540-
"""
541-
handler = self.handlers.get("on_ignored")
542-
if callable(handler):
543-
handler(metadata)
544-
handler = self.handlers.get("on_summary")
545-
if callable(handler):
546-
handler()
547-
548-
549-
class InitResponse(Response):
550-
551-
def on_failure(self, metadata):
552-
code = metadata.get("code")
553-
message = metadata.get("message", "Connection initialisation failed")
554-
if code == "Neo.ClientError.Security.Unauthorized":
555-
raise AuthError(message)
556-
else:
557-
raise ServiceUnavailable(message)
558-
559-
560-
class CommitResponse(Response):
561-
562-
pass

neo4j/io/_bolt4x0.py

Lines changed: 7 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,23 @@
1919
# limitations under the License.
2020

2121
from collections import deque
22-
from select import select
2322
from ssl import SSLSocket
24-
from struct import pack as struct_pack
2523
from time import perf_counter
2624
from neo4j.api import (
2725
Version,
2826
READ_ACCESS,
29-
WRITE_ACCESS,
3027
DEFAULT_DATABASE,
3128
SYSTEM_DATABASE,
3229
)
33-
from neo4j.io._courier import MessageInbox
30+
from neo4j.io._common import (
31+
Inbox,
32+
Outbox,
33+
Response,
34+
InitResponse,
35+
CommitResponse,
36+
)
3437
from neo4j.meta import get_user_agent
3538
from neo4j.exceptions import (
36-
Neo4jError,
3739
AuthError,
3840
ServiceUnavailable,
3941
DatabaseUnavailable,
@@ -53,7 +55,6 @@
5355
Bolt,
5456
BoltPool,
5557
)
56-
from neo4j.conf import PoolConfig
5758
from neo4j.api import ServerInfo
5859
from neo4j.addressing import Address
5960

@@ -444,135 +445,3 @@ def closed(self):
444445

445446
def defunct(self):
446447
return self._defunct
447-
448-
449-
class Outbox:
450-
451-
def __init__(self, capacity=8192, max_chunk_size=16384):
452-
self._max_chunk_size = max_chunk_size
453-
self._header = 0
454-
self._start = 2
455-
self._end = 2
456-
self._data = bytearray(capacity)
457-
458-
def max_chunk_size(self):
459-
return self._max_chunk_size
460-
461-
def clear(self):
462-
self._header = 0
463-
self._start = 2
464-
self._end = 2
465-
self._data[0:2] = b"\x00\x00"
466-
467-
def write(self, b):
468-
to_write = len(b)
469-
max_chunk_size = self._max_chunk_size
470-
pos = 0
471-
while to_write > 0:
472-
chunk_size = self._end - self._start
473-
remaining = max_chunk_size - chunk_size
474-
if remaining == 0 or remaining < to_write <= max_chunk_size:
475-
self.chunk()
476-
else:
477-
wrote = min(to_write, remaining)
478-
new_end = self._end + wrote
479-
self._data[self._end:new_end] = b[pos:pos+wrote]
480-
self._end = new_end
481-
pos += wrote
482-
new_chunk_size = self._end - self._start
483-
self._data[self._header:(self._header + 2)] = struct_pack(">H", new_chunk_size)
484-
to_write -= wrote
485-
486-
def chunk(self):
487-
self._header = self._end
488-
self._start = self._header + 2
489-
self._end = self._start
490-
self._data[self._header:self._start] = b"\x00\x00"
491-
492-
def view(self):
493-
end = self._end
494-
chunk_size = end - self._start
495-
if chunk_size == 0:
496-
return memoryview(self._data[:self._header])
497-
else:
498-
return memoryview(self._data[:end])
499-
500-
501-
class Inbox(MessageInbox):
502-
503-
def __next__(self):
504-
tag, fields = self.pop()
505-
if tag == b"\x71":
506-
return fields, None, None
507-
elif fields:
508-
return [], tag, fields[0]
509-
else:
510-
return [], tag, None
511-
512-
513-
class Response:
514-
""" Subscriber object for a full response (zero or
515-
more detail messages followed by one summary message).
516-
"""
517-
518-
def __init__(self, connection, **handlers):
519-
self.connection = connection
520-
self.handlers = handlers
521-
self.complete = False
522-
523-
def on_records(self, records):
524-
""" Called when one or more RECORD messages have been received.
525-
"""
526-
handler = self.handlers.get("on_records")
527-
if callable(handler):
528-
handler(records)
529-
530-
def on_success(self, metadata):
531-
""" Called when a SUCCESS message has been received.
532-
"""
533-
handler = self.handlers.get("on_success")
534-
if callable(handler):
535-
handler(metadata)
536-
537-
if not metadata.get("has_more"):
538-
handler = self.handlers.get("on_summary")
539-
if callable(handler):
540-
handler()
541-
542-
def on_failure(self, metadata):
543-
""" Called when a FAILURE message has been received.
544-
"""
545-
self.connection.reset()
546-
handler = self.handlers.get("on_failure")
547-
if callable(handler):
548-
handler(metadata)
549-
handler = self.handlers.get("on_summary")
550-
if callable(handler):
551-
handler()
552-
raise Neo4jError.hydrate(**metadata)
553-
554-
def on_ignored(self, metadata=None):
555-
""" Called when an IGNORED message has been received.
556-
"""
557-
handler = self.handlers.get("on_ignored")
558-
if callable(handler):
559-
handler(metadata)
560-
handler = self.handlers.get("on_summary")
561-
if callable(handler):
562-
handler()
563-
564-
565-
class InitResponse(Response):
566-
567-
def on_failure(self, metadata):
568-
code = metadata.get("code")
569-
message = metadata.get("message", "Connection initialisation failed")
570-
if code == "Neo.ClientError.Security.Unauthorized":
571-
raise AuthError(message)
572-
else:
573-
raise ServiceUnavailable(message)
574-
575-
576-
class CommitResponse(Response):
577-
578-
pass

0 commit comments

Comments
 (0)