88import anyio
99from taskiq_dependencies import DependencyGraph
1010
11- from taskiq .abc .broker import AckableMessage , AsyncBroker
11+ from taskiq .abc .broker import AsyncBroker
1212from taskiq .abc .middleware import TaskiqMiddleware
13- from taskiq .acks import AcknowledgeType
13+ from taskiq .acks import AckableMessage , AcknowledgeType , NackableMessage
1414from taskiq .context import Context
1515from taskiq .exceptions import NoResultError
16- from taskiq .message import TaskiqMessage
16+ from taskiq .message import MessageWithMetadata , TaskiqMessage , WrappedMessage
1717from taskiq .receiver .params_parser import parse_params
1818from taskiq .result import TaskiqResult
1919from taskiq .state import TaskiqState
@@ -58,6 +58,7 @@ def __init__(
5858 on_exit : Optional [Callable [["Receiver" ], None ]] = None ,
5959 max_tasks_to_execute : Optional [int ] = None ,
6060 wait_tasks_timeout : Optional [float ] = None ,
61+ max_attempts_at_message : Optional [int ] = None ,
6162 ) -> None :
6263 self .broker = broker
6364 self .executor = executor
@@ -72,6 +73,7 @@ def __init__(
7273 self .known_tasks : Set [str ] = set ()
7374 self .max_tasks_to_execute = max_tasks_to_execute
7475 self .wait_tasks_timeout = wait_tasks_timeout
76+ self .max_attempts_at_message = max_attempts_at_message
7577 for task in self .broker .get_all_tasks ().values ():
7678 self ._prepare_task (task .task_name , task .original_func )
7779 self .sem : "Optional[asyncio.Semaphore]" = None
@@ -86,7 +88,7 @@ def __init__(
8688
8789 async def callback ( # noqa: C901, PLR0912
8890 self ,
89- message : Union [bytes , AckableMessage ],
91+ message : Union [bytes , WrappedMessage ],
9092 raise_err : bool = False ,
9193 ) -> None :
9294 """
@@ -101,7 +103,33 @@ async def callback( # noqa: C901, PLR0912
101103 :param raise_err: raise an error if cannot save result in
102104 result_backend.
103105 """
104- message_data = message .data if isinstance (message , AckableMessage ) else message
106+ message_data = (
107+ message .message if isinstance (message , WrappedMessage ) else message
108+ )
109+ if isinstance (message , MessageWithMetadata ):
110+ message_metadata = message .metadata
111+ else :
112+ message_metadata = None
113+
114+ delivery_count = message_metadata .delivery_count if message_metadata else None
115+ if (
116+ delivery_count
117+ and self .max_attempts_at_message
118+ and delivery_count >= self .max_attempts_at_message
119+ ):
120+ logger .error (
121+ "Permitted number of attempts at processing message %s "
122+ "has been exhausted after %s attempts." ,
123+ message_data ,
124+ self .max_attempts_at_message ,
125+ )
126+ match message :
127+ case NackableMessage ():
128+ await maybe_awaitable (message .nack ())
129+ case AckableMessage ():
130+ await maybe_awaitable (message .ack ())
131+ return
132+
105133 try :
106134 taskiq_msg = self .broker .formatter .loads (message = message_data )
107135 taskiq_msg .parse_labels ()
@@ -331,7 +359,7 @@ async def listen(self, finish_event: asyncio.Event) -> None: # pragma: no cover
331359 if self .run_startup :
332360 await self .broker .startup ()
333361 logger .info ("Listening started." )
334- queue : "asyncio.Queue[Union[bytes, AckableMessage ]]" = asyncio .Queue ()
362+ queue : "asyncio.Queue[Union[bytes, WrappedMessage ]]" = asyncio .Queue ()
335363
336364 async with anyio .create_task_group () as gr :
337365 gr .start_soon (self .prefetcher , queue , finish_event )
@@ -342,7 +370,7 @@ async def listen(self, finish_event: asyncio.Event) -> None: # pragma: no cover
342370
343371 async def prefetcher (
344372 self ,
345- queue : "asyncio.Queue[Union[bytes, AckableMessage ]]" ,
373+ queue : "asyncio.Queue[Union[bytes, WrappedMessage ]]" ,
346374 finish_event : asyncio .Event ,
347375 ) -> None :
348376 """
@@ -354,7 +382,7 @@ async def prefetcher(
354382 fetched_tasks : int = 0
355383 iterator = self .broker .listen ()
356384 current_message : asyncio .Task [
357- Union [bytes , AckableMessage ]
385+ Union [bytes , WrappedMessage ]
358386 ] = asyncio .create_task (
359387 iterator .__anext__ (), # type: ignore
360388 )
@@ -394,7 +422,7 @@ async def prefetcher(
394422
395423 async def runner (
396424 self ,
397- queue : "asyncio.Queue[Union[bytes, AckableMessage ]]" ,
425+ queue : "asyncio.Queue[Union[bytes, WrappedMessage ]]" ,
398426 ) -> None :
399427 """
400428 Run tasks.
0 commit comments