Skip to content

Commit 459e981

Browse files
bolt 4.1 now adds routing context to the hello message (#439)
1 parent 810bee6 commit 459e981

15 files changed

+107
-39
lines changed

neo4j/io/__init__.py

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,13 @@ def ping(cls, address, *, timeout=None, **config):
177177
return protocol_version
178178

179179
@classmethod
180-
def open(cls, address, *, auth=None, timeout=None, **pool_config):
180+
def open(cls, address, *, auth=None, timeout=None, routing_context=None, **pool_config):
181181
""" Open a new Bolt connection to a given server address.
182182
183183
:param address:
184184
:param auth:
185-
:param timeout: The connection timeout
185+
:param timeout: the connection timeout in seconds
186+
:param routing_context: dict containing routing context
186187
:param pool_config:
187188
:return:
188189
:raise BoltHandshakeError: raised if the Bolt Protocol can not negotiate a protocol version.
@@ -200,15 +201,15 @@ def open(cls, address, *, auth=None, timeout=None, **pool_config):
200201
if pool_config.protocol_version == (3, 0):
201202
# Carry out Bolt subclass imports locally to avoid circular dependency issues.
202203
from neo4j.io._bolt3 import Bolt3
203-
connection = Bolt3(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent)
204+
connection = Bolt3(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent, routing_context=routing_context)
204205
elif pool_config.protocol_version == (4, 0):
205206
# Carry out Bolt subclass imports locally to avoid circular dependency issues.
206207
from neo4j.io._bolt4x0 import Bolt4x0
207-
connection = Bolt4x0(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent)
208+
connection = Bolt4x0(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent, routing_context=routing_context)
208209
elif pool_config.protocol_version == (4, 1):
209210
# Carry out Bolt subclass imports locally to avoid circular dependency issues.
210211
from neo4j.io._bolt4x1 import Bolt4x1
211-
connection = Bolt4x1(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent)
212+
connection = Bolt4x1(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent, routing_context=routing_context)
212213
else:
213214
log.debug("[#%04X] S: <CLOSE>", s.getpeername()[1])
214215
s.shutdown(SHUT_RDWR)
@@ -499,27 +500,35 @@ def close(self):
499500
class BoltPool(IOPool):
500501

501502
@classmethod
502-
def open(cls, address, *, auth, pool_config, workspace_config):
503+
def open(cls, address, *, auth, pool_config, workspace_config, routing_context=None):
503504
"""Create a new BoltPool
504505
505506
:param address:
506507
:param auth:
507508
:param pool_config:
508509
:param workspace_config:
510+
:param routing_context:
509511
:return: BoltPool
510512
"""
511513

514+
if routing_context is None:
515+
routing_context = {}
516+
elif "address" in routing_context:
517+
raise ConfigurationError("The key 'address' is reserved for routing context.")
518+
routing_context["address"] = str(address)
519+
512520
def opener(addr, timeout):
513-
return Bolt.open(addr, auth=auth, timeout=timeout, **pool_config)
521+
return Bolt.open(addr, auth=auth, timeout=timeout, routing_context=routing_context, **pool_config)
514522

515-
pool = cls(opener, pool_config, workspace_config, address)
523+
pool = cls(opener, pool_config, workspace_config, routing_context, address)
516524
seeds = [pool.acquire() for _ in range(pool_config.init_size)]
517525
pool.release(*seeds)
518526
return pool
519527

520-
def __init__(self, opener, pool_config, workspace_config, address):
528+
def __init__(self, opener, pool_config, workspace_config, routing_context, address):
521529
super(BoltPool, self).__init__(opener, pool_config, workspace_config)
522530
self.address = address
531+
self.routing_context = routing_context
523532

524533
def __repr__(self):
525534
return "<{} address={!r}>".format(self.__class__.__name__, self.address)
@@ -545,10 +554,17 @@ def open(cls, *addresses, auth, pool_config, workspace_config, routing_context=N
545554
:return: Neo4jPool
546555
"""
547556

557+
address = addresses[0]
558+
if routing_context is None:
559+
routing_context = {}
560+
elif "address" in routing_context:
561+
raise ConfigurationError("The key 'address' is reserved for routing context.")
562+
routing_context["address"] = str(address)
563+
548564
def opener(addr, timeout):
549-
return Bolt.open(addr, auth=auth, timeout=timeout, **pool_config)
565+
return Bolt.open(addr, auth=auth, timeout=timeout, routing_context=routing_context, **pool_config)
550566

551-
pool = cls(opener, pool_config, workspace_config, routing_context, addresses)
567+
pool = cls(opener, pool_config, workspace_config, routing_context, address)
552568

553569
try:
554570
pool.update_routing_table(database=workspace_config.database)
@@ -558,7 +574,7 @@ def opener(addr, timeout):
558574
else:
559575
return pool
560576

561-
def __init__(self, opener, pool_config, workspace_config, routing_context, addresses):
577+
def __init__(self, opener, pool_config, workspace_config, routing_context, address):
562578
"""
563579
564580
:param opener:
@@ -569,15 +585,10 @@ def __init__(self, opener, pool_config, workspace_config, routing_context, addre
569585
"""
570586
super(Neo4jPool, self).__init__(opener, pool_config, workspace_config)
571587
# Each database have a routing table, the default database is a special case.
572-
log.debug("[#0000] C: <NEO4J POOL> routing addresses %r", addresses)
573-
self.init_address = addresses[0]
574-
self.routing_tables = {workspace_config.database: RoutingTable(database=workspace_config.database, routers=addresses)}
588+
log.debug("[#0000] C: <NEO4J POOL> routing address %r", address)
589+
self.address = address
590+
self.routing_tables = {workspace_config.database: RoutingTable(database=workspace_config.database, routers=[address])}
575591
self.routing_context = routing_context
576-
if self.routing_context is None:
577-
self.routing_context = {}
578-
elif "address" in self.routing_context:
579-
raise ConfigurationError("The key 'address' is reserved for routing context.")
580-
self.routing_context["address"] = str(self.init_address)
581592
self.refresh_lock = Lock()
582593

583594
def __repr__(self):
@@ -621,7 +632,7 @@ def fetch_routing_info(self, *, address, timeout, database):
621632
:param address: router address
622633
:param timeout: seconds
623634
:param database: the data base name to get routing table for
624-
:param init_address: the address by which the client initially contacted the server as a hint for inclusion in the returned routing table.
635+
:param address: the address by which the client initially contacted the server as a hint for inclusion in the returned routing table.
625636
626637
:return: list of routing records or
627638
None if no connection could be established

neo4j/io/_bolt3.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class Bolt3(Bolt):
7676
#: The pool of which this connection is a member
7777
pool = None
7878

79-
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None):
79+
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None, routing_context=None):
8080
self.unresolved_address = unresolved_address
8181
self.socket = sock
8282
self.server_info = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION)
@@ -90,6 +90,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
9090
self.supports_multiple_results = False
9191
self.supports_multiple_databases = False
9292
self._is_reset = True
93+
self.routing_context = routing_context
9394

9495
# Determine the user agent
9596
if user_agent:

neo4j/io/_bolt4x0.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class Bolt4x0(Bolt):
7575
#: The pool of which this connection is a member
7676
pool = None
7777

78-
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None):
78+
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None, routing_context=None):
7979
self.unresolved_address = unresolved_address
8080
self.socket = sock
8181
self.server_info = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION)
@@ -89,6 +89,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8989
self.supports_multiple_results = True
9090
self.supports_multiple_databases = True
9191
self._is_reset = True
92+
self.routing_context = routing_context
9293

9394
# Determine the user agent
9495
if user_agent:

neo4j/io/_bolt4x1.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class Bolt4x1(Bolt):
7575
#: The pool of which this connection is a member
7676
pool = None
7777

78-
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None):
78+
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None, routing_context=None):
7979
self.unresolved_address = unresolved_address
8080
self.socket = sock
8181
self.server_info = ServerInfo(Address(sock.getpeername()), Bolt4x1.PROTOCOL_VERSION)
@@ -89,6 +89,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8989
self.supports_multiple_results = True
9090
self.supports_multiple_databases = True
9191
self._is_reset = True
92+
self.routing_context = routing_context
9293

9394
# Determine the user agent
9495
if user_agent:
@@ -135,6 +136,7 @@ def local_port(self):
135136
def hello(self):
136137
headers = {"user_agent": self.user_agent}
137138
headers.update(self.auth_dict)
139+
headers["routing"] = self.routing_context
138140
logged_headers = dict(headers)
139141
if "credentials" in logged_headers:
140142
logged_headers["credentials"] = "*******"

tests/stub/conftest.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import subprocess
2323
import os
24+
import time
2425

2526
from threading import Thread
2627
from time import sleep
@@ -44,18 +45,20 @@ def __init__(self, port, script):
4445
def run(self):
4546
self._process = subprocess.Popen(["python", "-m", "boltkit", "stub", "-v", "-l", ":{}".format(str(self.port)), "-t", "10", self.script], stdout=subprocess.PIPE)
4647
# Need verbose for this to work
47-
line = self._process.stdout.readline()
48+
line = self._process.stdout.readline().decode("utf-8")
49+
log.debug("started stub server {}".format(self.port))
50+
log.debug(line.strip("\n"))
4851

4952
def wait(self):
50-
try:
51-
returncode = self._process.wait(2)
52-
if returncode != 0:
53-
log.debug("stubserver return code {}".format(returncode))
54-
log.debug("check for miss match in script")
55-
return returncode == 0
56-
except subprocess.TimeoutExpired:
57-
log.debug("stubserver timeout!")
58-
return False
53+
while True:
54+
return_code = self._process.poll()
55+
if return_code is not None:
56+
line = self._process.stdout.readline().decode("utf-8")
57+
if line == "":
58+
break
59+
log.debug(line.strip("\n"))
60+
61+
return True
5962

6063
def kill(self):
6164
# Kill process if not already dead
@@ -129,6 +132,7 @@ class DefaultBoltStubService(BoltStubService):
129132
class StubCluster(StubCluster):
130133

131134
def __init__(self, *servers):
135+
print("")
132136
scripts = [os.path.join(os.path.dirname(__file__), "scripts", server) for server in servers]
133137

134138
bss = DefaultBoltStubService.load(*scripts)

tests/stub/scripts/v3/empty.script

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
!: AUTO HELLO
33
!: AUTO GOODBYE
44
!: AUTO RESET
5+
!: PORT 9001

tests/stub/scripts/v3/empty_explicit_hello_goodbye.script

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
!: BOLT 3
2+
!: PORT 9001
23

34
C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
45
S: SUCCESS {"server": "Neo4j/3.5.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
!: BOLT 4
22
!: AUTO HELLO
33
!: AUTO GOODBYE
4-
!: AUTO RESET
4+
!: AUTO RESET
5+
!: PORT 9001

tests/stub/scripts/v4x0/empty_explicit_hello_goodbye.script

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
!: BOLT 4
2+
!: PORT 9001
23

34
C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
45
S: SUCCESS {"server": "Neo4j/4.0.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
!: BOLT 4.1
2+
!: PORT 9001
23

3-
C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
4+
C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test", "routing": {"address": "localhost:9001"}}
45
S: SUCCESS {"server": "Neo4j/4.1.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"}
56
C: GOODBYE
67
S: <EXIT>

0 commit comments

Comments
 (0)