Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,52 @@ def to_undirected(

pass

@abstractmethod
def collapse_path(
self,
G: GraphV2,
path_templates: list[list[str]],
mutate_relationship_type: str,
*,
allow_self_loops: bool = False,
concurrency: int | None = None,
job_id: str | None = None,
sudo: bool = False,
log_progress: bool = True,
username: str | None = None,
) -> CollapsePathResult:
"""
Collapse each existing path in the graph into a single relationship.

Parameters
----------

G : GraphV2
Name of the generated graph.
path_templates : list[list[str]]
A path template is an ordered list of relationship types used for the traversal. The same relationship type can be added multiple times, in order to traverse them as indicated. And, you may specify several path templates to process in one go.
mutate_relationship_type : str
The name of the new relationship type to be created.
allow_self_loops : bool, default=False
Whether nodes in the graph can have relationships where start and end nodes are the same.
concurrency : int | None, default=None:
Number of concurrent threads to use.
job_id : str | None, default=None
Unique identifier for the job associated with the computation.
sudo : bool | None, default=None
Override memory estimation limits
log_progress : bool | None, default=None
Whether to log progress during graph generation.
username : str | None, default=None
Username of the individual requesting the graph generation.

Returns
-------
CollapsePathResult: meta data about the generated relationships.
"""

pass


class RelationshipsWriteResult(BaseResult):
graph_name: str
Expand Down Expand Up @@ -251,6 +297,14 @@ class RelationshipsToUndirectedResult(RelationshipsInverseIndexResult):
relationships_written: int


class CollapsePathResult(BaseResult):
preProcessingMillis: int
computeMillis: int
mutateMillis: int
relationshipsWritten: int
configuration: dict[str, Any]


class Aggregation(str, Enum):
NONE = "NONE"
SINGLE = "SINGLE"
Expand Down
104 changes: 104 additions & 0 deletions graphdatascience/procedure_surface/api/config_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class ConfigEndpoints(ABC):
@property
@abstractmethod
def defaults(self) -> DefaultsEndpoints:
pass

@property
@abstractmethod
def limits(self) -> LimitsEndpoints:
pass


class DefaultsEndpoints(ABC):
@abstractmethod
def set(
self,
key: str,
value: Any,
username: str | None = None,
) -> None:
"""
Configure a new default configuration value.

Parameters:
key : str
The configuration key for which the default value is being set.
value : Any
The value to set as the default for the given key.
username : str | None, default=None
If set, the configuration will be set for the given user.

Returns: None
"""
pass

@abstractmethod
def list(
self,
username: str | None = None,
key: str | None = None,
) -> dict[str, Any]:
"""
List configured default configuration values.

Parameters:
key : str | None (default=None)
List only the default value for the given key.
username : str | None, default=None
List only default values for the given user.

Returns: dict[str, Any]
A dictionary containing the default configuration values.
"""
pass


class LimitsEndpoints(ABC):
@abstractmethod
def set(
self,
key: str,
value: Any,
username: str | None = None,
) -> None:
"""
Configure a new limit for a configuration value.

Parameters:
key : str
The configuration key for which the limit is being set.
value : Any
The value to set as the limit for the given key.
username : str | None, default=None
If set, the limit will be set for the given user.

Returns: None
"""
pass

@abstractmethod
def list(
self,
username: str | None = None,
key: str | None = None,
) -> dict[str, Any]:
"""
List configured configuration limits.

Parameters:
key : str | None (default=None)
List only the limits for the given key.
username : str | None, default=None
List only liomits for the given user.

Returns: dict[str, Any]
A dictionary containing the configuration limits.
"""
pass
26 changes: 26 additions & 0 deletions graphdatascience/procedure_surface/api/system_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

from abc import ABC, abstractmethod

from graphdatascience.procedure_surface.api.base_result import BaseResult


class SystemEndpoints(ABC):
@abstractmethod
def list_progress(
self,
job_id: str | None = None,
show_completed: bool = False,
) -> list[ProgressResult]:
pass


class ProgressResult(BaseResult):
username: str
job_id: str
task_name: str
progress: str
progress_bar: str
status: str
time_started: str
elapsed_time: str
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
from graphdatascience.procedure_surface.api.catalog.relationships_endpoints import (
Aggregation,
CollapsePathResult,
RelationshipsDropResult,
RelationshipsEndpoints,
RelationshipsInverseIndexResult,
Expand Down Expand Up @@ -197,3 +198,35 @@ def to_undirected(
)
result = JobClient.get_summary(self._arrow_client, job_id)
return RelationshipsToUndirectedResult(**result)

def collapse_path(
self,
G: GraphV2,
path_templates: list[list[str]],
mutate_relationship_type: str,
*,
allow_self_loops: bool = False,
concurrency: int | None = None,
job_id: str | None = None,
sudo: bool = False,
log_progress: bool = True,
username: str | None = None,
) -> CollapsePathResult:
config = ConfigConverter.convert_to_gds_config(
graph_name=G.name(),
path_templates=path_templates,
mutate_relationship_type=mutate_relationship_type,
allow_self_loops=allow_self_loops,
concurrency=concurrency,
job_id=job_id,
sudo=sudo,
log_progress=log_progress,
username=username,
)

show_progress = self._show_progress and log_progress
job_id = JobClient.run_job_and_wait(
self._arrow_client, "v2/graph.relationships.collapsePath", config, show_progress=show_progress
)

return CollapsePathResult(**JobClient.get_summary(self._arrow_client, job_id))
87 changes: 87 additions & 0 deletions graphdatascience/procedure_surface/arrow/config_arrow_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

from typing import Any

from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
from graphdatascience.arrow_client.v2.data_mapper_utils import deserialize
from graphdatascience.procedure_surface.api.config_endpoints import (
ConfigEndpoints,
DefaultsEndpoints,
LimitsEndpoints,
)
from graphdatascience.procedure_surface.utils.config_converter import ConfigConverter


class ConfigArrowEndpoints(ConfigEndpoints):
def __init__(self, arrow_client: AuthenticatedArrowClient):
self._arrow_client = arrow_client

@property
def defaults(self) -> DefaultsEndpoints:
return DefaultsArrowEndpoints(self._arrow_client)

@property
def limits(self) -> LimitsEndpoints:
return LimitsArrowEndpoints(self._arrow_client)


class DefaultsArrowEndpoints(DefaultsEndpoints):
def __init__(self, arrow_client: AuthenticatedArrowClient):
self._arrow_client = arrow_client

def set(
self,
key: str,
value: Any,
username: str | None = None,
) -> None:
key = ConfigConverter.convert_to_camel_case(key)
deserialize(self._arrow_client.do_action_with_retry("v2/defaults.set", {key: value}))

def list(
self,
username: str | None = None,
key: str | None = None,
) -> dict[str, Any]:
config = ConfigConverter.convert_to_gds_config(
key=key,
)

rows = deserialize(self._arrow_client.do_action_with_retry("v2/defaults.list", config))
result = {}

for row in rows:
result[row["key"]] = row["value"]

return result


class LimitsArrowEndpoints(LimitsEndpoints):
def __init__(self, arrow_client: AuthenticatedArrowClient):
self._arrow_client = arrow_client

def set(
self,
key: str,
value: Any,
username: str | None = None,
) -> None:
key = ConfigConverter.convert_to_camel_case(key)
deserialize(self._arrow_client.do_action_with_retry("v2/limits.set", {key: value}))

def list(
self,
username: str | None = None,
key: str | None = None,
) -> dict[str, Any]:
config = ConfigConverter.convert_to_gds_config(
key=key,
)

rows = deserialize(self._arrow_client.do_action_with_retry("v2/limits.list", config))
result = {}

for row in rows:
result[row["key"]] = row["value"]

return result
24 changes: 24 additions & 0 deletions graphdatascience/procedure_surface/arrow/system_arrow_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
from graphdatascience.arrow_client.v2.data_mapper_utils import deserialize
from graphdatascience.procedure_surface.api.system_endpoints import ProgressResult, SystemEndpoints
from graphdatascience.procedure_surface.utils.config_converter import ConfigConverter


class SystemArrowEndpoints(SystemEndpoints):
def __init__(self, arrow_client: AuthenticatedArrowClient):
self._arrow_client = arrow_client

def list_progress(
self,
job_id: str | None = None,
show_completed: bool = False,
) -> list[ProgressResult]:
config = ConfigConverter.convert_to_gds_config(
job_id=job_id,
show_completed=show_completed,
)

rows = deserialize(self._arrow_client.do_action_with_retry("v2/listProgress", config))
return [ProgressResult(**row) for row in rows]
Loading