File tree Expand file tree Collapse file tree 4 files changed +21
-4
lines changed Expand file tree Collapse file tree 4 files changed +21
-4
lines changed Original file line number Diff line number Diff line change @@ -18,6 +18,7 @@ pip install taskiq-redis
1818Let's see the example with the redis broker and redis async result:
1919
2020``` python
21+ # broker.py
2122import asyncio
2223
2324from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
@@ -42,12 +43,18 @@ async def best_task_ever() -> None:
4243
4344async def main ():
4445 task = await best_task_ever.kiq()
45- print (await task.get_result ())
46+ print (await task.wait_result ())
4647
4748
48- asyncio.run(main())
49+ if __name__ == " __main__" :
50+ asyncio.run(main())
4951```
5052
53+ Launch the workers:
54+ ` taskiq worker broker:broker `
55+ Then run the main code:
56+ ` python3 broker.py `
57+
5158## PubSubBroker and ListQueueBroker configuration
5259
5360We have two brokers with similar interfaces, but with different logic.
Original file line number Diff line number Diff line change @@ -8,3 +8,7 @@ class DuplicateExpireTimeSelectedError(TaskIQRedisError):
88
99class ExpireTimeMustBeMoreThanZeroError (TaskIQRedisError ):
1010 """Error if two lifetimes are less or equal zero."""
11+
12+
13+ class ResultIsMissingError (TaskIQRedisError ):
14+ """Error if there is no result when trying to get it."""
Original file line number Diff line number Diff line change 88from taskiq_redis .exceptions import (
99 DuplicateExpireTimeSelectedError ,
1010 ExpireTimeMustBeMoreThanZeroError ,
11+ ResultIsMissingError ,
1112)
1213
1314_ReturnType = TypeVar ("_ReturnType" )
@@ -109,6 +110,7 @@ async def get_result( # noqa: WPS210
109110
110111 :param task_id: task's id.
111112 :param with_logs: if True it will download task's logs.
113+ :raises ResultIsMissingError: if there is no result when trying to get it.
112114 :return: task's return value.
113115 """
114116 async with Redis (connection_pool = self .redis_pool ) as redis :
@@ -121,6 +123,9 @@ async def get_result( # noqa: WPS210
121123 name = task_id ,
122124 )
123125
126+ if result_value is None :
127+ raise ResultIsMissingError ()
128+
124129 taskiq_result : TaskiqResult [_ReturnType ] = pickle .loads (result_value )
125130
126131 if not with_logs :
Original file line number Diff line number Diff line change 99from taskiq_redis .exceptions import (
1010 DuplicateExpireTimeSelectedError ,
1111 ExpireTimeMustBeMoreThanZeroError ,
12+ ResultIsMissingError ,
1213)
1314
1415_ReturnType = TypeVar ("_ReturnType" )
@@ -210,7 +211,7 @@ async def test_unsuccess_backend_expire_ex_param(
210211 )
211212 await asyncio .sleep (1.1 )
212213
213- with pytest .raises (TypeError ):
214+ with pytest .raises (ResultIsMissingError ):
214215 await backend .get_result (task_id = task_id )
215216
216217
@@ -270,5 +271,5 @@ async def test_unsuccess_backend_expire_px_param(
270271 )
271272 await asyncio .sleep (1.1 )
272273
273- with pytest .raises (TypeError ):
274+ with pytest .raises (ResultIsMissingError ):
274275 await backend .get_result (task_id = task_id )
You can’t perform that action at this time.
0 commit comments