Skip to content

Commit 741603f

Browse files
committed
Extend tests
1 parent d28ae5b commit 741603f

File tree

1 file changed

+230
-4
lines changed

1 file changed

+230
-4
lines changed

tests/test_activity.py

Lines changed: 230 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
1+
import asyncio
12
import uuid
23
from datetime import timedelta
34

4-
from temporalio import activity
5-
from temporalio.client import Client
5+
import pytest
6+
7+
from temporalio import activity, workflow
8+
from temporalio.client import ActivityFailedError, Client
69
from temporalio.common import ActivityExecutionStatus
10+
from temporalio.exceptions import ApplicationError, CancelledError
11+
from temporalio.worker import Worker
712

813

914
@activity.defn
1015
async def increment(input: int) -> int:
1116
return input + 1
1217

1318

14-
async def test_describe_activity(client: Client):
15-
activity_id = str("test_start_and_describe_activity_id")
19+
@pytest.mark.skip("StartActivityExecution not implemented in server")
20+
async def test_start_activity_and_describe_activity(client: Client):
21+
activity_id = str(uuid.uuid4())
1622
task_queue = str(uuid.uuid4())
1723

1824
activity_handle = await client.start_activity(
@@ -29,3 +35,223 @@ async def test_describe_activity(client: Client):
2935
assert desc.activity_type == "increment"
3036
assert desc.task_queue == task_queue
3137
assert desc.status == ActivityExecutionStatus.RUNNING
38+
39+
40+
@pytest.mark.skip("GetActivityExecutionResult not implemented in server")
41+
async def test_start_activity_and_get_result(client: Client):
42+
activity_id = str(uuid.uuid4())
43+
task_queue = str(uuid.uuid4())
44+
45+
activity_handle = await client.start_activity(
46+
increment,
47+
args=(1,),
48+
id=activity_id,
49+
task_queue=task_queue,
50+
start_to_close_timeout=timedelta(seconds=5),
51+
)
52+
result_via_execute_activity = client.execute_activity(
53+
increment,
54+
args=(1,),
55+
id=activity_id,
56+
task_queue=task_queue,
57+
start_to_close_timeout=timedelta(seconds=5),
58+
)
59+
60+
async with Worker(
61+
client,
62+
task_queue=task_queue,
63+
activities=[increment],
64+
):
65+
assert await activity_handle.result() == 2
66+
assert await result_via_execute_activity == 2
67+
68+
69+
@activity.defn
70+
async def async_activity() -> int:
71+
# Notify test that the activity has started and is ready to be completed manually
72+
await (
73+
activity.client()
74+
.get_workflow_handle("activity-started-wf-id")
75+
.signal(WaitForSignalWorkflow.signal)
76+
)
77+
activity.raise_complete_async()
78+
79+
80+
@pytest.mark.skip(
81+
"RespondActivityTaskCompleted not implemented in server for standalone activity"
82+
)
83+
async def test_manual_completion(client: Client):
84+
activity_id = str(uuid.uuid4())
85+
task_queue = str(uuid.uuid4())
86+
87+
activity_handle = await client.start_activity(
88+
async_activity,
89+
args=(), # TODO: overloads
90+
id=activity_id,
91+
task_queue=task_queue,
92+
start_to_close_timeout=timedelta(seconds=5),
93+
)
94+
95+
async with Worker(
96+
client,
97+
task_queue=task_queue,
98+
activities=[async_activity],
99+
workflows=[WaitForSignalWorkflow],
100+
):
101+
# Wait for activity to start
102+
await client.execute_workflow(
103+
WaitForSignalWorkflow.run,
104+
id="activity-started-wf-id",
105+
task_queue=task_queue,
106+
)
107+
# Complete activity manually
108+
async_activity_handle = client.get_async_activity_handle(
109+
activity_id=activity_id,
110+
run_id=activity_handle.run_id,
111+
)
112+
await async_activity_handle.complete(7)
113+
assert await activity_handle.result() == 7
114+
115+
116+
@pytest.mark.skip(
117+
"RespondActivityTaskCanceled not implemented in server for standalone activity"
118+
)
119+
async def test_manual_cancellation(client: Client):
120+
activity_id = str(uuid.uuid4())
121+
task_queue = str(uuid.uuid4())
122+
123+
activity_handle = await client.start_activity(
124+
async_activity,
125+
args=(), # TODO: overloads
126+
id=activity_id,
127+
task_queue=task_queue,
128+
start_to_close_timeout=timedelta(seconds=5),
129+
)
130+
131+
async with Worker(
132+
client,
133+
task_queue=task_queue,
134+
activities=[async_activity],
135+
workflows=[WaitForSignalWorkflow],
136+
):
137+
await client.execute_workflow(
138+
WaitForSignalWorkflow.run,
139+
id="activity-started-wf-id",
140+
task_queue=task_queue,
141+
)
142+
async_activity_handle = client.get_async_activity_handle(
143+
activity_id=activity_id,
144+
run_id=activity_handle.run_id,
145+
)
146+
await async_activity_handle.report_cancellation("Test cancellation")
147+
with pytest.raises(ActivityFailedError) as err:
148+
await activity_handle.result()
149+
assert isinstance(err.value.cause, CancelledError)
150+
assert str(err.value.cause) == "Test cancellation"
151+
152+
153+
@pytest.mark.skip(
154+
"RespondActivityTaskFailed not implemented in server for standalone activity"
155+
)
156+
async def test_manual_fail(client: Client):
157+
activity_id = str(uuid.uuid4())
158+
task_queue = str(uuid.uuid4())
159+
160+
activity_handle = await client.start_activity(
161+
async_activity,
162+
args=(), # TODO: overloads
163+
id=activity_id,
164+
task_queue=task_queue,
165+
start_to_close_timeout=timedelta(seconds=5),
166+
)
167+
async with Worker(
168+
client,
169+
task_queue=task_queue,
170+
activities=[async_activity],
171+
workflows=[WaitForSignalWorkflow],
172+
):
173+
await client.execute_workflow(
174+
WaitForSignalWorkflow.run,
175+
id="activity-started-wf-id",
176+
task_queue=task_queue,
177+
)
178+
async_activity_handle = client.get_async_activity_handle(
179+
activity_id=activity_id,
180+
run_id=activity_handle.run_id,
181+
)
182+
await async_activity_handle.fail(Exception("Test failure"))
183+
with pytest.raises(ActivityFailedError) as err:
184+
await activity_handle.result()
185+
assert isinstance(err.value.cause, ApplicationError)
186+
assert str(err.value.cause) == "Test failure"
187+
188+
189+
@activity.defn
190+
async def activity_for_testing_heartbeat() -> str:
191+
wait_for_heartbeat_wf_handle = await activity.client().start_workflow(
192+
WaitForSignalWorkflow.run,
193+
id="test-has-sent-heartbeat-wf-id",
194+
task_queue=activity.info().task_queue,
195+
)
196+
info = activity.info()
197+
if info.attempt == 1:
198+
# Wait for test to notify that it has sent heartbeat
199+
await wait_for_heartbeat_wf_handle.result()
200+
raise Exception("Intentional error to force retry")
201+
elif info.attempt == 2:
202+
[heartbeat_data] = info.heartbeat_details
203+
assert isinstance(heartbeat_data, str)
204+
return heartbeat_data
205+
else:
206+
raise AssertionError(f"Unexpected attempt number: {info.attempt}")
207+
208+
209+
@pytest.mark.skip(
210+
"RecordActivityTaskHeartbeat not implemented in server for standalone activity"
211+
)
212+
async def test_manual_heartbeat(client: Client):
213+
activity_id = str(uuid.uuid4())
214+
task_queue = str(uuid.uuid4())
215+
216+
activity_handle = await client.start_activity(
217+
activity_for_testing_heartbeat,
218+
args=(), # TODO: overloads
219+
id=activity_id,
220+
task_queue=task_queue,
221+
start_to_close_timeout=timedelta(seconds=5),
222+
)
223+
async with Worker(
224+
client,
225+
task_queue=task_queue,
226+
activities=[activity_for_testing_heartbeat],
227+
workflows=[WaitForSignalWorkflow],
228+
):
229+
async_activity_handle = client.get_async_activity_handle(
230+
activity_id=activity_id,
231+
run_id=activity_handle.run_id,
232+
)
233+
await async_activity_handle.heartbeat("Test heartbeat details")
234+
await client.get_workflow_handle_for(
235+
WaitForSignalWorkflow.run,
236+
workflow_id="test-has-sent-heartbeat-wf-id",
237+
).signal(WaitForSignalWorkflow.signal)
238+
assert await activity_handle.result() == "Test heartbeat details"
239+
240+
241+
# Utilities
242+
243+
244+
@workflow.defn
245+
class WaitForSignalWorkflow:
246+
# Like a global asyncio.Event()
247+
248+
def __init__(self) -> None:
249+
self.signal_received = asyncio.Event()
250+
251+
@workflow.run
252+
async def run(self) -> None:
253+
await self.signal_received.wait()
254+
255+
@workflow.signal
256+
def signal(self) -> None:
257+
self.signal_received.set()

0 commit comments

Comments
 (0)