1313from taskiq .acks import AcknowledgeType
1414from taskiq .context import Context
1515from taskiq .exceptions import NoResultError
16- from taskiq .message import TaskiqMessage
16+ from taskiq .message import DeliveryCountMessage , TaskiqMessage , WrappedMessage
1717from taskiq .receiver .params_parser import parse_params
1818from taskiq .result import TaskiqResult
1919from taskiq .state import TaskiqState
@@ -58,6 +58,7 @@ def __init__(
5858 on_exit : Optional [Callable [["Receiver" ], None ]] = None ,
5959 max_tasks_to_execute : Optional [int ] = None ,
6060 wait_tasks_timeout : Optional [float ] = None ,
61+ max_attempts_at_message : Optional [int ] = None ,
6162 ) -> None :
6263 self .broker = broker
6364 self .executor = executor
@@ -72,6 +73,7 @@ def __init__(
7273 self .known_tasks : Set [str ] = set ()
7374 self .max_tasks_to_execute = max_tasks_to_execute
7475 self .wait_tasks_timeout = wait_tasks_timeout
76+ self .max_attempts_at_message = max_attempts_at_message
7577 for task in self .broker .get_all_tasks ().values ():
7678 self ._prepare_task (task .task_name , task .original_func )
7779 self .sem : "Optional[asyncio.Semaphore]" = None
@@ -86,7 +88,7 @@ def __init__(
8688
8789 async def callback ( # noqa: C901, PLR0912
8890 self ,
89- message : Union [bytes , AckableMessage ],
91+ message : Union [bytes , WrappedMessage ],
9092 raise_err : bool = False ,
9193 ) -> None :
9294 """
@@ -101,7 +103,31 @@ async def callback( # noqa: C901, PLR0912
101103 :param raise_err: raise an error if cannot save result in
102104 result_backend.
103105 """
104- message_data = message .data if isinstance (message , AckableMessage ) else message
106+ message_data = message .data if isinstance (message , WrappedMessage ) else message
107+
108+ delivery_count = (
109+ message .delivery_count
110+ if isinstance (message , DeliveryCountMessage )
111+ else None
112+ )
113+ if (
114+ delivery_count
115+ and self .max_attempts_at_message
116+ and delivery_count >= self .max_attempts_at_message
117+ ):
118+ logger .error (
119+ "Permitted number of attempts at processing message %s "
120+ "has been exhausted after %s attempts." ,
121+ message_data ,
122+ self .max_attempts_at_message ,
123+ )
124+ if isinstance (
125+ message ,
126+ AckableMessage ,
127+ ):
128+ await maybe_awaitable (message .ack ())
129+ return
130+
105131 try :
106132 taskiq_msg = self .broker .formatter .loads (message = message_data )
107133 taskiq_msg .parse_labels ()
0 commit comments