File tree Expand file tree Collapse file tree 2 files changed +17
-5
lines changed Expand file tree Collapse file tree 2 files changed +17
-5
lines changed Original file line number Diff line number Diff line change @@ -117,7 +117,6 @@ def __init__(
117117 self ,
118118 url : str ,
119119 prefix : str = "schedule" ,
120- buffer_size : int = 50 ,
121120 serializer : Optional [TaskiqSerializer ] = None ,
122121 ** connection_kwargs : Any ,
123122 ) -> None :
@@ -126,7 +125,6 @@ def __init__(
126125 url ,
127126 ** connection_kwargs ,
128127 )
129- self .buffer_size = buffer_size
130128 if serializer is None :
131129 serializer = PickleSerializer ()
132130 self .serializer = serializer
@@ -157,7 +155,7 @@ async def get_schedules(self) -> List[ScheduledTask]:
157155 """
158156 schedules = []
159157 async for key in self .redis .scan_iter (f"{ self .prefix } :*" ): # type: ignore[attr-defined]
160- raw_schedule = await self .redis .get (key )
158+ raw_schedule = await self .redis .get (key ) # type: ignore[attr-defined]
161159 parsed_schedule = model_validate (
162160 ScheduledTask ,
163161 self .serializer .loadb (raw_schedule ),
Original file line number Diff line number Diff line change @@ -196,17 +196,31 @@ async def test_cluster_post_run_time(redis_cluster_url: str) -> None:
196196
197197
198198@pytest .mark .anyio
199- async def test_cluster_buffer (redis_cluster_url : str ) -> None :
199+ async def test_cluster_get_schedules (redis_cluster_url : str ) -> None :
200+ """
201+ Test of a redis cluster source.
202+
203+ This test checks that if the schedules are located on different nodes,
204+ the source will still be able to get them all.
205+
206+ To simulate this we set a specific shard key for each schedule.
207+ The shard keys are from this gist:
208+
209+ https://gist.githubusercontent.com/dvirsky/93f43277317f629bb06e858946416f7e/raw/b0438faf6f5a0020c12a0730f6cd6ac4bdc4b171/crc16_slottable.h
210+
211+ """
200212 prefix = uuid .uuid4 ().hex
201- source = RedisClusterScheduleSource (redis_cluster_url , prefix = prefix , buffer_size = 1 )
213+ source = RedisClusterScheduleSource (redis_cluster_url , prefix = prefix )
202214 schedule1 = ScheduledTask (
215+ schedule_id = r"id-{06S}" ,
203216 task_name = "test_task1" ,
204217 labels = {},
205218 args = [],
206219 kwargs = {},
207220 cron = "* * * * *" ,
208221 )
209222 schedule2 = ScheduledTask (
223+ schedule_id = r"id-{4Rs}" ,
210224 task_name = "test_task2" ,
211225 labels = {},
212226 args = [],
You can’t perform that action at this time.
0 commit comments