Skip to content

Commit f03a4a3

Browse files
committed
fix: add a timeout to RedisSteamBroker xautoclaim lock to prevent infinite locking
1 parent 18f9f4d commit f03a4a3

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

taskiq_redis/redis_broker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def __init__(
169169
approximate: bool = True,
170170
idle_timeout: int = 600000, # 10 minutes
171171
unacknowledged_batch_size: int = 100,
172+
unacknowledged_lock_timeout: int = 30,
172173
xread_count: Optional[int] = 100,
173174
additional_streams: Optional[Dict[str, str]] = None,
174175
**connection_kwargs: Any,
@@ -196,8 +197,10 @@ def __init__(
196197
:param xread_count: number of messages to fetch from the stream at once.
197198
:param additional_streams: additional streams to read from.
198199
Each key is a stream name, value is a consumer id.
199-
:param redeliver_timeout: time in ms to wait before redelivering a message.
200200
:param unacknowledged_batch_size: number of unacknowledged messages to fetch.
201+
:param unacknowledged_lock_timeout: time in seconds before auto-releasing the lock.
202+
After this time the lock will be automatically released if the worker crashes or gets killed.
203+
Set to a bigger value if your tasks take a long time to complete.
201204
"""
202205
super().__init__(
203206
url,
@@ -217,6 +220,7 @@ def __init__(
217220
self.additional_streams = additional_streams or {}
218221
self.idle_timeout = idle_timeout
219222
self.unacknowledged_batch_size = unacknowledged_batch_size
223+
self.unacknowledged_lock_timeout = unacknowledged_lock_timeout
220224
self.count = xread_count
221225

222226
async def _declare_consumer_group(self) -> None:
@@ -298,6 +302,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
298302
for stream in [self.queue_name, *self.additional_streams.keys()]:
299303
lock = redis_conn.lock(
300304
f"autoclaim:{self.consumer_group_name}:{stream}",
305+
timeout=self.unacknowledged_lock_timeout,
301306
)
302307
if await lock.locked():
303308
continue

0 commit comments

Comments
 (0)