Skip to content

Commit b2f48b8

Browse files
committed
feat: workflow.history and more
Signed-off-by: Yves Bastide <yves@botify.com>
1 parent 4d6f21b commit b2f48b8

File tree

8 files changed

+200
-116
lines changed

8 files changed

+200
-116
lines changed

simpleflow/command.py

Lines changed: 149 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import json
34
import os
45
import platform
56
import signal
@@ -25,7 +26,7 @@
2526
from simpleflow.swf.stats import pretty
2627
from simpleflow.swf.task import ActivityTask
2728
from simpleflow.swf.utils import get_workflow_execution, set_workflow_class_name
28-
from simpleflow.utils import import_from_module, json_dumps
29+
from simpleflow.utils import import_from_module, json_dumps, serialize_complex_object
2930
from simpleflow.workflow import Workflow
3031

3132
if TYPE_CHECKING:
@@ -42,7 +43,7 @@
4243
]
4344

4445

45-
def comma_separated_list(value):
46+
def comma_separated_list(value: str) -> list[str]:
4647
"""
4748
Transforms a comma-separated list into a list of strings.
4849
"""
@@ -59,7 +60,7 @@ def comma_separated_list(value):
5960
)
6061
@click.version_option(version=__version__)
6162
@click.pass_context
62-
def cli(ctx, header, format, color):
63+
def cli(ctx, header: bool, format: str, color: str) -> None:
6364
if format == "prettyjson":
6465
format, header = "json", True
6566
ctx.params["format"] = format
@@ -156,18 +157,18 @@ def run_workflow_locally(workflow_class, wf_input, middlewares):
156157
@click.argument("workflow")
157158
@cli.command("workflow.start", help="Start the workflow defined in the WORKFLOW module.")
158159
def start_workflow(
159-
workflow,
160-
domain,
161-
workflow_id,
162-
task_list,
163-
execution_timeout,
164-
tags,
165-
decision_tasks_timeout,
166-
input,
167-
input_file,
168-
local,
169-
middleware_pre_execution,
170-
middleware_post_execution,
160+
workflow: str,
161+
domain: str | None,
162+
workflow_id: str | None,
163+
task_list: str | None,
164+
execution_timeout: str | None,
165+
tags: str | None,
166+
decision_tasks_timeout: str | None,
167+
input: str | None,
168+
input_file: str | None,
169+
local: bool,
170+
middleware_pre_execution: str | None,
171+
middleware_post_execution: str | None,
171172
):
172173
workflow_class = import_from_module(workflow)
173174

@@ -213,7 +214,11 @@ def start_workflow(
213214
"workflow.terminate",
214215
help="Workflow associated with WORKFLOW and optionally RUN_ID.",
215216
)
216-
def terminate_workflow(domain, workflow_id, run_id):
217+
def terminate_workflow(
218+
domain: str,
219+
workflow_id: str,
220+
run_id: str | None,
221+
):
217222
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
218223
ex.terminate()
219224

@@ -228,7 +233,7 @@ def terminate_workflow(domain, workflow_id, run_id):
228233
"workflow.restart",
229234
help="Workflow associated with WORKFLOW_ID and optionally RUN_ID.",
230235
)
231-
def restart_workflow(domain, workflow_id, run_id):
236+
def restart_workflow(domain: str, workflow_id: str, run_id: str | None):
232237
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
233238
history = ex.history()
234239
ex.terminate(reason="workflow.restart")
@@ -258,7 +263,7 @@ def with_format(ctx):
258263
)
259264
@cli.command("workflow.info", help="Info about a workflow execution.")
260265
@click.pass_context
261-
def workflow_info(ctx, domain, workflow_id, run_id):
266+
def workflow_info(ctx, domain: str, workflow_id: str, run_id: str | None):
262267
print(
263268
with_format(ctx)(helpers.show_workflow_info)(
264269
domain,
@@ -309,7 +314,13 @@ def profile(ctx, domain, workflow_id, run_id, nb_tasks):
309314
)
310315
@cli.command("workflow.tasks", help="Tasks of a workflow execution.")
311316
@click.pass_context
312-
def status(ctx, domain, workflow_id, run_id, nb_tasks):
317+
def workflow_tasks(
318+
ctx,
319+
domain: str,
320+
workflow_id: str,
321+
run_id: str | None,
322+
nb_tasks: int | None,
323+
) -> None:
313324
print(
314325
with_format(ctx)(helpers.show_workflow_status)(
315326
domain,
@@ -335,14 +346,81 @@ def status(ctx, domain, workflow_id, run_id, nb_tasks):
335346
)
336347
@click.option("--started-since", "-d", default=30, show_default=True, help="Started since N days.")
337348
@click.pass_context
338-
def list_workflows(ctx, domain, status, started_since):
349+
def list_workflows(ctx, domain: str, status: str, started_since: int):
339350
print(
340351
with_format(ctx)(helpers.list_workflow_executions)(
341352
domain, status=status.upper(), start_oldest_date=started_since
342353
)
343354
)
344355

345356

357+
_NOTSET = object()
358+
359+
360+
@click.argument(
361+
"domain",
362+
envvar="SWF_DOMAIN",
363+
)
364+
@cli.command(
365+
"workflow.history",
366+
help="Workflow history from workflow WORKFLOW_ID [RUN_ID].",
367+
)
368+
@click.argument("workflow_id")
369+
@click.argument("run_id", required=False)
370+
@click.option(
371+
"--format", required=False, type=click.Choice(["rawest", "raw", "cooked"]), default="raw", help="Output format."
372+
)
373+
@click.option("--reverse-order", required=False, type=bool, default=False, help="Reverse order.")
374+
@click.pass_context
375+
def workflow_history(
376+
ctx,
377+
domain: str,
378+
workflow_id: str,
379+
run_id: str | None,
380+
format: str,
381+
reverse_order: bool = False,
382+
) -> None:
383+
from simpleflow.swf.mapper.models.history.base import History as BaseHistory
384+
385+
if ctx.format != "json" or not ctx.header:
386+
raise NotImplementedError("Only pretty JSON mode is implemented")
387+
388+
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
389+
events = ex.history_events(
390+
callback=get_progression_callback("events"),
391+
reverse_order=reverse_order,
392+
)
393+
if format == "rawest":
394+
pass
395+
else:
396+
raw_history = BaseHistory.from_event_list(events)
397+
history = History(raw_history)
398+
if format == "raw":
399+
events = []
400+
for event in history.events[:10]:
401+
e = {}
402+
for k in ["id", "type", "state", "timestamp", "input", "control", *event.__dict__]:
403+
if k.startswith("_") or k == "raw":
404+
continue
405+
v = getattr(event, k, _NOTSET)
406+
if v is _NOTSET:
407+
continue
408+
e[k] = v
409+
events.append(e)
410+
elif format == "cooked":
411+
history.parse()
412+
events = {
413+
"activities": history.activities,
414+
"child_workflows": history.child_workflows,
415+
"markers": history.markers,
416+
"signals": history.signals,
417+
"timers": history.timers,
418+
}
419+
else:
420+
raise NotImplementedError
421+
print(json.dumps(events, separators=(",", ":"), default=serialize_complex_object))
422+
423+
346424
@click.argument(
347425
"domain",
348426
envvar="SWF_DOMAIN",
@@ -360,6 +438,18 @@ def list_workflows(ctx, domain, status, started_since):
360438
@click.option("--workflow-id", default=None, help="Workflow ID.")
361439
@click.option("--workflow-type-name", default=None, help="Workflow Name.")
362440
@click.option("--workflow-type-version", default=None, help="Workflow Version (name needed).")
441+
@click.option(
442+
"--close-status",
443+
"-c",
444+
type=click.Choice(
445+
[
446+
case
447+
for state in ["COMPLETED", "FAILED", "CANCELED", "TERMINATED", "CONTINUED_AS_NEW", "TIMED_OUT"]
448+
for case in [state, state.lower()]
449+
]
450+
),
451+
help="Started since N days.",
452+
)
363453
@click.option("--started-since", "-d", default=30, show_default=True, help="Started since N days.")
364454
@click.option("--from-date", default=None, type=click.DateTime(formats=TIMESTAMP_FORMATS), help="From datetime.")
365455
@click.option("--to-date", default=None, type=click.DateTime(formats=TIMESTAMP_FORMATS), help="To datetime.")
@@ -372,12 +462,13 @@ def filter_workflows(
372462
workflow_id: str | None,
373463
workflow_type_name: str | None,
374464
workflow_type_version: str | None,
465+
close_status: str | None,
375466
started_since: int | None,
376467
from_date: datetime | None,
377468
to_date: datetime | None,
378469
):
379470
status = status.upper()
380-
kwargs = {}
471+
kwargs: dict[str, Any] = {}
381472
if status == simpleflow.swf.mapper.models.workflow.WorkflowExecution.STATUS_OPEN:
382473
if from_date:
383474
kwargs["oldest_date"] = from_date
@@ -391,6 +482,26 @@ def filter_workflows(
391482
else:
392483
kwargs["start_oldest_date"] = started_since
393484

485+
if close_status and status != simpleflow.swf.mapper.models.workflow.WorkflowExecution.STATUS_CLOSED:
486+
raise Exception("Closed status not supported for non-closed workflows.")
487+
elif close_status:
488+
kwargs["close_status"] = close_status.upper()
489+
490+
print(
491+
with_format(ctx)(helpers.filter_workflow_executions)(
492+
domain,
493+
status=status.upper(),
494+
tag=tag,
495+
workflow_id=workflow_id,
496+
workflow_type_name=workflow_type_name,
497+
workflow_type_version=workflow_type_version,
498+
callback=get_progression_callback("executionInfos"),
499+
**kwargs,
500+
)
501+
)
502+
503+
504+
def get_progression_callback(key: str):
394505
if os.isatty(sys.stderr.fileno()):
395506
spin_marks = ["⠏", "⠛", "⠹", "⠼", "⠶", "⠧"] # from Google's googlecloudsdk.core
396507
success = f"{log.GREEN}{log.END}" if log.color_mode != log.ColorModes.NEVER else "✓"
@@ -403,25 +514,14 @@ def __init__(self):
403514

404515
def cb(*_args, loop_number: int, response: dict[str, Any] | None, **_kwargs):
405516
if response:
406-
executions = len(response.get("executionInfos", []))
517+
executions = len(response.get(key, []))
407518
print(f"\r{spin_marks[loop_number % len(spin_marks)]} {counter.total}", file=sys.stderr, end="")
408519
counter.total += executions
409520
else:
410521
print(f"\r{success} {counter.total}", file=sys.stderr)
411522
else:
412523
cb = None
413-
print(
414-
with_format(ctx)(helpers.filter_workflow_executions)(
415-
domain,
416-
status=status.upper(),
417-
tag=tag,
418-
workflow_id=workflow_id,
419-
workflow_type_name=workflow_type_name,
420-
workflow_type_version=workflow_type_version,
421-
callback=cb,
422-
**kwargs,
423-
)
424-
)
524+
return cb
425525

426526

427527
@click.argument("task_id")
@@ -431,9 +531,15 @@ def cb(*_args, loop_number: int, response: dict[str, Any] | None, **_kwargs):
431531
envvar="SWF_DOMAIN",
432532
)
433533
@click.option("--details/--no-details", default=False, help="Display details.")
434-
@cli.command("task.info", help="Informations on a task.")
534+
@cli.command("task.info", help="Information on a task.")
435535
@click.pass_context
436-
def task_info(ctx, domain, workflow_id, task_id, details):
536+
def task_info(
537+
ctx,
538+
domain: str,
539+
workflow_id: str,
540+
task_id: str,
541+
details: bool,
542+
) -> None:
437543
print(with_format(ctx)(helpers.get_task)(domain, workflow_id, task_id, details))
438544

439545

@@ -443,7 +549,13 @@ def task_info(ctx, domain, workflow_id, task_id, details):
443549
@click.option("--domain", "-d", envvar="SWF_DOMAIN", required=True, help="SWF Domain")
444550
@click.argument("workflows", nargs=-1, required=False)
445551
@cli.command("decider.start", help="Start a decider process to manage workflow executions.")
446-
def start_decider(workflows, domain, task_list, log_level, nb_processes):
552+
def start_decider(
553+
workflows: list[str],
554+
domain: str,
555+
task_list: str,
556+
log_level: str,
557+
nb_processes: int,
558+
) -> None:
447559
if log_level:
448560
logger.warning("Deprecated: --log-level will be removed, use LOG_LEVEL environment variable instead")
449561
decider.command.start(

simpleflow/signal.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from .base import Submittable
3+
from simpleflow.base import Submittable
44

55

66
class WaitForSignal(Submittable):

simpleflow/swf/helpers.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def show_workflow_profile(domain_name, workflow_id, run_id=None, nb_tasks=None):
7373
return pretty.profile(workflow_execution, nb_tasks)
7474

7575

76-
def show_workflow_status(domain_name, workflow_id, run_id=None, nb_tasks=None):
76+
def show_workflow_status(domain_name: str, workflow_id: str, run_id: str | None = None, nb_tasks: int | None = None):
7777
workflow_execution = get_workflow_execution(
7878
domain_name,
7979
workflow_id,
@@ -161,7 +161,12 @@ def find_activity(history, scheduled_id=None, activity_id=None, input=None):
161161
return activity, args, kwargs, meta, found_activity
162162

163163

164-
def get_task(domain_name, workflow_id, task_id, details):
164+
def get_task(
165+
domain_name: str,
166+
workflow_id: str,
167+
task_id: str,
168+
details: bool,
169+
):
165170
workflow_execution = get_workflow_execution(
166171
domain_name,
167172
workflow_id,

simpleflow/swf/mapper/models/event/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ class Event:
4242
:param raw_data: raw_event representation provided by amazon service
4343
"""
4444

45-
_type: str | None = None
46-
_name: str | None = None
47-
_attributes_key: str | None = None
48-
_attributes = None
45+
_type: str | None
46+
_name: str | None
47+
_attributes_key: str | None
48+
_attributes = Any
4949

5050
excluded_attributes = ("eventId", "eventType", "eventTimestamp")
5151

0 commit comments

Comments
 (0)