Skip to content

Commit 0473a2f

Browse files
author
Zhen
committed
Reads in absence of writer
1 parent 4f62301 commit 0473a2f

File tree

4 files changed

+141
-47
lines changed

4 files changed

+141
-47
lines changed

neo4j/v1/routing.py

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,12 @@ def __init__(self, routers=(), readers=(), writers=(), ttl=0):
132132
self.last_updated_time = self.timer()
133133
self.ttl = ttl
134134

135-
def is_fresh(self):
135+
def is_fresh(self, access_mode):
136136
""" Indicator for whether routing information is still usable.
137137
"""
138138
expired = self.last_updated_time + self.ttl <= self.timer()
139-
return not expired and len(self.routers) > 1 and self.readers and self.writers
139+
has_server_for_mode = (access_mode == READ_ACCESS and self.readers) or (access_mode == WRITE_ACCESS and self.writers)
140+
return not expired and len(self.routers) >= 1 and has_server_for_mode
140141

141142
def update(self, new_routing_table):
142143
""" Update the current routing table with new routing information
@@ -162,6 +163,7 @@ def __init__(self, connector, initial_address, routing_context, *routers):
162163
self.initial_address = initial_address
163164
self.routing_context = routing_context
164165
self.routing_table = RoutingTable(routers)
166+
self.missing_writer = False
165167
self.refresh_lock = Lock()
166168

167169
def routing_info_procedure(self, connection):
@@ -211,6 +213,11 @@ def fetch_routing_table(self, address):
211213
num_readers = len(new_routing_table.readers)
212214
num_writers = len(new_routing_table.writers)
213215

216+
# No writers are available. This likely indicates a temporary state,
217+
# such as leader switching, so we should not signal an error.
218+
# When no writers available, then we flag we are reading in absence of writer
219+
self.missing_writer = (num_writers == 0)
220+
214221
# No routers
215222
if num_routers == 0:
216223
raise ProtocolError("No routing servers returned from server %r" % (address,))
@@ -219,12 +226,6 @@ def fetch_routing_table(self, address):
219226
if num_readers == 0:
220227
raise ProtocolError("No read servers returned from server %r" % (address,))
221228

222-
# No writers
223-
if num_writers == 0:
224-
# No writers are available. This likely indicates a temporary state,
225-
# such as leader switching, so we should not signal an error.
226-
return None
227-
228229
# At least one of each is fine, so return this table
229230
return new_routing_table
230231

@@ -245,21 +246,30 @@ def update_routing_table(self):
245246
"""
246247
# copied because it can be modified
247248
copy_of_routers = list(self.routing_table.routers)
249+
250+
has_tried_initial_routers = False
251+
if self.missing_writer:
252+
has_tried_initial_routers = True
253+
if self.update_routing_table_with_routers(resolve(self.initial_address)):
254+
return
255+
248256
if self.update_routing_table_with_routers(copy_of_routers):
249257
return
250258

251-
initial_routers = resolve(self.initial_address)
252-
for router in copy_of_routers:
253-
if router in initial_routers:
254-
initial_routers.remove(router)
255-
if initial_routers:
256-
if self.update_routing_table_with_routers(initial_routers):
257-
return
259+
if not has_tried_initial_routers:
260+
initial_routers = resolve(self.initial_address)
261+
for router in copy_of_routers:
262+
if router in initial_routers:
263+
initial_routers.remove(router)
264+
if initial_routers:
265+
if self.update_routing_table_with_routers(initial_routers):
266+
return
267+
258268

259269
# None of the routers have been successful, so just fail
260270
raise ServiceUnavailable("Unable to retrieve routing information")
261271

262-
def refresh_routing_table(self):
272+
def ensure_routing_table(self, access_mode):
263273
""" Update the routing table if stale.
264274
265275
This method performs two freshness checks, before and after acquiring
@@ -272,10 +282,13 @@ def refresh_routing_table(self):
272282
273283
:return: `True` if an update was required, `False` otherwise.
274284
"""
275-
if self.routing_table.is_fresh():
285+
if self.routing_table.is_fresh(access_mode):
276286
return False
277287
with self.refresh_lock:
278-
if self.routing_table.is_fresh():
288+
if self.routing_table.is_fresh(access_mode):
289+
if access_mode == READ_ACCESS:
290+
# if reader is fresh but writers is not fresh, then we are reading in absence of writer
291+
self.missing_writer = not self.routing_table.is_fresh(WRITE_ACCESS)
279292
return False
280293
self.update_routing_table()
281294
return True
@@ -289,18 +302,21 @@ def acquire(self, access_mode=None):
289302
server_list = self.routing_table.writers
290303
else:
291304
raise ValueError("Unsupported access mode {}".format(access_mode))
305+
306+
self.ensure_routing_table(access_mode)
292307
while True:
293-
address = None
294-
while address is None:
295-
self.refresh_routing_table()
296-
address = next(server_list)
308+
address = next(server_list)
309+
if address is None:
310+
break
297311
try:
298312
connection = self.acquire_direct(address) # should always be a resolved address
299313
connection.Error = SessionExpired
300314
except ServiceUnavailable:
301315
self.remove(address)
302316
else:
303317
return connection
318+
raise SessionExpired("Failed to obtain connection towards '" + access_mode +
319+
"' server. Known routing table is: '" + self.routing_table + "'.")
304320

305321
def remove(self, address):
306322
""" Remove an address from the connection pool, if present, closing

test/stub/test_routing.py

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def test_should_get_table_from_router(self):
156156
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
157157
assert table.writers == {("127.0.0.1", 9006)}
158158
assert table.ttl == 300
159+
assert not pool.missing_writer
159160

160161
def test_null_info_should_return_null_table(self):
161162
address = ("127.0.0.1", 9001)
@@ -177,12 +178,17 @@ def test_no_readers_should_raise_protocol_error(self):
177178
with self.assertRaises(ProtocolError):
178179
_ = pool.fetch_routing_table(address)
179180

180-
def test_no_writers_should_return_null_table(self):
181+
def test_no_writers_should_return_table_with_no_writer(self):
181182
with StubCluster({9001: "router_no_writers.script"}):
182183
address = ("127.0.0.1", 9001)
183184
with RoutingPool() as pool:
184185
table = pool.fetch_routing_table(address)
185-
assert table is None
186+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
187+
("127.0.0.1", 9003)}
188+
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
189+
assert not table.writers
190+
assert table.ttl == 300
191+
assert pool.missing_writer
186192

187193

188194
class RoutingConnectionPoolUpdateRoutingTableTestCase(StubTestCase):
@@ -211,6 +217,20 @@ def test_roll_back_to_initial_server_if_failed_update_with_existing_routers(self
211217
assert table.writers == {("127.0.0.1", 9006)}
212218
assert table.ttl == 300
213219

220+
def test_try_initial_server_first_if_missing_writer(self):
221+
with StubCluster({9001: "router.script"}):
222+
initial_address = ("127.0.0.1", 9001)
223+
with RoutingConnectionPool(connector, initial_address, {}) as pool:
224+
pool.missing_writer = True
225+
pool.update_routing_table()
226+
table = pool.routing_table
227+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
228+
("127.0.0.1", 9003)}
229+
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
230+
assert table.writers == {("127.0.0.1", 9006)}
231+
assert table.ttl == 300
232+
assert not pool.missing_writer
233+
214234
def test_update_with_no_routers_should_signal_service_unavailable(self):
215235
with RoutingPool() as pool:
216236
with self.assertRaises(ServiceUnavailable):
@@ -226,7 +246,7 @@ def _test_server_outcome(self, server_outcomes, overall_outcome):
226246
routers = []
227247
for port, outcome in enumerate(server_outcomes, 9001):
228248
if outcome is None:
229-
servers[port] = "router_no_writers.script"
249+
servers[port] = "rude_router.script"
230250
elif outcome is RoutingTable:
231251
servers[port] = "router.script"
232252
elif outcome is ServiceUnavailable:
@@ -251,26 +271,37 @@ def _test_server_outcome(self, server_outcomes, overall_outcome):
251271
assert False, "Unexpected overall outcome %r" % overall_outcome
252272

253273

254-
class RoutingConnectionPoolRefreshRoutingTableTestCase(StubTestCase):
274+
class RoutingConnectionPoolEnsureRoutingTableTestCase(StubTestCase):
255275
def test_should_update_if_stale(self):
256276
with StubCluster({9001: "router.script"}):
257277
address = ("127.0.0.1", 9001)
258278
with RoutingPool(address) as pool:
259279
first_updated_time = pool.routing_table.last_updated_time
260280
pool.routing_table.ttl = 0
261-
pool.refresh_routing_table()
281+
pool.ensure_routing_table(WRITE_ACCESS)
262282
second_updated_time = pool.routing_table.last_updated_time
263283
assert second_updated_time != first_updated_time
284+
assert not pool.missing_writer
264285

265286
def test_should_not_update_if_fresh(self):
266287
with StubCluster({9001: "router.script"}):
267288
address = ("127.0.0.1", 9001)
268289
with RoutingPool(address) as pool:
269-
pool.refresh_routing_table()
290+
pool.ensure_routing_table(WRITE_ACCESS)
270291
first_updated_time = pool.routing_table.last_updated_time
271-
pool.refresh_routing_table()
292+
pool.ensure_routing_table(WRITE_ACCESS)
272293
second_updated_time = pool.routing_table.last_updated_time
273294
assert second_updated_time == first_updated_time
295+
assert not pool.missing_writer
296+
297+
def test_should_flag_reading_without_writer(self):
298+
with StubCluster({9001: "router_no_writers.script"}):
299+
address = ("127.0.0.1", 9001)
300+
with RoutingPool(address) as pool:
301+
assert not pool.routing_table.is_fresh(READ_ACCESS)
302+
assert not pool.routing_table.is_fresh(WRITE_ACCESS)
303+
pool.ensure_routing_table(READ_ACCESS)
304+
assert pool.missing_writer
274305

275306
# TODO: fix flaky test
276307
# def test_concurrent_refreshes_should_not_block_if_fresh(self):
@@ -372,65 +403,90 @@ def test_should_refresh(self):
372403
with StubCluster({9001: "router.script", 9004: "empty.script"}):
373404
address = ("127.0.0.1", 9001)
374405
with RoutingPool(address) as pool:
375-
assert not pool.routing_table.is_fresh()
406+
assert not pool.routing_table.is_fresh(READ_ACCESS)
376407
_ = pool.acquire(access_mode=READ_ACCESS)
377-
assert pool.routing_table.is_fresh()
408+
assert pool.routing_table.is_fresh(READ_ACCESS)
409+
assert not pool.missing_writer
378410

379411
def test_connected_to_reader(self):
380412
with StubCluster({9001: "router.script", 9004: "empty.script"}):
381413
address = ("127.0.0.1", 9001)
382414
with RoutingPool(address) as pool:
383-
assert not pool.routing_table.is_fresh()
415+
assert not pool.routing_table.is_fresh(READ_ACCESS)
384416
connection = pool.acquire(access_mode=READ_ACCESS)
385417
assert connection.server.address in pool.routing_table.readers
418+
assert not pool.missing_writer
386419

387420
def test_should_retry_if_first_reader_fails(self):
388421
with StubCluster({9001: "router.script",
389422
9004: "fail_on_init.script",
390423
9005: "empty.script"}):
391424
address = ("127.0.0.1", 9001)
392425
with RoutingPool(address) as pool:
393-
assert not pool.routing_table.is_fresh()
426+
assert not pool.routing_table.is_fresh(READ_ACCESS)
394427
_ = pool.acquire(access_mode=READ_ACCESS)
395428
assert ("127.0.0.1", 9004) not in pool.routing_table.readers
396429
assert ("127.0.0.1", 9005) in pool.routing_table.readers
397430

431+
def test_should_connect_to_read_in_absent_of_writer(self):
432+
with StubCluster({9001: "router_no_writers.script", 9004: "empty.script"}):
433+
address = ("127.0.0.1", 9001)
434+
with RoutingPool(address) as pool:
435+
assert not pool.routing_table.is_fresh(READ_ACCESS)
436+
connection = pool.acquire(access_mode=READ_ACCESS)
437+
assert connection.server.address in pool.routing_table.readers
438+
assert not pool.routing_table.is_fresh(WRITE_ACCESS)
439+
assert pool.missing_writer
440+
398441

399442
class RoutingConnectionPoolAcquireForWriteTestCase(StubTestCase):
400443
def test_should_refresh(self):
401444
with StubCluster({9001: "router.script", 9006: "empty.script"}):
402445
address = ("127.0.0.1", 9001)
403446
with RoutingPool(address) as pool:
404-
assert not pool.routing_table.is_fresh()
447+
assert not pool.routing_table.is_fresh(WRITE_ACCESS)
405448
_ = pool.acquire(access_mode=WRITE_ACCESS)
406-
assert pool.routing_table.is_fresh()
449+
assert pool.routing_table.is_fresh(WRITE_ACCESS)
450+
assert not pool.missing_writer
407451

408452
def test_connected_to_writer(self):
409453
with StubCluster({9001: "router.script", 9006: "empty.script"}):
410454
address = ("127.0.0.1", 9001)
411455
with RoutingPool(address) as pool:
412-
assert not pool.routing_table.is_fresh()
456+
assert not pool.routing_table.is_fresh(WRITE_ACCESS)
413457
connection = pool.acquire(access_mode=WRITE_ACCESS)
414458
assert connection.server.address in pool.routing_table.writers
459+
assert not pool.missing_writer
415460

416461
def test_should_retry_if_first_writer_fails(self):
417462
with StubCluster({9001: "router_with_multiple_writers.script",
418463
9006: "fail_on_init.script",
419464
9007: "empty.script"}):
420465
address = ("127.0.0.1", 9001)
421466
with RoutingPool(address) as pool:
422-
assert not pool.routing_table.is_fresh()
467+
assert not pool.routing_table.is_fresh(WRITE_ACCESS)
423468
_ = pool.acquire(access_mode=WRITE_ACCESS)
424469
assert ("127.0.0.1", 9006) not in pool.routing_table.writers
425470
assert ("127.0.0.1", 9007) in pool.routing_table.writers
426471

472+
def test_should_error_to_writer_in_absent_of_reader(self):
473+
with StubCluster({9001: "router_no_readers.script"}):
474+
address = ("127.0.0.1", 9001)
475+
with RoutingPool(address) as pool:
476+
assert not pool.routing_table.is_fresh(WRITE_ACCESS)
477+
with self.assertRaises(ProtocolError):
478+
_ = pool.acquire(access_mode=WRITE_ACCESS)
479+
assert not pool.routing_table.is_fresh(READ_ACCESS)
480+
assert not pool.routing_table.is_fresh(WRITE_ACCESS)
481+
assert not pool.missing_writer
482+
427483

428484
class RoutingConnectionPoolRemoveTestCase(StubTestCase):
429485
def test_should_remove_router_from_routing_table_if_present(self):
430486
with StubCluster({9001: "router.script"}):
431487
address = ("127.0.0.1", 9001)
432488
with RoutingPool(address) as pool:
433-
pool.refresh_routing_table()
489+
pool.ensure_routing_table(WRITE_ACCESS)
434490
target = ("127.0.0.1", 9001)
435491
assert target in pool.routing_table.routers
436492
pool.remove(target)
@@ -440,7 +496,7 @@ def test_should_remove_reader_from_routing_table_if_present(self):
440496
with StubCluster({9001: "router.script"}):
441497
address = ("127.0.0.1", 9001)
442498
with RoutingPool(address) as pool:
443-
pool.refresh_routing_table()
499+
pool.ensure_routing_table(WRITE_ACCESS)
444500
target = ("127.0.0.1", 9004)
445501
assert target in pool.routing_table.readers
446502
pool.remove(target)
@@ -450,7 +506,7 @@ def test_should_remove_writer_from_routing_table_if_present(self):
450506
with StubCluster({9001: "router.script"}):
451507
address = ("127.0.0.1", 9001)
452508
with RoutingPool(address) as pool:
453-
pool.refresh_routing_table()
509+
pool.ensure_routing_table(WRITE_ACCESS)
454510
target = ("127.0.0.1", 9006)
455511
assert target in pool.routing_table.writers
456512
pool.remove(target)
@@ -460,6 +516,6 @@ def test_should_not_fail_if_absent(self):
460516
with StubCluster({9001: "router.script"}):
461517
address = ("127.0.0.1", 9001)
462518
with RoutingPool(address) as pool:
463-
pool.refresh_routing_table()
519+
pool.ensure_routing_table(WRITE_ACCESS)
464520
target = ("127.0.0.1", 9007)
465521
pool.remove(target)

test/stub/test_routingdriver.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,20 @@ def test_should_call_get_routing_table_with_context(self):
198198
result = session.run("RETURN $x", {"x": 1})
199199
for record in result:
200200
assert record["x"] == 1
201-
assert result.summary().server.address == ('127.0.0.1', 9002)
201+
assert result.summary().server.address == ('127.0.0.1', 9002)
202+
203+
def test_should_serve_read_when_missing_writer(self):
204+
with StubCluster({9001: "router_no_writers.script", 9005: "return_1.script"}):
205+
uri = "bolt+routing://127.0.0.1:9001"
206+
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
207+
with driver.session(READ_ACCESS) as session:
208+
result = session.run("RETURN $x", {"x": 1})
209+
for record in result:
210+
assert record["x"] == 1
211+
assert result.summary().server.address == ('127.0.0.1', 9005)
212+
213+
def test_should_error_when_missing_reader(self):
214+
with StubCluster({9001: "router_no_readers.script"}):
215+
uri = "bolt+routing://127.0.0.1:9001"
216+
with self.assertRaises(ProtocolError):
217+
GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False)

0 commit comments

Comments
 (0)