Skip to content

Commit 5a5783c

Browse files
committed
Adding QueueBoundedDatabase along with the appropriate executor.
1 parent 2690421 commit 5a5783c

File tree

3 files changed

+122
-11
lines changed

3 files changed

+122
-11
lines changed

arango/database.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
__all__ = ["StandardDatabase", "AsyncDatabase", "BatchDatabase", "TransactionDatabase"]
1+
__all__ = [
2+
"StandardDatabase",
3+
"AsyncDatabase",
4+
"BatchDatabase",
5+
"QueueBoundedDatabase",
6+
"TransactionDatabase",
7+
]
28

39
from datetime import datetime
410
from numbers import Number
@@ -75,6 +81,7 @@
7581
AsyncApiExecutor,
7682
BatchApiExecutor,
7783
DefaultApiExecutor,
84+
QueueBoundedApiExecutor,
7885
TransactionApiExecutor,
7986
)
8087
from arango.formatter import (
@@ -2514,6 +2521,19 @@ def begin_transaction(
25142521
max_size=max_size,
25152522
)
25162523

2524+
def begin_queue_bounded_execution(
2525+
self, max_queue_time_seconds: float = 0.0
2526+
) -> "QueueBoundedDatabase":
2527+
"""Begin queue bounded execution.
2528+
2529+
:param max_queue_time_seconds: Maximum time in seconds a request can be queued
2530+
on the server-side. If set to 0, the server ignores this setting.
2531+
:type max_queue_time_seconds: float
2532+
:return: Database API wrapper object specifically for queue bounded execution.
2533+
:rtype: arango.database.QueueBoundedDatabase
2534+
"""
2535+
return QueueBoundedDatabase(self._conn, max_queue_time_seconds)
2536+
25172537

25182538
class AsyncDatabase(Database):
25192539
"""Database API wrapper tailored specifically for async execution.
@@ -2684,3 +2704,44 @@ def abort_transaction(self) -> bool:
26842704
:raise arango.exceptions.TransactionAbortError: If abort fails.
26852705
"""
26862706
return self._executor.abort()
2707+
2708+
2709+
class QueueBoundedDatabase(Database):
2710+
"""Database API wrapper tailored specifically to hande time-bound requests.
2711+
2712+
See :func:`arango.database.StandardDatabase.begin_queue_bounded_execution`.
2713+
2714+
:param connection: HTTP connection.
2715+
:param max_queue_time_seconds: Maximum server-side queuing time in seconds.
2716+
If the server-side queuing time exceeds the client's specified limit,
2717+
the request will be rejected.
2718+
:type max_queue_time_seconds: float
2719+
"""
2720+
2721+
def __init__(self, connection: Connection, max_queue_time_seconds: float) -> None:
2722+
self._executor: QueueBoundedApiExecutor
2723+
super().__init__(
2724+
connection=connection,
2725+
executor=QueueBoundedApiExecutor(connection, max_queue_time_seconds),
2726+
)
2727+
2728+
def __repr__(self) -> str:
2729+
return f"<QueueBoundedDatabase {self.name}>"
2730+
2731+
@property
2732+
def last_queue_time(self) -> float:
2733+
"""Return the most recently recorded server-side queuing time in seconds.
2734+
2735+
:return: Server-side queuing time in seconds.
2736+
:rtype: float
2737+
"""
2738+
return self._executor.queue_time_seconds
2739+
2740+
def adjust_max_queue_time(self, max_queue_time_seconds: float) -> None:
2741+
"""Adjust the maximum server-side queuing time in seconds.
2742+
2743+
:param max_queue_time_seconds: New maximum server-side queuing time
2744+
in seconds.
2745+
:type max_queue_time_seconds: float
2746+
"""
2747+
self._executor.max_queue_time_seconds = max_queue_time_seconds

arango/exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,15 @@ class BatchExecuteError(ArangoServerError):
232232
"""Failed to execute batch API request."""
233233

234234

235+
######################################
236+
# Queue Bounded Execution Exceptions #
237+
######################################
238+
239+
240+
class QueueBoundedExecutorError(ArangoServerError):
241+
"""Failed to execute queue-bounded API request."""
242+
243+
235244
#########################
236245
# Collection Exceptions #
237246
#########################

arango/executor.py

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"AsyncApiExecutor",
55
"BatchApiExecutor",
66
"TransactionApiExecutor",
7-
"QueueTimeApiExecutor",
7+
"QueueBoundedApiExecutor",
88
]
99

1010
from collections import OrderedDict
@@ -17,6 +17,7 @@
1717
AsyncExecuteError,
1818
BatchExecuteError,
1919
BatchStateError,
20+
QueueBoundedExecutorError,
2021
TransactionAbortError,
2122
TransactionCommitError,
2223
TransactionInitError,
@@ -33,7 +34,7 @@
3334
"AsyncApiExecutor",
3435
"BatchApiExecutor",
3536
"TransactionApiExecutor",
36-
"QueueTimeApiExecutor",
37+
"QueueBoundedApiExecutor",
3738
]
3839

3940
T = TypeVar("T")
@@ -432,26 +433,57 @@ def abort(self) -> bool:
432433
raise TransactionAbortError(resp, request)
433434

434435

435-
class QueueTimeApiExecutor:
436-
"""API executor that handles queue time.
436+
class QueueBoundedApiExecutor:
437+
"""Allows setting the maximum acceptable server-side queuing time
438+
for client requests.
437439
438440
:param connection: HTTP connection.
439441
:type connection: arango.connection.BasicConnection |
440442
arango.connection.JwtConnection | arango.connection.JwtSuperuserConnection
443+
:param max_queue_time_seconds: Maximum server-side queuing time in seconds.
444+
:type max_queue_time_seconds: float
441445
"""
442446

443-
def __init__(self, connection: Connection) -> None:
447+
def __init__(self, connection: Connection, max_queue_time_seconds: float) -> None:
444448
self._conn = connection
449+
self._max_queue_time_seconds = max_queue_time_seconds
450+
self._queue_time_seconds = 0.0
445451

446452
@property
447453
def context(self) -> str:
448-
return "queue-time"
454+
return "queue-bounded"
455+
456+
@property
457+
def queue_time_seconds(self) -> float:
458+
"""Return the most recent request queuing/de-queuing time.
459+
460+
:return: Server-side queuing time in seconds.
461+
:rtype: float
462+
"""
463+
return self._queue_time_seconds
464+
465+
@property
466+
def max_queue_time_seconds(self) -> float:
467+
"""Return the maximum server-side queuing time.
468+
469+
:return: Maximum server-side queuing time in seconds.
470+
:rtype: float
471+
"""
472+
return self._max_queue_time_seconds
473+
474+
@max_queue_time_seconds.setter
475+
def max_queue_time_seconds(self, value: float) -> None:
476+
"""Set the maximum server-side queuing time.
477+
478+
:param value: Maximum server-side queuing time in seconds.
479+
:type value: float
480+
"""
481+
self._max_queue_time_seconds = value
449482

450483
def execute(
451484
self,
452485
request: Request,
453486
response_handler: Callable[[Response], T],
454-
max_queue_time_seconds: int,
455487
) -> T:
456488
"""Execute an API request and return the result.
457489
@@ -461,8 +493,17 @@ def execute(
461493
:type response_handler: callable
462494
:return: API execution result.
463495
"""
464-
request.headers["x-arango-max-queue-time-seconds"] = max_queue_time_seconds
496+
request.headers["x-arango-queue-time-seconds"] = str(
497+
self._max_queue_time_seconds
498+
)
465499
resp = self._conn.send_request(request)
466500

467-
queue_time_seconds = resp.headers["x-arango-queue-time-seconds"]
468-
return response_handler(resp, queue_time_seconds)
501+
if not resp.is_success:
502+
raise QueueBoundedExecutorError(resp, request)
503+
504+
if "X-Arango-Queue-Time-Seconds" in resp.headers:
505+
self._queue_time_seconds = float(
506+
resp.headers["X-Arango-Queue-Time-Seconds"]
507+
)
508+
509+
return response_handler(resp)

0 commit comments

Comments
 (0)