Skip to content

Commit a1def51

Browse files
committed
Introduce WccEndpoints
1 parent 2a95445 commit a1def51

File tree

2 files changed

+333
-0
lines changed

2 files changed

+333
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from dataclasses import dataclass
2+
from typing import Any
3+
4+
5+
@dataclass(frozen=True, repr=True)
6+
class EstimationResult:
7+
node_count: int
8+
relationship_count: int
9+
required_memory: str
10+
tree_view: str
11+
map_view: dict[str, Any]
12+
bytes_min: int
13+
bytes_max: int
14+
heap_percentage_min: float
15+
heap_percentage_max: float
16+
17+
def __getitem__(self, item: str) -> Any:
18+
return getattr(self, item)
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
from dataclasses import dataclass
5+
from typing import Any, List, Optional
6+
7+
from pandas import DataFrame
8+
9+
from ...graph.graph_object import Graph
10+
from .estimation_result import EstimationResult
11+
12+
13+
class WccEndpoints(ABC):
14+
"""
15+
Abstract base class defining the API for the Weakly Connected Components (WCC) algorithm.
16+
"""
17+
18+
@abstractmethod
19+
def mutate(
20+
self,
21+
G: Graph,
22+
mutate_property: str,
23+
threshold: Optional[float] = None,
24+
relationship_types: Optional[List[str]] = None,
25+
node_labels: Optional[List[str]] = None,
26+
sudo: Optional[bool] = None,
27+
log_progress: Optional[bool] = None,
28+
username: Optional[str] = None,
29+
concurrency: Optional[Any] = None,
30+
job_id: Optional[Any] = None,
31+
seed_property: Optional[str] = None,
32+
consecutive_ids: Optional[bool] = None,
33+
relationship_weight_property: Optional[str] = None,
34+
) -> WccMutateResult:
35+
"""
36+
Executes the WCC algorithm and writes the results to the in-memory graph as node properties.
37+
38+
Parameters
39+
----------
40+
G : Graph
41+
The graph to run the algorithm on
42+
mutate_property : str
43+
The property name to store the component ID for each node
44+
threshold : Optional[float], default=None
45+
The minimum required weight to consider a relationship during traversal
46+
relationship_types : Optional[List[str]], default=None
47+
The relationships types used to select relationships for this algorithm run
48+
node_labels : Optional[List[str]], default=None
49+
The node labels used to select nodes for this algorithm run
50+
sudo : Optional[bool], default=None
51+
Override memory estimation limits
52+
log_progress : Optional[bool], default=None
53+
Whether to log progress
54+
username : Optional[str], default=None
55+
The username to attribute the procedure run to
56+
concurrency : Optional[Any], default=None
57+
The number of concurrent threads
58+
job_id : Optional[Any], default=None
59+
An identifier for the job
60+
seed_property : Optional[str], default=None
61+
Defines node properties that are used as initial component identifiers
62+
consecutive_ids : Optional[bool], default=None
63+
Flag to decide whether component identifiers are mapped into a consecutive id space
64+
relationship_weight_property : Optional[str], default=None
65+
The property name that contains weight
66+
67+
Returns
68+
-------
69+
WccMutateResult
70+
Algorithm metrics and statistics
71+
"""
72+
pass
73+
74+
@abstractmethod
75+
def stats(
76+
self,
77+
G: Graph,
78+
threshold: Optional[float] = None,
79+
relationship_types: Optional[List[str]] = None,
80+
node_labels: Optional[List[str]] = None,
81+
sudo: Optional[bool] = None,
82+
log_progress: Optional[bool] = None,
83+
username: Optional[str] = None,
84+
concurrency: Optional[Any] = None,
85+
job_id: Optional[Any] = None,
86+
seed_property: Optional[str] = None,
87+
consecutive_ids: Optional[bool] = None,
88+
relationship_weight_property: Optional[str] = None,
89+
) -> WccStatsResult:
90+
"""
91+
Executes the WCC algorithm and returns statistics.
92+
93+
Parameters
94+
----------
95+
G : Graph
96+
The graph to run the algorithm on
97+
threshold : Optional[float], default=None
98+
The minimum required weight to consider a relationship during traversal
99+
relationship_types : Optional[List[str]], default=None
100+
The relationships types used to select relationships for this algorithm run
101+
node_labels : Optional[List[str]], default=None
102+
The node labels used to select nodes for this algorithm run
103+
sudo : Optional[bool], default=None
104+
Override memory estimation limits
105+
log_progress : Optional[bool], default=None
106+
Whether to log progress
107+
username : Optional[str], default=None
108+
The username to attribute the procedure run to
109+
concurrency : Optional[Any], default=None
110+
The number of concurrent threads
111+
job_id : Optional[Any], default=None
112+
An identifier for the job
113+
seed_property : Optional[str], default=None
114+
Defines node properties that are used as initial component identifiers
115+
consecutive_ids : Optional[bool], default=None
116+
Flag to decide whether component identifiers are mapped into a consecutive id space
117+
relationship_weight_property : Optional[str], default=None
118+
The property name that contains weight
119+
120+
Returns
121+
-------
122+
WccStatsResult
123+
Algorithm metrics and statistics
124+
"""
125+
pass
126+
127+
@abstractmethod
128+
def stream(
129+
self,
130+
G: Graph,
131+
min_component_size: Optional[int] = None,
132+
threshold: Optional[float] = None,
133+
relationship_types: Optional[List[str]] = None,
134+
node_labels: Optional[List[str]] = None,
135+
sudo: Optional[bool] = None,
136+
log_progress: Optional[bool] = None,
137+
username: Optional[str] = None,
138+
concurrency: Optional[Any] = None,
139+
job_id: Optional[Any] = None,
140+
seed_property: Optional[str] = None,
141+
consecutive_ids: Optional[bool] = None,
142+
relationship_weight_property: Optional[str] = None,
143+
) -> DataFrame:
144+
"""
145+
Executes the WCC algorithm and returns a stream of results.
146+
147+
Parameters
148+
----------
149+
G : Graph
150+
The graph to run the algorithm on
151+
min_component_size : Optional[int], default=None
152+
Don't stream components with fewer nodes than this
153+
threshold : Optional[float], default=None
154+
The minimum required weight to consider a relationship during traversal
155+
relationship_types : Optional[List[str]], default=None
156+
The relationships types considered in this algorithm run
157+
node_labels : Optional[List[str]], default=None
158+
The node labels used to select nodes for this algorithm run
159+
sudo : Optional[bool], default=None
160+
Override memory estimation limits
161+
log_progress : Optional[bool], default=None
162+
Whether to log progress
163+
username : Optional[str], default=None
164+
The username to attribute the procedure run to
165+
concurrency : Optional[Any], default=None
166+
The number of concurrent threads
167+
job_id : Optional[Any], default=None
168+
An identifier for the job
169+
seed_property : Optional[str], default=None
170+
Defines node properties that are used as initial component identifiers
171+
consecutive_ids : Optional[bool], default=None
172+
Flag to decide whether component identifiers are mapped into a consecutive id space
173+
relationship_weight_property : Optional[str], default=None
174+
The property name that contains weight
175+
176+
Returns
177+
-------
178+
DataFrame
179+
DataFrame with the algorithm results
180+
"""
181+
pass
182+
183+
@abstractmethod
184+
def write(
185+
self,
186+
G: Graph,
187+
write_property: str,
188+
min_component_size: Optional[int] = None,
189+
threshold: Optional[float] = None,
190+
relationship_types: Optional[List[str]] = None,
191+
node_labels: Optional[List[str]] = None,
192+
sudo: Optional[bool] = None,
193+
log_progress: Optional[bool] = None,
194+
username: Optional[str] = None,
195+
concurrency: Optional[Any] = None,
196+
job_id: Optional[Any] = None,
197+
seed_property: Optional[str] = None,
198+
consecutive_ids: Optional[bool] = None,
199+
relationship_weight_property: Optional[str] = None,
200+
write_concurrency: Optional[Any] = None,
201+
) -> WccWriteResult:
202+
"""
203+
Executes the WCC algorithm and writes the results to the Neo4j database.
204+
205+
Parameters
206+
----------
207+
G : Graph
208+
The graph to run the algorithm on
209+
write_property : str
210+
The property name to write component IDs to
211+
min_component_size : Optional[int], default=None
212+
Don't write components with fewer nodes than this
213+
threshold : Optional[float], default=None
214+
The minimum required weight to consider a relationship during traversal
215+
relationship_types : Optional[List[str]], default=None
216+
The relationships types considered in this algorithm run
217+
node_labels : Optional[List[str]], default=None
218+
The node labels used to select nodes for this algorithm run
219+
sudo : Optional[bool], default=None
220+
Override memory estimation limits
221+
log_progress : Optional[bool], default=None
222+
Whether to log progress
223+
username : Optional[str], default=None
224+
The username to attribute the procedure run to
225+
concurrency : Optional[Any], default=None
226+
The number of concurrent threads
227+
job_id : Optional[Any], default=None
228+
An identifier for the job
229+
seed_property : Optional[str], default=None
230+
Defines node properties that are used as initial component identifiers
231+
consecutive_ids : Optional[bool], default=None
232+
Flag to decide whether component identifiers are mapped into a consecutive id space
233+
relationship_weight_property : Optional[str], default=None
234+
The property name that contains weight
235+
write_concurrency : Optional[Any], default=None
236+
The number of concurrent threads during the write phase
237+
238+
Returns
239+
-------
240+
WccWriteResult
241+
Algorithm metrics and statistics
242+
"""
243+
pass
244+
245+
@abstractmethod
246+
def estimate(
247+
self,
248+
graph_name: Optional[str] = None,
249+
projection_config: Optional[dict[str, Any]] = None,
250+
) -> EstimationResult:
251+
"""
252+
Estimate the results based on the provided graph and configuration.
253+
254+
This abstract method is intended to be implemented in a subclass to provide
255+
specific functionality for estimating outcomes based on a graph name and a
256+
projection configuration object. The implementation should use the given
257+
parameters to compute and return an appropriate estimation result.
258+
259+
Parameters
260+
----------
261+
graph_name : Optional[str], optional
262+
Name of the graph to be used in the estimation
263+
projection_config : Optional[dict[str, Any]], optional
264+
Configuration dictionary for the projection.
265+
266+
Returns
267+
-------
268+
EstimationResult
269+
An object containing the result of the estimation, with relevant details
270+
as defined by the specific implementation.
271+
"""
272+
pass
273+
274+
275+
@dataclass(frozen=True, repr=True)
276+
class WccMutateResult:
277+
component_count: int
278+
component_distribution: dict[str, Any]
279+
pre_processing_millis: int
280+
compute_millis: int
281+
post_processing_millis: int
282+
mutate_millis: int
283+
node_properties_written: int
284+
configuration: dict[str, Any]
285+
286+
def __getitem__(self, item: str) -> Any:
287+
return getattr(self, item)
288+
289+
290+
@dataclass(frozen=True, repr=True)
291+
class WccStatsResult:
292+
component_count: int
293+
component_distribution: dict[str, Any]
294+
pre_processing_millis: int
295+
compute_millis: int
296+
post_processing_millis: int
297+
configuration: dict[str, Any]
298+
299+
def __getitem__(self, item: str) -> Any:
300+
return getattr(self, item)
301+
302+
303+
@dataclass(frozen=True, repr=True)
304+
class WccWriteResult:
305+
component_count: int
306+
component_distribution: dict[str, Any]
307+
pre_processing_millis: int
308+
compute_millis: int
309+
write_millis: int
310+
post_processing_millis: int
311+
node_properties_written: int
312+
configuration: dict[str, Any]
313+
314+
def __getitem__(self, item: str) -> Any:
315+
return getattr(self, item)

0 commit comments

Comments
 (0)