22import logging
33import signal
44from concurrent .futures import ThreadPoolExecutor
5- from typing import Any
5+ from typing import Any , Type
66
77from taskiq .abc .broker import AsyncBroker
88from taskiq .cli .utils import import_object , import_tasks
@@ -51,7 +51,21 @@ async def shutdown_broker(broker: AsyncBroker, timeout: float) -> None:
5151 )
5252
5353
54- def start_listen (args : WorkerArgs ) -> None : # noqa: WPS213
54+ def get_receiver_type (args : WorkerArgs ) -> Type [Receiver ]:
55+ """
56+ Import Receiver from args.
57+
58+ :param args: CLI arguments.
59+ :raises ValueError: if receiver is not a Receiver type.
60+ :return: Receiver type.
61+ """
62+ receiver_type = import_object (args .receiver )
63+ if not (isinstance (receiver_type , type ) and issubclass (receiver_type , Receiver )):
64+ raise ValueError ("Unknown receiver type. Please use Receiver class." )
65+ return receiver_type
66+
67+
68+ def start_listen (args : WorkerArgs ) -> None : # noqa: WPS210, WPS213
5569 """
5670 This function starts actual listening process.
5771
@@ -63,6 +77,7 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS213
6377
6478 :param args: CLI arguments.
6579 :raises ValueError: if broker is not an AsyncBroker instance.
80+ :raises ValueError: if receiver is not a Receiver type.
6681 """
6782 if uvloop is not None :
6883 logger .debug ("UVLOOP found. Installing policy." )
@@ -77,6 +92,8 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS213
7792 if not isinstance (broker , AsyncBroker ):
7893 raise ValueError ("Unknown broker type. Please use AsyncBroker instance." )
7994
95+ receiver_type = get_receiver_type (args )
96+
8097 # Here how we manage interruptions.
8198 # We have to remember shutting_down state,
8299 # because KeyboardInterrupt can be send multiple
@@ -105,10 +122,11 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
105122 signal .signal (signal .SIGTERM , interrupt_handler )
106123
107124 loop = asyncio .get_event_loop ()
125+
108126 try :
109127 logger .debug ("Initialize receiver." )
110128 with ThreadPoolExecutor (args .max_threadpool_threads ) as pool :
111- receiver = Receiver (
129+ receiver = receiver_type (
112130 broker = broker ,
113131 executor = pool ,
114132 validate_params = not args .no_parse ,
0 commit comments