@@ -50,6 +50,7 @@ def __init__(self, broker: AsyncBroker, cli_args: WorkerArgs) -> None:
5050 self .executor = ThreadPoolExecutor (
5151 max_workers = cli_args .max_threadpool_threads ,
5252 )
53+ self .sem = asyncio .Semaphore (cli_args .max_async_tasks )
5354
5455 async def callback ( # noqa: C901, WPS213
5556 self ,
@@ -68,61 +69,62 @@ async def callback( # noqa: C901, WPS213
6869 :param raise_err: raise an error if cannot save result in
6970 result_backend.
7071 """
71- logger .debug (f"Received message: { message } " )
72- if message .task_name not in self .broker .available_tasks :
73- logger .warning (
74- 'task "%s" is not found. Maybe you forgot to import it?' ,
72+ async with self .sem :
73+ logger .debug (f"Received message: { message } " )
74+ if message .task_name not in self .broker .available_tasks :
75+ logger .warning (
76+ 'task "%s" is not found. Maybe you forgot to import it?' ,
77+ message .task_name ,
78+ )
79+ return
80+ logger .debug (
81+ "Function for task %s is resolved. Executing..." ,
7582 message .task_name ,
7683 )
77- return
78- logger .debug (
79- "Function for task %s is resolved. Executing..." ,
80- message .task_name ,
81- )
82- try :
83- taskiq_msg = self .broker .formatter .loads (message = message )
84- except Exception as exc :
85- logger .warning (
86- "Cannot parse message: %s. Skipping execution.\n %s" ,
87- message ,
88- exc ,
89- exc_info = True ,
90- )
91- return
92- for middleware in self .broker .middlewares :
93- if middleware .__class__ .pre_execute != TaskiqMiddleware .pre_execute :
94- taskiq_msg = await maybe_awaitable (
95- middleware .pre_execute (
96- taskiq_msg ,
97- ),
84+ try :
85+ taskiq_msg = self .broker .formatter .loads (message = message )
86+ except Exception as exc :
87+ logger .warning (
88+ "Cannot parse message: %s. Skipping execution.\n %s" ,
89+ message ,
90+ exc ,
91+ exc_info = True ,
9892 )
93+ return
94+ for middleware in self .broker .middlewares :
95+ if middleware .__class__ .pre_execute != TaskiqMiddleware .pre_execute :
96+ taskiq_msg = await maybe_awaitable (
97+ middleware .pre_execute (
98+ taskiq_msg ,
99+ ),
100+ )
99101
100- logger .info (
101- "Executing task %s with ID: %s" ,
102- taskiq_msg .task_name ,
103- taskiq_msg .task_id ,
104- )
105- result = await self .run_task (
106- target = self .broker .available_tasks [message .task_name ].original_func ,
107- message = taskiq_msg ,
108- )
109- for middleware in self .broker .middlewares :
110- if middleware .__class__ .post_execute != TaskiqMiddleware .post_execute :
111- await maybe_awaitable (middleware .post_execute (taskiq_msg , result ))
112- try :
113- await self .broker .result_backend .set_result (message .task_id , result )
114- except Exception as exc :
115- logger .exception (
116- "Can't set result in result backend. Cause: %s" ,
117- exc ,
118- exc_info = True ,
102+ logger .info (
103+ "Executing task %s with ID: %s" ,
104+ taskiq_msg .task_name ,
105+ taskiq_msg .task_id ,
106+ )
107+ result = await self .run_task (
108+ target = self .broker .available_tasks [message .task_name ].original_func ,
109+ message = taskiq_msg ,
119110 )
120- if raise_err :
121- raise exc
111+ for middleware in self .broker .middlewares :
112+ if middleware .__class__ .post_execute != TaskiqMiddleware .post_execute :
113+ await maybe_awaitable (middleware .post_execute (taskiq_msg , result ))
114+ try :
115+ await self .broker .result_backend .set_result (message .task_id , result )
116+ except Exception as exc :
117+ logger .exception (
118+ "Can't set result in result backend. Cause: %s" ,
119+ exc ,
120+ exc_info = True ,
121+ )
122+ if raise_err :
123+ raise exc
122124
123- for middleware in self .broker .middlewares :
124- if middleware .__class__ .post_save != TaskiqMiddleware .post_save :
125- await maybe_awaitable (middleware .post_save (taskiq_msg , result ))
125+ for middleware in self .broker .middlewares :
126+ if middleware .__class__ .post_save != TaskiqMiddleware .post_save :
127+ await maybe_awaitable (middleware .post_save (taskiq_msg , result ))
126128
127129 async def run_task ( # noqa: C901, WPS210
128130 self ,
0 commit comments