Skip to content

Commit a432cf0

Browse files
committed
feat: even more stats 🙂
Signed-off-by: Yves Bastide <yves@botify.com>
1 parent affd73d commit a432cf0

File tree

4 files changed

+188
-6
lines changed

4 files changed

+188
-6
lines changed

‎simpleflow/command.py‎

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ def terminate_workflow(
236236
run_id: str | None,
237237
):
238238
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
239+
if not ex:
240+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
241+
sys.exit(1)
239242
ex.terminate()
240243

241244

@@ -251,6 +254,9 @@ def terminate_workflow(
251254
)
252255
def restart_workflow(domain: str, workflow_id: str, run_id: str | None):
253256
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
257+
if not ex:
258+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
259+
sys.exit(1)
254260
history = ex.history()
255261
ex.terminate(reason="workflow.restart")
256262
new_ex = ex.workflow_type.start_execution(
@@ -315,6 +321,7 @@ def profile(ctx, domain, workflow_id, run_id, nb_tasks):
315321
)
316322

317323

324+
# FIXME superseded by history
318325
@click.option(
319326
"--nb-tasks",
320327
"-n",
@@ -347,6 +354,7 @@ def workflow_tasks(
347354
)
348355

349356

357+
# FIXME superseded by filter
350358
@click.argument(
351359
"domain",
352360
envvar="SWF_DOMAIN",
@@ -373,16 +381,16 @@ def list_workflows(ctx, domain: str, status: str, started_since: int):
373381
_NOTSET = object()
374382

375383

376-
@click.argument(
377-
"domain",
378-
envvar="SWF_DOMAIN",
379-
)
380384
@cli.command(
381385
"workflow.history",
382386
help="Workflow history from workflow WORKFLOW_ID [RUN_ID].",
383387
)
384388
@click.argument("workflow_id")
385389
@click.argument("run_id", type=RUN_ID, required=False)
390+
@click.option(
391+
"--domain",
392+
envvar="SWF_DOMAIN",
393+
)
386394
@click.option(
387395
"--output-format",
388396
"--of",
@@ -408,6 +416,9 @@ def workflow_history(
408416
from simpleflow.swf.mapper.models.history.base import History as BaseHistory
409417

410418
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
419+
if not ex:
420+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
421+
sys.exit(1)
411422
events = ex.history_events(
412423
callback=get_progression_callback("events"),
413424
reverse_order=reverse_order,
@@ -432,6 +443,7 @@ def workflow_history(
432443
elif output_format == "cooked":
433444
history.parse()
434445
events = {
446+
"workflow": history.workflow,
435447
"activities": history.activities,
436448
"child_workflows": history.child_workflows,
437449
"markers": history.markers,
@@ -843,6 +855,11 @@ def standalone(
843855
ex.workflow_id,
844856
ex.run_id,
845857
)
858+
if not ex:
859+
print(
860+
f"Execution {workflow_id} {ex.run_id} not found" if ex.run_id else f"Workflow {workflow_id} not found"
861+
)
862+
sys.exit(1)
846863
if display_status:
847864
print(f"status: {ex.status}", file=sys.stderr)
848865
if ex.status == ex.STATUS_CLOSED:

‎simpleflow/history.py‎

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,16 @@ def __init__(self, history: simpleflow.swf.mapper.models.history.History) -> Non
4343
self.started_decision_id: int | None = None
4444
self.completed_decision_id: int | None = None
4545
self.last_event_id: int | None = None
46+
self._workflow: dict[str, Any] = {}
4647

4748
@property
4849
def swf_history(self) -> simpleflow.swf.mapper.models.history.History:
4950
return self._history
5051

52+
@property
53+
def workflow(self):
54+
return self._workflow
55+
5156
@property
5257
def activities(self) -> dict[str, ActivityTaskEventDict]:
5358
"""
@@ -432,6 +437,118 @@ def parse_workflow_event(self, events: list[Event], event: WorkflowExecutionEven
432437
"""
433438
Parse a workflow event.
434439
"""
440+
if event.state == "started":
441+
self._workflow.update(
442+
{
443+
"state": event.state,
444+
f"{event.state}_id": event.id,
445+
f"{event.state}_timestamp": event.timestamp,
446+
"child_policy": getattr(event, "child_policy", None),
447+
"task_list": event.task_list["name"],
448+
"workflow_type": event.workflow_type,
449+
"continued_execution_run_id": getattr(event, "continued_execution_run_id", None),
450+
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
451+
"input": getattr(event, "input", None),
452+
"lambda_role": getattr(event, "lambda_role", None),
453+
"parent_initiated_event_id": getattr(event, "parent_initiated_event_id", None),
454+
"parent_workflow_execution": getattr(event, "parent_workflow_execution", None),
455+
"tag_list": getattr(event, "tag_list", None),
456+
"task_priority": getattr(event, "task_priority", None),
457+
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
458+
}
459+
)
460+
elif event.state == "continued_as_new":
461+
self._workflow.update(
462+
{
463+
"state": event.state,
464+
f"{event.state}_id": event.id,
465+
f"{event.state}_timestamp": event.timestamp,
466+
f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
467+
"new_execution_run_id": event.new_execution_run_id,
468+
"task_list": event.task_list["name"],
469+
"workflow_type": event.workflow_type,
470+
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
471+
"input": getattr(event, "input", None),
472+
"lambda_role": getattr(event, "lambda_role", None),
473+
"tag_list": getattr(event, "tag_list", None),
474+
"task_priority": getattr(event, "task_priority", None),
475+
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
476+
}
477+
)
478+
elif event.state == "completed":
479+
self._workflow.update(
480+
{
481+
"state": event.state,
482+
f"{event.state}_id": event.id,
483+
f"{event.state}_timestamp": event.timestamp,
484+
"initiated_event_id": getattr(event, "initiated_event_id", None),
485+
"result": getattr(event, "result", None),
486+
}
487+
)
488+
elif event.state == "cancelled":
489+
self._workflow.update(
490+
{
491+
"state": event.state,
492+
f"{event.state}_id": event.id,
493+
f"{event.state}_timestamp": event.timestamp,
494+
"initiated_event_id": getattr(event, "initiated_event_id", None),
495+
"decision_task_completed_event_id": event.decision_task_completed_event_id,
496+
"details": getattr(event, "details", None),
497+
}
498+
)
499+
elif event.state == "failed":
500+
self._workflow.update(
501+
{
502+
"state": event.state,
503+
f"{event.state}_id": event.id,
504+
f"{event.state}_timestamp": event.timestamp,
505+
"initiated_event_id": getattr(event, "initiated_event_id", None),
506+
"decision_task_completed_event_id": event.decision_task_completed_event_id,
507+
"reason": getattr(event, "reason", None),
508+
"details": getattr(event, "details", None),
509+
}
510+
)
511+
elif event.state == "terminated":
512+
self._workflow.update(
513+
{
514+
"state": event.state,
515+
f"{event.state}_id": event.id,
516+
f"{event.state}_timestamp": event.timestamp,
517+
"initiated_event_id": getattr(event, "initiated_event_id", None),
518+
"cause": getattr(event, "cause", None),
519+
"details": getattr(event, "details", None),
520+
}
521+
)
522+
elif event.state == "timed_out":
523+
self._workflow.update(
524+
{
525+
"state": event.state,
526+
f"{event.state}_id": event.id,
527+
f"{event.state}_timestamp": event.timestamp,
528+
"initiated_event_id": getattr(event, "initiated_event_id", None),
529+
"timeout_type": event.timeout_type,
530+
}
531+
)
532+
# elif event.state in (
533+
# "cancel_failed",
534+
# "complete_failed",
535+
# "continue_as_new",
536+
# "fail_failed",
537+
# "start_child_failed",
538+
# "start_failed",
539+
# "terminate_failed",
540+
# ):
541+
# self._workflow.update(
542+
# {
543+
# "state": event.state,
544+
# f"{event.state}_id": event.id,
545+
# f"{event.state}_cause": getattr(event, "cause", None),
546+
# f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
547+
# }
548+
# )
549+
550+
if event.state == "cancel_requested":
551+
self._workflow.update()
435552
if event.state == "signaled":
436553
signal = {
437554
"type": "signal",

‎simpleflow/swf/helpers.py‎

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import os
77
import socket
8+
import sys
89
from typing import TYPE_CHECKING
910

1011
import psutil
@@ -29,11 +30,12 @@
2930
]
3031

3132

32-
def get_workflow_execution(domain_name: str, workflow_id: str, run_id: str | None = None) -> WorkflowExecution:
33+
def get_workflow_execution(domain_name: str, workflow_id: str, run_id: str | None = None) -> WorkflowExecution | None:
3334
def filter_execution(*args, **kwargs):
3435
if "workflow_status" in kwargs:
3536
kwargs["status"] = kwargs.pop("workflow_status")
36-
return query.filter(*args, **kwargs)[0]
37+
filtered_executions = query.filter(*args, **kwargs)
38+
return filtered_executions[0] if filtered_executions else None
3739

3840
domain = simpleflow.swf.mapper.models.Domain(domain_name)
3941
query = simpleflow.swf.mapper.querysets.WorkflowExecutionQuerySet(domain)
@@ -61,6 +63,9 @@ def show_workflow_info(domain_name, workflow_id, run_id=None):
6163
workflow_id,
6264
run_id,
6365
)
66+
if not workflow_execution:
67+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
68+
sys.exit(1)
6469
return pretty.info(workflow_execution)
6570

6671

@@ -70,6 +75,9 @@ def show_workflow_profile(domain_name, workflow_id, run_id=None, nb_tasks=None):
7075
workflow_id,
7176
run_id,
7277
)
78+
if not workflow_execution:
79+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
80+
sys.exit(1)
7381
return pretty.profile(workflow_execution, nb_tasks)
7482

7583

@@ -79,6 +87,9 @@ def show_workflow_status(domain_name: str, workflow_id: str, run_id: str | None
7987
workflow_id,
8088
run_id,
8189
)
90+
if not workflow_execution:
91+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
92+
sys.exit(1)
8293
return pretty.status(workflow_execution, nb_tasks)
8394

8495

@@ -171,6 +182,9 @@ def get_task(
171182
domain_name,
172183
workflow_id,
173184
)
185+
if not workflow_execution:
186+
print(f"Workflow {workflow_id} not found")
187+
sys.exit(1)
174188
return pretty.get_task(workflow_execution, task_id, details)
175189

176190

‎simpleflow/swf/mapper/models/event/workflow.py‎

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,44 @@ class WorkflowExecution(TypedDict):
2424
class WorkflowExecutionEvent(Event):
2525
_type = "WorkflowExecution"
2626

27+
# start
2728
initiated_event_id: int
29+
child_policy: str
30+
task_list: TaskList
31+
workflow_type: WorkflowType
32+
continued_execution_run_id: str | None
33+
execution_start_to_close_timeout: str | None
34+
input: str | None
35+
lambda_role: str | None
36+
parent_initiated_event_id: int | None
37+
parent_workflow_execution: WorkflowExecution | None
38+
tag_list: list[str] | None
39+
task_priority: str | None
40+
task_start_to_close_timeout: str | None
41+
42+
# continued_as_new
43+
new_execution_run_id: str
44+
2845
signal_name: str
46+
2947
decision_task_completed_event_id: int
3048

49+
# completed
50+
result: str | None
51+
52+
# terminated
53+
# child_policy:str
54+
cause: str | None
55+
details: str | None
56+
reason: str | None
57+
58+
# timed out
59+
# child_policy:str
60+
timeout_type: str | None
61+
62+
# workflow_execution: WorkflowExecution
63+
# close_status: str
64+
3165

3266
class CompiledWorkflowExecutionEvent(CompiledEvent):
3367
_type = "WorkflowExecution"

0 commit comments

Comments
 (0)