Skip to content

Commit 5a720d0

Browse files
can handle noop messages (#436)
* can handle noop messages * changed the looping for getting a message
1 parent 4756bdd commit 5a720d0

File tree

6 files changed

+67
-30
lines changed

6 files changed

+67
-30
lines changed

neo4j/io/_courier.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,37 +24,41 @@
2424
Unpacker,
2525
)
2626

27+
import logging
28+
log = logging.getLogger("neo4j")
29+
2730

2831
class MessageInbox:
2932

3033
def __init__(self, s, on_error):
3134
self.on_error = on_error
3235
self._messages = self._yield_messages(s)
3336

34-
@classmethod
35-
def _load_chunks(cls, sock, buffer):
36-
chunk_size = 0
37-
while True:
38-
if chunk_size == 0:
39-
buffer.receive(sock, 2)
40-
chunk_size = buffer.pop_u16()
41-
if chunk_size > 0:
42-
buffer.receive(sock, chunk_size + 2)
43-
yield chunk_size
44-
4537
def _yield_messages(self, sock):
4638
try:
4739
buffer = UnpackableBuffer()
48-
chunk_loader = self._load_chunks(sock, buffer)
4940
unpacker = Unpacker(buffer)
41+
chunk_size = 0
5042
while True:
51-
unpacker.reset()
52-
chunk_size = -1
53-
while chunk_size != 0:
54-
chunk_size = next(chunk_loader)
55-
size, tag = unpacker.unpack_structure_header()
56-
fields = [unpacker.unpack() for _ in range(size)]
57-
yield tag, fields
43+
44+
while chunk_size == 0:
45+
# Determine the chunk size and skip noop
46+
buffer.receive(sock, 2)
47+
chunk_size = buffer.pop_u16()
48+
if chunk_size == 0:
49+
log.debug("[#%04X] S: <NOOP>", sock.getsockname()[1])
50+
51+
buffer.receive(sock, chunk_size + 2)
52+
chunk_size = buffer.pop_u16()
53+
54+
if chunk_size == 0:
55+
# chunk_size was the end marker for the message
56+
size, tag = unpacker.unpack_structure_header()
57+
fields = [unpacker.unpack() for _ in range(size)]
58+
yield tag, fields
59+
# Reset for new message
60+
unpacker.reset()
61+
5862
except OSError as error:
5963
self.on_error(error)
6064

neo4j/packstream.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from io import BytesIO
2424
from struct import pack as struct_pack, unpack as struct_unpack
2525

26-
2726
PACKED_UINT_8 = [struct_pack(">B", value) for value in range(0x100)]
2827
PACKED_UINT_16 = [struct_pack(">H", value) for value in range(0x10000)]
2928

tests/integration/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
from neo4j.io import Bolt
3636

3737
# import logging
38+
# log = logging.getLogger("neo4j")
39+
#
3840
# from neo4j.debug import watch
3941
# watch("neo4j")
40-
#
41-
# log = logging.getLogger("neo4j")
4242

4343
NEO4J_RELEASES = getenv("NEO4J_RELEASES", "snapshot-enterprise 3.5-enterprise").split()
4444
NEO4J_HOST = "localhost"

tests/stub/conftest.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828
from boltkit.server.stub import BoltStubService
2929
from pytest import fixture
3030

31-
# import logging
31+
import logging
32+
log = logging.getLogger("neo4j")
33+
3234
# from neo4j.debug import watch
3335
# watch("neo4j")
34-
#
35-
# log = logging.getLogger("neo4j")
3636

3737

3838
class StubServer:
@@ -44,16 +44,17 @@ def __init__(self, port, script):
4444
def run(self):
4545
self._process = subprocess.Popen(["python", "-m", "boltkit", "stub", "-v", "-l", ":{}".format(str(self.port)), "-t", "10", self.script], stdout=subprocess.PIPE)
4646
# Need verbose for this to work
47-
line =self._process.stdout.readline()
47+
line = self._process.stdout.readline()
4848

4949
def wait(self):
5050
try:
5151
returncode = self._process.wait(2)
5252
if returncode != 0:
53-
print("Stubserver failed with error")
53+
log.debug("stubserver return code {}".format(returncode))
54+
log.debug("check for miss match in script")
5455
return returncode == 0
5556
except subprocess.TimeoutExpired:
56-
print("Stubserver timeout!")
57+
log.debug("stubserver timeout!")
5758
return False
5859

5960
def kill(self):
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
!: BOLT 4.1
2+
!: AUTO GOODBYE
3+
!: AUTO RESET
4+
!: PORT 9001
5+
6+
C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
7+
S: SUCCESS {"server": "Neo4j/4.1.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"}
8+
C: RUN "RETURN 1 AS x" {} {"mode": "r"}
9+
PULL {"n": 2}
10+
S: SUCCESS {"fields": ["x"]}
11+
<NOOP>
12+
<NOOP>
13+
RECORD [1]
14+
<NOOP>
15+
SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "r", "t_last": 5, "db": "neo4j"}

tests/stub/test_directdriver.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ def unwind(transaction):
543543
@pytest.mark.parametrize(
544544
"test_script, database",
545545
[
546-
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), # TODO: Fix correct new behaviour with qid
546+
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"),
547547
]
548548
)
549549
def test_bolt_driver_explicit_transaction_consume_result_case_a(driver_info, test_script, database):
@@ -561,7 +561,7 @@ def test_bolt_driver_explicit_transaction_consume_result_case_a(driver_info, tes
561561
@pytest.mark.parametrize(
562562
"test_script, database",
563563
[
564-
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), # TODO: Fix correct new behaviour with qid
564+
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"),
565565
]
566566
)
567567
def test_bolt_driver_explicit_transaction_consume_result_case_b(driver_info, test_script, database):
@@ -575,3 +575,21 @@ def test_bolt_driver_explicit_transaction_consume_result_case_b(driver_info, tes
575575
result = transaction.run("UNWIND [1,2,3,4] AS x RETURN x")
576576
result.consume()
577577
transaction.commit()
578+
579+
580+
@pytest.mark.parametrize(
581+
"test_script",
582+
[
583+
"v4x1/return_1_noop_port_9001.script",
584+
]
585+
)
586+
def test_direct_can_handle_noop(driver_info, test_script):
587+
# python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_can_handle_noop
588+
with StubCluster(test_script):
589+
uri = "bolt://127.0.0.1:9001"
590+
with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver:
591+
assert isinstance(driver, BoltDriver)
592+
with driver.session(fetch_size=2, default_access_mode=READ_ACCESS) as session:
593+
result = session.run("RETURN 1 AS x")
594+
value = result.single().value()
595+
assert value == 1

0 commit comments

Comments
 (0)