1515from taskiq .message import TaskiqMessage
1616from taskiq .receiver .params_parser import parse_params
1717from taskiq .result import TaskiqResult
18- from taskiq .semaphore import DequeSemaphore
18+ from taskiq .semaphore import PrioritySemaphore
1919from taskiq .state import TaskiqState
2020from taskiq .utils import DequeQueue , maybe_awaitable
2121
@@ -48,7 +48,7 @@ def __init__( # noqa: WPS211
4848 validate_params : bool = True ,
4949 max_async_tasks : "Optional[int]" = None ,
5050 max_prefetch : int = 0 ,
51- max_idle_tasks : Optional [int ] = None ,
51+ max_sleeping_tasks : Optional [int ] = None ,
5252 propagate_exceptions : bool = True ,
5353 ) -> None :
5454 self .broker = broker
@@ -62,22 +62,22 @@ def __init__( # noqa: WPS211
6262 self .task_signatures [task .task_name ] = inspect .signature (task .original_func )
6363 self .task_hints [task .task_name ] = get_type_hints (task .original_func )
6464 self .dependency_graphs [task .task_name ] = DependencyGraph (task .original_func )
65- self .sem : "Optional[DequeSemaphore ]" = None
65+ self .sem : "Optional[PrioritySemaphore ]" = None
6666 if max_async_tasks is not None and max_async_tasks > 0 :
67- self .sem = DequeSemaphore (max_async_tasks )
67+ self .sem = PrioritySemaphore (max_async_tasks )
6868 else :
6969 logger .warning (
7070 "Setting unlimited number of async tasks "
7171 + "can result in undefined behavior" ,
7272 )
73- self .sem_prefetch = DequeSemaphore (max_prefetch )
73+ self .sem_prefetch = PrioritySemaphore (max_prefetch )
7474 self .queue : DequeQueue [bytes ] = DequeQueue ()
7575
76- self .sem_idle : Optional [asyncio .Semaphore ] = None
77- if max_idle_tasks is not None and max_idle_tasks <= 0 :
76+ self .sem_sleeping : Optional [asyncio .Semaphore ] = None
77+ if max_sleeping_tasks is not None and max_sleeping_tasks <= 0 :
7878 raise ValueError ("`max_idle_tasks` should be greater then zero or None." )
79- if max_idle_tasks is not None and max_idle_tasks > 0 :
80- self .sem_idle = asyncio .Semaphore (max_idle_tasks )
79+ if max_sleeping_tasks is not None and max_sleeping_tasks > 0 :
80+ self .sem_sleeping = asyncio .Semaphore (max_sleeping_tasks )
8181
8282 async def callback ( # noqa: C901, WPS213
8383 self ,
@@ -190,7 +190,7 @@ async def run_task( # noqa: C901, WPS210
190190 broker_ctx = self .broker .custom_dependency_context
191191 broker_ctx .update (
192192 {
193- Context : Context (message , self .broker , self .task_idler ),
193+ Context : Context (message , self .broker , self .task_sleeper ),
194194 TaskiqState : self .broker .state ,
195195 },
196196 )
@@ -344,24 +344,26 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
344344 # https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
345345 task .add_done_callback (task_cb )
346346
347- async def task_idler (self , wait : float ) -> None : # noqa: WPS213, WPS217
347+ async def task_sleeper (self , wait : float ) -> None : # noqa: WPS213, WPS217
348348 """
349- Temporary increasing `max_async_tasks` for at least `wait` amount of time.
349+ Non-blocking sleep for running tasks.
350+
351+ Temporary increasing `max_async_tasks` for at least `wait` amount of time
350352
351353 :param wait: time
352354 """
353355 if not self .sem :
354356 await asyncio .sleep (wait )
355357 return
356358
357- if not self .sem_idle :
358- logger .warning ("`max_idle_tasks ` is undefined. Idle is unavailable." )
359+ if not self .sem_sleeping :
360+ logger .warning ("`max_sleeping_tasks ` is undefined. Sleep is unavailable." )
359361 await asyncio .sleep (wait )
360362 return
361363
362364 start_time = time ()
363365 with anyio .move_on_after (wait ) as scope :
364- await self .sem_idle .acquire ()
366+ await self .sem_sleeping .acquire ()
365367
366368 if scope .cancel_called : # noqa: WPS441
367369 return
@@ -381,4 +383,4 @@ async def task_idler(self, wait: float) -> None: # noqa: WPS213, WPS217
381383 await task
382384
383385 finally :
384- self .sem_idle .release ()
386+ self .sem_sleeping .release ()
0 commit comments