4343from google .protobuf .internal .containers import MessageMap
4444from typing_extensions import Concatenate , Required , Self , TypedDict
4545
46+ import temporalio .activity
4647import temporalio .api .activity .v1
4748import temporalio .api .common .v1
4849import temporalio .api .enums .v1
6566import temporalio .workflow
6667from temporalio .activity import ActivityCancellationDetails
6768from temporalio .converter import (
69+ ActivitySerializationContext ,
6870 DataConverter ,
6971 SerializationContext ,
7072 WithSerializationContext ,
@@ -1287,18 +1289,19 @@ async def count_workflows(
12871289 # - TODO: Support sync and async activity functions
12881290 async def start_activity (
12891291 self ,
1290- activity : Callable [..., ReturnType ],
1292+ activity : Union [ str , Callable [..., Awaitable [ ReturnType ]] ],
12911293 * ,
1292- args : Sequence [Any ],
1294+ args : Sequence [Any ] = [] ,
12931295 id : str ,
12941296 task_queue : str ,
1297+ result_type : Optional [Type ] = None ,
12951298 # Either schedule_to_close_timeout or start_to_close_timeout must be present
12961299 schedule_to_close_timeout : Optional [timedelta ] = None ,
12971300 start_to_close_timeout : Optional [timedelta ] = None ,
12981301 schedule_to_start_timeout : Optional [timedelta ] = None ,
12991302 heartbeat_timeout : Optional [timedelta ] = None ,
1300- id_reuse_policy : temporalio .common .WorkflowIDReusePolicy = temporalio .common .WorkflowIDReusePolicy .ALLOW_DUPLICATE ,
1301- id_conflict_policy : temporalio .common .WorkflowIDConflictPolicy = temporalio .common .WorkflowIDConflictPolicy .FAIL ,
1303+ id_reuse_policy : temporalio .common .IdReusePolicy = temporalio .common .IdReusePolicy .ALLOW_DUPLICATE ,
1304+ id_conflict_policy : temporalio .common .IdConflictPolicy = temporalio .common .IdConflictPolicy .FAIL ,
13021305 retry_policy : Optional [temporalio .common .RetryPolicy ] = None ,
13031306 search_attributes : Optional [
13041307 Union [
@@ -1315,41 +1318,118 @@ async def start_activity(
13151318 """Start an activity and return its handle.
13161319
13171320 Args:
1318- activity: The activity function to execute.
1321+ activity: String name or callable activity function to execute.
13191322 args: Arguments to pass to the activity.
13201323 id: Unique identifier for the activity. Required.
13211324 task_queue: Task queue to send the activity to.
1325+ result_type: For string name activities, optional type to deserialize result into.
13221326 schedule_to_close_timeout: Total time allowed for the activity from schedule to completion.
13231327 start_to_close_timeout: Time allowed for a single execution attempt.
13241328 schedule_to_start_timeout: Time allowed for the activity to sit in the task queue.
13251329 heartbeat_timeout: Time between heartbeats before the activity is considered failed.
13261330 id_reuse_policy: How to handle reusing activity IDs from closed activities.
1331+ Default is ALLOW_DUPLICATE.
13271332 id_conflict_policy: How to handle activity ID conflicts with running activities.
1333+ Default is FAIL.
13281334 retry_policy: Retry policy for the activity.
1329- search_attributes: Search attributes to attach to the activity.
1330- static_summary: A single-line fixed summary for this workflow execution that may appear
1335+ search_attributes: Search attributes for the activity.
1336+ static_summary: A single-line fixed summary for this activity that may appear
13311337 in the UI/CLI. This can be in single-line Temporal markdown format.
1332- static_details: General fixed details for this workflow execution that may appear in
1333- UI/CLI. This can be in Temporal markdown format and can span multiple lines. This is
1334- a fixed value on the workflow that cannot be updated. For details that can be
1335- updated, use :py:meth:`temporalio.workflow.get_current_details` within the workflow.
1336- priority: Priority metadata.
1338+ static_details: General fixed details for this activity that may appear in
1339+ UI/CLI. This can be in Temporal markdown format and can span multiple lines.
1340+ priority: Priority of the activity execution.
13371341 rpc_metadata: Headers used on the RPC call.
13381342 rpc_timeout: Optional RPC deadline to set for the RPC call.
13391343
13401344 Returns:
13411345 A handle to the started activity.
13421346 """
1343- # Issues workflowservice StartActivityExecution
1344- raise NotImplementedError
1347+ name , result_type_from_type_annotation = (
1348+ temporalio .activity ._Definition .get_name_and_result_type (activity )
1349+ )
1350+ return await self ._impl .start_activity (
1351+ StartActivityInput (
1352+ activity_type = name ,
1353+ args = args ,
1354+ id = id ,
1355+ task_queue = task_queue ,
1356+ ret_type = result_type or result_type_from_type_annotation ,
1357+ schedule_to_close_timeout = schedule_to_close_timeout ,
1358+ start_to_close_timeout = start_to_close_timeout ,
1359+ schedule_to_start_timeout = schedule_to_start_timeout ,
1360+ heartbeat_timeout = heartbeat_timeout ,
1361+ id_reuse_policy = id_reuse_policy ,
1362+ id_conflict_policy = id_conflict_policy ,
1363+ retry_policy = retry_policy ,
1364+ search_attributes = search_attributes ,
1365+ static_summary = static_summary ,
1366+ static_details = static_details ,
1367+ headers = {},
1368+ rpc_metadata = rpc_metadata ,
1369+ rpc_timeout = rpc_timeout ,
1370+ priority = priority ,
1371+ )
1372+ )
13451373
1346- # Same parameters as start_activity
1347- # (*args **kwargs is just temporary to avoid duplicating parameter lists while they're being designed)
1348- async def execute_activity (self , * args , ** kwargs ) -> ReturnType :
1349- """
1350- Start an activity, wait for it to complete, and return its result.
1374+ async def execute_activity (
1375+ self ,
1376+ activity : Union [str , Callable [..., Awaitable [ReturnType ]]],
1377+ * ,
1378+ args : Sequence [Any ] = [],
1379+ id : str ,
1380+ task_queue : str ,
1381+ result_type : Optional [Type ] = None ,
1382+ # Either schedule_to_close_timeout or start_to_close_timeout must be present
1383+ schedule_to_close_timeout : Optional [timedelta ] = None ,
1384+ start_to_close_timeout : Optional [timedelta ] = None ,
1385+ schedule_to_start_timeout : Optional [timedelta ] = None ,
1386+ heartbeat_timeout : Optional [timedelta ] = None ,
1387+ id_reuse_policy : temporalio .common .IdReusePolicy = temporalio .common .IdReusePolicy .ALLOW_DUPLICATE ,
1388+ id_conflict_policy : temporalio .common .IdConflictPolicy = temporalio .common .IdConflictPolicy .FAIL ,
1389+ retry_policy : Optional [temporalio .common .RetryPolicy ] = None ,
1390+ search_attributes : Optional [
1391+ Union [
1392+ temporalio .common .SearchAttributes ,
1393+ temporalio .common .TypedSearchAttributes ,
1394+ ]
1395+ ] = None ,
1396+ static_summary : Optional [str ] = None ,
1397+ static_details : Optional [str ] = None ,
1398+ priority : temporalio .common .Priority = temporalio .common .Priority .default ,
1399+ rpc_metadata : Mapping [str , Union [str , bytes ]] = {},
1400+ rpc_timeout : Optional [timedelta ] = None ,
1401+ ) -> ReturnType :
1402+ """Start an activity, wait for it to complete, and return its result.
1403+
1404+ This is a convenience method that combines :py:meth:`start_activity` and
1405+ :py:meth:`ActivityHandle.result`.
1406+
1407+ Returns:
1408+ The result of the activity.
1409+
1410+ Raises:
1411+ ActivityFailedError: If the activity completed with a failure.
13511412 """
1352- handle = await self .start_activity (* args , ** kwargs )
1413+ handle = await self .start_activity (
1414+ activity ,
1415+ args = args ,
1416+ id = id ,
1417+ task_queue = task_queue ,
1418+ result_type = result_type ,
1419+ schedule_to_close_timeout = schedule_to_close_timeout ,
1420+ start_to_close_timeout = start_to_close_timeout ,
1421+ schedule_to_start_timeout = schedule_to_start_timeout ,
1422+ heartbeat_timeout = heartbeat_timeout ,
1423+ id_reuse_policy = id_reuse_policy ,
1424+ id_conflict_policy = id_conflict_policy ,
1425+ retry_policy = retry_policy ,
1426+ search_attributes = search_attributes ,
1427+ static_summary = static_summary ,
1428+ static_details = static_details ,
1429+ priority = priority ,
1430+ rpc_metadata = rpc_metadata ,
1431+ rpc_timeout = rpc_timeout ,
1432+ )
13531433 return await handle .result ()
13541434
13551435 def list_activities (
@@ -1456,6 +1536,7 @@ def get_async_activity_handle(
14561536 def get_async_activity_handle (self , * , task_token : bytes ) -> AsyncActivityHandle :
14571537 pass
14581538
1539+ # TODO(dan): add typed API get_async_activity_handle_for?
14591540 def get_async_activity_handle (
14601541 self ,
14611542 * ,
@@ -6473,6 +6554,36 @@ class TerminateWorkflowInput:
64736554 rpc_timeout : Optional [timedelta ]
64746555
64756556
6557+ @dataclass
6558+ class StartActivityInput :
6559+ """Input for :py:meth:`OutboundInterceptor.start_activity`."""
6560+
6561+ activity_type : str
6562+ args : Sequence [Any ]
6563+ id : str
6564+ task_queue : str
6565+ ret_type : Optional [Type ]
6566+ schedule_to_close_timeout : Optional [timedelta ]
6567+ start_to_close_timeout : Optional [timedelta ]
6568+ schedule_to_start_timeout : Optional [timedelta ]
6569+ heartbeat_timeout : Optional [timedelta ]
6570+ id_reuse_policy : temporalio .common .IdReusePolicy
6571+ id_conflict_policy : temporalio .common .IdConflictPolicy
6572+ retry_policy : Optional [temporalio .common .RetryPolicy ]
6573+ priority : temporalio .common .Priority
6574+ search_attributes : Optional [
6575+ Union [
6576+ temporalio .common .SearchAttributes ,
6577+ temporalio .common .TypedSearchAttributes ,
6578+ ]
6579+ ]
6580+ static_summary : Optional [str ]
6581+ static_details : Optional [str ]
6582+ headers : Mapping [str , temporalio .api .common .v1 .Payload ]
6583+ rpc_metadata : Mapping [str , Union [str , bytes ]]
6584+ rpc_timeout : Optional [timedelta ]
6585+
6586+
64766587@dataclass
64776588class CancelActivityInput :
64786589 """Input for :py:meth:`OutboundInterceptor.cancel_activity`."""
@@ -6866,6 +6977,10 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
68666977
68676978 ### Activity calls
68686979
6980+ async def start_activity (self , input : StartActivityInput ) -> ActivityHandle [Any ]:
6981+ """Called for every :py:meth:`Client.start_activity` call."""
6982+ return await self .next .start_activity (input )
6983+
68696984 async def cancel_activity (self , input : CancelActivityInput ) -> None :
68706985 """Called for every :py:meth:`ActivityHandle.cancel` call."""
68716986 await self .next .cancel_activity (input )
@@ -7343,6 +7458,110 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
73437458 req , retry = True , metadata = input .rpc_metadata , timeout = input .rpc_timeout
73447459 )
73457460
7461+ async def start_activity (self , input : StartActivityInput ) -> ActivityHandle [Any ]:
7462+ """Start an activity and return a handle to it."""
7463+ if not (input .start_to_close_timeout or input .schedule_to_close_timeout ):
7464+ raise ValueError (
7465+ "Activity must have start_to_close_timeout or schedule_to_close_timeout"
7466+ )
7467+ req = await self ._build_start_activity_execution_request (input )
7468+
7469+ # TODO(dan): any counterpart of WorkflowExecutionAlreadyStartedFailure?
7470+ # If RPCError with err.status == RPCStatusCode.ALREADY_EXISTS
7471+
7472+ resp = await self ._client .workflow_service .start_activity_execution (
7473+ req ,
7474+ retry = True ,
7475+ metadata = input .rpc_metadata ,
7476+ timeout = input .rpc_timeout ,
7477+ )
7478+ return ActivityHandle (
7479+ self ._client ,
7480+ id = input .id ,
7481+ run_id = resp .run_id ,
7482+ result_type = input .ret_type ,
7483+ )
7484+
7485+ async def _build_start_activity_execution_request (
7486+ self , input : StartActivityInput
7487+ ) -> temporalio .api .workflowservice .v1 .StartActivityExecutionRequest :
7488+ """Build StartActivityExecutionRequest from input."""
7489+ data_converter = self ._client .data_converter .with_context (
7490+ ActivitySerializationContext (
7491+ namespace = self ._client .namespace ,
7492+ activity_id = input .id ,
7493+ activity_type = input .activity_type ,
7494+ activity_task_queue = input .task_queue ,
7495+ is_local = False ,
7496+ workflow_id = None ,
7497+ workflow_type = None ,
7498+ )
7499+ )
7500+
7501+ req = temporalio .api .workflowservice .v1 .StartActivityExecutionRequest (
7502+ namespace = self ._client .namespace ,
7503+ identity = self ._client .identity ,
7504+ activity_id = input .id ,
7505+ activity_type = temporalio .api .common .v1 .ActivityType (
7506+ name = input .activity_type
7507+ ),
7508+ id_reuse_policy = cast (
7509+ "temporalio.api.enums.v1.IdReusePolicy.ValueType" ,
7510+ int (input .id_reuse_policy ),
7511+ ),
7512+ id_conflict_policy = cast (
7513+ "temporalio.api.enums.v1.IdConflictPolicy.ValueType" ,
7514+ int (input .id_conflict_policy ),
7515+ ),
7516+ )
7517+
7518+ # Build ActivityOptions
7519+ options = temporalio .api .activity .v1 .ActivityOptions (
7520+ task_queue = temporalio .api .taskqueue .v1 .TaskQueue (name = input .task_queue ),
7521+ )
7522+ if input .schedule_to_close_timeout is not None :
7523+ options .schedule_to_close_timeout .FromTimedelta (
7524+ input .schedule_to_close_timeout
7525+ )
7526+ if input .start_to_close_timeout is not None :
7527+ options .start_to_close_timeout .FromTimedelta (input .start_to_close_timeout )
7528+ if input .schedule_to_start_timeout is not None :
7529+ options .schedule_to_start_timeout .FromTimedelta (
7530+ input .schedule_to_start_timeout
7531+ )
7532+ if input .heartbeat_timeout is not None :
7533+ options .heartbeat_timeout .FromTimedelta (input .heartbeat_timeout )
7534+ if input .retry_policy is not None :
7535+ input .retry_policy .apply_to_proto (options .retry_policy )
7536+ req .options .CopyFrom (options )
7537+
7538+ # Set input payloads
7539+ if input .args :
7540+ req .input .payloads .extend (await data_converter .encode (input .args ))
7541+
7542+ # Set search attributes
7543+ if input .search_attributes is not None :
7544+ temporalio .converter .encode_search_attributes (
7545+ input .search_attributes , req .search_attributes
7546+ )
7547+
7548+ # Set user metadata
7549+ metadata = await _encode_user_metadata (
7550+ data_converter , input .static_summary , input .static_details
7551+ )
7552+ if metadata is not None :
7553+ req .user_metadata .CopyFrom (metadata )
7554+
7555+ # Set headers
7556+ if input .headers is not None :
7557+ await self ._apply_headers (input .headers , req .header .fields )
7558+
7559+ # Set priority
7560+ if input .priority is not None :
7561+ req .priority .CopyFrom (input .priority ._to_proto ())
7562+
7563+ return req
7564+
73467565 async def cancel_activity (self , input : CancelActivityInput ) -> None :
73477566 """Cancel a standalone activity."""
73487567 await self ._client .workflow_service .request_cancel_activity_execution (
0 commit comments