Skip to content

Commit 4d59b81

Browse files
committed
Multiple bookmark support
1 parent 108ad80 commit 4d59b81

File tree

11 files changed

+100
-49
lines changed

11 files changed

+100
-49
lines changed

neo4j/v1/api.py

Lines changed: 62 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from warnings import warn
2727

2828
from neo4j.bolt import ProtocolError, ServiceUnavailable
29-
from neo4j.compat import urlparse
29+
from neo4j.compat import unicode, urlparse
3030
from neo4j.exceptions import CypherError, TransientError
3131

3232
from .exceptions import DriverError, SessionError, SessionExpired, TransactionError
@@ -42,6 +42,21 @@
4242
RETRY_DELAY_JITTER_FACTOR = 0.2
4343

4444

45+
def last_bookmark(b0, b1):
46+
""" Return the latest of two bookmarks by looking for the maximum
47+
integer value following the last colon in the bookmark string.
48+
"""
49+
n = [None, None]
50+
_, _, n[0] = b0.rpartition(":")
51+
_, _, n[1] = b1.rpartition(":")
52+
for i in range(2):
53+
try:
54+
n[i] = int(n[i])
55+
except ValueError:
56+
raise ValueError("Invalid bookmark: {}".format(b0))
57+
return b0 if n[0] > n[1] else b1
58+
59+
4560
def retry_delay_generator(initial_delay, multiplier, jitter_factor):
4661
delay = initial_delay
4762
while True:
@@ -141,14 +156,25 @@ def __enter__(self):
141156
def __exit__(self, exc_type, exc_value, traceback):
142157
self.close()
143158

144-
def session(self, access_mode=None, bookmark=None):
159+
def session(self, access_mode=None, **parameters):
145160
""" Create a new session using a connection from the driver connection
146161
pool. Session creation is a lightweight operation and sessions are
147162
not thread safe, therefore a session should generally be short-lived
148163
within a single thread.
149164
150-
:param access_mode:
151-
:param bookmark:
165+
:param access_mode: access mode for this session (read or write)
166+
:param parameters: set of parameters for this session:
167+
168+
`bookmark`
169+
A bookmark after which this session should begin.
170+
171+
`bookmarks`
172+
A collection of bookmarks after which this session should begin.
173+
174+
`max_retry_time`
175+
The maximum time after which to stop attempting retries of failed
176+
transactions.
177+
152178
:returns: new :class:`.Session` object
153179
"""
154180
if self.closed():
@@ -190,28 +216,39 @@ class Session(object):
190216
191217
"""
192218

193-
#: The current connection.
219+
# The current connection.
194220
_connection = None
195221

196-
#: The access mode for the current connection.
222+
# The access mode for the current connection.
197223
_connection_access_mode = None
198224

199-
#: The current :class:`.Transaction` instance, if any.
225+
# The current :class:`.Transaction` instance, if any.
200226
_transaction = None
201227

202-
#: The last result received.
228+
# The last result received.
203229
_last_result = None
204230

205-
#: The bookmark received from completion of the last :class:`.Transaction`.
206-
_bookmark = None
231+
# The collection of bookmarks after which the next
232+
# :class:`.Transaction` should be carried out.
233+
_bookmarks = ()
234+
235+
# Default maximum time to keep retrying failed transactions.
236+
_max_retry_time = DEFAULT_MAX_RETRY_TIME
207237

208238
_closed = False
209239

210-
def __init__(self, acquirer, max_retry_time=None, access_mode=None, bookmark=None):
240+
def __init__(self, acquirer, access_mode, **parameters):
211241
self._acquirer = acquirer
212-
self._max_retry_time = DEFAULT_MAX_RETRY_TIME if max_retry_time is None else max_retry_time
213-
self._default_access_mode = access_mode or WRITE_ACCESS
214-
self._bookmark = bookmark
242+
self._default_access_mode = access_mode
243+
for key, value in parameters.items():
244+
if key == "bookmark":
245+
self._bookmarks = [value] if value else []
246+
elif key == "bookmarks":
247+
self._bookmarks = value or []
248+
elif key == "max_retry_time":
249+
self._max_retry_time = value
250+
else:
251+
pass # for compatibility
215252

216253
def __del__(self):
217254
try:
@@ -354,7 +391,13 @@ def detach(self, result):
354391
def last_bookmark(self):
355392
""" The bookmark returned by the last :class:`.Transaction`.
356393
"""
357-
return self._bookmark
394+
last = None
395+
for bookmark in self._bookmarks:
396+
if last is None:
397+
last = bookmark
398+
else:
399+
last = last_bookmark(last, bookmark)
400+
return last
358401

359402
def has_transaction(self):
360403
return bool(self._transaction)
@@ -383,7 +426,7 @@ def begin_transaction(self, bookmark=None):
383426
from warnings import warn
384427
warn("Passing bookmarks at transaction level is deprecated", category=DeprecationWarning, stacklevel=2)
385428
_warned_about_transaction_bookmarks = True
386-
self._bookmark = bookmark
429+
self._bookmarks = [bookmark]
387430

388431
self._create_transaction()
389432
self._connect()
@@ -401,8 +444,9 @@ def commit_transaction(self):
401444
self._transaction = None
402445
result = self.__commit__()
403446
result.consume()
404-
self._bookmark = self.__bookmark__(result)
405-
return self._bookmark
447+
bookmark = self.__bookmark__(result)
448+
self._bookmarks = [bookmark]
449+
return bookmark
406450

407451
def rollback_transaction(self):
408452
""" Rollback the current transaction.
@@ -412,7 +456,6 @@ def rollback_transaction(self):
412456
if not self.has_transaction():
413457
raise TransactionError("No transaction to rollback")
414458
self._transaction = None
415-
self._bookmark = None
416459
self.__rollback__().consume()
417460

418461
def _run_transaction(self, access_mode, unit_of_work, *args, **kwargs):

neo4j/v1/direct.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,7 @@ def __init__(self, uri, **config):
6565
pool.acquire()
6666
Driver.__init__(self, pool, **config)
6767

68-
def session(self, access_mode=None, bookmark=None):
69-
return BoltSession(self._pool.acquire, self._max_retry_time, access_mode=access_mode, bookmark=bookmark)
68+
def session(self, access_mode=None, **parameters):
69+
if "max_retry_time" not in parameters:
70+
parameters["max_retry_time"] = self._max_retry_time
71+
return BoltSession(self._pool.acquire, access_mode, **parameters)

neo4j/v1/routing.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ def fetch_routing_info(self, address):
190190
if routing support is broken
191191
"""
192192
try:
193-
with RoutingSession(lambda _: self.acquire_direct(address)) as session:
193+
with RoutingSession(lambda _: self.acquire_direct(address), access_mode=None) as session:
194194
return list(session.run("ignored", self.routing_context))
195195
except CypherError as error:
196196
if error.code == "Neo.ClientError.Procedure.ProcedureNotFound":
@@ -272,7 +272,6 @@ def update_routing_table(self):
272272
if self.update_routing_table_with_routers(initial_routers):
273273
return
274274

275-
276275
# None of the routers have been successful, so just fail
277276
raise ServiceUnavailable("Unable to retrieve routing information")
278277

@@ -364,5 +363,7 @@ def connector(a):
364363
else:
365364
Driver.__init__(self, pool, **config)
366365

367-
def session(self, access_mode=None, bookmark=None):
368-
return BoltSession(self._pool.acquire, self._max_retry_time, access_mode=access_mode, bookmark=bookmark)
366+
def session(self, access_mode=None, **parameters):
367+
if "max_retry_time" not in parameters:
368+
parameters["max_retry_time"] = self._max_retry_time
369+
return BoltSession(self._pool.acquire, access_mode, **parameters)

neo4j/v1/session.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@
2727

2828

2929
class BoltSession(Session):
30-
"""
31-
32-
:param acquirer: function that can accept an access mode and return a connection
33-
:param access_mode:
34-
:param bookmark:
35-
"""
3630

3731
def _run(self, statement, parameters):
3832
assert isinstance(statement, unicode)
@@ -59,7 +53,11 @@ def __run__(self, statement, parameters):
5953
return self._run(statement, parameters)
6054

6155
def __begin__(self):
62-
return self.__run__(u"BEGIN", {"bookmark": self._bookmark} if self._bookmark else {})
56+
if self._bookmarks:
57+
parameters = {"bookmark": self.last_bookmark(), "bookmarks": self._bookmarks}
58+
else:
59+
parameters = {}
60+
return self.__run__(u"BEGIN", parameters)
6361

6462
def __commit__(self):
6563
return self.__run__(u"COMMIT", {})
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
!: AUTO INIT
22
!: AUTO RESET
33

4-
C: RUN "BEGIN" {"bookmark": "bookmark:1"}
4+
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:0", "bookmark:1"]}
55
PULL_ALL
66
S: SUCCESS {}
77
SUCCESS {}
88
C: RUN "COMMIT" {}
99
PULL_ALL
10-
S: SUCCESS {"bookmark": "bookmark:2"}
10+
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
1111
SUCCESS {}
1212

13-
C: RUN "BEGIN" {"bookmark": "bookmark:2"}
13+
C: RUN "BEGIN" {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
1414
PULL_ALL
1515
S: SUCCESS {}
1616
SUCCESS {}
1717
C: RUN "COMMIT" {}
1818
PULL_ALL
19-
S: SUCCESS {"bookmark": "bookmark:3"}
19+
S: SUCCESS {"bookmark": "bookmark:3", "bookmarks": ["bookmark:3"]}
2020
SUCCESS {}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
11
!: AUTO INIT
22
!: AUTO RESET
33

4-
C: RUN "BEGIN" {"bookmark": "bookmark:1"}
4+
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
55
PULL_ALL
66
S: SUCCESS {}
77
SUCCESS {}
88
C: RUN "COMMIT" {}
99
PULL_ALL
10-
S: SUCCESS {"bookmark": "bookmark:2"}
10+
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
1111
SUCCESS {}
1212

1313
C: RUN "RETURN 1" {}
1414
PULL_ALL
15-
S: SUCCESS {"bookmark": "bookmark:x"}
15+
S: SUCCESS {"bookmark": "bookmark:x", "bookmarks": ["bookmark:x"]}
1616
SUCCESS {}
1717

18-
C: RUN "BEGIN" {"bookmark": "bookmark:2"}
18+
C: RUN "BEGIN" {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
1919
PULL_ALL
2020
S: SUCCESS {}
2121
SUCCESS {}
2222
C: RUN "COMMIT" {}
2323
PULL_ALL
24-
S: SUCCESS {"bookmark": "bookmark:3"}
24+
S: SUCCESS {"bookmark": "bookmark:3", "bookmarks": ["bookmark:3"]}
2525
SUCCESS {}

test/stub/scripts/return_1_in_tx.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ S: SUCCESS {"fields": ["1"]}
1414

1515
C: RUN "COMMIT" {}
1616
PULL_ALL
17-
S: SUCCESS {"bookmark": "bookmark:1"}
17+
S: SUCCESS {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
1818
SUCCESS {}

test/stub/scripts/return_1_in_tx_twice.script

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ S: SUCCESS {"fields": ["1"]}
1414

1515
C: RUN "COMMIT" {}
1616
PULL_ALL
17-
S: SUCCESS {"bookmark": "bookmark:1"}
17+
S: SUCCESS {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
1818
SUCCESS {}
1919

20-
C: RUN "BEGIN" {"bookmark": "bookmark:1"}
20+
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
2121
PULL_ALL
2222
S: SUCCESS {"fields": []}
2323
SUCCESS {}
@@ -30,5 +30,5 @@ S: SUCCESS {"fields": ["1"]}
3030

3131
C: RUN "COMMIT" {}
3232
PULL_ALL
33-
S: SUCCESS {"bookmark": "bookmark:2"}
33+
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
3434
SUCCESS {}

test/stub/scripts/return_1_twice_in_tx.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ S: SUCCESS {"fields": ["x"]}
2020

2121
C: RUN "COMMIT" {}
2222
PULL_ALL
23-
S: SUCCESS {"bookmark": "bookmark:1"}
23+
S: SUCCESS {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
2424
SUCCESS {}

test/stub/scripts/return_2_in_tx.script

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
!: AUTO INIT
22
!: AUTO RESET
33

4-
C: RUN "BEGIN" {"bookmark": "bookmark:1"}
4+
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
55
PULL_ALL
66
S: SUCCESS {"fields": []}
77
SUCCESS {}
@@ -14,5 +14,5 @@ S: SUCCESS {"fields": ["2"]}
1414

1515
C: RUN "COMMIT" {}
1616
PULL_ALL
17-
S: SUCCESS {"bookmark": "bookmark:2"}
17+
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
1818
SUCCESS {}

0 commit comments

Comments
 (0)