Skip to content

Commit 0c0b17e

Browse files
DarthMaxFlorentinD
authored andcommitted
Implement CliqueCounting
Note: CliqueCounting is not yet released as a Cypher endpoint
1 parent be27c44 commit 0c0b17e

File tree

5 files changed

+787
-0
lines changed

5 files changed

+787
-0
lines changed
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Any, List, Optional, Union
5+
6+
from pandas import DataFrame
7+
8+
from graphdatascience.procedure_surface.api.base_result import BaseResult
9+
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
10+
from graphdatascience.procedure_surface.api.estimation_result import EstimationResult
11+
12+
13+
class CliqueCountingEndpoints(ABC):
14+
@abstractmethod
15+
def mutate(
16+
self,
17+
G: GraphV2,
18+
mutate_property: str,
19+
concurrency: Optional[int] = 4,
20+
job_id: Optional[str] = None,
21+
log_progress: bool = True,
22+
node_labels: Optional[List[str]] = None,
23+
relationship_types: Optional[List[str]] = None,
24+
sudo: Optional[bool] = False,
25+
username: Optional[str] = None,
26+
) -> CliqueCountingMutateResult:
27+
"""
28+
Executes the clique counting algorithm and writes the results to the in-memory graph as node properties.
29+
30+
Parameters
31+
----------
32+
G : GraphV2
33+
The graph to run the algorithm on
34+
mutate_property : str
35+
The property name to store the clique counts for each node
36+
concurrency : Optional[int], default=4
37+
The number of concurrent threads
38+
job_id : Optional[str], default=None
39+
An identifier for the job
40+
log_progress : bool, default=True
41+
Whether to log progress
42+
node_labels : Optional[List[str]], default=None
43+
The node labels used to select nodes for this algorithm run
44+
relationship_types : Optional[List[str]], default=None
45+
The relationship types used to select relationships for this algorithm run
46+
sudo : Optional[bool], default=False
47+
Override memory estimation limits
48+
username : Optional[str], default=None
49+
The username to attribute the procedure run to
50+
51+
Returns
52+
-------
53+
CliqueCountingMutateResult
54+
Algorithm metrics and statistics
55+
"""
56+
pass
57+
58+
@abstractmethod
59+
def stats(
60+
self,
61+
G: GraphV2,
62+
concurrency: Optional[int] = 4,
63+
job_id: Optional[str] = None,
64+
log_progress: bool = True,
65+
node_labels: Optional[List[str]] = None,
66+
relationship_types: Optional[List[str]] = None,
67+
sudo: Optional[bool] = False,
68+
username: Optional[str] = None,
69+
) -> CliqueCountingStatsResult:
70+
"""
71+
Executes the clique counting algorithm and returns statistics.
72+
73+
Parameters
74+
----------
75+
G : GraphV2
76+
The graph to run the algorithm on
77+
concurrency : Optional[int], default=4
78+
The number of concurrent threads
79+
job_id : Optional[str], default=None
80+
An identifier for the job
81+
log_progress : bool, default=True
82+
Whether to log progress
83+
node_labels : Optional[List[str]], default=None
84+
The node labels used to select nodes for this algorithm run
85+
relationship_types : Optional[List[str]], default=None
86+
The relationship types used to select relationships for this algorithm run
87+
sudo : Optional[bool], default=False
88+
Override memory estimation limits
89+
username : Optional[str], default=None
90+
The username to attribute the procedure run to
91+
92+
Returns
93+
-------
94+
CliqueCountingStatsResult
95+
Algorithm metrics and statistics
96+
"""
97+
pass
98+
99+
@abstractmethod
100+
def stream(
101+
self,
102+
G: GraphV2,
103+
concurrency: Optional[int] = 4,
104+
job_id: Optional[str] = None,
105+
log_progress: bool = True,
106+
node_labels: Optional[List[str]] = None,
107+
relationship_types: Optional[List[str]] = None,
108+
sudo: Optional[bool] = False,
109+
username: Optional[str] = None,
110+
) -> DataFrame:
111+
"""
112+
Executes the clique counting algorithm and returns a stream of results.
113+
114+
Parameters
115+
----------
116+
G : GraphV2
117+
The graph to run the algorithm on
118+
concurrency : Optional[int], default=4
119+
The number of concurrent threads
120+
job_id : Optional[str], default=None
121+
An identifier for the job
122+
log_progress : bool, default=True
123+
Whether to log progress
124+
node_labels : Optional[List[str]], default=None
125+
The node labels used to select nodes for this algorithm run
126+
relationship_types : Optional[List[str]], default=None
127+
The relationship types used to select relationships for this algorithm run
128+
sudo : Optional[bool], default=False
129+
Override memory estimation limits
130+
username : Optional[str], default=None
131+
The username to attribute the procedure run to
132+
133+
Returns
134+
-------
135+
DataFrame
136+
DataFrame with the algorithm results
137+
"""
138+
pass
139+
140+
@abstractmethod
141+
def write(
142+
self,
143+
G: GraphV2,
144+
write_property: str,
145+
concurrency: Optional[int] = 4,
146+
job_id: Optional[str] = None,
147+
log_progress: bool = True,
148+
node_labels: Optional[List[str]] = None,
149+
relationship_types: Optional[List[str]] = None,
150+
sudo: Optional[bool] = False,
151+
username: Optional[str] = None,
152+
write_concurrency: Optional[int] = None,
153+
write_to_result_store: Optional[bool] = False,
154+
) -> CliqueCountingWriteResult:
155+
"""
156+
Executes the clique counting algorithm and writes the results back to the database.
157+
158+
Parameters
159+
----------
160+
G : GraphV2
161+
The graph to run the algorithm on
162+
write_property : str
163+
The property name to write the clique counts to
164+
concurrency : Optional[int], default=4
165+
The number of concurrent threads
166+
job_id : Optional[str], default=None
167+
An identifier for the job
168+
log_progress : bool, default=True
169+
Whether to log progress
170+
node_labels : Optional[List[str]], default=None
171+
The node labels used to select nodes for this algorithm run
172+
relationship_types : Optional[List[str]], default=None
173+
The relationship types used to select relationships for this algorithm run
174+
sudo : Optional[bool], default=False
175+
Override memory estimation limits
176+
username : Optional[str], default=None
177+
The username to attribute the procedure run to
178+
write_concurrency : Optional[int], default=None
179+
The number of concurrent threads for write operations
180+
write_to_result_store : Optional[bool], default=False
181+
Whether to write to the result store
182+
183+
Returns
184+
-------
185+
CliqueCountingWriteResult
186+
Algorithm metrics and statistics
187+
"""
188+
pass
189+
190+
@abstractmethod
191+
def estimate(
192+
self,
193+
G: Union[GraphV2, dict[str, Any]],
194+
concurrency: Optional[int] = 4,
195+
node_labels: Optional[List[str]] = None,
196+
relationship_types: Optional[List[str]] = None,
197+
) -> EstimationResult:
198+
"""
199+
Estimates the memory requirements for running the clique counting algorithm.
200+
201+
Parameters
202+
----------
203+
G : Union[GraphV2, dict[str, Any]]
204+
The graph or graph configuration to estimate for
205+
concurrency : Optional[int], default=4
206+
The number of concurrent threads
207+
node_labels : Optional[List[str]], default=None
208+
The node labels used to select nodes for this algorithm run
209+
relationship_types : Optional[List[str]], default=None
210+
The relationship types used to select relationships for this algorithm run
211+
212+
Returns
213+
-------
214+
EstimationResult
215+
The memory estimation result
216+
"""
217+
pass
218+
219+
220+
class CliqueCountingMutateResult(BaseResult):
221+
compute_millis: int
222+
configuration: dict[str, Any]
223+
global_count: List[Any]
224+
mutate_millis: int
225+
node_properties_written: int
226+
pre_processing_millis: int
227+
228+
229+
class CliqueCountingStatsResult(BaseResult):
230+
compute_millis: int
231+
configuration: dict[str, Any]
232+
global_count: List[Any]
233+
pre_processing_millis: int
234+
235+
236+
class CliqueCountingWriteResult(BaseResult):
237+
compute_millis: int
238+
configuration: dict[str, Any]
239+
global_count: List[Any]
240+
node_properties_written: int
241+
pre_processing_millis: int
242+
write_millis: int
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
from typing import Any, List, Optional, Union
2+
3+
from pandas import DataFrame
4+
5+
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
6+
from graphdatascience.procedure_surface.api.community.clique_counting_endpoints import (
7+
CliqueCountingEndpoints,
8+
CliqueCountingMutateResult,
9+
CliqueCountingStatsResult,
10+
CliqueCountingWriteResult,
11+
)
12+
13+
from ...arrow_client.authenticated_flight_client import AuthenticatedArrowClient
14+
from ...arrow_client.v2.remote_write_back_client import RemoteWriteBackClient
15+
from ..api.estimation_result import EstimationResult
16+
from .node_property_endpoints import NodePropertyEndpoints
17+
18+
19+
class CliqueCountingArrowEndpoints(CliqueCountingEndpoints):
20+
def __init__(
21+
self,
22+
arrow_client: AuthenticatedArrowClient,
23+
write_back_client: Optional[RemoteWriteBackClient] = None,
24+
show_progress: bool = True,
25+
):
26+
self._node_property_endpoints = NodePropertyEndpoints(
27+
arrow_client, write_back_client, show_progress=show_progress
28+
)
29+
30+
def mutate(
31+
self,
32+
G: GraphV2,
33+
mutate_property: str,
34+
concurrency: Optional[int] = 4,
35+
job_id: Optional[str] = None,
36+
log_progress: bool = True,
37+
node_labels: Optional[List[str]] = None,
38+
relationship_types: Optional[List[str]] = None,
39+
sudo: Optional[bool] = False,
40+
username: Optional[str] = None,
41+
) -> CliqueCountingMutateResult:
42+
config = self._node_property_endpoints.create_base_config(
43+
G,
44+
concurrency=concurrency,
45+
job_id=job_id,
46+
log_progress=log_progress,
47+
node_labels=node_labels,
48+
relationship_types=relationship_types,
49+
sudo=sudo,
50+
username=username,
51+
)
52+
53+
result = self._node_property_endpoints.run_job_and_mutate(
54+
"v2/community.cliquecounting", G, config, mutate_property
55+
)
56+
57+
return CliqueCountingMutateResult(**result)
58+
59+
def stats(
60+
self,
61+
G: GraphV2,
62+
concurrency: Optional[int] = 4,
63+
job_id: Optional[str] = None,
64+
log_progress: bool = True,
65+
node_labels: Optional[List[str]] = None,
66+
relationship_types: Optional[List[str]] = None,
67+
sudo: Optional[bool] = False,
68+
username: Optional[str] = None,
69+
) -> CliqueCountingStatsResult:
70+
config = self._node_property_endpoints.create_base_config(
71+
G,
72+
concurrency=concurrency,
73+
job_id=job_id,
74+
log_progress=log_progress,
75+
node_labels=node_labels,
76+
relationship_types=relationship_types,
77+
sudo=sudo,
78+
username=username,
79+
)
80+
81+
computation_result = self._node_property_endpoints.run_job_and_get_summary(
82+
"v2/community.cliquecounting", G, config
83+
)
84+
85+
return CliqueCountingStatsResult(**computation_result)
86+
87+
def stream(
88+
self,
89+
G: GraphV2,
90+
concurrency: Optional[int] = 4,
91+
job_id: Optional[str] = None,
92+
log_progress: bool = True,
93+
node_labels: Optional[List[str]] = None,
94+
relationship_types: Optional[List[str]] = None,
95+
sudo: Optional[bool] = False,
96+
username: Optional[str] = None,
97+
) -> DataFrame:
98+
config = self._node_property_endpoints.create_base_config(
99+
G,
100+
concurrency=concurrency,
101+
job_id=job_id,
102+
log_progress=log_progress,
103+
node_labels=node_labels,
104+
relationship_types=relationship_types,
105+
sudo=sudo,
106+
username=username,
107+
)
108+
109+
return self._node_property_endpoints.run_job_and_stream("v2/community.cliquecounting", G, config)
110+
111+
def write(
112+
self,
113+
G: GraphV2,
114+
write_property: str,
115+
concurrency: Optional[int] = 4,
116+
job_id: Optional[str] = None,
117+
log_progress: bool = True,
118+
node_labels: Optional[List[str]] = None,
119+
relationship_types: Optional[List[str]] = None,
120+
sudo: Optional[bool] = False,
121+
username: Optional[str] = None,
122+
write_concurrency: Optional[Any] = None,
123+
write_to_result_store: Optional[bool] = False,
124+
) -> CliqueCountingWriteResult:
125+
config = self._node_property_endpoints.create_base_config(
126+
G,
127+
concurrency=concurrency,
128+
job_id=job_id,
129+
log_progress=log_progress,
130+
node_labels=node_labels,
131+
relationship_types=relationship_types,
132+
sudo=sudo,
133+
username=username,
134+
write_to_result_store=write_to_result_store,
135+
)
136+
137+
result = self._node_property_endpoints.run_job_and_write(
138+
"v2/community.cliquecounting", G, config, write_concurrency, concurrency, write_property
139+
)
140+
141+
return CliqueCountingWriteResult(**result)
142+
143+
def estimate(
144+
self,
145+
G: Union[GraphV2, dict[str, Any]],
146+
concurrency: Optional[Any] = 4,
147+
node_labels: Optional[List[str]] = None,
148+
relationship_types: Optional[List[str]] = None,
149+
) -> EstimationResult:
150+
config = self._node_property_endpoints.create_estimate_config(
151+
concurrency=concurrency,
152+
node_labels=node_labels,
153+
relationship_types=relationship_types,
154+
)
155+
return self._node_property_endpoints.estimate("v2/community.cliquecounting.estimate", G, config)

0 commit comments

Comments
 (0)