Skip to content

Commit 121d378

Browse files
committed
feat: select dates for workflow.filter
Signed-off-by: Yves Bastide <yves@botify.com>
1 parent cd1f60b commit 121d378

File tree

4 files changed

+98
-28
lines changed

4 files changed

+98
-28
lines changed

simpleflow/command.py

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import sys
77
import time
88
from contextlib import contextmanager
9+
from datetime import datetime
910
from typing import TYPE_CHECKING
1011
from uuid import uuid4
1112

@@ -32,6 +33,14 @@
3233

3334
from simpleflow.swf.mapper.models.workflow import WorkflowType
3435

36+
TIMESTAMP_FORMATS = [
37+
"%Y-%m-%d",
38+
"%Y-%m-%dT%H:%M:%S%z",
39+
"%Y-%m-%d %H:%M:%S%z",
40+
"%Y-%m-%dT%H:%M:%S",
41+
"%Y-%m-%d %H:%M:%S",
42+
]
43+
3544

3645
def comma_separated_list(value):
3746
"""
@@ -41,7 +50,7 @@ def comma_separated_list(value):
4150

4251

4352
@click.group()
44-
@click.option("--format")
53+
@click.option("--format", envvar="SIMPLEFLOW_FORMAT")
4554
@click.option("--header/--no-header", default=False)
4655
@click.option(
4756
"--color",
@@ -51,6 +60,8 @@ def comma_separated_list(value):
5160
@click.version_option(version=__version__)
5261
@click.pass_context
5362
def cli(ctx, header, format, color):
63+
if format == "prettyjson":
64+
format, header = "json", True
5465
ctx.params["format"] = format
5566
ctx.params["header"] = header
5667
log.color_mode = color
@@ -345,28 +356,60 @@ def list_workflows(ctx, domain, status, started_since):
345356
type=click.Choice(["open", "closed"]),
346357
help="Open/Closed",
347358
)
348-
@click.option("--tag", default=None, help="Tag (multiple option).") # , multiple=True
359+
@click.option("--tag", default=None, help="Tags (comma-separated).") # , multiple=True
349360
@click.option("--workflow-id", default=None, help="Workflow ID.")
350361
@click.option("--workflow-type-name", default=None, help="Workflow Name.")
351362
@click.option("--workflow-type-version", default=None, help="Workflow Version (name needed).")
352363
@click.option("--started-since", "-d", default=30, show_default=True, help="Started since N days.")
364+
@click.option("--from-date", default=None, type=click.DateTime(formats=TIMESTAMP_FORMATS), help="From datetime.")
365+
@click.option("--to-date", default=None, type=click.DateTime(formats=TIMESTAMP_FORMATS), help="To datetime.")
353366
@click.pass_context
354367
def filter_workflows(
355368
ctx,
356-
domain,
357-
status,
358-
tag,
359-
workflow_id,
360-
workflow_type_name,
361-
workflow_type_version,
362-
started_since,
369+
domain: str,
370+
status: str,
371+
tag: str | None,
372+
workflow_id: str | None,
373+
workflow_type_name: str | None,
374+
workflow_type_version: str | None,
375+
started_since: int | None,
376+
from_date: datetime | None,
377+
to_date: datetime | None,
363378
):
364379
status = status.upper()
365380
kwargs = {}
366381
if status == simpleflow.swf.mapper.models.workflow.WorkflowExecution.STATUS_OPEN:
367-
kwargs["oldest_date"] = started_since
382+
if from_date:
383+
kwargs["oldest_date"] = from_date
384+
kwargs["latest_date"] = to_date
385+
else:
386+
kwargs["oldest_date"] = started_since
387+
else:
388+
if from_date:
389+
kwargs["start_oldest_date"] = from_date
390+
kwargs["start_latest_date"] = to_date
391+
else:
392+
kwargs["start_oldest_date"] = started_since
393+
394+
if os.isatty(sys.stderr.fileno()):
395+
spin_marks = ["⠏", "⠛", "⠹", "⠼", "⠶", "⠧"] # from Google's googlecloudsdk.core
396+
success = f"{log.GREEN}{log.END}" if log.color_mode != log.ColorModes.NEVER else "✓"
397+
398+
class Counter:
399+
def __init__(self):
400+
self.total = 0
401+
402+
counter = Counter()
403+
404+
def cb(*_args, loop_number: int, response: dict[str, Any] | None, **_kwargs):
405+
if response:
406+
executions = len(response.get("executionInfos", []))
407+
print(f"\r{spin_marks[loop_number % len(spin_marks)]} {counter.total}", file=sys.stderr, end="")
408+
counter.total += executions
409+
else:
410+
print(f"\r{success} {counter.total}", file=sys.stderr)
368411
else:
369-
kwargs["start_oldest_date"] = started_since
412+
cb = None
370413
print(
371414
with_format(ctx)(helpers.filter_workflow_executions)(
372415
domain,
@@ -375,6 +418,7 @@ def filter_workflows(
375418
workflow_id=workflow_id,
376419
workflow_type_name=workflow_type_name,
377420
workflow_type_version=workflow_type_version,
421+
callback=cb,
378422
**kwargs,
379423
)
380424
)

simpleflow/swf/helpers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ def filter_workflow_executions(
9898
workflow_type_name,
9999
workflow_type_version,
100100
*args,
101+
callback=None,
101102
**kwargs,
102103
):
103104
domain = simpleflow.swf.mapper.models.Domain(domain_name)
@@ -109,6 +110,7 @@ def filter_workflow_executions(
109110
workflow_type_name,
110111
workflow_type_version,
111112
*args,
113+
callback=callback,
112114
**kwargs,
113115
)
114116

simpleflow/swf/mapper/core.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@
2424
DEFAULT_AWS_REGION = "us-east-1"
2525

2626

27+
def convert_timestamp(dt: int | datetime | None) -> datetime | None:
28+
if dt is None:
29+
return None
30+
if isinstance(dt, datetime):
31+
return dt
32+
if isinstance(dt, int):
33+
return datetime.fromtimestamp(dt)
34+
raise TypeError(f"Invalid timestamp type: {type(dt).__name__}")
35+
36+
2737
class ConnectedSWFObject:
2838
"""Authenticated object interface"""
2939

@@ -56,8 +66,8 @@ def __init__(self, *args, **kwargs):
5666
def list_open_workflow_executions(
5767
self,
5868
domain: str,
59-
oldest_date: int, # timestamp
60-
latest_date: int | None = None, # timestamp
69+
oldest_date: int | datetime, # timestamp
70+
latest_date: int | datetime | None = None, # timestamp
6171
tag: str | None = None,
6272
workflow_id: str | None = None,
6373
workflow_name: str | None = None,
@@ -69,8 +79,8 @@ def list_open_workflow_executions(
6979
kwargs = {
7080
"domain": domain,
7181
"startTimeFilter": {
72-
"oldestDate": datetime.fromtimestamp(oldest_date),
73-
"latestDate": datetime.fromtimestamp(latest_date) if latest_date is not None else None,
82+
"oldestDate": convert_timestamp(oldest_date),
83+
"latestDate": convert_timestamp(latest_date),
7484
},
7585
"nextPageToken": next_page_token,
7686
"maximumPageSize": maximum_page_size,
@@ -98,10 +108,10 @@ def list_open_workflow_executions(
98108
def list_closed_workflow_executions(
99109
self,
100110
domain: str,
101-
start_latest_date: int | None = None, # timestamp
102-
start_oldest_date: int | None = None, # timestamp
103-
close_latest_date: int | None = None, # timestamp
104-
close_oldest_date: int | None = None, # timestamp
111+
start_latest_date: int | datetime | None = None, # timestamp
112+
start_oldest_date: int | datetime | None = None, # timestamp
113+
close_latest_date: int | datetime | None = None, # timestamp
114+
close_oldest_date: int | datetime | None = None, # timestamp
105115
close_status: str | None = None,
106116
tag: str | None = None,
107117
workflow_id: str | None = None,
@@ -119,13 +129,13 @@ def list_closed_workflow_executions(
119129
}
120130
if start_oldest_date is not None:
121131
kwargs["startTimeFilter"] = {
122-
"oldestDate": datetime.fromtimestamp(start_oldest_date),
123-
"latestDate": datetime.fromtimestamp(start_latest_date) if start_latest_date is not None else None,
132+
"oldestDate": convert_timestamp(start_oldest_date),
133+
"latestDate": convert_timestamp(start_latest_date),
124134
}
125135
if close_oldest_date is not None:
126136
kwargs["closeTimeFilter"] = {
127-
"oldestDate": datetime.fromtimestamp(close_oldest_date),
128-
"latestDate": datetime.fromtimestamp(close_latest_date) if close_latest_date is not None else None,
137+
"oldestDate": convert_timestamp(close_oldest_date),
138+
"latestDate": convert_timestamp(close_latest_date),
129139
}
130140
if close_status:
131141
kwargs["closeStatusFilter"] = {

simpleflow/swf/mapper/querysets/workflow.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,21 @@ def domain(self, value: Domain):
5555
def _list(self, *args, **kwargs):
5656
raise NotImplementedError
5757

58-
def _list_items(self, *args, **kwargs):
58+
def _list_items(self, *args, callback=None, **kwargs):
5959
response = {"nextPageToken": None}
60+
loop_number = 0
61+
if callback:
62+
callback(loop_number=loop_number, response=response)
6063
while "nextPageToken" in response:
6164
response = self._list(*args, next_page_token=response["nextPageToken"], **kwargs)
6265

6366
yield from response[self._infos_plural]
67+
loop_number += 1
68+
if callback:
69+
callback(loop_number=loop_number, response=response)
70+
71+
if callback:
72+
callback(loop_number=loop_number, response=None)
6473

6574

6675
class WorkflowTypeQuerySet(BaseWorkflowQuerySet):
@@ -405,10 +414,10 @@ def _is_valid_status_param(self, status, param):
405414
statuses = {
406415
WorkflowExecution.STATUS_OPEN: {"oldest_date", "latest_date"},
407416
WorkflowExecution.STATUS_CLOSED: {
408-
"start_latest_date",
409417
"start_oldest_date",
410-
"close_latest_date",
418+
"start_latest_date",
411419
"close_oldest_date",
420+
"close_latest_date",
412421
"close_status",
413422
},
414423
}
@@ -501,6 +510,7 @@ def filter(
501510
workflow_id=None,
502511
workflow_type_name=None,
503512
workflow_type_version=None,
513+
callback=None,
504514
*args,
505515
**kwargs,
506516
):
@@ -591,10 +601,13 @@ def filter(
591601

592602
# Compute a timestamp from the delta in days we got from params
593603
# If oldest_date is blank at this point, it's because we didn't want
594-
# it, so let's leave it blank and assume the user provided an other
604+
# it, so let's leave it blank and assume the user provided another
595605
# time filter.
596606
if oldest_date:
597-
start_oldest_date = int(datetime_timestamp(past_day(oldest_date)))
607+
if isinstance(oldest_date, int):
608+
start_oldest_date = int(datetime_timestamp(past_day(oldest_date)))
609+
else:
610+
start_oldest_date = oldest_date
598611
else:
599612
start_oldest_date = None
600613

@@ -609,6 +622,7 @@ def filter(
609622
workflow_version=workflow_type_version,
610623
start_oldest_date=start_oldest_date,
611624
tag=tag,
625+
callback=callback,
612626
**kwargs,
613627
)
614628
]

0 commit comments

Comments
 (0)