Skip to content

Commit 5b71e23

Browse files
Add plot functionality for new future selector (#820)
* Implement splitting of future objects * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update split.py * Update split.py * fix tests * extend tests * cover all lines * Import from API * fixes * Add support for dictionaries * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * use dict function * move split to main * Add docstrings * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Test response is None * Extend plotting function to support splitfuture objects * Rename splitfuture to futureselector * sync with latest changes * Rename module to select rather than split * rename module * fix plotting * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add tests * skip test * remove redundant lines * move plot module * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 4568507 commit 5b71e23

File tree

5 files changed

+72
-7
lines changed

5 files changed

+72
-7
lines changed

executorlib/task_scheduler/interactive/dependency.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
get_future_objects_from_input,
1212
update_futures_in_input,
1313
)
14-
from executorlib.standalone.plot import (
14+
from executorlib.task_scheduler.base import TaskSchedulerBase
15+
from executorlib.task_scheduler.interactive.dependency_plot import (
1516
generate_nodes_and_edges_for_plotting,
1617
generate_task_hash_for_plotting,
1718
plot_dependency_graph_function,
1819
)
19-
from executorlib.task_scheduler.base import TaskSchedulerBase
2020

2121

2222
class DependencyTaskScheduler(TaskSchedulerBase):

executorlib/standalone/plot.py renamed to executorlib/task_scheduler/interactive/dependency_plot.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import cloudpickle
66

7+
from executorlib.standalone.select import FutureSelector
8+
79

810
def generate_nodes_and_edges_for_plotting(
911
task_hash_dict: dict, future_hash_inverse_dict: dict
@@ -31,7 +33,15 @@ def add_element(arg, link_to, label=""):
3133
link_to: ID of the node to link the element to.
3234
label (str, optional): Label for the edge. Defaults to "".
3335
"""
34-
if isinstance(arg, Future):
36+
if isinstance(arg, FutureSelector):
37+
edge_lst.append(
38+
{
39+
"start": hash_id_dict[future_hash_inverse_dict[arg._future]],
40+
"end": link_to,
41+
"label": label + str(arg._selector),
42+
}
43+
)
44+
elif isinstance(arg, Future):
3545
edge_lst.append(
3646
{
3747
"start": hash_id_dict[future_hash_inverse_dict[arg]],
@@ -104,7 +114,24 @@ def convert_arg(arg, future_hash_inverse_dict):
104114
Returns:
105115
The hash representation of the argument.
106116
"""
107-
if isinstance(arg, Future):
117+
if isinstance(arg, FutureSelector):
118+
if arg not in future_hash_inverse_dict:
119+
obj_dict = {
120+
"args": (),
121+
"kwargs": {
122+
"future": future_hash_inverse_dict[arg._future],
123+
"selector": arg._selector,
124+
},
125+
}
126+
if isinstance(arg._selector, str):
127+
obj_dict["fn"] = "get_item_from_future"
128+
else:
129+
obj_dict["fn"] = "split_future"
130+
arg_hash = cloudpickle.dumps(obj_dict)
131+
future_hash_dict[arg_hash] = arg
132+
future_hash_inverse_dict[arg] = arg_hash
133+
return future_hash_inverse_dict[arg]
134+
elif isinstance(arg, Future):
108135
return future_hash_inverse_dict[arg]
109136
elif isinstance(arg, list):
110137
return [

tests/test_fluxjobexecutor_plot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from time import sleep
44

55
from executorlib import FluxJobExecutor, FluxClusterExecutor
6-
from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting
6+
from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting
77
from executorlib.standalone.serialize import cloudpickle_register
88

99

tests/test_singlenodeexecutor_plot_dependency.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
SingleNodeExecutor,
77
SlurmJobExecutor,
88
SlurmClusterExecutor,
9+
split_future,
10+
get_item_from_future,
911
)
10-
from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting
12+
from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting
1113
from executorlib.standalone.serialize import cloudpickle_register
1214

1315

@@ -43,6 +45,17 @@ def return_input_dict(input_dict):
4345
return input_dict
4446

4547

48+
def get_tuple(i):
49+
return 1, 2, i
50+
51+
52+
def get_dict(i):
53+
return {"a": 1, "b": i}
54+
55+
56+
def echo(i):
57+
return i
58+
4659
@unittest.skipIf(
4760
skip_graphviz_test,
4861
"graphviz is not installed, so the plot_dependency_graph tests are skipped.",
@@ -289,3 +302,28 @@ def test_many_to_one_plot(self):
289302
)
290303
self.assertEqual(len(nodes), 19)
291304
self.assertEqual(len(edges), 22)
305+
306+
307+
@unittest.skipIf(
308+
skip_graphviz_test,
309+
"graphviz is not installed, so the plot_dependency_graph tests are skipped.",
310+
)
311+
class TestSelectExecutorPlot(unittest.TestCase):
312+
def test_split_future(self):
313+
with SingleNodeExecutor(plot_dependency_graph=True) as exe:
314+
f = exe.submit(get_tuple, 5)
315+
f1, f2, f3 = split_future(future=f, n=3)
316+
f12 = exe.submit(echo, f1)
317+
f22 = exe.submit(echo, f2)
318+
f32 = exe.submit(echo, f3)
319+
self.assertIsNone(f12.result())
320+
self.assertIsNone(f22.result())
321+
self.assertIsNone(f32.result())
322+
323+
def test_get_item_from_future(self):
324+
with SingleNodeExecutor(plot_dependency_graph=True) as exe:
325+
f = exe.submit(get_dict, 5)
326+
f1 = exe.submit(echo, get_item_from_future(future=f, key="a"))
327+
f2 = exe.submit(echo, get_item_from_future(future=f, key="b"))
328+
self.assertIsNone(f1.result())
329+
self.assertIsNone(f2.result())

tests/test_testclusterexecutor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from executorlib import get_cache_data
66
from executorlib.api import TestClusterExecutor
7-
from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting
7+
from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting
88
from executorlib.standalone.serialize import cloudpickle_register
99

1010
try:

0 commit comments

Comments
 (0)