Skip to content

Commit 4b9ad52

Browse files
authored
Fix endless loop in Result.peek with fetch_size=1 (#587)
Additionally Implement `ResultSingle` and `ResultPeek` in TestKit backend. `ResultSingle` will be deactivated for now as TestKit expects the driver to raise an error if there is not exactly 1 records in the result stream. Currently, the driver only warns if there are more and returns None if there are none. This (breaking) fix will be introduced in 5.0.
1 parent 9c6e2c3 commit 4b9ad52

File tree

4 files changed

+54
-18
lines changed

4 files changed

+54
-18
lines changed

neo4j/work/result.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from neo4j.data import DataDehydrator
2626
from neo4j.io import ConnectionErrorHandler
2727
from neo4j.work.summary import ResultSummary
28+
from neo4j.exceptions import ResultConsumedError
2829

2930

3031
class Result:
@@ -192,20 +193,37 @@ def __iter__(self):
192193
self._closed = True
193194

194195
def _attach(self):
195-
"""Sets the Result object in an attached state by fetching messages from the connection to the buffer.
196+
"""Sets the Result object in an attached state by fetching messages from
197+
the connection to the buffer.
196198
"""
197199
if self._closed is False:
198200
while self._attached is False:
199201
self._connection.fetch_message()
200202

201-
def _buffer_all(self):
202-
"""Sets the Result object in an detached state by fetching all records from the connection to the buffer.
203+
def _buffer(self, n=None):
204+
"""Try to fill `self_record_buffer` with n records.
205+
206+
Might end up with more records in the buffer if the fetch size makes it
207+
overshoot.
208+
Might ent up with fewer records in the buffer if there are not enough
209+
records available.
203210
"""
204211
record_buffer = deque()
205212
for record in self:
206213
record_buffer.append(record)
214+
if n is not None and len(record_buffer) >= n:
215+
break
207216
self._closed = False
208-
self._record_buffer = record_buffer
217+
if n is None:
218+
self._record_buffer = record_buffer
219+
else:
220+
self._record_buffer.extend(record_buffer)
221+
222+
def _buffer_all(self):
223+
"""Sets the Result object in an detached state by fetching all records
224+
from the connection to the buffer.
225+
"""
226+
self._buffer()
209227

210228
def _obtain_summary(self):
211229
"""Obtain the summary of this result, buffering any remaining records.
@@ -278,6 +296,13 @@ def single(self):
278296
:returns: the next :class:`neo4j.Record` or :const:`None` if none remain
279297
:warns: if more than one record is available
280298
"""
299+
# TODO in 5.0 replace with this code that raises an error if there's not
300+
# exactly one record in the left result stream.
301+
# self._buffer(2).
302+
# if len(self._record_buffer) != 1:
303+
# raise SomeError("Expected exactly 1 record, found %i"
304+
# % len(self._record_buffer))
305+
# return self._record_buffer.popleft()
281306
records = list(self) # TODO: exhausts the result with self.consume if there are more records.
282307
size = len(records)
283308
if size == 0:
@@ -292,16 +317,9 @@ def peek(self):
292317
293318
:returns: the next :class:`.Record` or :const:`None` if none remain
294319
"""
320+
self._buffer(1)
295321
if self._record_buffer:
296322
return self._record_buffer[0]
297-
if not self._attached:
298-
return None
299-
while self._attached:
300-
self._connection.fetch_message()
301-
if self._record_buffer:
302-
return self._record_buffer[0]
303-
304-
return None
305323

306324
def graph(self):
307325
"""Return a :class:`neo4j.graph.Graph` instance containing all the graph objects

testkitbackend/requests.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,20 @@ def ResultNext(backend, data):
301301
backend.send_response("Record", totestkit.record(record))
302302

303303

304+
def ResultSingle(backend, data):
305+
result = backend.results[data["resultId"]]
306+
backend.send_response("Record", totestkit.record(result.single()))
307+
308+
309+
def ResultPeek(backend, data):
310+
result = backend.results[data["resultId"]]
311+
record = result.peek()
312+
if record is not None:
313+
backend.send_response("Record", totestkit.record(record))
314+
else:
315+
backend.send_response("NullRecord", {})
316+
317+
304318
def ResultConsume(backend, data):
305319
result = backend.results[data["resultId"]]
306320
summary = result.consume()

testkitbackend/test_config.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
"TLSv1.1 and below are disabled in the driver"
3939
},
4040
"features": {
41+
"Feature:API:Result.Single": "Does not raise error when not exactly one record is available. To be fixed in 5.0",
42+
"Feature:API:Result.Peek": true,
4143
"AuthorizationExpiredTreatment": true,
4244
"Optimization:ImplicitDefaultArguments": true,
4345
"Optimization:MinimalResets": true,

tests/unit/work/test_result.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,14 @@ def test_result_peek(records, fetch_size):
277277
connection = ConnectionStub(records=Records(["x"], records))
278278
result = Result(connection, HydratorStub(), fetch_size, noop, noop)
279279
result._run("CYPHER", {}, None, "r", None)
280-
record = result.peek()
281-
if not records:
282-
assert record is None
283-
else:
284-
assert isinstance(record, Record)
285-
assert record.get("x") == records[0][0]
280+
for i in range(len(records) + 1):
281+
record = result.peek()
282+
if i == len(records):
283+
assert record is None
284+
else:
285+
assert isinstance(record, Record)
286+
assert record.get("x") == records[i][0]
287+
next(iter(result)) # consume the record
286288

287289

288290
@pytest.mark.parametrize("records", ([[1], [2]], [[1]], []))

0 commit comments

Comments
 (0)