File tree Expand file tree Collapse file tree 3 files changed +71
-4
lines changed Expand file tree Collapse file tree 3 files changed +71
-4
lines changed Original file line number Diff line number Diff line change @@ -24,7 +24,7 @@ from taskiq_redis.redis_backend import RedisAsyncResultBackend
2424
2525
2626redis_async_result = RedisAsyncResultBackend(
27- url = " redis://localhost:6379" ,
27+ redis_url = " redis://localhost:6379" ,
2828)
2929
3030broker = RedisBroker(
@@ -41,7 +41,7 @@ async def best_task_ever() -> None:
4141
4242
4343async def main ():
44- task = await my_async_task .kiq()
44+ task = await best_task_ever .kiq()
4545 print (await task.get_result())
4646
4747
@@ -60,4 +60,5 @@ RedisBroker parameters:
6060## RedisAsyncResultBackend configuration
6161
6262RedisAsyncResultBackend parameters:
63- * ` url ` - url to redis.
63+ * ` redis_url ` - url to redis.
64+ * ` keep_results ` - flag to not remove results from Redis after reading.
Original file line number Diff line number Diff line change 1111class RedisAsyncResultBackend (AsyncResultBackend [_ReturnType ]):
1212 """Async result based on redis."""
1313
14- def __init__ (self , redis_url : str ):
14+ def __init__ (self , redis_url : str , keep_results : bool = True ):
15+ """
16+ Constructs a new result backend.
17+
18+ :param redis_url: url to redis.
19+ :param keep_results: flag to not remove results from Redis after reading.
20+ """
1521 self .redis_pool = ConnectionPool .from_url (redis_url )
22+ self .keep_results = keep_results
1623
1724 async def shutdown (self ) -> None :
1825 """Closes redis connection."""
@@ -80,6 +87,9 @@ async def get_result( # noqa: WPS210
8087 keys = fields ,
8188 )
8289
90+ if not self .keep_results :
91+ await redis .delete (task_id )
92+
8393 result = {
8494 result_key : pickle .loads (result_value )
8595 for result_value , result_key in zip (result_values , fields )
Original file line number Diff line number Diff line change @@ -68,3 +68,59 @@ async def test_fetch_without_logs(redis_url: str) -> None:
6868 assert fetched_result .return_value == 11
6969 assert fetched_result .execution_time == 112.2 # noqa: WPS459
7070 assert fetched_result .is_err
71+
72+
73+ @pytest .mark .anyio
74+ async def test_remove_results_after_reading (redis_url : str ) -> None :
75+ """
76+ Check if removing results after reading works fine.
77+
78+ :param redis_url: redis URL.
79+ """
80+ result_backend = RedisAsyncResultBackend ( # type: ignore
81+ redis_url = redis_url ,
82+ keep_results = False ,
83+ )
84+ task_id = uuid .uuid4 ().hex
85+ result : "TaskiqResult[int]" = TaskiqResult (
86+ is_err = True ,
87+ log = "My Log" ,
88+ return_value = 11 ,
89+ execution_time = 112.2 ,
90+ )
91+ await result_backend .set_result (
92+ task_id = task_id ,
93+ result = result ,
94+ )
95+
96+ await result_backend .get_result (task_id = task_id )
97+ with pytest .raises (Exception ):
98+ await result_backend .get_result (task_id = task_id )
99+
100+
101+ @pytest .mark .anyio
102+ async def test_keep_results_after_reading (redis_url : str ) -> None :
103+ """
104+ Check if keeping results after reading works fine.
105+
106+ :param redis_url: redis URL.
107+ """
108+ result_backend = RedisAsyncResultBackend ( # type: ignore
109+ redis_url = redis_url ,
110+ keep_results = True ,
111+ )
112+ task_id = uuid .uuid4 ().hex
113+ result : "TaskiqResult[int]" = TaskiqResult (
114+ is_err = True ,
115+ log = "My Log" ,
116+ return_value = 11 ,
117+ execution_time = 112.2 ,
118+ )
119+ await result_backend .set_result (
120+ task_id = task_id ,
121+ result = result ,
122+ )
123+
124+ res1 = await result_backend .get_result (task_id = task_id )
125+ res2 = await result_backend .get_result (task_id = task_id )
126+ assert res1 == res2
You can’t perform that action at this time.
0 commit comments