Skip to content

Commit 749cc84

Browse files
committed
Retrieve all jobs and the new 3.10 details element
1 parent fd38882 commit 749cc84

File tree

2 files changed

+122
-15
lines changed

2 files changed

+122
-15
lines changed

arango/formatter.py

Lines changed: 92 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any
1+
from typing import Any, List, Sequence
22

33
from arango.typings import Headers, Json
44

@@ -1072,26 +1072,105 @@ def format_pregel_job_data(body: Json) -> Json:
10721072
"""
10731073
result: Json = {}
10741074

1075-
if "aggregators" in body:
1076-
result["aggregators"] = body["aggregators"]
1075+
if "id" in body:
1076+
result["id"] = body["id"]
1077+
if "algorithm" in body:
1078+
result["algorithm"] = body["algorithm"]
1079+
if "created" in body:
1080+
result["created"] = body["created"]
1081+
if "expires" in body:
1082+
result["expires"] = body["expires"]
1083+
if "ttl" in body:
1084+
result["ttl"] = body["ttl"]
1085+
if "algorithm" in body:
1086+
result["algorithm"] = body["algorithm"]
1087+
if "state" in body:
1088+
result["state"] = body["state"]
1089+
if "gss" in body:
1090+
result["gss"] = body["gss"]
1091+
if "totalRuntime" in body:
1092+
result["total_runtime"] = body["totalRuntime"]
1093+
if "startupTime" in body:
1094+
result["startup_time"] = body["startupTime"]
10771095
if "computationTime" in body:
10781096
result["computation_time"] = body["computationTime"]
1097+
if "storageTime" in body:
1098+
result["storageTime"] = body["storageTime"]
1099+
if "gssTimes" in body:
1100+
result["gssTimes"] = body["gssTimes"]
1101+
if "reports" in body:
1102+
result["reports"] = body["reports"]
1103+
if "vertexCount" in body:
1104+
result["vertex_count"] = body["vertexCount"]
10791105
if "edgeCount" in body:
10801106
result["edge_count"] = body["edgeCount"]
1081-
if "gss" in body:
1082-
result["gss"] = body["gss"]
1107+
1108+
# The detail element was introduced in 3.10
1109+
if "detail" in body:
1110+
d: Json = {}
1111+
detail = body["detail"]
1112+
if "workerStatus" in detail:
1113+
d["workerStatus"] = detail["workerStatus"]
1114+
if "aggregatedStatus" in detail:
1115+
aggregatedStatus = detail["aggregatedStatus"]
1116+
aStat: Json = {}
1117+
if "timeStamp" in aggregatedStatus:
1118+
aStat["timeStamp"] = aggregatedStatus["timeStamp"]
1119+
if "graphStoreStatus" in aggregatedStatus:
1120+
graphStoreStatus = aggregatedStatus["graphStoreStatus"]
1121+
gsStat: Json = {}
1122+
if "verticesLoaded" in graphStoreStatus:
1123+
gsStat["verticesLoaded"] = graphStoreStatus["verticesLoaded"]
1124+
if "edgesLoaded" in graphStoreStatus:
1125+
gsStat["edgesLoaded"] = graphStoreStatus["edgesLoaded"]
1126+
if "memoryBytesUsed" in graphStoreStatus:
1127+
gsStat["memoryBytesUsed"] = graphStoreStatus["memoryBytesUsed"]
1128+
if "verticesStored" in graphStoreStatus:
1129+
gsStat["verticesStored"] = graphStoreStatus["verticesStored"]
1130+
aStat["graphStoreStatus"] = gsStat
1131+
if "allGssStatus" in aggregatedStatus:
1132+
allGssStatus = aggregatedStatus["allGssStatus"]
1133+
agStat: Json = {}
1134+
if "items" in allGssStatus:
1135+
items = allGssStatus["items"]
1136+
itemList: List[Json] = []
1137+
for i in items:
1138+
ri: Json = {}
1139+
if "verticesProcessed" in i:
1140+
ri["verticesProcessed"] = i["verticesProcessed"]
1141+
if "messagesSent" in i:
1142+
ri["messagesSent"] = i["messagesSent"]
1143+
if "messagesReceived" in i:
1144+
ri["messagesReceived"] = i["messagesReceived"]
1145+
if "memoryBytesUsedForMessages" in i:
1146+
ri["memoryBytesUsedForMessages"] = i[
1147+
"memoryBytesUsedForMessages"
1148+
]
1149+
itemList.append(ri)
1150+
agStat["items"] = itemList
1151+
aStat["allGssStatus"] = agStat
1152+
d[aggregatedStatus] = aStat
1153+
result["detail"] = d
1154+
1155+
if "aggregators" in body:
1156+
result["aggregators"] = body["aggregators"]
10831157
if "receivedCount" in body:
10841158
result["received_count"] = body["receivedCount"]
10851159
if "sendCount" in body:
10861160
result["send_count"] = body["sendCount"]
1087-
if "startupTime" in body:
1088-
result["startup_time"] = body["startupTime"]
1089-
if "state" in body:
1090-
result["state"] = body["state"]
1091-
if "totalRuntime" in body:
1092-
result["total_runtime"] = body["totalRuntime"]
1093-
if "vertexCount" in body:
1094-
result["vertex_count"] = body["vertexCount"]
1161+
1162+
return verify_format(body, result)
1163+
1164+
1165+
def format_pregel_job_list(body: Sequence[Json]) -> Json:
1166+
"""Format Pregel job list data.
1167+
1168+
:param body: Input body.
1169+
:type body: dict
1170+
:return: Formatted body.
1171+
:rtype: dict
1172+
"""
1173+
result: Json = {"jobs": [format_pregel_job_data(j) for j in body]}
10951174

10961175
return verify_format(body, result)
10971176

arango/pregel.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
__all__ = ["Pregel"]
22

3-
from typing import Optional
3+
from typing import Optional, Sequence
44

55
from arango.api import ApiGroup
66
from arango.exceptions import (
77
PregelJobCreateError,
88
PregelJobDeleteError,
99
PregelJobGetError,
1010
)
11-
from arango.formatter import format_pregel_job_data
11+
from arango.formatter import format_pregel_job_data, format_pregel_job_list
1212
from arango.request import Request
1313
from arango.response import Response
1414
from arango.result import Result
@@ -49,6 +49,8 @@ def create_job(
4949
async_mode: Optional[bool] = None,
5050
result_field: Optional[str] = None,
5151
algorithm_params: Optional[Json] = None,
52+
vertexCollections: Optional[Sequence[str]] = None,
53+
edgeCollections: Optional[Sequence[str]] = None,
5254
) -> Result[int]:
5355
"""Start a new Pregel job.
5456
@@ -74,12 +76,21 @@ def create_job(
7476
:type result_field: str | None
7577
:param algorithm_params: Additional algorithm parameters.
7678
:type algorithm_params: dict | None
79+
:param vertexCollections: List of vertex collection names.
80+
:type vertexCollections: Sequence[str] | None
81+
:param edgeCollections: List of edge collection names.
82+
:type edgeCollections: Sequence[str] | None
7783
:return: Pregel job ID.
7884
:rtype: int
7985
:raise arango.exceptions.PregelJobCreateError: If create fails.
8086
"""
8187
data: Json = {"algorithm": algorithm, "graphName": graph}
8288

89+
if vertexCollections is not None:
90+
data["vertexCollections"] = vertexCollections
91+
if edgeCollections is not None:
92+
data["edgeCollections"] = edgeCollections
93+
8394
if algorithm_params is None:
8495
algorithm_params = {}
8596

@@ -122,3 +133,20 @@ def response_handler(resp: Response) -> bool:
122133
raise PregelJobDeleteError(resp, request)
123134

124135
return self._execute(request, response_handler)
136+
137+
def jobs(self) -> Result[Json]:
138+
"""Returns a list of currently running and recently
139+
finished Pregel jobs without retrieving their results.
140+
141+
:return: Details of each running or recently finished Pregel job.
142+
:rtype: dict
143+
:raise arango.exceptions.PregelJobGetError: If retrieval fails.
144+
"""
145+
request = Request(method="get", endpoint="/_api/control_pregel")
146+
147+
def response_handler(resp: Response) -> Json:
148+
if resp.is_success:
149+
return format_pregel_job_list(resp.body)
150+
raise PregelJobGetError(resp, request)
151+
152+
return self._execute(request, response_handler)

0 commit comments

Comments
 (0)