Skip to content

Commit 7c40f84

Browse files
committed
Thread changes
1 parent bc66377 commit 7c40f84

File tree

2 files changed

+21
-17
lines changed

2 files changed

+21
-17
lines changed

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ You can send delayed messages and set priorities to messages using labels.
2424

2525
To send delayed message, you have to specify
2626
delay label. You can do it with `task` decorator,
27-
or by using kicker. For example:
27+
or by using kicker.
28+
In this type of delay we are using additional queue with `expiration` parameter and after with time message will be deleted from `delay` queue and sent to the main taskiq queue.
29+
For example:
2830

2931
```python
3032
broker = AioPikaBroker()
@@ -56,7 +58,10 @@ To send delayed message you can install `rabbitmq-delayed-message-exchange`
5658
plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.
5759

5860
And you need to configure you broker.
59-
There is `delayed_message_exchange_plugin` `AioPikaBroker` parameter and it must be `True` to turn on delayed message functionality. For example:
61+
There is `delayed_message_exchange_plugin` `AioPikaBroker` parameter and it must be `True` to turn on delayed message functionality.
62+
63+
The delay plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time.
64+
For example:
6065

6166
```python
6267
broker = AioPikaBroker(

taskiq_aio_pika/broker.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -252,22 +252,21 @@ async def kick(self, message: BrokerMessage) -> None:
252252
ensure=False,
253253
)
254254
await exchange.publish(rmq_message, routing_key=message.task_name)
255+
elif self._delayed_message_exchange_plugin:
256+
rmq_message.headers["x-delay"] = delay * 1000
257+
exchange = await self.write_channel.get_exchange(
258+
self._delay_plugin_exchange_name,
259+
)
260+
await exchange.publish(
261+
rmq_message,
262+
routing_key=self._routing_key,
263+
)
255264
else:
256-
if self._delayed_message_exchange_plugin:
257-
rmq_message.headers["x-delay"] = delay * 1000
258-
exchange = await self.write_channel.get_exchange(
259-
self._delay_plugin_exchange_name,
260-
)
261-
await exchange.publish(
262-
rmq_message,
263-
routing_key=self._routing_key,
264-
)
265-
else:
266-
rmq_message.expiration = timedelta(seconds=delay)
267-
await self.write_channel.default_exchange.publish(
268-
rmq_message,
269-
routing_key=self._delay_queue_name,
270-
)
265+
rmq_message.expiration = timedelta(seconds=delay)
266+
await self.write_channel.default_exchange.publish(
267+
rmq_message,
268+
routing_key=self._delay_queue_name,
269+
)
271270

272271
async def listen(self) -> AsyncGenerator[bytes, None]:
273272
"""

0 commit comments

Comments
 (0)