Skip to content

Commit 0533513

Browse files
committed
ActivityHandle.result
1 parent 1454443 commit 0533513

File tree

1 file changed

+84
-7
lines changed

1 file changed

+84
-7
lines changed

temporalio/client.py

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3526,13 +3526,21 @@ def __init__(
35263526
id: str,
35273527
*,
35283528
run_id: str,
3529+
result_type: Optional[Type] = None,
35293530
data_converter_override: Optional[DataConverter] = None,
35303531
) -> None:
35313532
"""Create activity handle."""
35323533
self._client = client
35333534
self._id = id
35343535
self._run_id = run_id
3536+
self._result_type = result_type
35353537
self._data_converter_override = data_converter_override
3538+
self._known_outcome: Optional[
3539+
Union[
3540+
temporalio.api.common.v1.Payloads,
3541+
temporalio.api.failure.v1.Failure,
3542+
]
3543+
] = None
35363544

35373545
@property
35383546
def id(self) -> str:
@@ -3565,6 +3573,7 @@ def with_context(self, context: SerializationContext) -> Self:
35653573
self._client,
35663574
id=self._id,
35673575
run_id=self._run_id,
3576+
result_type=self._result_type,
35683577
data_converter_override=data_converter,
35693578
)
35703579

@@ -3576,21 +3585,89 @@ async def result(
35763585
) -> ReturnType:
35773586
"""Wait for result of the activity.
35783587
3588+
The result may already be known if this method has been called before,
3589+
in which case no network call is made. Otherwise the result will be
3590+
polled for until it is available.
3591+
35793592
Args:
35803593
rpc_metadata: Headers used on the RPC call. Keys here override
35813594
client-level RPC metadata keys.
3582-
rpc_timeout: Optional RPC deadline to set for each RPC call. Note,
3583-
this is the timeout for each history RPC call not this overall
3584-
function.
3595+
rpc_timeout: Optional RPC deadline to set for each RPC call. Note:
3596+
this is the timeout for each RPC call while polling, not a
3597+
timeout for the function as a whole. If an individual RPC
3598+
times out, it will be retried until the result is available.
35853599
35863600
Returns:
35873601
The result of the activity.
35883602
35893603
Raises:
3590-
:py:class:`ActivityFailureError`: If the activity completed with a failure.
3604+
ActivityFailureError: If the activity completed with a failure.
3605+
RPCError: Activity result could not be fetched for some reason.
35913606
"""
3592-
# Repeatedly issues workflowservice GetActivityResult long-polls.
3593-
raise NotImplementedError
3607+
await self._poll_until_outcome(
3608+
rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout
3609+
)
3610+
data_converter = self._data_converter_override or self._client.data_converter
3611+
assert self._known_outcome
3612+
if isinstance(self._known_outcome, temporalio.api.failure.v1.Failure):
3613+
raise ActivityFailedError(
3614+
cause=await data_converter.decode_failure(self._known_outcome),
3615+
)
3616+
payloads = self._known_outcome
3617+
if not payloads.payloads:
3618+
# E.g. a void workflow function in another language may not set any payloads.
3619+
return None # type: ignore
3620+
type_hints = [self._result_type] if self._result_type else None
3621+
results = await data_converter.decode(payloads.payloads, type_hints)
3622+
if not results:
3623+
# Following workflow/update/query result processing. Technically not necessary since
3624+
# from_payloads is documented to always return non-empty
3625+
return None # type: ignore
3626+
elif len(results) > 1:
3627+
warnings.warn(f"Expected single activity result, got {len(results)}")
3628+
return results[0]
3629+
3630+
async def _poll_until_outcome(
3631+
self,
3632+
rpc_metadata: Mapping[str, Union[str, bytes]] = {},
3633+
rpc_timeout: Optional[timedelta] = None,
3634+
) -> None:
3635+
"""Poll for activity result until it's available."""
3636+
if self._known_outcome:
3637+
return
3638+
3639+
req = temporalio.api.workflowservice.v1.GetActivityExecutionResultRequest(
3640+
namespace=self._client.namespace,
3641+
activity_id=self._id,
3642+
run_id=self._run_id,
3643+
wait=True, # Enable long polling
3644+
)
3645+
3646+
# Continue polling as long as we have no outcome
3647+
while True:
3648+
try:
3649+
res = await self._client.workflow_service.get_activity_execution_result(
3650+
req,
3651+
retry=True,
3652+
metadata=rpc_metadata,
3653+
timeout=rpc_timeout,
3654+
)
3655+
if res.HasField("result"):
3656+
self._known_outcome = res.result
3657+
return
3658+
elif res.HasField("failure"):
3659+
self._known_outcome = res.failure
3660+
return
3661+
except RPCError as err:
3662+
if err.status == RPCStatusCode.DEADLINE_EXCEEDED:
3663+
# Deadline exceeded is expected with long polling; retry
3664+
continue
3665+
elif err.status == RPCStatusCode.CANCELLED:
3666+
raise asyncio.CancelledError() from err
3667+
else:
3668+
raise
3669+
except asyncio.CancelledError:
3670+
raise
35943671

35953672
async def cancel(
35963673
self,
@@ -6225,7 +6302,7 @@ def __init__(self) -> None:
62256302
super().__init__("Timeout or cancellation waiting for update")
62266303

62276304

6228-
class ActivityFailureError(temporalio.exceptions.TemporalError):
6305+
class ActivityFailedError(temporalio.exceptions.TemporalError):
62296306
"""Error that occurs when a standalone activity is unsuccessful."""
62306307

62316308
def __init__(self, *, cause: BaseException) -> None:

0 commit comments

Comments
 (0)