Skip to content

Commit 9fdaaac

Browse files
committed
Slightly faster unpacking
1 parent b9a6e67 commit 9fdaaac

File tree

4 files changed

+262
-245
lines changed

4 files changed

+262
-245
lines changed

neo4j/v1/bolt.py

Lines changed: 65 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,42 @@
8080
log_error = log.error
8181

8282

83+
class BufferingSocket(object):
84+
85+
def __init__(self, socket):
86+
self.socket = socket
87+
self.buffer = bytearray()
88+
89+
def fill(self):
90+
ready_to_read, _, _ = select((self.socket,), (), (), 0)
91+
received = self.socket.recv(65539)
92+
if received:
93+
if __debug__:
94+
log_debug("S: b%r", received)
95+
self.buffer[len(self.buffer):] = received
96+
else:
97+
if ready_to_read is not None:
98+
raise ProtocolError("Server closed connection")
99+
100+
def read_message(self):
101+
message_data = bytearray()
102+
p = 0
103+
size = -1
104+
while size != 0:
105+
while len(self.buffer) - p < 2:
106+
self.fill()
107+
size = 0x100 * self.buffer[p] + self.buffer[p + 1]
108+
p += 2
109+
if size > 0:
110+
while len(self.buffer) - p < size:
111+
self.fill()
112+
end = p + size
113+
message_data[len(message_data):] = self.buffer[p:end]
114+
p = end
115+
self.buffer = self.buffer[p:]
116+
return message_data
117+
118+
83119
class ChunkChannel(object):
84120
""" Reader/writer for chunked data.
85121
@@ -141,40 +177,6 @@ def send(self):
141177

142178
self.raw.seek(self.raw.truncate(0))
143179

144-
def _recv(self, size):
145-
# If data is needed, keep reading until all bytes have been received
146-
remaining = size - len(self._recv_buffer)
147-
ready_to_read = None
148-
while remaining > 0:
149-
# Read up to the required amount remaining
150-
b = self.socket.recv(8192)
151-
if b:
152-
if __debug__: log_debug("S: b%r", b)
153-
else:
154-
if ready_to_read is not None:
155-
raise ProtocolError("Server closed connection")
156-
remaining -= len(b)
157-
self._recv_buffer += b
158-
159-
# If more is required, wait for available network data
160-
if remaining > 0:
161-
ready_to_read, _, _ = select((self.socket,), (), (), 0)
162-
while not ready_to_read:
163-
ready_to_read, _, _ = select((self.socket,), (), (), 0)
164-
165-
# Split off the amount of data required and keep the rest in the buffer
166-
data, self._recv_buffer = self._recv_buffer[:size], self._recv_buffer[size:]
167-
return data
168-
169-
def chunk_reader(self):
170-
chunk_size = -1
171-
while chunk_size != 0:
172-
chunk_header = self._recv(2)
173-
chunk_size, = struct_unpack_from(">H", chunk_header)
174-
if chunk_size > 0:
175-
data = self._recv(chunk_size)
176-
yield data
177-
178180

179181
class Response(object):
180182
""" Subscriber object for a full response (zero or
@@ -207,9 +209,12 @@ class Connection(object):
207209
"""
208210

209211
def __init__(self, sock, **config):
212+
self.socket = sock
213+
self.buffering_socket = BufferingSocket(sock)
210214
self.defunct = False
211215
self.channel = ChunkChannel(sock)
212216
self.packer = Packer(self.channel)
217+
self.unpacker = Unpacker()
213218
self.responses = deque()
214219
self.closed = False
215220

@@ -317,33 +322,38 @@ def fetch(self):
317322
raise ProtocolError("Cannot read from a closed connection")
318323
if self.defunct:
319324
raise ProtocolError("Cannot read from a defunct connection")
320-
raw = BytesIO()
321-
unpack = Unpacker(raw).unpack
322325
try:
323-
raw.writelines(self.channel.chunk_reader())
326+
message_data = self.buffering_socket.read_message()
324327
except ProtocolError:
325328
self.defunct = True
326329
self.close()
327330
raise
328331
# Unpack from the raw byte stream and call the relevant message handler(s)
329-
raw.seek(0)
330-
response = self.responses[0]
331-
for signature, fields in unpack():
332-
if __debug__:
333-
log_info("S: %s %s", message_names[signature], " ".join(map(repr, fields)))
334-
if signature in SUMMARY:
335-
response.complete = True
336-
self.responses.popleft()
337-
if signature == FAILURE:
338-
self.acknowledge_failure()
339-
handler_name = "on_%s" % message_names[signature].lower()
340-
try:
341-
handler = getattr(response, handler_name)
342-
except AttributeError:
343-
pass
344-
else:
345-
handler(*fields)
346-
raw.close()
332+
self.unpacker.load(message_data)
333+
size, signature = self.unpacker.unpack_structure_header()
334+
fields = [self.unpacker.unpack() for _ in range(size)]
335+
336+
if __debug__:
337+
log_info("S: %s %r", message_names[signature], fields)
338+
339+
if signature == SUCCESS:
340+
response = self.responses.popleft()
341+
response.complete = True
342+
response.on_success(*fields)
343+
elif signature == RECORD:
344+
response = self.responses[0]
345+
response.on_record(*fields)
346+
elif signature == IGNORED:
347+
response = self.responses.popleft()
348+
response.complete = True
349+
response.on_ignored(*fields)
350+
elif signature == FAILURE:
351+
response = self.responses.popleft()
352+
response.complete = True
353+
self.acknowledge_failure()
354+
response.on_failure(*fields)
355+
else:
356+
raise ProtocolError("Unexpected response message with signature %02X" % signature)
347357

348358
def fetch_all(self):
349359
while self.responses:

0 commit comments

Comments
 (0)