Skip to content

Commit 65b3a67

Browse files
authored
Merge pull request #986 from DarthMax/av2_misc_functions
AV2 - Misc functions
2 parents 4e98fd1 + 14531e1 commit 65b3a67

17 files changed

+776
-2
lines changed

graphdatascience/procedure_surface/api/catalog/relationships_endpoints.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,52 @@ def to_undirected(
208208

209209
pass
210210

211+
@abstractmethod
212+
def collapse_path(
213+
self,
214+
G: GraphV2,
215+
path_templates: list[list[str]],
216+
mutate_relationship_type: str,
217+
*,
218+
allow_self_loops: bool = False,
219+
concurrency: int | None = None,
220+
job_id: str | None = None,
221+
sudo: bool = False,
222+
log_progress: bool = True,
223+
username: str | None = None,
224+
) -> CollapsePathResult:
225+
"""
226+
Collapse each existing path in the graph into a single relationship.
227+
228+
Parameters
229+
----------
230+
231+
G : GraphV2
232+
Name of the generated graph.
233+
path_templates : list[list[str]]
234+
A path template is an ordered list of relationship types used for the traversal. The same relationship type can be added multiple times, in order to traverse them as indicated. And, you may specify several path templates to process in one go.
235+
mutate_relationship_type : str
236+
The name of the new relationship type to be created.
237+
allow_self_loops : bool, default=False
238+
Whether nodes in the graph can have relationships where start and end nodes are the same.
239+
concurrency : int | None, default=None:
240+
Number of concurrent threads to use.
241+
job_id : str | None, default=None
242+
Unique identifier for the job associated with the computation.
243+
sudo : bool | None, default=None
244+
Override memory estimation limits
245+
log_progress : bool | None, default=None
246+
Whether to log progress during graph generation.
247+
username : str | None, default=None
248+
Username of the individual requesting the graph generation.
249+
250+
Returns
251+
-------
252+
CollapsePathResult: meta data about the generated relationships.
253+
"""
254+
255+
pass
256+
211257

212258
class RelationshipsWriteResult(BaseResult):
213259
graph_name: str
@@ -251,6 +297,14 @@ class RelationshipsToUndirectedResult(RelationshipsInverseIndexResult):
251297
relationships_written: int
252298

253299

300+
class CollapsePathResult(BaseResult):
301+
preProcessingMillis: int
302+
computeMillis: int
303+
mutateMillis: int
304+
relationshipsWritten: int
305+
configuration: dict[str, Any]
306+
307+
254308
class Aggregation(str, Enum):
255309
NONE = "NONE"
256310
SINGLE = "SINGLE"
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Any
5+
6+
7+
class ConfigEndpoints(ABC):
8+
@property
9+
@abstractmethod
10+
def defaults(self) -> DefaultsEndpoints:
11+
pass
12+
13+
@property
14+
@abstractmethod
15+
def limits(self) -> LimitsEndpoints:
16+
pass
17+
18+
19+
class DefaultsEndpoints(ABC):
20+
@abstractmethod
21+
def set(
22+
self,
23+
key: str,
24+
value: Any,
25+
username: str | None = None,
26+
) -> None:
27+
"""
28+
Configure a new default configuration value.
29+
30+
Parameters:
31+
key : str
32+
The configuration key for which the default value is being set.
33+
value : Any
34+
The value to set as the default for the given key.
35+
username : str | None, default=None
36+
If set, the configuration will be set for the given user.
37+
38+
Returns: None
39+
"""
40+
pass
41+
42+
@abstractmethod
43+
def list(
44+
self,
45+
username: str | None = None,
46+
key: str | None = None,
47+
) -> dict[str, Any]:
48+
"""
49+
List configured default configuration values.
50+
51+
Parameters:
52+
key : str | None (default=None)
53+
List only the default value for the given key.
54+
username : str | None, default=None
55+
List only default values for the given user.
56+
57+
Returns: dict[str, Any]
58+
A dictionary containing the default configuration values.
59+
"""
60+
pass
61+
62+
63+
class LimitsEndpoints(ABC):
64+
@abstractmethod
65+
def set(
66+
self,
67+
key: str,
68+
value: Any,
69+
username: str | None = None,
70+
) -> None:
71+
"""
72+
Configure a new limit for a configuration value.
73+
74+
Parameters:
75+
key : str
76+
The configuration key for which the limit is being set.
77+
value : Any
78+
The value to set as the limit for the given key.
79+
username : str | None, default=None
80+
If set, the limit will be set for the given user.
81+
82+
Returns: None
83+
"""
84+
pass
85+
86+
@abstractmethod
87+
def list(
88+
self,
89+
username: str | None = None,
90+
key: str | None = None,
91+
) -> dict[str, Any]:
92+
"""
93+
List configured configuration limits.
94+
95+
Parameters:
96+
key : str | None (default=None)
97+
List only the limits for the given key.
98+
username : str | None, default=None
99+
List only liomits for the given user.
100+
101+
Returns: dict[str, Any]
102+
A dictionary containing the configuration limits.
103+
"""
104+
pass
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
5+
from graphdatascience.procedure_surface.api.base_result import BaseResult
6+
7+
8+
class SystemEndpoints(ABC):
9+
@abstractmethod
10+
def list_progress(
11+
self,
12+
job_id: str | None = None,
13+
show_completed: bool = False,
14+
) -> list[ProgressResult]:
15+
pass
16+
17+
18+
class ProgressResult(BaseResult):
19+
username: str
20+
job_id: str
21+
task_name: str
22+
progress: str
23+
progress_bar: str
24+
status: str
25+
time_started: str
26+
elapsed_time: str

graphdatascience/procedure_surface/arrow/catalog/relationship_arrow_endpoints.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
88
from graphdatascience.procedure_surface.api.catalog.relationships_endpoints import (
99
Aggregation,
10+
CollapsePathResult,
1011
RelationshipsDropResult,
1112
RelationshipsEndpoints,
1213
RelationshipsInverseIndexResult,
@@ -197,3 +198,35 @@ def to_undirected(
197198
)
198199
result = JobClient.get_summary(self._arrow_client, job_id)
199200
return RelationshipsToUndirectedResult(**result)
201+
202+
def collapse_path(
203+
self,
204+
G: GraphV2,
205+
path_templates: list[list[str]],
206+
mutate_relationship_type: str,
207+
*,
208+
allow_self_loops: bool = False,
209+
concurrency: int | None = None,
210+
job_id: str | None = None,
211+
sudo: bool = False,
212+
log_progress: bool = True,
213+
username: str | None = None,
214+
) -> CollapsePathResult:
215+
config = ConfigConverter.convert_to_gds_config(
216+
graph_name=G.name(),
217+
path_templates=path_templates,
218+
mutate_relationship_type=mutate_relationship_type,
219+
allow_self_loops=allow_self_loops,
220+
concurrency=concurrency,
221+
job_id=job_id,
222+
sudo=sudo,
223+
log_progress=log_progress,
224+
username=username,
225+
)
226+
227+
show_progress = self._show_progress and log_progress
228+
job_id = JobClient.run_job_and_wait(
229+
self._arrow_client, "v2/graph.relationships.collapsePath", config, show_progress=show_progress
230+
)
231+
232+
return CollapsePathResult(**JobClient.get_summary(self._arrow_client, job_id))
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
6+
from graphdatascience.arrow_client.v2.data_mapper_utils import deserialize
7+
from graphdatascience.procedure_surface.api.config_endpoints import (
8+
ConfigEndpoints,
9+
DefaultsEndpoints,
10+
LimitsEndpoints,
11+
)
12+
from graphdatascience.procedure_surface.utils.config_converter import ConfigConverter
13+
14+
15+
class ConfigArrowEndpoints(ConfigEndpoints):
16+
def __init__(self, arrow_client: AuthenticatedArrowClient):
17+
self._arrow_client = arrow_client
18+
19+
@property
20+
def defaults(self) -> DefaultsEndpoints:
21+
return DefaultsArrowEndpoints(self._arrow_client)
22+
23+
@property
24+
def limits(self) -> LimitsEndpoints:
25+
return LimitsArrowEndpoints(self._arrow_client)
26+
27+
28+
class DefaultsArrowEndpoints(DefaultsEndpoints):
29+
def __init__(self, arrow_client: AuthenticatedArrowClient):
30+
self._arrow_client = arrow_client
31+
32+
def set(
33+
self,
34+
key: str,
35+
value: Any,
36+
username: str | None = None,
37+
) -> None:
38+
key = ConfigConverter.convert_to_camel_case(key)
39+
deserialize(self._arrow_client.do_action_with_retry("v2/defaults.set", {key: value}))
40+
41+
def list(
42+
self,
43+
username: str | None = None,
44+
key: str | None = None,
45+
) -> dict[str, Any]:
46+
config = ConfigConverter.convert_to_gds_config(
47+
key=key,
48+
)
49+
50+
rows = deserialize(self._arrow_client.do_action_with_retry("v2/defaults.list", config))
51+
result = {}
52+
53+
for row in rows:
54+
result[row["key"]] = row["value"]
55+
56+
return result
57+
58+
59+
class LimitsArrowEndpoints(LimitsEndpoints):
60+
def __init__(self, arrow_client: AuthenticatedArrowClient):
61+
self._arrow_client = arrow_client
62+
63+
def set(
64+
self,
65+
key: str,
66+
value: Any,
67+
username: str | None = None,
68+
) -> None:
69+
key = ConfigConverter.convert_to_camel_case(key)
70+
deserialize(self._arrow_client.do_action_with_retry("v2/limits.set", {key: value}))
71+
72+
def list(
73+
self,
74+
username: str | None = None,
75+
key: str | None = None,
76+
) -> dict[str, Any]:
77+
config = ConfigConverter.convert_to_gds_config(
78+
key=key,
79+
)
80+
81+
rows = deserialize(self._arrow_client.do_action_with_retry("v2/limits.list", config))
82+
result = {}
83+
84+
for row in rows:
85+
result[row["key"]] = row["value"]
86+
87+
return result
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from __future__ import annotations
2+
3+
from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
4+
from graphdatascience.arrow_client.v2.data_mapper_utils import deserialize
5+
from graphdatascience.procedure_surface.api.system_endpoints import ProgressResult, SystemEndpoints
6+
from graphdatascience.procedure_surface.utils.config_converter import ConfigConverter
7+
8+
9+
class SystemArrowEndpoints(SystemEndpoints):
10+
def __init__(self, arrow_client: AuthenticatedArrowClient):
11+
self._arrow_client = arrow_client
12+
13+
def list_progress(
14+
self,
15+
job_id: str | None = None,
16+
show_completed: bool = False,
17+
) -> list[ProgressResult]:
18+
config = ConfigConverter.convert_to_gds_config(
19+
job_id=job_id,
20+
show_completed=show_completed,
21+
)
22+
23+
rows = deserialize(self._arrow_client.do_action_with_retry("v2/listProgress", config))
24+
return [ProgressResult(**row) for row in rows]

0 commit comments

Comments
 (0)