Skip to content

Commit b90e42d

Browse files
committed
Implement list/count activities
1 parent 6c9ee0f commit b90e42d

File tree

1 file changed

+296
-18
lines changed

1 file changed

+296
-18
lines changed

temporalio/client.py

Lines changed: 296 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,46 +1352,72 @@ async def execute_activity(self, *args, **kwargs) -> ReturnType:
13521352
handle = await self.start_activity(*args, **kwargs)
13531353
return await handle.result()
13541354

1355-
async def list_activities(
1355+
def list_activities(
13561356
self,
13571357
query: Optional[str] = None,
13581358
*,
1359+
limit: Optional[int] = None,
13591360
page_size: int = 1000,
13601361
next_page_token: Optional[bytes] = None,
13611362
rpc_metadata: Mapping[str, Union[str, bytes]] = {},
13621363
rpc_timeout: Optional[timedelta] = None,
13631364
) -> ActivityExecutionAsyncIterator:
13641365
"""List activities.
13651366
1367+
This does not make a request until the first iteration is attempted.
1368+
Therefore any errors will not occur until then.
1369+
13661370
Args:
1367-
query: A Temporal visibility filter for activities.
1368-
page_size: Maximum number of results to return per page.
1369-
next_page_token: Token for getting the next page of results.
1370-
rpc_metadata: Headers used on the RPC call.
1371-
rpc_timeout: Optional RPC deadline to set for the RPC call.
1371+
query: A Temporal visibility list filter for activities.
1372+
limit: Maximum number of activities to return. If unset, all
1373+
activities are returned. Only applies if using the
1374+
returned :py:class:`ActivityExecutionAsyncIterator`
1375+
as an async iterator.
1376+
page_size: Maximum number of results for each page.
1377+
next_page_token: A previously obtained next page token if doing
1378+
pagination. Usually not needed as the iterator automatically
1379+
starts from the beginning.
1380+
rpc_metadata: Headers used on each RPC call. Keys here override
1381+
client-level RPC metadata keys.
1382+
rpc_timeout: Optional RPC deadline to set for each RPC call.
1383+
1384+
Returns:
1385+
An async iterator that can be used with ``async for``.
13721386
"""
1373-
# Issues a workflowservice ListActivityExecutions call
1374-
raise NotImplementedError
1387+
return self._impl.list_activities(
1388+
ListActivitiesInput(
1389+
query=query,
1390+
page_size=page_size,
1391+
next_page_token=next_page_token,
1392+
rpc_metadata=rpc_metadata,
1393+
rpc_timeout=rpc_timeout,
1394+
limit=limit,
1395+
)
1396+
)
13751397

13761398
async def count_activities(
13771399
self,
13781400
query: Optional[str] = None,
13791401
*,
13801402
rpc_metadata: Mapping[str, Union[str, bytes]] = {},
13811403
rpc_timeout: Optional[timedelta] = None,
1382-
) -> int:
1404+
) -> ActivityExecutionCount:
13831405
"""Count activities matching the query.
13841406
13851407
Args:
13861408
query: A Temporal visibility filter for activities.
1387-
rpc_metadata: Headers used on the RPC call.
1409+
rpc_metadata: Headers used on the RPC call. Keys here override
1410+
client-level RPC metadata keys.
13881411
rpc_timeout: Optional RPC deadline to set for the RPC call.
13891412
13901413
Returns:
13911414
Count of activities.
13921415
"""
1393-
# Issues a workflowservice CountActivityExecutions call
1394-
raise NotImplementedError
1416+
return await self._impl.count_activities(
1417+
CountActivitiesInput(
1418+
query=query, rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout
1419+
)
1420+
)
13951421

13961422
def get_activity_handle(
13971423
self,
@@ -2881,20 +2907,119 @@ async def workflow_handle(self) -> WorkflowHandle[SelfType, ReturnType]:
28812907
class ActivityExecutionAsyncIterator:
28822908
"""Asynchronous iterator for activity execution values.
28832909
2884-
Returns either :py:class:`ActivityExecution` (for standalone activities) or
2885-
:py:class:`WorkflowActivityExecution` (for activities started by workflows).
2910+
Each item yielded by the iterator is either a :py:class:`ActivityExecution` (i.e. a standalone
2911+
activity) or a :py:class:`WorkflowActivityExecution` (i.e. an activity started by a workflow).
2912+
2913+
You should typically use ``async for`` on this iterator and not call any of its methods.
28862914
"""
28872915

2916+
# TODO(dan): do we want to use the "standalone" explanatory qualifier in docstrings?
2917+
2918+
def __init__(
2919+
self,
2920+
client: Client,
2921+
input: ListActivitiesInput,
2922+
) -> None:
2923+
"""Create an asynchronous iterator for the given input.
2924+
2925+
Users should not create this directly, but rather use
2926+
:py:meth:`Client.list_activities`.
2927+
"""
2928+
self._client = client
2929+
self._input = input
2930+
self._next_page_token = input.next_page_token
2931+
self._current_page: Optional[
2932+
Sequence[Union[ActivityExecution, WorkflowActivityExecution]]
2933+
] = None
2934+
self._current_page_index = 0
2935+
self._limit = input.limit
2936+
self._yielded = 0
2937+
2938+
@property
2939+
def current_page_index(self) -> int:
2940+
"""Index of the entry in the current page that will be returned from
2941+
the next :py:meth:`__anext__` call.
2942+
"""
2943+
return self._current_page_index
2944+
2945+
@property
2946+
def current_page(
2947+
self,
2948+
) -> Optional[Sequence[Union[ActivityExecution, WorkflowActivityExecution]]]:
2949+
"""Current page, if it has been fetched yet."""
2950+
return self._current_page
2951+
2952+
@property
2953+
def next_page_token(self) -> Optional[bytes]:
2954+
"""Token for the next page request if any."""
2955+
return self._next_page_token
2956+
2957+
async def fetch_next_page(self, *, page_size: Optional[int] = None) -> None:
2958+
"""Fetch the next page of results.
2959+
2960+
Args:
2961+
page_size: Override the page size this iterator was originally
2962+
created with.
2963+
"""
2964+
page_size = page_size or self._input.page_size
2965+
if self._limit is not None and self._limit - self._yielded < page_size:
2966+
page_size = self._limit - self._yielded
2967+
2968+
resp = await self._client.workflow_service.list_activity_executions(
2969+
temporalio.api.workflowservice.v1.ListActivityExecutionsRequest(
2970+
namespace=self._client.namespace,
2971+
page_size=page_size,
2972+
next_page_token=self._next_page_token or b"",
2973+
query=self._input.query or "",
2974+
),
2975+
retry=True,
2976+
metadata=self._input.rpc_metadata,
2977+
timeout=self._input.rpc_timeout,
2978+
)
2979+
2980+
self._current_page = [
2981+
WorkflowActivityExecution._from_raw_info(
2982+
v, self._client.namespace, self._client.data_converter
2983+
)
2984+
if v.workflow_id
2985+
else ActivityExecution._from_raw_info(
2986+
v, self._client.namespace, self._client.data_converter
2987+
)
2988+
for v in resp.executions
2989+
]
2990+
self._current_page_index = 0
2991+
self._next_page_token = resp.next_page_token or None
2992+
28882993
def __aiter__(self) -> ActivityExecutionAsyncIterator:
28892994
"""Return self as the iterator."""
28902995
return self
28912996

2997+
# This is a direct copy of WorkflowExecutionAsyncIterator.__anext__
28922998
async def __anext__(self) -> Union[ActivityExecution, WorkflowActivityExecution]:
2893-
"""Return the next execution on this iterator.
2894-
2895-
Fetch next page if necessary.
2999+
"""Get the next execution on this iterator, fetching next page if
3000+
necessary.
28963001
"""
2897-
raise NotImplementedError
3002+
if self._limit is not None and self._yielded >= self._limit:
3003+
raise StopAsyncIteration
3004+
while True:
3005+
# No page? fetch and continue
3006+
if self._current_page is None:
3007+
await self.fetch_next_page()
3008+
continue
3009+
# No more left in page?
3010+
if self._current_page_index >= len(self._current_page):
3011+
# If there is a next page token, try to get another page and try
3012+
# again
3013+
if self._next_page_token is not None:
3014+
await self.fetch_next_page()
3015+
continue
3016+
# No more pages means we're done
3017+
raise StopAsyncIteration
3018+
# Get current, increment page index, and return
3019+
ret = self._current_page[self._current_page_index]
3020+
self._current_page_index += 1
3021+
self._yielded += 1
3022+
return ret
28983023

28993024

29003025
# TODO: this is named ActivityListInfo in our draft proto PR
@@ -2933,6 +3058,51 @@ class ActivityExecution:
29333058
execution_duration: Optional[timedelta]
29343059
"""Duration from scheduled to close time, only populated if closed."""
29353060

3061+
raw_info: temporalio.api.activity.v1.ActivityListInfo
3062+
"""Underlying protobuf info."""
3063+
3064+
@classmethod
3065+
def _from_raw_info(
3066+
cls,
3067+
info: temporalio.api.activity.v1.ActivityListInfo,
3068+
namespace: str,
3069+
converter: temporalio.converter.DataConverter,
3070+
) -> Self:
3071+
"""Create from raw proto activity list info."""
3072+
return cls(
3073+
activity_id=info.activity_id,
3074+
run_id=info.run_id,
3075+
activity_type=(
3076+
info.activity_type.name if info.HasField("activity_type") else ""
3077+
),
3078+
scheduled_time=(
3079+
info.scheduled_time.ToDatetime().replace(tzinfo=timezone.utc)
3080+
if info.HasField("scheduled_time")
3081+
else datetime.min
3082+
),
3083+
close_time=(
3084+
info.close_time.ToDatetime().replace(tzinfo=timezone.utc)
3085+
if info.HasField("close_time")
3086+
else None
3087+
),
3088+
status=(
3089+
temporalio.common.ActivityExecutionStatus(info.status)
3090+
if info.status
3091+
else temporalio.common.ActivityExecutionStatus.RUNNING
3092+
),
3093+
search_attributes=temporalio.converter.decode_search_attributes(
3094+
info.search_attributes
3095+
),
3096+
task_queue=info.task_queue,
3097+
state_transition_count=info.state_transition_count,
3098+
execution_duration=(
3099+
info.execution_duration.ToTimedelta()
3100+
if info.HasField("execution_duration")
3101+
else None
3102+
),
3103+
raw_info=info,
3104+
)
3105+
29363106

29373107
@dataclass(frozen=True)
29383108
class WorkflowActivityExecution:
@@ -2962,6 +3132,61 @@ class WorkflowActivityExecution:
29623132
execution_duration: Optional[timedelta]
29633133
"""Duration from scheduled to close time, only populated if closed."""
29643134

3135+
raw_info: temporalio.api.activity.v1.ActivityListInfo
3136+
"""Underlying protobuf info."""
3137+
3138+
@classmethod
3139+
def _from_raw_info(
3140+
cls,
3141+
info: temporalio.api.activity.v1.ActivityListInfo,
3142+
namespace: str,
3143+
converter: temporalio.converter.DataConverter,
3144+
) -> Self:
3145+
"""Create from raw proto activity list info."""
3146+
# For workflow activities, we expect workflow_id to be set
3147+
return cls(
3148+
workflow_id=info.workflow_id,
3149+
workflow_run_id=None, # Not provided in list response
3150+
activity_id=info.activity_id,
3151+
activity_type=info.activity_type.name
3152+
if info.HasField("activity_type")
3153+
else "",
3154+
scheduled_time=(
3155+
info.scheduled_time.ToDatetime().replace(tzinfo=timezone.utc)
3156+
if info.HasField("scheduled_time")
3157+
else datetime.min
3158+
),
3159+
close_time=(
3160+
info.close_time.ToDatetime().replace(tzinfo=timezone.utc)
3161+
if info.HasField("close_time")
3162+
else None
3163+
),
3164+
task_queue=info.task_queue,
3165+
execution_duration=(
3166+
info.execution_duration.ToTimedelta()
3167+
if info.HasField("execution_duration")
3168+
else None
3169+
),
3170+
raw_info=info,
3171+
)
3172+
3173+
3174+
@dataclass(frozen=True)
3175+
class ActivityExecutionCount:
3176+
"""Representation of a count from a count activities call."""
3177+
3178+
count: int
3179+
"""Total count matching the filter, if any."""
3180+
3181+
@staticmethod
3182+
def _from_raw(
3183+
resp: temporalio.api.workflowservice.v1.CountActivityExecutionsResponse,
3184+
) -> ActivityExecutionCount:
3185+
"""Create from raw proto response."""
3186+
return ActivityExecutionCount(
3187+
count=resp.count,
3188+
)
3189+
29653190

29663191
@dataclass(frozen=True)
29673192
class ActivityExecutionDescription:
@@ -6204,6 +6429,27 @@ class DescribeActivityInput:
62046429
rpc_timeout: Optional[timedelta]
62056430

62066431

6432+
@dataclass
6433+
class ListActivitiesInput:
6434+
"""Input for :py:meth:`OutboundInterceptor.list_activities`."""
6435+
6436+
query: Optional[str]
6437+
page_size: int
6438+
next_page_token: Optional[bytes]
6439+
rpc_metadata: Mapping[str, Union[str, bytes]]
6440+
rpc_timeout: Optional[timedelta]
6441+
limit: Optional[int]
6442+
6443+
6444+
@dataclass
6445+
class CountActivitiesInput:
6446+
"""Input for :py:meth:`OutboundInterceptor.count_activities`."""
6447+
6448+
query: Optional[str]
6449+
rpc_metadata: Mapping[str, Union[str, bytes]]
6450+
rpc_timeout: Optional[timedelta]
6451+
6452+
62076453
@dataclass
62086454
class StartWorkflowUpdateInput:
62096455
"""Input for :py:meth:`OutboundInterceptor.start_workflow_update`."""
@@ -6558,6 +6804,18 @@ async def describe_activity(
65586804
"""Called for every :py:meth:`ActivityHandle.describe` call."""
65596805
return await self.next.describe_activity(input)
65606806

6807+
def list_activities(
6808+
self, input: ListActivitiesInput
6809+
) -> ActivityExecutionAsyncIterator:
6810+
"""Called for every :py:meth:`Client.list_activities` call."""
6811+
return self.next.list_activities(input)
6812+
6813+
async def count_activities(
6814+
self, input: CountActivitiesInput
6815+
) -> ActivityExecutionCount:
6816+
"""Called for every :py:meth:`Client.count_activities` call."""
6817+
return await self.next.count_activities(input)
6818+
65616819
async def start_workflow_update(
65626820
self, input: StartWorkflowUpdateInput
65636821
) -> WorkflowUpdateHandle[Any]:
@@ -7065,6 +7323,26 @@ async def describe_activity(
70657323
),
70667324
)
70677325

7326+
def list_activities(
7327+
self, input: ListActivitiesInput
7328+
) -> ActivityExecutionAsyncIterator:
7329+
return ActivityExecutionAsyncIterator(self._client, input)
7330+
7331+
async def count_activities(
7332+
self, input: CountActivitiesInput
7333+
) -> ActivityExecutionCount:
7334+
return ActivityExecutionCount._from_raw(
7335+
await self._client.workflow_service.count_activity_executions(
7336+
temporalio.api.workflowservice.v1.CountActivityExecutionsRequest(
7337+
namespace=self._client.namespace,
7338+
query=input.query or "",
7339+
),
7340+
retry=True,
7341+
metadata=input.rpc_metadata,
7342+
timeout=input.rpc_timeout,
7343+
)
7344+
)
7345+
70687346
async def start_workflow_update(
70697347
self, input: StartWorkflowUpdateInput
70707348
) -> WorkflowUpdateHandle[Any]:

0 commit comments

Comments
 (0)