1- import asyncio
21import pickle
32from logging import getLogger
4- from typing import Any , Callable , Coroutine , Optional , TypeVar
3+ from typing import Any , AsyncGenerator , Callable , Optional , TypeVar
54
65from redis .asyncio import ConnectionPool , Redis
76from taskiq .abc .broker import AsyncBroker
@@ -70,19 +69,15 @@ async def kick(self, message: BrokerMessage) -> None:
7069 pickle .dumps (message ),
7170 )
7271
73- async def listen (
74- self ,
75- callback : Callable [[BrokerMessage ], Coroutine [Any , Any , None ]],
76- ) -> None :
72+ async def listen (self ) -> AsyncGenerator [BrokerMessage , None ]:
7773 """
78- Listen redis list for new messages.
74+ Listen redis queue for new messages.
7975
80- This function listens to list calls callback on
81- new messages.
76+ This function listens to the queue
77+ and yields new messages if they have BrokerMessage type .
8278
83- :param callback: function to call on new message .
79+ :yields: broker messages .
8480 """
85- loop = asyncio .get_event_loop ()
8681 async with Redis (connection_pool = self .connection_pool ) as redis_conn :
8782 redis_pubsub_channel = redis_conn .pubsub ()
8883 await redis_pubsub_channel .subscribe (self .redis_pubsub_channel )
@@ -93,7 +88,7 @@ async def listen(
9388 message ["data" ],
9489 )
9590 if isinstance (redis_message , BrokerMessage ):
96- loop . create_task ( callback ( redis_message ))
91+ yield redis_message
9792 except (
9893 TypeError ,
9994 AttributeError ,
0 commit comments