33from typing import List , Tuple , Union
44
55import pytest
6+ from redis .asyncio import Redis
67from taskiq import AckableMessage , AsyncBroker , BrokerMessage
78
89from taskiq_redis import (
@@ -316,7 +317,7 @@ async def test_streams_sentinel_broker(
316317 redis_sentinel_master_name : str ,
317318) -> None :
318319 """
319- Test that messages are published and read correctly by ListQueueSentinelBroker .
320+ Test that messages are published and read correctly by RedisStreamSentinelBroker .
320321
321322 We create two workers that listen and send a message to them.
322323 Expect only one worker to receive the same message we sent.
@@ -338,3 +339,97 @@ async def test_streams_sentinel_broker(
338339 await result .ack () # type: ignore
339340 worker_task .cancel ()
340341 await broker .shutdown ()
342+
343+
344+ @pytest .mark .anyio
345+ async def test_maxlen_in_stream_broker (
346+ redis_url : str ,
347+ valid_broker_message : BrokerMessage ,
348+ ) -> None :
349+ """
350+ Test that maxlen parameter works correctly in RedisStreamBroker.
351+
352+ We create RedisStreamBroker, fill in them with messages in the amount of
353+ > maxlen and check that only maxlen messages are in the stream.
354+ """
355+ maxlen = 20
356+
357+ broker = RedisStreamBroker (
358+ url = redis_url ,
359+ maxlen = maxlen ,
360+ approximate = False ,
361+ queue_name = uuid .uuid4 ().hex ,
362+ consumer_group_name = uuid .uuid4 ().hex ,
363+ )
364+
365+ await broker .startup ()
366+
367+ for _ in range (maxlen * 2 ):
368+ await broker .kick (valid_broker_message )
369+
370+ async with Redis (connection_pool = broker .connection_pool ) as redis :
371+ assert await redis .xlen (broker .queue_name ) == maxlen
372+ await broker .shutdown ()
373+
374+
375+ @pytest .mark .anyio
376+ async def test_maxlen_in_cluster_stream_broker (
377+ redis_cluster_url : str ,
378+ valid_broker_message : BrokerMessage ,
379+ ) -> None :
380+ """
381+ Test that maxlen parameter works correctly in RedisStreamClusterBroker.
382+
383+ We create RedisStreamClusterBroker, fill it with messages in the amount of
384+ > maxlen and check that only maxlen messages are in the stream.
385+ """
386+ maxlen = 20
387+
388+ broker = RedisStreamClusterBroker (
389+ maxlen = maxlen ,
390+ approximate = False ,
391+ url = redis_cluster_url ,
392+ queue_name = uuid .uuid4 ().hex ,
393+ consumer_group_name = uuid .uuid4 ().hex ,
394+ )
395+
396+ await broker .startup ()
397+
398+ for _ in range (maxlen * 2 ):
399+ await broker .kick (valid_broker_message )
400+
401+ assert await broker .redis .xlen (broker .queue_name ) == maxlen
402+ await broker .shutdown ()
403+
404+
405+ @pytest .mark .anyio
406+ async def test_maxlen_in_sentinel_stream_broker (
407+ redis_sentinel_master_name : str ,
408+ redis_sentinels : List [Tuple [str , int ]],
409+ valid_broker_message : BrokerMessage ,
410+ ) -> None :
411+ """
412+ Test that maxlen parameter works correctly in RedisStreamSentinelBroker.
413+
414+ We create RedisStreamSentinelBroker, fill it with messages in the amount of
415+ > maxlen and check that only maxlen messages are in the stream.
416+ """
417+ maxlen = 20
418+
419+ broker = RedisStreamSentinelBroker (
420+ maxlen = maxlen ,
421+ approximate = False ,
422+ sentinels = redis_sentinels ,
423+ queue_name = uuid .uuid4 ().hex ,
424+ consumer_group_name = uuid .uuid4 ().hex ,
425+ master_name = redis_sentinel_master_name ,
426+ )
427+
428+ await broker .startup ()
429+
430+ for _ in range (maxlen * 2 ):
431+ await broker .kick (valid_broker_message )
432+
433+ async with broker ._acquire_master_conn () as redis_conn :
434+ assert await redis_conn .xlen (broker .queue_name ) == maxlen
435+ await broker .shutdown ()
0 commit comments