22import logging
33import signal
44from concurrent .futures import ThreadPoolExecutor
5- from typing import Any
5+ from typing import Any , Type
66
77from taskiq .abc .broker import AsyncBroker
88from taskiq .cli .utils import import_object , import_tasks
@@ -51,7 +51,21 @@ async def shutdown_broker(broker: AsyncBroker, timeout: float) -> None:
5151 )
5252
5353
54- def start_listen (args : WorkerArgs ) -> None : # noqa: WPS213
54+ def get_receiver_type (args : WorkerArgs ) -> Type [Receiver ]:
55+ """
56+ Import Receiver from args.
57+
58+ :param args: CLI arguments.
59+ :raises ValueError: if receiver is not a Receiver type.
60+ :return: Receiver type.
61+ """
62+ receiver_type = import_object (args .receiver )
63+ if not (isinstance (receiver_type , type ) and issubclass (receiver_type , Receiver )):
64+ raise ValueError ("Unknown receiver type. Please use Receiver class." )
65+ return receiver_type
66+
67+
68+ def start_listen (args : WorkerArgs ) -> None : # noqa: WPS210, WPS213
5569 """
5670 This function starts actual listening process.
5771
@@ -63,6 +77,7 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS213
6377
6478 :param args: CLI arguments.
6579 :raises ValueError: if broker is not an AsyncBroker instance.
80+ :raises ValueError: if receiver is not a Receiver type.
6681 """
6782 if uvloop is not None :
6883 logger .debug ("UVLOOP found. Installing policy." )
@@ -77,6 +92,9 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS213
7792 if not isinstance (broker , AsyncBroker ):
7893 raise ValueError ("Unknown broker type. Please use AsyncBroker instance." )
7994
95+ receiver_type = get_receiver_type (args )
96+ receiver_args = dict (args .receiver_arg )
97+
8098 # Here how we manage interruptions.
8199 # We have to remember shutting_down state,
82100 # because KeyboardInterrupt can be send multiple
@@ -105,14 +123,16 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
105123 signal .signal (signal .SIGTERM , interrupt_handler )
106124
107125 loop = asyncio .get_event_loop ()
126+
108127 try :
109128 logger .debug ("Initialize receiver." )
110129 with ThreadPoolExecutor (args .max_threadpool_threads ) as pool :
111- receiver = Receiver (
130+ receiver = receiver_type (
112131 broker = broker ,
113132 executor = pool ,
114133 validate_params = not args .no_parse ,
115134 max_async_tasks = args .max_async_tasks ,
135+ ** receiver_args ,
116136 )
117137 loop .run_until_complete (receiver .listen ())
118138 except KeyboardInterrupt :
0 commit comments