Skip to content

Commit c681c13

Browse files
committed
Extend tests
1 parent b172d5f commit c681c13

File tree

1 file changed

+218
-4
lines changed

1 file changed

+218
-4
lines changed

tests/test_activity.py

Lines changed: 218 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
1+
import asyncio
12
import uuid
23
from datetime import timedelta
34

4-
from temporalio import activity
5-
from temporalio.client import Client
5+
import pytest
6+
7+
from temporalio import activity, workflow
8+
from temporalio.client import ActivityFailedError, Client
69
from temporalio.common import ActivityExecutionStatus
10+
from temporalio.exceptions import ApplicationError, CancelledError
11+
from temporalio.worker import Worker
712

813

914
@activity.defn
1015
async def increment(input: int) -> int:
1116
return input + 1
1217

1318

14-
async def test_describe_activity(client: Client):
15-
activity_id = str("test_start_and_describe_activity_id")
19+
@pytest.mark.skip("StartActivityExecution not implemented in server")
20+
async def test_start_activity_and_describe_activity(client: Client):
21+
activity_id = str("test_start_activity_and_describe_activity_id")
1622
task_queue = str(uuid.uuid4())
1723

1824
activity_handle = await client.start_activity(
@@ -29,3 +35,211 @@ async def test_describe_activity(client: Client):
2935
assert desc.activity_type == "increment"
3036
assert desc.task_queue == task_queue
3137
assert desc.status == ActivityExecutionStatus.RUNNING
38+
39+
40+
@pytest.mark.skip("GetActivityExecutionResult not implemented in server")
41+
async def test_start_activity_and_get_result(client: Client):
42+
activity_id = str("test_start_activity_and_get_result_activity_id")
43+
task_queue = str(uuid.uuid4())
44+
45+
activity_handle = await client.start_activity(
46+
increment, args=(1,), id=activity_id, task_queue=task_queue
47+
)
48+
result_via_execute_activity = client.execute_activity(
49+
increment, args=(1,), id=activity_id, task_queue=task_queue
50+
)
51+
52+
async with Worker(
53+
client,
54+
task_queue=task_queue,
55+
activities=[increment],
56+
):
57+
assert await activity_handle.result() == 2
58+
assert await result_via_execute_activity == 2
59+
60+
61+
@activity.defn
62+
async def async_activity() -> int:
63+
# Notify test that the activity has started and is ready to be completed manually
64+
await (
65+
activity.client()
66+
.get_workflow_handle("activity-started-wf-id")
67+
.signal(WaitForSignalWorkflow.signal)
68+
)
69+
activity.raise_complete_async()
70+
71+
72+
@pytest.mark.skip(
73+
"RespondActivityTaskCompleted not implemented in server for standalone activity"
74+
)
75+
async def test_manual_completion(client: Client):
76+
activity_id = str("test_manual_completion_activity_id")
77+
task_queue = str(uuid.uuid4())
78+
79+
activity_handle = await client.start_activity(
80+
async_activity,
81+
args=(), # TODO: overloads
82+
id=activity_id,
83+
task_queue=task_queue,
84+
)
85+
86+
async with Worker(
87+
client,
88+
task_queue=task_queue,
89+
activities=[async_activity],
90+
workflows=[WaitForSignalWorkflow],
91+
):
92+
# Wait for activity to start
93+
await client.execute_workflow(
94+
WaitForSignalWorkflow.run,
95+
id="activity-started-wf-id",
96+
task_queue=task_queue,
97+
)
98+
# Complete activity manually
99+
async_activity_handle = client.get_async_activity_handle(
100+
activity_id=activity_id,
101+
run_id=activity_handle.run_id,
102+
)
103+
await async_activity_handle.complete(7)
104+
assert await activity_handle.result() == 7
105+
106+
107+
@pytest.mark.skip(
108+
"RespondActivityTaskCanceled not implemented in server for standalone activity"
109+
)
110+
async def test_manual_cancellation(client: Client):
111+
activity_id = str("test_manual_cancellation_activity_id")
112+
task_queue = str(uuid.uuid4())
113+
114+
activity_handle = await client.start_activity(
115+
async_activity,
116+
args=(), # TODO: overloads
117+
id=activity_id,
118+
task_queue=task_queue,
119+
)
120+
121+
async with Worker(
122+
client,
123+
task_queue=task_queue,
124+
activities=[async_activity],
125+
workflows=[WaitForSignalWorkflow],
126+
):
127+
await client.execute_workflow(
128+
WaitForSignalWorkflow.run,
129+
id="activity-started-wf-id",
130+
task_queue=task_queue,
131+
)
132+
async_activity_handle = client.get_async_activity_handle(
133+
activity_id=activity_id,
134+
run_id=activity_handle.run_id,
135+
)
136+
await async_activity_handle.report_cancellation("Test cancellation")
137+
with pytest.raises(ActivityFailedError) as err:
138+
await activity_handle.result()
139+
assert isinstance(err.value.cause, CancelledError)
140+
assert str(err.value.cause) == "Test cancellation"
141+
142+
143+
@pytest.mark.skip(
144+
"RespondActivityTaskFailed not implemented in server for standalone activity"
145+
)
146+
async def test_manual_fail(client: Client):
147+
activity_id = str("test_manual_fail_activity_id")
148+
task_queue = str(uuid.uuid4())
149+
150+
activity_handle = await client.start_activity(
151+
async_activity,
152+
args=(), # TODO: overloads
153+
id=activity_id,
154+
task_queue=task_queue,
155+
)
156+
async with Worker(
157+
client,
158+
task_queue=task_queue,
159+
activities=[async_activity],
160+
workflows=[WaitForSignalWorkflow],
161+
):
162+
await client.execute_workflow(
163+
WaitForSignalWorkflow.run,
164+
id="activity-started-wf-id",
165+
task_queue=task_queue,
166+
)
167+
async_activity_handle = client.get_async_activity_handle(
168+
activity_id=activity_id,
169+
run_id=activity_handle.run_id,
170+
)
171+
await async_activity_handle.fail(Exception("Test failure"))
172+
with pytest.raises(ActivityFailedError) as err:
173+
await activity_handle.result()
174+
assert isinstance(err.value.cause, ApplicationError)
175+
assert str(err.value.cause) == "Test failure"
176+
177+
178+
@activity.defn
179+
async def activity_for_testing_heartbeat() -> str:
180+
wait_for_heartbeat_wf_handle = await activity.client().start_workflow(
181+
WaitForSignalWorkflow.run,
182+
id="test-has-sent-heartbeat-wf-id",
183+
task_queue=activity.info().task_queue,
184+
)
185+
info = activity.info()
186+
if info.attempt == 1:
187+
# Wait for test to notify that it has sent heartbeat
188+
await wait_for_heartbeat_wf_handle.result()
189+
raise Exception("Intentional error to force retry")
190+
elif info.attempt == 2:
191+
[heartbeat_data] = info.heartbeat_details
192+
assert isinstance(heartbeat_data, str)
193+
return heartbeat_data
194+
else:
195+
raise AssertionError(f"Unexpected attempt number: {info.attempt}")
196+
197+
198+
@pytest.mark.skip(
199+
"RecordActivityTaskHeartbeat not implemented in server for standalone activity"
200+
)
201+
async def test_manual_heartbeat(client: Client):
202+
activity_id = str("test_manual_heartbeat_activity_id")
203+
task_queue = str(uuid.uuid4())
204+
205+
activity_handle = await client.start_activity(
206+
activity_for_testing_heartbeat,
207+
args=(), # TODO: overloads
208+
id=activity_id,
209+
task_queue=task_queue,
210+
)
211+
async with Worker(
212+
client,
213+
task_queue=task_queue,
214+
activities=[activity_for_testing_heartbeat],
215+
workflows=[WaitForSignalWorkflow],
216+
):
217+
async_activity_handle = client.get_async_activity_handle(
218+
activity_id=activity_id,
219+
run_id=activity_handle.run_id,
220+
)
221+
await async_activity_handle.heartbeat("Test heartbeat details")
222+
await client.get_workflow_handle_for(
223+
WaitForSignalWorkflow.run,
224+
workflow_id="test-has-sent-heartbeat-wf-id",
225+
).signal(WaitForSignalWorkflow.signal)
226+
assert await activity_handle.result() == "Test heartbeat details"
227+
228+
229+
# Utilities
230+
231+
232+
@workflow.defn
233+
class WaitForSignalWorkflow:
234+
# Like a global asyncio.Event()
235+
236+
def __init__(self) -> None:
237+
self.signal_received = asyncio.Event()
238+
239+
@workflow.run
240+
async def run(self) -> None:
241+
await self.signal_received.wait()
242+
243+
@workflow.signal
244+
def signal(self) -> None:
245+
self.signal_received.set()

0 commit comments

Comments
 (0)