Skip to content

Commit 64b9529

Browse files
committed
wip
Signed-off-by: Yves Bastide <yves@botify.com>
1 parent 8870433 commit 64b9529

File tree

4 files changed

+58
-10
lines changed

4 files changed

+58
-10
lines changed

simpleflow/cli/decider.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import annotations
2+
3+
import typer
4+
from typing_extensions import Annotated
5+
6+
from simpleflow.swf.process.decider import command
7+
8+
app = typer.Typer(no_args_is_help=True)
9+
10+
11+
@app.command()
12+
def start(
13+
ctx: typer.Context,
14+
workflows: Annotated[list[str] | None, typer.Argument()] = None,
15+
*,
16+
domain: Annotated[str, typer.Option(envvar="SWF_DOMAIN")],
17+
task_list: Annotated[str, typer.Option("--task-list", "-t")] | None = None,
18+
nb_processes: Annotated[int, typer.Option("--nb-processes", "-n")] | None = None,
19+
):
20+
"""
21+
Start a decider.
22+
"""
23+
if not workflows and not task_list:
24+
raise typer.BadParameter("workflows or task_list is required")
25+
command.start(
26+
workflows=workflows or [],
27+
domain=domain,
28+
task_list=task_list,
29+
nb_processes=nb_processes,
30+
)
31+
32+
33+
if __name__ == "__main__":
34+
app()

simpleflow/cli/workflow.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
from __future__ import annotations
22

3-
import json
43
from datetime import datetime
54
from enum import Enum
65
from typing import Any
76

87
import typer
98
from typing_extensions import Annotated
109

10+
from simpleflow import Workflow, format
1111
from simpleflow.command import get_progression_callback, get_workflow_type, with_format
1212
from simpleflow.swf import helpers
1313
from simpleflow.swf.mapper.models import WorkflowExecution
@@ -46,7 +46,6 @@ def filter(
4646
status: Annotated[Status, typer.Option("--status", "-s")] = Status.open,
4747
tag: str | None = None,
4848
workflow_id: str | None = None,
49-
run_id: str | None = None,
5049
workflow_type: str | None = None,
5150
workflow_type_version: str | None = None,
5251
close_status: CloseStatus | None = None,
@@ -80,7 +79,7 @@ def filter(
8079
print(
8180
with_format(ctx.parent)(helpers.filter_workflow_executions)(
8281
domain,
83-
status=status.upper(),
82+
status=status,
8483
tag=tag,
8584
workflow_id=workflow_id,
8685
workflow_type_name=workflow_type,
@@ -95,16 +94,16 @@ def filter(
9594
def start(
9695
ctx: typer.Context,
9796
workflow: str,
98-
domain: Annotated[str, typer.Argument(envvar="SWF_DOMAIN")],
97+
domain: Annotated[str, typer.Option(envvar="SWF_DOMAIN")],
9998
input: Annotated[str, typer.Option("--input", "-i", help="input JSON")] | None = None,
10099
):
101100
"""
102101
Start a workflow.
103102
"""
104-
workflow_class = import_from_module(workflow)
103+
workflow_class: type[Workflow] = import_from_module(workflow)
105104
wf_input: dict[str, Any] = {}
106105
if input is not None:
107-
json_input = json.loads(input)
106+
json_input = format.decode(input)
108107
if isinstance(json_input, list):
109108
wf_input = {"args": json_input, "kwargs": {}}
110109
elif isinstance(json_input, dict) and ("args" not in json_input or "kwargs" not in json_input):
@@ -113,9 +112,22 @@ def start(
113112
wf_input = json_input
114113
workflow_type = get_workflow_type(domain, workflow_class)
115114
set_workflow_class_name(wf_input, workflow_class)
115+
get_task_list = getattr(workflow_class, "get_task_list", None)
116+
if get_task_list:
117+
if not callable(get_task_list):
118+
raise Exception("get_task_list must be a callable")
119+
if isinstance(wf_input, dict):
120+
args = wf_input.get("args", [])
121+
kwargs = wf_input.get("kwargs", {})
122+
else:
123+
args = []
124+
kwargs = wf_input
125+
task_list = get_task_list(workflow_class, *args, **kwargs)
126+
else:
127+
task_list = workflow_class.task_list
116128
execution = workflow_type.start_execution(
117129
# workflow_id=workflow_id,
118-
# task_list=task_list or workflow_class.task_list,
130+
task_list=task_list,
119131
# execution_timeout=execution_timeout,
120132
input=wf_input,
121133
# tag_list=tags,

simpleflow/main.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import typer
77
from typing_extensions import Annotated
88

9-
from simpleflow.cli import workflow
9+
from simpleflow.cli import decider, workflow
1010

1111
app = typer.Typer(
1212
# add_completion=False,
@@ -15,6 +15,7 @@
1515
)
1616

1717
app.add_typer(workflow.app, name="workflow", help="Manage workflows")
18+
app.add_typer(decider.app, name="decider", help="Manage deciders")
1819

1920

2021
class Format(str, Enum):

simpleflow/swf/stats/pretty.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ def list_executions(
232232
def list_details(
233233
workflow_executions: list[WorkflowExecution],
234234
) -> tuple[Sequence, Sequence]:
235+
# FIXME: input is missing, what else?
235236
header = (
236237
"Workflow ID",
237238
"Workflow Type",
@@ -244,7 +245,7 @@ def list_details(
244245
"Close Timestamp",
245246
"Cancel Requested",
246247
"Execution Timeout",
247-
"Input",
248+
# "Input",
248249
"Tags",
249250
"Decision Tasks Timeout",
250251
"Parent Workflow ID",
@@ -264,7 +265,7 @@ def list_details(
264265
execution.close_timestamp,
265266
execution.cancel_requested,
266267
execution.execution_timeout,
267-
execution.input,
268+
# execution.input,
268269
execution.tag_list,
269270
execution.decision_tasks_timeout,
270271
execution.parent.get("workflowId"),

0 commit comments

Comments
 (0)